Tage Johansson
2023-Aug-26  11:29 UTC
[Libguestfs] [libnbd PATCH v9 6/7] rust: async: Add an example
This patch adds an example using the asynchronous Rust bindings.
---
 rust/Cargo.toml                        |   1 +
 rust/examples/concurrent-read-write.rs | 149 +++++++++++++++++++++++++
 rust/run-tests.sh.in                   |   2 +
 3 files changed, 152 insertions(+)
 create mode 100644 rust/examples/concurrent-read-write.rs
diff --git a/rust/Cargo.toml b/rust/Cargo.toml
index c49f9f2..0879b34 100644
--- a/rust/Cargo.toml
+++ b/rust/Cargo.toml
@@ -58,5 +58,6 @@ default = ["log", "tokio"]
 anyhow = "1.0.72"
 once_cell = "1.18.0"
 pretty-hex = "0.3.0"
+rand = { version = "0.8.5", default-features = false, features = ["small_rng", "min_const_gen"] }
 tempfile = "3.6.0"
 tokio = { version = "1.29.1", default-features = false, features = ["rt-multi-thread", "macros"] }
diff --git a/rust/examples/concurrent-read-write.rs b/rust/examples/concurrent-read-write.rs
new file mode 100644
index 0000000..4858f76
--- /dev/null
+++ b/rust/examples/concurrent-read-write.rs
@@ -0,0 +1,149 @@
+//! Example usage with nbdkit:
+//!     nbdkit -U - memory 100M \
+//!       --run 'cargo run --example concurrent-read-write -- $unixsocket'
+//! Or connect over a URI:
+//!     nbdkit -U - memory 100M \
+//!       --run 'cargo run --example concurrent-read-write -- $uri'
+//!
+//! This will read and write randomly over the first megabyte of the
+//! plugin using multi-conn, multiple threads and multiple requests in
+//! flight on each thread.
+
+#![deny(warnings)]
+use rand::prelude::*;
+use std::env;
+use std::sync::Arc;
+use tokio::task::JoinSet;
+
+/// Number of simultaneous connections to the NBD server.
+///
+/// Note that some servers only support a limited number of
+/// simultaneous connections, and/or have a configurable thread pool
+/// internally, and if you exceed those limits then something will break.
+const NR_MULTI_CONN: usize = 8;
+
+/// Number of commands that can be "in flight" at the same time on each
+/// connection.  (Therefore the total number of requests in flight may
+/// be up to NR_MULTI_CONN * MAX_IN_FLIGHT).
+const MAX_IN_FLIGHT: usize = 16;
+
+/// The size of large reads and writes, must be > 512.
+const BUFFER_SIZE: usize = 1024;
+
+/// Number of commands we issue (per [task][tokio::task]).
+const NR_CYCLES: usize = 32;
+
+/// Statistics gathered during the run.
+#[derive(Debug, Default)]
+struct Stats {
+    /// The total number of requests made.
+    requests: usize,
+}
+
+#[tokio::main]
+async fn main() -> anyhow::Result<()> {
+    let args = env::args_os().collect::<Vec<_>>();
+    if args.len() != 2 {
+        anyhow::bail!("Usage: {:?} socket", args[0]);
+    }
+
+    // We begin by making a connection to the server to get the export size
+    // and ensure that it supports multiple connections and is writable.
+    let nbd = libnbd::Handle::new()?;
+
+    // Check if the user provided a URI or a unix socket.
+    let socket_or_uri = args[1].to_str().unwrap();
+    if socket_or_uri.contains("://") {
+        nbd.connect_uri(socket_or_uri)?;
+    } else {
+        nbd.connect_unix(socket_or_uri)?;
+    }
+
+    let export_size = nbd.get_size()?;
+    anyhow::ensure!(
+        (BUFFER_SIZE as u64) < export_size,
+        "export is {export_size}B, must be larger than {BUFFER_SIZE}B"
+    );
+    anyhow::ensure!(
+        !nbd.is_read_only()?,
+        "error: this NBD export is read-only"
+    );
+    anyhow::ensure!(
+        nbd.can_multi_conn()?,
+        "error: this NBD export does not support multi-conn"
+    );
+    drop(nbd); // Close the connection.
+
+    // Start the worker tasks, one per connection.
+    let mut tasks = JoinSet::new();
+    for i in 0..NR_MULTI_CONN {
+        tasks.spawn(run_thread(i, socket_or_uri.to_owned(), export_size));
+    }
+
+    // Wait for the tasks to complete.
+    let mut stats = Stats::default();
+    while !tasks.is_empty() {
+        let this_stats = tasks.join_next().await.unwrap().unwrap()?;
+        stats.requests += this_stats.requests;
+    }
+
+    // Make sure the number of requests that were required matches what
+    // we expect.
+    assert_eq!(stats.requests, NR_MULTI_CONN * NR_CYCLES);
+
+    Ok(())
+}
+
+async fn run_thread(
+    task_idx: usize,
+    socket_or_uri: String,
+    export_size: u64,
+) -> anyhow::Result<Stats> {
+    // Start a new connection to the server.
+    // We shall spawn many commands concurrently on different tasks and those
+    // futures must be `'static`, hence we wrap the handle in an [Arc].
+    let nbd = Arc::new(libnbd::AsyncHandle::new()?);
+
+    // Check if the user provided a URI or a unix socket.
+    if socket_or_uri.contains("://") {
+        nbd.connect_uri(socket_or_uri).await?;
+    } else {
+        nbd.connect_unix(socket_or_uri).await?;
+    }
+
+    let mut rng = SmallRng::seed_from_u64(44 as u64);
+
+    // Issue commands.
+    let mut stats = Stats::default();
+    let mut join_set = JoinSet::new();
+    //tokio::time::sleep(std::time::Duration::from_secs(1)).await;
+    while stats.requests < NR_CYCLES || !join_set.is_empty() {
+        while stats.requests < NR_CYCLES && join_set.len() < MAX_IN_FLIGHT {
+            // If we want to issue another request, do so.  Note that we reuse
+            // the same buffer for multiple in-flight requests.  It doesn't
+            // matter here because we're just trying to write random stuff,
+            // but that would be Very Bad in a real application.
+            // Simulate a mix of large and small requests.
+            let size = if rng.gen() { BUFFER_SIZE } else { 512 };
+            let offset = rng.gen_range(0..export_size - size as u64);
+
+            let mut buf = [0u8; BUFFER_SIZE];
+            let nbd = nbd.clone();
+            if rng.gen() {
+                join_set.spawn(async move {
+                    nbd.pread(&mut buf, offset, None).await
+                });
+            } else {
+                // Fill the buf with random data.
+                rng.fill(&mut buf);
+                join_set
+                    .spawn(async move { nbd.pwrite(&buf, offset, None).await });
+            }
+            stats.requests += 1;
+        }
+        join_set.join_next().await.unwrap().unwrap()?;
+    }
+
+    if task_idx == 0 {}
+    Ok(stats)
+}
diff --git a/rust/run-tests.sh.in b/rust/run-tests.sh.in
index 3ebf9a1..661c018 100755
--- a/rust/run-tests.sh.in
+++ b/rust/run-tests.sh.in
@@ -32,6 +32,8 @@ if [ -z "$VG" ]; then
              --run '@CARGO@ run --example get-size -- $unixsocket'
     @NBDKIT@ -U - floppy . \
              --run '@CARGO@ run --example fetch-first-sector -- $unixsocket'
