Tage Johansson
2023-Aug-10 11:24 UTC
[Libguestfs] [libnbd PATCH v7 9/9] rust: async: Add an example
This patch adds an example using the asynchronous Rust bindings. --- rust/Cargo.toml | 1 + rust/examples/concurrent-read-write.rs | 135 +++++++++++++++++++++++++ rust/run-tests.sh.in | 2 + 3 files changed, 138 insertions(+) create mode 100644 rust/examples/concurrent-read-write.rs diff --git a/rust/Cargo.toml b/rust/Cargo.toml index d001248..4332783 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -54,5 +54,6 @@ default = ["log", "tokio"] anyhow = "1.0.72" once_cell = "1.18.0" pretty-hex = "0.3.0" +rand = { version = "0.8.5", default-features = false, features = ["small_rng", "min_const_gen"] } tempfile = "3.6.0" tokio = { version = "1.29.1", default-features = false, features = ["rt-multi-thread", "macros"] } diff --git a/rust/examples/concurrent-read-write.rs b/rust/examples/concurrent-read-write.rs new file mode 100644 index 0000000..a1c3e8a --- /dev/null +++ b/rust/examples/concurrent-read-write.rs @@ -0,0 +1,135 @@ +//! Example usage with nbdkit: +//! +//! nbdkit -U - memory 100M \ +//! --run 'cargo run --example concurrent-read-write -- $unixsocket' +//! +//! This will read and write randomly over the first megabyte of the +//! plugin using multi-conn, multiple threads and multiple requests in +//! flight on each thread. + +#![deny(warnings)] +use rand::prelude::*; +use std::env; +use std::path::PathBuf; +use std::sync::Arc; +use tokio::task::JoinSet; + +/// Number of simultaneous connections to the NBD server. +/// +/// Note that some servers only support a limited number of +/// simultaneous connections, and/or have a configurable thread pool +/// internally, and if you exceed those limits then something will break. +const NR_MULTI_CONN: usize = 8; + +/// Number of commands that can be "in flight" at the same time on each +/// connection. (Therefore the total number of requests in flight may +/// be up to NR_MULTI_CONN * MAX_IN_FLIGHT). +const MAX_IN_FLIGHT: usize = 16; + +/// The size of large reads and writes, must be > 512. 
+const BUFFER_SIZE: usize = 1024; + +/// Number of commands we issue (per [task][tokio::task]). +const NR_CYCLES: usize = 32; + +/// Statistics gathered during the run. +#[derive(Debug, Default)] +struct Stats { + /// The total number of requests made. + requests: usize, +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let args = env::args_os().collect::<Vec<_>>(); + if args.len() != 2 { + anyhow::bail!("Usage: {:?} socket", args[0]); + } + let socket = &args[1]; + + // We begin by making a connection to the server to get the export size + // and ensure that it supports multiple connections and is writable. + let nbd = libnbd::Handle::new()?; + nbd.connect_unix(&socket)?; + let export_size = nbd.get_size()?; + anyhow::ensure!( + (BUFFER_SIZE as u64) < export_size, + "export is {export_size}B, must be larger than {BUFFER_SIZE}B" + ); + anyhow::ensure!( + !nbd.is_read_only()?, + "error: this NBD export is read-only" + ); + anyhow::ensure!( + nbd.can_multi_conn()?, + "error: this NBD export does not support multi-conn" + ); + drop(nbd); // Close the connection. + + // Start the worker tasks, one per connection. + let mut tasks = JoinSet::new(); + for i in 0..NR_MULTI_CONN { + tasks.spawn(run_thread(i, socket.clone().into(), export_size)); + } + + // Wait for the tasks to complete. + let mut stats = Stats::default(); + while !tasks.is_empty() { + let this_stats = tasks.join_next().await.unwrap().unwrap()?; + stats.requests += this_stats.requests; + } + + // Make sure the number of requests that were required matches what + // we expect. + assert_eq!(stats.requests, NR_MULTI_CONN * NR_CYCLES); + + Ok(()) +} + +async fn run_thread( + task_idx: usize, + socket: PathBuf, + export_size: u64, +) -> anyhow::Result<Stats> { + // Start a new connection to the server. + // We shall spawn many commands concurrently on different tasks and those + // futures must be `'static`, hence we wrap the handle in an [Arc]. 
+ let nbd = Arc::new(libnbd::AsyncHandle::new()?); + nbd.connect_unix(socket).await?; + + let mut rng = SmallRng::seed_from_u64(44 as u64); + + // Issue commands. + let mut stats = Stats::default(); + let mut join_set = JoinSet::new(); + //tokio::time::sleep(std::time::Duration::from_secs(1)).await; + while stats.requests < NR_CYCLES || !join_set.is_empty() { + while stats.requests < NR_CYCLES && join_set.len() < MAX_IN_FLIGHT { + // If we want to issue another request, do so. Note that we reuse + // the same buffer for multiple in-flight requests. It doesn't + // matter here because we're just trying to write random stuff, + // but that would be Very Bad in a real application. + // Simulate a mix of large and small requests. + let size = if rng.gen() { BUFFER_SIZE } else { 512 }; + let offset = rng.gen_range(0..export_size - size as u64); + + let mut buf = [0u8; BUFFER_SIZE]; + let nbd = nbd.clone(); + if rng.gen() { + join_set.spawn(async move { + nbd.pread(&mut buf, offset, None).await + }); + } else { + // Fill the buf with random data. + rng.fill(&mut buf); + join_set + .spawn(async move { nbd.pwrite(&buf, offset, None).await }); + } + stats.requests += 1; + } + join_set.join_next().await.unwrap().unwrap()?; + } + + if task_idx == 0 {} + Ok(stats) +} diff --git a/rust/run-tests.sh.in b/rust/run-tests.sh.in index 3ebf9a1..661c018 100755 --- a/rust/run-tests.sh.in +++ b/rust/run-tests.sh.in @@ -32,6 +32,8 @@ if [ -z "$VG" ]; then --run '@CARGO@ run --example get-size -- $unixsocket' @NBDKIT@ -U - floppy . \ --run '@CARGO@ run --example fetch-first-sector -- $unixsocket' + @NBDKIT@ -U - memory 10M \ + --run '@CARGO@ run --example concurrent-read-write -- $unixsocket' else @CARGO@ test --config "target.'cfg(all())'.runner = \"$VG\"" -- --nocapture fi -- 2.41.0
Richard W.M. Jones
2023-Aug-15 09:00 UTC
[Libguestfs] [libnbd PATCH v7 9/9] rust: async: Add an example
On Thu, Aug 10, 2023 at 11:24:36AM +0000, Tage Johansson wrote: > This patch adds an example using the asynchronous Rust bindings. > --- > rust/Cargo.toml | 1 + > rust/examples/concurrent-read-write.rs | 135 +++++++++++++++++++++++++ > rust/run-tests.sh.in | 2 + > 3 files changed, 138 insertions(+) > create mode 100644 rust/examples/concurrent-read-write.rs > > diff --git a/rust/Cargo.toml b/rust/Cargo.toml > index d001248..4332783 100644 > --- a/rust/Cargo.toml > +++ b/rust/Cargo.toml > @@ -54,5 +54,6 @@ default = ["log", "tokio"] > anyhow = "1.0.72" > once_cell = "1.18.0" > pretty-hex = "0.3.0" > +rand = { version = "0.8.5", default-features = false, features = ["small_rng", "min_const_gen"] } > tempfile = "3.6.0" > tokio = { version = "1.29.1", default-features = false, features = ["rt-multi-thread", "macros"] } > diff --git a/rust/examples/concurrent-read-write.rs b/rust/examples/concurrent-read-write.rs > new file mode 100644 > index 0000000..a1c3e8a > --- /dev/null > +++ b/rust/examples/concurrent-read-write.rs > @@ -0,0 +1,135 @@ > +//! Example usage with nbdkit: > +//! > +//! nbdkit -U - memory 100M \ > +//! --run 'cargo run --example concurrent-read-write -- $unixsocket' It would be nice to make the example use a URI (i.e. nbd_connect_uri). Is that possible? > +//! This will read and write randomly over the first megabyte of the > +//! plugin using multi-conn, multiple threads and multiple requests in > +//! flight on each thread. > + > +#![deny(warnings)] > +use rand::prelude::*; > +use std::env; > +use std::path::PathBuf; > +use std::sync::Arc; > +use tokio::task::JoinSet; > + > +/// Number of simultaneous connections to the NBD server. > +/// > +/// Note that some servers only support a limited number of > +/// simultaneous connections, and/or have a configurable thread pool > +/// internally, and if you exceed those limits then something will break. 
> +const NR_MULTI_CONN: usize = 8; > + > +/// Number of commands that can be "in flight" at the same time on each > +/// connection. (Therefore the total number of requests in flight may > +/// be up to NR_MULTI_CONN * MAX_IN_FLIGHT). > +const MAX_IN_FLIGHT: usize = 16; > + > +/// The size of large reads and writes, must be > 512. > +const BUFFER_SIZE: usize = 1024; > + > +/// Number of commands we issue (per [task][tokio::task]). > +const NR_CYCLES: usize = 32; > + > +/// Statistics gathered during the run. > +#[derive(Debug, Default)] > +struct Stats { > + /// The total number of requests made. > + requests: usize, > +} > + > +#[tokio::main] > +async fn main() -> anyhow::Result<()> { > + let args = env::args_os().collect::<Vec<_>>(); > + if args.len() != 2 { > + anyhow::bail!("Usage: {:?} socket", args[0]); > + } > + let socket = &args[1]; > + > + // We begin by making a connection to the server to get the export size > + // and ensure that it supports multiple connections and is writable. > + let nbd = libnbd::Handle::new()?; > + nbd.connect_unix(&socket)?;ie. here ^^^> + let export_size = nbd.get_size()?; > + anyhow::ensure!( > + (BUFFER_SIZE as u64) < export_size, > + "export is {export_size}B, must be larger than {BUFFER_SIZE}B" > + ); > + anyhow::ensure!( > + !nbd.is_read_only()?, > + "error: this NBD export is read-only" > + ); > + anyhow::ensure!( > + nbd.can_multi_conn()?, > + "error: this NBD export does not support multi-conn" > + ); > + drop(nbd); // Close the connection. > + > + // Start the worker tasks, one per connection. > + let mut tasks = JoinSet::new(); > + for i in 0..NR_MULTI_CONN { > + tasks.spawn(run_thread(i, socket.clone().into(), export_size)); > + } > + > + // Wait for the tasks to complete. 
> + let mut stats = Stats::default(); > + while !tasks.is_empty() { > + let this_stats = tasks.join_next().await.unwrap().unwrap()?; > + stats.requests += this_stats.requests; > + } > + > + // Make sure the number of requests that were required matches what > + // we expect. > + assert_eq!(stats.requests, NR_MULTI_CONN * NR_CYCLES); > + > + Ok(()) > +} > + > +async fn run_thread( > + task_idx: usize, > + socket: PathBuf, > + export_size: u64, > +) -> anyhow::Result<Stats> { > + // Start a new connection to the server. > + // We shall spawn many commands concurrently on different tasks and those > + // futures must be `'static`, hence we wrap the handle in an [Arc]. > + let nbd = Arc::new(libnbd::AsyncHandle::new()?); > + nbd.connect_unix(socket).await?; > + > + let mut rng = SmallRng::seed_from_u64(44 as u64); > + > + // Issue commands. > + let mut stats = Stats::default(); > + let mut join_set = JoinSet::new(); > + //tokio::time::sleep(std::time::Duration::from_secs(1)).await; > + while stats.requests < NR_CYCLES || !join_set.is_empty() { > + while stats.requests < NR_CYCLES && join_set.len() < MAX_IN_FLIGHT { > + // If we want to issue another request, do so. Note that we reuse > + // the same buffer for multiple in-flight requests. It doesn't > + // matter here because we're just trying to write random stuff, > + // but that would be Very Bad in a real application. > + // Simulate a mix of large and small requests. > + let size = if rng.gen() { BUFFER_SIZE } else { 512 }; > + let offset = rng.gen_range(0..export_size - size as u64); > + > + let mut buf = [0u8; BUFFER_SIZE]; > + let nbd = nbd.clone(); > + if rng.gen() { > + join_set.spawn(async move { > + nbd.pread(&mut buf, offset, None).await > + }); > + } else { > + // Fill the buf with random data. 
> + rng.fill(&mut buf); > + join_set > + .spawn(async move { nbd.pwrite(&buf, offset, None).await }); > + } > + stats.requests += 1; > + } > + join_set.join_next().await.unwrap().unwrap()?; > + } > + > + if task_idx == 0 {} > + Ok(stats) > +} > diff --git a/rust/run-tests.sh.in b/rust/run-tests.sh.in > index 3ebf9a1..661c018 100755 > --- a/rust/run-tests.sh.in > +++ b/rust/run-tests.sh.in > @@ -32,6 +32,8 @@ if [ -z "$VG" ]; then > --run '@CARGO@ run --example get-size -- $unixsocket' > @NBDKIT@ -U - floppy . \ > --run '@CARGO@ run --example fetch-first-sector -- $unixsocket' > + @NBDKIT@ -U - memory 10M \ > + --run '@CARGO@ run --example concurrent-read-write -- $unixsocket'I didn't spot that the other examples could only use a unix socket, rather than a URI, but it'd be nice to allow them to use URIs too. For this commit: Reviewed-by: Richard W.M. Jones <rjones at redhat.com> Rich.> else > @CARGO@ test --config "target.'cfg(all())'.runner = \"$VG\"" -- --nocapture > fi > -- > 2.41.0-- Richard Jones, Virtualization Group, Red Hat http://people.redhat.com/~rjones Read my programming and virtualization blog: http://rwmj.wordpress.com virt-builder quickly builds VMs from scratch http://libguestfs.org/virt-builder.1.html
Richard W.M. Jones
2023-Aug-15 09:07 UTC
[Libguestfs] [libnbd PATCH v7 9/9] rust: async: Add an example
So what do I think about the patch series as a whole ... (in particular, the patches I didn't add Reviewed-by tags to). It would be much nicer IMHO if we didn't have to define callback lifetimes in this way, since they were not intended to be classified into async_kind / cblifetime / cbcount, and this might limit our options for new ABIs in future. I see two ways to go here: (1) (Easier for now, problems in future) Rename async_kind, cblifetime and cbcount as rust_async_kind, rust_cblifetime, rust_cbcount, which would in some sense limit the scope of getting these right to the Rust bindings. This defers the pain until later (maybe never, if we never add an ABI which doesn't satisfy these constraints). (2) (Harder for now, no problems in future) Use a reference count in the Rust bindings, which is how the other bindings work. It makes the Rust bindings more awkward to use, but does more accurately express the actual intention of the API. Discuss ... Rich. -- Richard Jones, Virtualization Group, Red Hat http://people.redhat.com/~rjones Read my programming and virtualization blog: http://rwmj.wordpress.com libguestfs lets you edit virtual machines. Supports shell scripting, bindings from many languages. http://libguestfs.org