Richard W.M. Jones
2023-Aug-15 09:00 UTC
[Libguestfs] [libnbd PATCH v7 9/9] rust: async: Add an example
On Thu, Aug 10, 2023 at 11:24:36AM +0000, Tage Johansson wrote:> This patch adds an example using the asynchronous Rust bindings. > --- > rust/Cargo.toml | 1 + > rust/examples/concurrent-read-write.rs | 135 +++++++++++++++++++++++++ > rust/run-tests.sh.in | 2 + > 3 files changed, 138 insertions(+) > create mode 100644 rust/examples/concurrent-read-write.rs > > diff --git a/rust/Cargo.toml b/rust/Cargo.toml > index d001248..4332783 100644 > --- a/rust/Cargo.toml > +++ b/rust/Cargo.toml > @@ -54,5 +54,6 @@ default = ["log", "tokio"] > anyhow = "1.0.72" > once_cell = "1.18.0" > pretty-hex = "0.3.0" > +rand = { version = "0.8.5", default-features = false, features = ["small_rng", "min_const_gen"] } > tempfile = "3.6.0" > tokio = { version = "1.29.1", default-features = false, features = ["rt-multi-thread", "macros"] } > diff --git a/rust/examples/concurrent-read-write.rs b/rust/examples/concurrent-read-write.rs > new file mode 100644 > index 0000000..a1c3e8a > --- /dev/null > +++ b/rust/examples/concurrent-read-write.rs > @@ -0,0 +1,135 @@ > +//! Example usage with nbdkit: > +//! > +//! nbdkit -U - memory 100M \ > +//! --run 'cargo run --example concurrent-read-write -- $unixsocket'It would be nice to make the example use a URI (ie. nbd_connect_uri). Is that possible?> +//! This will read and write randomly over the first megabyte of the > +//! plugin using multi-conn, multiple threads and multiple requests in > +//! flight on each thread. > + > +#![deny(warnings)] > +use rand::prelude::*; > +use std::env; > +use std::path::PathBuf; > +use std::sync::Arc; > +use tokio::task::JoinSet; > + > +/// Number of simultaneous connections to the NBD server. > +/// > +/// Note that some servers only support a limited number of > +/// simultaneous connections, and/or have a configurable thread pool > +/// internally, and if you exceed those limits then something will break. 
> +const NR_MULTI_CONN: usize = 8; > + > +/// Number of commands that can be "in flight" at the same time on each > +/// connection. (Therefore the total number of requests in flight may > +/// be up to NR_MULTI_CONN * MAX_IN_FLIGHT). > +const MAX_IN_FLIGHT: usize = 16; > + > +/// The size of large reads and writes, must be > 512. > +const BUFFER_SIZE: usize = 1024; > + > +/// Number of commands we issue (per [task][tokio::task]). > +const NR_CYCLES: usize = 32; > + > +/// Statistics gathered during the run. > +#[derive(Debug, Default)] > +struct Stats { > + /// The total number of requests made. > + requests: usize, > +} > + > +#[tokio::main] > +async fn main() -> anyhow::Result<()> { > + let args = env::args_os().collect::<Vec<_>>(); > + if args.len() != 2 { > + anyhow::bail!("Usage: {:?} socket", args[0]); > + } > + let socket = &args[1]; > + > + // We begin by making a connection to the server to get the export size > + // and ensure that it supports multiple connections and is writable. > + let nbd = libnbd::Handle::new()?; > + nbd.connect_unix(&socket)?;

i.e. here ^^^