+    @NBDKIT@ -U - memory 10M \
+             --run '@CARGO@ run --example concurrent-read-write -- $unixsocket'
 else
     @CARGO@ test --config "target.'cfg(all())'.runner = \"$VG\"" -- --nocapture
 fi
-- 
2.42.0
Eric Blake
2023-Sep-01  20:41 UTC
[Libguestfs] [libnbd PATCH v9 6/7] rust: async: Add an example
On Sat, Aug 26, 2023 at 11:29:59AM +0000, Tage Johansson wrote:
> This patch adds an example using the asynchronous Rust bindings.
> ---
>  rust/Cargo.toml                        |   1 +
>  rust/examples/concurrent-read-write.rs | 149 +++++++++++++++++++++++++
>  rust/run-tests.sh.in                   |   2 +
>  3 files changed, 152 insertions(+)
>  create mode 100644 rust/examples/concurrent-read-write.rs
>
> diff --git a/rust/Cargo.toml b/rust/Cargo.toml
> index c49f9f2..0879b34 100644
> --- a/rust/Cargo.toml
> +++ b/rust/Cargo.toml
> @@ -58,5 +58,6 @@ default = ["log", "tokio"]
>  anyhow = "1.0.72"
>  once_cell = "1.18.0"
>  pretty-hex = "0.3.0"
> +rand = { version = "0.8.5", default-features = false, features = ["small_rng", "min_const_gen"] }
>  tempfile = "3.6.0"
>  tokio = { version = "1.29.1", default-features = false, features = ["rt-multi-thread", "macros"] }
> diff --git a/rust/examples/concurrent-read-write.rs b/rust/examples/concurrent-read-write.rs
> new file mode 100644
> index 0000000..4858f76
> --- /dev/null
> +++ b/rust/examples/concurrent-read-write.rs
> @@ -0,0 +1,149 @@
> +//! Example usage with nbdkit:
> +//!     nbdkit -U - memory 100M \
> +//!       --run 'cargo run --example concurrent-read-write -- $unixsocket'
> +//! Or connect over a URI:
> +//!     nbdkit -U - memory 100M \
> +//!       --run 'cargo run --example concurrent-read-write -- $uri'

