Eric Blake
2019-Jun-18 16:30 UTC
[Libguestfs] [nbdkit PATCH] Experiment: nbd: Use ppoll() instead of pipe-to-self
It is necessary to kick the state machine any time another thread blocks due to write() encountering a full buffer, to get the reader thread to learn that it must now poll on POLLOUT rather than just POLLIN. POSIX only provides two ways to do this: either poll() on a second fd (our pipe-to-self trick), or interrupt the polling with EINTR due to delivery of a signal. So, I played with the latter approach by running the reader thread with SIGUSR1 blocked except during the poll, and by changing the kick action in other threads from writing into the pipe-to-self to instead calling pthread_kill() to force EINTR. Unfortunately, using only poll() for this is racy (a signal delivered after SIGUSR1 is unblocked but before poll() is entered is consumed without interrupting the poll); to fix the race, we either have to resort to POSIX pselect() (which is horribly clunky and does not scale well to large fd values if we have lots of parallel clients) or use the non-POSIX ppoll() (these days, BSD and Linux systems all have it; if someone complains about failure to compile, we can #ifdef the code and fall back to pipe-to-self on the systems lacking ppoll()). Also unfortunately, the timing does not favor this approach. Repeating the setup for commit e897ed70, I see the following changes: pre-patch, the runs averaged 17.232s, 9.76E+09 bits/s; post-patch, the runs averaged 17.993s, 9.34E+09 bits/s (about 4% slower). I'm posting this in case something changes in the future (for example, I envision that libnbd might add a callback function for performing a kick such that we signal the reader thread ONLY when we actually blocked on write(), rather than after every single request sent to the server), but will not be applying it for now. 
Signed-off-by: Eric Blake <eblake@redhat.com> --- plugins/nbd/nbd.c | 86 +++++++++++++++++++++++++++++------------------ 1 file changed, 53 insertions(+), 33 deletions(-) diff --git a/plugins/nbd/nbd.c b/plugins/nbd/nbd.c index 941e4cb7..02b8119a 100644 --- a/plugins/nbd/nbd.c +++ b/plugins/nbd/nbd.c @@ -45,6 +45,7 @@ #include <pthread.h> #include <semaphore.h> #include <poll.h> +#include <signal.h> #include <libnbd.h> @@ -67,7 +68,6 @@ struct handle { /* These fields are read-only once initialized */ struct nbd_handle *nbd; int fd; /* Cache of nbd_aio_get_fd */ - int fds[2]; /* Pipe for kicking the reader thread */ bool readonly; pthread_t reader; @@ -105,6 +105,15 @@ static char *tls_psk; static struct handle *nbdplug_open_handle (int readonly); static void nbdplug_close_handle (struct handle *h); +/* Original signal mask, with SIGUSR1 unblocked */ +static sigset_t origmask; + +/* No-op signal handler for interrupting ppoll on SIGUSR1 */ +static void +nbdplug_sigusr1 (int sig) +{ +} + static void nbdplug_unload (void) { @@ -195,10 +204,30 @@ nbdplug_config (const char *key, const char *value) return 0; } -/* Check the user passed exactly one socket description. */ +/* Finalize the configuration. */ static int nbdplug_config_complete (void) { + struct sigaction act = { + .sa_handler = nbdplug_sigusr1, + .sa_flags = SA_RESTART, + }; + + /* Register a no-op SIGUSR1 handler for use with ppoll */ + if ((errno = pthread_sigmask (SIG_SETMASK, NULL, &origmask))) { + nbdkit_error ("pthread_sigmask: %m"); + return -1; + } + if (sigismember (&origmask, SIGUSR1)) { + nbdkit_error ("SIGUSR1 should not be blocked"); + return -1; + } + if (sigaction (SIGUSR1, &act, NULL) == -1) { + nbdkit_error ("sigaction: %m"); + return -1; + } + + /* Check the user passed exactly one socket description. 
*/ if (sockname) { struct sockaddr_un sock; @@ -306,37 +335,27 @@ nbdplug_reader (void *handle) int r; while (!nbd_aio_is_dead (h->nbd) && !nbd_aio_is_closed (h->nbd)) { - struct pollfd fds[2] = { - [0].fd = h->fd, - [1].fd = h->fds[0], - [1].events = POLLIN, - }; + struct pollfd fd; struct transaction *trans, **prev; int dir; - char c; + fd.fd = h->fd; dir = nbd_aio_get_direction (h->nbd); nbdkit_debug ("polling, dir=%d", dir); if (dir & LIBNBD_AIO_DIRECTION_READ) - fds[0].events |= POLLIN; + fd.events |= POLLIN; if (dir & LIBNBD_AIO_DIRECTION_WRITE) - fds[0].events |= POLLOUT; - if (poll (fds, 2, -1) == -1) { - nbdkit_error ("poll: %m"); + fd.events |= POLLOUT; + if (ppoll (&fd, 1, NULL, &origmask) == -1 && errno != EINTR) { + nbdkit_error ("ppoll: %m"); break; } - if (dir & LIBNBD_AIO_DIRECTION_READ && fds[0].revents & POLLIN) + if (dir & LIBNBD_AIO_DIRECTION_READ && fd.revents & POLLIN) nbd_aio_notify_read (h->nbd); - else if (dir & LIBNBD_AIO_DIRECTION_WRITE && fds[0].revents & POLLOUT) + else if (dir & LIBNBD_AIO_DIRECTION_WRITE && fd.revents & POLLOUT) nbd_aio_notify_write (h->nbd); - /* Check if we were kicked because a command was started */ - if (fds[1].revents & POLLIN && read (h->fds[0], &c, 1) != 1) { - nbdkit_error ("failed to read pipe: %m"); - break; - } - ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&h->trans_lock); trans = h->trans; prev = &h->trans; @@ -400,7 +419,6 @@ static struct transaction * nbdplug_register (struct handle *h, int64_t cookie) { struct transaction *trans; - char c = 0; if (cookie == -1) { nbdkit_error ("command failed: %s", nbd_get_error ()); @@ -417,8 +435,8 @@ nbdplug_register (struct handle *h, int64_t cookie) /* While locked, kick the reader thread and add our transaction */ ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&h->trans_lock); - if (write (h->fds[1], &c, 1) != 1) { - nbdkit_error ("write to pipe: %m"); + if ((errno = pthread_kill (h->reader, SIGUSR1))) { + nbdkit_error ("pthread_kill: %m"); free (trans); return NULL; } @@ -466,17 +484,13 
@@ nbdplug_open_handle (int readonly) struct handle *h; int r; unsigned long retries = retry; + sigset_t blocked; h = calloc (1, sizeof *h); if (h == NULL) { nbdkit_error ("malloc: %m"); return NULL; } - if (pipe (h->fds)) { - nbdkit_error ("pipe: %m"); - free (h); - return NULL; - } retry: h->fd = -1; @@ -520,22 +534,30 @@ nbdplug_open_handle (int readonly) if (readonly) h->readonly = true; - /* Spawn a dedicated reader thread */ + /* Spawn a dedicated reader thread with SIGUSR1 blocked */ if ((errno = pthread_mutex_init (&h->trans_lock, NULL))) { nbdkit_error ("failed to initialize transaction mutex: %m"); goto err; } + if (sigemptyset (&blocked) == -1 || + sigaddset (&blocked, SIGUSR1) == -1) + assert (false); + if ((errno = pthread_sigmask (SIG_BLOCK, &blocked, NULL))) { + nbdkit_error ("failed to set signal mask: %m"); + pthread_mutex_destroy (&h->trans_lock); + goto err; + } if ((errno = pthread_create (&h->reader, NULL, nbdplug_reader, h))) { nbdkit_error ("failed to initialize reader thread: %m"); pthread_mutex_destroy (&h->trans_lock); goto err; } + if (pthread_sigmask (SIG_SETMASK, &origmask, NULL)) + assert (false); return h; err: - close (h->fds[0]); - close (h->fds[1]); nbdkit_error ("failure while creating nbd handle: %s", nbd_get_error ()); if (h->nbd) nbd_close (h->nbd); @@ -560,8 +582,6 @@ nbdplug_close_handle (struct handle *h) nbdkit_debug ("failed to clean up handle: %s", nbd_get_error ()); if ((errno = pthread_join (h->reader, NULL))) nbdkit_debug ("failed to join reader thread: %m"); - close (h->fds[0]); - close (h->fds[1]); nbd_close (h->nbd); pthread_mutex_destroy (&h->trans_lock); free (h); -- 2.20.1
Apparently Analogous Threads
- [nbdkit PATCH 3/4] nbd: Use libnbd 0.1
- [nbdkit PATCH v3 3/5] nbd: Use libnbd 0.1.3+
- [PATCH libnbd v2 1/4] examples, tests: Remove want_to_send / ready logic, increase limit on cmds in flight.
- [libnbd] Simultaneous read and write
- [nbdkit PATCH 2/2] nbd: Use nbdkit aio_*_notify variants