> + let export_size = nbd.get_size()?; > + anyhow::ensure!( > + (BUFFER_SIZE as u64) < export_size, > + "export is {export_size}B, must be larger than {BUFFER_SIZE}B" > + ); > + anyhow::ensure!( > + !nbd.is_read_only()?, > + "error: this NBD export is read-only" > + ); > + anyhow::ensure!( > + nbd.can_multi_conn()?, > + "error: this NBD export does not support multi-conn" > + ); > + drop(nbd); // Close the connection. > + > + // Start the worker tasks, one per connection. > + let mut tasks = JoinSet::new(); > + for i in 0..NR_MULTI_CONN { > + tasks.spawn(run_thread(i, socket.clone().into(), export_size)); > + } > + > + // Wait for the tasks to complete. 
> + let mut stats = Stats::default(); > + while !tasks.is_empty() { > + let this_stats = tasks.join_next().await.unwrap().unwrap()?; > + stats.requests += this_stats.requests; > + } > + > + // Make sure the number of requests that were required matches what > + // we expect. > + assert_eq!(stats.requests, NR_MULTI_CONN * NR_CYCLES); > + > + Ok(()) > +} > + > +async fn run_thread( > + task_idx: usize, > + socket: PathBuf, > + export_size: u64, > +) -> anyhow::Result<Stats> { > + // Start a new connection to the server. > + // We shall spawn many commands concurrently on different tasks and those > + // futures must be `'static`, hence we wrap the handle in an [Arc]. > + let nbd = Arc::new(libnbd::AsyncHandle::new()?); > + nbd.connect_unix(socket).await?; > + > + let mut rng = SmallRng::seed_from_u64(44 as u64); > + > + // Issue commands. > + let mut stats = Stats::default(); > + let mut join_set = JoinSet::new(); > + //tokio::time::sleep(std::time::Duration::from_secs(1)).await; > + while stats.requests < NR_CYCLES || !join_set.is_empty() { > + while stats.requests < NR_CYCLES && join_set.len() < MAX_IN_FLIGHT { > + // If we want to issue another request, do so. Note that we reuse > + // the same buffer for multiple in-flight requests. It doesn't > + // matter here because we're just trying to write random stuff, > + // but that would be Very Bad in a real application. > + // Simulate a mix of large and small requests. > + let size = if rng.gen() { BUFFER_SIZE } else { 512 }; > + let offset = rng.gen_range(0..export_size - size as u64); > + > + let mut buf = [0u8; BUFFER_SIZE]; > + let nbd = nbd.clone(); > + if rng.gen() { > + join_set.spawn(async move { > + nbd.pread(&mut buf, offset, None).await > + }); > + } else { > + // Fill the buf with random data. 
> + rng.fill(&mut buf); > + join_set > + .spawn(async move { nbd.pwrite(&buf, offset, None).await }); > + } > + stats.requests += 1; > + } > + join_set.join_next().await.unwrap().unwrap()?; > + } > + > + if task_idx == 0 {} > + Ok(stats) > +} > diff --git a/rust/run-tests.sh.in b/rust/run-tests.sh.in > index 3ebf9a1..661c018 100755 > --- a/rust/run-tests.sh.in > +++ b/rust/run-tests.sh.in > @@ -32,6 +32,8 @@ if [ -z "$VG" ]; then > --run '@CARGO@ run --example get-size -- $unixsocket' > @NBDKIT@ -U - floppy . \ > --run '@CARGO@ run --example fetch-first-sector -- $unixsocket' > + @NBDKIT@ -U - memory 10M \ > + --run '@CARGO@ run --example concurrent-read-write -- $unixsocket'

I didn't spot that the other examples could only use a unix socket, rather than a URI, but it'd be nice to allow them to use URIs too.

For this commit:

Reviewed-by: Richard W.M. Jones <rjones at redhat.com>

Rich.

> else > @CARGO@ test --config "target.'cfg(all())'.runner = \"$VG\"" -- --nocapture > fi > -- > 2.41.0

