Eric Blake
2017-Nov-20  19:38 UTC
[Libguestfs] [nbdkit PATCH v2 0/8] Support parallel transactions within single connection
I've posted some of these patches or ideas before, but now I'm
confident enough with the series that it should be ready to push;
at any rate, I can now run test-socket-activation in a tight loop
without triggering any crashes or hangs.

With this in place, I'm going back to work on making the nbd
forwarder work with the parallel thread model.

Eric Blake (8):
  sockets: Use pipe-to-self to avoid hanging in poll
  cleanup: Add ACQUIRE_LOCK_FOR_CURRENT_SCOPE()
  connections: Add read/write lock over client I/O
  connections: Add thread-safe status indicator
  connections: Set up thread pool for handling client requests
  core: Add --threads option for supporting true parallel requests
  tests: Annotate Makefile.am conditionals
  tests: Test parallel behavior

 TODO                        |   7 -
 configure.ac                |  11 +-
 docs/nbdkit.pod             |  12 +-
 nbdkit.in                   |   2 +-
 plugins/file/file.c         |   2 +-
 src/cleanup.c               |   8 +-
 src/connections.c           | 316 ++++++++++++++++++++++++++++++--------------
 src/internal.h              |  14 +-
 src/main.c                  |  39 +++++-
 src/plugins.c               |   8 ++
 src/sockets.c               |  14 +-
 tests/Makefile.am           |  55 +++++---
 tests/test-parallel-file.sh |  71 ++++++++++
 13 files changed, 414 insertions(+), 145 deletions(-)
 create mode 100755 tests/test-parallel-file.sh

-- 
2.13.6
Eric Blake
2017-Nov-20  19:38 UTC
[Libguestfs] [nbdkit PATCH v2 1/8] sockets: Use pipe-to-self to avoid hanging in poll
Once more per-connection threads are introduced for parallel
plugins, we run a much higher risk of the scheduler causing a
hang during test-socket-activation.  The hang is caused by
waiting for a second new client in accept_incoming_connections()
even though the test sent a SIGTERM after the first connection
to request an orderly shutdown, via the following scenario:
main                    signal context
--------------------------------------
first iteration, finish accept_connection()
checks !quit, starts second iteration
                        SIGTERM received
                        set quit
call poll()
That's because there is a window of time between when we read
the global 'quit' and when we start our poll(), in relation to
the signal handler which sets 'quit'.  If the signal arrives
after poll() is started, then poll() gets an EINTR failure
which lets us recheck 'quit'; but as more threads are competing
for execution time, the window gets bigger where the handler
can finish execution before poll() starts.
We want to avoid accepting any new connections once we detect
a signal, but basing it on the non-atomic 'quit' is inadequate,
so the solution is to set up a pipe-to-self as an additional
fd to poll for, so that we are guaranteed to break the poll()
loop the moment we know a signal has arrived.  Note, however,
that shutdown signals are now more responsive than before.
While we have a pthread_rwlock that prevents us from shutting
down while a plugin is active, we are more likely to call
exit() from the main thread before our detached handler threads
have detected that quit was set.  A followup patch may be
wanted that gives the handler threads more time to do an
orderly shutdown(), for less abrupt deaths of any connected
clients.
Signed-off-by: Eric Blake <eblake@redhat.com>
---
 src/internal.h |  4 ++--
 src/main.c     | 19 ++++++++++++++++++-
 src/sockets.c  | 14 +++++++++++---
 3 files changed, 31 insertions(+), 6 deletions(-)
diff --git a/src/internal.h b/src/internal.h
index 1fc5d69..5953185 100644
--- a/src/internal.h
+++ b/src/internal.h
@@ -1,5 +1,5 @@
 /* nbdkit
- * Copyright (C) 2013 Red Hat Inc.
+ * Copyright (C) 2013-2017 Red Hat Inc.
  * All rights reserved.
  *
  * Redistribution and use in source and binary forms, with or without
@@ -103,8 +103,8 @@ extern const char *tls_certificates_dir;
 extern int tls_verify_peer;
 extern char *unixsocket;
 extern int verbose;
-
 extern volatile int quit;
+extern int quit_fd;
 /* cleanup.c */
 extern void cleanup_free (void *ptr);
diff --git a/src/main.c b/src/main.c
index c9f08ab..224ee8a 100644
--- a/src/main.c
+++ b/src/main.c
@@ -1,5 +1,5 @@
 /* nbdkit
- * Copyright (C) 2013-2016 Red Hat Inc.
+ * Copyright (C) 2013-2017 Red Hat Inc.
  * All rights reserved.
  *
  * Redistribution and use in source and binary forms, with or without
@@ -92,7 +92,13 @@ const char *user, *group;       /* -u & -g */
 int verbose;                    /* -v */
 unsigned int socket_activation  /* $LISTEN_FDS and $LISTEN_PID set */;
+/* Detection of request to exit via signal.  Most places in the code
+ * can just poll quit at opportune moments, while sockets.c needs a
+ * pipe-to-self through quit_fd in order to break a poll loop without
+ * a race. */
 volatile int quit;
+int quit_fd;
+static int write_quit_fd;
 static char *random_fifo_dir = NULL;
 static char *random_fifo = NULL;
@@ -657,13 +663,24 @@ start_serving (void)
 static void
 handle_quit (int sig)
 {
+  char c = sig;
+
   quit = 1;
+  write (write_quit_fd, &c, 1);
 }
 static void
 set_up_signals (void)
 {
   struct sigaction sa;
+  int fds[2];
+
+  if (pipe (fds) < 0) {
+    perror ("pipe");
+    exit (EXIT_FAILURE);
+  }
+  quit_fd = fds[0];
+  write_quit_fd = fds[1];
   memset (&sa, 0, sizeof sa);
   sa.sa_flags = SA_RESTART;
diff --git a/src/sockets.c b/src/sockets.c
index 1d63523..0ea40a1 100644
--- a/src/sockets.c
+++ b/src/sockets.c
@@ -1,5 +1,5 @@
 /* nbdkit
- * Copyright (C) 2013 Red Hat Inc.
+ * Copyright (C) 2013-2017 Red Hat Inc.
  * All rights reserved.
  *
  * Redistribution and use in source and binary forms, with or without
@@ -316,7 +316,7 @@ accept_connection (int listen_sock)
 void
 accept_incoming_connections (int *socks, size_t nr_socks)
 {
-  struct pollfd fds[nr_socks];
+  struct pollfd fds[nr_socks + 1];
   size_t i;
   int r;
@@ -326,8 +326,11 @@ accept_incoming_connections (int *socks, size_t nr_socks)
       fds[i].events = POLLIN;
       fds[i].revents = 0;
     }
+    fds[i].fd = quit_fd;
+    fds[i].events = POLLIN;
+    fds[i].revents = 0;
-    r = poll (fds, nr_socks, -1);
+    r = poll (fds, nr_socks + 1, -1);
     if (r == -1) {
       if (errno == EINTR || errno == EAGAIN)
         continue;
@@ -335,6 +338,11 @@ accept_incoming_connections (int *socks, size_t nr_socks)
       exit (EXIT_FAILURE);
     }
+    /* We don't even have to read quit_fd - just knowing that it has data
+     * means the signal handler ran, so we are ready to quit the loop. */
+    if (fds[i].revents & POLLIN)
+      continue;
+
     for (i = 0; i < nr_socks; ++i) {
       if (fds[i].revents & POLLIN)
         accept_connection (fds[i].fd);
-- 
2.13.6
Eric Blake
2017-Nov-20  19:38 UTC
[Libguestfs] [nbdkit PATCH v2 2/8] cleanup: Add ACQUIRE_LOCK_FOR_CURRENT_SCOPE()
Add a new macro that lets us easily code a mutex cleanup no
matter how we leave a scope, which relies on the gcc/clang
extension of __attribute__((cleanup)).  Note that there is
no semi-safe fallback possible to unlock a mutex once we rely
on cleanup semantics: eliding the attribute would deadlock,
and not defining the macro at all would fail to compile for
those old compilers.  Our configure check was already warning
users about non-modern compilers (and no one was complaining),
so strengthen the probe into a full-time requirement that
refuses to configure without a decent compiler.
Another argument in favor of making cleanup support mandatory:
it is trivial to write an ill-behaved client that does a tight
loop in opening a connection, requesting an NBD_CMD_WRITE with
a 64M payload, then hanging up without actually sending the
data.  But since connections.c uses CLEANUP_FREE for the 64M
buffer under client control, and our (previous) behavior for a
deficient compiler was to simply elide the cleanup call and
thus leak memory, the bad client can mount a DoS attack against
real clients by forcing nbdkit compiled without cleanup support
to run out of memory very quickly (nbdkit compiled with cleanup
support is immune).
Based on an idea in libguestfs, courtesy of Richard Jones.
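
As a standalone illustration of the scoped-lock technique discussed
above, here is a small sketch that relies on the same gcc/clang
cleanup attribute.  The names SCOPED_LOCK, unlock_on_exit(),
counter_add() and lock_is_free() are made up for this example and do
not appear in nbdkit.

```c
#include <assert.h>
#include <pthread.h>

/* Cleanup handler: receives a pointer to the annotated variable. */
static void
unlock_on_exit (pthread_mutex_t **m)
{
  pthread_mutex_unlock (*m);
}

/* Hypothetical macro: lock now, unlock whenever the scope is left. */
#define SCOPED_LOCK(mutex)                                              \
  __attribute__((cleanup (unlock_on_exit)))                             \
    pthread_mutex_t *scoped_lock_ = (mutex);                            \
  pthread_mutex_lock (scoped_lock_)

static pthread_mutex_t counter_lock = PTHREAD_MUTEX_INITIALIZER;
static int counter;

/* Add n to the counter and return the new total, or return -1
 * without changing anything when n is negative.  Neither return
 * path unlocks explicitly; the cleanup handler does it. */
static int
counter_add (int n)
{
  SCOPED_LOCK (&counter_lock);

  if (n < 0)
    return -1;                  /* early exit still releases the lock */
  counter += n;
  return counter;
}

/* Return 1 if the mutex can be taken right now, 0 otherwise. */
static int
lock_is_free (pthread_mutex_t *m)
{
  if (pthread_mutex_trylock (m) != 0)
    return 0;
  pthread_mutex_unlock (m);
  return 1;
}
```

If the attribute were silently elided, the first call to counter_add()
would leave counter_lock held and the second call would deadlock,
which is the failure mode that motivates making the compiler check
mandatory.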
Signed-off-by: Eric Blake <eblake@redhat.com>
---
 configure.ac   | 7 +++----
 src/cleanup.c  | 8 +++++++-
 src/internal.h | 9 +++++----
 3 files changed, 15 insertions(+), 9 deletions(-)
diff --git a/configure.ac b/configure.ac
index 86ce5f2..130ab78 100644
--- a/configure.ac
+++ b/configure.ac
@@ -113,17 +113,16 @@ main (int argc, char *argv[])
 ]])
     ],[
     AC_MSG_RESULT([yes])
-    AC_DEFINE([HAVE_ATTRIBUTE_CLEANUP],[1],[Define to 1 if '__attribute__((cleanup(...)))' works with this compiler.])
     ],[
-    AC_MSG_WARN(
+    AC_MSG_RESULT([no])
+    AC_MSG_ERROR(
 ['__attribute__((cleanup(...)))' does not work.
 You may not be using a sufficiently recent version of GCC or CLANG, or
 you may be using a C compiler which does not support this attribute,
 or the configure test may be wrong.
-The code will still compile, but is likely to leak memory and other
-resources when it runs.])])
+This code requires the attribute to work for proper locking between threads.])])
 dnl restore CFLAGS
 CFLAGS="${acx_nbdkit_save_CFLAGS}"
diff --git a/src/cleanup.c b/src/cleanup.c
index fccf90c..3f7f8af 100644
--- a/src/cleanup.c
+++ b/src/cleanup.c
@@ -1,5 +1,5 @@
 /* nbdkit
- * Copyright (C) 2013 Red Hat Inc.
+ * Copyright (C) 2013-2017 Red Hat Inc.
  * All rights reserved.
  *
  * Redistribution and use in source and binary forms, with or without
@@ -46,3 +46,9 @@ cleanup_free (void *ptr)
 {
   free (* (void **) ptr);
 }
+
+void
+cleanup_unlock (pthread_mutex_t **ptr)
+{
+  pthread_mutex_unlock (*ptr);
+}
diff --git a/src/internal.h b/src/internal.h
index 5953185..a0bbef7 100644
--- a/src/internal.h
+++ b/src/internal.h
@@ -108,11 +108,12 @@ extern int quit_fd;
 /* cleanup.c */
 extern void cleanup_free (void *ptr);
-#ifdef HAVE_ATTRIBUTE_CLEANUP
 #define CLEANUP_FREE __attribute__((cleanup (cleanup_free)))
-#else
-#define CLEANUP_FREE
-#endif
+extern void cleanup_unlock (pthread_mutex_t **ptr);
+#define CLEANUP_UNLOCK __attribute__((cleanup (cleanup_unlock)))
+#define ACQUIRE_LOCK_FOR_CURRENT_SCOPE(mutex) \
+  CLEANUP_UNLOCK pthread_mutex_t *_lock = mutex; \
+  pthread_mutex_lock (_lock)
 /* connections.c */
 struct connection;
-- 
2.13.6
Eric Blake
2017-Nov-20  19:38 UTC
[Libguestfs] [nbdkit PATCH v2 3/8] connections: Add read/write lock over client I/O
In preparation for parallel processing, we need to be sure that
two threads belonging to the same connection cannot interleave
their I/O except at message boundaries.  Add one mutex around
all reads and another around all writes that must occur as a
group (for now, there is no contention for either mutex).
This commit is best viewed with 'git diff -b', thanks to the
indentation changes required by using macros for scoped locking
coupled with needing two scopes in one function.  Splitting
the function wouldn't be any prettier, as we would then have
to coordinate the malloc'd buffer for NBD_CMD_READ/WRITE.
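
To show why a lock has to span a whole message rather than a single
write, here is a small self-contained sketch.  It is not nbdkit code:
struct channel, send_message() and count_torn_records() are
hypothetical, and a pipe stands in for the client socket.  Each
message is a 4-byte header followed by a 4-byte payload, both filled
with the sender's tag, so any interleaving between two senders shows
up as a record with mixed bytes.

```c
#include <assert.h>
#include <pthread.h>
#include <string.h>
#include <unistd.h>

struct channel {
  int fd;
  pthread_mutex_t write_lock;
};

struct sender {
  struct channel *ch;
  char tag;
};

/* Write exactly len bytes, retrying after short writes. */
static int
write_all (int fd, const void *buf, size_t len)
{
  const char *p = buf;

  while (len > 0) {
    ssize_t n = write (fd, p, len);
    if (n < 0)
      return -1;
    p += n;
    len -= (size_t) n;
  }
  return 0;
}

/* Send header and payload as one unit: the lock covers both writes. */
static int
send_message (struct channel *ch, const void *hdr, size_t hdr_len,
              const void *data, size_t data_len)
{
  int r;

  pthread_mutex_lock (&ch->write_lock);
  r = write_all (ch->fd, hdr, hdr_len);
  if (r == 0)
    r = write_all (ch->fd, data, data_len);
  pthread_mutex_unlock (&ch->write_lock);
  return r;
}

static void *
sender_thread (void *arg)
{
  struct sender *s = arg;
  char hdr[4], data[4];
  int i;

  memset (hdr, s->tag, sizeof hdr);
  memset (data, s->tag, sizeof data);
  for (i = 0; i < 100; i++)
    send_message (s->ch, hdr, sizeof hdr, data, sizeof data);
  return NULL;
}

/* Run two senders over one pipe, then count 8-byte records whose
 * bytes are not all identical.  The 1600 bytes written in total are
 * assumed to fit in the pipe buffer, so the writers never block. */
static int
count_torn_records (void)
{
  int p[2], torn = 0;
  struct channel ch;
  struct sender a, b;
  pthread_t ta, tb;
  char rec[8];

  if (pipe (p) < 0)
    return -1;
  ch.fd = p[1];
  pthread_mutex_init (&ch.write_lock, NULL);
  a.ch = b.ch = &ch;
  a.tag = 'A';
  b.tag = 'B';
  if (pthread_create (&ta, NULL, sender_thread, &a) != 0)
    return -1;
  if (pthread_create (&tb, NULL, sender_thread, &b) != 0) {
    pthread_join (ta, NULL);
    return -1;
  }
  pthread_join (ta, NULL);
  pthread_join (tb, NULL);
  close (p[1]);

  /* rec[0..6] == rec[1..7] holds only when all eight bytes match. */
  while (read (p[0], rec, sizeof rec) == (ssize_t) sizeof rec)
    if (memcmp (rec, rec + 1, sizeof rec - 1) != 0)
      torn++;
  close (p[0]);
  pthread_mutex_destroy (&ch.write_lock);
  return torn;
}
```

Moving the lock inside write_all() instead would still protect each
individual write, but a header from one thread could then be followed
by a payload from the other.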
Signed-off-by: Eric Blake <eblake@redhat.com>
---
 src/connections.c | 151 +++++++++++++++++++++++++++++-------------------------
 1 file changed, 82 insertions(+), 69 deletions(-)
diff --git a/src/connections.c b/src/connections.c
index 27d685e..f779903 100644
--- a/src/connections.c
+++ b/src/connections.c
@@ -1,5 +1,5 @@
 /* nbdkit
- * Copyright (C) 2013-2016 Red Hat Inc.
+ * Copyright (C) 2013-2017 Red Hat Inc.
  * All rights reserved.
  *
  * Redistribution and use in source and binary forms, with or without
@@ -62,6 +62,8 @@
 /* Connection structure. */
 struct connection {
   pthread_mutex_t request_lock;
+  pthread_mutex_t read_lock;
+  pthread_mutex_t write_lock;
   void *handle;
   void *crypto_session;
@@ -206,6 +208,8 @@ new_connection (int sockin, int sockout)
   conn->sockin = sockin;
   conn->sockout = sockout;
   pthread_mutex_init (&conn->request_lock, NULL);
+  pthread_mutex_init (&conn->read_lock, NULL);
+  pthread_mutex_init (&conn->write_lock, NULL);
   conn->recv = raw_recv;
   conn->send = raw_send;
@@ -223,6 +227,8 @@ free_connection (struct connection *conn)
   conn->close (conn);
   pthread_mutex_destroy (&conn->request_lock);
+  pthread_mutex_destroy (&conn->read_lock);
+  pthread_mutex_destroy (&conn->write_lock);
   /* Don't call the plugin again if quit has been set because the main
    * thread will be in the process of unloading it.  The plugin.unload
@@ -874,65 +880,69 @@ recv_request_send_reply (struct connection *conn)
   CLEANUP_FREE char *buf = NULL;
   /* Read the request packet. */
-  r = conn->recv (conn, &request, sizeof request);
-  if (r == -1) {
-    nbdkit_error ("read request: %m");
-    return -1;
-  }
-  if (r == 0) {
-    debug ("client closed input socket, closing connection");
-    return 0;                   /* disconnect */
-  }
-
-  magic = be32toh (request.magic);
-  if (magic != NBD_REQUEST_MAGIC) {
-    nbdkit_error ("invalid request: 'magic' field is incorrect (0x%x)", magic);
-    return -1;
-  }
+  {
+    ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&conn->read_lock);
+    r = conn->recv (conn, &request, sizeof request);
+    if (r == -1) {
+      nbdkit_error ("read request: %m");
+      return -1;
+    }
+    if (r == 0) {
+      debug ("client closed input socket, closing connection");
+      return 0;                   /* disconnect */
+    }
-  cmd = be32toh (request.type);
-  flags = cmd & ~NBD_CMD_MASK_COMMAND;
-  cmd &= NBD_CMD_MASK_COMMAND;
+    magic = be32toh (request.magic);
+    if (magic != NBD_REQUEST_MAGIC) {
+      nbdkit_error ("invalid request: 'magic' field is incorrect (0x%x)",
+                    magic);
+      return -1;
+    }
-  offset = be64toh (request.offset);
-  count = be32toh (request.count);
+    cmd = be32toh (request.type);
+    flags = cmd & ~NBD_CMD_MASK_COMMAND;
+    cmd &= NBD_CMD_MASK_COMMAND;
-  if (cmd == NBD_CMD_DISC) {
-    debug ("client sent disconnect command, closing connection");
-    return 0;                   /* disconnect */
-  }
+    offset = be64toh (request.offset);
+    count = be32toh (request.count);
-  /* Validate the request. */
-  if (!validate_request (conn, cmd, flags, offset, count, &error)) {
-    if (cmd == NBD_CMD_WRITE &&
-        skip_over_write_buffer (conn->sockin, count) < 0)
-      return -1;
-    goto send_reply;
-  }
+    if (cmd == NBD_CMD_DISC) {
+      debug ("client sent disconnect command, closing connection");
+      return 0;                   /* disconnect */
+    }
-  /* Allocate the data buffer used for either read or write requests. */
-  if (cmd == NBD_CMD_READ || cmd == NBD_CMD_WRITE) {
-    buf = malloc (count);
-    if (buf == NULL) {
-      perror ("malloc");
-      error = ENOMEM;
+    /* Validate the request. */
+    if (!validate_request (conn, cmd, flags, offset, count, &error)) {
       if (cmd == NBD_CMD_WRITE &&
           skip_over_write_buffer (conn->sockin, count) < 0)
         return -1;
       goto send_reply;
     }
-  }
-  /* Receive the write data buffer. */
-  if (cmd == NBD_CMD_WRITE) {
-    r = conn->recv (conn, buf, count);
-    if (r == 0) {
-      errno = EBADMSG;
-      r = -1;
+    /* Allocate the data buffer used for either read or write requests. */
+    if (cmd == NBD_CMD_READ || cmd == NBD_CMD_WRITE) {
+      buf = malloc (count);
+      if (buf == NULL) {
+        perror ("malloc");
+        error = ENOMEM;
+        if (cmd == NBD_CMD_WRITE &&
+            skip_over_write_buffer (conn->sockin, count) < 0)
+          return -1;
+        goto send_reply;
+      }
     }
-    if (r == -1) {
-      nbdkit_error ("read data: %m");
-      return -1;
+
+    /* Receive the write data buffer. */
+    if (cmd == NBD_CMD_WRITE) {
+      r = conn->recv (conn, buf, count);
+      if (r == 0) {
+        errno = EBADMSG;
+        r = -1;
+      }
+      if (r == -1) {
+        nbdkit_error ("read data: %m");
+        return -1;
+      }
     }
   }
@@ -948,32 +958,35 @@ recv_request_send_reply (struct connection *conn)
   /* Send the reply packet. */
  send_reply:
-  reply.magic = htobe32 (NBD_REPLY_MAGIC);
-  reply.handle = request.handle;
-  reply.error = htobe32 (nbd_errno (error));
+  {
+    ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&conn->write_lock);
+    reply.magic = htobe32 (NBD_REPLY_MAGIC);
+    reply.handle = request.handle;
+    reply.error = htobe32 (nbd_errno (error));
-  if (error != 0) {
-    /* Since we're about to send only the limited NBD_E* errno to the
-     * client, don't lose the information about what really happened
-     * on the server side.  Make sure there is a way for the operator
-     * to retrieve the real error.
-     */
-    debug ("sending error reply: %s", strerror (error));
-  }
+    if (error != 0) {
+      /* Since we're about to send only the limited NBD_E* errno to the
+       * client, don't lose the information about what really happened
+       * on the server side.  Make sure there is a way for the operator
+       * to retrieve the real error.
+       */
+      debug ("sending error reply: %s", strerror (error));
+    }
-  r = conn->send (conn, &reply, sizeof reply);
-  if (r == -1) {
-    nbdkit_error ("write reply: %m");
-    return -1;
-  }
-
-  /* Send the read data buffer. */
-  if (cmd == NBD_CMD_READ && !error) {
-    r = conn->send (conn, buf, count);
+    r = conn->send (conn, &reply, sizeof reply);
     if (r == -1) {
-      nbdkit_error ("write data: %m");
+      nbdkit_error ("write reply: %m");
       return -1;
     }
+
+    /* Send the read data buffer. */
+    if (cmd == NBD_CMD_READ && !error) {
+      r = conn->send (conn, buf, count);
+      if (r == -1) {
+        nbdkit_error ("write data: %m");
+        return -1;
+      }
+    }
   }
   return 1;                     /* command processed ok */
-- 
2.13.6
Eric Blake
2017-Nov-20  19:38 UTC
[Libguestfs] [nbdkit PATCH v2 4/8] connections: Add thread-safe status indicator
Once we have multiple threads during parallel processing, we need
to be sure that any I/O error flagged by one thread prevents the
next thread from attempting I/O.  Although we already have a
separate lock for reads and writes, it's easier if status is
shared by both actions, which needs yet another mutex; however
we can optimize (via accessor functions) and only need to use the
mutex if there are actually multiple threads at work.
Also note that because we now update status at all important
points, the return value of _handle_single_connection() matches
the latest status, which will come in handy later: it avoids
having to coordinate a return value across multiple threads.
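
The "status only ever goes down" rule described above can be
sketched on its own like this.  The struct and function names are
hypothetical, and unlike the patch this sketch always takes the
mutex instead of skipping it when there are no worker threads.

```c
#include <assert.h>
#include <pthread.h>

struct conn_status {
  pthread_mutex_t lock;
  int value;   /* 1 = more I/O expected, 0 = clean shutdown, -1 = error */
};

static void
status_init (struct conn_status *s)
{
  pthread_mutex_init (&s->lock, NULL);
  s->value = 1;
}

static int
status_get (struct conn_status *s)
{
  int r;

  pthread_mutex_lock (&s->lock);
  r = s->value;
  pthread_mutex_unlock (&s->lock);
  return r;
}

/* Record 'value' only if it is lower than what is already stored, so
 * an error (-1) can never be overwritten by a later shutdown (0) or
 * by "keep going" (1).  Returns the incoming value, which lets a
 * caller write 'return status_lower (s, -1);'. */
static int
status_lower (struct conn_status *s, int value)
{
  pthread_mutex_lock (&s->lock);
  if (value < s->value)
    s->value = value;
  pthread_mutex_unlock (&s->lock);
  return value;
}
```

Because the stored value is monotonic, the order in which several
threads report their results does not matter: the worst outcome seen
so far is what every later reader gets.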
Signed-off-by: Eric Blake <eblake@redhat.com>
---
 src/connections.c | 82 +++++++++++++++++++++++++++++++++++++------------------
 1 file changed, 56 insertions(+), 26 deletions(-)
diff --git a/src/connections.c b/src/connections.c
index f779903..9d95e7f 100644
--- a/src/connections.c
+++ b/src/connections.c
@@ -64,8 +64,11 @@ struct connection {
   pthread_mutex_t request_lock;
   pthread_mutex_t read_lock;
   pthread_mutex_t write_lock;
+  pthread_mutex_t status_lock;
+  int status; /* 1 for more I/O with client, 0 for shutdown, -1 on error */
   void *handle;
   void *crypto_session;
+  int nworkers; /* TODO set up a thread pool for parallel workers */
   uint64_t exportsize;
   int readonly;
@@ -146,40 +149,60 @@ connection_set_close (struct connection *conn, connection_close_function close)
 }
 static int
+get_status (struct connection *conn)
+{
+  int r;
+
+  if (conn->nworkers)
+    pthread_mutex_lock (&conn->status_lock);
+  r = conn->status;
+  if (conn->nworkers)
+    pthread_mutex_unlock (&conn->status_lock);
+  return r;
+}
+
+/* Update the status if the new value is lower than the existing value.
+ * For convenience, return the incoming value. */
+static int
+set_status (struct connection *conn, int value)
+{
+  if (conn->nworkers)
+    pthread_mutex_lock (&conn->status_lock);
+  if (value < conn->status)
+    conn->status = value;
+  if (conn->nworkers)
+    pthread_mutex_unlock (&conn->status_lock);
+  return value;
+}
+
+static int
 _handle_single_connection (int sockin, int sockout)
 {
-  int r;
+  int r = -1;
   struct connection *conn = new_connection (sockin, sockout);
   if (!conn)
-    goto err;
+    goto done;
   if (plugin_open (conn, readonly) == -1)
-    goto err;
+    goto done;
   threadlocal_set_name (plugin_name ());
   /* Handshake. */
   if (negotiate_handshake (conn) == -1)
-    goto err;
+    goto done;
   /* Process requests.  XXX Allow these to be dispatched in parallel using
    * a thread pool.
    */
-  while (!quit) {
-    r = recv_request_send_reply (conn);
-    if (r == -1)
-      goto err;
-    if (r == 0)
-      break;
-  }
+  while (!quit && get_status (conn) > 0)
+    recv_request_send_reply (conn);
+  r = get_status (conn);
+ done:
   free_connection (conn);
-  return 0;
-
- err:
-  free_connection (conn);
-  return -1;
+  return r;
 }
 int
@@ -205,11 +228,13 @@ new_connection (int sockin, int sockout)
     return NULL;
   }
+  conn->status = 1;
   conn->sockin = sockin;
   conn->sockout = sockout;
   pthread_mutex_init (&conn->request_lock, NULL);
   pthread_mutex_init (&conn->read_lock, NULL);
   pthread_mutex_init (&conn->write_lock, NULL);
+  pthread_mutex_init (&conn->status_lock, NULL);
   conn->recv = raw_recv;
   conn->send = raw_send;
@@ -229,6 +254,7 @@ free_connection (struct connection *conn)
   pthread_mutex_destroy (&conn->request_lock);
   pthread_mutex_destroy (&conn->read_lock);
   pthread_mutex_destroy (&conn->write_lock);
+  pthread_mutex_destroy (&conn->status_lock);
   /* Don't call the plugin again if quit has been set because the main
    * thread will be in the process of unloading it.  The plugin.unload
@@ -882,21 +908,23 @@ recv_request_send_reply (struct connection *conn)
   /* Read the request packet. */
   {
     ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&conn->read_lock);
+    if (get_status (conn) < 0)
+      return -1;
     r = conn->recv (conn, &request, sizeof request);
     if (r == -1) {
       nbdkit_error ("read request: %m");
-      return -1;
+      return set_status (conn, -1);
     }
     if (r == 0) {
       debug ("client closed input socket, closing connection");
-      return 0;                   /* disconnect */
+      return set_status (conn, 0);                   /* disconnect */
     }
     magic = be32toh (request.magic);
     if (magic != NBD_REQUEST_MAGIC) {
       nbdkit_error ("invalid request: 'magic' field is incorrect (0x%x)",
                     magic);
-      return -1;
+      return set_status (conn, -1);
     }
     cmd = be32toh (request.type);
@@ -908,14 +936,14 @@ recv_request_send_reply (struct connection *conn)
     if (cmd == NBD_CMD_DISC) {
       debug ("client sent disconnect command, closing connection");
-      return 0;                   /* disconnect */
+      return set_status (conn, 0);                   /* disconnect */
     }
     /* Validate the request. */
     if (!validate_request (conn, cmd, flags, offset, count, &error)) {
       if (cmd == NBD_CMD_WRITE &&
           skip_over_write_buffer (conn->sockin, count) < 0)
-        return -1;
+        return set_status (conn, -1);
       goto send_reply;
     }
@@ -927,7 +955,7 @@ recv_request_send_reply (struct connection *conn)
         error = ENOMEM;
         if (cmd == NBD_CMD_WRITE &&
             skip_over_write_buffer (conn->sockin, count) < 0)
-          return -1;
+          return set_status (conn, -1);
         goto send_reply;
       }
     }
@@ -941,13 +969,13 @@ recv_request_send_reply (struct connection *conn)
       }
       if (r == -1) {
         nbdkit_error ("read data: %m");
-        return -1;
+        return set_status (conn, -1);
       }
     }
   }
   /* Perform the request.  Only this part happens inside the request lock. */
-  if (quit) {
+  if (quit || !get_status (conn)) {
     error = ESHUTDOWN;
   }
   else {
@@ -960,6 +988,8 @@ recv_request_send_reply (struct connection *conn)
  send_reply:
   {
     ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&conn->write_lock);
+    if (get_status (conn) < 0)
+      return -1;
     reply.magic = htobe32 (NBD_REPLY_MAGIC);
     reply.handle = request.handle;
     reply.error = htobe32 (nbd_errno (error));
@@ -976,7 +1006,7 @@ recv_request_send_reply (struct connection *conn)
     r = conn->send (conn, &reply, sizeof reply);
     if (r == -1) {
       nbdkit_error ("write reply: %m");
-      return -1;
+      return set_status (conn, -1);
     }
     /* Send the read data buffer. */
@@ -984,7 +1014,7 @@ recv_request_send_reply (struct connection *conn)
       r = conn->send (conn, buf, count);
       if (r == -1) {
         nbdkit_error ("write data: %m");
-        return -1;
+        return set_status (conn, -1);
       }
     }
   }
-- 
2.13.6
Eric Blake
2017-Nov-20  19:38 UTC
[Libguestfs] [nbdkit PATCH v2 5/8] connections: Set up thread pool for handling client requests
Finish plumbing up everything we will need to process multiple
client requests in parallel after handshake is complete. Since
status is now shared by every thread of a connection, and
properly protected by a mutex, all of the threads will quit
soon after any of them notices EOF or nbdkit detects a signal.
For ease of review, the framework for configuring threads is
done separately from the low-level work of utilizing the threads,
so this patch sees no behavior change (because we hard-code
conn->nworkers to 0); although it's a one-line hack to test that
a larger nworkers still behaves the same even for a non-parallel
plugin (in fact, such a hack was how I found and squashed several
thread-safety bugs in the previous patches, exposed from running
test-socket-activation in a loop).
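
The create-then-join shape of such a per-connection pool can be
sketched roughly as below.  This is an illustration under stated
assumptions rather than the patch itself: struct job_queue,
pool_worker() and run_pool() are hypothetical, and a simple counter
of pending jobs stands in for requests arriving from a client.

```c
#include <assert.h>
#include <pthread.h>
#include <stdlib.h>

struct job_queue {
  pthread_mutex_t lock;
  int remaining;                /* stand-in for pending client requests */
  int handled;
};

/* Each worker takes one job at a time until none are left. */
static void *
pool_worker (void *arg)
{
  struct job_queue *q = arg;

  for (;;) {
    pthread_mutex_lock (&q->lock);
    if (q->remaining == 0) {
      pthread_mutex_unlock (&q->lock);
      break;
    }
    q->remaining--;
    q->handled++;
    pthread_mutex_unlock (&q->lock);
  }
  return NULL;
}

/* Start up to nworkers threads, then join exactly the ones that were
 * started, even if creation failed part way through.  Returns the
 * number of jobs handled, or -1 if no thread could be created. */
static int
run_pool (int nworkers, int jobs)
{
  struct job_queue q;
  pthread_t *workers;
  int started, i;

  assert (nworkers > 0);
  workers = calloc ((size_t) nworkers, sizeof *workers);
  if (!workers)
    return -1;
  pthread_mutex_init (&q.lock, NULL);
  q.remaining = jobs;
  q.handled = 0;

  for (started = 0; started < nworkers; started++)
    if (pthread_create (&workers[started], NULL, pool_worker, &q) != 0)
      break;

  for (i = started; i > 0; )
    pthread_join (workers[--i], NULL);
  free (workers);
  pthread_mutex_destroy (&q.lock);
  return started > 0 ? q.handled : -1;
}
```

Counting how many threads actually started, and joining only those,
is what keeps a mid-loop failure from joining an uninitialized
pthread_t.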
Signed-off-by: Eric Blake <eblake@redhat.com>
---
 src/connections.c | 91 +++++++++++++++++++++++++++++++++++++++++++++++++------
 1 file changed, 82 insertions(+), 9 deletions(-)
diff --git a/src/connections.c b/src/connections.c
index 9d95e7f..41371fb 100644
--- a/src/connections.c
+++ b/src/connections.c
@@ -68,7 +68,7 @@ struct connection {
   int status; /* 1 for more I/O with client, 0 for shutdown, -1 on error */
   void *handle;
   void *crypto_session;
-  int nworkers; /* TODO set up a thread pool for parallel workers */
+  int nworkers;
   uint64_t exportsize;
   int readonly;
@@ -83,7 +83,8 @@ struct connection {
   connection_close_function close;
 };
-static struct connection *new_connection (int sockin, int sockout);
+static struct connection *new_connection (int sockin, int sockout,
+                                          int nworkers);
 static void free_connection (struct connection *conn);
 static int negotiate_handshake (struct connection *conn);
 static int recv_request_send_reply (struct connection *conn);
@@ -175,12 +176,39 @@ set_status (struct connection *conn, int value)
   return value;
 }
+struct worker_data {
+  struct connection *conn;
+  char *name;
+};
+
+static void *
+connection_worker (void *data)
+{
+  struct worker_data *worker = data;
+  struct connection *conn = worker->conn;
+  char *name = worker->name;
+
+  debug ("starting worker thread %s", name);
+  threadlocal_new_server_thread ();
+  threadlocal_set_name (name);
+  free (worker);
+
+  while (!quit && get_status (conn) > 0)
+    recv_request_send_reply (conn);
+  debug ("exiting worker thread %s", threadlocal_get_name ());
+  free (name);
+  return NULL;
+}
+
 static int
 _handle_single_connection (int sockin, int sockout)
 {
   int r = -1;
-  struct connection *conn = new_connection (sockin, sockout);
+  struct connection *conn;
+  int nworkers = 1; /* TODO default to 16 for parallel plugins, with command-line override */
+  pthread_t *workers = NULL;
+  conn = new_connection (sockin, sockout, nworkers);
   if (!conn)
     goto done;
@@ -193,11 +221,55 @@ _handle_single_connection (int sockin, int sockout)
   if (negotiate_handshake (conn) == -1)
     goto done;
-  /* Process requests.  XXX Allow these to be dispatched in parallel using
-   * a thread pool.
-   */
-  while (!quit && get_status (conn) > 0)
-    recv_request_send_reply (conn);
+  if (nworkers <= 1) {
+    /* No need for a separate thread. */
+    debug ("handshake complete, processing requests serially");
+    conn->nworkers = 0;
+    while (!quit && get_status (conn) > 0)
+      recv_request_send_reply (conn);
+  }
+  else {
+    /* Create thread pool to process requests. */
+    debug ("handshake complete, processing requests with %d threads",
+           nworkers);
+    workers = calloc (nworkers, sizeof *workers);
+    if (!workers) {
+      perror ("malloc");
+      goto done;
+    }
+
+    for (nworkers = 0; nworkers < conn->nworkers; nworkers++) {
+      struct worker_data *worker = malloc (sizeof *worker);
+      int err;
+
+      if (!worker) {
+        perror ("malloc");
+        set_status (conn, -1);
+        goto wait;
+      }
+      if (asprintf (&worker->name, "%s.%d", plugin_name (), nworkers) < 0) {
+        perror ("asprintf");
+        set_status (conn, -1);
+        free (worker);
+        goto wait;
+      }
+      worker->conn = conn;
+      err = pthread_create (&workers[nworkers], NULL, connection_worker,
+                            worker);
+      if (err) {
+        errno = err;
+        perror ("pthread_create");
+        set_status (conn, -1);
+        free (worker);
+        goto wait;
+      }
+    }
+
+  wait:
+    while (nworkers)
+      pthread_join (workers[--nworkers], NULL);
+    free (workers);
+  }
   r = get_status (conn);
  done:
@@ -218,7 +290,7 @@ handle_single_connection (int sockin, int sockout)
 }
 static struct connection *
-new_connection (int sockin, int sockout)
+new_connection (int sockin, int sockout, int nworkers)
 {
   struct connection *conn;
@@ -229,6 +301,7 @@ new_connection (int sockin, int sockout)
   }
   conn->status = 1;
+  conn->nworkers = nworkers;
   conn->sockin = sockin;
   conn->sockout = sockout;
   pthread_mutex_init (&conn->request_lock, NULL);
-- 
2.13.6
Eric Blake
2017-Nov-20  19:38 UTC
[Libguestfs] [nbdkit PATCH v2 6/8] core: Add --threads option for supporting true parallel requests
It's finally time to implement one of the TODO items: we want to
support a thread pool of parallel readers from the client, in
order to allow multiple in-flight operations with potential
out-of-order completion.  We also need at least one plugin that
supports parallel processing for testing the option; the file
plugin fits the bill.
Add and document a new command line option, -t/--threads=N,
which controls how many threads to create per connection (although
we only ever spawn multiple threads if the plugin is parallel,
since otherwise, at most one thread is running at a time anyway).
Setting -t 1 forces a parallel plugin to behave as if serialized;
other values allow tuning for performance.  The default of 16
matches the choice of MAX_NBD_REQUESTS used in qemu.
One easy way to test:
term1$ echo hello > junk
term1$ ./nbdkit -f -v -r file file=junk rdelay=2s wdelay=1s
term2$ qemu-io -f raw nbd://localhost:10809/ --trace='nbd_*' \
  -c 'aio_read 0 1' -c 'aio_write -P 0x6c 2 2' -c 'aio_flush'
If the write completes before the read, then nbdkit was properly
handling things in parallel with out-of-order replies.
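
The rule for how many threads a connection ends up with, as described
for -t/--threads above, can be written as a small pure function.
This is only a sketch: threads_per_connection() and
DEFAULT_PARALLEL_THREADS are hypothetical names, and treating 0 as
"option not given" is an assumption of this example rather than
something stated in the patch.

```c
#include <assert.h>

#define DEFAULT_PARALLEL_THREADS 16   /* the default named above */

/* requested: the value given with -t, or 0 if the option was absent
 * (an assumption of this sketch).
 * plugin_is_parallel: nonzero if the plugin uses the parallel model.
 * Returns the number of threads to run for one connection. */
static int
threads_per_connection (int requested, int plugin_is_parallel)
{
  assert (requested >= 0);
  if (!plugin_is_parallel)
    return 1;                   /* extra threads would only wait */
  return requested ? requested : DEFAULT_PARALLEL_THREADS;
}
```

With this rule, a serialized plugin gets a single thread no matter
what the user asks for, and -t 1 gives serialized behavior even with
a parallel plugin.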
Signed-off-by: Eric Blake <eblake@redhat.com>
---
 TODO                |  7 -------
 docs/nbdkit.pod     | 12 +++++++++++-
 nbdkit.in           |  2 +-
 plugins/file/file.c |  2 +-
 src/connections.c   | 10 +++++++---
 src/internal.h      |  3 +++
 src/main.c          | 20 ++++++++++++++++++--
 src/plugins.c       |  8 ++++++++
 8 files changed, 49 insertions(+), 15 deletions(-)
diff --git a/TODO b/TODO
index 6c5bb5b..db7469b 100644
--- a/TODO
+++ b/TODO
@@ -12,10 +12,3 @@
 * Glance and/or cinder plugins.
 * Performance - measure and improve it.
-
-* Implement true parallel request handling.  Currently
-  NBDKIT_THREAD_MODEL_SERIALIZE_REQUESTS and
-  NBDKIT_THREAD_MODEL_PARALLEL are the same, because we handle
-  requests within each connection synchronously one at a time.  We
-  could (and should) be able to handle them in parallel by having
-  another thread pool for requests.
diff --git a/docs/nbdkit.pod b/docs/nbdkit.pod
index e3043ba..4593391 100644
--- a/docs/nbdkit.pod
+++ b/docs/nbdkit.pod
@@ -9,7 +9,7 @@ nbdkit - A toolkit for creating NBD servers
  nbdkit [-e EXPORTNAME] [--exit-with-parent] [-f]
         [-g GROUP] [-i IPADDR]
         [--newstyle] [--oldstyle] [-P PIDFILE] [-p PORT] [-r]
-        [--run CMD] [-s] [--selinux-label LABEL]
+        [--run CMD] [-s] [--selinux-label LABEL] [-t THREADS]
         [--tls=off|on|require] [--tls-certificates /path/to/certificates]
         [--tls-verify-peer]
         [-U SOCKET] [-u USER] [-v] [-V]
@@ -230,6 +230,16 @@ Unix domain sockets:
  nbdkit --selinux-label system_u:object_r:svirt_t:s0 ...
+=item B<-t> THREADS
+
+=item B<--threads> THREADS
+
+Set the number of threads to be used per connection, which in turn
+controls the number of outstanding requests that can be processed at
+once.  Only matters for plugins with thread_model=parallel (where it
+defaults to 16).  To force serialized behavior (useful if the client
+is not prepared for out-of-order responses), set this to 1.
+
 =item B<--tls=off>
 =item B<--tls=on>
diff --git a/nbdkit.in b/nbdkit.in
index 6be89ec..9c3d625 100644
--- a/nbdkit.in
+++ b/nbdkit.in
@@ -65,7 +65,7 @@ verbose while [ $# -gt 0 ]; do
     case "$1" in
         # Flags that take an argument.  We must not rewrite the argument.
-        -e | --export* | -g | --group | -i | --ip* | -P | --pid* | -p | --port | --run | --selinux-label | --tls | --tls-certificates | -U | --unix | -u | --user)
+        -e | --export* | -g | --group | -i | --ip* | -P | --pid* | -p | --port | --run | --selinux-label | -t | --threads | --tls | --tls-certificates | -U | --unix | -u | --user)
             args[$i]="$1"
             ((++i))
             args[$i]="$2"
diff --git a/plugins/file/file.c b/plugins/file/file.c
index a603be8..ef5da3d 100644
--- a/plugins/file/file.c
+++ b/plugins/file/file.c
@@ -200,7 +200,7 @@ file_close (void *handle)
   free (h);
 }
-#define THREAD_MODEL NBDKIT_THREAD_MODEL_SERIALIZE_REQUESTS
+#define THREAD_MODEL NBDKIT_THREAD_MODEL_PARALLEL
 /* Get the file size. */
 static int64_t
diff --git a/src/connections.c b/src/connections.c
index 41371fb..2d83b89 100644
--- a/src/connections.c
+++ b/src/connections.c
@@ -59,6 +59,9 @@
 /* Maximum length of any option data (bytes). */
 #define MAX_OPTION_LENGTH 4096
+/* Default number of parallel requests. */
+#define DEFAULT_PARALLEL_REQUESTS 16
+
 /* Connection structure. */
 struct connection {
   pthread_mutex_t request_lock;
@@ -205,9 +208,11 @@ _handle_single_connection (int sockin, int sockout)
 {
   int r = -1;
   struct connection *conn;
-  int nworkers = 1; /* TODO default to 16 for parallel plugins, with command-line override */
+  int nworkers = threads ? threads : DEFAULT_PARALLEL_REQUESTS;
   pthread_t *workers = NULL;
+  if (!plugin_is_parallel() || nworkers == 1)
+    nworkers = 0;
   conn = new_connection (sockin, sockout, nworkers);
   if (!conn)
     goto done;
@@ -221,10 +226,9 @@ _handle_single_connection (int sockin, int sockout)
   if (negotiate_handshake (conn) == -1)
     goto done;
-  if (nworkers <= 1) {
+  if (!nworkers) {
     /* No need for a separate thread. */
     debug ("handshake complete, processing requests serially");
-    conn->nworkers = 0;
     while (!quit && get_status (conn) > 0)
       recv_request_send_reply (conn);
   }
diff --git a/src/internal.h b/src/internal.h
index a0bbef7..73bc09e 100644
--- a/src/internal.h
+++ b/src/internal.h
@@ -103,6 +103,8 @@ extern const char *tls_certificates_dir;
 extern int tls_verify_peer;
 extern char *unixsocket;
 extern int verbose;
+extern int threads;
+
 extern volatile int quit;
 extern int quit_fd;
@@ -152,6 +154,7 @@ extern void plugin_lock_connection (void);
 extern void plugin_unlock_connection (void);
 extern void plugin_lock_request (struct connection *conn);
 extern void plugin_unlock_request (struct connection *conn);
+extern bool plugin_is_parallel (void);
 extern int plugin_errno_is_preserved (void);
 extern int plugin_open (struct connection *conn, int readonly);
 extern void plugin_close (struct connection *conn);
diff --git a/src/main.c b/src/main.c
index 224ee8a..79f0136 100644
--- a/src/main.c
+++ b/src/main.c
@@ -84,6 +84,7 @@ int readonly;                   /* -r */
 char *run;                      /* --run */
 int listen_stdin;               /* -s */
 const char *selinux_label;      /* --selinux-label */
+int threads;                    /* -t */
 int tls;                        /* --tls : 0=off 1=on 2=require */
 const char *tls_certificates_dir; /* --tls-certificates */
 int tls_verify_peer;            /* --tls-verify-peer */
@@ -105,7 +106,7 @@ static char *random_fifo = NULL;
 enum { HELP_OPTION = CHAR_MAX + 1 };
-static const char *short_options = "e:fg:i:nop:P:rsu:U:vV";
+static const char *short_options = "e:fg:i:nop:P:rst:u:U:vV";
 static const struct option long_options[] = {
   { "help",       0, NULL, HELP_OPTION },
   { "dump-config",0, NULL, 0 },
@@ -132,6 +133,7 @@ static const struct option long_options[] = {
   { "selinux-label", 1, NULL, 0 },
   { "single",     0, NULL, 's' },
   { "stdin",      0, NULL, 's' },
+  { "threads",    1, NULL, 't' },
   { "tls",        1, NULL, 0 },
   { "tls-certificates", 1, NULL, 0 },
   { "tls-verify-peer", 0, NULL, 0 },
@@ -149,7 +151,7 @@ usage (void)
           "       [-e EXPORTNAME] [--exit-with-parent] [-f]\n"
           "       [-g GROUP] [-i IPADDR]\n"
           "       [--newstyle] [--oldstyle] [-P PIDFILE] [-p PORT] [-r]\n"
-          "       [--run CMD] [-s] [--selinux-label LABEL]\n"
+          "       [--run CMD] [-s] [--selinux-label LABEL] [-t THREADS]\n"
           "       [--tls=off|on|require] [--tls-certificates /path/to/certificates]\n"
           "       [--tls-verify-peer]\n"
           "       [-U SOCKET] [-u USER] [-v] [-V]\n"
@@ -337,6 +339,20 @@ main (int argc, char *argv[])
       listen_stdin = 1;
       break;
+    case 't':
+      {
+        char *end;
+
+        errno = 0;
+        threads = strtoul (optarg, &end, 0);
+        if (errno || *end) {
+          fprintf (stderr, "%s: cannot parse '%s' into threads\n",
+                   program_name, optarg);
+          exit (EXIT_FAILURE);
+        }
+        /* XXX Worth a maximum limit on threads? */
+      }
+      break;
+
     case 'U':
       if (socket_activation) {
         fprintf (stderr, "%s: cannot use socket activation with -U flag\n",
diff --git a/src/plugins.c b/src/plugins.c
index e8c6b28..47c4fa5 100644
--- a/src/plugins.c
+++ b/src/plugins.c
@@ -360,6 +360,14 @@ plugin_unlock_request (struct connection *conn)
   }
 }
+bool
+plugin_is_parallel (void)
+{
+  assert (dl);
+
+  return plugin._thread_model >= NBDKIT_THREAD_MODEL_PARALLEL;
+}
+
 int
 plugin_errno_is_preserved (void)
 {
-- 
2.13.6
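The option handling and worker selection in this patch can be read as two small pure functions. What follows is only a sketch under stated assumptions: parse_threads() and choose_nworkers() are hypothetical names that do not exist in nbdkit, and the INT_MAX range check and the rejection of an empty argument go beyond what the patch does (the patch stores the strtoul() result in an int with no upper limit, as its XXX comment notes).

```c
#include <assert.h>
#include <errno.h>
#include <limits.h>
#include <stdbool.h>
#include <stdlib.h>

/* Same value as DEFAULT_PARALLEL_REQUESTS in the patch. */
#define DEFAULT_PARALLEL_REQUESTS 16

/* Hypothetical helper: parse a -t argument into *out.
 * Returns 0 on success, -1 on malformed or out-of-range input.
 * Assumption: values that do not fit in an int are rejected, which
 * the patch itself does not do.
 */
static int
parse_threads (const char *arg, int *out)
{
  char *end;
  unsigned long n;

  errno = 0;
  n = strtoul (arg, &end, 0);
  if (errno || end == arg || *end || n > INT_MAX)
    return -1;
  *out = (int) n;
  return 0;
}

/* Hypothetical helper mirroring the logic added to
 * _handle_single_connection(): a result of 0 means "no worker
 * threads, serve requests inline one at a time".
 */
static int
choose_nworkers (int threads, bool plugin_parallel)
{
  int nworkers = threads ? threads : DEFAULT_PARALLEL_REQUESTS;

  if (!plugin_parallel || nworkers == 1)
    nworkers = 0;
  return nworkers;
}
```

With these definitions, choose_nworkers (0, true) yields the default of 16, while either -t 1 or a non-parallel plugin yields 0 and selects the serial loop.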
Eric Blake
2017-Nov-20  19:38 UTC
[Libguestfs] [nbdkit PATCH v2 7/8] tests: Annotate Makefile.am conditionals
We have a lot of nested conditionals controlling which tests to run;
I'm about to add more, and it's easier to do correctly if we exploit
automake's ability to warn us about improper nesting by annotating
which 'if' each 'endif' belongs to.
Signed-off-by: Eric Blake <eblake@redhat.com>
---
 tests/Makefile.am | 26 +++++++++++++-------------
 1 file changed, 13 insertions(+), 13 deletions(-)
diff --git a/tests/Makefile.am b/tests/Makefile.am
index f2775bd..6522e05 100644
--- a/tests/Makefile.am
+++ b/tests/Makefile.am
@@ -1,5 +1,5 @@
 # nbdkit
-# Copyright (C) 2013-2016 Red Hat Inc.
+# Copyright (C) 2013-2017 Red Hat Inc.
 # All rights reserved.
 #
 # Redistribution and use in source and binary forms, with or without
@@ -100,7 +100,7 @@ check_PROGRAMS += \
 test_socket_activation_SOURCES = test-socket-activation.c
 test_socket_activation_CFLAGS = $(WARNINGS_CFLAGS)
-endif
+endif HAVE_PLUGINS
 if HAVE_CXX
 # This builds a plugin using the C++ compiler. The plugin
@@ -124,7 +124,7 @@ test_cxx_plugin_la_CXXFLAGS = \
 # https://lists.gnu.org/archive/html/libtool/2007-07/msg00067.html
 test_cxx_plugin_la_LDFLAGS = \
 	-module -avoid-version -shared -rpath /nowhere
-endif
+endif HAVE_CXX
 # Exit with parent test.
 check_PROGRAMS += test-exit-with-parent
@@ -219,8 +219,8 @@ disk.gz: disk
 	rm -f $@
 	gzip -9 -c disk > $@
-endif
-endif
+endif HAVE_GUESTFISH
+endif HAVE_ZLIB
 # xz plugin test.
 if HAVE_LIBLZMA
@@ -239,8 +239,8 @@ disk.xz: disk
 	rm -f $@
 	xz --best -c disk > $@
-endif
-endif
+endif HAVE_GUESTFISH
+endif HAVE_LIBLZMA
 # perl plugin test.
 if HAVE_PERL
@@ -254,7 +254,7 @@ test_perl_CFLAGS = \
 	$(WARNINGS_CFLAGS) $(LIBGUESTFS_CFLAGS)
 test_perl_LDADD = libtest.la $(LIBGUESTFS_LIBS)
-endif
+endif HAVE_PERL
 # python plugin test.
 if HAVE_PYTHON
@@ -268,7 +268,7 @@ test_python_CFLAGS = \
 	$(WARNINGS_CFLAGS) $(LIBGUESTFS_CFLAGS)
 test_python_LDADD = libtest.la $(LIBGUESTFS_LIBS)
-endif
+endif HAVE_PYTHON
 # OCaml plugin test.
 if HAVE_OCAML
@@ -290,7 +290,7 @@ test-ocaml-plugin.so: test_ocaml_plugin.ml ../plugins/ocaml/libnbdkitocaml.la ..
 	NBDKit.cmx $< \
 	-cclib -L../plugins/ocaml/.libs -cclib -lnbdkitocaml
-endif
+endif HAVE_OCAML
 # Ruby plugin test.
 if HAVE_RUBY
@@ -304,7 +304,7 @@ test_ruby_CFLAGS = \
 	$(WARNINGS_CFLAGS) $(LIBGUESTFS_CFLAGS)
 test_ruby_LDADD = libtest.la $(LIBGUESTFS_LIBS)
-endif
+endif HAVE_RUBY
 # streaming plugin test.
 check_PROGRAMS += test-streaming
@@ -315,5 +315,5 @@ test_streaming_SOURCES = test-streaming.c test.h
 test_streaming_CFLAGS = $(WARNINGS_CFLAGS) $(LIBGUESTFS_CFLAGS)
 test_streaming_LDADD = libtest.la $(LIBGUESTFS_LIBS)
-endif
-endif
+endif HAVE_PLUGINS
+endif HAVE_LIBGUESTFS
-- 
2.13.6
Eric Blake
2017-Nov-20  19:38 UTC
[Libguestfs] [nbdkit PATCH v2 8/8] tests: Test parallel behavior
Codify the testing procedure mentioned during the addition of --threads
into a full-blown member of 'make check'.  With the delay values I
chose, the test requires at least 5 seconds to complete; I don't know
if it is worth trying to fine-tune it to run faster while still being
robust in the presence of a heavy load.
This requires a slight refactoring of the Makefile.am to provide
file-data to more consumers.
Signed-off-by: Eric Blake <eblake@redhat.com>
---
 configure.ac                |  4 +++
 tests/Makefile.am           | 29 +++++++++++++-----
 tests/test-parallel-file.sh | 71 +++++++++++++++++++++++++++++++++++++++++++++
 3 files changed, 96 insertions(+), 8 deletions(-)
 create mode 100755 tests/test-parallel-file.sh
diff --git a/configure.ac b/configure.ac
index 130ab78..68b0938 100644
--- a/configure.ac
+++ b/configure.ac
@@ -407,6 +407,10 @@ dnl Check for guestfish (only needed for some of the tests).
 AC_CHECK_PROG([GUESTFISH], [guestfish], [guestfish], [no])
 AM_CONDITIONAL([HAVE_GUESTFISH], [test "x$GUESTFISH" != "xno"])
+dnl Check for qemu-io (only needed for some of the tests).
+AC_CHECK_PROG([QEMU_IO], [qemu-io], [qemu-io], [no])
+AM_CONDITIONAL([HAVE_QEMU_IO], [test "x$QEMU_IO" != "xno"])
+
 dnl See plugins/vddk/README.VDDK.
 AC_CHECK_SIZEOF([size_t])
 AS_IF([test "x$ac_cv_sizeof_size_t" = "x4"],[bits=32],[bits=64])
diff --git a/tests/Makefile.am b/tests/Makefile.am
index 6522e05..c5429cb 100644
--- a/tests/Makefile.am
+++ b/tests/Makefile.am
@@ -47,6 +47,7 @@ EXTRA_DIST = \
 	test-ipv4.sh \
 	test_ocaml_plugin.ml \
 	test-ocaml.c \
+	test-parallel-file.sh \
 	test.pl \
 	test.py \
 	test.rb \
@@ -138,7 +139,26 @@ check_DATA += pki/.stamp
 pki/.stamp: $(srcdir)/make-pki.sh
 	$(srcdir)/make-pki.sh
-# In-depth tests need libguestfs, since that is a convenient way to
+if HAVE_PLUGINS
+# Common data shared by multiple tests
+check_DATA += file-data
+MAINTAINERCLEANFILES += file-data
+file-data:
+	rm -f $@ $@-t
+	for f in `seq 1 512`; do echo -ne '\x01\x02\x03\x04\x05\x06\x07\x08'; done > $@-t
+	mv $@-t $@
+
+# While most tests need libguestfs, testing parallel I/O is easier when
+# using qemu-io to kick off asynchronous requests.
+if HAVE_QEMU_IO
+TESTS_ENVIRONMENT += QEMU_IO=$(QEMU_IO)
+TESTS += test-parallel-file.sh
+endif HAVE_QEMU_IO
+
+endif HAVE_PLUGINS
+
+
+# Most in-depth tests need libguestfs, since that is a convenient way to
 # drive qemu.
 if HAVE_LIBGUESTFS
@@ -185,18 +205,11 @@ test_oldstyle_LDADD = libtest.la $(LIBGUESTFS_LIBS)
 # file plugin test.
 check_PROGRAMS += test-file
 TESTS += test-file
-check_DATA += file-data
-MAINTAINERCLEANFILES += file-data
 test_file_SOURCES = test-file.c test.h
 test_file_CFLAGS = $(WARNINGS_CFLAGS) $(LIBGUESTFS_CFLAGS)
 test_file_LDADD = libtest.la $(LIBGUESTFS_LIBS)
-file-data:
-	rm -f $@ $@-t
-	for f in `seq 1 512`; do echo -ne '\x01\x02\x03\x04\x05\x06\x07\x08'; done > $@-t
-	mv $@-t $@
-
 # gzip plugin test.
 if HAVE_ZLIB
 if HAVE_GUESTFISH
diff --git a/tests/test-parallel-file.sh b/tests/test-parallel-file.sh
new file mode 100755
index 0000000..63b5684
--- /dev/null
+++ b/tests/test-parallel-file.sh
@@ -0,0 +1,71 @@
+#!/bin/bash -
+# nbdkit
+# Copyright (C) 2017 Red Hat Inc.
+# All rights reserved.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are
+# met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+#
+# * Redistributions in binary form must reproduce the above copyright
+# notice, this list of conditions and the following disclaimer in the
+# documentation and/or other materials provided with the distribution.
+#
+# * Neither the name of Red Hat nor the names of its contributors may be
+# used to endorse or promote products derived from this software without
+# specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY RED HAT AND CONTRIBUTORS ''AS IS'' AND
+# ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO,
+# THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
+# PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL RED HAT OR
+# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF
+# USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND
+# ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
+# OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT
+# OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF
+# SUCH DAMAGE.
+
+# Makefile sets $QEMU_IO and builds file-data, but it's also nice if the
+# script runs again standalone afterwards for diagnosing any failures
+test -f file-data || { echo "Missing file-data"; exit 77; }
+: ${QEMU_IO=qemu-io}
+
+# Sanity check that qemu-io can issue parallel requests
+$QEMU_IO -f raw -c "aio_read 0 1" -c "aio_write -P 2 1 1" -c aio_flush \
+	 file-data || exit 77;
+
+# Set up the file plugin to delay both reads and writes (for a good chance
+# that parallel requests are in flight), and with reads longer than writes
+# (to more easily detect if out-of-order completion happens).  This test
+# may have spurious failures under heavy loads on the test machine, where
+# tuning the delays may help.
+
+trap 'rm -f test-parallel-file.out' 0 1 2 3 15
+
+# With --threads=1, the read should complete first because it was issued first
+nbdkit -v -t 1 -U - file file=file-data rdelay=2 wdelay=1 --run '
+    $QEMU_IO -f raw -c "aio_read 0 1" -c "aio_write -P 2 1 1" -c aio_flush $nbd
+' | tee test-parallel-file.out
+if test "$(grep '1/1' test-parallel-file.out)" != \
+"read 1/1 bytes at offset 0
+wrote 1/1 bytes at offset 1"; then
+    exit 1
+fi
+
+# With default --threads, the faster write should complete first
+nbdkit -v -U - file file=file-data rdelay=2 wdelay=1 --run '
+    $QEMU_IO -f raw -c "aio_read 0 1" -c "aio_write -P 2 1 1" -c aio_flush $nbd
+' | tee test-parallel-file.out
+if test "$(grep '1/1' test-parallel-file.out)" != \
+"wrote 1/1 bytes at offset 1
+read 1/1 bytes at offset 0"; then
+    exit 1
+fi
+
+exit 0
-- 
2.13.6
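The ordering that the script greps for, and the "at least 5 seconds" estimate in the commit message, follow from simple arithmetic on the two delays. The sketch below is an idealized timing model only, not nbdkit code: struct request and simulate() are hypothetical names, and the model assumes every request is issued at time zero, ignores the flush and all scheduling overhead, and assumes the parallel case has at least as many threads as requests.

```c
#include <assert.h>
#include <stddef.h>

/* One client request, listed in the order the client issues it. */
struct request {
  const char *name;   /* label such as "read" or "write" */
  int delay;          /* seconds the plugin takes to service it */
  int done_at;        /* filled in by simulate(): completion time */
};

/* Fill in done_at for each request and return the time at which the
 * last request completes.  A nonzero 'serial' models -t 1, where
 * each request waits for the previous one; otherwise every request
 * starts at time 0 on its own thread.
 */
static int
simulate (struct request *reqs, size_t n, int serial)
{
  int clock = 0, last = 0;
  size_t i;

  for (i = 0; i < n; i++) {
    if (serial) {
      clock += reqs[i].delay;
      reqs[i].done_at = clock;
    }
    else
      reqs[i].done_at = reqs[i].delay;
    if (reqs[i].done_at > last)
      last = reqs[i].done_at;
  }
  return last;
}
```

Feeding it a 2 second read followed by a 1 second write gives 3 seconds with the read finishing first in the serial case, and 2 seconds with the write finishing first in the parallel case, which adds up to the 5 second lower bound for the two nbdkit runs.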
Richard W.M. Jones
2017-Nov-21  10:31 UTC
Re: [Libguestfs] [nbdkit PATCH v2 0/8] Support parallel transactions within single connection
ACK series.
I'll do an upstream release once you've pushed this.
Thanks,
Rich.
-- 
Richard Jones, Virtualization Group, Red Hat http://people.redhat.com/~rjones
Read my programming and virtualization blog: http://rwmj.wordpress.com
libguestfs lets you edit virtual machines. Supports shell scripting,
bindings from many languages. http://libguestfs.org
Eric Blake
2017-Nov-21  19:38 UTC
Re: [Libguestfs] [nbdkit PATCH v2 0/8] Support parallel transactions within single connection
On 11/21/2017 04:31 AM, Richard W.M. Jones wrote:
> ACK series.
>
> I'll do an upstream release once you've pushed this.
I've pushed the two series that you've reviewed, and have finally posted
my parallel nbd series; once that is reviewed, I agree that we are ready
for a release.
There's other things that may still be worth adding to the nbd forwarder
over time (auto-fragmenting of large requests down to the 32M limits
mentioned in the spec, or even configurable down to the 1M limit of
ancient qemu 2.5; allowing a TCP address and port rather than just a
Unix socket; incorporating TLS client communications to allow 'old
client => Unix => nbd => encrypted TCP => new server' as a nice
counterpart to our existing 'new client => encrypted TCP => nbd => Unix
=> old server'; implementing upstream NBD extensions such as NBD_OPT_GO
support) - but I don't plan to write them this week, so they aren't a
reason to hold up an upstream release.
-- 
Eric Blake, Principal Software Engineer
Red Hat, Inc. +1-919-301-3266
Virtualization: qemu.org | libvirt.org
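The auto-fragmenting idea mentioned above is not part of this series, but its core loop is small enough to sketch. Everything named here is an assumption made for illustration: for_each_fragment(), fragment_fn, struct tally and tally_fragment() are hypothetical, and the 32M constant simply restates the limit quoted in the message. A real forwarder would issue one NBD request per fragment where this sketch calls the callback.

```c
#include <assert.h>
#include <stdint.h>

/* The 32M limit quoted in the message above. */
#define MAX_REQUEST_SIZE (32U * 1024 * 1024)

/* Hypothetical callback type: handle one fragment, return -1 on error. */
typedef int (*fragment_fn) (uint64_t offset, uint32_t count, void *opaque);

/* Split [offset, offset+count) into pieces of at most 'limit' bytes
 * and hand each piece to 'fn' in ascending order.  Stops at the
 * first failure.  Returns 0 on success, -1 on error.
 */
static int
for_each_fragment (uint64_t offset, uint64_t count, uint32_t limit,
                   fragment_fn fn, void *opaque)
{
  if (limit == 0)
    return -1;
  while (count > 0) {
    uint32_t n = count > limit ? limit : (uint32_t) count;

    if (fn (offset, n, opaque) == -1)
      return -1;
    offset += n;
    count -= n;
  }
  return 0;
}

/* Example callback: count the fragments and bytes seen. */
struct tally {
  unsigned calls;
  uint64_t bytes;
};

static int
tally_fragment (uint64_t offset, uint32_t count, void *opaque)
{
  struct tally *t = opaque;

  (void) offset;
  t->calls++;
  t->bytes += count;
  return 0;
}
```

Passing a smaller limit (for example 1M for the old qemu case mentioned above) needs no change to the loop, which is the reason the limit is a parameter instead of a hard-coded constant.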