Richard W.M. Jones
2019-Jun-04 09:59 UTC
[Libguestfs] [PATCH libnbd v2 0/4] api: Implement concurrent writer.
v1: https://www.redhat.com/archives/libguestfs/2019-June/msg00014.html

I pushed a few bits which are uncontroversial.  The main changes
since v1 are:

- An extra patch removes the want_to_send / check for
  nbd_aio_is_ready in examples/threaded-reads-and-writes.c.  This
  logic has been wrong since commit 6af72b87, as Eric pointed out in
  his review.  The comments and structure of
  examples/concurrent-writer.c have been updated to match.

- Callbacks now return int instead of void.  In some cases we ignore
  this return value.

- I added a lot more commentary in the commit message for the main
  patch (now patch 3 in this series).

Rich.
Richard W.M. Jones
2019-Jun-04 09:59 UTC
[Libguestfs] [PATCH libnbd v2 1/4] examples, tests: Remove want_to_send / ready logic, increase limit on cmds in flight.
Since Eric's improvements to the command queue in commit 6af72b8 (and
following) there's now a queue of commands waiting to be issued stored
in the handle, and there's no need to issue commands only from the
ready state.  We can therefore remove the want_to_send logic, queue as
many commands as possible, and don't need to test if the socket is
ready for POLLOUT.

This commit also removes some misleading comments, improves the
documentation, and increases the limit on commands in flight (since
this limit is effectively in place to stop memory exhaustion, not
because of server limits).
---
 docs/libnbd.pod                      |  9 ++--
 examples/threaded-reads-and-writes.c | 65 +++++++++++-----------------
 tests/aio-parallel-load.c            | 56 ++++++++++--------------
 tests/aio-parallel-tls.sh            |  2 +-
 tests/aio-parallel.c                 | 58 +++++++++++--------------
 tests/aio-parallel.sh                |  2 +-
 6 files changed, 83 insertions(+), 109 deletions(-)

diff --git a/docs/libnbd.pod b/docs/libnbd.pod
index a5dfb99..ede2539 100644
--- a/docs/libnbd.pod
+++ b/docs/libnbd.pod
@@ -362,9 +362,12 @@ Replies may arrive out of order.
 Although in theory you can have an indefinite number of requests in
 flight at the same time, in practice it's a good idea to limit them to
-some number.  It is suggested to start with a limit of 16 requests in
-flight (per NBD connection), and measure how adjusting the limit up
-and down affects performance for your local configuration.
+some number.  Libnbd will queue commands in the handle even if it
+cannot write them to the server, so this limit is largely to prevent a
+backlog of commands from consuming too much memory.  It is suggested
+to start with a limit of 64 requests in flight (per NBD connection),
+and measure how adjusting the limit up and down affects performance
+for your local configuration.
 
 There is a full example using multiple in-flight requests available
 at
 L<https://github.com/libguestfs/libnbd/blob/master/examples/threaded-reads-and-writes.c>
diff --git a/examples/threaded-reads-and-writes.c b/examples/threaded-reads-and-writes.c
index 5d0d2bd..bb82641 100644
--- a/examples/threaded-reads-and-writes.c
+++ b/examples/threaded-reads-and-writes.c
@@ -41,20 +41,16 @@ static int64_t exportsize;
 
 /* Number of commands that can be "in flight" at the same time on each
  * connection.  (Therefore the total number of requests in flight may
- * be up to NR_MULTI_CONN * MAX_IN_FLIGHT).  qemu's NBD client can
- * have up to 16 requests in flight.
- *
- * Some servers do not support multiple requests in flight and may
- * deadlock or even crash if this is larger than 1, but common NBD
- * servers should be OK.
+ * be up to NR_MULTI_CONN * MAX_IN_FLIGHT).  See libnbd(3) section
+ * "Issuing multiple in-flight requests".
  */
-#define MAX_IN_FLIGHT 16
+#define MAX_IN_FLIGHT 64
 
 /* The size of reads and writes. */
 #define BUFFER_SIZE (1024*1024)
 
 /* Number of commands we issue (per thread). */
-#define NR_CYCLES 100000
+#define NR_CYCLES 1000000
 
 struct thread_status {
   size_t i;                     /* Thread index, 0 .. NR_MULTI_CONN-1 */
@@ -192,7 +188,6 @@ start_thread (void *arg)
   uint64_t handles[MAX_IN_FLIGHT];
   size_t in_flight;        /* counts number of requests in flight */
   int dir, r, cmd;
-  bool want_to_send;
 
   buf = malloc (BUFFER_SIZE);
   if (buf == NULL) {
@@ -238,15 +233,31 @@ start_thread (void *arg)
       goto error;
     }
 
-    /* Do we want to send another request and there's room to issue it
-     * and the connection is in the READY state so it can be used to
-     * issue a request.
+    /* If we want to issue another request, do so.  Note that we reuse
+     * the same buffer for multiple in-flight requests.  It doesn't
+     * matter here because we're just trying to write random stuff,
+     * but that would be Very Bad in a real application.
      */
-    want_to_send =
-      i > 0 && in_flight < MAX_IN_FLIGHT && nbd_aio_is_ready (nbd);
+    while (i > 0 && in_flight < MAX_IN_FLIGHT) {
+      offset = rand () % (exportsize - sizeof buf);
+      cmd = rand () & 1;
+      if (cmd == 0)
+        handle = nbd_aio_pwrite (nbd, buf, sizeof buf, offset, 0);
+      else
+        handle = nbd_aio_pread (nbd, buf, sizeof buf, offset, 0);
+      if (handle == -1) {
+        fprintf (stderr, "%s\n", nbd_get_error ());
+        goto error;
+      }
+      handles[in_flight] = handle;
+      i--;
+      in_flight++;
+      if (in_flight > status->most_in_flight)
+        status->most_in_flight = in_flight;
+    }
 
     fds[0].fd = nbd_aio_get_fd (nbd);
-    fds[0].events = want_to_send ? POLLOUT : 0;
+    fds[0].events = 0;
     fds[0].revents = 0;
     dir = nbd_aio_get_direction (nbd);
     if ((dir & LIBNBD_AIO_DIRECTION_READ) != 0)
@@ -266,30 +277,6 @@ start_thread (void *arg)
         (fds[0].revents & POLLOUT) != 0)
       nbd_aio_notify_write (nbd);
 
-    /* If we can issue another request, do so.  Note that we reuse the
-     * same buffer for multiple in-flight requests.  It doesn't matter
-     * here because we're just trying to write random stuff, but that
-     * would be Very Bad in a real application.
-     */
-    if (want_to_send && (fds[0].revents & POLLOUT) != 0 &&
-        nbd_aio_is_ready (nbd)) {
-      offset = rand () % (exportsize - sizeof buf);
-      cmd = rand () & 1;
-      if (cmd == 0)
-        handle = nbd_aio_pwrite (nbd, buf, sizeof buf, offset, 0);
-      else
-        handle = nbd_aio_pread (nbd, buf, sizeof buf, offset, 0);
-      if (handle == -1) {
-        fprintf (stderr, "%s\n", nbd_get_error ());
-        goto error;
-      }
-      handles[in_flight] = handle;
-      i--;
-      in_flight++;
-      if (in_flight > status->most_in_flight)
-        status->most_in_flight = in_flight;
-    }
-
     /* If a command is ready to retire, retire it. */
     for (j = 0; j < in_flight; ++j) {
       r = nbd_aio_command_completed (nbd, handles[j]);
diff --git a/tests/aio-parallel-load.c b/tests/aio-parallel-load.c
index 20fa358..398312f 100644
--- a/tests/aio-parallel-load.c
+++ b/tests/aio-parallel-load.c
@@ -44,7 +44,7 @@
 #define NR_MULTI_CONN 8
 
 /* Number of commands in flight per connection. */
-#define MAX_IN_FLIGHT 16
+#define MAX_IN_FLIGHT 64
 
 /* Unix socket. */
 static const char *unixsocket;
@@ -169,7 +169,7 @@ start_thread (void *arg)
   size_t in_flight;        /* counts number of requests in flight */
   int dir, r, cmd;
   time_t t;
-  bool expired = false, want_to_send;
+  bool expired = false;
 
   nbd = nbd_create ();
   if (nbd == NULL) {
@@ -219,15 +219,30 @@ start_thread (void *arg)
       break;
     }
 
-    /* Do we want to send another request and there's room to issue it
-     * and the connection is in the READY state so it can be used to
-     * issue a request.
-     */
-    want_to_send =
-      !expired && in_flight < MAX_IN_FLIGHT && nbd_aio_is_ready (nbd);
+    /* If we can issue another request, do so. */
+    while (!expired && in_flight < MAX_IN_FLIGHT) {
+      offset = rand () % (EXPORTSIZE - sizeof buf);
+      cmd = rand () & 1;
+      if (cmd == 0) {
+        handle = nbd_aio_pwrite (nbd, buf, sizeof buf, offset, 0);
+        status->bytes_sent += sizeof buf;
+      }
+      else {
+        handle = nbd_aio_pread (nbd, buf, sizeof buf, offset, 0);
+        status->bytes_received += sizeof buf;
+      }
+      if (handle == -1) {
+        fprintf (stderr, "%s\n", nbd_get_error ());
+        goto error;
+      }
+      handles[in_flight] = handle;
+      in_flight++;
+      if (in_flight > status->most_in_flight)
+        status->most_in_flight = in_flight;
+    }
 
     fds[0].fd = nbd_aio_get_fd (nbd);
-    fds[0].events = want_to_send ? POLLOUT : 0;
+    fds[0].events = 0;
     fds[0].revents = 0;
     dir = nbd_aio_get_direction (nbd);
     if ((dir & LIBNBD_AIO_DIRECTION_READ) != 0)
@@ -247,29 +262,6 @@ start_thread (void *arg)
         (fds[0].revents & POLLOUT) != 0)
       nbd_aio_notify_write (nbd);
 
-    /* If we can issue another request, do so. */
-    if (want_to_send && (fds[0].revents & POLLOUT) != 0 &&
-        nbd_aio_is_ready (nbd)) {
-      offset = rand () % (EXPORTSIZE - sizeof buf);
-      cmd = rand () & 1;
-      if (cmd == 0) {
-        handle = nbd_aio_pwrite (nbd, buf, sizeof buf, offset, 0);
-        status->bytes_sent += sizeof buf;
-      }
-      else {
-        handle = nbd_aio_pread (nbd, buf, sizeof buf, offset, 0);
-        status->bytes_received += sizeof buf;
-      }
-      if (handle == -1) {
-        fprintf (stderr, "%s\n", nbd_get_error ());
-        goto error;
-      }
-      handles[in_flight] = handle;
-      in_flight++;
-      if (in_flight > status->most_in_flight)
-        status->most_in_flight = in_flight;
-    }
-
     /* If a command is ready to retire, retire it. */
     for (i = 0; i < in_flight; ++i) {
       r = nbd_aio_command_completed (nbd, handles[i]);
diff --git a/tests/aio-parallel-tls.sh b/tests/aio-parallel-tls.sh
index d60d3fa..c0e5e91 100755
--- a/tests/aio-parallel-tls.sh
+++ b/tests/aio-parallel-tls.sh
@@ -20,5 +20,5 @@
 nbdkit -U - --tls=require --tls-verify-peer --tls-psk=keys.psk \
        --filter=cow \
-       pattern size=8M \
+       pattern size=64M \
        --run '$VG ./aio-parallel-tls $unixsocket'
diff --git a/tests/aio-parallel.c b/tests/aio-parallel.c
index d3217a8..a9b0fd9 100644
--- a/tests/aio-parallel.c
+++ b/tests/aio-parallel.c
@@ -41,7 +41,7 @@ static char *ramdisk;
 #define BUFFERSIZE 16384
 
 /* This is also defined in aio-parallel.sh and checked here. */
-#define EXPORTSIZE (8*1024*1024)
+#define EXPORTSIZE (64*1024*1024)
 
 /* How long (seconds) that the test will run for. */
 #define RUN_TIME 10
@@ -50,7 +50,7 @@ static char *ramdisk;
 #define NR_MULTI_CONN 8
 
 /* Number of commands in flight per connection. */
-#define MAX_IN_FLIGHT 16
+#define MAX_IN_FLIGHT 64
 
 #if BUFFERSIZE >= EXPORTSIZE / NR_MULTI_CONN / MAX_IN_FLIGHT
 #error "EXPORTSIZE too small"
@@ -201,7 +201,7 @@ start_thread (void *arg)
   size_t in_flight;        /* counts number of requests in flight */
   int dir, r, cmd;
   time_t t;
-  bool expired = false, want_to_send;
+  bool expired = false;
 
   for (i = 0; i < MAX_IN_FLIGHT; ++i)
     commands[status->i][i].offset = -1;
@@ -254,37 +254,8 @@ start_thread (void *arg)
       break;
     }
 
-    /* Do we want to send another request and there's room to issue it
-     * and the connection is in the READY state so it can be used to
-     * issue a request.
-     */
-    want_to_send =
-      !expired && in_flight < MAX_IN_FLIGHT && nbd_aio_is_ready (nbd);
-
-    fds[0].fd = nbd_aio_get_fd (nbd);
-    fds[0].events = want_to_send ? POLLOUT : 0;
-    fds[0].revents = 0;
-    dir = nbd_aio_get_direction (nbd);
-    if ((dir & LIBNBD_AIO_DIRECTION_READ) != 0)
-      fds[0].events |= POLLIN;
-    if ((dir & LIBNBD_AIO_DIRECTION_WRITE) != 0)
-      fds[0].events |= POLLOUT;
-
-    if (poll (fds, 1, -1) == -1) {
-      perror ("poll");
-      goto error;
-    }
-
-    if ((dir & LIBNBD_AIO_DIRECTION_READ) != 0 &&
-        (fds[0].revents & POLLIN) != 0)
-      nbd_aio_notify_read (nbd);
-    else if ((dir & LIBNBD_AIO_DIRECTION_WRITE) != 0 &&
-             (fds[0].revents & POLLOUT) != 0)
-      nbd_aio_notify_write (nbd);
-
     /* If we can issue another request, do so. */
-    if (want_to_send && (fds[0].revents & POLLOUT) != 0 &&
-        nbd_aio_is_ready (nbd)) {
+    while (!expired && in_flight < MAX_IN_FLIGHT) {
       /* Find a free command slot. */
       for (i = 0; i < MAX_IN_FLIGHT; ++i)
         if (commands[status->i][i].offset == -1)
@@ -316,6 +287,27 @@ start_thread (void *arg)
         status->most_in_flight = in_flight;
     }
 
+    fds[0].fd = nbd_aio_get_fd (nbd);
+    fds[0].events = 0;
+    fds[0].revents = 0;
+    dir = nbd_aio_get_direction (nbd);
+    if ((dir & LIBNBD_AIO_DIRECTION_READ) != 0)
+      fds[0].events |= POLLIN;
+    if ((dir & LIBNBD_AIO_DIRECTION_WRITE) != 0)
+      fds[0].events |= POLLOUT;
+
+    if (poll (fds, 1, -1) == -1) {
+      perror ("poll");
+      goto error;
+    }
+
+    if ((dir & LIBNBD_AIO_DIRECTION_READ) != 0 &&
+        (fds[0].revents & POLLIN) != 0)
+      nbd_aio_notify_read (nbd);
+    else if ((dir & LIBNBD_AIO_DIRECTION_WRITE) != 0 &&
+             (fds[0].revents & POLLOUT) != 0)
+      nbd_aio_notify_write (nbd);
+
     /* If a command is ready to retire, retire it. */
     for (i = 0; i < MAX_IN_FLIGHT; ++i) {
       if (commands[status->i][i].offset >= 0) {
diff --git a/tests/aio-parallel.sh b/tests/aio-parallel.sh
index 40cf794..fd0c5ce 100755
--- a/tests/aio-parallel.sh
+++ b/tests/aio-parallel.sh
@@ -20,5 +20,5 @@
 nbdkit -U - \
        --filter=cow \
-       pattern size=8M \
+       pattern size=64M \
        --run '$VG ./aio-parallel $unixsocket'
-- 
2.21.0
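[Editorial note: distilling the diff above, after this change each
iteration of the example event loop first queues as many commands as
the self-imposed limit allows, then polls only for the direction the
state machine reports.  The following is a minimal sketch of that
shape, assuming an already-connected handle; handle bookkeeping,
command retirement and error reporting are omitted for brevity.]

  #include <poll.h>
  #include <libnbd.h>

  static void
  issue_and_poll (struct nbd_handle *nbd, char *buf, size_t len,
                  size_t *in_flight, size_t max_in_flight)
  {
    struct pollfd fds[1];
    int dir;

    /* Queue commands up to the limit; no nbd_aio_is_ready check and
     * no waiting for POLLOUT before issuing.
     */
    while (*in_flight < max_in_flight) {
      if (nbd_aio_pread (nbd, buf, len, 0, 0) == -1)
        return;                 /* real code: record handle, report error */
      (*in_flight)++;
    }

    /* Poll only for what the state machine currently wants. */
    fds[0].fd = nbd_aio_get_fd (nbd);
    fds[0].events = 0;
    fds[0].revents = 0;
    dir = nbd_aio_get_direction (nbd);
    if ((dir & LIBNBD_AIO_DIRECTION_READ) != 0)
      fds[0].events |= POLLIN;
    if ((dir & LIBNBD_AIO_DIRECTION_WRITE) != 0)
      fds[0].events |= POLLOUT;

    if (poll (fds, 1, -1) == 1) {
      if ((dir & LIBNBD_AIO_DIRECTION_READ) != 0 &&
          (fds[0].revents & POLLIN) != 0)
        nbd_aio_notify_read (nbd);
      else if ((dir & LIBNBD_AIO_DIRECTION_WRITE) != 0 &&
               (fds[0].revents & POLLOUT) != 0)
        nbd_aio_notify_write (nbd);
    }
  }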
Richard W.M. Jones
2019-Jun-04 09:59 UTC
[Libguestfs] [PATCH libnbd v2 2/4] generator: Callback returns int instead of void.
Callback functions now return an int instead of a void.  This allows
in some cases for the callback to indicate that there was an error.

This is a small change to the API:

For nbd_set_debug_callback the signature has changed, but the return
value is ignored.

For nbd_block_status and nbd_aio_block_status the extent function can
return an error, which causes the block status command to return an
error.  Unfortunately this causes us to set the state to dead,
although with more effort we could recover from this.  Because of
this behaviour I didn't document what happens in the error case as we
may want to change it in future.

For Python and OCaml bindings, raising any kind of exception from the
callback is equivalent to returning -1 from the C callback.
---
 generator/generator                 | 36 +++++++++++++++++++++--------
 generator/states-reply-structured.c | 12 +++++++---
 interop/dirty-bitmap.c              |  4 +++-
 lib/debug.c                         |  3 ++-
 lib/internal.h                      |  4 ++--
 tests/meta-base-allocation.c        | 10 ++++----
 6 files changed, 48 insertions(+), 21 deletions(-)

diff --git a/generator/generator b/generator/generator
index 4c895e8..ff6075d 100755
--- a/generator/generator
+++ b/generator/generator
@@ -830,7 +830,7 @@ and arg
                                     written by the function *)
 | BytesPersistIn of string * string (* same as above, but buffer persists *)
 | BytesPersistOut of string * string
-| Callback of string * arg list    (* callback function returning void *)
+| Callback of string * arg list    (* callback function returning int *)
 | CallbackPersist of string * arg list (* as above, but callback persists *)
 | Flags of string                  (* NBD_CMD_FLAG_* flags *)
 | Int of string                    (* small int *)
@@ -2680,7 +2680,7 @@ let rec print_c_arg_list ?(handle = false) args
   | BytesPersistOut (n, len) -> pr "void *%s, size_t %s" n len
   | Callback (n, args)
   | CallbackPersist (n, args) ->
-     pr "void (*%s) " n; print_c_arg_list args
+     pr "int (*%s) " n; print_c_arg_list args
   | Flags n -> pr "uint32_t %s" n
   | Int n -> pr "int %s" n
   | Int64 n -> pr "int64_t %s" n
@@ -3146,11 +3146,12 @@ let print_python_binding name { args; ret }
      pr "  PyObject *data;\n";
      pr "};\n";
      pr "\n";
-     pr "static void\n";
+     pr "static int\n";
      pr "%s_%s_wrapper " name cb_name;
      print_c_arg_list args;
      pr "\n";
      pr "{\n";
+     pr "  int ret;\n";
      pr "  PyGILState_STATE py_save = PyGILState_UNLOCKED;\n";
      pr "  PyObject *py_args, *py_ret;\n";
      List.iter (
@@ -3213,10 +3214,14 @@ let print_python_binding name { args; ret }
      pr "\n";
      pr "  Py_DECREF (py_args);\n";
      pr "\n";
-     pr "  if (py_ret != NULL)\n";
+     pr "  if (py_ret != NULL) {\n";
+     pr "    ret = 0;\n";
      pr "    Py_DECREF (py_ret); /* return value is discarded */\n";
-     pr "  else\n";
+     pr "  }\n";
+     pr "  else {\n";
+     pr "    ret = -1;\n";
      pr "    PyErr_PrintEx (0); /* print exception */\n";
+     pr "  };\n";
      pr "\n";
      List.iter (
        function
@@ -3232,6 +3237,7 @@ let print_python_binding name { args; ret }
        | Flags _ | Int _ | Int64 _ | Path _
        | SockAddrAndLen _ | StringList _ | UInt _ | UInt32 _ -> assert false
      ) args;
+     pr "  return ret;\n";
      pr "}\n";
      pr "\n"
   | _ -> ()
@@ -3917,7 +3923,7 @@ let print_ocaml_binding (name, { args; ret })
        | UInt _ | UInt32 _ -> assert false
      ) args in
 
-     pr "static void\n";
+     pr "static int\n";
      pr "%s_%s_wrapper_locked " name cb_name;
      print_c_arg_list args;
      pr "\n";
@@ -3957,24 +3963,34 @@ let print_ocaml_binding (name, { args; ret })
 
      pr "  rv = caml_callbackN_exn (fnv, %d, args);\n"
        (List.length argnames);
-     pr "  if (Is_exception_result (rv))\n";
+     pr "  if (Is_exception_result (rv)) {\n";
+     pr "    /* XXX This is not really an error as callbacks can return\n";
+     pr "     * an error indication.  But perhaps we should direct this\n";
+     pr "     * to a more suitable place or formalize what exception.\n";
+     pr "     * means error versus unexpected failure.\n";
+     pr "     */\n";
      pr "    fprintf (stderr,\n";
      pr "             \"libnbd: uncaught OCaml exception: %%s\",\n";
      pr "             caml_format_exception (Extract_exception (rv)));\n";
+     pr "    CAMLreturnT (int, -1);\n";
+     pr "  }\n";
      pr "\n";
-     pr "  CAMLreturn0;\n";
+     pr "  CAMLreturnT (int, 0);\n";
      pr "}\n";
      pr "\n";
-     pr "static void\n";
+     pr "static int\n";
      pr "%s_%s_wrapper " name cb_name;
      print_c_arg_list args;
      pr "\n";
      pr "{\n";
+     pr "  int ret;\n";
+     pr "\n";
      pr "  caml_leave_blocking_section ();\n";
      let c_argnames = List.flatten (List.map name_of_arg args) in
-     pr "  %s_%s_wrapper_locked (%s);\n" name cb_name
+     pr "  ret = %s_%s_wrapper_locked (%s);\n" name cb_name
        (String.concat ", " c_argnames);
      pr "  caml_enter_blocking_section ();\n";
+     pr "  return ret;\n";
      pr "}\n"
   | _ -> ()
 ) args;
diff --git a/generator/states-reply-structured.c b/generator/states-reply-structured.c
index e6c1a8a..c835713 100644
--- a/generator/states-reply-structured.c
+++ b/generator/states-reply-structured.c
@@ -369,10 +369,16 @@
       if (context_id == meta_context->context_id)
         break;
 
-    if (meta_context)
+    if (meta_context) {
       /* Call the caller's extent function. */
-      cmd->extent_fn (cmd->data, meta_context->name, cmd->offset,
-                      &h->bs_entries[1], (length-4) / 4);
+      if (cmd->extent_fn (cmd->data, meta_context->name, cmd->offset,
+                          &h->bs_entries[1], (length-4) / 4) == -1) {
+        SET_NEXT_STATE (%.DEAD); /* XXX We should be able to recover. */
+        if (errno == 0) errno = EPROTO;
+        set_error (errno, "extent function failed");
+        return -1;
+      }
+    }
     else
       /* Emit a debug message, but ignore it. */
       debug (h, "server sent unexpected meta context ID %" PRIu32,
diff --git a/interop/dirty-bitmap.c b/interop/dirty-bitmap.c
index b3a89d0..8d34173 100644
--- a/interop/dirty-bitmap.c
+++ b/interop/dirty-bitmap.c
@@ -32,7 +32,7 @@ static const char *base_allocation = "base:allocation";
 
 static int calls; /* Track which contexts passed through callback */
 
-static void
+static int
 cb (void *data, const char *metacontext, uint64_t offset,
     uint32_t *entries, size_t len)
 {
@@ -71,6 +71,8 @@ cb (void *data, const char *metacontext, uint64_t offset,
     fprintf (stderr, "unexpected context %s\n", metacontext);
     exit (EXIT_FAILURE);
   }
+
+  return 0;
 }
 
 int
diff --git a/lib/debug.c b/lib/debug.c
index 8e2ff76..02d49f7 100644
--- a/lib/debug.c
+++ b/lib/debug.c
@@ -41,7 +41,7 @@ nbd_unlocked_get_debug (struct nbd_handle *h)
 
 int
 nbd_unlocked_set_debug_callback (struct nbd_handle *h, void *data,
-                                 void (*debug_fn) (void *, const char *, const char *))
+                                 int (*debug_fn) (void *, const char *, const char *))
 {
   h->debug_fn = debug_fn;
   h->debug_data = data;
@@ -75,6 +75,7 @@ nbd_internal_debug (struct nbd_handle *h, const char *fs, ...)
     goto out;
 
   if (h->debug_fn)
+    /* ignore return value */
     h->debug_fn (h->debug_data, context, msg);
   else
     fprintf (stderr, "libnbd: debug: %s: %s\n", context ? : "unknown", msg);
diff --git a/lib/internal.h b/lib/internal.h
index c8e5094..c1a57ac 100644
--- a/lib/internal.h
+++ b/lib/internal.h
@@ -75,7 +75,7 @@ struct nbd_handle {
   /* For debugging. */
   bool debug;
   void *debug_data;
-  void (*debug_fn) (void *, const char *, const char *);
+  int (*debug_fn) (void *, const char *, const char *);
 
   /* Linked list of close callbacks. */
   struct close_callback *close_callbacks;
@@ -212,7 +212,7 @@ struct socket {
   const struct socket_ops *ops;
 };
 
-typedef void (*extent_fn) (void *data, const char *metacontext, uint64_t offset, uint32_t *entries, size_t nr_entries);
+typedef int (*extent_fn) (void *data, const char *metacontext, uint64_t offset, uint32_t *entries, size_t nr_entries);
 
 struct command_in_flight {
   struct command_in_flight *next;
diff --git a/tests/meta-base-allocation.c b/tests/meta-base-allocation.c
index 71f2afc..b00aa50 100644
--- a/tests/meta-base-allocation.c
+++ b/tests/meta-base-allocation.c
@@ -28,9 +28,9 @@
 
 #include <libnbd.h>
 
-static void check_extent (void *data, const char *metacontext,
-                          uint64_t offset,
-                          uint32_t *entries, size_t nr_entries);
+static int check_extent (void *data, const char *metacontext,
+                         uint64_t offset,
+                         uint32_t *entries, size_t nr_entries);
 
 int
 main (int argc, char *argv[])
@@ -125,7 +125,7 @@ main (int argc, char *argv[])
   exit (EXIT_SUCCESS);
 }
 
-static void
+static int
 check_extent (void *data, const char *metacontext,
               uint64_t offset,
               uint32_t *entries, size_t nr_entries)
@@ -173,4 +173,6 @@ check_extent (void *data, const char *metacontext,
   else
     fprintf (stderr, "warning: ignored unexpected meta context %s\n",
              metacontext);
+
+  return 0;
 }
-- 
2.21.0
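[Editorial note: to illustrate the new contract from the caller's
side, here is a sketch of an extent callback under this patch.  The
function name and the EPROTO choice are illustrative, but the
-1-with-errno convention matches the states-reply-structured.c hunk
above, which defaults errno to EPROTO when the callback leaves it
unset.]

  #include <errno.h>
  #include <string.h>
  #include <stdint.h>
  #include <stddef.h>

  static int
  my_extent_cb (void *opaque, const char *metacontext, uint64_t offset,
                uint32_t *entries, size_t nr_entries)
  {
    /* Returning -1 makes the whole block status command fail; the
     * library reports the errno we set here (or EPROTO if unset).
     */
    if (strcmp (metacontext, "base:allocation") != 0) {
      errno = EPROTO;
      return -1;
    }
    /* ... process the nr_entries length/flags pairs in entries ... */
    return 0;
  }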
Richard W.M. Jones
2019-Jun-04 09:59 UTC
[Libguestfs] [PATCH libnbd v2 3/4] api: Implement concurrent writer.
Callers may now optionally set up a concurrent writer thread.  The
outline of this idea is explained here:

  https://www.redhat.com/archives/libguestfs/2019-June/msg00010.html

The change is quite small, but here are some points which are true
but may not be obvious:

* All writes return immediately with success (unless we run out of
  memory), and writes never block.

* When going down the READY -> ISSUE_COMMAND.START (etc) path,
  because writes never block, we never stop for a read notification,
  so we always send all commands in the issue queue before starting
  to read any replies.  (However this only means that the raw
  commands are enqueued in the writer thread, which can still take
  its merry time writing the requests to the socket.)

* Commands can be counted as "in flight" when they haven't been
  written to the socket yet.  This is another reason for increasing
  the in-flight limits.  Does this matter?  Probably not.  The kernel
  might also queue up packets before sending them, or they might be
  in flight across the internet or queued at the receiver.  Nothing
  about "in flight" ever meant that the server has received and is
  processing those packets.

* Even with this change, full parallelism isn't quite achievable.
  It's still possible to be in a state such as
  REPLY.STRUCTURED_REPLY.RECV_* waiting for an unusually fast
  writer / slow reader server.  If you then decide that you want to
  send yet more commands then those commands will only be enqueued in
  the handle, not dispatched to the writer thread.  To avoid this it
  is best to send as many commands as possible as soon as possible
  before entering poll, but to a certain extent this is unavoidable
  with having only one state machine.
---
 docs/libnbd.pod     | 73 +++++++++++++++++++++++++++++++++++++++++++++
 generator/generator | 29 ++++++++++++++++++
 lib/handle.c        | 32 ++++++++++++++++++++
 lib/internal.h      |  7 +++++
 lib/socket.c        | 27 ++++++++++++++---
 podwrapper.pl.in    |  3 +-
 6 files changed, 166 insertions(+), 5 deletions(-)

diff --git a/docs/libnbd.pod b/docs/libnbd.pod
index ede2539..07d259f 100644
--- a/docs/libnbd.pod
+++ b/docs/libnbd.pod
@@ -400,6 +400,79 @@ If you are issuing multiple in-flight requests (see above) and
 limiting the number, then the limit should be applied to each
 individual NBD connection.
 
+=head2 Concurrent writer thread
+
+To achieve the maximum possible performance from libnbd and NBD
+servers, as well as the above techniques you must also use a
+concurrent writer thread.  This feature allows requests to be issued
+on the NBD socket at the same time that replies are being read from
+the socket.  In other words L<send(2)> and L<recv(2)> calls will be
+running at the same time on the same socket from different threads.
+
+There is a full example using a concurrent writer available at
+L<https://github.com/libguestfs/libnbd/blob/master/examples/concurrent-writer.c>
+
+To implement this, you change your ordinary AIO code in four ways:
+
+=over 4
+
+=item 1. Call nbd_set_concurrent_writer
+
+ struct writer_data {
+   struct nbd_handle *nbd;
+   /* other data here as required */
+ } data;
+
+ nbd_set_concurrent_writer (nbd, &data, writer);
+
+This function can be called on the handle at any time, either after
+the handle is created or after the connection and handshaking has
+completed.
+
+=item 2. Implement non-blocking writer callback
+
+C<writer> is a I<non-blocking> callback which enqueues the buffer
+into a ring or similar FIFO structure:
+
+ struct ring_item {
+   struct writer_data *data;
+   const void *buf;
+   size_t len;
+ };
+
+ void writer (void *data, const void *buf, size_t len)
+ {
+   struct ring_item item;
+
+   /* add (data, buf, len) to a shared ring */
+   item.data = data;
+   item.buf = malloc (len);
+   memcpy (item.buf, buf, len);
+   item.len = len;
+   ring_add (&item);
+
+   writer_signal (); /* kick the writer thread */
+ }
+
+=item 3. Implement writer thread
+
+You must also supply another thread which picks up data off the ring
+and writes it to the socket (see C<nbd_aio_get_fd>).  If there is an
+error when writing to the socket, call C<nbd_concurrent_writer_error>
+with the C<errno>.
+
+You have a choice of whether to implement one thread per nbd_handle
+or one thread shared between all handles.
+
+=item 4. Modify main loop
+
+Finally your main loop can unconditionally call
+C<nbd_aio_notify_write> when C<nbd_aio_get_direction> returns
+C<WRITE> or C<BOTH> (since the concurrent thread can always enqueue
+more data and so is always "ready to write").
+
+=back
+
 =head1 ENCRYPTION AND AUTHENTICATION
 
 The NBD protocol and libnbd supports TLS (sometimes incorrectly called
diff --git a/generator/generator b/generator/generator
index ff6075d..718e253 100755
--- a/generator/generator
+++ b/generator/generator
@@ -1094,6 +1094,35 @@ C<\"qemu:dirty-bitmap:...\"> for qemu-nbd
 (see qemu-nbd I<-B> option).  See also C<nbd_block_status>.";
   };
 
+  "set_concurrent_writer", {
+    default_call with
+    args = [ Opaque "data";
+             CallbackPersist ("writer", [Opaque "data";
+                                         BytesIn ("buf", "len")]) ];
+    ret = RErr;
+    permitted_states = [ Created; Connecting; Connected ];
+    shortdesc = "set a concurrent writer thread";
+    longdesc = "\
+Provide an optional concurrent writer thread for better performance.
+See L<libnbd(3)/Concurrent writer thread> for how to use this.";
+  };
+
+  "concurrent_writer_error", {
+    default_call with
+    args = [ Int "err" ]; ret = RErr;
+    shortdesc = "signal an error from the concurrent writer thread";
+    longdesc = "\
+This can be called from the concurrent writer thread to signal
+that there was an error writing to the socket.  As there is no
+way to recover from such errors, the connection will move to the
+dead state soon after.
+
+The parameter is the C<errno> returned by the failed L<send(2)> call.
+It must be non-zero.
+
+See L<libnbd(3)/Concurrent writer thread> for how to use this.";
+  };
+
   "connect_uri", {
     default_call with
     args = [ String "uri" ]; ret = RErr;
diff --git a/lib/handle.c b/lib/handle.c
index cc311ba..cc5d40f 100644
--- a/lib/handle.c
+++ b/lib/handle.c
@@ -215,6 +215,38 @@ nbd_add_close_callback (struct nbd_handle *h, nbd_close_callback cb, void *data)
   return ret;
 }
 
+int
+nbd_unlocked_set_concurrent_writer (struct nbd_handle *h,
+                                    void *data, writer_cb writer)
+{
+  /* I suppose we could allow this, but it seems more likely that
+   * it's an error rather than intentional.
+   */
+  if (h->writer != NULL) {
+    set_error (EINVAL, "concurrent writer was already set for this handle");
+    return -1;
+  }
+
+  h->writer = writer;
+  h->writer_data = data;
+  return 0;
+}
+
+int
+nbd_unlocked_concurrent_writer_error (struct nbd_handle *h, int err)
+{
+  if (err == 0) {
+    set_error (EINVAL, "concurrent writer error parameter must be non-zero");
+    return -1;
+  }
+
+  /* Ignore second and subsequent calls, record only the first error. */
+  if (h->writer_error == 0)
+    h->writer_error = err;
+
+  return 0;
+}
+
 const char *
 nbd_unlocked_get_package_name (struct nbd_handle *h)
 {
diff --git a/lib/internal.h b/lib/internal.h
index c1a57ac..380302d 100644
--- a/lib/internal.h
+++ b/lib/internal.h
@@ -43,6 +43,8 @@ struct close_callback;
 struct socket;
 struct command_in_flight;
 
+typedef int (*writer_cb) (void *data, const void *buf, size_t len);
+
 struct nbd_handle {
   /* Lock protecting concurrent access to the handle. */
   pthread_mutex_t lock;
@@ -90,6 +92,11 @@ struct nbd_handle {
   /* The socket or a wrapper if using GnuTLS. */
   struct socket *sock;
 
+  /* Writer callback if using concurrent writer. */
+  void *writer_data;
+  writer_cb writer;
+  int writer_error;
+
   /* Generic way to read into a buffer - set rbuf to point to a
    * buffer, rlen to the amount of data you expect, and in the state
    * machine call recv_into_rbuf.
diff --git a/lib/socket.c b/lib/socket.c
index f48e455..77a45cd 100644
--- a/lib/socket.c
+++ b/lib/socket.c
@@ -46,10 +46,29 @@ socket_send (struct nbd_handle *h,
 {
   ssize_t r;
 
-  r = send (sock->u.fd, buf, len, 0);
-  if (r == -1 && errno != EAGAIN && errno != EWOULDBLOCK)
-    set_error (errno, "send");
-  return r;
+  if (!h->writer) {
+    r = send (sock->u.fd, buf, len, 0);
+    if (r == -1 && errno != EAGAIN && errno != EWOULDBLOCK)
+      set_error (errno, "send");
+    return r;
+  }
+  else if (h->writer_error) {
+    /* Concurrent writer thread signaled an error earlier, so
+     * return that here.
+     */
+    errno = h->writer_error;
+    set_error (errno, "concurrent writer thread error");
+    return -1;
+  }
+  else {
+    /* Pass the buffer to the concurrent writer thread. */
+    if (h->writer (h->writer_data, buf, len) == -1) {
+      if (errno == 0) errno = EIO;
+      set_error (errno, "submitting to concurrent writer thread");
+      return -1;
+    }
+    return len;
+  }
 }
 
 static int
diff --git a/podwrapper.pl.in b/podwrapper.pl.in
index 2471807..ecff2d6 100755
--- a/podwrapper.pl.in
+++ b/podwrapper.pl.in
@@ -324,7 +324,8 @@ foreach (@lines) {
     die "$progname: $input: line too long:\n$_\n"
       if length $_ > 76 &&
          substr ($_, 0, 1) ne ' ' &&
-         ! m/https?:/;
+         ! m/https?:/ &&
+         ! m/connected and finished handshaking/;
 }
 
 # Output man page.
-- 
2.21.0
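[Editorial note: step 4 of the new documentation is prose-only, so
here is a small sketch of what such a main-loop iteration might look
like.  The function name is illustrative; the pattern mirrors the
reader loop in the example added by the next patch.]

  #include <poll.h>
  #include <libnbd.h>

  static int
  service_events (struct nbd_handle *nbd)
  {
    struct pollfd fds[1];
    int dir = nbd_aio_get_direction (nbd);

    /* With a concurrent writer the handle is always writable, so
     * service the WRITE direction immediately instead of polling for
     * POLLOUT.
     */
    if ((dir & LIBNBD_AIO_DIRECTION_WRITE) != 0)
      return nbd_aio_notify_write (nbd);

    fds[0].fd = nbd_aio_get_fd (nbd);
    fds[0].events = (dir & LIBNBD_AIO_DIRECTION_READ) != 0 ? POLLIN : 0;
    fds[0].revents = 0;
    if (poll (fds, 1, -1) == -1)
      return -1;
    if ((fds[0].revents & POLLIN) != 0)
      return nbd_aio_notify_read (nbd);
    return 0;
  }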
Richard W.M. Jones
2019-Jun-04 09:59 UTC
[Libguestfs] [PATCH libnbd v2 4/4] examples: Add concurrent writer example.
---
 .gitignore                   |   1 +
 examples/Makefile.am         |  12 +
 examples/concurrent-writer.c | 450 +++++++++++++++++++++++++++++++++++
 3 files changed, 463 insertions(+)

diff --git a/.gitignore b/.gitignore
index 30438c1..e4dad91 100644
--- a/.gitignore
+++ b/.gitignore
@@ -41,6 +41,7 @@ Makefile.in
 /docs/libnbd-api.3
 /docs/libnbd-api.pod
 /examples/batched-read-write
+/examples/concurrent-writer
 /examples/threaded-reads-and-writes
 /examples/simple-fetch-first-sector
 /examples/simple-reads-and-writes
diff --git a/examples/Makefile.am b/examples/Makefile.am
index b933873..b5f7e44 100644
--- a/examples/Makefile.am
+++ b/examples/Makefile.am
@@ -21,6 +21,7 @@ EXTRA_DIST = LICENSE-FOR-EXAMPLES
 
 noinst_PROGRAMS = \
 	batched-read-write \
+	concurrent-writer \
 	simple-fetch-first-sector \
 	simple-reads-and-writes \
 	threaded-reads-and-writes
@@ -54,6 +55,17 @@ threaded_reads_and_writes_LDADD = \
 	$(top_builddir)/lib/libnbd.la \
 	$(PTHREAD_LIBS)
 
+concurrent_writer_SOURCES = \
+	concurrent-writer.c
+concurrent_writer_CPPFLAGS = \
+	-I$(top_srcdir)/include
+concurrent_writer_CFLAGS = \
+	$(WARNINGS_CFLAGS) \
+	$(PTHREAD_CFLAGS)
+concurrent_writer_LDADD = \
+	$(top_builddir)/lib/libnbd.la \
+	$(PTHREAD_LIBS)
+
 batched_read_write_SOURCES = \
 	batched-read-write.c
 batched_read_write_CPPFLAGS = \
diff --git a/examples/concurrent-writer.c b/examples/concurrent-writer.c
new file mode 100644
index 0000000..06c9720
--- /dev/null
+++ b/examples/concurrent-writer.c
@@ -0,0 +1,450 @@
+/* Example usage with nbdkit:
+ *
+ *   nbdkit -U - memory 100M --run './concurrent-writer $unixsocket'
+ *
+ * This will read and write randomly over the first megabyte of the
+ * plugin using multi-conn, multiple threads, multiple requests in
+ * flight on each connection, and concurrent writer threads.
+ *
+ * To run it against a remote server over TCP:
+ *
+ *   ./concurrent-writer hostname port
+ *   or
+ *   ./concurrent-writer nbd://hostname:port
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <stdbool.h>
+#include <stdint.h>
+#include <string.h>
+#include <unistd.h>
+#include <errno.h>
+#include <signal.h>
+#include <poll.h>
+#include <time.h>
+#include <assert.h>
+#include <sys/socket.h>
+
+#include <pthread.h>
+
+#include <libnbd.h>
+
+static int64_t exportsize;
+
+/* Number of simultaneous connections to the NBD server.  The number
+ * of threads is NR_MULTI_CONN * 2 because there is one thread
+ * reading plus a concurrent writer thread.  Note that some servers
+ * only support a limited number of simultaneous connections, and/or
+ * have a configurable thread pool internally, and if you exceed
+ * those limits then something will break.
+ */
+#define NR_MULTI_CONN 8
+
+/* Number of commands that can be "in flight" at the same time on
+ * each connection.  (Therefore the total number of requests in
+ * flight may be up to NR_MULTI_CONN * MAX_IN_FLIGHT).  See libnbd(3)
+ * section "Issuing multiple in-flight requests".
+ */
+#define MAX_IN_FLIGHT 64
+
+/* The size of reads and writes. */
+#define BUFFER_SIZE (1024*1024)
+
+/* Number of commands we issue (per thread). */
+#define NR_CYCLES 1000000
+
+/* Reader thread. */
+struct thread_status {
+  size_t i;                     /* Thread index, 0 .. NR_MULTI_CONN-1 */
+  int argc;                     /* Command line parameters. */
+  char **argv;
+  int status;                   /* Return status. */
+  unsigned requests;            /* Total number of requests made. */
+  unsigned most_in_flight;      /* Most requests seen in flight. */
+};
+
+static void *start_reader_thread (void *arg);
+
+int
+main (int argc, char *argv[])
+{
+  struct nbd_handle *nbd;
+  pthread_t threads[NR_MULTI_CONN];
+  struct thread_status status[NR_MULTI_CONN];
+  size_t i;
+  int err;
+  unsigned requests, most_in_flight, errors;
+
+  srand (time (NULL));
+
+  if (argc < 2 || argc > 3) {
+    fprintf (stderr, "%s uri | socket | hostname port\n", argv[0]);
+    exit (EXIT_FAILURE);
+  }
+
+  nbd = nbd_create ();
+  if (nbd == NULL) {
+    fprintf (stderr, "%s\n", nbd_get_error ());
+    exit (EXIT_FAILURE);
+  }
+
+  /* Connect first to check if the server supports writes and
+   * multi-conn.
+   */
+  if (argc == 2) {
+    if (strstr (argv[1], "://")) {
+      if (nbd_connect_uri (nbd, argv[1]) == -1) {
+        fprintf (stderr, "%s\n", nbd_get_error ());
+        exit (EXIT_FAILURE);
+      }
+    }
+    else if (nbd_connect_unix (nbd, argv[1]) == -1) {
+      fprintf (stderr, "%s\n", nbd_get_error ());
+      exit (EXIT_FAILURE);
+    }
+  }
+  else {
+    if (nbd_connect_tcp (nbd, argv[1], argv[2]) == -1) {
+      fprintf (stderr, "%s\n", nbd_get_error ());
+      exit (EXIT_FAILURE);
+    }
+  }
+
+  exportsize = nbd_get_size (nbd);
+  if (exportsize == -1) {
+    fprintf (stderr, "%s\n", nbd_get_error ());
+    exit (EXIT_FAILURE);
+  }
+
+  if (nbd_read_only (nbd) == 1) {
+    fprintf (stderr, "%s: error: this NBD export is read-only\n", argv[0]);
+    exit (EXIT_FAILURE);
+  }
+
+#if NR_MULTI_CONN > 1
+  if (nbd_can_multi_conn (nbd) == 0) {
+    fprintf (stderr, "%s: error: "
+             "this NBD export does not support multi-conn\n", argv[0]);
+    exit (EXIT_FAILURE);
+  }
+#endif
+
+  nbd_close (nbd);
+
+  /* Start the reader threads. */
+  for (i = 0; i < NR_MULTI_CONN; ++i) {
+    status[i].i = i;
+    status[i].argc = argc;
+    status[i].argv = argv;
+    status[i].status = 0;
+    status[i].requests = 0;
+    status[i].most_in_flight = 0;
+    err = pthread_create (&threads[i], NULL, start_reader_thread,
+                          &status[i]);
+    if (err != 0) {
+      errno = err;
+      perror ("pthread_create");
+      exit (EXIT_FAILURE);
+    }
+  }
+
+  /* Wait for the threads to exit. */
+  errors = 0;
+  requests = 0;
+  most_in_flight = 0;
+  for (i = 0; i < NR_MULTI_CONN; ++i) {
+    err = pthread_join (threads[i], NULL);
+    if (err != 0) {
+      errno = err;
+      perror ("pthread_join");
+      exit (EXIT_FAILURE);
+    }
+    if (status[i].status != 0) {
+      fprintf (stderr, "thread %zu failed with status %d\n",
+               i, status[i].status);
+      errors++;
+    }
+    requests += status[i].requests;
+    if (status[i].most_in_flight > most_in_flight)
+      most_in_flight = status[i].most_in_flight;
+  }
+
+  /* Make sure the number of requests that were required matches what
+   * we expect.
+   */
+  assert (requests == NR_MULTI_CONN * NR_CYCLES);
+
+  printf ("most requests seen in flight = %u (per thread) "
+          "vs MAX_IN_FLIGHT = %d\n",
+          most_in_flight, MAX_IN_FLIGHT);
+
+  exit (errors == 0 ? EXIT_SUCCESS : EXIT_FAILURE);
+}
+
+struct queue {
+  struct queue *next;
+  void *buf;
+  size_t len;
+};
+
+/* Concurrent writer thread (one per libnbd handle). */
+struct writer_data {
+  size_t i;                     /* Thread index, 0 .. NR_MULTI_CONN-1 */
+  struct nbd_handle *nbd;       /* NBD handle. */
+  struct queue *q, *q_end;      /* Queue of items to write. */
+  pthread_mutex_t q_lock;       /* Lock on queue. */
+  pthread_cond_t q_cond;        /* Condition on queue. */
+};
+
+static void *start_writer_thread (void *arg);
+static int writer (void *data, const void *buf, size_t len);
+
+static void *
+start_reader_thread (void *arg)
+{
+  struct nbd_handle *nbd;
+  struct pollfd fds[1];
+  struct thread_status *status = arg;
+  struct writer_data writer_data;
+  pthread_t writer_thread;
+  int err;
+  char *buf;
+  size_t i, j;
+  uint64_t offset, handle;
+  uint64_t handles[MAX_IN_FLIGHT];
+  size_t in_flight;        /* counts number of requests in flight */
+  int dir, r, cmd;
+
+  buf = malloc (BUFFER_SIZE);
+  if (buf == NULL) {
+    perror ("malloc");
+    exit (EXIT_FAILURE);
+  }
+
+  nbd = nbd_create ();
+  if (nbd == NULL) {
+    fprintf (stderr, "%s\n", nbd_get_error ());
+    exit (EXIT_FAILURE);
+  }
+
+  if (status->argc == 2) {
+    if (strstr (status->argv[1], "://")) {
+      if (nbd_connect_uri (nbd, status->argv[1]) == -1) {
+        fprintf (stderr, "%s\n", nbd_get_error ());
+        exit (EXIT_FAILURE);
+      }
+    }
+    else if (nbd_connect_unix (nbd, status->argv[1]) == -1) {
+      fprintf (stderr, "%s\n", nbd_get_error ());
+      exit (EXIT_FAILURE);
+    }
+  }
+  else {
+    if (nbd_connect_tcp (nbd, status->argv[1], status->argv[2]) == -1) {
+      fprintf (stderr, "%s\n", nbd_get_error ());
+      exit (EXIT_FAILURE);
+    }
+  }
+
+  for (i = 0; i < sizeof buf; ++i)
+    buf[i] = rand ();
+
+  /* Start the concurrent writer thread, one per handle. */
+  writer_data.i = status->i;
+  writer_data.nbd = nbd;
+  writer_data.q = writer_data.q_end = NULL;
+  pthread_mutex_init (&writer_data.q_lock, NULL);
+  pthread_cond_init (&writer_data.q_cond, NULL);
+
+  err = pthread_create (&writer_thread, NULL,
+                        start_writer_thread, &writer_data);
+  if (err != 0) {
+    errno = err;
+    perror ("pthread_create");
+    exit (EXIT_FAILURE);
+  }
+
+  if (nbd_set_concurrent_writer (nbd, &writer_data, writer) == -1) {
+    fprintf (stderr, "%s\n", nbd_get_error ());
+    exit (EXIT_FAILURE);
+  }
+
+  /* Issue commands. */
+  in_flight = 0;
+  i = NR_CYCLES;
+  while (i > 0 || in_flight > 0) {
+    if (nbd_aio_is_dead (nbd) || nbd_aio_is_closed (nbd)) {
+      fprintf (stderr, "thread %zu: connection is dead or closed\n",
+               status->i);
+      goto error;
+    }
+
+    /* If we can issue another request, do so.  Note that we reuse the
+     * same buffer for multiple in-flight requests.  It doesn't matter
+     * here because we're just trying to write random stuff, but that
+     * would be Very Bad in a real application.
+     */
+    while (i > 0 && in_flight < MAX_IN_FLIGHT) {
+      offset = rand () % (exportsize - sizeof buf);
+      cmd = rand () & 1;
+      if (cmd == 0)
+        handle = nbd_aio_pwrite (nbd, buf, sizeof buf, offset, 0);
+      else
+        handle = nbd_aio_pread (nbd, buf, sizeof buf, offset, 0);
+      if (handle == -1) {
+        fprintf (stderr, "%s\n", nbd_get_error ());
+        goto error;
+      }
+      handles[in_flight] = handle;
+      i--;
+      in_flight++;
+      if (in_flight > status->most_in_flight)
+        status->most_in_flight = in_flight;
+    }
+
+    fds[0].fd = nbd_aio_get_fd (nbd);
+    fds[0].events = 0;
+    fds[0].revents = 0;
+
+    dir = nbd_aio_get_direction (nbd);
+    if ((dir & LIBNBD_AIO_DIRECTION_WRITE) != 0) {
+      /* The concurrent writer is always writable, we don't have to
+       * test the socket in poll.  Since calling nbd_aio_notify_write
+       * can change the state, after doing it we must restart the
+       * loop.
+       */
+      nbd_aio_notify_write (nbd);
+      continue;
+    }
+
+    if ((dir & LIBNBD_AIO_DIRECTION_READ) != 0)
+      fds[0].events |= POLLIN;
+
+    if (poll (fds, 1, -1) == -1) {
+      perror ("poll");
+      goto error;
+    }
+
+    if ((dir & LIBNBD_AIO_DIRECTION_READ) != 0 &&
+        (fds[0].revents & POLLIN) != 0)
+      nbd_aio_notify_read (nbd);
+
+    /* If a command is ready to retire, retire it. */
+    for (j = 0; j < in_flight; ++j) {
+      r = nbd_aio_command_completed (nbd, handles[j]);
+      if (r == -1) {
+        fprintf (stderr, "%s\n", nbd_get_error ());
+        goto error;
+      }
+      if (r) {
+        memmove (&handles[j], &handles[j+1],
+                 sizeof (handles[0]) * (in_flight - j - 1));
+        j--;
+        in_flight--;
+        status->requests++;
+      }
+    }
+  }
+
+  if (nbd_shutdown (nbd) == -1) {
+    fprintf (stderr, "%s\n", nbd_get_error ());
+    exit (EXIT_FAILURE);
+  }
+
+  nbd_close (nbd);
+
+  printf ("thread %zu: finished OK\n", status->i);
+
+  free (buf);
+  status->status = 0;
+  pthread_exit (status);
+
+ error:
+  free (buf);
+  fprintf (stderr, "thread %zu: failed\n", status->i);
+  status->status = -1;
+  pthread_exit (status);
+}
+
+/* This runs in the reader thread and enqueues the data which will be
+ * picked up by the writer thread.
+ */
+static int
+writer (void *data, const void *buf, size_t len)
+{
+  struct writer_data *writer_data = data;
+  struct queue *item;
+
+  item = malloc (sizeof *item);
+  if (!item) return -1;
+  item->next = NULL;
+  item->buf = malloc (len);
+  if (item->buf == NULL) {
+    free (item);
+    return -1;
+  }
+  memcpy (item->buf, buf, len);
+  item->len = len;
+
+  /* Enqueue the item and signal the writer thread. */
+  pthread_mutex_lock (&writer_data->q_lock);
+  if (writer_data->q_end == NULL)
+    writer_data->q = writer_data->q_end = item;
+  else {
+    writer_data->q_end->next = item;
+    writer_data->q_end = item;
+  }
+  pthread_cond_signal (&writer_data->q_cond);
+  pthread_mutex_unlock (&writer_data->q_lock);
+
+  return 0;
+}
+
+static void *
+start_writer_thread (void *arg)
+{
+  struct writer_data *writer_data = arg;
+  struct nbd_handle *nbd = writer_data->nbd;
+  struct queue *item;
+  int fd;
+  struct pollfd fds[1];
+  ssize_t r;
+  void *p;
+
+  fd = nbd_aio_get_fd (nbd);
+  if (fd == -1) {
+    fprintf (stderr, "%s\n", nbd_get_error ());
+    exit (EXIT_FAILURE);
+  }
+
+  for (;;) {
+    /* Pick next job off the queue. */
+    pthread_mutex_lock (&writer_data->q_lock);
+    while (writer_data->q == NULL)
+      pthread_cond_wait (&writer_data->q_cond, &writer_data->q_lock);
+    item = writer_data->q;
+    writer_data->q = item->next;
+    if (writer_data->q == NULL)
+      writer_data->q_end = NULL;
+    pthread_mutex_unlock (&writer_data->q_lock);
+
+    p = item->buf;
+    while (item->len > 0) {
+      /* Wait for the socket to become ready to write. */
+      fds[0].fd = fd;
+      fds[0].events = POLLOUT;
+      fds[0].revents = 0;
+
+      if (poll (fds, 1, -1) == -1) goto error;
+
+      r = send (fd, p, item->len, 0);
+      if (r == -1) goto error;
+
+      p += r;
+      item->len -= r;
+    }
+
+    free (item->buf);
+    free (item);
+  }
+
+ error:
+  nbd_concurrent_writer_error (nbd, errno);
+  return NULL;
+}
-- 
2.21.0
Eric Blake
2019-Jun-04 11:03 UTC
Re: [Libguestfs] [PATCH libnbd v2 1/4] examples, tests: Remove want_to_send / ready logic, increase limit on cmds in flight.
On 6/4/19 4:59 AM, Richard W.M. Jones wrote:
> Since Eric's improvements to the command queue in commit 6af72b8 (and
> following) there's now a queue of commands waiting to be issued stored
> in the handle, and there's no need to issue commands only from the
> ready state.  We can therefore remove the want_to_send logic, queue as
> many commands as possible, and don't need to test if the socket is
> ready for POLLOUT.
>
> This commit also removes some misleading comments, improves the
> documentation, and increases the limit on commands in flight (since
> this limit is effectively in place to stop memory exhaustion, not
> because of server limits).
> ---
>  docs/libnbd.pod                      |  9 ++--
>  examples/threaded-reads-and-writes.c | 65 +++++++++++-----------------
>  tests/aio-parallel-load.c            | 56 ++++++++++--------------
>  tests/aio-parallel-tls.sh            |  2 +-
>  tests/aio-parallel.c                 | 58 +++++++++++--------------
>  tests/aio-parallel.sh                |  2 +-
>  6 files changed, 83 insertions(+), 109 deletions(-)
>

ACK

-- 
Eric Blake, Principal Software Engineer
Red Hat, Inc.           +1-919-301-3226
Virtualization:  qemu.org | libvirt.org
Richard W.M. Jones
2019-Jun-04 11:16 UTC
Re: [Libguestfs] [PATCH libnbd v2 3/4] api: Implement concurrent writer.
There are several races / deadlocks which I've thought about.  Let's
see if I can remember them all ...

(1) This I experienced: nbd_aio_get_fd deadlocks if there are
concurrent synchronous APIs going on.

A typical case is where you set up the concurrent writer thread
before connecting, and then call a synchronous connect function such
as connect_tcp.  The synchronous function grabs h->lock, then writes
something, which eventually invokes the writer thread, which calls
nbd_aio_get_fd and deadlocks on h->lock.

-> Probably the writer thread should be forbidden from using
nbd_handle.

(2) The writer thread calls nbd_aio_get_fd, but the fd returned might
be closed before we use it, resulting in either EBADF or, worse,
using another fd that happens to be opened around the same time.

-> I think the solution to this would be to allow the writer callback
to signal that the socket is about to be closed (eg. add an extra
flag parameter to the callback), which would kill the writer thread.

(3) nbd_concurrent_writer_error could lose errors.  This might happen
if the socket is closed normally without writing anything, which
would never check h->writer_error.

(4) nbd_concurrent_writer_error possibly deadlocks too since it needs
to grab h->lock.  Basically the same as (1).

Rich.

-- 
Richard Jones, Virtualization Group, Red Hat
http://people.redhat.com/~rjones
Read my programming and virtualization blog: http://rwmj.wordpress.com
virt-p2v converts physical machines to virtual machines.  Boot with a
live CD or over the network (PXE) and turn machines into KVM guests.
http://libguestfs.org/virt-v2v
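[Editorial note: one illustrative shape for mitigating (1) and (2) is
to cache the fd once, from the application thread after connecting,
and never call back into the handle from the writer thread.  This is
only a sketch of the idea; it does not by itself solve the
close-vs-write race, which would still need the "socket closing"
notification suggested in (2).]

  #include <stddef.h>
  #include <libnbd.h>

  struct writer_ctx {
    int fd;               /* cached; the writer thread never calls libnbd */
    /* ring / queue state would go here */
  };

  static int writer (void *data, const void *buf, size_t len);

  /* Call from the application thread, after connecting, while no
   * other thread can be holding h->lock.
   */
  static int
  setup_writer (struct nbd_handle *nbd, struct writer_ctx *ctx)
  {
    ctx->fd = nbd_aio_get_fd (nbd);
    if (ctx->fd == -1)
      return -1;
    return nbd_set_concurrent_writer (nbd, ctx, writer);
  }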
Eric Blake
2019-Jun-04 11:35 UTC
Re: [Libguestfs] [PATCH libnbd v2 2/4] generator: Callback returns int instead of void.
On 6/4/19 4:59 AM, Richard W.M. Jones wrote:
> Callback functions now return an int instead of a void.  This allows
> in some cases for the callback to indicate that there was an error.
>
> This is a small change to the API:

Indeed; and my work to let nbdkit-nbd use libnbd is slightly impacted.
If I want to support both 0.1.2 and 0.1.x, I now have to do a
conditional compilation (either based on a configure check or on
hard-coded version information - we don't yet have a LIBNBD_VERSION
macro in libnbd.h, but my recent addition LIBNBD_HAVE_NBD_SUPPORTS_URI
can serve as a hack witness for 0.1.2 vs. later) where I declare a
typedef to the two different function signatures, then call
nbd_aio_block_status(..., (type)myfunc) where the cast to (type) is a
no-op for 0.1.x and casts the return type of int to void for 0.1.2.
(A sketch of this shim follows this message.)

On the other hand, changing my nbdkit-nbd patches to use pkg-config to
require 0.1.3 or newer is also easy, at which point I don't have to
worry about back-compat to the older API, and no one else is going to
try to be compatible across all our various earlier pre-stable APIs.
I'll leave it up to you when to actually bump the release version, but
it looks like I won't be committing my changes to nbdkit-nbd until
0.1.3 is actually cut.

> For nbd_set_debug_callback the signature has changed, but the return
> value is ignored.
>
> For nbd_block_status and nbd_aio_block_status the extent function can
> return an error, which causes the block status command to return an
> error.  Unfortunately this causes us to set the state to dead,
> although with more effort we could recover from this.  Because of
> this behaviour I didn't document what happens in the error case as we
> may want to change it in future.

I can give a shot on top of this patch for recovery (my idea: record
the error, then continue to accept additional reply chunks from the
server until the final chunk, and merely skip calling the callback
when an earlier callback error was recorded).

> For Python and OCaml bindings, raising any kind of exception from the
> callback is equivalent to returning -1 from the C callback.
> ---

> @@ -3957,24 +3963,34 @@ let print_ocaml_binding (name, { args; ret })
>
>       pr "  rv = caml_callbackN_exn (fnv, %d, args);\n"
>         (List.length argnames);
> -     pr "  if (Is_exception_result (rv))\n";
> +     pr "  if (Is_exception_result (rv)) {\n";
> +     pr "    /* XXX This is not really an error as callbacks can return\n";
> +     pr "     * an error indication.  But perhaps we should direct this\n";
> +     pr "     * to a more suitable place or formalize what exception.\n";

Spurious trailing '.'

> +     pr "     * means error versus unexpected failure.\n";
> +     pr "     */\n";

> +++ b/generator/states-reply-structured.c
> @@ -369,10 +369,16 @@
>       if (context_id == meta_context->context_id)
>         break;
>
> -    if (meta_context)
> +    if (meta_context) {
>       /* Call the caller's extent function. */
> -      cmd->extent_fn (cmd->data, meta_context->name, cmd->offset,
> -                      &h->bs_entries[1], (length-4) / 4);
> +      if (cmd->extent_fn (cmd->data, meta_context->name, cmd->offset,
> +                          &h->bs_entries[1], (length-4) / 4) == -1) {
> +        SET_NEXT_STATE (%.DEAD); /* XXX We should be able to recover. */
> +        if (errno == 0) errno = EPROTO;
> +        set_error (errno, "extent function failed");
> +        return -1;
> +      }
> +    }

If nothing else, our testsuite ought to provoke callback failure at
least once.

>     else
>       /* Emit a debug message, but ignore it. */
>       debug (h, "server sent unexpected meta context ID %" PRIu32,
> diff --git a/interop/dirty-bitmap.c b/interop/dirty-bitmap.c
> index b3a89d0..8d34173 100644
> --- a/interop/dirty-bitmap.c
> +++ b/interop/dirty-bitmap.c
> @@ -32,7 +32,7 @@ static const char *base_allocation = "base:allocation";
>
>  static int calls; /* Track which contexts passed through callback */
>
> -static void
> +static int
>  cb (void *data, const char *metacontext, uint64_t offset,
>      uint32_t *entries, size_t len)
>  {
> @@ -71,6 +71,8 @@ cb (void *data, const char *metacontext, uint64_t offset,
>      fprintf (stderr, "unexpected context %s\n", metacontext);
>      exit (EXIT_FAILURE);
>    }
> +
> +  return 0;

In fact, having dirty-bitmap be the test to provoke failure, where we
are expecting more than one context and can thus also prove that
failure on the first context prevents the callback for the second
context from being reached, seems like the right place.  I'll see what
it looks like on top of your patch.

-- 
Eric Blake, Principal Software Engineer
Red Hat, Inc.           +1-919-301-3226
Virtualization:  qemu.org | libvirt.org
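[Editorial note: a sketch of the compatibility shim described above.
HAVE_OLD_CALLBACK_ABI stands in for whichever witness is chosen (a
configure check, or the LIBNBD_HAVE_NBD_SUPPORTS_URI hack mentioned
in the message); my_extent is a hypothetical callback name, and the
call itself is elided since only the cast matters here.]

  #include <stdint.h>
  #include <stddef.h>
  #include <libnbd.h>

  static int my_extent (void *opaque, const char *metacontext,
                        uint64_t offset, uint32_t *entries, size_t n);

  #ifdef HAVE_OLD_CALLBACK_ABI        /* 0.1.2: callbacks return void */
  typedef void (*extent_cb) (void *, const char *, uint64_t,
                             uint32_t *, size_t);
  #else                               /* 0.1.3+: callbacks return int */
  typedef int (*extent_cb) (void *, const char *, uint64_t,
                            uint32_t *, size_t);
  #endif

  /* ... nbd_aio_block_status (..., (extent_cb) my_extent, ...); ... */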
Eric Blake
2019-Jun-04 13:15 UTC
Re: [Libguestfs] [PATCH libnbd v2 3/4] api: Implement concurrent writer.
On 6/4/19 4:59 AM, Richard W.M. Jones wrote:> Callers may now optionally set up a concurrent writer thread. The > outline of this idea is explained here: > > https://www.redhat.com/archives/libguestfs/2019-June/msg00010.html > > The change is quite small, but here are some points which are true but > may not be obvious: > > * All writes return immediately with success (unless we run out of > memory), and writes never block. > > * When going down the READY -> ISSUE_COMMAND.START (etc) path, because > writes never block, we never stop for a read notification, so we > always send all commands in the issue queue before starting to read > any replies. (However this only means that the raw commands are > enqueued in the writer thread, which can still take its merry time > writing the requests to the socket.) > > * Commands can be counted as "in flight" when they haven't been > written to the socket yet. This is another reason for increasing > the in flight limits. Does this matter? Probably not. The kernel > might also queue up packets before sending them, or they might be in > flight across the internet or queued at the receiver. Nothing about > "in flight" ever meant that the server has received and is > processing those packets. > > * Even with this change, full parallelism isn't quite achievable. > It's still possible to be in a state such as > REPLY.STRUCTURED_REPLY.RECV_* waiting for an unusual fast writer / > slow reader server. If you then decide that you want to send yetOr more likely, when NBD_CMD_READ is so large that the server can't send it all in TCP window, and therefore we are guaranteed to have to wait for POLLIN between reading the first few packets of the chunk to clear up the server's ability to send, and before the server's second half of the chunk arrives. As long as we are waiting in REPLY.STRUCTURED_REPLY.RECV_OFFSET_DATA or REPLY.SIMPLE_REPLY.RECV_READ_PAYLOAD, we are locking out our ability to send more commands unless we rework the state machine a bit.> more commands then those commands will only be enqueued in the > handle, not dispatched to the writer thread. To avoid this it is > best to send as many commands as possible as soon as possible before > entering poll, but to a certain extent this is unavoidable with > having only one state machine.If the state machine can make decisions based on whether there is a writer callback, perhaps we can allow more states to (conditionally) process a pending request immediately, any time we are otherwise blocked waiting for POLLIN. I'll have to think more about this.> --- > docs/libnbd.pod | 73 +++++++++++++++++++++++++++++++++++++++++++++ > generator/generator | 29 ++++++++++++++++++ > lib/handle.c | 32 ++++++++++++++++++++ > lib/internal.h | 7 +++++ > lib/socket.c | 27 ++++++++++++++--- > podwrapper.pl.in | 3 +- > 6 files changed, 166 insertions(+), 5 deletions(-) > > diff --git a/docs/libnbd.pod b/docs/libnbd.pod > index ede2539..07d259f 100644 > --- a/docs/libnbd.pod > +++ b/docs/libnbd.pod > @@ -400,6 +400,79 @@ If you are issuing multiple in-flight requests (see above) and > limiting the number, then the limit should be applied to each > individual NBD connection. > > +=head2 Concurrent writer thread > + > +To achieve the maximum possible performance from libnbd and NBD > +servers, as well as the above techniques you must also use a > +concurrent writer thread. This feature allows requests to be issued > +on the NBD socket at the same time that replies are being read from > +the socket. 
In other words L<send(2)> and L<recv(2)> calls will be > +running at the same time on the same socket from different threads. > + > +There is a full example using a concurrent writer available at > +L<https://github.com/libguestfs/libnbd/blob/master/examples/concurrent-writer.c> > + > +To implement this, you change your ordinary AIO code in four ways: > + > +=over 4 > + > +=item 1. Call nbd_set_concurrent_writer > + > + struct writer_data { > + struct nbd_handle *nbd; > + /* other data here as required */ > + } data; > + > + nbd_set_concurrent_writer (nbd, &data, writer); > + > +This function can be called on the handle at any time, either after > +the handle is created or after the connection and handshaking has > +completed. > + > +=item 2. Implement non-blocking writer callback > + > +C<writer> is a I<non-blocking> callback which enqueues the buffer into > +a ring or similar FIFO structure: > + > + struct ring_item { > + struct writer_data *data; > + const void *buf; > + size_t len; > + }; > + > + void writer (void *data, const void *buf, size_t len)Update to cover the int return type, and whether we add a flags argument.> + { > + struct ring_item item; > + > + /* add (data, buf, len) to a shared ring */ > + item.data = data; > + item.buf = malloc (len); > + memcpy (item.buf, buf, len); > + item.len = len; > + ring_add (&item); > + > + writer_signal (); /* kick the writer thread */Update to document immediate error return (ENOMEM)> + } > + > +=item 3. Implement writer thread > + > +You must also supply another thread which picks up data off the ring > +and writes it to the socket (see C<nbd_aio_get_fd>). If there is an > +error when writing to the socket, call C<nbd_concurrent_writer_error> > +with the C<errno>. > + > +You have a choice of whether to implement one thread per nbd_handle or > +one thread shared between all handles. > + > +=item 4. Modify main loop > + > +Finally your main loop can unconditionally call > +C<nbd_aio_notify_write> when C<nbd_aio_get_direction> returns C<WRITE> > +or C<BOTH> (since the concurrent thread can always enqueue more data > +and so is always "ready to write").Should it likewise unconditionally poll for POLLIN, even if aio_get_direction does not currently request reads (in the case where our atomic read of the current state spots a transient condition by some other thread progressing through emitting a request)? Or are we trying to beef up the state machine so that h->state never exposes transient states to other threads?> + > +=back > + > =head1 ENCRYPTION AND AUTHENTICATION > > The NBD protocol and libnbd supports TLS (sometimes incorrectly called > diff --git a/generator/generator b/generator/generator > index ff6075d..718e253 100755 > --- a/generator/generator > +++ b/generator/generator > @@ -1094,6 +1094,35 @@ C<\"qemu:dirty-bitmap:...\"> for qemu-nbd > (see qemu-nbd I<-B> option). See also C<nbd_block_status>."; > }; > > + "set_concurrent_writer", { > + default_call with > + args = [ Opaque "data"; > + CallbackPersist ("writer", [Opaque "data"; > + BytesIn ("buf", "len")]) ]; > + ret = RErr; > + permitted_states = [ Created; Connecting; Connected ]; > + shortdesc = "set a concurrent writer thread"; > + longdesc = "\ > +Provide an optional concurrent writer thread for better performance. 
> +See L<libnbd(3)/Concurrent writer thread> for how to use this.";
> +  };
> +
> +  "concurrent_writer_error", {
> +    default_call with
> +    args = [ Int "err" ]; ret = RErr;
> +    shortdesc = "signal an error from the concurrent writer thread";
> +    longdesc = "\
> +This can be called from the concurrent writer thread to signal
> +that there was an error writing to the socket.  As there is no
> +way to recover from such errors, the connection will move to the
> +dead state soon after.
> +
> +The parameter is the C<errno> returned by the failed L<send(2)> call.
> +It must be non-zero.
> +
> +See L<libnbd(3)/Concurrent writer thread> for how to use this.";
> +  };

Do we also need a function for the writer thread to call, confirming
that the writer callback was passed a flag stating that no further
writes are needed?  I'm trying to figure out whether
nbd_shutdown/nbd_close should wait for acknowledgment that the writer
thread has reached clean shutdown; it may especially matter for clean
TLS shutdown.

-- 
Eric Blake, Principal Software Engineer
Red Hat, Inc.           +1-919-301-3226
Virtualization:  qemu.org | libvirt.org
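For concreteness, here is one shape the quoted callback could take
once it grows the int return type and the immediate ENOMEM error
return requested above.  This is a sketch only: struct writer_data,
ring_add and writer_signal are the hypothetical helpers already named
in the quoted docs/libnbd.pod text, not real libnbd API, and the exact
failure semantics are an assumption.

  #include <stdlib.h>
  #include <string.h>

  struct writer_data;                 /* opaque per-handle data */

  /* struct ring_item as in the quoted docs, with buf non-const so
   * the ring owns (and can later free) the copy.
   */
  struct ring_item {
    struct writer_data *data;
    void *buf;
    size_t len;
  };

  extern void ring_add (const struct ring_item *item); /* non-blocking */
  extern void writer_signal (void);   /* kick the writer thread */

  /* Enqueue a copy of the buffer; never block; fail fast on ENOMEM. */
  static int
  writer (void *data, const void *buf, size_t len)
  {
    struct ring_item item;

    item.data = data;
    item.buf = malloc (len);
    if (item.buf == NULL)
      return -1;                      /* immediate error: ENOMEM */
    memcpy (item.buf, buf, len);
    item.len = len;

    ring_add (&item);                 /* hand the copy to the writer */
    writer_signal ();
    return 0;
  }

Whether the library fails the affected command on a non-zero return,
or still expects a later call to nbd_concurrent_writer_error, is
exactly the kind of detail the updated documentation would need to
spell out.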
Eric Blake
2019-Jun-04 13:46 UTC
Re: [Libguestfs] [PATCH libnbd v2 4/4] examples: Add concurrent writer example.
On 6/4/19 4:59 AM, Richard W.M. Jones wrote:
> ---
>  .gitignore                   |   1 +
>  examples/Makefile.am         |  12 +
>  examples/concurrent-writer.c | 450 +++++++++++++++++++++++++++++++++++
>  3 files changed, 463 insertions(+)
>

> @@ -0,0 +1,450 @@
> +/* Example usage with nbdkit:
> + *
> + * nbdkit -U - memory 100M --run './concurrent-writer $unixsocket'
> + *
> + * This will read and write randomly over the first megabyte of the

Stale comment.

> + * plugin using multi-conn, multiple threads, multiple requests in
> + * flight on each connection, and concurrent writer threads.
> + *
> + * To run it against a remote server over TCP:
> + *
> + * ./concurrent-writer hostname port
> + * or
> + * ./concurrent-writer nbd://hostname:port
> + */
> +

> +
> +/* Number of simultaneous connections to the NBD server.  The number
> + * of threads is NR_MULTI_CONN * 2 because there is one thread reading
> + * plus a concurrent writer thread.  Note that some servers only
> + * support a limited number of simultaneous connections, and/or have a
> + * configurable thread pool internally, and if you exceed those limits
> + * then something will break.

Possibly stale comment.  More likely, you'll reach a point of
diminishing returns.

> +
> +  /* Make sure the number of requests that were required matches what
> +   * we expect.
> +   */
> +  assert (requests == NR_MULTI_CONN * NR_CYCLES);
> +
> +  printf ("most requests seen in flight = %u (per thread) "
> +          "vs MAX_IN_FLIGHT = %d\n",
> +          most_in_flight, MAX_IN_FLIGHT);

Now that we queue commands without regard to the server receiving
them, this should always equal MAX_IN_FLIGHT.  But it doesn't hurt to
print it as a check anyway.

> +
> +  exit (errors == 0 ? EXIT_SUCCESS : EXIT_FAILURE);
> +}
> +
> +struct queue {
> +  struct queue *next;
> +  void *buf;
> +  size_t len;
> +};
> +
> +/* Concurrent writer thread (one per libnbd handle). */
> +struct writer_data {
> +  size_t i;                 /* Thread index, 0 .. NR_MULTI_CONN-1 */
> +  struct nbd_handle *nbd;   /* NBD handle. */
> +  struct queue *q, *q_end;  /* Queue of items to write. */
> +  pthread_mutex_t q_lock;   /* Lock on queue. */
> +  pthread_cond_t q_cond;    /* Condition on queue. */

I'm half-wondering if we could use sem_t instead of pthread_cond_t for
the same effect, and whether it would have any noticeable timing
differences.  But that should be a separate experiment on top of this
one.

> +};
> +
> +static void *start_writer_thread (void *arg);
> +static int writer (void *data, const void *buf, size_t len);
> +
> +static void *
> +start_reader_thread (void *arg)
> +{

> +
> +    dir = nbd_aio_get_direction (nbd);
> +    if ((dir & LIBNBD_AIO_DIRECTION_WRITE) != 0) {
> +      /* The concurrent writer is always writable, we don't have to
> +       * test the socket in poll.  Since calling nbd_aio_notify_write
> +       * can change the state, after doing it we must restart the
> +       * loop.
> +       */
> +      nbd_aio_notify_write (nbd);
> +      continue;
> +    }

I'm still not convinced that we can ever see DIRECTION_WRITE, but I
agree that leaving this in for safety doesn't hurt.

> +
> +    if ((dir & LIBNBD_AIO_DIRECTION_READ) != 0)
> +      fds[0].events |= POLLIN;
> +

Should this ALWAYS look for POLLIN, rather than just checking
DIRECTION_READ?  I'm worried that we might deadlock if poll() is
called with fds[0].events == 0 because we managed to sniff
nbd_aio_get_direction() at a point in time where the state machine was
transiently not in a state that blocks on read.
For this example, the thread posting nbd_aio_pread is the same as the
thread calling poll(), so I guess that shouldn't happen (it's more of
a concern for my nbdkit-nbd usage of libnbd).

> +    if (poll (fds, 1, -1) == -1) {
> +      perror ("poll");
> +      goto error;
> +    }
> +
> +    if ((dir & LIBNBD_AIO_DIRECTION_READ) != 0 &&
> +        (fds[0].revents & POLLIN) != 0)
> +      nbd_aio_notify_read (nbd);
> +
> +    /* If a command is ready to retire, retire it. */
> +    for (j = 0; j < in_flight; ++j) {
> +      r = nbd_aio_command_completed (nbd, handles[j]);
> +      if (r == -1) {
> +        fprintf (stderr, "%s\n", nbd_get_error ());
> +        goto error;
> +      }
> +      if (r) {
> +        memmove (&handles[j], &handles[j+1],
> +                 sizeof (handles[0]) * (in_flight - j - 1));
> +        j--;
> +        in_flight--;
> +        status->requests++;
> +      }
> +    }
> +  }
> +
> +  if (nbd_shutdown (nbd) == -1) {
> +    fprintf (stderr, "%s\n", nbd_get_error ());
> +    exit (EXIT_FAILURE);
> +  }
> +
> +  nbd_close (nbd);
> +
> +  printf ("thread %zu: finished OK\n", status->i);
> +

Still no cleanup of the writer thread.

> +  free (buf);
> +  status->status = 0;
> +  pthread_exit (status);
> +
> + error:
> +  free (buf);
> +  fprintf (stderr, "thread %zu: failed\n", status->i);
> +  status->status = -1;
> +  pthread_exit (status);
> +}
> +
> +/* This runs in the reader thread and enqueues the data which will be
> + * picked up by the writer thread.
> + */
> +static int
> +writer (void *data, const void *buf, size_t len)
> +{

May change if we introduce a flags parameter (per your other thread on
potential races/deadlocks that you are noticing).

> +  struct writer_data *writer_data = data;
> +  struct queue *item;
> +
> +  item = malloc (sizeof *item);
> +  if (!item) return -1;
> +  item->next = NULL;
> +  item->buf = malloc (len);
> +  if (item->buf == NULL) {
> +    free (item);
> +    return -1;
> +  }
> +  memcpy (item->buf, buf, len);
> +  item->len = len;
> +
> +  /* Enqueue the item and signal the writer thread. */
> +  pthread_mutex_lock (&writer_data->q_lock);
> +  if (writer_data->q_end == NULL)
> +    writer_data->q = writer_data->q_end = item;
> +  else {
> +    writer_data->q_end->next = item;
> +    writer_data->q_end = item;
> +  }
> +  pthread_cond_signal (&writer_data->q_cond);
> +  pthread_mutex_unlock (&writer_data->q_lock);
> +
> +  return 0;
> +}
> +
> +static void *
> +start_writer_thread (void *arg)
> +{
> +  struct writer_data *writer_data = arg;
> +  struct nbd_handle *nbd = writer_data->nbd;
> +  struct queue *item;
> +  int fd;
> +  struct pollfd fds[1];
> +  ssize_t r;
> +  void *p;
> +
> +  fd = nbd_aio_get_fd (nbd);
> +  if (fd == -1) {
> +    fprintf (stderr, "%s\n", nbd_get_error ());
> +    exit (EXIT_FAILURE);
> +  }

You already mentioned the potential deadlock here if the writer thread
is started before nbd_connect_*.

> +
> +  for (;;) {
> +    /* Pick next job off the queue. */
> +    pthread_mutex_lock (&writer_data->q_lock);
> +    while (writer_data->q == NULL)
> +      pthread_cond_wait (&writer_data->q_cond, &writer_data->q_lock);
> +    item = writer_data->q;
> +    writer_data->q = item->next;
> +    if (writer_data->q == NULL)
> +      writer_data->q_end = NULL;
> +    pthread_mutex_unlock (&writer_data->q_lock);
> +
> +    p = item->buf;
> +    while (item->len > 0) {
> +      /* Wait for the socket to become ready to write.
> +       */
> +      fds[0].fd = fd;
> +      fds[0].events = POLLOUT;
> +      fds[0].revents = 0;
> +
> +      if (poll (fds, 1, -1) == -1) goto error;
> +
> +      r = send (fd, p, item->len, 0);
> +      if (r == -1) goto error;
> +
> +      p += r;
> +      item->len -= r;
> +    }
> +
> +    free (item->buf);
> +    free (item);
> +  }
> +
> + error:
> +  nbd_concurrent_writer_error (nbd, errno);

Potential use-after-free if the reader thread calls nbd_close()
without joining this thread first, since this thread may not yet have
detected that the fd is no longer poll-able.

-- 
Eric Blake, Principal Software Engineer
Red Hat, Inc.           +1-919-301-3226
Virtualization:  qemu.org | libvirt.org
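For what it's worth, both of the last two points (the missing
writer-thread cleanup and the use-after-free around nbd_close) could
be handled with a sentinel-and-join handshake.  This is only a sketch
against the quoted example's struct queue and struct writer_data;
stop_writer_thread is an invented helper, not part of the posted
patch.

  #include <pthread.h>

  /* Sketch: push a static sentinel (buf == NULL means "drain and
   * exit") through the same queue, then join the writer thread, so it
   * is gone before the reader thread can call nbd_close and invalidate
   * the fd it polls.
   */
  static struct queue sentinel;  /* static, so buf starts out NULL */

  static void
  stop_writer_thread (struct writer_data *writer_data, pthread_t thread)
  {
    pthread_mutex_lock (&writer_data->q_lock);
    sentinel.next = NULL;
    if (writer_data->q_end == NULL)
      writer_data->q = writer_data->q_end = &sentinel;
    else {
      writer_data->q_end->next = &sentinel;
      writer_data->q_end = &sentinel;
    }
    pthread_cond_signal (&writer_data->q_cond);
    pthread_mutex_unlock (&writer_data->q_lock);

    /* The writer drains everything queued ahead of the sentinel,
     * then exits; only after this join is nbd_close safe.
     */
    pthread_join (thread, NULL);
  }

The writer loop would then need one extra check right after dequeuing
(if item->buf == NULL, break out of the for (;;) loop without freeing
the static sentinel), and the reader thread would call
stop_writer_thread after nbd_shutdown (which still needs the writer to
flush NBD_CMD_DISC) but before nbd_close.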