-- 
Richard Jones, Virtualization Group, Red Hat http://people.redhat.com/~rjones
Read my programming and virtualization blog: http://rwmj.wordpress.com
virt-builder quickly builds VMs from scratch http://libguestfs.org/virt-builder.1.html
Tage Johansson
2023-Aug-15 12:19 UTC
[Libguestfs] [libnbd PATCH v7 9/9] rust: async: Add an example
On 8/15/2023 11:00 AM, Richard W.M. Jones wrote:> On Thu, Aug 10, 2023 at 11:24:36AM +0000, Tage Johansson wrote: >> This patch adds an example using the asynchronous Rust bindings. >> --- >> rust/Cargo.toml | 1 + >> rust/examples/concurrent-read-write.rs | 135 +++++++++++++++++++++++++ >> rust/run-tests.sh.in | 2 + >> 3 files changed, 138 insertions(+) >> create mode 100644 rust/examples/concurrent-read-write.rs >> >> diff --git a/rust/Cargo.toml b/rust/Cargo.toml >> index d001248..4332783 100644 >> --- a/rust/Cargo.toml >> +++ b/rust/Cargo.toml >> @@ -54,5 +54,6 @@ default = ["log", "tokio"] >> anyhow = "1.0.72" >> once_cell = "1.18.0" >> pretty-hex = "0.3.0" >> +rand = { version = "0.8.5", default-features = false, features = ["small_rng", "min_const_gen"] } >> tempfile = "3.6.0" >> tokio = { version = "1.29.1", default-features = false, features = ["rt-multi-thread", "macros"] } >> diff --git a/rust/examples/concurrent-read-write.rs b/rust/examples/concurrent-read-write.rs >> new file mode 100644 >> index 0000000..a1c3e8a >> --- /dev/null >> +++ b/rust/examples/concurrent-read-write.rs >> @@ -0,0 +1,135 @@ >> +//! Example usage with nbdkit: >> +//! >> +//! nbdkit -U - memory 100M \ >> +//! --run 'cargo run --example concurrent-read-write -- $unixsocket' > It would be nice to make the example use a URI (ie. nbd_connect_uri). > Is that possible?Yes, of course.>> +//! This will read and write randomly over the first megabyte of the >> +//! plugin using multi-conn, multiple threads and multiple requests in >> +//! flight on each thread. >> + >> +#![deny(warnings)] >> +use rand::prelude::*; >> +use std::env; >> +use std::path::PathBuf; >> +use std::sync::Arc; >> +use tokio::task::JoinSet; >> + >> +/// Number of simultaneous connections to the NBD server. 
>> +/// >> +/// Note that some servers only support a limited number of >> +/// simultaneous connections, and/or have a configurable thread pool >> +/// internally, and if you exceed those limits then something will break. >> +const NR_MULTI_CONN: usize = 8; >> + >> +/// Number of commands that can be "in flight" at the same time on each >> +/// connection. (Therefore the total number of requests in flight may >> +/// be up to NR_MULTI_CONN * MAX_IN_FLIGHT). >> +const MAX_IN_FLIGHT: usize = 16; >> + >> +/// The size of large reads and writes, must be > 512. >> +const BUFFER_SIZE: usize = 1024; >> + >> +/// Number of commands we issue (per [task][tokio::task]). >> +const NR_CYCLES: usize = 32; >> + >> +/// Statistics gathered during the run. >> +#[derive(Debug, Default)] >> +struct Stats { >> + /// The total number of requests made. >> + requests: usize, >> +} >> + >> +#[tokio::main] >> +async fn main() -> anyhow::Result<()> { >> + let args = env::args_os().collect::<Vec<_>>(); >> + if args.len() != 2 { >> + anyhow::bail!("Usage: {:?} socket", args[0]); >> + } >> + let socket = &args[1]; >> + >> + // We begin by making a connection to the server to get the export size >> + // and ensure that it supports multiple connections and is writable. >> + let nbd = libnbd::Handle::new()?; >> + nbd.connect_unix(&socket)?; > ie. here ^^^ > >> + let export_size = nbd.get_size()?; >> + anyhow::ensure!( >> + (BUFFER_SIZE as u64) < export_size, >> + "export is {export_size}B, must be larger than {BUFFER_SIZE}B" >> + ); >> + anyhow::ensure!( >> + !nbd.is_read_only()?, >> + "error: this NBD export is read-only" >> + ); >> + anyhow::ensure!( >> + nbd.can_multi_conn()?, >> + "error: this NBD export does not support multi-conn" >> + ); >> + drop(nbd); // Close the connection. >> + >> + // Start the worker tasks, one per connection. 
>> + let mut tasks = JoinSet::new(); >> + for i in 0..NR_MULTI_CONN { >> + tasks.spawn(run_thread(i, socket.clone().into(), export_size)); >> + } >> + >> + // Wait for the tasks to complete. >> + let mut stats = Stats::default(); >> + while !tasks.is_empty() { >> + let this_stats = tasks.join_next().await.unwrap().unwrap()?; >> + stats.requests += this_stats.requests; >> + } >> + >> + // Make sure the number of requests that were required matches what >> + // we expect. >> + assert_eq!(stats.requests, NR_MULTI_CONN * NR_CYCLES); >> + >> + Ok(()) >> +} >> + >> +async fn run_thread( >> + task_idx: usize, >> + socket: PathBuf, >> + export_size: u64, >> +) -> anyhow::Result<Stats> { >> + // Start a new connection to the server. >> + // We shall spawn many commands concurrently on different tasks and those >> + // futures must be `'static`, hence we wrap the handle in an [Arc]. >> + let nbd = Arc::new(libnbd::AsyncHandle::new()?); >> + nbd.connect_unix(socket).await?; >> + >> + let mut rng = SmallRng::seed_from_u64(44 as u64); >> + >> + // Issue commands. >> + let mut stats = Stats::default(); >> + let mut join_set = JoinSet::new(); >> + //tokio::time::sleep(std::time::Duration::from_secs(1)).await; >> + while stats.requests < NR_CYCLES || !join_set.is_empty() { >> + while stats.requests < NR_CYCLES && join_set.len() < MAX_IN_FLIGHT { >> + // If we want to issue another request, do so. Note that we reuse >> + // the same buffer for multiple in-flight requests. It doesn't >> + // matter here because we're just trying to write random stuff, >> + // but that would be Very Bad in a real application. >> + // Simulate a mix of large and small requests. 
>> + let size = if rng.gen() { BUFFER_SIZE } else { 512 }; >> + let offset = rng.gen_range(0..export_size - size as u64); >> + >> + let mut buf = [0u8; BUFFER_SIZE]; >> + let nbd = nbd.clone(); >> + if rng.gen() { >> + join_set.spawn(async move { >> + nbd.pread(&mut buf, offset, None).await >> + }); >> + } else { >> + // Fill the buf with random data. >> + rng.fill(&mut buf); >> + join_set >> + .spawn(async move { nbd.pwrite(&buf, offset, None).await }); >> + } >> + stats.requests += 1; >> + } >> + join_set.join_next().await.unwrap().unwrap()?; >> + } >> + >> + if task_idx == 0 {} >> + Ok(stats) >> +} >> diff --git a/rust/run-tests.sh.in b/rust/run-tests.sh.in >> index 3ebf9a1..661c018 100755 >> --- a/rust/run-tests.sh.in >> +++ b/rust/run-tests.sh.in >> @@ -32,6 +32,8 @@ if [ -z "$VG" ]; then >> --run '@CARGO@ run --example get-size -- $unixsocket' >> @NBDKIT@ -U - floppy . \ >> --run '@CARGO@ run --example fetch-first-sector -- $unixsocket' >> + @NBDKIT@ -U - memory 10M \ >> + --run '@CARGO@ run --example concurrent-read-write -- $unixsocket' > I didn't spot that the other examples could only use a unix socket, > rather than a URI, but it'd be nice to allow them to use URIs too.

I will add that in a separate patch in the next version of the patch series. I guess that we still use nbdkit with a unix socket when testing the examples in run-tests.sh, right?

Best regards,
Tage

> For this commit: > > Reviewed-by: Richard W.M. Jones <rjones at redhat.com> > > Rich. > >> else >> @CARGO@ test --config "target.'cfg(all())'.runner = \"$VG\"" -- --nocapture >> fi >> -- >> 2.41.0