Eric Blake
2017-Nov-17 03:26 UTC
[Libguestfs] [RFC nbdkit PATCH 0/6] Enable full parallel request handling
I want to make my nbd forwarding plugin fully parallel - but to do
that, I first need to make nbdkit itself fully parallel ;)  With this
series, I was finally able to demonstrate out-of-order responses when
using qemu-io (which is great at sending back-to-back requests prior
to waiting for responses) coupled with the file plugin (which has a
great feature of rdelay and wdelay, to make it obvious whether
processing occurs in parallel among separate threads or in serial).

'make check' sometimes succeeds with this series, but more often than
not I was able to crash or hang on the test-socket-activation test;
I've posted another series for the crashes, as well as analysis of
the hang.  Until the hang is solved, I'm leaving this series as RFC,
but review is welcome to see if my approach is sound.

Eric Blake (6):
  connections: Simplify handle_request()
  connections: Avoid plugin when shutdown is imminent
  connections: Add read/write lock over client I/O
  connections: Add thread-safe status indicator
  connections: Set up thread pool for handling client requests
  Add --threads option for supporting true parallel requests

 TODO                |   7 --
 docs/nbdkit.pod     |  12 ++-
 nbdkit.in           |   2 +-
 plugins/file/file.c |   2 +-
 src/connections.c   | 271 +++++++++++++++++++++++++++++++++++++---------------
 src/internal.h      |   2 +
 src/main.c          |  20 +++-
 src/plugins.c       |   8 ++
 8 files changed, 237 insertions(+), 87 deletions(-)

-- 
2.13.6
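[Background for readers new to NBD: out-of-order completion is
possible because every request carries a 64-bit handle that the
server echoes back in its reply, so a client with several commands in
flight matches replies by handle, not by arrival order.  A rough
sketch of the wire headers involved (layout per the classic NBD
protocol; these struct names are illustrative, not nbdkit's own):

#include <stdint.h>

/* 28-byte request header, all fields big-endian on the wire. */
struct sketch_nbd_request {
  uint32_t magic;               /* NBD_REQUEST_MAGIC */
  uint32_t type;                /* command (e.g. NBD_CMD_READ) plus flag bits */
  uint64_t handle;              /* opaque cookie chosen by the client */
  uint64_t offset;
  uint32_t count;
} __attribute__ ((packed));

/* 16-byte simple reply header; 'handle' is copied from the request,
 * which is what lets replies arrive in any order. */
struct sketch_nbd_reply {
  uint32_t magic;               /* NBD_REPLY_MAGIC */
  uint32_t error;               /* 0 on success */
  uint64_t handle;
} __attribute__ ((packed));
]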
Eric Blake
2017-Nov-17 03:26 UTC
[Libguestfs] [nbdkit PATCH 1/6] connections: Simplify handle_request()
_handle_request() was never returning anything but 0; and had to
communicate the error value through a parameter by reference.  As
there is no return value that a plugin can use to declare the
connection useless, we can consolidate this into just returning the
error value.  Meanwhile, we don't need a wrapper layer of
handle_request() just to grab locks; so shuffle the naming a bit to
reduce the lines of code.

Signed-off-by: Eric Blake <eblake@redhat.com>
---
 src/connections.c | 55 ++++++++++++++++---------------------------------------
 1 file changed, 16 insertions(+), 39 deletions(-)

diff --git a/src/connections.c b/src/connections.c
index f3be63d..0ad252c 100644
--- a/src/connections.c
+++ b/src/connections.c
@@ -749,16 +749,14 @@ get_error (struct connection *conn)
  * check them again.  'buf' is either the data to be written or the
  * data to be returned, and points to a buffer of size 'count' bytes.
  *
- * Only returns -1 if there is a fatal error and the connection cannot
- * continue.
- *
- * On read/write errors, sets *error appropriately and returns 0.
+ * In all cases, the return value is the system errno value that will
+ * later be converted to the nbd error to send back to the client (0
+ * for success).
  */
-static int
-_handle_request (struct connection *conn,
-                 uint32_t cmd, uint32_t flags, uint64_t offset, uint32_t count,
-                 void *buf,
-                 uint32_t *error)
+static uint32_t
+handle_request (struct connection *conn,
+                uint32_t cmd, uint32_t flags, uint64_t offset, uint32_t count,
+                void *buf)
 {
   bool flush_after_command;
   int r;
@@ -776,40 +774,35 @@ _handle_request (struct connection *conn,
   case NBD_CMD_READ:
     r = plugin_pread (conn, buf, count, offset);
     if (r == -1) {
-      *error = get_error (conn);
-      return 0;
+      return get_error (conn);
     }
     break;

   case NBD_CMD_WRITE:
     r = plugin_pwrite (conn, buf, count, offset);
     if (r == -1) {
-      *error = get_error (conn);
-      return 0;
+      return get_error (conn);
     }
     break;

   case NBD_CMD_FLUSH:
     r = plugin_flush (conn);
     if (r == -1) {
-      *error = get_error (conn);
-      return 0;
+      return get_error (conn);
     }
     break;

   case NBD_CMD_TRIM:
     r = plugin_trim (conn, count, offset);
     if (r == -1) {
-      *error = get_error (conn);
-      return 0;
+      return get_error (conn);
     }
     break;

   case NBD_CMD_WRITE_ZEROES:
     r = plugin_zero (conn, count, offset, !(flags & NBD_CMD_FLAG_NO_HOLE));
     if (r == -1) {
-      *error = get_error (conn);
-      return 0;
+      return get_error (conn);
     }
     break;

@@ -820,8 +813,7 @@ _handle_request (struct connection *conn,
   if (flush_after_command) {
     r = plugin_flush (conn);
     if (r == -1) {
-      *error = get_error (conn);
-      return 0;
+      return get_error (conn);
     }
   }

@@ -829,21 +821,6 @@ _handle_request (struct connection *conn,
 }

 static int
-handle_request (struct connection *conn,
-                uint32_t cmd, uint32_t flags, uint64_t offset, uint32_t count,
-                void *buf,
-                uint32_t *error)
-{
-  int r;
-
-  plugin_lock_request (conn);
-  r = _handle_request (conn, cmd, flags, offset, count, buf, error);
-  plugin_unlock_request (conn);
-
-  return r;
-}
-
-static int
 skip_over_write_buffer (int sock, size_t count)
 {
   char buf[BUFSIZ];
@@ -974,9 +951,9 @@ recv_request_send_reply (struct connection *conn)
   }

   /* Perform the request.  Only this part happens inside the request lock. */
-  r = handle_request (conn, cmd, flags, offset, count, buf, &error);
-  if (r == -1)
-    return -1;
+  plugin_lock_request (conn);
+  error = handle_request (conn, cmd, flags, offset, count, buf);
+  plugin_unlock_request (conn);

   /* Send the reply packet. */
 send_reply:
-- 
2.13.6
Eric Blake
2017-Nov-17 03:26 UTC
[Libguestfs] [nbdkit PATCH 2/6] connections: Avoid plugin when shutdown is imminent
If nbdkit receives SIGINT while we are reading from the client, it's
better to avoid calling into the plugin for any work that will
further stall our response to the signal, and instead just
immediately give the client an ESHUTDOWN error.

Signed-off-by: Eric Blake <eblake@redhat.com>
---
 src/connections.c | 11 ++++++++---
 1 file changed, 8 insertions(+), 3 deletions(-)

diff --git a/src/connections.c b/src/connections.c
index 0ad252c..dada9aa 100644
--- a/src/connections.c
+++ b/src/connections.c
@@ -951,9 +951,14 @@ recv_request_send_reply (struct connection *conn)
   }

   /* Perform the request.  Only this part happens inside the request lock. */
-  plugin_lock_request (conn);
-  error = handle_request (conn, cmd, flags, offset, count, buf);
-  plugin_unlock_request (conn);
+  if (quit) {
+    error = ESHUTDOWN;
+  }
+  else {
+    plugin_lock_request (conn);
+    error = handle_request (conn, cmd, flags, offset, count, buf);
+    plugin_unlock_request (conn);
+  }

   /* Send the reply packet. */
 send_reply:
-- 
2.13.6
Eric Blake
2017-Nov-17 03:26 UTC
[Libguestfs] [nbdkit PATCH 3/6] connections: Add read/write lock over client I/O
In preparation for parallel processing, we need to be sure that two
threads belonging to the same connection cannot interleave their I/O
except at message boundaries.  Add a mutex around all reads and
writes that must occur as a group (for now, there is no contention
for either mutex).

Signed-off-by: Eric Blake <eblake@redhat.com>
---
 src/connections.c | 27 +++++++++++++++++++++++++--
 1 file changed, 25 insertions(+), 2 deletions(-)

diff --git a/src/connections.c b/src/connections.c
index dada9aa..dd43a9a 100644
--- a/src/connections.c
+++ b/src/connections.c
@@ -62,6 +62,8 @@
 /* Connection structure. */
 struct connection {
   pthread_mutex_t request_lock;
+  pthread_mutex_t read_lock;
+  pthread_mutex_t write_lock;
   void *handle;
   void *crypto_session;

@@ -206,6 +208,8 @@ new_connection (int sockin, int sockout)
   conn->sockin = sockin;
   conn->sockout = sockout;
   pthread_mutex_init (&conn->request_lock, NULL);
+  pthread_mutex_init (&conn->read_lock, NULL);
+  pthread_mutex_init (&conn->write_lock, NULL);

   conn->recv = raw_recv;
   conn->send = raw_send;
@@ -223,6 +227,8 @@ free_connection (struct connection *conn)
   conn->close (conn);

   pthread_mutex_destroy (&conn->request_lock);
+  pthread_mutex_destroy (&conn->read_lock);
+  pthread_mutex_destroy (&conn->write_lock);

   /* Don't call the plugin again if quit has been set because the main
    * thread will be in the process of unloading it.  The plugin.unload
@@ -888,19 +894,23 @@ recv_request_send_reply (struct connection *conn)
   CLEANUP_FREE char *buf = NULL;

   /* Read the request packet. */
+  pthread_mutex_lock (&conn->read_lock);
   r = conn->recv (conn, &request, sizeof request);
   if (r == -1) {
     nbdkit_error ("read request: %m");
+    pthread_mutex_unlock (&conn->read_lock);
     return -1;
   }
   if (r == 0) {
     debug ("client closed input socket, closing connection");
+    pthread_mutex_unlock (&conn->read_lock);
     return 0;                   /* disconnect */
   }

   magic = be32toh (request.magic);
   if (magic != NBD_REQUEST_MAGIC) {
     nbdkit_error ("invalid request: 'magic' field is incorrect (0x%x)", magic);
+    pthread_mutex_unlock (&conn->read_lock);
     return -1;
   }

@@ -913,14 +923,18 @@ recv_request_send_reply (struct connection *conn)

   if (cmd == NBD_CMD_DISC) {
     debug ("client sent disconnect command, closing connection");
+    pthread_mutex_unlock (&conn->read_lock);
     return 0;                   /* disconnect */
   }

   /* Validate the request. */
   if (!validate_request (conn, cmd, flags, offset, count, &error)) {
     if (cmd == NBD_CMD_WRITE &&
-        skip_over_write_buffer (conn->sockin, count) < 0)
+        skip_over_write_buffer (conn->sockin, count) < 0) {
+      pthread_mutex_unlock (&conn->read_lock);
       return -1;
+    }
+    pthread_mutex_unlock (&conn->read_lock);
     goto send_reply;
   }

@@ -931,8 +945,11 @@ recv_request_send_reply (struct connection *conn)
       perror ("malloc");
       error = ENOMEM;
       if (cmd == NBD_CMD_WRITE &&
-          skip_over_write_buffer (conn->sockin, count) < 0)
+          skip_over_write_buffer (conn->sockin, count) < 0) {
+        pthread_mutex_unlock (&conn->read_lock);
         return -1;
+      }
+      pthread_mutex_unlock (&conn->read_lock);
       goto send_reply;
     }
   }
@@ -946,9 +963,11 @@ recv_request_send_reply (struct connection *conn)
     }
     if (r == -1) {
       nbdkit_error ("read data: %m");
+      pthread_mutex_unlock (&conn->read_lock);
       return -1;
     }
   }
+  pthread_mutex_unlock (&conn->read_lock);

   /* Perform the request.  Only this part happens inside the request lock. */
   if (quit) {
@@ -962,6 +981,7 @@ recv_request_send_reply (struct connection *conn)

   /* Send the reply packet. */
 send_reply:
+  pthread_mutex_lock (&conn->write_lock);
   reply.magic = htobe32 (NBD_REPLY_MAGIC);
   reply.handle = request.handle;
   reply.error = htobe32 (nbd_errno (error));
@@ -978,6 +998,7 @@ recv_request_send_reply (struct connection *conn)
   r = conn->send (conn, &reply, sizeof reply);
   if (r == -1) {
     nbdkit_error ("write reply: %m");
+    pthread_mutex_unlock (&conn->write_lock);
     return -1;
   }

@@ -986,9 +1007,11 @@ recv_request_send_reply (struct connection *conn)
     r = conn->send (conn, buf, count);
     if (r == -1) {
       nbdkit_error ("write data: %m");
+      pthread_mutex_unlock (&conn->write_lock);
       return -1;
     }
   }
+  pthread_mutex_unlock (&conn->write_lock);

   return 1;                     /* command processed ok */
 }
-- 
2.13.6
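[A standalone illustration of what the read/write mutexes protect
(made-up demo code, not nbdkit source): two threads each emit a
two-part message - a header then a payload, like an NBD reply header
followed by read data - onto the same descriptor.  Holding the mutex
across both writes is what keeps each message contiguous; drop the
lock and the halves interleave:

#include <pthread.h>
#include <stdio.h>
#include <string.h>
#include <unistd.h>

static pthread_mutex_t write_lock = PTHREAD_MUTEX_INITIALIZER;

/* Each "header" and "payload" must stay adjacent on the output fd,
 * just like an NBD reply header and its data. */
static void *
worker (void *arg)
{
  const char *tag = arg;
  char hdr[32], data[32];

  snprintf (hdr, sizeof hdr, "[%s header]", tag);
  snprintf (data, sizeof data, "[%s data]\n", tag);
  for (int i = 0; i < 3; i++) {
    pthread_mutex_lock (&write_lock);
    write (STDOUT_FILENO, hdr, strlen (hdr));
    usleep (1000);              /* widen the race window */
    write (STDOUT_FILENO, data, strlen (data));
    pthread_mutex_unlock (&write_lock);
  }
  return NULL;
}

int
main (void)
{
  pthread_t a, b;

  pthread_create (&a, NULL, worker, "A");
  pthread_create (&b, NULL, worker, "B");
  pthread_join (a, NULL);
  pthread_join (b, NULL);
  return 0;
}
]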
Eric Blake
2017-Nov-17 03:26 UTC
[Libguestfs] [nbdkit PATCH 4/6] connections: Add thread-safe status indicator
Once we have multiple threads during parallel processing, we need to
be sure that any I/O error flagged by one thread prevents the next
thread from attempting I/O.  Although we already have a separate lock
for reads and writes, it's easier if status is shared by both
actions, which needs yet another mutex; however we can optimize (via
accessor functions) and only need to use the mutex if there are
actually multiple threads at work.

The next thing to notice is that because we now update status at all
important points, the return value of _handle_single_connection() can
just be the latest status, and recv_request_send_reply() can now
return void.  This will come in handy later as it will avoid trying
to coordinate a value out of multiple threads.

Signed-off-by: Eric Blake <eblake@redhat.com>
---
 src/connections.c | 99 ++++++++++++++++++++++++++++++++++++++-----------------
 1 file changed, 69 insertions(+), 30 deletions(-)

diff --git a/src/connections.c b/src/connections.c
index dd43a9a..75d8884 100644
--- a/src/connections.c
+++ b/src/connections.c
@@ -64,8 +64,11 @@ struct connection {
   pthread_mutex_t request_lock;
   pthread_mutex_t read_lock;
   pthread_mutex_t write_lock;
+  pthread_mutex_t status_lock;
+  int status; /* 1 for more I/O with client, 0 for shutdown, -1 on error */
   void *handle;
   void *crypto_session;
+  int nworkers; /* TODO set up a thread pool for parallel workers */

   uint64_t exportsize;
   int readonly;
@@ -83,7 +86,7 @@ struct connection {
 static struct connection *new_connection (int sockin, int sockout);
 static void free_connection (struct connection *conn);
 static int negotiate_handshake (struct connection *conn);
-static int recv_request_send_reply (struct connection *conn);
+static void recv_request_send_reply (struct connection *conn);

 /* Don't call these raw socket functions directly.  Use conn->recv etc. */
 static int raw_recv (struct connection *, void *buf, size_t len);
@@ -146,40 +149,58 @@ connection_set_close (struct connection *conn, connection_close_function close)
 }

 static int
+get_status (struct connection *conn)
+{
+  int r;
+
+  if (conn->nworkers)
+    pthread_mutex_lock (&conn->status_lock);
+  r = conn->status;
+  if (conn->nworkers)
+    pthread_mutex_unlock (&conn->status_lock);
+  return r;
+}
+
+/* Update the status if the new value is lower than the existing value. */
+static void
+set_status (struct connection *conn, int value)
+{
+  if (conn->nworkers)
+    pthread_mutex_lock (&conn->status_lock);
+  if (value < conn->status)
+    conn->status = value;
+  if (conn->nworkers)
+    pthread_mutex_unlock (&conn->status_lock);
+}
+
+static int
 _handle_single_connection (int sockin, int sockout)
 {
-  int r;
+  int r = -1;
   struct connection *conn = new_connection (sockin, sockout);

   if (!conn)
-    goto err;
+    goto done;

   if (plugin_open (conn, readonly) == -1)
-    goto err;
+    goto done;

   threadlocal_set_name (plugin_name ());

   /* Handshake. */
   if (negotiate_handshake (conn) == -1)
-    goto err;
+    goto done;

   /* Process requests.  XXX Allow these to be dispatched in parallel using
    * a thread pool.
    */
-  while (!quit) {
-    r = recv_request_send_reply (conn);
-    if (r == -1)
-      goto err;
-    if (r == 0)
-      break;
-  }
+  while (!quit && get_status (conn) > 0)
+    recv_request_send_reply (conn);
+  r = get_status (conn);

+ done:
   free_connection (conn);
-  return 0;
-
- err:
-  free_connection (conn);
-  return -1;
+  return r;
 }

 int
@@ -205,11 +226,13 @@ new_connection (int sockin, int sockout)
     return NULL;
   }

+  conn->status = 1;
   conn->sockin = sockin;
   conn->sockout = sockout;
   pthread_mutex_init (&conn->request_lock, NULL);
   pthread_mutex_init (&conn->read_lock, NULL);
   pthread_mutex_init (&conn->write_lock, NULL);
+  pthread_mutex_init (&conn->status_lock, NULL);

   conn->recv = raw_recv;
   conn->send = raw_send;
@@ -229,6 +252,7 @@ free_connection (struct connection *conn)
   pthread_mutex_destroy (&conn->request_lock);
   pthread_mutex_destroy (&conn->read_lock);
   pthread_mutex_destroy (&conn->write_lock);
+  pthread_mutex_destroy (&conn->status_lock);

   /* Don't call the plugin again if quit has been set because the main
    * thread will be in the process of unloading it.  The plugin.unload
@@ -883,7 +907,7 @@ nbd_errno (int error)
   }
 }

-static int
+static void
 recv_request_send_reply (struct connection *conn)
 {
   int r;
@@ -895,23 +919,30 @@ recv_request_send_reply (struct connection *conn)

   /* Read the request packet. */
   pthread_mutex_lock (&conn->read_lock);
+  if (get_status (conn) < 0) {
+    pthread_mutex_unlock (&conn->read_lock);
+    return;
+  }
   r = conn->recv (conn, &request, sizeof request);
   if (r == -1) {
     nbdkit_error ("read request: %m");
+    set_status (conn, -1);
     pthread_mutex_unlock (&conn->read_lock);
-    return -1;
+    return;
   }
   if (r == 0) {
     debug ("client closed input socket, closing connection");
+    set_status (conn, 0);
     pthread_mutex_unlock (&conn->read_lock);
-    return 0;                   /* disconnect */
+    return;                     /* disconnect */
   }

   magic = be32toh (request.magic);
   if (magic != NBD_REQUEST_MAGIC) {
     nbdkit_error ("invalid request: 'magic' field is incorrect (0x%x)", magic);
+    set_status (conn, -1);
     pthread_mutex_unlock (&conn->read_lock);
-    return -1;
+    return;
   }

   cmd = be32toh (request.type);
@@ -923,16 +954,18 @@ recv_request_send_reply (struct connection *conn)

   if (cmd == NBD_CMD_DISC) {
     debug ("client sent disconnect command, closing connection");
+    set_status (conn, 0);
     pthread_mutex_unlock (&conn->read_lock);
-    return 0;                   /* disconnect */
+    return;                     /* disconnect */
   }

   /* Validate the request. */
   if (!validate_request (conn, cmd, flags, offset, count, &error)) {
     if (cmd == NBD_CMD_WRITE &&
         skip_over_write_buffer (conn->sockin, count) < 0) {
+      set_status (conn, -1);
       pthread_mutex_unlock (&conn->read_lock);
-      return -1;
+      return;
     }
     pthread_mutex_unlock (&conn->read_lock);
     goto send_reply;
@@ -946,8 +979,9 @@ recv_request_send_reply (struct connection *conn)
       error = ENOMEM;
       if (cmd == NBD_CMD_WRITE &&
           skip_over_write_buffer (conn->sockin, count) < 0) {
+        set_status (conn, -1);
         pthread_mutex_unlock (&conn->read_lock);
-        return -1;
+        return;
       }
       pthread_mutex_unlock (&conn->read_lock);
       goto send_reply;
@@ -963,14 +997,15 @@ recv_request_send_reply (struct connection *conn)
     }
     if (r == -1) {
       nbdkit_error ("read data: %m");
+      set_status (conn, -1);
       pthread_mutex_unlock (&conn->read_lock);
-      return -1;
+      return;
     }
   }
   pthread_mutex_unlock (&conn->read_lock);

   /* Perform the request.  Only this part happens inside the request lock. */
-  if (quit) {
+  if (quit || !get_status (conn)) {
     error = ESHUTDOWN;
   }
   else {
@@ -982,6 +1017,10 @@ recv_request_send_reply (struct connection *conn)
   /* Send the reply packet. */
 send_reply:
   pthread_mutex_lock (&conn->write_lock);
+  if (get_status (conn) < 0) {
+    pthread_mutex_unlock (&conn->write_lock);
+    return;
+  }
   reply.magic = htobe32 (NBD_REPLY_MAGIC);
   reply.handle = request.handle;
   reply.error = htobe32 (nbd_errno (error));
@@ -999,7 +1038,8 @@ recv_request_send_reply (struct connection *conn)
   if (r == -1) {
     nbdkit_error ("write reply: %m");
     pthread_mutex_unlock (&conn->write_lock);
-    return -1;
+    set_status (conn, -1);
+    return;
   }

   /* Send the read data buffer. */
@@ -1007,13 +1047,12 @@ recv_request_send_reply (struct connection *conn)
     r = conn->send (conn, buf, count);
     if (r == -1) {
       nbdkit_error ("write data: %m");
+      set_status (conn, -1);
       pthread_mutex_unlock (&conn->write_lock);
-      return -1;
+      return;
     }
   }
   pthread_mutex_unlock (&conn->write_lock);
-
-  return 1;                     /* command processed ok */
 }

 /* Write buffer to conn->sockout and either succeed completely
-- 
2.13.6
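[One subtlety worth calling out in set_status() above: it takes the
minimum of the old and new value.  A tiny standalone illustration of
why that matters (demo code with invented names, not nbdkit source):

#include <assert.h>

/* status: 1 = keep going, 0 = clean shutdown, -1 = error. */
static int status = 1;

/* Mirror of set_status()'s rule: only ever lower the value, so an
 * error recorded by one worker cannot be masked by a later, milder
 * event from another worker. */
static void
set_status_min (int value)
{
  if (value < status)
    status = value;
}

int
main (void)
{
  set_status_min (-1);          /* worker A hits a write error */
  set_status_min (0);           /* worker B then sees clean EOF */
  assert (status == -1);        /* the error still wins */
  return 0;
}
]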
Eric Blake
2017-Nov-17 03:26 UTC
[Libguestfs] [RFC nbdkit PATCH 5/6] connections: Set up thread pool for handling client requests
Finish plumbing up everything we will need to process multiple client
requests in parallel after handshake is complete.  Since status is
now global, and properly protected by a mutex, all of the threads
will eventually quit as soon as any of them notices EOF or nbdkit
detects a signal.

For ease of review, the framework for configuring threads is done
separately from the low-level work of utilizing the threads, so this
patch sees no behavior change (because we hard-code nworkers to 1);
although it's a one-line hack to test that a larger nworkers still
behaves the same even for a non-parallel plugin.

Signed-off-by: Eric Blake <eblake@redhat.com>
---

Still RFC, because with this patch and changing nworkers to 2, I can
rather frequently get test-socket-activation to hang; it looks like
the extra load of multiple new threads created after
negotiate_handshake() claims success (because we wrote the entire
oldstyle header into our end of the socket), coupled with the SIGTERM
from the testsuite after only reading 8 bytes off the socket, can
cause us to miss the change to the global 'quit' and instead hang on
a poll() waiting for a second client that will never connect.  The
fix for that hang may belong in main.c/sockets.c while this patch is
fine as-is, but I need more time to convince myself of that.
---
 src/connections.c | 93 +++++++++++++++++++++++++++++++++++++++++++++++++------
 1 file changed, 83 insertions(+), 10 deletions(-)

diff --git a/src/connections.c b/src/connections.c
index 75d8884..5257032 100644
--- a/src/connections.c
+++ b/src/connections.c
@@ -1,5 +1,5 @@
 /* nbdkit
- * Copyright (C) 2013-2016 Red Hat Inc.
+ * Copyright (C) 2013-2017 Red Hat Inc.
  * All rights reserved.
  *
  * Redistribution and use in source and binary forms, with or without
@@ -68,7 +68,7 @@ struct connection {
   int status; /* 1 for more I/O with client, 0 for shutdown, -1 on error */
   void *handle;
   void *crypto_session;
-  int nworkers; /* TODO set up a thread pool for parallel workers */
+  int nworkers;

   uint64_t exportsize;
   int readonly;
@@ -83,7 +83,8 @@ struct connection {
   connection_close_function close;
 };

-static struct connection *new_connection (int sockin, int sockout);
+static struct connection *new_connection (int sockin, int sockout,
+                                          int nworkers);
 static void free_connection (struct connection *conn);
 static int negotiate_handshake (struct connection *conn);
 static void recv_request_send_reply (struct connection *conn);
@@ -173,12 +174,39 @@ set_status (struct connection *conn, int value)
     pthread_mutex_unlock (&conn->status_lock);
 }

+struct worker_data {
+  struct connection *conn;
+  char *name;
+};
+
+static void *
+connection_worker (void *data)
+{
+  struct worker_data *worker = data;
+  struct connection *conn = worker->conn;
+  char *name = worker->name;
+
+  debug ("starting worker thread %s", name);
+  threadlocal_new_server_thread ();
+  threadlocal_set_name (name);
+  free (worker);
+
+  while (!quit && get_status (conn) > 0)
+    recv_request_send_reply (conn);
+  debug ("exiting worker thread %s", threadlocal_get_name ());
+  free (name);
+  return NULL;
+}
+
 static int
 _handle_single_connection (int sockin, int sockout)
 {
   int r = -1;
-  struct connection *conn = new_connection (sockin, sockout);
+  struct connection *conn;
+  int nworkers = 1; /* TODO default to 16 for parallel plugins, with command-line override */
+  pthread_t *workers = NULL;

+  conn = new_connection (sockin, sockout, nworkers);
   if (!conn)
     goto done;

@@ -191,11 +219,55 @@ _handle_single_connection (int sockin, int sockout)
   if (negotiate_handshake (conn) == -1)
     goto done;

-  /* Process requests.  XXX Allow these to be dispatched in parallel using
-   * a thread pool.
-   */
-  while (!quit && get_status (conn) > 0)
-    recv_request_send_reply (conn);
+  if (nworkers == 1) {
+    /* No need for a separate thread. */
+    debug ("handshake complete, processing requests serially");
+    nworkers = conn->nworkers = 0;
+    while (!quit && get_status (conn) > 0)
+      recv_request_send_reply (conn);
+  }
+  else {
+    /* Create thread pool to process requests. */
+    debug ("handshake complete, processing requests with %d threads",
+           nworkers);
+    workers = calloc (nworkers, sizeof *workers);
+    if (!workers) {
+      perror ("malloc");
+      goto done;
+    }
+
+    for (nworkers = 0; nworkers < conn->nworkers; nworkers++) {
+      struct worker_data *worker = malloc (sizeof *worker);
+      int err;
+
+      if (!worker) {
+        perror ("malloc");
+        set_status (conn, -1);
+        goto wait;
+      }
+      if (asprintf (&worker->name, "%s.%d", plugin_name (), nworkers) < 0) {
+        perror ("asprintf");
+        set_status (conn, -1);
+        free (worker);
+        goto wait;
+      }
+      worker->conn = conn;
+      err = pthread_create (&workers[nworkers], NULL, connection_worker,
+                            worker);
+      if (err) {
+        errno = err;
+        perror ("pthread_create");
+        set_status (conn, -1);
+        free (worker);
+        goto wait;
+      }
+    }
+
+   wait:
+    while (nworkers)
+      pthread_join (workers[--nworkers], NULL);
+    free (workers);
+  }

   r = get_status (conn);
  done:
@@ -216,7 +288,7 @@ handle_single_connection (int sockin, int sockout)
 }

 static struct connection *
-new_connection (int sockin, int sockout)
+new_connection (int sockin, int sockout, int nworkers)
 {
   struct connection *conn;

@@ -227,6 +299,7 @@ new_connection (int sockin, int sockout)
   }

   conn->status = 1;
+  conn->nworkers = nworkers;
   conn->sockin = sockin;
   conn->sockout = sockout;
   pthread_mutex_init (&conn->request_lock, NULL);
-- 
2.13.6
Eric Blake
2017-Nov-17 03:26 UTC
[Libguestfs] [nbdkit PATCH 6/6] Add --threads option for supporting true parallel requests
It's finally time to implement one of the TODO items: we want to
support a thread pool of parallel readers from the client, in order
to allow multiple in-flight operations with potential out-of-order
completion.  We also need at least one plugin that supports parallel
processing for testing the option; the file plugin fits the bill.

Add and document a new command line option, -t/--threads=N, which
controls how many threads to create per connection (although we only
ever spawn multiple threads if the plugin is parallel, since
otherwise, at most one thread is running at a time anyway).  Setting
-t 1 forces a parallel plugin to behave serialized; setting other
values allows tuning for performance.  The default of 16 matches the
choice of MAX_NBD_REQUESTS used in qemu.

One easy way to test:

term1$ echo hello > junk
term1$ ./nbdkit -f -v -r file file=junk rdelay=2s wdelay=1s

term2$ qemu-io -f raw nbd://localhost:10809/ --trace='nbd_*' \
  -c 'aio_read 0 1' -c 'aio_write -P 0x6c 2 2' -c 'aio_flush'

If the write completes before the read, then nbdkit was properly
handling things in parallel with out-of-order replies.

Signed-off-by: Eric Blake <eblake@redhat.com>
---
 TODO                |  7 -------
 docs/nbdkit.pod     | 12 +++++++++++-
 nbdkit.in           |  2 +-
 plugins/file/file.c |  2 +-
 src/connections.c   | 10 +++++++---
 src/internal.h      |  2 ++
 src/main.c          | 20 ++++++++++++++++++--
 src/plugins.c       |  8 ++++++++
 8 files changed, 48 insertions(+), 15 deletions(-)

diff --git a/TODO b/TODO
index 6c5bb5b..db7469b 100644
--- a/TODO
+++ b/TODO
@@ -12,10 +12,3 @@
 * Glance and/or cinder plugins.

 * Performance - measure and improve it.
-
-* Implement true parallel request handling.  Currently
-  NBDKIT_THREAD_MODEL_SERIALIZE_REQUESTS and
-  NBDKIT_THREAD_MODEL_PARALLEL are the same, because we handle
-  requests within each connection synchronously one at a time.  We
-  could (and should) be able to handle them in parallel by having
-  another thread pool for requests.
diff --git a/docs/nbdkit.pod b/docs/nbdkit.pod
index e3043ba..4593391 100644
--- a/docs/nbdkit.pod
+++ b/docs/nbdkit.pod
@@ -9,7 +9,7 @@ nbdkit - A toolkit for creating NBD servers
 nbdkit [-e EXPORTNAME] [--exit-with-parent] [-f]
        [-g GROUP] [-i IPADDR]
        [--newstyle] [--oldstyle] [-P PIDFILE] [-p PORT] [-r]
-       [--run CMD] [-s] [--selinux-label LABEL]
+       [--run CMD] [-s] [--selinux-label LABEL] [-t THREADS]
        [--tls=off|on|require] [--tls-certificates /path/to/certificates]
        [--tls-verify-peer]
        [-U SOCKET] [-u USER] [-v] [-V]
@@ -230,6 +230,16 @@ Unix domain sockets:

  nbdkit --selinux-label system_u:object_r:svirt_t:s0 ...

+=item B<-t> THREADS
+
+=item B<--threads> THREADS
+
+Set the number of threads to be used per connection, which in turn
+controls the number of outstanding requests that can be processed at
+once.  Only matters for plugins with thread_model=parallel (where it
+defaults to 16).  To force serialized behavior (useful if the client
+is not prepared for out-of-order responses), set this to 1.
+
 =item B<--tls=off>

 =item B<--tls=on>
diff --git a/nbdkit.in b/nbdkit.in
index 6be89ec..9c3d625 100644
--- a/nbdkit.in
+++ b/nbdkit.in
@@ -65,7 +65,7 @@ verbose
 while [ $# -gt 0 ]; do
     case "$1" in
         # Flags that take an argument.  We must not rewrite the argument.
-        -e | --export* | -g | --group | -i | --ip* | -P | --pid* | -p | --port | --run | --selinux-label | --tls | --tls-certificates | -U | --unix | -u | --user)
+        -e | --export* | -g | --group | -i | --ip* | -P | --pid* | -p | --port | --run | --selinux-label | -t | --threads | --tls | --tls-certificates | -U | --unix | -u | --user)
             args[$i]="$1"
             ((++i))
             args[$i]="$2"
diff --git a/plugins/file/file.c b/plugins/file/file.c
index a603be8..ef5da3d 100644
--- a/plugins/file/file.c
+++ b/plugins/file/file.c
@@ -200,7 +200,7 @@ file_close (void *handle)
   free (h);
 }

-#define THREAD_MODEL NBDKIT_THREAD_MODEL_SERIALIZE_REQUESTS
+#define THREAD_MODEL NBDKIT_THREAD_MODEL_PARALLEL

 /* Get the file size. */
 static int64_t
diff --git a/src/connections.c b/src/connections.c
index 5257032..2d184b0 100644
--- a/src/connections.c
+++ b/src/connections.c
@@ -59,6 +59,9 @@
 /* Maximum length of any option data (bytes). */
 #define MAX_OPTION_LENGTH 4096

+/* Default number of parallel requests. */
+#define DEFAULT_PARALLEL_REQUESTS 16
+
 /* Connection structure. */
 struct connection {
   pthread_mutex_t request_lock;
@@ -203,9 +206,11 @@ _handle_single_connection (int sockin, int sockout)
 {
   int r = -1;
   struct connection *conn;
-  int nworkers = 1; /* TODO default to 16 for parallel plugins, with command-line override */
+  int nworkers = threads ? threads : DEFAULT_PARALLEL_REQUESTS;
   pthread_t *workers = NULL;

+  if (!plugin_is_parallel())
+    nworkers = 0;
   conn = new_connection (sockin, sockout, nworkers);
   if (!conn)
     goto done;
@@ -219,10 +224,9 @@ _handle_single_connection (int sockin, int sockout)
   if (negotiate_handshake (conn) == -1)
     goto done;

-  if (nworkers == 1) {
+  if (!nworkers) {
     /* No need for a separate thread. */
     debug ("handshake complete, processing requests serially");
-    nworkers = conn->nworkers = 0;
     while (!quit && get_status (conn) > 0)
       recv_request_send_reply (conn);
   }
diff --git a/src/internal.h b/src/internal.h
index 1fc5d69..b79c12c 100644
--- a/src/internal.h
+++ b/src/internal.h
@@ -103,6 +103,7 @@ extern const char *tls_certificates_dir;
 extern int tls_verify_peer;
 extern char *unixsocket;
 extern int verbose;
+extern int threads;

 extern volatile int quit;

@@ -151,6 +152,7 @@ extern void plugin_lock_connection (void);
 extern void plugin_unlock_connection (void);
 extern void plugin_lock_request (struct connection *conn);
 extern void plugin_unlock_request (struct connection *conn);
+extern bool plugin_is_parallel (void);
 extern int plugin_errno_is_preserved (void);
 extern int plugin_open (struct connection *conn, int readonly);
 extern void plugin_close (struct connection *conn);
diff --git a/src/main.c b/src/main.c
index c9f08ab..cc5e9e3 100644
--- a/src/main.c
+++ b/src/main.c
@@ -84,6 +84,7 @@ int readonly;                   /* -r */
 char *run;                      /* --run */
 int listen_stdin;               /* -s */
 const char *selinux_label;      /* --selinux-label */
+int threads;                    /* -t */
 int tls;                        /* --tls : 0=off 1=on 2=require */
 const char *tls_certificates_dir; /* --tls-certificates */
 int tls_verify_peer;            /* --tls-verify-peer */
@@ -99,7 +100,7 @@ static char *random_fifo = NULL;

 enum { HELP_OPTION = CHAR_MAX + 1 };

-static const char *short_options = "e:fg:i:nop:P:rsu:U:vV";
+static const char *short_options = "e:fg:i:nop:P:rst:u:U:vV";
 static const struct option long_options[] = {
   { "help",       0, NULL, HELP_OPTION },
   { "dump-config",0, NULL, 0 },
@@ -126,6 +127,7 @@ static const struct option long_options[] = {
   { "selinux-label", 1, NULL, 0 },
   { "single",     0, NULL, 's' },
   { "stdin",      0, NULL, 's' },
+  { "threads",    1, NULL, 't' },
   { "tls",        1, NULL, 0 },
   { "tls-certificates", 1, NULL, 0 },
   { "tls-verify-peer", 0, NULL, 0 },
@@ -143,7 +145,7 @@ usage (void)
           " [-e EXPORTNAME] [--exit-with-parent] [-f]\n"
           " [-g GROUP] [-i IPADDR]\n"
           " [--newstyle] [--oldstyle] [-P PIDFILE] [-p PORT] [-r]\n"
-          " [--run CMD] [-s] [--selinux-label LABEL]\n"
+          " [--run CMD] [-s] [--selinux-label LABEL] [-t THREADS]\n"
           " [--tls=off|on|require] [--tls-certificates /path/to/certificates]\n"
           " [--tls-verify-peer]\n"
           " [-U SOCKET] [-u USER] [-v] [-V]\n"
@@ -331,6 +333,21 @@ main (int argc, char *argv[])
       listen_stdin = 1;
       break;

+    case 't':
+      {
+        char *end;
+
+        errno = 0;
+        threads = strtoul (optarg, &end, 0);
+        if (errno || *end) {
+          fprintf (stderr, "%s: cannot parse '%s' into threads\n",
+                   program_name, optarg);
+          exit (EXIT_FAILURE);
+        }
+        /* XXX Worth a maximum limit on threads? */
+      }
+      break;
+
     case 'U':
       if (socket_activation) {
         fprintf (stderr, "%s: cannot use socket activation with -U flag\n",
diff --git a/src/plugins.c b/src/plugins.c
index e8c6b28..47c4fa5 100644
--- a/src/plugins.c
+++ b/src/plugins.c
@@ -360,6 +360,14 @@ plugin_unlock_request (struct connection *conn)
   }
 }

+bool
+plugin_is_parallel (void)
+{
+  assert (dl);
+
+  return plugin._thread_model >= NBDKIT_THREAD_MODEL_PARALLEL;
+}
+
 int
 plugin_errno_is_preserved (void)
 {
-- 
2.13.6
Richard W.M. Jones
2017-Nov-17 08:59 UTC
Re: [Libguestfs] [nbdkit PATCH 3/6] connections: Add read/write lock over client I/O
On Thu, Nov 16, 2017 at 09:26:54PM -0600, Eric Blake wrote:
> In preparation for parallel processing, we need to be sure that
> two threads belonging to the same connection cannot interleave
> their I/O except at message boundaries.  Add a mutex around
> all reads and writes that must occur as a group (for now, there
> is no contention for either mutex).
>
> Signed-off-by: Eric Blake <eblake@redhat.com>
> ---
>  src/connections.c | 27 +++++++++++++++++++++++++--
>  1 file changed, 25 insertions(+), 2 deletions(-)
[...]

There's nothing wrong with this patch, but it might be easier to use
an attribute((cleanup)) handler to deal with the unlocking.  See
these links for how we do it in libguestfs:

https://github.com/libguestfs/libguestfs/blob/50ca24b634f59f1a14fd230aa4893d7408347d76/common/utils/cleanups.h#L27
https://github.com/libguestfs/libguestfs/blob/50ca24b634f59f1a14fd230aa4893d7408347d76/lib/guestfs-internal.h#L81-L87
https://github.com/libguestfs/libguestfs/blob/50ca24b634f59f1a14fd230aa4893d7408347d76/lib/errors.c#L179

Rich.

-- 
Richard Jones, Virtualization Group, Red Hat
http://people.redhat.com/~rjones
Read my programming and virtualization blog: http://rwmj.wordpress.com
virt-builder quickly builds VMs from scratch
http://libguestfs.org/virt-builder.1.html
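[For readers unfamiliar with the pattern, the attribute((cleanup))
approach Rich describes looks roughly like this (a minimal sketch
modelled on the libguestfs links above; the macro and helper names
are illustrative, not identifiers taken from nbdkit):

#include <pthread.h>

/* Called automatically when the guard variable goes out of scope. */
static void
cleanup_mutex_unlock (pthread_mutex_t **ptr)
{
  pthread_mutex_unlock (*ptr);
}

#define ACQUIRE_LOCK_FOR_CURRENT_SCOPE(mutex)                   \
  __attribute__ ((cleanup (cleanup_mutex_unlock)))              \
  pthread_mutex_t *_lock = (mutex);                             \
  pthread_mutex_lock (_lock)

/* Example: every return path drops the lock without an explicit
 * unlock call, which is the point of the suggestion. */
static int
guarded_work (pthread_mutex_t *read_lock, int fail)
{
  ACQUIRE_LOCK_FOR_CURRENT_SCOPE (read_lock);
  if (fail)
    return -1;                  /* lock released here */
  return 0;                     /* ...and here */
}
]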
Richard W.M. Jones
2017-Nov-17 09:04 UTC
Re: [Libguestfs] [nbdkit PATCH 6/6] Add --threads option for supporting true parallel requests
This series is fine.  You can push 1-4 or 1-6 as you wish.

Thanks,

Rich.

-- 
Richard Jones, Virtualization Group, Red Hat
http://people.redhat.com/~rjones
Read my programming and virtualization blog: http://rwmj.wordpress.com
virt-df lists disk usage of guests without needing to install any
software inside the virtual machine.  Supports Linux and Windows.
http://people.redhat.com/~rjones/virt-df/