Tage Johansson
2023-Aug-20 14:16 UTC
[Libguestfs] [libnbd PATCH v8 00/10] Rust Bindings for Libnbd
This is the 8th version of the Rust bindings for libnbd. Compared to v7, it (amongst others) removes the cblifetime and cbcount fields in API.ml. It also contains patches for improving the examples, reformatting rust files, and correcting the license field in Cargo.toml. Best regards, Tage Tage Johansson (10): rust: Specify minimum Rust version in Cargo.toml rust: Correct license field in Cargo.toml rust: Format some files to conform to rustfmt and rustfmt.toml rust: Make it possible to run examples with a URI generator: Add information about asynchronous handle calls rust: async: Create an async friendly handle type generator: Add `modifies_fd` flag to the [call] structure rust: async: Use the modifies_fd flag to exclude calls rust: async: Add a couple of integration tests rust: async: Add an example generator/API.ml | 64 +++++ generator/API.mli | 18 ++ generator/Rust.ml | 243 +++++++++++++++- generator/Rust.mli | 2 + generator/generator.ml | 2 + rust/Cargo.toml | 12 +- rust/Makefile.am | 2 + rust/cargo_test/Cargo.toml | 1 + rust/examples/concurrent-read-write.rs | 149 ++++++++++ rust/examples/connect-command.rs | 1 - rust/examples/fetch-first-sector.rs | 15 +- rust/examples/get-size.rs | 20 +- rust/libnbd-sys/Cargo.toml | 1 + rust/run-tests.sh.in | 2 + rust/src/async_handle.rs | 268 ++++++++++++++++++ rust/src/lib.rs | 8 + rust/src/utils.rs | 9 + rust/tests/test_200_connect_command.rs | 11 +- rust/tests/test_220_opt_list.rs | 2 +- rust/tests/test_240_opt_list_meta.rs | 2 +- rust/tests/test_245_opt_list_meta_queries.rs | 2 +- rust/tests/test_250_opt_set_meta.rs | 2 +- rust/tests/test_255_opt_set_meta_queries.rs | 2 +- rust/tests/test_300_get_size.rs | 1 - rust/tests/test_460_block_status.rs | 2 +- rust/tests/test_620_stats.rs | 11 +- rust/tests/test_async_100_handle.rs | 25 ++ rust/tests/test_async_200_connect_command.rs | 26 ++ rust/tests/test_async_210_opt_abort.rs | 32 +++ rust/tests/test_async_220_opt_list.rs | 86 ++++++ rust/tests/test_async_230_opt_info.rs | 122 ++++++++ rust/tests/test_async_240_opt_list_meta.rs | 150 ++++++++++ .../test_async_245_opt_list_meta_queries.rs | 94 ++++++ rust/tests/test_async_250_opt_set_meta.rs | 125 ++++++++ .../test_async_255_opt_set_meta_queries.rs | 110 +++++++ rust/tests/test_async_400_pread.rs | 40 +++ rust/tests/test_async_405_pread_structured.rs | 84 ++++++ rust/tests/test_async_410_pwrite.rs | 59 ++++ rust/tests/test_async_460_block_status.rs | 98 +++++++ rust/tests/test_async_620_stats.rs | 69 +++++ scripts/git.orderfile | 1 + 41 files changed, 1930 insertions(+), 43 deletions(-) create mode 100644 rust/examples/concurrent-read-write.rs create mode 100644 rust/src/async_handle.rs create mode 100644 rust/tests/test_async_100_handle.rs create mode 100644 rust/tests/test_async_200_connect_command.rs create mode 100644 rust/tests/test_async_210_opt_abort.rs create mode 100644 rust/tests/test_async_220_opt_list.rs create mode 100644 rust/tests/test_async_230_opt_info.rs create mode 100644 rust/tests/test_async_240_opt_list_meta.rs create mode 100644 rust/tests/test_async_245_opt_list_meta_queries.rs create mode 100644 rust/tests/test_async_250_opt_set_meta.rs create mode 100644 rust/tests/test_async_255_opt_set_meta_queries.rs create mode 100644 rust/tests/test_async_400_pread.rs create mode 100644 rust/tests/test_async_405_pread_structured.rs create mode 100644 rust/tests/test_async_410_pwrite.rs create mode 100644 rust/tests/test_async_460_block_status.rs create mode 100644 rust/tests/test_async_620_stats.rs base-commit: f2dac1102884e3dea1cfb33479b34dd689fbb670 prerequisite-patch-id: ce5d2f65bb12ecda61c97fdf22255e188016b3fc prerequisite-patch-id: cb5e3f05b600a4953e2a77bd53067bb51903aecd prerequisite-patch-id: 7613cb6ebcc41fb45da587fc9487eb6c643a14a4 prerequisite-patch-id: 397ff0bea47242cf549a894ce519b3702f072c44 prerequisite-patch-id: d6bcb838a1875541f3f125b95f346c21a7d614ea -- 2.41.0
Tage Johansson
2023-Aug-20 14:16 UTC
[Libguestfs] [libnbd PATCH v8 01/10] rust: Specify minimum Rust version in Cargo.toml
Specify the minimum Rust version (1.69) in rust/Cargo.toml. Previously, trying to compile with an older Rust toolchain caused various compilation errors which were hard to interpret. Now the cause of the error will be much more clear. --- rust/Cargo.toml | 4 ++++ rust/cargo_test/Cargo.toml | 1 + rust/libnbd-sys/Cargo.toml | 1 + rust/tests/test_220_opt_list.rs | 2 +- rust/tests/test_240_opt_list_meta.rs | 2 +- rust/tests/test_245_opt_list_meta_queries.rs | 2 +- rust/tests/test_250_opt_set_meta.rs | 2 +- rust/tests/test_255_opt_set_meta_queries.rs | 2 +- rust/tests/test_460_block_status.rs | 2 +- 9 files changed, 12 insertions(+), 6 deletions(-) diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 04e371e..9ac6e53 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -20,7 +20,10 @@ [workspace.package] authors = ["Tage Johansson"] version = "0.1.0" +# Make sure that the values of the edition and rust-version fields in +# rust_test/Cargo.toml matches the values here. edition = "2021" +rust-version = "1.69" description = "Rust bindings for libnbd, a client library for controlling block devices over a network." license = "LGPL-2.1-only" keywords = ["libnbd", "block-device", "network"] @@ -31,6 +34,7 @@ name = "libnbd" authors.workspace = true version.workspace = true edition.workspace = true +rust-version.workspace = true description.workspace = true license.workspace = true keywords.workspace = true diff --git a/rust/cargo_test/Cargo.toml b/rust/cargo_test/Cargo.toml index 9f9d478..ffe88c1 100644 --- a/rust/cargo_test/Cargo.toml +++ b/rust/cargo_test/Cargo.toml @@ -20,4 +20,5 @@ [package] name = "cargo_test" edition = "2021" +rust-version = "1.69" version = "0.1.0" diff --git a/rust/libnbd-sys/Cargo.toml b/rust/libnbd-sys/Cargo.toml index 3fa581f..3f09f1b 100644 --- a/rust/libnbd-sys/Cargo.toml +++ b/rust/libnbd-sys/Cargo.toml @@ -18,6 +18,7 @@ [package] name = "libnbd-sys" version.workspace = true +rust-version.workspace = true edition.workspace = true description.workspace = true license.workspace = true diff --git a/rust/tests/test_220_opt_list.rs b/rust/tests/test_220_opt_list.rs index 180a95b..28d4db9 100644 --- a/rust/tests/test_220_opt_list.rs +++ b/rust/tests/test_220_opt_list.rs @@ -66,7 +66,7 @@ impl ConnTester { .push(String::from_utf8(name.to_owned()).unwrap()); 0 })?; - let exports = Arc::into_inner(exports).unwrap().into_inner().unwrap(); + let exports = Arc::try_unwrap(exports).unwrap().into_inner().unwrap(); assert_eq!(exports.len(), count as usize); assert_eq!(exports.len(), expected_exports.len()); for (export, &expected) in exports.iter().zip(expected_exports) { diff --git a/rust/tests/test_240_opt_list_meta.rs b/rust/tests/test_240_opt_list_meta.rs index 5598458..cc0310d 100644 --- a/rust/tests/test_240_opt_list_meta.rs +++ b/rust/tests/test_240_opt_list_meta.rs @@ -43,7 +43,7 @@ fn list_meta_ctxs(nbd: &libnbd::Handle) -> libnbd::Result<CtxInfo> { } 0 })?; - let info = Arc::into_inner(info).unwrap().into_inner().unwrap(); + let info = Arc::try_unwrap(info).unwrap().into_inner().unwrap(); assert_eq!(info.count, replies); Ok(info) } diff --git a/rust/tests/test_245_opt_list_meta_queries.rs b/rust/tests/test_245_opt_list_meta_queries.rs index da5c674..d6409e5 100644 --- a/rust/tests/test_245_opt_list_meta_queries.rs +++ b/rust/tests/test_245_opt_list_meta_queries.rs @@ -46,7 +46,7 @@ fn list_meta_ctxs( } 0 })?; - let info = Arc::into_inner(info).unwrap().into_inner().unwrap(); + let info = Arc::try_unwrap(info).unwrap().into_inner().unwrap(); assert_eq!(info.count, replies); Ok(info) } diff --git a/rust/tests/test_250_opt_set_meta.rs b/rust/tests/test_250_opt_set_meta.rs index c7a8144..26d82e5 100644 --- a/rust/tests/test_250_opt_set_meta.rs +++ b/rust/tests/test_250_opt_set_meta.rs @@ -44,7 +44,7 @@ fn set_meta_ctxs(nbd: &libnbd::Handle) -> libnbd::Result<CtxInfo> { } 0 })?; - let info = Arc::into_inner(info).unwrap().into_inner().unwrap(); + let info = Arc::try_unwrap(info).unwrap().into_inner().unwrap(); assert_eq!(info.count, replies); Ok(info) } diff --git a/rust/tests/test_255_opt_set_meta_queries.rs b/rust/tests/test_255_opt_set_meta_queries.rs index 143a2f1..87f8d6a 100644 --- a/rust/tests/test_255_opt_set_meta_queries.rs +++ b/rust/tests/test_255_opt_set_meta_queries.rs @@ -47,7 +47,7 @@ fn set_meta_ctxs_queries( } 0 })?; - let info = Arc::into_inner(info).unwrap().into_inner().unwrap(); + let info = Arc::try_unwrap(info).unwrap().into_inner().unwrap(); assert_eq!(info.count, replies); Ok(info) } diff --git a/rust/tests/test_460_block_status.rs b/rust/tests/test_460_block_status.rs index 7cdcb34..6b53ba9 100644 --- a/rust/tests/test_460_block_status.rs +++ b/rust/tests/test_460_block_status.rs @@ -43,7 +43,7 @@ fn block_status_get_entries( flags, ) .unwrap(); - Arc::into_inner(entries) + Arc::try_unwrap(entries) .unwrap() .into_inner() .unwrap() -- 2.41.0
Tage Johansson
2023-Aug-20 14:16 UTC
[Libguestfs] [libnbd PATCH v8 02/10] rust: Correct license field in Cargo.toml
Previously, the license field in rust/Cargo.toml was wrongly set to "LGPL-2.1-only". This commit changes it to the correct "LGPL-2.1-or-later", which conforms to the license preamble in the source files. --- rust/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 9ac6e53..01555de 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -25,7 +25,7 @@ version = "0.1.0" edition = "2021" rust-version = "1.69" description = "Rust bindings for libnbd, a client library for controlling block devices over a network." -license = "LGPL-2.1-only" +license = "LGPL-2.1-or-later" keywords = ["libnbd", "block-device", "network"] categories = ["api-bindings", "emulators", "virtualization"] -- 2.41.0
Tage Johansson
2023-Aug-20 14:16 UTC
[Libguestfs] [libnbd PATCH v8 03/10] rust: Format some files to conform to rustfmt and rustfmt.toml
Some Rust files were previously not formatted according to rustfmt and the top-level rustfmt.toml. This commit correct the formatting in those files. --- rust/examples/connect-command.rs | 1 - rust/tests/test_200_connect_command.rs | 11 ++--------- rust/tests/test_300_get_size.rs | 1 - rust/tests/test_620_stats.rs | 11 ++--------- 4 files changed, 4 insertions(+), 20 deletions(-) diff --git a/rust/examples/connect-command.rs b/rust/examples/connect-command.rs index db4adbe..4ad6143 100644 --- a/rust/examples/connect-command.rs +++ b/rust/examples/connect-command.rs @@ -1,7 +1,6 @@ //! This example shows how to run an NBD server //! (nbdkit) as a subprocess of libnbd. - fn main() -> libnbd::Result<()> { // Create the libnbd handle. let handle = libnbd::Handle::new()?; diff --git a/rust/tests/test_200_connect_command.rs b/rust/tests/test_200_connect_command.rs index 8338650..963f334 100644 --- a/rust/tests/test_200_connect_command.rs +++ b/rust/tests/test_200_connect_command.rs @@ -17,16 +17,9 @@ #![deny(warnings)] - #[test] fn test_connect_command() { let nbd = libnbd::Handle::new().unwrap(); - nbd.connect_command(&[ - "nbdkit", - "-s", - "--exit-with-parent", - "-v", - "null", - ]) - .unwrap(); + nbd.connect_command(&["nbdkit", "-s", "--exit-with-parent", "-v", "null"]) + .unwrap(); } diff --git a/rust/tests/test_300_get_size.rs b/rust/tests/test_300_get_size.rs index c830164..bed9d87 100644 --- a/rust/tests/test_300_get_size.rs +++ b/rust/tests/test_300_get_size.rs @@ -17,7 +17,6 @@ #![deny(warnings)] - #[test] fn test_get_size() { let nbd = libnbd::Handle::new().unwrap(); diff --git a/rust/tests/test_620_stats.rs b/rust/tests/test_620_stats.rs index 134d59a..0b6f4e2 100644 --- a/rust/tests/test_620_stats.rs +++ b/rust/tests/test_620_stats.rs @@ -17,7 +17,6 @@ #![deny(warnings)] - #[test] fn test_stats() { let nbd = libnbd::Handle::new().unwrap(); @@ -31,14 +30,8 @@ fn test_stats() { // Connection performs handshaking, which increments stats. // The number of bytes/chunks here may grow over time as more features get // automatically negotiated, so merely check that they are non-zero. - nbd.connect_command(&[ - "nbdkit", - "-s", - "--exit-with-parent", - "-v", - "null", - ]) - .unwrap(); + nbd.connect_command(&["nbdkit", "-s", "--exit-with-parent", "-v", "null"]) + .unwrap(); let bs1 = nbd.stats_bytes_sent(); let cs1 = nbd.stats_chunks_sent(); -- 2.41.0
Tage Johansson
2023-Aug-20 14:16 UTC
[Libguestfs] [libnbd PATCH v8 04/10] rust: Make it possible to run examples with a URI
Previously, the examples fetch-first-sector and get-size in rust/examples only accepted a unix socket as argument. This commit makes it possible to provide a URI as well. --- rust/examples/fetch-first-sector.rs | 15 +++++++++++---- rust/examples/get-size.rs | 20 +++++++++++++++----- 2 files changed, 26 insertions(+), 9 deletions(-) diff --git a/rust/examples/fetch-first-sector.rs b/rust/examples/fetch-first-sector.rs index 9efb47a..b035ccb 100644 --- a/rust/examples/fetch-first-sector.rs +++ b/rust/examples/fetch-first-sector.rs @@ -7,6 +7,9 @@ //! //! nbdkit -U - floppy . \ //! --run 'cargo run --example fetch-first-sector -- $unixsocket' +//! Or with a URI: +//! nbdkit -U - floppy . \ +//! --run 'cargo run --example fetch-first-sector -- $uri' //! //! The nbdkit floppy plugin creates an MBR disk so the //! first sector is the partition table. @@ -21,11 +24,15 @@ fn main() -> anyhow::Result<()> { if args.len() != 2 { anyhow::bail!("Usage: {:?} socket", args[0]); } - let socket = &args[1]; - // Connect to the NBD server over a - // Unix domain socket. - nbd.connect_unix(socket)?; + // Check if the user provided a URI or a unix socket. + let socket_or_uri = args[1].to_str().unwrap(); + if socket_or_uri.contains("://") { + nbd.connect_uri(socket_or_uri)?; + } else { + // Connect to the NBD server over a Unix domain socket. + nbd.connect_unix(socket_or_uri)?; + } // Read the first sector synchronously. let mut buf = [0; 512]; diff --git a/rust/examples/get-size.rs b/rust/examples/get-size.rs index 7f31df5..7af8e9f 100644 --- a/rust/examples/get-size.rs +++ b/rust/examples/get-size.rs @@ -5,6 +5,12 @@ //! //! nbdkit -U - memory 1M \ //! --run 'cargo run --example get-size -- $unixsocket' +//! Or with a URI: +//! nbdkit -U - memory 1M \ +//! --run 'cargo run --example get-size -- $uri' +//! +//! Or connect over a URI: +//! cargo run --example get-size -- nbd://hostname:port use std::env; @@ -15,15 +21,19 @@ fn main() -> anyhow::Result<()> { if args.len() != 2 { anyhow::bail!("Usage: {:?} socket", args[0]); } - let socket = &args[1]; - // Connect to the NBD server over a - // Unix domain socket. - nbd.connect_unix(socket)?; + // Check if the user provided a URI or a unix socket. + let socket_or_uri = args[1].to_str().unwrap(); + if socket_or_uri.contains("://") { + nbd.connect_uri(socket_or_uri)?; + } else { + // Connect to the NBD server over a Unix domain socket. + nbd.connect_unix(socket_or_uri)?; + } // Read the size in bytes and print it. let size = nbd.get_size()?; - println!("{:?}: size = {size} bytes", socket); + println!("{:?}: size = {size} bytes", socket_or_uri); Ok(()) } -- 2.41.0
Tage Johansson
2023-Aug-20 14:16 UTC
[Libguestfs] [libnbd PATCH v8 05/10] generator: Add information about asynchronous handle calls
A new field (async_kind) is added to the call data type in generator/API.ml*. The purpose is to tell if a certain handle call is an asynchronous command and if so how one can know when it is completed. The motivation for this is that all asynchronous commands on the AsyncHandle in the Rust bindings makes use of Rust's [`async fn`s](https://doc.rust-lang.org/std/keyword.async.html). But to implement such an async fn, the API needs to know when the command completed, either by a completion callback or via a change of state. --- generator/API.ml | 32 ++++++++++++++++++++++++++++++++ generator/API.mli | 11 +++++++++++ 2 files changed, 43 insertions(+) diff --git a/generator/API.ml b/generator/API.ml index 72c8165..99fcb82 100644 --- a/generator/API.ml +++ b/generator/API.ml @@ -32,6 +32,7 @@ type call = { permitted_states : permitted_state list; is_locked : bool; may_set_error : bool; + async_kind : async_kind option; mutable first_version : int * int; } and arg @@ -102,6 +103,9 @@ and permitted_state | Negotiating | Connected | Closed | Dead +and async_kind +| WithCompletionCallback +| ChangesState of string * bool and link | Link of string | SectionLink of string @@ -250,6 +254,7 @@ let default_call = { args = []; optargs = []; ret = RErr; see_also = []; permitted_states = []; is_locked = true; may_set_error = true; + async_kind = None; first_version = (0, 0) } (* Calls. @@ -2802,6 +2807,7 @@ wait for an L<eventfd(2)>."; default_call with args = [ SockAddrAndLen ("addr", "addrlen") ]; ret = RErr; permitted_states = [ Created ]; + async_kind = Some (ChangesState ("aio_is_connecting", false)); shortdesc = "connect to the NBD server"; longdesc = "\ Begin connecting to the NBD server. The C<addr> and C<addrlen> @@ -2814,6 +2820,7 @@ parameters specify the address of the socket to connect to. default_call with args = [ String "uri" ]; ret = RErr; permitted_states = [ Created ]; + async_kind = Some (ChangesState ("aio_is_connecting", false)); shortdesc = "connect to an NBD URI"; longdesc = "\ Begin connecting to the NBD URI C<uri>. Parameters behave as @@ -2827,6 +2834,7 @@ documented in L<nbd_connect_uri(3)>. default_call with args = [ Path "unixsocket" ]; ret = RErr; permitted_states = [ Created ]; + async_kind = Some (ChangesState ("aio_is_connecting", false)); shortdesc = "connect to the NBD server over a Unix domain socket"; longdesc = "\ Begin connecting to the NBD server over Unix domain socket @@ -2841,6 +2849,7 @@ L<nbd_connect_unix(3)>. default_call with args = [ UInt32 "cid"; UInt32 "port" ]; ret = RErr; permitted_states = [ Created ]; + async_kind = Some (ChangesState ("aio_is_connecting", false)); shortdesc = "connect to the NBD server over AF_VSOCK socket"; longdesc = "\ Begin connecting to the NBD server over the C<AF_VSOCK> @@ -2854,6 +2863,7 @@ L<nbd_connect_vsock(3)>. default_call with args = [ String "hostname"; String "port" ]; ret = RErr; permitted_states = [ Created ]; + async_kind = Some (ChangesState ("aio_is_connecting", false)); shortdesc = "connect to the NBD server over a TCP port"; longdesc = "\ Begin connecting to the NBD server listening on C<hostname:port>. @@ -2866,6 +2876,7 @@ Parameters behave as documented in L<nbd_connect_tcp(3)>. default_call with args = [ Fd "sock" ]; ret = RErr; permitted_states = [ Created ]; + async_kind = Some (ChangesState ("aio_is_connecting", false)); shortdesc = "connect directly to a connected socket"; longdesc = "\ Begin connecting to the connected socket C<fd>. @@ -2878,6 +2889,7 @@ Parameters behave as documented in L<nbd_connect_socket(3)>. default_call with args = [ StringList "argv" ]; ret = RErr; permitted_states = [ Created ]; + async_kind = Some (ChangesState ("aio_is_connecting", false)); shortdesc = "connect to the NBD server"; longdesc = "\ Run the command as a subprocess and begin connecting to it over @@ -2891,6 +2903,7 @@ L<nbd_connect_command(3)>. default_call with args = [ StringList "argv" ]; ret = RErr; permitted_states = [ Created ]; + async_kind = Some (ChangesState ("aio_is_connecting", false)); shortdesc = "connect using systemd socket activation"; longdesc = "\ Run the command as a subprocess and begin connecting to it using @@ -2907,6 +2920,7 @@ L<nbd_connect_systemd_socket_activation(3)>. optargs = [ OClosure completion_closure ]; ret = RErr; permitted_states = [ Negotiating ]; + async_kind = Some WithCompletionCallback; shortdesc = "end negotiation and move on to using an export"; longdesc = "\ Request that the server finish negotiation and move on to serving the @@ -2930,6 +2944,7 @@ when L<nbd_aio_is_negotiating(3)> returns true."; default_call with args = []; ret = RErr; permitted_states = [ Negotiating ]; + async_kind = Some (ChangesState ("aio_is_connecting", false)); shortdesc = "end negotiation and close the connection"; longdesc = "\ Request that the server finish negotiation, gracefully if possible, then @@ -2947,6 +2962,7 @@ L<nbd_aio_is_connecting(3)> to return false."; optargs = [ OClosure completion_closure ]; ret = RErr; permitted_states = [ Negotiating ]; + async_kind = Some WithCompletionCallback; shortdesc = "request the server to initiate TLS"; longdesc = "\ Request that the server initiate a secure TLS connection, by @@ -2971,6 +2987,7 @@ callback."; optargs = [ OClosure completion_closure ]; ret = RErr; permitted_states = [ Negotiating ]; + async_kind = Some WithCompletionCallback; shortdesc = "request the server to enable structured replies"; longdesc = "\ Request that the server use structured replies, by sending @@ -2995,6 +3012,7 @@ callback."; optargs = [ OClosure completion_closure ]; ret = RErr; permitted_states = [ Negotiating ]; + async_kind = Some WithCompletionCallback; shortdesc = "request the server to list all exports during negotiation"; longdesc = "\ Request that the server list all exports that it supports. This can @@ -3017,6 +3035,7 @@ callback."; optargs = [ OClosure completion_closure ]; ret = RErr; permitted_states = [ Negotiating ]; + async_kind = Some WithCompletionCallback; shortdesc = "request the server for information about an export"; longdesc = "\ Request that the server supply information about the export name @@ -3040,6 +3059,7 @@ callback."; args = [ Closure context_closure ]; ret = RInt; optargs = [ OClosure completion_closure ]; permitted_states = [ Negotiating ]; + async_kind = Some WithCompletionCallback; shortdesc = "request list of available meta contexts, using implicit query"; longdesc = "\ Request that the server list available meta contexts associated with @@ -3067,6 +3087,7 @@ callback."; args = [ StringList "queries"; Closure context_closure ]; ret = RInt; optargs = [ OClosure completion_closure ]; permitted_states = [ Negotiating ]; + async_kind = Some WithCompletionCallback; shortdesc = "request list of available meta contexts, using explicit query"; longdesc = "\ Request that the server list available meta contexts associated with @@ -3094,6 +3115,7 @@ callback."; args = [ Closure context_closure ]; ret = RInt; optargs = [ OClosure completion_closure ]; permitted_states = [ Negotiating ]; + async_kind = Some WithCompletionCallback; shortdesc = "select specific meta contexts, with implicit query list"; longdesc = "\ Request that the server supply all recognized meta contexts @@ -3127,6 +3149,7 @@ callback."; args = [ StringList "queries"; Closure context_closure ]; ret = RInt; optargs = [ OClosure completion_closure ]; permitted_states = [ Negotiating ]; + async_kind = Some WithCompletionCallback; shortdesc = "select specific meta contexts, with explicit query list"; longdesc = "\ Request that the server supply all recognized meta contexts @@ -3161,6 +3184,7 @@ callback."; OFlags ("flags", cmd_flags, Some []) ]; ret = RCookie; permitted_states = [ Connected ]; + async_kind = Some WithCompletionCallback; shortdesc = "read from the NBD server"; longdesc = "\ Issue a read command to the NBD server. @@ -3195,6 +3219,7 @@ Other parameters behave as documented in L<nbd_pread(3)>." OFlags ("flags", cmd_flags, Some ["DF"]) ]; ret = RCookie; permitted_states = [ Connected ]; + async_kind = Some WithCompletionCallback; shortdesc = "read from the NBD server"; longdesc = "\ Issue a read command to the NBD server. @@ -3227,6 +3252,7 @@ Other parameters behave as documented in L<nbd_pread_structured(3)>." OFlags ("flags", cmd_flags, Some ["FUA"; "PAYLOAD_LEN"]) ]; ret = RCookie; permitted_states = [ Connected ]; + async_kind = Some WithCompletionCallback; shortdesc = "write to the NBD server"; longdesc = "\ Issue a write command to the NBD server. @@ -3246,6 +3272,7 @@ completed. Other parameters behave as documented in L<nbd_pwrite(3)>." default_call with args = []; optargs = [ OFlags ("flags", cmd_flags, Some []) ]; ret = RErr; permitted_states = [ Connected ]; + async_kind = Some (ChangesState ("aio_is_closed", true)); shortdesc = "disconnect from the NBD server"; longdesc = "\ Issue the disconnect command to the NBD server. This is @@ -3273,6 +3300,7 @@ however, L<nbd_shutdown(3)> will call this function if appropriate."; OFlags ("flags", cmd_flags, Some []) ]; ret = RCookie; permitted_states = [ Connected ]; + async_kind = Some WithCompletionCallback; shortdesc = "send flush command to the NBD server"; longdesc = "\ Issue the flush command to the NBD server. @@ -3294,6 +3322,7 @@ Other parameters behave as documented in L<nbd_flush(3)>." OFlags ("flags", cmd_flags, Some ["FUA"]) ]; ret = RCookie; permitted_states = [ Connected ]; + async_kind = Some WithCompletionCallback; shortdesc = "send trim command to the NBD server"; longdesc = "\ Issue a trim command to the NBD server. @@ -3315,6 +3344,7 @@ Other parameters behave as documented in L<nbd_trim(3)>." OFlags ("flags", cmd_flags, Some []) ]; ret = RCookie; permitted_states = [ Connected ]; + async_kind = Some WithCompletionCallback; shortdesc = "send cache (prefetch) command to the NBD server"; longdesc = "\ Issue the cache (prefetch) command to the NBD server. @@ -3337,6 +3367,7 @@ Other parameters behave as documented in L<nbd_cache(3)>." Some ["FUA"; "NO_HOLE"; "FAST_ZERO"]) ]; ret = RCookie; permitted_states = [ Connected ]; + async_kind = Some WithCompletionCallback; shortdesc = "send write zeroes command to the NBD server"; longdesc = "\ Issue a write zeroes command to the NBD server. @@ -3359,6 +3390,7 @@ Other parameters behave as documented in L<nbd_zero(3)>." OFlags ("flags", cmd_flags, Some ["REQ_ONE"]) ]; ret = RCookie; permitted_states = [ Connected ]; + async_kind = Some WithCompletionCallback; shortdesc = "send block status command to the NBD server"; longdesc = "\ Send the block status command to the NBD server. diff --git a/generator/API.mli b/generator/API.mli index c5bba8c..361132d 100644 --- a/generator/API.mli +++ b/generator/API.mli @@ -36,6 +36,11 @@ type call = { {b guaranteed} never to do that we can save a bit of time by setting this to false. *) may_set_error : bool; + (** There are two types of asynchronous functions, those with a completion + callback and those which changes state when completed. This field tells + if the function is asynchronous and in that case how one can check if + it has completed. *) + async_kind : async_kind option; (** The first stable version that the symbol appeared in, for example (1, 2) if the symbol was added in development cycle 1.1.x and thus the first stable version was 1.2. This is @@ -117,6 +122,12 @@ and permitted_state not including CLOSED or DEAD *) | Closed | Dead (** can be called when the handle is CLOSED or DEAD *) +and async_kind +(** The asynchronous call has a completion callback. *) +| WithCompletionCallback +(** The asynchronous call is completed when the given handle call returns the + given boolean value. Might for instance be ("aio_is_connected", false). *) +| ChangesState of string * bool and link | Link of string (** link to L<nbd_PAGE(3)> *) | SectionLink of string (** link to L<libnbd(3)/SECTION> *) -- 2.41.0
Tage Johansson
2023-Aug-20 14:16 UTC
[Libguestfs] [libnbd PATCH v8 06/10] rust: async: Create an async friendly handle type
Create another handle type: AsyncHandle, which makes use of Rust's builtin asynchronous functions (see <https://doc.rust-lang.org/std/keyword.async.html>) and runs on top of the Tokio runtime (see <https://docs.rs/tokio>). For every asynchronous command, like aio_connect(), a corresponding `async` method is created on the handle. In this case it would be: async fn connect(...) -> Result<(), ...> When called, it will poll the file descriptor until the command is complete, and then return with a result. All the synchronous counterparts (like nbd_connect()) are excluded from this handle type as they are unnecessary and since they might interfear with the polling made by the Tokio runtime. For more details about how the asynchronous commands are executed, please see the comments in rust/src/async_handle.rs. --- generator/Rust.ml | 249 +++++++++++++++++++++++++++++++++++- generator/Rust.mli | 2 + generator/generator.ml | 2 + rust/Cargo.toml | 4 +- rust/Makefile.am | 2 + rust/src/async_handle.rs | 268 +++++++++++++++++++++++++++++++++++++++ rust/src/lib.rs | 8 ++ rust/src/utils.rs | 9 ++ scripts/git.orderfile | 1 + 9 files changed, 538 insertions(+), 7 deletions(-) create mode 100644 rust/src/async_handle.rs diff --git a/generator/Rust.ml b/generator/Rust.ml index 431c814..1bc81f0 100644 --- a/generator/Rust.ml +++ b/generator/Rust.ml @@ -61,11 +61,12 @@ let print_rust_flags { flag_prefix; flags } let rec to_upper_snake_case s let s = String.uppercase_ascii s in let s = explode s in - let s = filter_map ( - function - |'-' -> Some "_" | ':' -> None - | ch -> Some (String.make 1 ch) - ) s in + let s + filter_map + (function + | '-' -> Some "_" | ':' -> None | ch -> Some (String.make 1 ch)) + s + in String.concat "" s (* Split a string into a list of chars. In later OCaml we could @@ -75,7 +76,7 @@ and explode str let r = ref [] in for i = 0 to String.length str - 1 do let c = String.unsafe_get str i in - r := c :: !r; + r := c :: !r done; List.rev !r @@ -564,3 +565,239 @@ let generate_rust_bindings () pr "impl Handle {\n"; List.iter print_rust_handle_method handle_calls; pr "}\n\n" + +(*********************************************************) +(* The rest of the file conserns the asynchronous API. *) +(* *) +(* See the comments in rust/src/async_handle.rs for more *) +(* information about how it works. *) +(*********************************************************) + +let excluded_handle_calls : NameSet.t + NameSet.of_list + [ + "aio_get_fd"; + "aio_get_direction"; + "aio_notify_read"; + "aio_notify_write"; + "clear_debug_callback"; + "get_debug"; + "poll"; + "poll2"; + "set_debug"; + "set_debug_callback"; + ] + +(* A mapping with names as keys. *) +module NameMap = Map.Make (String) + +(* Strip "aio_" from the beginning of a string. *) +let strip_aio name : string + if String.starts_with ~prefix:"aio_" name then + String.sub name 4 (String.length name - 4) + else failwithf "Asynchronous call %s must begin with aio_" name + +(* A map with all asynchronous handle calls. The keys are names with "aio_" + stripped, the values are a tuple with the actual name (with "aio_"), the + [call] and the [async_kind]. *) +let async_handle_calls : ((string * call) * async_kind) NameMap.t + handle_calls + |> List.filter (fun (n, _) -> not (NameSet.mem n excluded_handle_calls)) + |> List.filter_map (fun (name, call) -> + call.async_kind + |> Option.map (fun async_kind -> + (strip_aio name, ((name, call), async_kind)))) + |> List.to_seq |> NameMap.of_seq + +(* A mapping with all synchronous (not asynchronous) handle calls. Excluded + are also all synchronous calls that has an asynchronous counterpart. So if + "foo" is the name of a handle call and an asynchronous call "aio_foo" + exists, then "foo" will not b in this map. *) +let sync_handle_calls : call NameMap.t + handle_calls + |> List.filter (fun (n, _) -> not (NameSet.mem n excluded_handle_calls)) + |> List.filter (fun (name, _) -> + (not (NameMap.mem name async_handle_calls)) + && not + (String.starts_with ~prefix:"aio_" name + && NameMap.mem (strip_aio name) async_handle_calls)) + |> List.to_seq |> NameMap.of_seq + +(* Get the Rust type for an argument in the asynchronous API. Like + [rust_arg_type] but no static lifetime on some buffers. *) +let rust_async_arg_type : arg -> string = function + | BytesPersistIn _ -> "&[u8]" + | BytesPersistOut _ -> "&mut [u8]" + | x -> rust_arg_type x + +(* Get the Rust type for an optional argument in the asynchronous API. Like + [rust_optarg_type] but no static lifetime on some closures. *) +let rust_async_optarg_type : optarg -> string = function + | OClosure x -> sprintf "Option<%s>" (rust_async_arg_type (Closure x)) + | x -> rust_optarg_type x + +(* A string of the argument list for a method on the handle, with both + mandotory and optional arguments. *) +let rust_async_handle_call_args { args; optargs } : string + let rust_args_names + List.map rust_arg_name args @ List.map rust_optarg_name optargs + and rust_args_types + List.map rust_async_arg_type args + @ List.map rust_async_optarg_type optargs + in + String.concat ", " + (List.map2 (sprintf "%s: %s") rust_args_names rust_args_types) + +(* Print the Rust function for a not asynchronous handle call. *) +let print_rust_sync_handle_call name call + print_rust_handle_call_comment call; + pr "pub fn %s(&self, %s) -> %s\n" name + (rust_async_handle_call_args call) + (rust_ret_type call); + print_ffi_call name "self.data.handle.handle" call; + pr "\n" + +(* Print the Rust function for an asynchronous handle call with a completion + callback. (Note that "callback" might be abbreviated with "cb" in the + following code. *) +let print_rust_async_handle_call_with_completion_cb name (aio_name, call) + (* An array of all optional arguments. Useful because we need to deel with + the index of the completion callback. *) + let optargs = Array.of_list call.optargs in + (* The index of the completion callback in [optargs] *) + let completion_cb_index + Array.find_map + (fun (i, optarg) -> + match optarg with + | OClosure { cbname } -> + if cbname = "completion" then Some i else None + | _ -> None) + (Array.mapi (fun x y -> (x, y)) optargs) + in + let completion_cb_index + match completion_cb_index with + | Some x -> x + | None -> + failwithf + "The handle call %s is claimed to have a completion callback among \ + its optional arguments by the async_kind field, but so does not \ + seem to be the case." + aio_name + in + let optargs_before_completion_cb + Array.to_list (Array.sub optargs 0 completion_cb_index) + and optargs_after_completion_cb + Array.to_list + (Array.sub optargs (completion_cb_index + 1) + (Array.length optargs - (completion_cb_index + 1))) + in + (* All optional arguments excluding the completion callback. *) + let optargs_without_completion_cb + optargs_before_completion_cb @ optargs_after_completion_cb + in + print_rust_handle_call_comment call; + pr "pub async fn %s(&self, %s) -> SharedResult<()> {\n" name + (rust_async_handle_call_args + { call with optargs = optargs_without_completion_cb }); + pr " // A oneshot channel to notify when the call is completed.\n"; + pr " let (ret_tx, ret_rx) = oneshot::channel::<SharedResult<()>>();\n"; + pr " let (ccb_tx, mut ccb_rx) = oneshot::channel::<c_int>();\n"; + (* Completion callback: *) + pr " let %s = Some(utils::fn_once_to_fn_mut(|err: &mut i32| {\n" + (rust_optarg_name (Array.get optargs completion_cb_index)); + pr " ccb_tx.send(*err).ok();\n"; + pr " 1\n"; + pr " }));\n"; + (* End of completion callback. *) + print_ffi_call aio_name "self.data.handle.handle" call; + pr "?;\n"; + pr " let mut ret_tx = Some(ret_tx);\n"; + pr " let completion_predicate = \n"; + pr " move |_handle: &Handle, res: &SharedResult<()>| {\n"; + pr " let ret = match res {\n"; + pr " Err(e) if e.is_fatal() => res.clone(),\n"; + pr " _ => {\n"; + pr " let Ok(errno) = ccb_rx.try_recv() else { return false; };\n"; + pr " if errno == 0 {\n"; + pr " Ok(())\n"; + pr " } else {\n"; + pr " if let Err(e) = res {\n"; + pr " Err(e.clone())\n"; + pr " } else {\n"; + pr " Err(Arc::new("; + pr " Error::Recoverable(ErrorKind::from_errno(errno))))\n"; + pr " }\n"; + pr " }\n"; + pr " },\n"; + pr " };\n"; + pr " ret_tx.take().unwrap().send(ret).ok();\n"; + pr " true\n"; + pr " };\n"; + pr " self.add_command(completion_predicate)?;\n"; + pr " ret_rx.await.unwrap()\n"; + pr "}\n\n" + +(* Print a Rust function for an asynchronous handle call which signals + completion by changing state. The predicate is a call like + "aio_is_connecting" which should get the value (like false) for the call to + be complete. *) +let print_rust_async_handle_call_changing_state name (aio_name, call) + (predicate, value) + let value = if value then "true" else "false" in + print_rust_handle_call_comment call; + pr "pub async fn %s(&self, %s) -> SharedResult<()>\n" name + (rust_async_handle_call_args call); + pr "{\n"; + print_ffi_call aio_name "self.data.handle.handle" call; + pr "?;\n"; + pr " let (ret_tx, ret_rx) = oneshot::channel::<SharedResult<()>>();\n"; + pr " let mut ret_tx = Some(ret_tx);\n"; + pr " let completion_predicate = \n"; + pr " move |handle: &Handle, res: &SharedResult<()>| {\n"; + pr " let ret = if let Err(_) = res {\n"; + pr " res.clone()\n"; + pr " } else {\n"; + pr " if handle.%s() != %s { return false; }\n" predicate value; + pr " else { Ok(()) }\n"; + pr " };\n"; + pr " ret_tx.take().unwrap().send(ret).ok();\n"; + pr " true\n"; + pr " };\n"; + pr " self.add_command(completion_predicate)?;\n"; + pr " ret_rx.await.unwrap()\n"; + pr "}\n\n" + +(* Print an impl with all handle calls. *) +let print_rust_async_handle_impls () + pr "impl AsyncHandle {\n"; + NameMap.iter print_rust_sync_handle_call sync_handle_calls; + NameMap.iter + (fun name (call, async_kind) -> + match async_kind with + | WithCompletionCallback -> + print_rust_async_handle_call_with_completion_cb name call + | ChangesState (predicate, value) -> + print_rust_async_handle_call_changing_state name call + (predicate, value)) + async_handle_calls; + pr "}\n\n" + +let print_rust_async_imports () + pr "use crate::{*, types::*};\n"; + pr "use os_socketaddr::OsSocketAddr;\n"; + pr "use std::ffi::*;\n"; + pr "use std::mem;\n"; + pr "use std::net::SocketAddr;\n"; + pr "use std::os::fd::{AsRawFd, OwnedFd};\n"; + pr "use std::os::unix::prelude::*;\n"; + pr "use std::path::PathBuf;\n"; + pr "use std::ptr;\n"; + pr "use std::sync::Arc;\n"; + pr "use tokio::sync::oneshot;\n"; + pr "\n" + +let generate_rust_async_bindings () + generate_header CStyle ~copyright:"Tage Johansson"; + pr "\n"; + print_rust_async_imports (); + print_rust_async_handle_impls () diff --git a/generator/Rust.mli b/generator/Rust.mli index 450e4ca..0960170 100644 --- a/generator/Rust.mli +++ b/generator/Rust.mli @@ -18,3 +18,5 @@ (* Print all flag-structs, enums, constants and handle calls in Rust code. *) val generate_rust_bindings : unit -> unit + +val generate_rust_async_bindings : unit -> unit diff --git a/generator/generator.ml b/generator/generator.ml index 8c9a585..11bec4d 100644 --- a/generator/generator.ml +++ b/generator/generator.ml @@ -69,3 +69,5 @@ let () RustSys.generate_rust_sys_bindings; output_to ~formatter:(Some Rustfmt) "rust/src/bindings.rs" Rust.generate_rust_bindings; + output_to ~formatter:(Some Rustfmt) "rust/src/async_bindings.rs" + Rust.generate_rust_async_bindings; diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 01555de..a9b5988 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -48,9 +48,11 @@ os_socketaddr = "0.2.4" thiserror = "1.0.40" log = { version = "0.4.19", optional = true } libc = "0.2.147" +tokio = { optional = true, version = "1.29.1", default-features = false, features = ["rt", "sync", "net"] } +epoll = "4.3.3" [features] -default = ["log"] +default = ["log", "tokio"] [dev-dependencies] anyhow = "1.0.72" diff --git a/rust/Makefile.am b/rust/Makefile.am index 7098c9a..2b5b85b 100644 --- a/rust/Makefile.am +++ b/rust/Makefile.am @@ -19,6 +19,7 @@ include $(top_srcdir)/subdir-rules.mk generator_built = \ libnbd-sys/src/generated.rs \ + src/async_bindings.rs \ src/bindings.rs \ $(NULL) @@ -30,6 +31,7 @@ source_files = \ src/handle.rs \ src/types.rs \ src/utils.rs \ + src/async_handle.rs \ examples/connect-command.rs \ examples/get-size.rs \ examples/fetch-first-sector.rs \ diff --git a/rust/src/async_handle.rs b/rust/src/async_handle.rs new file mode 100644 index 0000000..4223b80 --- /dev/null +++ b/rust/src/async_handle.rs @@ -0,0 +1,268 @@ +// nbd client library in userspace +// Copyright Tage Johansson +// +// This library is free software; you can redistribute it and/or +// modify it under the terms of the GNU Lesser General Public +// License as published by the Free Software Foundation; either +// version 2 of the License, or (at your option) any later version. +// +// This library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public +// License along with this library; if not, write to the Free Software +// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + +// This module implements an asynchronous handle working on top of the +// [Tokio](https://tokio.rs) runtime. When the handle is created, +// a "polling task" is spawned on the Tokio runtime. The purpose of that +// "polling task" is to call `aio_notify_*` when appropriate. It shares a +// reference to the handle as well as some other things with the handle in the +// [HandleData] struct. The "polling task" is sleeping when no command is in +// flight, but wakes up as soon as any command is issued. +// +// The commands are implemented as +// [`async fn`s](https://doc.rust-lang.org/std/keyword.async.html) +// in async_bindings.rs. When a new command is issued, it registers a +// completion predicate with [Handle::add_command]. That predicate takes a +// reference to the handle and should return [true] iff the command is complete. +// Whenever some work is performed in the polling task, the completion +// predicates for all pending commands are called. + +#![allow(unused_imports)] // XXX: remove this +use crate::sys; +use crate::Handle; +use crate::{Error, FatalErrorKind, Result}; +use crate::{AIO_DIRECTION_BOTH, AIO_DIRECTION_READ, AIO_DIRECTION_WRITE}; +use epoll::Events; +use std::sync::Arc; +use std::sync::Mutex; +use tokio::io::{unix::AsyncFd, Interest, Ready as IoReady}; +use tokio::sync::{broadcast, Notify}; +use tokio::task; + +/// A custom result type with a shared [crate::Error] as default error type. +pub type SharedResult<T, E = Arc<Error>> = Result<T, E>; + +/// An NBD handle using Rust's `async` functionality on top of the +/// [Tokio](https://docs.rs/tokio/) runtime. +pub struct AsyncHandle { + /// Data shared both by this struct and the polling task. + pub(crate) data: Arc<HandleData>, + + /// A task which soely purpose is to poll the NBD handle. + polling_task: tokio::task::AbortHandle, +} + +pub(crate) struct HandleData { + /// The underliing handle. + pub handle: Handle, + + /// A list of all pending commands. + /// + /// For every pending command (commands in flight), a predicate will be + /// stored in this list. Whenever some progress is made on the file + /// descriptor, the predicate is called with a reference to the handle + /// and a reference to the result of that call to `aio_notify_*`. + /// Iff the predicate returns [true], the command is considered completed + /// and removed from this list. + /// + /// If The polling task dies for some reason, this [SharedResult] will be + /// set to some error. + pub pending_commands: Mutex< + SharedResult< + Vec< + Box< + dyn FnMut(&Handle, &SharedResult<()>) -> bool + + Send + + Sync + + 'static, + >, + >, + >, + >, + + /// A notifier used by commands to notify the polling task when a new + /// asynchronous command is issued. + pub new_command: Notify, +} + +impl AsyncHandle { + pub fn new() -> Result<Self> { + let handle_data = Arc::new(HandleData { + handle: Handle::new()?, + pending_commands: Mutex::new(Ok(Vec::new())), + new_command: Notify::new(), + }); + + let handle_data_2 = handle_data.clone(); + let polling_task = task::spawn(async move { + // The polling task should never finish without an error. If the + // handle is dropped, the task is aborted so it'll not return in + // that case either. + let Err(err) = polling_task(&handle_data_2).await else { + unreachable!() + }; + let err = Arc::new(Error::Fatal(err)); + // Call the completion predicates for all pending commands with the + // error. + let mut pending_cmds + handle_data_2.pending_commands.lock().unwrap(); + let res = Err(err); + for f in pending_cmds.as_mut().unwrap().iter_mut() { + f(&handle_data_2.handle, &res); + } + *pending_cmds = Err(res.unwrap_err()); + }) + .abort_handle(); + Ok(Self { + data: handle_data, + polling_task, + }) + } + + /// Get the underliing C pointer to the handle. + pub(crate) fn raw_handle(&self) -> *mut sys::nbd_handle { + self.data.handle.raw_handle() + } + + /// Call this method when a new command is issued. As argument is passed a + /// predicate which should return [true] iff the command is completed. + pub(crate) fn add_command( + &self, + mut completion_predicate: impl FnMut(&Handle, &SharedResult<()>) -> bool + + Send + + Sync + + 'static, + ) -> SharedResult<()> { + if !completion_predicate(&self.data.handle, &Ok(())) { + let mut pending_cmds_lock + self.data.pending_commands.lock().unwrap(); + pending_cmds_lock + .as_mut() + .map_err(|e| e.clone())? + .push(Box::new(completion_predicate)); + self.data.new_command.notify_one(); + } + Ok(()) + } +} + +impl Drop for AsyncHandle { + fn drop(&mut self) { + self.polling_task.abort(); + } +} + +/// Get the read/write direction that the handle wants on the file descriptor. +fn get_fd_interest(handle: &Handle) -> Option<Interest> { + match handle.aio_get_direction() { + 0 => None, + AIO_DIRECTION_READ => Some(Interest::READABLE), + AIO_DIRECTION_WRITE => Some(Interest::WRITABLE), + AIO_DIRECTION_BOTH => Some(Interest::READABLE | Interest::WRITABLE), + _ => unreachable!(), + } +} + +/// A task that will run as long as the handle is alive. It will poll the +/// file descriptor when new data is availlable. +async fn polling_task(handle_data: &HandleData) -> Result<(), FatalErrorKind> { + let HandleData { + handle, + pending_commands, + new_command, + } = handle_data; + let fd = handle.aio_get_fd().map_err(Error::to_fatal)?; + // XXX: Might the file descriptor ever be changed? + let tokio_fd = AsyncFd::new(fd)?; + let epfd = epoll::create(false)?; + epoll::ctl( + epfd, + epoll::ControlOptions::EPOLL_CTL_ADD, + fd, + epoll::Event::new(Events::EPOLLIN | Events::EPOLLOUT, 42), + )?; + + // The following loop does approximately the following things: + // + // 1. Determine what Libnbd wants to do next on the file descriptor, + // (read/write/both/none), and store that in [fd_interest]. + // 2. Wait for either: + // a) That interest to be available on the file descriptor in which case: + // I. Call the correct `aio_notify_*` method. + // II. Execute step 1. + // III. Send the result of the call to `aio_notify_*` on + // [result_channel] to notify pending commands that some progress + // has been made. + // IV. Resume execution from step 2. + // b) A notification was received on [new_command] signaling that a new + // command was registered and that the intrest on the file descriptor + // might has changed. Resume execution from step 1. + loop { + let Some(fd_interest) = get_fd_interest(handle) else { + // The handle does not wait for any data of the file descriptor, + // so we wait until some command is issued. + new_command.notified().await; + continue; + }; + + if pending_commands + .lock() + .unwrap() + .as_ref() + .unwrap() + .is_empty() + { + // No command is pending so there is no point to do anything. + new_command.notified().await; + continue; + } + + // Wait for the requested interest to be available on the fd. + let mut ready_guard = tokio_fd.ready(fd_interest).await?; + let readyness = ready_guard.ready(); + let res = if readyness.is_readable() && fd_interest.is_readable() { + handle.aio_notify_read() + } else if readyness.is_writable() && fd_interest.is_writable() { + handle.aio_notify_write() + } else { + continue; + }; + let res = match res { + Ok(()) => Ok(()), + Err(e @ Error::Recoverable(_)) => Err(Arc::new(e)), + Err(Error::Fatal(e)) => return Err(e), + }; + + // Call the completion predicates of all pending commands. + let mut pending_cmds_lock = pending_commands.lock().unwrap(); + let pending_cmds = pending_cmds_lock.as_mut().unwrap(); + let mut i = 0; + while i < pending_cmds.len() { + if (pending_cmds[i])(handle, &res) { + let _ = pending_cmds.swap_remove(i); + } else { + i += 1; + } + } + drop(pending_cmds_lock); + + // Use epoll to check the current read/write availability on the fd. + // This is needed because Tokio does only support edge-triggered + // notifications but Libnbd requires level-triggered notifications. + let mut revent = epoll::Event { data: 0, events: 0 }; + // Setting timeout to 0 means that it will return immediately. + epoll::wait(epfd, 0, std::slice::from_mut(&mut revent))?; + let revents = Events::from_bits(revent.events).unwrap(); + if !revents.contains(Events::EPOLLIN) { + ready_guard.clear_ready_matching(IoReady::READABLE); + } + if !revents.contains(Events::EPOLLOUT) { + ready_guard.clear_ready_matching(IoReady::WRITABLE); + } + ready_guard.retain_ready(); + } +} diff --git a/rust/src/lib.rs b/rust/src/lib.rs index a6f3131..56316b4 100644 --- a/rust/src/lib.rs +++ b/rust/src/lib.rs @@ -17,11 +17,19 @@ #![deny(warnings)] +#[cfg(feature = "tokio")] +mod async_bindings; +#[cfg(feature = "tokio")] +mod async_handle; mod bindings; mod error; mod handle; pub mod types; mod utils; +#[cfg(feature = "tokio")] +pub use async_bindings::*; +#[cfg(feature = "tokio")] +pub use async_handle::{AsyncHandle, SharedResult}; pub use bindings::*; pub use error::{Error, ErrorKind, FatalErrorKind, Result}; pub use handle::Handle; diff --git a/rust/src/utils.rs b/rust/src/utils.rs index b8200c1..8984ebb 100644 --- a/rust/src/utils.rs +++ b/rust/src/utils.rs @@ -21,3 +21,12 @@ use std::ffi::c_void; pub unsafe extern "C" fn drop_data<T>(data: *mut c_void) { drop(Box::from_raw(data as *mut T)) } + +/// Turn a [FnOnce] (with a single `&mut` argument) to a [FnMut] +/// which panics on the second invocation. +pub fn fn_once_to_fn_mut<T, U>( + f: impl FnOnce(&mut T) -> U, +) -> impl FnMut(&mut T) -> U { + let mut f = Some(f); + move |x| (f.take().unwrap())(x) +} diff --git a/scripts/git.orderfile b/scripts/git.orderfile index b988d87..60ec56d 100644 --- a/scripts/git.orderfile +++ b/scripts/git.orderfile @@ -69,6 +69,7 @@ rust/src/types.rs rust/src/utils.rs rust/src/lib.rs rust/src/handle.rs +rust/src/async_handle.rs rust/libnbd-sys/* rust/examples/* rust/tests/* -- 2.41.0
Tage Johansson
2023-Aug-20 14:16 UTC
[Libguestfs] [libnbd PATCH v8 07/10] generator: Add `modifies_fd` flag to the [call] structure
Add a flag (modifies_fd) to the call structure in generator/API.ml which is set to true if the handle call may do something with the file descriptor. That is, it is true for all calls which are or may call aio_notify_*, including all synchronous commands like nbd_connect or nbd_opt_go. The motivation for this is that the asynchronous handle in the Rust bindings uses its own loop for polling, modifying the file descriptor outside of this loop may cause unexpected behaviour. Including that the handle may hang. All commands which set this flag to true will be excluded from that handle. The asynchronous (aio_*) functions will be used instead. --- generator/API.ml | 32 ++++++++++++++++++++++++++++++++ generator/API.mli | 7 +++++++ 2 files changed, 39 insertions(+) diff --git a/generator/API.ml b/generator/API.ml index 99fcb82..b0f5e2a 100644 --- a/generator/API.ml +++ b/generator/API.ml @@ -33,6 +33,7 @@ type call = { is_locked : bool; may_set_error : bool; async_kind : async_kind option; + modifies_fd: bool; mutable first_version : int * int; } and arg @@ -255,6 +256,7 @@ let default_call = { args = []; optargs = []; ret = RErr; permitted_states = []; is_locked = true; may_set_error = true; async_kind = None; + modifies_fd = false; first_version = (0, 0) } (* Calls. @@ -1162,6 +1164,7 @@ Return true if option negotiation mode was enabled on this handle."; default_call with args = []; ret = RErr; permitted_states = [ Negotiating ]; + modifies_fd = true; shortdesc = "end negotiation and move on to using an export"; longdesc = "\ Request that the server finish negotiation and move on to serving the @@ -1189,6 +1192,7 @@ although older servers will instead have killed the connection."; default_call with args = []; ret = RErr; permitted_states = [ Negotiating ]; + modifies_fd = true; shortdesc = "end negotiation and close the connection"; longdesc = "\ Request that the server finish negotiation, gracefully if possible, then @@ -1202,6 +1206,7 @@ enabled option mode."; default_call with args = []; ret = RBool; permitted_states = [ Negotiating ]; + modifies_fd = true; shortdesc = "request the server to initiate TLS"; longdesc = "\ Request that the server initiate a secure TLS connection, by @@ -1240,6 +1245,7 @@ established, as reported by L<nbd_get_tls_negotiated(3)>."; default_call with args = []; ret = RBool; permitted_states = [ Negotiating ]; + modifies_fd = true; shortdesc = "request the server to enable structured replies"; longdesc = "\ Request that the server use structured replies, by sending @@ -1266,6 +1272,7 @@ later calls to this function return false."; default_call with args = [ Closure list_closure ]; ret = RInt; permitted_states = [ Negotiating ]; + modifies_fd = true; shortdesc = "request the server to list all exports during negotiation"; longdesc = "\ Request that the server list all exports that it supports. This can @@ -1307,6 +1314,7 @@ description is set with I<-D>."; default_call with args = []; ret = RErr; permitted_states = [ Negotiating ]; + modifies_fd = true; shortdesc = "request the server for information about an export"; longdesc = "\ Request that the server supply information about the export name @@ -1338,6 +1346,7 @@ corresponding L<nbd_opt_go(3)> would succeed."; default_call with args = [ Closure context_closure ]; ret = RInt; permitted_states = [ Negotiating ]; + modifies_fd = true; shortdesc = "list available meta contexts, using implicit query list"; longdesc = "\ Request that the server list available meta contexts associated with @@ -1393,6 +1402,7 @@ a server may send a lengthy list."; default_call with args = [ StringList "queries"; Closure context_closure ]; ret = RInt; permitted_states = [ Negotiating ]; + modifies_fd = true; shortdesc = "list available meta contexts, using explicit query list"; longdesc = "\ Request that the server list available meta contexts associated with @@ -1443,6 +1453,7 @@ a server may send a lengthy list."; default_call with args = [ Closure context_closure ]; ret = RInt; permitted_states = [ Negotiating ]; + modifies_fd = true; shortdesc = "select specific meta contexts, using implicit query list"; longdesc = "\ Request that the server supply all recognized meta contexts @@ -1502,6 +1513,7 @@ no contexts are reported, or may fail but have a non-empty list."; default_call with args = [ StringList "queries"; Closure context_closure ]; ret = RInt; permitted_states = [ Negotiating ]; + modifies_fd = true; shortdesc = "select specific meta contexts, using explicit query list"; longdesc = "\ Request that the server supply all recognized meta contexts @@ -1731,6 +1743,7 @@ parameter in NBD URIs is allowed."; default_call with args = [ String "uri" ]; ret = RErr; permitted_states = [ Created ]; + modifies_fd = true; shortdesc = "connect to NBD URI"; longdesc = "\ Connect (synchronously) to an NBD server and export by specifying @@ -1909,6 +1922,7 @@ See L<nbd_get_uri(3)>."; default_call with args = [ Path "unixsocket" ]; ret = RErr; permitted_states = [ Created ]; + modifies_fd = true; shortdesc = "connect to NBD server over a Unix domain socket"; longdesc = "\ Connect (synchronously) over the named Unix domain socket (C<unixsocket>) @@ -1922,6 +1936,7 @@ to an NBD server running on the same machine. default_call with args = [ UInt32 "cid"; UInt32 "port" ]; ret = RErr; permitted_states = [ Created ]; + modifies_fd = true; shortdesc = "connect to NBD server over AF_VSOCK protocol"; longdesc = "\ Connect (synchronously) over the C<AF_VSOCK> protocol from a @@ -1942,6 +1957,7 @@ built on a system with vsock support, see L<nbd_supports_vsock(3)>. default_call with args = [ String "hostname"; String "port" ]; ret = RErr; permitted_states = [ Created ]; + modifies_fd = true; shortdesc = "connect to NBD server over a TCP port"; longdesc = "\ Connect (synchronously) to the NBD server listening on @@ -1956,6 +1972,7 @@ such as C<\"10809\">. default_call with args = [ Fd "sock" ]; ret = RErr; permitted_states = [ Created ]; + modifies_fd = true; shortdesc = "connect directly to a connected socket"; longdesc = "\ Pass a connected socket C<sock> through which libnbd will talk @@ -1977,6 +1994,7 @@ handle is closed. The caller must not use the socket in any way. default_call with args = [ StringList "argv" ]; ret = RErr; permitted_states = [ Created ]; + modifies_fd = true; shortdesc = "connect to NBD server command"; longdesc = "\ Run the command as a subprocess and connect to it over @@ -2012,6 +2030,7 @@ is killed. default_call with args = [ StringList "argv" ]; ret = RErr; permitted_states = [ Created ]; + modifies_fd = true; shortdesc = "connect using systemd socket activation"; longdesc = "\ Run the command as a subprocess and connect to it using @@ -2385,6 +2404,7 @@ requests sizes. optargs = [ OFlags ("flags", cmd_flags, Some []) ]; ret = RErr; permitted_states = [ Connected ]; + modifies_fd = true; shortdesc = "read from the NBD server"; longdesc = "\ Issue a read command to the NBD server for the range starting @@ -2424,6 +2444,7 @@ on failure." optargs = [ OFlags ("flags", cmd_flags, Some ["DF"]) ]; ret = RErr; permitted_states = [ Connected ]; + modifies_fd = true; shortdesc = "read from the NBD server"; longdesc = "\ Issue a read command to the NBD server for the range starting @@ -2516,6 +2537,7 @@ on failure." optargs = [ OFlags ("flags", cmd_flags, Some ["FUA"; "PAYLOAD_LEN"]) ]; ret = RErr; permitted_states = [ Connected ]; + modifies_fd = true; shortdesc = "write to the NBD server"; longdesc = "\ Issue a write command to the NBD server, writing the data in @@ -2552,6 +2574,7 @@ extended headers were negotiated." args = []; optargs = [ OFlags ("flags", shutdown_flags, None) ]; ret = RErr; permitted_states = [ Connected ]; + modifies_fd = true; shortdesc = "disconnect from the NBD server"; longdesc = "\ Issue the disconnect command to the NBD server. This is @@ -2589,6 +2612,7 @@ A future version of the library may add new flags."; default_call with args = []; optargs = [ OFlags ("flags", cmd_flags, Some []) ]; ret = RErr; permitted_states = [ Connected ]; + modifies_fd = true; shortdesc = "send flush command to the NBD server"; longdesc = "\ Issue the flush command to the NBD server. The function should @@ -2609,6 +2633,7 @@ protocol extensions)." optargs = [ OFlags ("flags", cmd_flags, Some ["FUA"]) ]; ret = RErr; permitted_states = [ Connected ]; + modifies_fd = true; shortdesc = "send trim command to the NBD server"; longdesc = "\ Issue a trim command to the NBD server, which if supported @@ -2640,6 +2665,7 @@ L<nbd_can_fua(3)>)." optargs = [ OFlags ("flags", cmd_flags, Some []) ]; ret = RErr; permitted_states = [ Connected ]; + modifies_fd = true; shortdesc = "send cache (prefetch) command to the NBD server"; longdesc = "\ Issue the cache (prefetch) command to the NBD server, which @@ -2669,6 +2695,7 @@ protocol extensions)." Some ["FUA"; "NO_HOLE"; "FAST_ZERO"]) ]; ret = RErr; permitted_states = [ Connected ]; + modifies_fd = true; shortdesc = "send write zeroes command to the NBD server"; longdesc = "\ Issue a write zeroes command to the NBD server, which if supported @@ -2707,6 +2734,7 @@ cannot do this, see L<nbd_can_fast_zero(3)>)." optargs = [ OFlags ("flags", cmd_flags, Some ["REQ_ONE"]) ]; ret = RErr; permitted_states = [ Connected ]; + modifies_fd = true; shortdesc = "send block status command to the NBD server"; longdesc = "\ Issue the block status command to the NBD server. If @@ -2773,6 +2801,7 @@ validate that the server obeyed the flag." "poll", { default_call with args = [ Int "timeout" ]; ret = RInt; + modifies_fd = true; shortdesc = "poll the handle once"; longdesc = "\ This is a simple implementation of L<poll(2)> which is used @@ -2794,6 +2823,7 @@ intended as something you would use."; "poll2", { default_call with args = [Fd "fd"; Int "timeout" ]; ret = RInt; + modifies_fd = true; shortdesc = "poll the handle once, with fd"; longdesc = "\ This is the same as L<nbd_poll(3)>, but an additional @@ -3477,6 +3507,7 @@ and invalidate the need to write more commands. "aio_notify_read", { default_call with args = []; ret = RErr; + modifies_fd = true; shortdesc = "notify that the connection is readable"; longdesc = "\ Send notification to the state machine that the connection @@ -3488,6 +3519,7 @@ connection is readable."; "aio_notify_write", { default_call with args = []; ret = RErr; + modifies_fd = true; shortdesc = "notify that the connection is writable"; longdesc = "\ Send notification to the state machine that the connection diff --git a/generator/API.mli b/generator/API.mli index 361132d..351170e 100644 --- a/generator/API.mli +++ b/generator/API.mli @@ -41,6 +41,13 @@ type call = { if the function is asynchronous and in that case how one can check if it has completed. *) async_kind : async_kind option; + (** A flag telling if the call may do something with the file descriptor. + Some bindings needs exclusive access to the file descriptor and can not + allow the user to call [aio_notify_read] or [aio_notify_write], neither + directly nor indirectly from another call. So all calls that might trigger + any of these functions to be called, including all synchronous commands + like [pread] or [connect], should set this to [true]. *) + modifies_fd : bool; (** The first stable version that the symbol appeared in, for example (1, 2) if the symbol was added in development cycle 1.1.x and thus the first stable version was 1.2. This is -- 2.41.0
Tage Johansson
2023-Aug-20 14:16 UTC
[Libguestfs] [libnbd PATCH v8 08/10] rust: async: Use the modifies_fd flag to exclude calls
All handle calls which has the modifies_fd flag set to true will be excluded from AsyncHandle (the asynchronous handle in the rust bindings). This is a better approach then listing all calls that should be excluded in Rust.ml explicetly. --- generator/Rust.ml | 60 +++++++++++++++++++++-------------------------- 1 file changed, 27 insertions(+), 33 deletions(-) diff --git a/generator/Rust.ml b/generator/Rust.ml index 1bc81f0..323ccf1 100644 --- a/generator/Rust.ml +++ b/generator/Rust.ml @@ -575,18 +575,17 @@ let generate_rust_bindings () let excluded_handle_calls : NameSet.t NameSet.of_list - [ - "aio_get_fd"; - "aio_get_direction"; - "aio_notify_read"; - "aio_notify_write"; - "clear_debug_callback"; - "get_debug"; - "poll"; - "poll2"; - "set_debug"; - "set_debug_callback"; - ] + @@ [ + "aio_get_fd"; + "aio_get_direction"; + "clear_debug_callback"; + "get_debug"; + "set_debug"; + "set_debug_callback"; + ] + @ (handle_calls + |> List.filter (fun (_, { modifies_fd }) -> modifies_fd) + |> List.map (fun (name, _) -> name)) (* A mapping with names as keys. *) module NameMap = Map.Make (String) @@ -597,16 +596,16 @@ let strip_aio name : string String.sub name 4 (String.length name - 4) else failwithf "Asynchronous call %s must begin with aio_" name -(* A map with all asynchronous handle calls. The keys are names with "aio_" - stripped, the values are a tuple with the actual name (with "aio_"), the - [call] and the [async_kind]. *) +(* A map with all asynchronous handle calls. The keys are names, the values + are tuples of: the name with "aio_" stripped, the [call] and the + [async_kind]. *) let async_handle_calls : ((string * call) * async_kind) NameMap.t handle_calls |> List.filter (fun (n, _) -> not (NameSet.mem n excluded_handle_calls)) |> List.filter_map (fun (name, call) -> call.async_kind |> Option.map (fun async_kind -> - (strip_aio name, ((name, call), async_kind)))) + (name, ((strip_aio name, call), async_kind)))) |> List.to_seq |> NameMap.of_seq (* A mapping with all synchronous (not asynchronous) handle calls. Excluded @@ -616,11 +615,7 @@ let async_handle_calls : ((string * call) * async_kind) NameMap.t let sync_handle_calls : call NameMap.t handle_calls |> List.filter (fun (n, _) -> not (NameSet.mem n excluded_handle_calls)) - |> List.filter (fun (name, _) -> - (not (NameMap.mem name async_handle_calls)) - && not - (String.starts_with ~prefix:"aio_" name - && NameMap.mem (strip_aio name) async_handle_calls)) + |> List.filter (fun (n, _) -> not (NameMap.mem n async_handle_calls)) |> List.to_seq |> NameMap.of_seq (* Get the Rust type for an argument in the asynchronous API. Like @@ -660,7 +655,7 @@ let print_rust_sync_handle_call name call (* Print the Rust function for an asynchronous handle call with a completion callback. (Note that "callback" might be abbreviated with "cb" in the following code. *) -let print_rust_async_handle_call_with_completion_cb name (aio_name, call) +let print_rust_async_handle_call_with_completion_cb aio_name (name, call) (* An array of all optional arguments. Useful because we need to deel with the index of the completion callback. *) let optargs = Array.of_list call.optargs in @@ -741,7 +736,7 @@ let print_rust_async_handle_call_with_completion_cb name (aio_name, call) completion by changing state. The predicate is a call like "aio_is_connecting" which should get the value (like false) for the call to be complete. *) -let print_rust_async_handle_call_changing_state name (aio_name, call) +let print_rust_async_handle_call_changing_state aio_name (name, call) (predicate, value) let value = if value then "true" else "false" in print_rust_handle_call_comment call; @@ -770,16 +765,15 @@ let print_rust_async_handle_call_changing_state name (aio_name, call) (* Print an impl with all handle calls. *) let print_rust_async_handle_impls () pr "impl AsyncHandle {\n"; - NameMap.iter print_rust_sync_handle_call sync_handle_calls; - NameMap.iter - (fun name (call, async_kind) -> - match async_kind with - | WithCompletionCallback -> - print_rust_async_handle_call_with_completion_cb name call - | ChangesState (predicate, value) -> - print_rust_async_handle_call_changing_state name call - (predicate, value)) - async_handle_calls; + sync_handle_calls |> NameMap.iter print_rust_sync_handle_call; + async_handle_calls + |> NameMap.iter (fun name (call, async_kind) -> + match async_kind with + | WithCompletionCallback -> + print_rust_async_handle_call_with_completion_cb name call + | ChangesState (predicate, value) -> + print_rust_async_handle_call_changing_state name call + (predicate, value)); pr "}\n\n" let print_rust_async_imports () -- 2.41.0
Tage Johansson
2023-Aug-20 14:16 UTC
[Libguestfs] [libnbd PATCH v8 09/10] rust: async: Add a couple of integration tests
Add a couple of integration tests as rust/tests/test_async_*.rs. They are very similar to the tests for the synchronous API. --- rust/Cargo.toml | 1 + rust/tests/test_async_100_handle.rs | 25 +++ rust/tests/test_async_200_connect_command.rs | 26 +++ rust/tests/test_async_210_opt_abort.rs | 32 ++++ rust/tests/test_async_220_opt_list.rs | 86 ++++++++++ rust/tests/test_async_230_opt_info.rs | 122 ++++++++++++++ rust/tests/test_async_240_opt_list_meta.rs | 150 ++++++++++++++++++ .../test_async_245_opt_list_meta_queries.rs | 94 +++++++++++ rust/tests/test_async_250_opt_set_meta.rs | 125 +++++++++++++++ .../test_async_255_opt_set_meta_queries.rs | 110 +++++++++++++ rust/tests/test_async_400_pread.rs | 40 +++++ rust/tests/test_async_405_pread_structured.rs | 84 ++++++++++ rust/tests/test_async_410_pwrite.rs | 59 +++++++ rust/tests/test_async_460_block_status.rs | 98 ++++++++++++ rust/tests/test_async_620_stats.rs | 69 ++++++++ 15 files changed, 1121 insertions(+) create mode 100644 rust/tests/test_async_100_handle.rs create mode 100644 rust/tests/test_async_200_connect_command.rs create mode 100644 rust/tests/test_async_210_opt_abort.rs create mode 100644 rust/tests/test_async_220_opt_list.rs create mode 100644 rust/tests/test_async_230_opt_info.rs create mode 100644 rust/tests/test_async_240_opt_list_meta.rs create mode 100644 rust/tests/test_async_245_opt_list_meta_queries.rs create mode 100644 rust/tests/test_async_250_opt_set_meta.rs create mode 100644 rust/tests/test_async_255_opt_set_meta_queries.rs create mode 100644 rust/tests/test_async_400_pread.rs create mode 100644 rust/tests/test_async_405_pread_structured.rs create mode 100644 rust/tests/test_async_410_pwrite.rs create mode 100644 rust/tests/test_async_460_block_status.rs create mode 100644 rust/tests/test_async_620_stats.rs diff --git a/rust/Cargo.toml b/rust/Cargo.toml index a9b5988..c49f9f2 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -59,3 +59,4 @@ anyhow = "1.0.72" once_cell = "1.18.0" pretty-hex = "0.3.0" tempfile = "3.6.0" +tokio = { version = "1.29.1", default-features = false, features = ["rt-multi-thread", "macros"] } diff --git a/rust/tests/test_async_100_handle.rs b/rust/tests/test_async_100_handle.rs new file mode 100644 index 0000000..e50bad9 --- /dev/null +++ b/rust/tests/test_async_100_handle.rs @@ -0,0 +1,25 @@ +// libnbd Rust test case +// Copyright Tage Johansson +// +// This library is free software; you can redistribute it and/or +// modify it under the terms of the GNU Lesser General Public +// License as published by the Free Software Foundation; either +// version 2 of the License, or (at your option) any later version. +// +// This library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public +// License along with this library; if not, write to the Free Software +// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + +//! Just check that we can link with libnbd and create a handle. + +#![deny(warnings)] + +#[tokio::test] +async fn test_async_nbd_handle_new() { + let _ = libnbd::AsyncHandle::new().unwrap(); +} diff --git a/rust/tests/test_async_200_connect_command.rs b/rust/tests/test_async_200_connect_command.rs new file mode 100644 index 0000000..2f4e7cc --- /dev/null +++ b/rust/tests/test_async_200_connect_command.rs @@ -0,0 +1,26 @@ +// libnbd Rust test case +// Copyright Tage Johansson +// +// This library is free software; you can redistribute it and/or +// modify it under the terms of the GNU Lesser General Public +// License as published by the Free Software Foundation; either +// version 2 of the License, or (at your option) any later version. +// +// This library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public +// License along with this library; if not, write to the Free Software +// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + +#![deny(warnings)] + +#[tokio::test] +async fn test_async_connect_command() { + let nbd = libnbd::AsyncHandle::new().unwrap(); + nbd.connect_command(&["nbdkit", "-s", "--exit-with-parent", "-v", "null"]) + .await + .unwrap(); +} diff --git a/rust/tests/test_async_210_opt_abort.rs b/rust/tests/test_async_210_opt_abort.rs new file mode 100644 index 0000000..e85fa0c --- /dev/null +++ b/rust/tests/test_async_210_opt_abort.rs @@ -0,0 +1,32 @@ +// libnbd Rust test case +// Copyright Tage Johansson +// +// This library is free software; you can redistribute it and/or +// modify it under the terms of the GNU Lesser General Public +// License as published by the Free Software Foundation; either +// version 2 of the License, or (at your option) any later version. +// +// This library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public +// License along with this library; if not, write to the Free Software +// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + +#![deny(warnings)] + +#[tokio::test] +async fn test_opt_abort() { + let nbd = libnbd::AsyncHandle::new().unwrap(); + nbd.set_opt_mode(true).unwrap(); + nbd.connect_command(&["nbdkit", "-s", "--exit-with-parent", "-v", "null"]) + .await + .unwrap(); + assert_eq!(nbd.get_protocol().unwrap(), b"newstyle-fixed"); + assert!(nbd.get_structured_replies_negotiated().unwrap()); + + nbd.opt_abort().await.unwrap(); + assert!(nbd.aio_is_closed()); +} diff --git a/rust/tests/test_async_220_opt_list.rs b/rust/tests/test_async_220_opt_list.rs new file mode 100644 index 0000000..5d86a1d --- /dev/null +++ b/rust/tests/test_async_220_opt_list.rs @@ -0,0 +1,86 @@ +// libnbd Rust test case +// Copyright Tage Johansson +// +// This library is free software; you can redistribute it and/or +// modify it under the terms of the GNU Lesser General Public +// License as published by the Free Software Foundation; either +// version 2 of the License, or (at your option) any later version. +// +// This library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public +// License along with this library; if not, write to the Free Software +// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + +#![deny(warnings)] + +use std::env; +use std::os::unix::ffi::OsStringExt as _; +use std::path::Path; +use std::sync::{Arc, Mutex}; + +/// Test different types of connections. +struct ConnTester { + script_path: String, +} + +impl ConnTester { + fn new() -> Self { + let srcdir = env::var("srcdir").unwrap(); + let srcdir = Path::new(&srcdir); + let script_path = srcdir.join("../tests/opt-list.sh"); + let script_path + String::from_utf8(script_path.into_os_string().into_vec()).unwrap(); + Self { script_path } + } + + async fn connect( + &self, + mode: u8, + expected_exports: &[&str], + ) -> libnbd::SharedResult<()> { + let nbd = libnbd::AsyncHandle::new().unwrap(); + nbd.set_opt_mode(true).unwrap(); + nbd.connect_command(&[ + "nbdkit", + "-s", + "--exit-with-parent", + "-v", + "sh", + &self.script_path, + format!("mode={mode}").as_str(), + ]) + .await + .unwrap(); + + // Collect all exports in this list. + let exports = Arc::new(Mutex::new(Vec::new())); + let exports_clone = exports.clone(); + nbd.opt_list(move |name, _| { + exports_clone + .lock() + .unwrap() + .push(String::from_utf8(name.to_owned()).unwrap()); + 0 + }) + .await?; + let exports = Arc::try_unwrap(exports).unwrap().into_inner().unwrap(); + assert_eq!(exports.len(), expected_exports.len()); + for (export, &expected) in exports.iter().zip(expected_exports) { + assert_eq!(export, expected); + } + Ok(()) + } +} + +#[tokio::test] +async fn test_opt_list() { + let conn_tester = ConnTester::new(); + assert!(conn_tester.connect(0, &[]).await.is_err()); + assert!(conn_tester.connect(1, &["a", "b"]).await.is_ok()); + assert!(conn_tester.connect(2, &[]).await.is_ok()); + assert!(conn_tester.connect(3, &["a"]).await.is_ok()); +} diff --git a/rust/tests/test_async_230_opt_info.rs b/rust/tests/test_async_230_opt_info.rs new file mode 100644 index 0000000..174200b --- /dev/null +++ b/rust/tests/test_async_230_opt_info.rs @@ -0,0 +1,122 @@ +// libnbd Rust test case +// Copyright Tage Johansson +// +// This library is free software; you can redistribute it and/or +// modify it under the terms of the GNU Lesser General Public +// License as published by the Free Software Foundation; either +// version 2 of the License, or (at your option) any later version. +// +// This library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public +// License along with this library; if not, write to the Free Software +// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + +#![deny(warnings)] + +use libnbd::CONTEXT_BASE_ALLOCATION; +use std::env; +use std::path::Path; + +#[tokio::test] +async fn test_opt_info() { + let srcdir = env::var("srcdir").unwrap(); + let srcdir = Path::new(&srcdir); + let script_path = srcdir.join("../tests/opt-info.sh"); + let script_path = script_path.to_str().unwrap(); + + let nbd = libnbd::AsyncHandle::new().unwrap(); + nbd.set_opt_mode(true).unwrap(); + nbd.connect_command(&[ + "nbdkit", + "-s", + "--exit-with-parent", + "-v", + "sh", + script_path, + ]) + .await + .unwrap(); + nbd.add_meta_context(CONTEXT_BASE_ALLOCATION).unwrap(); + + // No size, flags, or meta-contexts yet + assert!(nbd.get_size().is_err()); + assert!(nbd.is_read_only().is_err()); + assert!(nbd.can_meta_context(CONTEXT_BASE_ALLOCATION).is_err()); + + // info with no prior name gets info on "" + assert!(nbd.opt_info().await.is_ok()); + assert_eq!(nbd.get_size().unwrap(), 0); + assert!(nbd.is_read_only().unwrap()); + assert!(nbd.can_meta_context(CONTEXT_BASE_ALLOCATION).unwrap()); + + // changing export wipes out prior info + nbd.set_export_name("b").unwrap(); + assert!(nbd.get_size().is_err()); + assert!(nbd.is_read_only().is_err()); + assert!(nbd.can_meta_context(CONTEXT_BASE_ALLOCATION).is_err()); + + // info on something not present fails + nbd.set_export_name("a").unwrap(); + assert!(nbd.opt_info().await.is_err()); + + // info for a different export, with automatic meta_context disabled + nbd.set_export_name("b").unwrap(); + nbd.set_request_meta_context(false).unwrap(); + nbd.opt_info().await.unwrap(); + // idempotent name change is no-op + nbd.set_export_name("b").unwrap(); + assert_eq!(nbd.get_size().unwrap(), 1); + assert!(!nbd.is_read_only().unwrap()); + assert!(nbd.can_meta_context(CONTEXT_BASE_ALLOCATION).is_err()); + nbd.set_request_meta_context(true).unwrap(); + + // go on something not present + nbd.set_export_name("a").unwrap(); + assert!(nbd.opt_go().await.is_err()); + assert!(nbd.get_size().is_err()); + assert!(nbd.is_read_only().is_err()); + assert!(nbd.can_meta_context(CONTEXT_BASE_ALLOCATION).is_err()); + + // go on a valid export + nbd.set_export_name("good").unwrap(); + nbd.opt_go().await.unwrap(); + assert_eq!(nbd.get_size().unwrap(), 4); + assert!(nbd.is_read_only().unwrap()); + assert!(nbd.can_meta_context(CONTEXT_BASE_ALLOCATION).unwrap()); + + // now info is no longer valid, but does not wipe data + assert!(nbd.set_export_name("a").is_err()); + assert_eq!(nbd.get_export_name().unwrap(), b"good"); + assert!(nbd.opt_info().await.is_err()); + assert_eq!(nbd.get_size().unwrap(), 4); + assert!(nbd.can_meta_context(CONTEXT_BASE_ALLOCATION).unwrap()); + nbd.disconnect(None).await.unwrap(); + + // Another connection. This time, check that SET_META triggered by opt_info + // persists through nbd_opt_go with set_request_meta_context disabled. + let nbd = libnbd::AsyncHandle::new().unwrap(); + nbd.set_opt_mode(true).unwrap(); + nbd.connect_command(&[ + "nbdkit", + "-s", + "--exit-with-parent", + "-v", + "sh", + &script_path, + ]) + .await + .unwrap(); + nbd.add_meta_context("x-unexpected:bogus").unwrap(); + assert!(nbd.can_meta_context(CONTEXT_BASE_ALLOCATION).is_err()); + nbd.opt_info().await.unwrap(); + assert!(!nbd.can_meta_context(CONTEXT_BASE_ALLOCATION).unwrap()); + nbd.set_request_meta_context(false).unwrap(); + // Adding to the request list now won't matter + nbd.add_meta_context(CONTEXT_BASE_ALLOCATION).unwrap(); + nbd.opt_go().await.unwrap(); + assert!(!nbd.can_meta_context(CONTEXT_BASE_ALLOCATION).unwrap()); +} diff --git a/rust/tests/test_async_240_opt_list_meta.rs b/rust/tests/test_async_240_opt_list_meta.rs new file mode 100644 index 0000000..fade096 --- /dev/null +++ b/rust/tests/test_async_240_opt_list_meta.rs @@ -0,0 +1,150 @@ +// libnbd Rust test case +// Copyright Tage Johansson +// +// This library is free software; you can redistribute it and/or +// modify it under the terms of the GNU Lesser General Public +// License as published by the Free Software Foundation; either +// version 2 of the License, or (at your option) any later version. +// +// This library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public +// License along with this library; if not, write to the Free Software +// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + +#![deny(warnings)] + +use std::sync::{Arc, Mutex}; + +/// A struct with information about listed meta contexts. +#[derive(Debug, Clone, PartialEq, Eq)] +struct CtxInfo { + /// Whether the meta context "base:alloc" is listed. + has_alloc: bool, + /// The number of listed meta contexts. + count: u32, +} + +async fn list_meta_ctxs( + nbd: &libnbd::AsyncHandle, +) -> libnbd::SharedResult<CtxInfo> { + let info = Arc::new(Mutex::new(CtxInfo { + has_alloc: false, + count: 0, + })); + let info_clone = info.clone(); + nbd.opt_list_meta_context(move |ctx| { + let mut info = info_clone.lock().unwrap(); + info.count += 1; + if ctx == libnbd::CONTEXT_BASE_ALLOCATION { + info.has_alloc = true; + } + 0 + }) + .await?; + let info = Arc::try_unwrap(info).unwrap().into_inner().unwrap(); + Ok(info) +} + +#[tokio::test] +async fn test_async_opt_list_meta() { + let nbd = libnbd::AsyncHandle::new().unwrap(); + nbd.set_opt_mode(true).unwrap(); + nbd.connect_command(&[ + "nbdkit", + "-s", + "--exit-with-parent", + "-v", + "memory", + "size=1M", + ]) + .await + .unwrap(); + + // First pass: empty query should give at least "base:allocation". + let info = list_meta_ctxs(&nbd).await.unwrap(); + assert!(info.count >= 1); + assert!(info.has_alloc); + let max = info.count; + + // Second pass: bogus query has no response. + nbd.add_meta_context("x-nosuch:").unwrap(); + assert_eq!( + list_meta_ctxs(&nbd).await.unwrap(), + CtxInfo { + count: 0, + has_alloc: false + } + ); + + // Third pass: specific query should have one match. + nbd.add_meta_context("base:allocation").unwrap(); + assert_eq!(nbd.get_nr_meta_contexts().unwrap(), 2); + assert_eq!(nbd.get_meta_context(1).unwrap(), b"base:allocation"); + assert_eq!( + list_meta_ctxs(&nbd).await.unwrap(), + CtxInfo { + count: 1, + has_alloc: true + } + ); + + // Fourth pass: opt_list_meta_context is stateless, so it should + // not wipe status learned during opt_info + assert!(nbd.can_meta_context("base:allocation").is_err()); + assert!(nbd.get_size().is_err()); + nbd.opt_info().await.unwrap(); + assert_eq!(nbd.get_size().unwrap(), 1048576); + assert!(nbd.can_meta_context("base:allocation").unwrap()); + nbd.clear_meta_contexts().unwrap(); + nbd.add_meta_context("x-nosuch:").unwrap(); + assert_eq!( + list_meta_ctxs(&nbd).await.unwrap(), + CtxInfo { + count: 0, + has_alloc: false + } + ); + assert_eq!(nbd.get_size().unwrap(), 1048576); + assert!(nbd.can_meta_context("base:allocation").unwrap()); + + // Final pass: "base:" query should get at least "base:allocation" + nbd.add_meta_context("base:").unwrap(); + let info = list_meta_ctxs(&nbd).await.unwrap(); + assert!(info.count >= 1); + assert!(info.count <= max); + assert!(info.has_alloc); + + // Repeat but this time without structured replies. Deal gracefully + // with older servers that don't allow the attempt. + let nbd = libnbd::AsyncHandle::new().unwrap(); + nbd.set_opt_mode(true).unwrap(); + nbd.set_request_structured_replies(false).unwrap(); + nbd.connect_command(&[ + "nbdkit", + "-s", + "--exit-with-parent", + "-v", + "memory", + "size=1M", + ]) + .await + .unwrap(); + let bytes = nbd.stats_bytes_sent(); + if let Ok(info) = list_meta_ctxs(&nbd).await { + assert!(info.count >= 1); + assert!(info.has_alloc) + } else { + assert!(nbd.stats_bytes_sent() > bytes); + // ignoring failure from old server + } + + // Now enable structured replies, and a retry should pass. + nbd.opt_structured_reply().await.unwrap(); + let info = list_meta_ctxs(&nbd).await.unwrap(); + assert!(info.count >= 1); + assert!(info.has_alloc); +} diff --git a/rust/tests/test_async_245_opt_list_meta_queries.rs b/rust/tests/test_async_245_opt_list_meta_queries.rs new file mode 100644 index 0000000..5e6b01f --- /dev/null +++ b/rust/tests/test_async_245_opt_list_meta_queries.rs @@ -0,0 +1,94 @@ +// libnbd Rust test case +// Copyright Tage Johansson +// +// This library is free software; you can redistribute it and/or +// modify it under the terms of the GNU Lesser General Public +// License as published by the Free Software Foundation; either +// version 2 of the License, or (at your option) any later version. +// +// This library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public +// License along with this library; if not, write to the Free Software +// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + +#![deny(warnings)] + +use std::sync::{Arc, Mutex}; + +/// A struct with information about listed meta contexts. +#[derive(Debug, Clone, PartialEq, Eq)] +struct CtxInfo { + /// Whether the meta context "base:allocation" is listed. + has_alloc: bool, + /// The number of listed meta contexts. + count: u32, +} + +async fn list_meta_ctxs( + nbd: &libnbd::AsyncHandle, + queries: &[&[u8]], +) -> libnbd::SharedResult<CtxInfo> { + let info = Arc::new(Mutex::new(CtxInfo { + has_alloc: false, + count: 0, + })); + let info_clone = info.clone(); + nbd.opt_list_meta_context_queries(queries, move |ctx| { + let mut info = info_clone.lock().unwrap(); + info.count += 1; + if ctx == libnbd::CONTEXT_BASE_ALLOCATION { + info.has_alloc = true; + } + 0 + }) + .await?; + let info = Arc::try_unwrap(info).unwrap().into_inner().unwrap(); + Ok(info) +} + +#[tokio::test] +async fn test_async_opt_list_meta_queries() { + let nbd = libnbd::AsyncHandle::new().unwrap(); + nbd.set_opt_mode(true).unwrap(); + nbd.connect_command(&[ + "nbdkit", + "-s", + "--exit-with-parent", + "-v", + "memory", + "size=1M", + ]) + .await + .unwrap(); + + // First pass: empty query should give at least "base:allocation". + nbd.add_meta_context("x-nosuch:").unwrap(); + let info = list_meta_ctxs(&nbd, &[]).await.unwrap(); + assert!(info.count >= 1); + assert!(info.has_alloc); + + // Second pass: bogus query has no response. + nbd.clear_meta_contexts().unwrap(); + assert_eq!( + list_meta_ctxs(&nbd, &[b"x-nosuch:"]).await.unwrap(), + CtxInfo { + count: 0, + has_alloc: false + } + ); + + // Third pass: specific query should have one match. + assert_eq!( + list_meta_ctxs(&nbd, &[b"x-nosuch:", libnbd::CONTEXT_BASE_ALLOCATION]) + .await + .unwrap(), + CtxInfo { + count: 1, + has_alloc: true + } + ); +} diff --git a/rust/tests/test_async_250_opt_set_meta.rs b/rust/tests/test_async_250_opt_set_meta.rs new file mode 100644 index 0000000..7a401bc --- /dev/null +++ b/rust/tests/test_async_250_opt_set_meta.rs @@ -0,0 +1,125 @@ +// libnbd Rust test case +// Copyright Tage Johansson +// +// This library is free software; you can redistribute it and/or +// modify it under the terms of the GNU Lesser General Public +// License as published by the Free Software Foundation; either +// version 2 of the License, or (at your option) any later version. +// +// This library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public +// License along with this library; if not, write to the Free Software +// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + +#![deny(warnings)] + +use libnbd::CONTEXT_BASE_ALLOCATION; +use std::sync::{Arc, Mutex}; + +/// A struct with information about set meta contexts. +#[derive(Debug, Clone, PartialEq, Eq)] +struct CtxInfo { + /// Whether the meta context "base:allocation" is set. + has_alloc: bool, + /// The number of set meta contexts. + count: u32, +} + +async fn set_meta_ctxs( + nbd: &libnbd::AsyncHandle, +) -> libnbd::SharedResult<CtxInfo> { + let info = Arc::new(Mutex::new(CtxInfo { + has_alloc: false, + count: 0, + })); + let info_clone = info.clone(); + nbd.opt_set_meta_context(move |ctx| { + let mut info = info_clone.lock().unwrap(); + info.count += 1; + if ctx == CONTEXT_BASE_ALLOCATION { + info.has_alloc = true; + } + 0 + }) + .await?; + let info = Arc::try_unwrap(info).unwrap().into_inner().unwrap(); + Ok(info) +} + +#[tokio::test] +async fn test_async_opt_set_meta() { + let nbd = libnbd::AsyncHandle::new().unwrap(); + nbd.set_opt_mode(true).unwrap(); + nbd.set_request_structured_replies(false).unwrap(); + nbd.connect_command(&[ + "nbdkit", + "-s", + "--exit-with-parent", + "-v", + "memory", + "size=1M", + ]) + .await + .unwrap(); + + // No contexts negotiated yet; can_meta should be error if any requested + assert!(!nbd.get_structured_replies_negotiated().unwrap()); + assert!(!nbd.can_meta_context(CONTEXT_BASE_ALLOCATION).unwrap()); + nbd.add_meta_context(CONTEXT_BASE_ALLOCATION).unwrap(); + assert!(nbd.can_meta_context(CONTEXT_BASE_ALLOCATION).is_err()); + + // SET cannot succeed until SR is negotiated. + nbd.opt_structured_reply().await.unwrap(); + assert!(nbd.get_structured_replies_negotiated().unwrap()); + assert!(nbd.can_meta_context(CONTEXT_BASE_ALLOCATION).is_err()); + + // nbdkit does not match wildcard for SET, even though it does for LIST + nbd.clear_meta_contexts().unwrap(); + nbd.add_meta_context("base:").unwrap(); + assert_eq!( + set_meta_ctxs(&nbd).await.unwrap(), + CtxInfo { + count: 0, + has_alloc: false + } + ); + assert!(!nbd.can_meta_context(CONTEXT_BASE_ALLOCATION).unwrap()); + + // Negotiating with no contexts is not an error, but selects nothing + nbd.clear_meta_contexts().unwrap(); + assert_eq!( + set_meta_ctxs(&nbd).await.unwrap(), + CtxInfo { + count: 0, + has_alloc: false + } + ); + + // Request 2 with expectation of 1; with set_request_meta_context off + nbd.add_meta_context("x-nosuch:context").unwrap(); + nbd.add_meta_context(CONTEXT_BASE_ALLOCATION).unwrap(); + nbd.set_request_meta_context(false).unwrap(); + assert_eq!( + set_meta_ctxs(&nbd).await.unwrap(), + CtxInfo { + count: 1, + has_alloc: true + } + ); + assert!(nbd.can_meta_context(CONTEXT_BASE_ALLOCATION).unwrap()); + + // Transition to transmission phase; our last set should remain active + nbd.clear_meta_contexts().unwrap(); + nbd.add_meta_context("x-nosuch:context").unwrap(); + nbd.opt_go().await.unwrap(); + assert!(nbd.can_meta_context(CONTEXT_BASE_ALLOCATION).unwrap()); + + // Now too late to set; but should not lose earlier state + assert!(set_meta_ctxs(&nbd).await.is_err()); + assert_eq!(nbd.get_size().unwrap(), 1048576); + assert!(nbd.can_meta_context(CONTEXT_BASE_ALLOCATION).unwrap()); +} diff --git a/rust/tests/test_async_255_opt_set_meta_queries.rs b/rust/tests/test_async_255_opt_set_meta_queries.rs new file mode 100644 index 0000000..652e760 --- /dev/null +++ b/rust/tests/test_async_255_opt_set_meta_queries.rs @@ -0,0 +1,110 @@ +// libnbd Rust test case +// Copyright Tage Johansson +// +// This library is free software; you can redistribute it and/or +// modify it under the terms of the GNU Lesser General Public +// License as published by the Free Software Foundation; either +// version 2 of the License, or (at your option) any later version. +// +// This library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public +// License along with this library; if not, write to the Free Software +// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + +#![deny(warnings)] + +use libnbd::CONTEXT_BASE_ALLOCATION; +use std::sync::{Arc, Mutex}; + +/// A struct with information about set meta contexts. +#[derive(Debug, Clone, PartialEq, Eq)] +struct CtxInfo { + /// Whether the meta context "base:allocation" is set. + has_alloc: bool, + /// The number of set meta contexts. + count: u32, +} + +async fn set_meta_ctxs_queries( + nbd: &libnbd::AsyncHandle, + queries: &[impl AsRef<[u8]>], +) -> libnbd::SharedResult<CtxInfo> { + let info = Arc::new(Mutex::new(CtxInfo { + has_alloc: false, + count: 0, + })); + let info_clone = info.clone(); + nbd.opt_set_meta_context_queries(queries, move |ctx| { + let mut info = info_clone.lock().unwrap(); + info.count += 1; + if ctx == CONTEXT_BASE_ALLOCATION { + info.has_alloc = true; + } + 0 + }) + .await?; + let info = Arc::try_unwrap(info).unwrap().into_inner().unwrap(); + Ok(info) +} + +#[tokio::test] +async fn test_async_opt_set_meta_queries() { + let nbd = libnbd::AsyncHandle::new().unwrap(); + nbd.set_opt_mode(true).unwrap(); + nbd.connect_command(&[ + "nbdkit", + "-s", + "--exit-with-parent", + "-v", + "memory", + "size=1M", + ]) + .await + .unwrap(); + + // nbdkit does not match wildcard for SET, even though it does for LIST + assert_eq!( + set_meta_ctxs_queries(&nbd, &["base:"]).await.unwrap(), + CtxInfo { + count: 0, + has_alloc: false + } + ); + assert!(!nbd.can_meta_context(CONTEXT_BASE_ALLOCATION).unwrap()); + + // Negotiating with no contexts is not an error, but selects nothing + // An explicit empty list overrides a non-empty implicit list. + nbd.add_meta_context(CONTEXT_BASE_ALLOCATION).unwrap(); + assert_eq!( + set_meta_ctxs_queries(&nbd, &[] as &[&str]).await.unwrap(), + CtxInfo { + count: 0, + has_alloc: false + } + ); + assert!(!nbd.can_meta_context(CONTEXT_BASE_ALLOCATION).unwrap()); + + // Request 2 with expectation of 1. + assert_eq!( + set_meta_ctxs_queries( + &nbd, + &[b"x-nosuch:context".as_slice(), CONTEXT_BASE_ALLOCATION] + ) + .await + .unwrap(), + CtxInfo { + count: 1, + has_alloc: true + } + ); + assert!(nbd.can_meta_context(CONTEXT_BASE_ALLOCATION).unwrap()); + + // Transition to transmission phase; our last set should remain active + nbd.set_request_meta_context(false).unwrap(); + nbd.opt_go().await.unwrap(); + assert!(nbd.can_meta_context(CONTEXT_BASE_ALLOCATION).unwrap()); +} diff --git a/rust/tests/test_async_400_pread.rs b/rust/tests/test_async_400_pread.rs new file mode 100644 index 0000000..16a6f89 --- /dev/null +++ b/rust/tests/test_async_400_pread.rs @@ -0,0 +1,40 @@ +// libnbd Rust test case +// Copyright Tage Johansson +// +// This library is free software; you can redistribute it and/or +// modify it under the terms of the GNU Lesser General Public +// License as published by the Free Software Foundation; either +// version 2 of the License, or (at your option) any later version. +// +// This library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public +// License along with this library; if not, write to the Free Software +// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + +#![deny(warnings)] + +mod nbdkit_pattern; +use nbdkit_pattern::PATTERN; + +#[tokio::test] +async fn test_async_pread() { + let nbd = libnbd::AsyncHandle::new().unwrap(); + nbd.connect_command(&[ + "nbdkit", + "-s", + "--exit-with-parent", + "-v", + "pattern", + "size=1M", + ]) + .await + .unwrap(); + + let mut buf = [0; 512]; + nbd.pread(&mut buf, 0, None).await.unwrap(); + assert_eq!(buf.as_slice(), PATTERN.as_slice()); +} diff --git a/rust/tests/test_async_405_pread_structured.rs b/rust/tests/test_async_405_pread_structured.rs new file mode 100644 index 0000000..cf8fa28 --- /dev/null +++ b/rust/tests/test_async_405_pread_structured.rs @@ -0,0 +1,84 @@ +// libnbd Rust test case +// Copyright Tage Johansson +// +// This library is free software; you can redistribute it and/or +// modify it under the terms of the GNU Lesser General Public +// License as published by the Free Software Foundation; either +// version 2 of the License, or (at your option) any later version. +// +// This library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public +// License along with this library; if not, write to the Free Software +// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + +#![deny(warnings)] + +mod nbdkit_pattern; +use nbdkit_pattern::PATTERN; + +#[tokio::test] +async fn test_async_pread_structured() { + let nbd = libnbd::AsyncHandle::new().unwrap(); + nbd.connect_command(&[ + "nbdkit", + "-s", + "--exit-with-parent", + "-v", + "pattern", + "size=1M", + ]) + .await + .unwrap(); + + fn f(buf: &[u8], offset: u64, s: u32, err: &mut i32) { + assert_eq!(*err, 0); + *err = 42; + assert_eq!(buf, PATTERN.as_slice()); + assert_eq!(offset, 0); + assert_eq!(s, libnbd::READ_DATA); + } + + let mut buf = [0; 512]; + nbd.pread_structured( + &mut buf, + 0, + |b, o, s, e| { + f(b, o, s, e); + 0 + }, + None, + ) + .await + .unwrap(); + assert_eq!(buf.as_slice(), PATTERN.as_slice()); + + nbd.pread_structured( + &mut buf, + 0, + |b, o, s, e| { + f(b, o, s, e); + 0 + }, + Some(libnbd::CmdFlag::DF), + ) + .await + .unwrap(); + assert_eq!(buf.as_slice(), PATTERN.as_slice()); + + let res = nbd + .pread_structured( + &mut buf, + 0, + |b, o, s, e| { + f(b, o, s, e); + -1 + }, + Some(libnbd::CmdFlag::DF), + ) + .await; + assert_eq!(res.unwrap_err().errno(), Some(42)); +} diff --git a/rust/tests/test_async_410_pwrite.rs b/rust/tests/test_async_410_pwrite.rs new file mode 100644 index 0000000..9c4e4b8 --- /dev/null +++ b/rust/tests/test_async_410_pwrite.rs @@ -0,0 +1,59 @@ +// libnbd Rust test case +// Copyright Tage Johansson +// +// This library is free software; you can redistribute it and/or +// modify it under the terms of the GNU Lesser General Public +// License as published by the Free Software Foundation; either +// version 2 of the License, or (at your option) any later version. +// +// This library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public +// License along with this library; if not, write to the Free Software +// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + +#![deny(warnings)] + +use std::fs::{self, File}; + +#[tokio::test] +async fn test_async_pwrite() { + let tmp_dir = tempfile::tempdir().unwrap(); + let data_file_path = tmp_dir.path().join("pwrite_test.data"); + let data_file = File::create(&data_file_path).unwrap(); + data_file.set_len(512).unwrap(); + drop(data_file); + let nbd = libnbd::AsyncHandle::new().unwrap(); + nbd.connect_command(&[ + "nbdkit", + "-s", + "--exit-with-parent", + "-v", + "file", + data_file_path.to_str().unwrap(), + ]) + .await + .unwrap(); + + let mut buf_1 = [0; 512]; + buf_1[10] = 0x01; + buf_1[510] = 0x55; + buf_1[511] = 0xAA; + + let flags = Some(libnbd::CmdFlag::FUA); + nbd.pwrite(&buf_1, 0, flags).await.unwrap(); + + let mut buf_2 = [0; 512]; + nbd.pread(&mut buf_2, 0, None).await.unwrap(); + + assert_eq!(buf_1, buf_2); + + // Drop nbd before tmp_dir is dropped. + drop(nbd); + + let data_file_content = fs::read(&data_file_path).unwrap(); + assert_eq!(buf_1.as_slice(), data_file_content.as_slice()); +} diff --git a/rust/tests/test_async_460_block_status.rs b/rust/tests/test_async_460_block_status.rs new file mode 100644 index 0000000..c3536e6 --- /dev/null +++ b/rust/tests/test_async_460_block_status.rs @@ -0,0 +1,98 @@ +// libnbd Rust test case +// Copyright Tage Johansson +// +// This library is free software; you can redistribute it and/or +// modify it under the terms of the GNU Lesser General Public +// License as published by the Free Software Foundation; either +// version 2 of the License, or (at your option) any later version. +// +// This library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public +// License along with this library; if not, write to the Free Software +// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + +#![deny(warnings)] + +use std::env; +use std::path::Path; +use std::sync::{Arc, Mutex}; + +async fn block_status_get_entries( + nbd: &libnbd::AsyncHandle, + count: u64, + offset: u64, + flags: Option<libnbd::CmdFlag>, +) -> Vec<u32> { + let entries = Arc::new(Mutex::new(None)); + let entries_clone = entries.clone(); + nbd.block_status( + count, + offset, + move |metacontext, _, entries, err| { + assert_eq!(*err, 0); + if metacontext == libnbd::CONTEXT_BASE_ALLOCATION { + *entries_clone.lock().unwrap() = Some(entries.to_vec()); + } + 0 + }, + flags, + ) + .await + .unwrap(); + Arc::try_unwrap(entries) + .unwrap() + .into_inner() + .unwrap() + .unwrap() +} + +#[tokio::test] +async fn test_async_block_status() { + let srcdir = env::var("srcdir").unwrap(); + let srcdir = Path::new(&srcdir); + let script_path = srcdir.join("../tests/meta-base-allocation.sh"); + let script_path = script_path.to_str().unwrap(); + let nbd = libnbd::AsyncHandle::new().unwrap(); + nbd.add_meta_context(libnbd::CONTEXT_BASE_ALLOCATION) + .unwrap(); + nbd.connect_command(&[ + "nbdkit", + "-s", + "--exit-with-parent", + "-v", + "sh", + script_path, + ]) + .await + .unwrap(); + + assert_eq!( + block_status_get_entries(&nbd, 65536, 0, None) + .await + .as_slice(), + &[8192, 0, 8192, 1, 16384, 3, 16384, 2, 16384, 0,] + ); + + assert_eq!( + block_status_get_entries(&nbd, 1024, 32256, None) + .await + .as_slice(), + &[512, 3, 16384, 2] + ); + + assert_eq!( + block_status_get_entries( + &nbd, + 1024, + 32256, + Some(libnbd::CmdFlag::REQ_ONE) + ) + .await + .as_slice(), + &[512, 3] + ); +} diff --git a/rust/tests/test_async_620_stats.rs b/rust/tests/test_async_620_stats.rs new file mode 100644 index 0000000..fb03232 --- /dev/null +++ b/rust/tests/test_async_620_stats.rs @@ -0,0 +1,69 @@ +// libnbd Rust test case +// Copyright Tage Johansson +// +// This library is free software; you can redistribute it and/or +// modify it under the terms of the GNU Lesser General Public +// License as published by the Free Software Foundation; either +// version 2 of the License, or (at your option) any later version. +// +// This library is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU +// Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public +// License along with this library; if not, write to the Free Software +// Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA + +#![deny(warnings)] + +#[tokio::test] +async fn test_async_stats() { + let nbd = libnbd::AsyncHandle::new().unwrap(); + + // Pre-connection, stats start out at 0 + assert_eq!(nbd.stats_bytes_sent(), 0); + assert_eq!(nbd.stats_chunks_sent(), 0); + assert_eq!(nbd.stats_bytes_received(), 0); + assert_eq!(nbd.stats_chunks_received(), 0); + + // Connection performs handshaking, which increments stats. + // The number of bytes/chunks here may grow over time as more features get + // automatically negotiated, so merely check that they are non-zero. + nbd.connect_command(&["nbdkit", "-s", "--exit-with-parent", "-v", "null"]) + .await + .unwrap(); + + let bs1 = nbd.stats_bytes_sent(); + let cs1 = nbd.stats_chunks_sent(); + let br1 = nbd.stats_bytes_received(); + let cr1 = nbd.stats_chunks_received(); + assert!(cs1 > 0); + assert!(bs1 > cs1); + assert!(cr1 > 0); + assert!(br1 > cr1); + + // A flush command should be one chunk out, one chunk back (even if + // structured replies are in use) + nbd.flush(None).await.unwrap(); + let bs2 = nbd.stats_bytes_sent(); + let cs2 = nbd.stats_chunks_sent(); + let br2 = nbd.stats_bytes_received(); + let cr2 = nbd.stats_chunks_received(); + assert_eq!(bs2, bs1 + 28); + assert_eq!(cs2, cs1 + 1); + assert_eq!(br2, br1 + 16); // assumes nbdkit uses simple reply + assert_eq!(cr2, cr1 + 1); + + // Stats are still readable after the connection closes; we don't know if + // the server sent reply bytes to our NBD_CMD_DISC, so don't insist on it. + nbd.disconnect(None).await.unwrap(); + let bs3 = nbd.stats_bytes_sent(); + let cs3 = nbd.stats_chunks_sent(); + let br3 = nbd.stats_bytes_received(); + let cr3 = nbd.stats_chunks_received(); + assert!(bs3 > bs2); + assert_eq!(cs3, cs2 + 1); + assert!(br3 >= br2); + assert!(cr3 == cr2 || cr3 == cr2 + 1); +} -- 2.41.0
Tage Johansson
2023-Aug-20 14:16 UTC
[Libguestfs] [libnbd PATCH v8 10/10] rust: async: Add an example
This patch adds an example using the asynchronous Rust bindings. --- rust/Cargo.toml | 1 + rust/examples/concurrent-read-write.rs | 149 +++++++++++++++++++++++++ rust/run-tests.sh.in | 2 + 3 files changed, 152 insertions(+) create mode 100644 rust/examples/concurrent-read-write.rs diff --git a/rust/Cargo.toml b/rust/Cargo.toml index c49f9f2..0879b34 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -58,5 +58,6 @@ default = ["log", "tokio"] anyhow = "1.0.72" once_cell = "1.18.0" pretty-hex = "0.3.0" +rand = { version = "0.8.5", default-features = false, features = ["small_rng", "min_const_gen"] } tempfile = "3.6.0" tokio = { version = "1.29.1", default-features = false, features = ["rt-multi-thread", "macros"] } diff --git a/rust/examples/concurrent-read-write.rs b/rust/examples/concurrent-read-write.rs new file mode 100644 index 0000000..4858f76 --- /dev/null +++ b/rust/examples/concurrent-read-write.rs @@ -0,0 +1,149 @@ +//! Example usage with nbdkit: +//! nbdkit -U - memory 100M \ +//! --run 'cargo run --example concurrent-read-write -- $unixsocket' +//! Or connect over a URI: +//! nbdkit -U - memory 100M \ +//! --run 'cargo run --example concurrent-read-write -- $uri' +//! +//! This will read and write randomly over the first megabyte of the +//! plugin using multi-conn, multiple threads and multiple requests in +//! flight on each thread. + +#![deny(warnings)] +use rand::prelude::*; +use std::env; +use std::sync::Arc; +use tokio::task::JoinSet; + +/// Number of simultaneous connections to the NBD server. +/// +/// Note that some servers only support a limited number of +/// simultaneous connections, and/or have a configurable thread pool +/// internally, and if you exceed those limits then something will break. +const NR_MULTI_CONN: usize = 8; + +/// Number of commands that can be "in flight" at the same time on each +/// connection. (Therefore the total number of requests in flight may +/// be up to NR_MULTI_CONN * MAX_IN_FLIGHT). +const MAX_IN_FLIGHT: usize = 16; + +/// The size of large reads and writes, must be > 512. +const BUFFER_SIZE: usize = 1024; + +/// Number of commands we issue (per [task][tokio::task]). +const NR_CYCLES: usize = 32; + +/// Statistics gathered during the run. +#[derive(Debug, Default)] +struct Stats { + /// The total number of requests made. + requests: usize, +} + +#[tokio::main] +async fn main() -> anyhow::Result<()> { + let args = env::args_os().collect::<Vec<_>>(); + if args.len() != 2 { + anyhow::bail!("Usage: {:?} socket", args[0]); + } + + // We begin by making a connection to the server to get the export size + // and ensure that it supports multiple connections and is writable. + let nbd = libnbd::Handle::new()?; + + // Check if the user provided a URI or a unix socket. + let socket_or_uri = args[1].to_str().unwrap(); + if socket_or_uri.contains("://") { + nbd.connect_uri(socket_or_uri)?; + } else { + nbd.connect_unix(socket_or_uri)?; + } + + let export_size = nbd.get_size()?; + anyhow::ensure!( + (BUFFER_SIZE as u64) < export_size, + "export is {export_size}B, must be larger than {BUFFER_SIZE}B" + ); + anyhow::ensure!( + !nbd.is_read_only()?, + "error: this NBD export is read-only" + ); + anyhow::ensure!( + nbd.can_multi_conn()?, + "error: this NBD export does not support multi-conn" + ); + drop(nbd); // Close the connection. + + // Start the worker tasks, one per connection. + let mut tasks = JoinSet::new(); + for i in 0..NR_MULTI_CONN { + tasks.spawn(run_thread(i, socket_or_uri.to_owned(), export_size)); + } + + // Wait for the tasks to complete. + let mut stats = Stats::default(); + while !tasks.is_empty() { + let this_stats = tasks.join_next().await.unwrap().unwrap()?; + stats.requests += this_stats.requests; + } + + // Make sure the number of requests that were required matches what + // we expect. + assert_eq!(stats.requests, NR_MULTI_CONN * NR_CYCLES); + + Ok(()) +} + +async fn run_thread( + task_idx: usize, + socket_or_uri: String, + export_size: u64, +) -> anyhow::Result<Stats> { + // Start a new connection to the server. + // We shall spawn many commands concurrently on different tasks and those + // futures must be `'static`, hence we wrap the handle in an [Arc]. + let nbd = Arc::new(libnbd::AsyncHandle::new()?); + + // Check if the user provided a URI or a unix socket. + if socket_or_uri.contains("://") { + nbd.connect_uri(socket_or_uri).await?; + } else { + nbd.connect_unix(socket_or_uri).await?; + } + + let mut rng = SmallRng::seed_from_u64(44 as u64); + + // Issue commands. + let mut stats = Stats::default(); + let mut join_set = JoinSet::new(); + //tokio::time::sleep(std::time::Duration::from_secs(1)).await; + while stats.requests < NR_CYCLES || !join_set.is_empty() { + while stats.requests < NR_CYCLES && join_set.len() < MAX_IN_FLIGHT { + // If we want to issue another request, do so. Note that we reuse + // the same buffer for multiple in-flight requests. It doesn't + // matter here because we're just trying to write random stuff, + // but that would be Very Bad in a real application. + // Simulate a mix of large and small requests. + let size = if rng.gen() { BUFFER_SIZE } else { 512 }; + let offset = rng.gen_range(0..export_size - size as u64); + + let mut buf = [0u8; BUFFER_SIZE]; + let nbd = nbd.clone(); + if rng.gen() { + join_set.spawn(async move { + nbd.pread(&mut buf, offset, None).await + }); + } else { + // Fill the buf with random data. + rng.fill(&mut buf); + join_set + .spawn(async move { nbd.pwrite(&buf, offset, None).await }); + } + stats.requests += 1; + } + join_set.join_next().await.unwrap().unwrap()?; + } + + if task_idx == 0 {} + Ok(stats) +} diff --git a/rust/run-tests.sh.in b/rust/run-tests.sh.in index 3ebf9a1..661c018 100755 --- a/rust/run-tests.sh.in +++ b/rust/run-tests.sh.in @@ -32,6 +32,8 @@ if [ -z "$VG" ]; then --run '@CARGO@ run --example get-size -- $unixsocket' @NBDKIT@ -U - floppy . \ --run '@CARGO@ run --example fetch-first-sector -- $unixsocket' + @NBDKIT@ -U - memory 10M \ + --run '@CARGO@ run --example concurrent-read-write -- $unixsocket' else @CARGO@ test --config "target.'cfg(all())'.runner = \"$VG\"" -- --nocapture fi -- 2.41.0