Should be "$uri" here (to avoid accidental shell globbing surprises).

> +//!
> +//! This will read and write randomly over the first megabyte of the

This says first megabyte...[1]

> +//! plugin using multi-conn, multiple threads and multiple requests in
> +//! flight on each thread.
> +
> +#![deny(warnings)]
> +use rand::prelude::*;
> +use std::env;
> +use std::sync::Arc;
> +use tokio::task::JoinSet;
> +
> +/// Number of simultaneous connections to the NBD server.
> +///
> +/// Note that some servers only support a limited number of
> +/// simultaneous connections, and/or have a configurable thread pool
> +/// internally, and if you exceed those limits then something will break.
> +const NR_MULTI_CONN: usize = 8;
> +
> +/// Number of commands that can be "in flight" at the same time on each
> +/// connection.  (Therefore the total number of requests in flight may
> +/// be up to NR_MULTI_CONN * MAX_IN_FLIGHT).
> +const MAX_IN_FLIGHT: usize = 16;
> +
> +/// The size of large reads and writes, must be > 512.
> +const BUFFER_SIZE: usize = 1024;

1024 isn't much larger than 512. It looks like you borrowed heavily
from examples/threaded-reads-and-writes.c, but that used 1M as the
large buffer.

> +
> +/// Number of commands we issue (per [task][tokio::task]).
> +const NR_CYCLES: usize = 32;
> +
> +/// Statistics gathered during the run.
> +#[derive(Debug, Default)]
> +struct Stats {
> +    /// The total number of requests made.
> +    requests: usize,
> +}
> +
> +#[tokio::main]
> +async fn main() -> anyhow::Result<()> {
> +    let args = env::args_os().collect::<Vec<_>>();
> +    if args.len() != 2 {
> +        anyhow::bail!("Usage: {:?} socket", args[0]);
> +    }
> +
> +    // We begin by making a connection to the server to get the export size
> +    // and ensure that it supports multiple connections and is writable.
> +    let nbd = libnbd::Handle::new()?;
> +
> +    // Check if the user provided a URI or a unix socket.
> +    let socket_or_uri = args[1].to_str().unwrap();
> +    if socket_or_uri.contains("://") {
> +        nbd.connect_uri(socket_or_uri)?;
> +    } else {
> +        nbd.connect_unix(socket_or_uri)?;
> +    }
> +
> +    let export_size = nbd.get_size()?;
> +    anyhow::ensure!(
> +        (BUFFER_SIZE as u64) < export_size,
> +        "export is {export_size}B, must be larger than {BUFFER_SIZE}B"
> +    );
> +    anyhow::ensure!(
> +        !nbd.is_read_only()?,
> +        "error: this NBD export is read-only"
> +    );
> +    anyhow::ensure!(
> +        nbd.can_multi_conn()?,
> +        "error: this NBD export does not support multi-conn"
> +    );
> +    drop(nbd); // Close the connection.
> +
> +    // Start the worker tasks, one per connection.
> +    let mut tasks = JoinSet::new();
> +    for i in 0..NR_MULTI_CONN {
> +        tasks.spawn(run_thread(i, socket_or_uri.to_owned(), export_size));
> +    }
> +
> +    // Wait for the tasks to complete.
> +    let mut stats = Stats::default();
> +    while !tasks.is_empty() {
> +        let this_stats = tasks.join_next().await.unwrap().unwrap()?;
> +        stats.requests += this_stats.requests;
> +    }
> +
> +    // Make sure the number of requests that were required matches what
> +    // we expect.
> +    assert_eq!(stats.requests, NR_MULTI_CONN * NR_CYCLES);
> +
> +    Ok(())
> +}
> +
> +async fn run_thread(
> +    task_idx: usize,
> +    socket_or_uri: String,
> +    export_size: u64,
> +) -> anyhow::Result<Stats> {
> +    // Start a new connection to the server.
> +    // We shall spawn many commands concurrently on different tasks and those
> +    // futures must be `'static`, hence we wrap the handle in an [Arc].
> +    let nbd = Arc::new(libnbd::AsyncHandle::new()?);
> +
> +    // Check if the user provided a URI or a unix socket.
> +    if socket_or_uri.contains("://") {
> +        nbd.connect_uri(socket_or_uri).await?;
> +    } else {
> +        nbd.connect_unix(socket_or_uri).await?;
> +    }
> +
> +    let mut rng = SmallRng::seed_from_u64(44 as u64);

Why are you hard-coding this to the same fixed seed, for every worker
thread?

> +
> +    // Issue commands.
> +    let mut stats = Stats::default();
> +    let mut join_set = JoinSet::new();
> +    //tokio::time::sleep(std::time::Duration::from_secs(1)).await;

Does this comment need to remain?

> +    while stats.requests < NR_CYCLES || !join_set.is_empty() {
> +        while stats.requests < NR_CYCLES && join_set.len() < MAX_IN_FLIGHT {
> +            // If we want to issue another request, do so.  Note that we reuse
> +            // the same buffer for multiple in-flight requests.  It doesn't
> +            // matter here because we're just trying to write random stuff,
> +            // but that would be Very Bad in a real application.
> +            // Simulate a mix of large and small requests.
> +            let size = if rng.gen() { BUFFER_SIZE } else { 512 };

Is this missing a math operation to check only one bit of the
generated value?

/me goes and reads more Rust docs

Oh, so rng.gen() is shorthand for rng.gen::<bool>() when used in a
boolean context, which really does have a 50/50 split in returned
values? Cool shorthand!

> +            let offset = rng.gen_range(0..export_size - size as u64);

[1]...but you are actually targeting anywhere in the image. It looks
like that was a copy-paste error from a comment that has gone stale
after refactoring in the original C code.

> +
> +            let mut buf = [0u8; BUFFER_SIZE];
> +            let nbd = nbd.clone();
> +            if rng.gen() {
> +                join_set.spawn(async move {
> +                    nbd.pread(&mut buf, offset, None).await
> +                });
> +            } else {
> +                // Fill the buf with random data.
> +                rng.fill(&mut buf);
> +                join_set
> +                    .spawn(async move { nbd.pwrite(&buf, offset, None).await });

I'm impressed: the async handle makes for functions that use a lot
less boilerplate than the comparable C nbd_aio_* counterparts.

> +            }
> +            stats.requests += 1;
> +        }
> +        join_set.join_next().await.unwrap().unwrap()?;
> +    }
> +
> +    if task_idx == 0 {}
> +    Ok(stats)
> +}
> diff --git a/rust/run-tests.sh.in b/rust/run-tests.sh.in
> index 3ebf9a1..661c018 100755

-- 
Eric Blake, Principal Software Engineer
Red Hat, Inc.
Virtualization: qemu.org | libguestfs.org