Richard W.M. Jones
2019-Jun-03  15:29 UTC
[Libguestfs] [PATCH libnbd discussion only 0/5] api: Implement concurrent writer.
This works, but there's no time saving and I'm still investigating whether it does what I think it does.  Nevertheless I thought I would post it because it (probably) implements the idea I outlined last night in:

https://www.redhat.com/archives/libguestfs/2019-June/msg00010.html

The meat of the change is patch 4.  Patch 5 is an example which I would probably fold into patch 4 for submission.  The others are non-controversial documentation changes and refactorings to prepare for the change.

Rich.
Richard W.M. Jones
2019-Jun-03  15:29 UTC
[Libguestfs] [PATCH libnbd discussion only 1/5] docs: Document NBD URI support.
---
 docs/libnbd.pod | 19 +++++++++++++++++++
 1 file changed, 19 insertions(+)
diff --git a/docs/libnbd.pod b/docs/libnbd.pod
index b909833..f299ef1 100644
--- a/docs/libnbd.pod
+++ b/docs/libnbd.pod
@@ -228,6 +228,8 @@ For C<nbd_connect_tcp> the third parameter is the port name or number,
 which can either be a name from F</etc/services> or the port number as
 a string (eg. C<"10809">).
 
+=head2 Connecting to a subprocess
+
 Some NBD servers — notably L<nbdkit(1)> with the C<-s> parameter — can
 also accept a single NBD connection on stdin/stdout.  You can run
 these servers as a subprocess of your main program.  This example
@@ -237,6 +239,23 @@ as the libnbd handle is closed:
  char *argv[] = { "nbdkit", "-s", "memory", "1G", NULL };
  nbd_connect_command (nbd, argv);
 
+=head2 Connecting to a URI
+
+libnbd supports the NBD URI specification.
+
+This specification is currently evolving, and discussion about it can
+be found on the L<NBD mailing list|https://lists.debian.org/nbd/>.  A
+final link to the specification will be added to this documentation
+when it is available.
+
+To connect to a URI via the high level API, use:
+
+ nbd_connect_uri (nbd, "nbd://example.com/");
+
+This feature is implemented by calling other libnbd APIs to set up the
+export name, TLS parameters, and finally connect over a Unix domain
+socket or TCP.
+
 =head1 EXPORTS AND FLAGS
 
 It is possible for NBD servers to serve different content on different
-- 
2.21.0
Richard W.M. Jones
2019-Jun-03  15:29 UTC
[Libguestfs] [PATCH libnbd discussion only 2/5] docs: Document multiple in-flight requests.
---
 docs/libnbd.pod | 29 ++++++++++++++++++++++++++++-
 1 file changed, 28 insertions(+), 1 deletion(-)
diff --git a/docs/libnbd.pod b/docs/libnbd.pod
index f299ef1..7cbb9cd 100644
--- a/docs/libnbd.pod
+++ b/docs/libnbd.pod
@@ -334,7 +334,30 @@ to prefetch.
 
 =back
 
-=head1 MULTI-CONN
+=head1 PERFORMANCE
+
+=head2 Issuing multiple in-flight requests
+
+NBD servers which properly implement the spec can handle multiple
+requests in flight over the same connection at the same time.  Libnbd
+supports this when using the low level API.  To use it you simply
+issue more requests as needed (eg. using calls like C<nbd_aio_pread>,
+C<nbd_aio_pwrite>) without waiting for previous commands to complete.
+
+Each request is identified by a unique 64 bit handle (assigned by
+libnbd), allowing libnbd and callers to match replies to requests.
+Replies may arrive out of order.
+
+Although in theory you can have an unlimited number of requests in
+flight at the same time, in practice it's a good idea to limit them to
+some number.  It is suggested to start with a limit of 16 requests in
+flight (per NBD connection), and measure how adjusting the limit up
+and down affects performance for your local configuration.
+
+There is a full example using multiple in-flight requests available at
+L<https://github.com/libguestfs/libnbd/blob/master/examples/threaded-reads-and-writes.c>
+
+=head2 Multi-conn
 
 Some NBD servers advertise “multi-conn” which means that it is safe to
 make multiple connections to the server and load-balance commands
@@ -358,6 +381,10 @@ If multi-conn is supported then you can open further connections:
    nbd[i]_connect_tcp (nbd[i], "server", "10809");
  }
 
+If you are issuing multiple in-flight requests (see above) and
+limiting the number, then the limit should be applied to each
+individual NBD connection.
+
 =head1 ENCRYPTION AND AUTHENTICATION
 
 The NBD protocol and libnbd supports TLS (sometimes incorrectly called
-- 
2.21.0
Richard W.M. Jones
2019-Jun-03  15:29 UTC
[Libguestfs] [PATCH libnbd discussion only 3/5] lib: Pass handle to socket recv and send calls.
Just a simple refactoring in preparation for forthcoming work.
---
 generator/states-reply.c | 2 +-
 generator/states.c       | 4 ++--
 lib/crypto.c             | 5 +++--
 lib/internal.h           | 6 ++++--
 lib/socket.c             | 5 +++--
 5 files changed, 13 insertions(+), 9 deletions(-)
diff --git a/generator/states-reply.c b/generator/states-reply.c
index 5be3431..f0ef47c 100644
--- a/generator/states-reply.c
+++ b/generator/states-reply.c
@@ -36,7 +36,7 @@
   h->rbuf = &h->sbuf;
   h->rlen = sizeof h->sbuf.simple_reply;
 
-  r = h->sock->ops->recv (h->sock, h->rbuf, h->rlen);
+  r = h->sock->ops->recv (h, h->sock, h->rbuf, h->rlen);
   if (r == -1) {
     /* This should never happen because when we enter this state we
      * should have notification that the socket is ready to read.
diff --git a/generator/states.c b/generator/states.c
index 834fa44..bce4f85 100644
--- a/generator/states.c
+++ b/generator/states.c
@@ -61,7 +61,7 @@ recv_into_rbuf (struct nbd_handle *h)
     rlen = h->rlen > sizeof buf ? sizeof buf : h->rlen;
   }
 
-  r = h->sock->ops->recv (h->sock, rbuf, rlen);
+  r = h->sock->ops->recv (h, h->sock, rbuf, rlen);
   if (r == -1) {
     if (errno == EAGAIN || errno == EWOULDBLOCK)
       return 1;                 /* more data */
@@ -92,7 +92,7 @@ send_from_wbuf (struct nbd_handle *h)
 
   if (h->wlen == 0)
     return 0;                   /* move to next state */
-  r = h->sock->ops->send (h->sock, h->wbuf, h->wlen);
+  r = h->sock->ops->send (h, h->sock, h->wbuf, h->wlen);
   if (r == -1) {
     if (errno == EAGAIN || errno == EWOULDBLOCK)
       return 1;                 /* more data */
diff --git a/lib/crypto.c b/lib/crypto.c
index c437788..aba2e27 100644
--- a/lib/crypto.c
+++ b/lib/crypto.c
@@ -145,7 +145,7 @@ nbd_unlocked_set_tls_psk_file (struct nbd_handle *h, const char *filename)
 #ifdef HAVE_GNUTLS
 
 static ssize_t
-tls_recv (struct socket *sock, void *buf, size_t len)
+tls_recv (struct nbd_handle *h, struct socket *sock, void *buf, size_t len)
 {
   ssize_t r;
 
@@ -163,7 +163,8 @@ tls_recv (struct socket *sock, void *buf, size_t len)
 }
 
 static ssize_t
-tls_send (struct socket *sock, const void *buf, size_t len)
+tls_send (struct nbd_handle *h,
+          struct socket *sock, const void *buf, size_t len)
 {
   ssize_t r;
 
diff --git a/lib/internal.h b/lib/internal.h
index 73cb3f9..c8e5094 100644
--- a/lib/internal.h
+++ b/lib/internal.h
@@ -188,8 +188,10 @@ struct close_callback {
 };
 
 struct socket_ops {
-  ssize_t (*recv) (struct socket *sock, void *buf, size_t len);
-  ssize_t (*send) (struct socket *sock, const void *buf, size_t len);
+  ssize_t (*recv) (struct nbd_handle *h,
+                   struct socket *sock, void *buf, size_t len);
+  ssize_t (*send) (struct nbd_handle *h,
+                   struct socket *sock, const void *buf, size_t len);
   int (*get_fd) (struct socket *sock);
   int (*close) (struct socket *sock);
 };
diff --git a/lib/socket.c b/lib/socket.c
index df933be..f48e455 100644
--- a/lib/socket.c
+++ b/lib/socket.c
@@ -30,7 +30,7 @@
 #include "internal.h"
 
 static ssize_t
-socket_recv (struct socket *sock, void *buf, size_t len)
+socket_recv (struct nbd_handle *h, struct socket *sock, void *buf, size_t len)
 {
   ssize_t r;
 
@@ -41,7 +41,8 @@ socket_recv (struct socket *sock, void *buf, size_t len)
 }
 
 static ssize_t
-socket_send (struct socket *sock, const void *buf, size_t len)
+socket_send (struct nbd_handle *h,
+             struct socket *sock, const void *buf, size_t len)
 {
   ssize_t r;
 
-- 
2.21.0
Richard W.M. Jones
2019-Jun-03  15:29 UTC
[Libguestfs] [PATCH libnbd discussion only 4/5] api: Implement concurrent writer.
---
 docs/libnbd.pod     | 73 +++++++++++++++++++++++++++++++++++++++++++++
 generator/generator | 52 +++++++++++++++++++++++++++-----
 lib/handle.c        | 32 ++++++++++++++++++++
 lib/internal.h      |  7 +++++
 lib/socket.c        | 22 +++++++++++---
 podwrapper.pl.in    |  3 +-
 6 files changed, 177 insertions(+), 12 deletions(-)
diff --git a/docs/libnbd.pod b/docs/libnbd.pod
index 7cbb9cd..ab74be3 100644
--- a/docs/libnbd.pod
+++ b/docs/libnbd.pod
@@ -385,6 +385,79 @@ If you are issuing multiple in-flight requests (see above) and
 limiting the number, then the limit should be applied to each
 individual NBD connection.
 
+=head2 Concurrent writer thread
+
+To achieve the maximum possible performance from libnbd and NBD
+servers, in addition to the techniques above you must also use a
+concurrent writer thread.  This feature allows requests to be issued
+on the NBD socket at the same time that replies are being read from
+the socket.  In other words L<send(2)> and L<recv(2)> calls will be
+running at the same time on the same socket.
+
+There is a full example using a concurrent writer available at
+L<https://github.com/libguestfs/libnbd/blob/master/examples/concurrent-writer.c>
+
+To implement this, you change your ordinary AIO code in four ways:
+
+=over 4
+
+=item 1. Call nbd_set_concurrent_writer
+
+ struct writer_data {
+   struct nbd_handle *nbd;
+   /* other data here as required */
+ } data;
+ 
+ nbd_set_concurrent_writer (nbd, &data, writer);
+
+This function can be called on the handle at any time, either after
+the handle is created or after the connection and handshaking have
+completed.
+
+=item 2. Implement non-blocking writer callback
+
+C<writer> is a I<non-blocking> callback which enqueues the buffer into
+a ring or similar FIFO structure:
+
+ struct ring_item {
+   struct writer_data *data;
+   void *buf;
+   size_t len;
+ };
+ 
+ void writer (void *data, const void *buf, size_t len)
+ {
+   struct ring_item item;
+ 
+   /* add (data, buf, len) to a shared ring */
+   item.data = data;
+   item.buf = malloc (len);
+   memcpy (item.buf, buf, len);
+   item.len = len;
+   ring_add (&item);
+ 
+   writer_signal ();   /* kick the writer thread */
+ }
+
+=item 3. Implement writer thread
+
+You must also supply another thread which picks up data off the ring
+and writes it to the socket (see C<nbd_aio_get_fd>).  If there is an
+error when writing to the socket, call C<nbd_concurrent_writer_error>
+with the C<errno>.
+
+You have a choice of whether to implement one thread per nbd_handle or
+one thread shared between all handles.
+
+=item 4. Modify main loop
+
+Finally your main loop can unconditionally call
+C<nbd_aio_notify_write> when C<nbd_aio_get_direction> returns C<WRITE>
+or C<BOTH> (since the concurrent thread can always enqueue more data
+and so is always "ready to write").
+
+=back
+
 =head1 ENCRYPTION AND AUTHENTICATION
 
 The NBD protocol and libnbd supports TLS (sometimes incorrectly called
diff --git a/generator/generator b/generator/generator
index db7c10f..2b48c67 100755
--- a/generator/generator
+++ b/generator/generator
@@ -1094,6 +1094,35 @@ C<\"qemu:dirty-bitmap:...\"> for qemu-nbd
 (see qemu-nbd I<-B> option).  See also C<nbd_block_status>.";
   };
 
+  "set_concurrent_writer", {
+    default_call with
+    args = [ Opaque "data";
+             CallbackPersist ("writer", [Opaque "data";
+                                         BytesIn ("buf", "len")]) ];
+    ret = RErr;
+    permitted_states = [ Created; Connecting; Connected ];
+    shortdesc = "set a concurrent writer thread";
+    longdesc = "\
+Provide an optional concurrent writer thread for better performance.
+See L<libnbd(3)/Concurrent writer thread> for how to use this.";
+  };
+
+  "concurrent_writer_error", {
+    default_call with
+    args = [ Int "err" ]; ret = RErr;
+    shortdesc = "signal an error from the concurrent writer thread";
+    longdesc = "\
+This can be called from the concurrent writer thread to signal
+that there was an error writing to the socket.  As there is no
+way to recover from such errors, the connection will move to the
+dead state soon after.
+
+The parameter is the C<errno> returned by the failed L<send(2)> call.
+It must be non-zero.
+
+See L<libnbd(3)/Concurrent writer thread> for how to use this.";
+  };
+
   "connect_uri", {
     default_call with
     args = [ String "uri" ]; ret = RErr;
@@ -3157,12 +3186,13 @@ let print_python_binding name { args; ret } =
             pr "  PyObject *py_%s = PyList_New (%s);\n" n len;
             pr "  for (size_t i = 0; i < %s; ++i)\n" len;
             pr "    PyList_SET_ITEM (py_%s, i, PyLong_FromUnsignedLong (%s[i]));\n" n n
+         | BytesIn _ -> ()
          | Opaque n ->
             pr "  struct %s_%s_data *_data = %s;\n" name cb_name n
          | String n
          | UInt64 n -> ()
          (* The following not yet implemented for callbacks XXX *)
-         | ArrayAndLen _ | Bool _ | BytesIn _ | BytesOut _
+         | ArrayAndLen _ | Bool _ | BytesOut _
         | BytesPersistIn _ | BytesPersistOut _ | Callback _ | CallbackPersist _
          | Flags _ | Int _ | Int64 _ | Path _ | SockAddrAndLen _ | StringList _
          | UInt _ | UInt32 _ -> assert false
@@ -3173,11 +3203,12 @@ let print_python_binding name { args; ret } =
        List.iter (
          function
          | ArrayAndLen (UInt32 n, len) -> pr " \"O\""
+         | BytesIn (n, len) -> pr " \"y#\""
          | Opaque n -> pr " \"O\""
          | String n -> pr " \"s\""
          | UInt64 n -> pr " \"K\""
          (* The following not yet implemented for callbacks XXX *)
-         | ArrayAndLen _ | Bool _ | BytesIn _ | BytesOut _
+         | ArrayAndLen _ | Bool _ | BytesOut _
         | BytesPersistIn _ | BytesPersistOut _ | Callback _ | CallbackPersist _
          | Flags _ | Int _ | Int64 _ | Path _ | SockAddrAndLen _ | StringList _
          | UInt _ | UInt32 _ -> assert false
@@ -3186,11 +3217,12 @@ let print_python_binding name { args; ret } =
        List.iter (
          function
          | ArrayAndLen (UInt32 n, _) -> pr ", py_%s" n
+         | BytesIn (n, len) -> pr ", %s, (int) %s" n len
          | Opaque _ -> pr ", _data->data"
          | String n
          | UInt64 n -> pr ", %s" n
          (* The following not yet implemented for callbacks XXX *)
-         | ArrayAndLen _ | Bool _ | BytesIn _ | BytesOut _
+         | ArrayAndLen _ | Bool _ | BytesOut _
         | BytesPersistIn _ | BytesPersistOut _ | Callback _ | CallbackPersist _
          | Flags _ | Int _ | Int64 _ | Path _ | SockAddrAndLen _ | StringList _
          | UInt _ | UInt32 _ -> assert false
@@ -3217,11 +3249,12 @@ let print_python_binding name { args; ret } =
          function
          | ArrayAndLen (UInt32 n, _) ->
             pr "  Py_DECREF (py_%s);\n" n
+         | BytesIn _
          | String _
          | UInt64 _
          | Opaque _ -> ()
          (* The following not yet implemented for callbacks XXX *)
-         | ArrayAndLen _ | Bool _ | BytesIn _ | BytesOut _
+         | ArrayAndLen _ | Bool _ | BytesOut _
         | BytesPersistIn _ | BytesPersistOut _ | Callback _ | CallbackPersist _
          | Flags _ | Int _ | Int64 _ | Path _ | SockAddrAndLen _ | StringList _
          | UInt _ | UInt32 _ -> assert false
@@ -3899,10 +3932,11 @@ let print_ocaml_binding (name, { args; ret }) =
        let argnames =
          List.map (
            function
-           | ArrayAndLen (UInt32 n, _) | String n | UInt64 n | Opaque n ->
+           | ArrayAndLen (UInt32 n, _) | BytesIn (n, _)
+           | String n | UInt64 n | Opaque n ->
               n ^ "v"
            (* The following not yet implemented for callbacks XXX *)
-           | ArrayAndLen _ | Bool _ | BytesIn _ | BytesOut _
+           | ArrayAndLen _ | Bool _ | BytesOut _
            | BytesPersistIn _ | BytesPersistOut _
            | Callback _ | CallbackPersist _
            | Flags _ | Int _ | Int64 _ | Path _
@@ -3928,6 +3962,9 @@ let print_ocaml_binding (name, { args; ret }) =
         | ArrayAndLen (UInt32 n, count) ->
             pr "  %sv = nbd_internal_ocaml_alloc_int32_array (%s, %s);\n"
                n n count;
+         | BytesIn (n, len) ->
+            pr "  %sv = caml_alloc_string (%s);\n" n len;
+            pr "  memcpy (String_val (%sv), %s, %s);\n" n n len
          | String n ->
             pr "  %sv = caml_copy_string (%s);\n" n n
          | UInt64 n ->
@@ -3937,7 +3974,7 @@ let print_ocaml_binding (name, { args; ret }) =
             pr "  fnv = *_%s->cb;\n" n;
             pr "  %sv = *_%s->data;\n" n n
          (* The following not yet implemented for callbacks XXX *)
-         | ArrayAndLen _ | Bool _ | BytesIn _ | BytesOut _
+         | ArrayAndLen _ | Bool _ | BytesOut _
         | BytesPersistIn _ | BytesPersistOut _ | Callback _ | CallbackPersist _
          | Flags _ | Int _ | Int64 _ | Path _ | SockAddrAndLen _ | StringList _
          | UInt _ | UInt32 _ -> assert false
@@ -4192,6 +4229,7 @@ let generate_ocaml_nbd_c () =
   pr "\n";
   pr "#include <stdio.h>\n";
   pr "#include <stdlib.h>\n";
+  pr "#include <string.h>\n";
   pr "\n";
   pr "#include <libnbd.h>\n";
   pr "\n";
diff --git a/lib/handle.c b/lib/handle.c
index cc311ba..cc5d40f 100644
--- a/lib/handle.c
+++ b/lib/handle.c
@@ -215,6 +215,38 @@ nbd_add_close_callback (struct nbd_handle *h, nbd_close_callback cb, void *data)
   return ret;
 }
 
+int
+nbd_unlocked_set_concurrent_writer (struct nbd_handle *h,
+                                    void *data, writer_cb writer)
+{
+  /* I suppose we could allow this, but it seems more likely that
+   * it's an error rather than intentional.
+   */
+  if (h->writer != NULL) {
+    set_error (EINVAL, "concurrent writer was already set for this handle");
+    return -1;
+  }
+
+  h->writer = writer;
+  h->writer_data = data;
+  return 0;
+}
+
+int
+nbd_unlocked_concurrent_writer_error (struct nbd_handle *h, int err)
+{
+  if (err == 0) {
+    set_error (EINVAL, "concurrent writer error parameter must be non-zero");
+    return -1;
+  }
+
+  /* Ignore second and subsequent calls, record only the first error. */
+  if (h->writer_error == 0)
+    h->writer_error = err;
+
+  return 0;
+}
+
 const char *
 nbd_unlocked_get_package_name (struct nbd_handle *h)
 {
diff --git a/lib/internal.h b/lib/internal.h
index c8e5094..c41741d 100644
--- a/lib/internal.h
+++ b/lib/internal.h
@@ -43,6 +43,8 @@ struct close_callback;
 struct socket;
 struct command_in_flight;
 
+typedef void (*writer_cb) (void *data, const void *buf, size_t len);
+
 struct nbd_handle {
   /* Lock protecting concurrent access to the handle. */
   pthread_mutex_t lock;
@@ -90,6 +92,11 @@ struct nbd_handle {
   /* The socket or a wrapper if using GnuTLS. */
   struct socket *sock;
 
+  /* Writer callback if using concurrent writer. */
+  void *writer_data;
+  writer_cb writer;
+  int writer_error;
+
   /* Generic way to read into a buffer - set rbuf to point to a
    * buffer, rlen to the amount of data you expect, and in the state
    * machine call recv_into_rbuf.
diff --git a/lib/socket.c b/lib/socket.c
index f48e455..c6fba6d 100644
--- a/lib/socket.c
+++ b/lib/socket.c
@@ -46,10 +46,24 @@ socket_send (struct nbd_handle *h,
 {
   ssize_t r;
 
-  r = send (sock->u.fd, buf, len, 0);
-  if (r == -1 && errno != EAGAIN && errno != EWOULDBLOCK)
-    set_error (errno, "send");
-  return r;
+  if (!h->writer) {
+    r = send (sock->u.fd, buf, len, 0);
+    if (r == -1 && errno != EAGAIN && errno != EWOULDBLOCK)
+      set_error (errno, "send");
+    return r;
+  }
+  else if (h->writer_error) {
+    /* Concurrent writer thread signaled an error earlier, so
+     * return that here.
+     */
+    set_error (h->writer_error, "concurrent writer thread error");
+    return -1;
+  }
+  else {
+    /* Pass the buffer to the concurrent writer thread. */
+    h->writer (h->writer_data, buf, len);
+    return len;
+  }
 }
 
 static int
diff --git a/podwrapper.pl.in b/podwrapper.pl.in
index 2471807..ecff2d6 100755
--- a/podwrapper.pl.in
+++ b/podwrapper.pl.in
@@ -324,7 +324,8 @@ foreach (@lines) {
     die "$progname: $input: line too long:\n$_\n"
         if length $_ > 76 &&
         substr ($_, 0, 1) ne ' ' &&
-        ! m/https?:/;
+        ! m/https?:/ &&
+        ! m/connected and finished handshaking/;
 }
 
 # Output man page.
-- 
2.21.0
Richard W.M. Jones
2019-Jun-03  15:29 UTC
[Libguestfs] [PATCH libnbd discussion only 5/5] examples: Add concurrent writer example.
---
 .gitignore                           |   1 +
 examples/Makefile.am                 |  12 +
 examples/concurrent-writer.c         | 450 +++++++++++++++++++++++++++
 examples/threaded-reads-and-writes.c |   2 +-
 4 files changed, 464 insertions(+), 1 deletion(-)
diff --git a/.gitignore b/.gitignore
index 30438c1..e4dad91 100644
--- a/.gitignore
+++ b/.gitignore
@@ -41,6 +41,7 @@ Makefile.in
 /docs/libnbd-api.3
 /docs/libnbd-api.pod
 /examples/batched-read-write
+/examples/concurrent-writer
 /examples/threaded-reads-and-writes
 /examples/simple-fetch-first-sector
 /examples/simple-reads-and-writes
diff --git a/examples/Makefile.am b/examples/Makefile.am
index b933873..b5f7e44 100644
--- a/examples/Makefile.am
+++ b/examples/Makefile.am
@@ -21,6 +21,7 @@ EXTRA_DIST = LICENSE-FOR-EXAMPLES
 
 noinst_PROGRAMS = \
 	batched-read-write \
+	concurrent-writer \
 	simple-fetch-first-sector \
 	simple-reads-and-writes \
 	threaded-reads-and-writes
@@ -54,6 +55,17 @@ threaded_reads_and_writes_LDADD = \
 	$(top_builddir)/lib/libnbd.la \
 	$(PTHREAD_LIBS)
 
+concurrent_writer_SOURCES = \
+	concurrent-writer.c
+concurrent_writer_CPPFLAGS = \
+	-I$(top_srcdir)/include
+concurrent_writer_CFLAGS = \
+	$(WARNINGS_CFLAGS) \
+	$(PTHREAD_CFLAGS)
+concurrent_writer_LDADD = \
+	$(top_builddir)/lib/libnbd.la \
+	$(PTHREAD_LIBS)
+
 batched_read_write_SOURCES = \
 	batched-read-write.c
 batched_read_write_CPPFLAGS = \
diff --git a/examples/concurrent-writer.c b/examples/concurrent-writer.c
new file mode 100644
index 0000000..11a9f22
--- /dev/null
+++ b/examples/concurrent-writer.c
@@ -0,0 +1,450 @@
+/* Example usage with nbdkit:
+ *
+ * nbdkit -U - memory 1M --run './concurrent-writer $unixsocket'
+ *
+ * This will read and write randomly over the first megabyte of the
+ * plugin using multi-conn, multiple threads, multiple requests in
+ * flight on each connection, and concurrent writer threads.
+ *
+ * To run it against a remote server over TCP (note this will destroy
+ * the first megabyte of the remote disk):
+ *
+ * ./concurrent-writer hostname port
+ *  or
+ * ./concurrent-writer nbd://hostname:port
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <stdbool.h>
+#include <stdint.h>
+#include <string.h>
+#include <unistd.h>
+#include <errno.h>
+#include <signal.h>
+#include <poll.h>
+#include <time.h>
+#include <assert.h>
+
+#include <pthread.h>
+
+#include <libnbd.h>
+
+static int64_t exportsize;
+
+/* Number of simultaneous connections to the NBD server.  The number
+ * of threads is NR_MULTI_CONN * 2 because there is one thread reading
+ * plus a concurrent writer thread.  Note that some servers only
+ * support a limited number of simultaneous connections, and/or have a
+ * configurable thread pool internally, and if you exceed those limits
+ * then something will break.
+ */
+#define NR_MULTI_CONN 8
+
+/* Number of commands that can be "in flight" at the same time on each
+ * connection.  (Therefore the total number of requests in flight may
+ * be up to NR_MULTI_CONN * MAX_IN_FLIGHT).  qemu's NBD client can
+ * have up to 16 requests in flight.
+ *
+ * Some servers do not support multiple requests in flight and may
+ * deadlock or even crash if this is larger than 1, but common NBD
+ * servers should be OK.
+ */
+#define MAX_IN_FLIGHT 16
+
+/* Number of commands we issue (per thread). */
+#define NR_CYCLES 1000000
+
+/* Reader thread. */
+struct reader_status {
+  size_t i;                     /* Thread index, 0 .. NR_MULTI_CONN-1 */
+  int argc;                     /* Command line parameters. */
+  char **argv;
+  int status;                   /* Return status. */
+  unsigned requests;            /* Total number of requests made. */
+  unsigned most_in_flight;      /* Most requests seen in flight. */
+};
+
+static void *start_reader_thread (void *arg);
+
+int
+main (int argc, char *argv[])
+{
+  struct nbd_handle *nbd;
+  pthread_t reader_threads[NR_MULTI_CONN];
+  struct reader_status reader_status[NR_MULTI_CONN];
+  size_t i;
+  int err;
+  unsigned requests, most_in_flight, errors;
+
+  srand (time (NULL));
+
+  if (argc < 2 || argc > 3) {
+    fprintf (stderr, "%s uri | socket | hostname port\n", argv[0]);
+    exit (EXIT_FAILURE);
+  }
+
+  nbd = nbd_create ();
+  if (nbd == NULL) {
+    fprintf (stderr, "%s\n", nbd_get_error ());
+    exit (EXIT_FAILURE);
+  }
+
+  /* Connect first to check if the server supports writes and multi-conn. */
+  if (argc == 2) {
+    if (strstr (argv[1], "://")) {
+      if (nbd_connect_uri (nbd, argv[1]) == -1) {
+        fprintf (stderr, "%s\n", nbd_get_error ());
+        exit (EXIT_FAILURE);
+      }
+    }
+    else if (nbd_connect_unix (nbd, argv[1]) == -1) {
+      fprintf (stderr, "%s\n", nbd_get_error ());
+      exit (EXIT_FAILURE);
+    }
+  }
+  else {
+    if (nbd_connect_tcp (nbd, argv[1], argv[2]) == -1) {
+      fprintf (stderr, "%s\n", nbd_get_error ());
+      exit (EXIT_FAILURE);
+    }
+  }
+
+  exportsize = nbd_get_size (nbd);
+  if (exportsize == -1) {
+    fprintf (stderr, "%s\n", nbd_get_error ());
+    exit (EXIT_FAILURE);
+  }
+
+  if (nbd_read_only (nbd) == 1) {
+    fprintf (stderr, "%s: error: this NBD export is read-only\n", argv[0]);
+    exit (EXIT_FAILURE);
+  }
+
+  if (nbd_can_multi_conn (nbd) == 0) {
+    fprintf (stderr, "%s: error: "
+             "this NBD export does not support multi-conn\n", argv[0]);
+    exit (EXIT_FAILURE);
+  }
+
+  nbd_close (nbd);
+
+  /* Start the reader threads. */
+  for (i = 0; i < NR_MULTI_CONN; ++i) {
+    reader_status[i].i = i;
+    reader_status[i].argc = argc;
+    reader_status[i].argv = argv;
+    reader_status[i].status = 0;
+    reader_status[i].requests = 0;
+    reader_status[i].most_in_flight = 0;
+    err = pthread_create (&reader_threads[i], NULL,
+                          start_reader_thread, &reader_status[i]);
+    if (err != 0) {
+      errno = err;
+      perror ("pthread_create");
+      exit (EXIT_FAILURE);
+    }
+  }
+
+  /* Wait for the threads to exit. */
+  errors = 0;
+  requests = 0;
+  most_in_flight = 0;
+  for (i = 0; i < NR_MULTI_CONN; ++i) {
+    err = pthread_join (reader_threads[i], NULL);
+    if (err != 0) {
+      errno = err;
+      perror ("pthread_join");
+      exit (EXIT_FAILURE);
+    }
+    if (reader_status[i].status != 0) {
+      fprintf (stderr, "thread %zu failed with status %d\n",
+               i, reader_status[i].status);
+      errors++;
+    }
+    requests += reader_status[i].requests;
+    if (reader_status[i].most_in_flight > most_in_flight)
+      most_in_flight = reader_status[i].most_in_flight;
+  }
+
+  /* Make sure the number of requests that were required matches what
+   * we expect.
+   */
+  assert (requests == NR_MULTI_CONN * NR_CYCLES);
+
+  printf ("most requests seen in flight = %u (per thread) "
+          "vs MAX_IN_FLIGHT = %d\n",
+          most_in_flight, MAX_IN_FLIGHT);
+
+  exit (errors == 0 ? EXIT_SUCCESS : EXIT_FAILURE);
+}
+
+struct queue {
+  struct queue *next;
+  void *buf;
+  size_t len;
+};
+
+/* Concurrent writer thread (one per libnbd handle). */
+struct writer_data {
+  size_t i;                     /* Thread index, 0 .. NR_MULTI_CONN-1 */
+  struct nbd_handle *nbd;       /* NBD handle. */
+  struct queue *q, *q_end;      /* Queue of items to write. */
+  pthread_mutex_t q_lock;       /* Lock on queue. */
+  pthread_cond_t q_cond;        /* Condition on queue. */
+};
+
+static void *start_writer_thread (void *arg);
+static void writer (void *data, const void *buf, size_t len);
+
+static void *
+start_reader_thread (void *arg)
+{
+  struct nbd_handle *nbd;
+  struct pollfd fds[1];
+  struct reader_status *status = arg;
+  struct writer_data writer_data;
+  pthread_t writer_thread;
+  int err;
+  char buf[512];
+  size_t i, j;
+  uint64_t offset, handle;
+  uint64_t handles[MAX_IN_FLIGHT];
+  size_t in_flight;        /* counts number of requests in flight */
+  int dir, r, cmd;
+  bool want_to_send;
+
+  nbd = nbd_create ();
+  if (nbd == NULL) {
+    fprintf (stderr, "%s\n", nbd_get_error ());
+    exit (EXIT_FAILURE);
+  }
+
+  if (status->argc == 2) {
+    if (strstr (status->argv[1], "://")) {
+      if (nbd_connect_uri (nbd, status->argv[1]) == -1) {
+        fprintf (stderr, "%s\n", nbd_get_error ());
+        exit (EXIT_FAILURE);
+      }
+    }
+    else if (nbd_connect_unix (nbd, status->argv[1]) == -1) {
+      fprintf (stderr, "%s\n", nbd_get_error ());
+      exit (EXIT_FAILURE);
+    }
+  }
+  else {
+    if (nbd_connect_tcp (nbd, status->argv[1], status->argv[2]) == -1) {
+      fprintf (stderr, "%s\n", nbd_get_error ());
+      exit (EXIT_FAILURE);
+    }
+  }
+
+  for (i = 0; i < sizeof buf; ++i)
+    buf[i] = rand ();
+
+  /* Start the concurrent writer thread, one per handle. */
+  writer_data.i = status->i;
+  writer_data.nbd = nbd;
+  writer_data.q = writer_data.q_end = NULL;
+  pthread_mutex_init (&writer_data.q_lock, NULL);
+  pthread_cond_init (&writer_data.q_cond, NULL);
+  err = pthread_create (&writer_thread, NULL,
+                        start_writer_thread, &writer_data);
+  if (err != 0) {
+    errno = err;
+    perror ("pthread_create");
+    exit (EXIT_FAILURE);
+  }
+
+  if (nbd_set_concurrent_writer (nbd, &writer_data, writer) == -1) {
+    fprintf (stderr, "%s\n", nbd_get_error ());
+    exit (EXIT_FAILURE);
+  }
+
+  /* Issue commands. */
+  in_flight = 0;
+  i = NR_CYCLES;
+  while (i > 0 || in_flight > 0) {
+    if (nbd_aio_is_dead (nbd) || nbd_aio_is_closed (nbd)) {
+      fprintf (stderr, "thread %zu: connection is dead or closed\n",
+               status->i);
+      goto error;
+    }
+
+    /* Do we want to send another request and there's room to issue it
+     * and the connection is in the READY state so it can be used to
+     * issue a request.
+     */
+    want_to_send =
+      i > 0 && in_flight < MAX_IN_FLIGHT && nbd_aio_is_ready (nbd);
+
+    fds[0].fd = nbd_aio_get_fd (nbd);
+    fds[0].events = want_to_send ? POLLOUT : 0;
+    fds[0].revents = 0;
+
+    dir = nbd_aio_get_direction (nbd);
+    if ((dir & LIBNBD_AIO_DIRECTION_WRITE) != 0) {
+      /* The concurrent writer is always writable, we don't have to
+       * test the socket in poll.  Since calling nbd_aio_notify_write
+       * can change the state, after doing it we must restart the
+       * loop.
+       */
+      nbd_aio_notify_write (nbd);
+      continue;
+    }
+
+    if ((dir & LIBNBD_AIO_DIRECTION_READ) != 0)
+      fds[0].events |= POLLIN;
+
+    if (poll (fds, 1, -1) == -1) {
+      perror ("poll");
+      goto error;
+    }
+
+    if ((dir & LIBNBD_AIO_DIRECTION_READ) != 0 &&
+        (fds[0].revents & POLLIN) != 0)
+      nbd_aio_notify_read (nbd);
+
+    /* If we can issue another request, do so.  Note that we reuse the
+     * same buffer for multiple in-flight requests.  It doesn't matter
+     * here because we're just trying to write random stuff, but that
+     * would be Very Bad in a real application.
+     */
+    if (want_to_send && (fds[0].revents & POLLOUT) != 0 &&
+        nbd_aio_is_ready (nbd)) {
+      offset = rand () % (exportsize - sizeof buf);
+      cmd = rand () & 1;
+      if (cmd == 0)
+        handle = nbd_aio_pwrite (nbd, buf, sizeof buf, offset, 0);
+      else
+        handle = nbd_aio_pread (nbd, buf, sizeof buf, offset, 0);
+      if (handle == -1) {
+        fprintf (stderr, "%s\n", nbd_get_error ());
+        goto error;
+      }
+      handles[in_flight] = handle;
+      i--;
+      in_flight++;
+      if (in_flight > status->most_in_flight)
+        status->most_in_flight = in_flight;
+    }
+
+    /* If a command is ready to retire, retire it. */
+    for (j = 0; j < in_flight; ++j) {
+      r = nbd_aio_command_completed (nbd, handles[j]);
+      if (r == -1) {
+        fprintf (stderr, "%s\n", nbd_get_error ());
+        goto error;
+      }
+      if (r) {
+        memmove (&handles[j], &handles[j+1],
+                 sizeof (handles[0]) * (in_flight - j - 1));
+        j--;
+        in_flight--;
+        status->requests++;
+      }
+    }
+  }
+
+  if (nbd_shutdown (nbd) == -1) {
+    fprintf (stderr, "%s\n", nbd_get_error ());
+    exit (EXIT_FAILURE);
+  }
+
+  nbd_close (nbd);
+
+  printf ("thread %zu: finished OK\n", status->i);
+
+  status->status = 0;
+  pthread_exit (status);
+
+ error:
+  fprintf (stderr, "thread %zu: failed\n", status->i);
+  status->status = -1;
+  pthread_exit (status);
+}
+
+/* This runs in the reader thread and enqueues the data which will be
+ * picked up by the writer thread.
+ */
+static void
+writer (void *data, const void *buf, size_t len)
+{
+  struct writer_data *writer_data = data;
+  struct queue *item;
+
+  item = malloc (sizeof *item);
+  if (!item) goto error;
+  item->next = NULL;
+  item->buf = malloc (len);
+  if (item->buf == NULL) {
+    free (item);
+    goto error;
+  }
+  memcpy (item->buf, buf, len);
+  item->len = len;
+
+  /* Enqueue the item and signal the writer thread. */
+  pthread_mutex_lock (&writer_data->q_lock);
+  if (writer_data->q_end == NULL)
+    writer_data->q = writer_data->q_end = item;
+  else {
+    writer_data->q_end->next = item;
+    writer_data->q_end = item;
+  }
+  pthread_cond_signal (&writer_data->q_cond);
+  pthread_mutex_unlock (&writer_data->q_lock);
+  return;
+
+ error:
+  nbd_concurrent_writer_error (writer_data->nbd, errno);
+}
+
+static void *
+start_writer_thread (void *arg)
+{
+  struct writer_data *writer_data = arg;
+  struct nbd_handle *nbd = writer_data->nbd;
+  struct queue *item;
+  int fd;
+  struct pollfd fds[1];
+  ssize_t r;
+  void *p;
+
+  fd = nbd_aio_get_fd (nbd);
+
+  for (;;) {
+    /* Pick next job off the queue. */
+    pthread_mutex_lock (&writer_data->q_lock);
+    while (writer_data->q == NULL)
+      pthread_cond_wait (&writer_data->q_cond, &writer_data->q_lock);
+    item = writer_data->q;
+    writer_data->q = item->next;
+    if (writer_data->q == NULL)
+      writer_data->q_end = NULL;
+    pthread_mutex_unlock (&writer_data->q_lock);
+
+    p = item->buf;
+    while (item->len > 0) {
+      /* Wait for the socket to become ready to write. */
+      fds[0].fd = fd;
+      fds[0].events = POLLOUT;
+      fds[0].revents = 0;
+
+      if (poll (fds, 1, -1) == -1) goto error;
+
+      r = send (fd, p, item->len, 0);
+      if (r == -1) goto error;
+
+      p += r;
+      item->len -= r;
+    }
+
+    free (item->buf);
+    free (item);
+  }
+
+ error:
+  nbd_concurrent_writer_error (nbd, errno);
+  return NULL;
+}
diff --git a/examples/threaded-reads-and-writes.c b/examples/threaded-reads-and-writes.c
index 3e3fc32..a92e7b5 100644
--- a/examples/threaded-reads-and-writes.c
+++ b/examples/threaded-reads-and-writes.c
@@ -52,7 +52,7 @@ static int64_t exportsize;
 #define MAX_IN_FLIGHT 16
 
 /* Number of commands we issue (per thread). */
-#define NR_CYCLES 10000
+#define NR_CYCLES 1000000
 
 struct thread_status {
   size_t i;                     /* Thread index, 0 .. NR_MULTI_CONN-1 */
-- 
2.21.0
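The `writer` callback and `start_writer_thread` above form a producer/consumer pair around a mutex-protected linked list. Below is a minimal standalone sketch of that queue. It assumes a hypothetical `struct wqueue` type modelled on the example's `struct writer_data` and `struct queue`. Unlike the posted example, it adds a `shutdown` flag so the consumer can exit and be joined. It also initializes the condition variable explicitly: the example's setup lines, as quoted in the review, call `pthread_mutex_init` for `q_lock` but show no `pthread_cond_init` for `q_cond`.

```c
#include <assert.h>
#include <pthread.h>
#include <stdbool.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical item and queue types, modelled on the example's
 * struct queue and struct writer_data. */
struct witem {
  struct witem *next;
  void *buf;
  size_t len;
};

struct wqueue {
  struct witem *head, *tail;
  bool shutdown;              /* tells the consumer to exit once drained */
  pthread_mutex_t lock;
  pthread_cond_t cond;
};

static void
wqueue_init (struct wqueue *q)
{
  q->head = q->tail = NULL;
  q->shutdown = false;
  pthread_mutex_init (&q->lock, NULL);
  pthread_cond_init (&q->cond, NULL);
}

/* Producer: copy the caller's buffer (it may be reused as soon as we
 * return) and append it.  Returns 0, or -1 if memory is exhausted. */
static int
wqueue_push (struct wqueue *q, const void *buf, size_t len)
{
  struct witem *item = malloc (sizeof *item);
  if (item == NULL)
    return -1;
  item->buf = malloc (len > 0 ? len : 1);
  if (item->buf == NULL) {
    free (item);
    return -1;
  }
  memcpy (item->buf, buf, len);
  item->len = len;
  item->next = NULL;

  pthread_mutex_lock (&q->lock);
  if (q->tail == NULL)
    q->head = q->tail = item;
  else {
    q->tail->next = item;
    q->tail = item;
  }
  pthread_cond_signal (&q->cond);
  pthread_mutex_unlock (&q->lock);
  return 0;
}

/* Consumer: block until an item arrives.  Queued items are always
 * returned before shutdown is honoured; NULL means drained and shut down. */
static struct witem *
wqueue_pop (struct wqueue *q)
{
  struct witem *item;

  pthread_mutex_lock (&q->lock);
  while (q->head == NULL && !q->shutdown)
    pthread_cond_wait (&q->cond, &q->lock);
  item = q->head;
  if (item != NULL) {
    q->head = item->next;
    if (q->head == NULL)
      q->tail = NULL;
  }
  assert ((q->head == NULL) == (q->tail == NULL));
  pthread_mutex_unlock (&q->lock);
  return item;
}

static void
wqueue_shutdown (struct wqueue *q)
{
  pthread_mutex_lock (&q->lock);
  q->shutdown = true;
  pthread_cond_broadcast (&q->cond);
  pthread_mutex_unlock (&q->lock);
}
```

In this arrangement the writer thread would loop on `wqueue_pop` until it returns NULL. The reader thread would call `wqueue_shutdown` and then `pthread_join` after its last request has been enqueued and before closing the handle. Because queued items are drained before shutdown takes effect, nothing already handed to the writer is lost.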
Eric Blake
2019-Jun-03  16:17 UTC
Re: [Libguestfs] [PATCH libnbd discussion only 2/5] docs: Document multiple in-flight requests.
On 6/3/19 10:29 AM, Richard W.M. Jones wrote:
> ---
>  docs/libnbd.pod | 29 ++++++++++++++++++++++++++++-
>  1 file changed, 28 insertions(+), 1 deletion(-)
>
> diff --git a/docs/libnbd.pod b/docs/libnbd.pod
> index f299ef1..7cbb9cd 100644
> --- a/docs/libnbd.pod
> +++ b/docs/libnbd.pod
> @@ -334,7 +334,30 @@ to prefetch.
>
>  =back
>
> -=head1 MULTI-CONN
> +=head1 PERFORMANCE
> +
> +=head2 Issuing multiple in-flight requests
> +
> +NBD servers which properly implement the spec can handle multiple

Should we spell this out as 'specification' rather than abbreviating to
'spec'?

> +requests in flight over the same connection at the same time.  Libnbd
> +supports this when using the low level API.  To use it you simply
> +issue more requests as needed (eg. using calls like C<nbd_aio_pread>,
> +C<nbd_aio_pwrite>) without waiting for previous commands to complete.
> +
> +Each request is identified by a unique 64 bit handle (assigned by
> +libnbd), allowing libnbd and callers to match replies to requests.
> +Replies may arrive out of order.
> +
> +Although in theory you can have an indefinite number of requests in
> +flight at the same time, in practice it's a good idea to limit them to
> +some number.  It is suggested to start with a limit of 16 requests in
> +flight (per NBD connection), and measure how adjusting the limit up
> +and down affects performance for your local configuration.

Probably also worth advising that parallel in-flight requests should
avoid any request that overlaps a portion of the disk that is already
the subject of any other in-flight write-like command (a parallel read
may see indeterminate data, and a parallel write may even cause disk
corruption where the resulting disk contents do not match either of the
two writes).

> +
> +There is a full example using multiple in-flight requests available at
> +L<https://github.com/libguestfs/libnbd/blob/master/examples/threaded-reads-and-writes.c>
> +
> +=head2 Multi-conn
>
>  Some NBD servers advertise “multi-conn” which means that it is safe to
>  make multiple connections to the server and load-balance commands
> @@ -358,6 +381,10 @@ If multi-conn is supported then you can open further connections:
>      nbd[i]_connect_tcp (nbd[i], "server", "10809");
>    }
>
> +If you are issuing multiple in-flight requests (see above) and
> +limiting the number, then the limit should be applied to each
> +individual NBD connection.
> +
>  =head1 ENCRYPTION AND AUTHENTICATION
>
>  The NBD protocol and libnbd supports TLS (sometimes incorrectly called

-- 
Eric Blake, Principal Software Engineer
Red Hat, Inc. +1-919-301-3226
Virtualization: qemu.org | libvirt.org
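Eric's two points can be combined into one client-side admission check. The first is a per-connection cap on in-flight requests. The second is that a request must not overlap a range already covered by an in-flight write-like command. The sketch below is hypothetical bookkeeping in the caller, not a libnbd API: `struct inflight`, `may_issue`, `track` and `retire` are invented names, and the cap of 16 is the starting value the docs patch suggests. Two overlapping reads are allowed, since neither modifies the disk. `retire` compacts the array with `memmove`, as the example's retire loop does.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>
#include <stdint.h>
#include <string.h>

#define MAX_IN_FLIGHT 16   /* starting limit suggested in the docs patch */

/* Hypothetical record of one in-flight request on one connection. */
struct req {
  uint64_t offset;
  uint64_t count;
  bool is_write;           /* true for write-like commands */
};

struct inflight {
  struct req reqs[MAX_IN_FLIGHT];
  size_t n;
};

/* Do [a, a+alen) and [b, b+blen) intersect? */
static bool
ranges_overlap (uint64_t a, uint64_t alen, uint64_t b, uint64_t blen)
{
  return a < b + blen && b < a + alen;
}

/* Refuse if the cap is reached, or if r overlaps an in-flight request
 * where at least one of the two is write-like. */
static bool
may_issue (const struct inflight *f, struct req r)
{
  size_t i;

  if (f->n >= MAX_IN_FLIGHT)
    return false;
  for (i = 0; i < f->n; ++i) {
    const struct req *o = &f->reqs[i];
    if ((r.is_write || o->is_write) &&
        ranges_overlap (r.offset, r.count, o->offset, o->count))
      return false;
  }
  return true;
}

/* Record an issued request; the caller must have checked may_issue. */
static void
track (struct inflight *f, struct req r)
{
  assert (f->n < MAX_IN_FLIGHT);
  f->reqs[f->n++] = r;
}

/* Forget a completed request, keeping the array contiguous. */
static void
retire (struct inflight *f, size_t i)
{
  assert (i < f->n);
  memmove (&f->reqs[i], &f->reqs[i + 1],
           sizeof f->reqs[0] * (f->n - i - 1));
  f->n--;
}
```

With multi-conn, one `struct inflight` per connection matches the advice that the limit applies to each connection individually. Overlap, however, concerns the disk, so a real multi-connection client would need to check it across all connections.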
Eric Blake
2019-Jun-03  16:18 UTC
Re: [Libguestfs] [PATCH libnbd discussion only 1/5] docs: Document NBD URI support.
On 6/3/19 10:29 AM, Richard W.M. Jones wrote:
> ---
>  docs/libnbd.pod | 19 +++++++++++++++++++
>  1 file changed, 19 insertions(+)
>
> diff --git a/docs/libnbd.pod b/docs/libnbd.pod
> index b909833..f299ef1 100644
> --- a/docs/libnbd.pod
> +++ b/docs/libnbd.pod
> @@ -228,6 +228,8 @@ For C<nbd_connect_tcp> the third parameter is the port name or number,
>  which can either be a name from F</etc/services> or the port number as
>  a string (eg. C<"10809">).
>
> +=head2 Connecting to a subprocess
> +
>  Some NBD servers — notably L<nbdkit(1)> with the C<-s> parameter — can
>  also accept a single NBD connection on stdin/stdout.  You can run
>  these servers as a subprocess of your main program.  This example
> @@ -237,6 +239,23 @@ as the libnbd handle is closed:
>   char *argv[] = { "nbdkit", "-s", "memory", "1G", NULL };
>   nbd_connect_command (nbd, argv);
>
> +=head2 Connecting to a URI
> +
> +libnbd supports the NBD URI specification.
> +
> +This specification is currently evolving, and discussion about it can
> +be found on the L<NBD mailing list|https://lists.debian.org/nbd/>.  A
> +final link to the specification will be added to this documentation
> +when it is available.

Probably ought to mention nbd_supports_uri in this discussion.

> +
> +To connect to a URI via the high level API, use:
> +
> + nbd_connect_uri (nbd, "nbd://example.com/");
> +
> +This feature is implemented by calling other libnbd APIs to set up the
> +export name, TLS parameters, and finally connect over a Unix domain
> +socket or TCP.
> +
>  =head1 EXPORTS AND FLAGS
>
>  It is possible for NBD servers to serve different content on different

-- 
Eric Blake, Principal Software Engineer
Red Hat, Inc. +1-919-301-3226
Virtualization: qemu.org | libvirt.org
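The docs say `nbd_connect_uri` works by deriving an export name and a connection target from the URI. The sketch below shows that mapping for the plain TCP case only. It assumes the draft's `nbd://host[:port][/export]` form, with the default port 10809 and an export name equal to the path minus its leading `/`. `parse_nbd_uri` and `struct nbd_uri_parts` are hypothetical names and do not reflect libnbd's actual parser. The sketch ignores IPv6 literals, percent-decoding, `nbds://` and Unix-socket URIs. Real code should also follow Eric's suggestion and check `nbd_supports_uri` before relying on URI support.

```c
#include <assert.h>
#include <string.h>

/* Hypothetical result of splitting a plain nbd:// URI. */
struct nbd_uri_parts {
  char host[256];
  char port[16];          /* a string, like nbd_connect_tcp's port */
  char exportname[256];
};

/* Split "nbd://host[:port][/export]".  Returns 0, or -1 if the URI is
 * not in this restricted form. */
static int
parse_nbd_uri (const char *uri, struct nbd_uri_parts *out)
{
  const char *prefix = "nbd://";
  const char *p, *slash, *colon;
  size_t hostlen;

  assert (uri != NULL && out != NULL);
  if (strncmp (uri, prefix, strlen (prefix)) != 0)
    return -1;
  p = uri + strlen (prefix);

  slash = strchr (p, '/');
  if (slash == NULL)
    slash = p + strlen (p);
  colon = memchr (p, ':', (size_t) (slash - p));

  hostlen = (size_t) ((colon ? colon : slash) - p);
  if (hostlen == 0 || hostlen >= sizeof out->host)
    return -1;
  memcpy (out->host, p, hostlen);
  out->host[hostlen] = '\0';

  if (colon) {
    size_t portlen = (size_t) (slash - (colon + 1));
    if (portlen == 0 || portlen >= sizeof out->port)
      return -1;
    memcpy (out->port, colon + 1, portlen);
    out->port[portlen] = '\0';
  }
  else
    strcpy (out->port, "10809");        /* default NBD port */

  /* The export name is the path with its leading '/' removed. */
  if (*slash == '/')
    slash++;
  if (strlen (slash) >= sizeof out->exportname)
    return -1;
  strcpy (out->exportname, slash);
  return 0;
}
```

Under these assumptions, the docs' own example `nbd://example.com/` maps to host `example.com`, port `"10809"` and an empty export name.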
Eric Blake
2019-Jun-03  16:19 UTC
Re: [Libguestfs] [PATCH libnbd discussion only 3/5] lib: Pass handle to socket recv and send calls.
On 6/3/19 10:29 AM, Richard W.M. Jones wrote:
> Just a simple refactoring in preparation for forthcoming work.
> ---
>  generator/states-reply.c | 2 +-
>  generator/states.c       | 4 ++--
>  lib/crypto.c             | 5 +++--
>  lib/internal.h           | 6 ++++--
>  lib/socket.c             | 5 +++--
>  5 files changed, 13 insertions(+), 9 deletions(-)
>
> diff --git a/generator/states-reply.c b/generator/states-reply.c
> index 5be3431..f0ef47c 100644
> --- a/generator/states-reply.c
> +++ b/generator/states-reply.c
> @@ -36,7 +36,7 @@
>    h->rbuf = &h->sbuf;
>    h->rlen = sizeof h->sbuf.simple_reply;
>
> -  r = h->sock->ops->recv (h->sock, h->rbuf, h->rlen);
> +  r = h->sock->ops->recv (h, h->sock, h->rbuf, h->rlen);

Do we need to pass both h and h->sock, or can the caller access h->sock
through h?  (Maybe I should read patch 4 first?)

Otherwise, 1-3 look fine once cleaned up, and can probably be applied
now even if you are still working on 4-5.

-- 
Eric Blake, Principal Software Engineer
Red Hat, Inc. +1-919-301-3226
Virtualization: qemu.org | libvirt.org
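Eric's question is whether the socket ops need both arguments. Below is a hypothetical single-argument variant, in which each op receives only the handle and reaches the socket through `h->sock`. `struct handle`, `struct socket`, `mock_send` and the in-memory buffer are stand-ins, not libnbd's definitions. The trade-off is this: passing the socket separately keeps the ops usable on a socket that is not `h->sock`, such as an inner layer of a wrapped socket, while the single-argument form makes it impossible to pass a mismatched pair.

```c
#include <assert.h>
#include <string.h>
#include <sys/types.h>

struct handle;

/* Hypothetical ops table: ops take only the handle. */
struct socket_ops {
  ssize_t (*send) (struct handle *h, const void *buf, size_t len);
};

struct socket {
  const struct socket_ops *ops;
  char out[64];            /* stand-in for a real file descriptor */
  size_t used;
};

struct handle {
  struct socket *sock;
};

/* Mock transport: appends to an in-memory buffer and may short-write,
 * as send(2) can. */
static ssize_t
mock_send (struct handle *h, const void *buf, size_t len)
{
  struct socket *sock;
  size_t room;

  assert (h != NULL && h->sock != NULL);
  sock = h->sock;          /* derived from h, not passed separately */
  room = sizeof sock->out - sock->used;
  if (len > room)
    len = room;
  memcpy (sock->out + sock->used, buf, len);
  sock->used += len;
  return (ssize_t) len;
}

static const struct socket_ops mock_ops = { .send = mock_send };
```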
Eric Blake
2019-Jun-03  18:27 UTC
Re: [Libguestfs] [PATCH libnbd discussion only 4/5] api: Implement concurrent writer.
On 6/3/19 10:29 AM, Richard W.M. Jones wrote:
> ---
>  docs/libnbd.pod     | 73 +++++++++++++++++++++++++++++++++++++++++++++
>  generator/generator | 52 +++++++++++++++++++++++++++-----
>  lib/handle.c        | 32 ++++++++++++++++++++
>  lib/internal.h      |  7 +++++
>  lib/socket.c        | 22 +++++++++++---
>  podwrapper.pl.in    |  3 +-
>  6 files changed, 177 insertions(+), 12 deletions(-)
>
> diff --git a/docs/libnbd.pod b/docs/libnbd.pod
> index 7cbb9cd..ab74be3 100644
> --- a/docs/libnbd.pod
> +++ b/docs/libnbd.pod
> @@ -385,6 +385,79 @@ If you are issuing multiple in-flight requests (see above) and
>  limiting the number, then the limit should be applied to each
>  individual NBD connection.
>
> +=head2 Concurrent writer thread
> +
> +To achieve the maximum possible performance from libnbd and NBD
> +servers, as well as the above techniques you must also use a
> +concurrent writer thread.  This feature allows requests to be issued
> +on the NBD socket at the same time that replies are being read from
> +the socket.  In other words L<send(2)> and L<recv(2)> calls will be
> +running at the same time on the same socket.

maybe add 'from different threads'

> +
> +There is a full example using a concurrent writer available at
> +L<https://github.com/libguestfs/libnbd/blob/master/examples/concurrent-writer.c>
> +
> +To implement this, you change your ordinary AIO code in four ways:
> +
> +=over 4
> +
> +=item 1. Call nbd_set_concurrent_writer
> +
> + struct writer_data {
> +   struct nbd_handle *nbd;
> +   /* other data here as required */
> + } data;
> +
> + nbd_set_concurrent_writer (nbd, &data, writer);
> +
> +This function can be called on the handle at any time, either after
> +the handle is created or after the connection and handshaking has
> +completed.
> +
> +=item 2. Implement non-blocking writer callback
> +
> +C<writer> is a I<non-blocking> callback which enqueues the buffer into
> +a ring or similar FIFO structure:
> +
> + struct ring_item {
> +   struct writer_data *data;
> +   const void *buf;
> +   size_t len;
> + };
> +
> + void writer (void *data, const void *buf, size_t len)

No return value.  Comments below at [1]

> + {
> +   struct ring_item item;
> +
> +   /* add (data, buf, len) to a shared ring */
> +   item.data = data;
> +   item.buf = malloc (len);
> +   memcpy (item.buf, buf, len);
> +   item.len = len;
> +   ring_add (&item);
> +
> +   writer_signal (); /* kick the writer thread */
> + }
> +
> +=item 3. Implement writer thread
> +
> +You must also supply another thread which picks up data off the ring
> +and writes it to the socket (see C<nbd_aio_get_fd>).  If there an

s/there/there is/

> +error when writing to the socket, call C<nbd_concurrent_writer_error>
> +with the C<errno>.
> +
> +You have a choice of whether to implement one thread per nbd_handle or
> +one thread shared between all handles.
> +
> +=item 4. Modify main loop
> +
> +Finally your main loop can unconditionally call
> +C<nbd_aio_notify_write> when C<nbd_aio_get_direction> returns C<WRITE>
> +or C<BOTH> (since the concurrent thread can always enqueue more data
> +and so is always "ready to write").

Will we ever actually reach a state that is blocked on a write
completion for aio_get_direction to ever even request a WRITE or BOTH?
Or will the state machine always manage to churn through requests in
their entirety and back to state READY which is just a READ state?

> +
> +=back
> +
>  =head1 ENCRYPTION AND AUTHENTICATION
>
>  The NBD protocol and libnbd supports TLS (sometimes incorrectly called
> diff --git a/generator/generator b/generator/generator
> index db7c10f..2b48c67 100755
> --- a/generator/generator
> +++ b/generator/generator
> @@ -1094,6 +1094,35 @@ C<\"qemu:dirty-bitmap:...\"> for qemu-nbd
>  (see qemu-nbd I<-B> option).  See also C<nbd_block_status>.";
>    };
>
> +  "set_concurrent_writer", {
> +    default_call with
> +    args = [ Opaque "data";
> +             CallbackPersist ("writer", [Opaque "data";
> +                                         BytesIn ("buf", "len")]) ];
> +    ret = RErr;
> +    permitted_states = [ Created; Connecting; Connected ];
> +    shortdesc = "set a concurrent writer thread";
> +    longdesc = "\
> +Provide an optional concurrent writer thread for better performance.
> +See L<libnbd(3)/Concurrent writer thread> for how to use this.";
> +  };
> +
> +  "concurrent_writer_error", {
> +    default_call with
> +    args = [ Int "err" ]; ret = RErr;
> +    shortdesc = "signal an error from the concurrent writer thread";
> +    longdesc = "\
> +This can be called from the concurrent writer thread to signal
> +that there was an error writing to the socket.  As there is no
> +way to recover from such errors, the connection will move to the
> +dead state soon after.
> +
> +The parameter is the C<errno> returned by the failed L<send(2)> call.
> +It must be non-zero.
> +
> +See L<libnbd(3)/Concurrent writer thread> for how to use this.";
> +  };
> +
>    "connect_uri", {

>
> +int
> +nbd_unlocked_set_concurrent_writer (struct nbd_handle *h,
> +                                    void *data, writer_cb writer)
> +{
> +  /* I suppose we could allow this, but it seems more likely that
> +   * it's an error rather than intentional.
> +   */
> +  if (h->writer != NULL) {
> +    set_error (EINVAL, "concurrent writer was already set for this handle");
> +    return -1;
> +  }
> +
> +  h->writer = writer;
> +  h->writer_data = data;
> +  return 0;
> +}

Is it worth a get_concurrent_writer()?  Not strictly necessary, though.

> +
> +int
> +nbd_unlocked_concurrent_writer_error (struct nbd_handle *h, int err)
> +{
> +  if (err != 0) {
> +    set_error (EINVAL, "concurrent writer error parameter must be non-zero");
> +    return -1;
> +  }
> +
> +  /* Ignore second and subsequent calls, record only the first error. */
> +  if (h->writer_error == 0)
> +    h->writer_error = err;
> +
> +  return 0;
> +}
> +

> +++ b/lib/socket.c
> @@ -46,10 +46,24 @@ socket_send (struct nbd_handle *h,
>  {
>    ssize_t r;
>
> -  r = send (sock->u.fd, buf, len, 0);
> -  if (r == -1 && errno != EAGAIN && errno != EWOULDBLOCK)
> -    set_error (errno, "send");
> -  return r;
> +  if (!h->writer) {
> +    r = send (sock->u.fd, buf, len, 0);
> +    if (r == -1 && errno != EAGAIN && errno != EWOULDBLOCK)
> +      set_error (errno, "send");
> +    return r;
> +  }
> +  else if (h->writer_error) {
> +    /* Concurrent writer thread signaled an error earlier, so
> +     * return that here.
> +     */
> +    set_error (h->writer_error, "concurrent writer thread error");
> +    return -1;
> +  }
> +  else {
> +    /* Pass the buffer to the concurrent writer thread. */
> +    h->writer (h->writer_data, buf, len);
> +    return len;

[1] So h->writer is NOT allowed to fail directly (if it fails, it must
call nbd_concurrent_writer_error instead).  Stems from the fact that the
generator doesn't allow callbacks with a return type, but livable.  But
may warrant extra wording in the documentation.

> +  }
>  }
>
>  static int
> diff --git a/podwrapper.pl.in b/podwrapper.pl.in
> index 2471807..ecff2d6 100755
> --- a/podwrapper.pl.in
> +++ b/podwrapper.pl.in
> @@ -324,7 +324,8 @@ foreach (@lines) {
>      die "$progname: $input: line too long:\n$_\n"
>          if length $_ > 76 &&
>             substr ($_, 0, 1) ne ' ' &&
> -           ! m/https?:/;
> +           ! m/https?:/ &&
> +           ! m/connected and finished handshaking/;
>  }
>
>  # Output man page.
>

-- 
Eric Blake, Principal Software Engineer
Red Hat, Inc. +1-919-301-3226
Virtualization: qemu.org | libvirt.org
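Eric's note [1] describes the contract: the writer callback cannot report failure, `socket_send` returns `len` as if the bytes were sent, and a failure only surfaces on a later send after the writer thread has called `nbd_concurrent_writer_error`. Below is a standalone model of that contract using hypothetical names (`struct conn`, `latch_writer_error`, `conn_send`); it models only the concurrent-writer branch, not the direct `send(2)` path. It keeps only the first error, as the quoted comment intends, and uses a C11 atomic because the error is written by one thread and read by another. One discrepancy is worth noting: as quoted, `nbd_unlocked_concurrent_writer_error` rejects its argument when `err != 0`, which would refuse every valid errno and contradicts its own documentation ("It must be non-zero"). The sketch assumes the intended rejection test is `err == 0`.

```c
#include <assert.h>
#include <errno.h>
#include <stdatomic.h>
#include <stddef.h>
#include <sys/types.h>

/* Hypothetical callback type: like the patch's writer_cb, no return. */
typedef void (*writer_fn) (void *data, const void *buf, size_t len);

struct conn {
  writer_fn writer;
  void *writer_data;
  atomic_int writer_error;  /* 0 until the writer thread reports failure */
};

static void
conn_init (struct conn *c, writer_fn writer, void *data)
{
  c->writer = writer;
  c->writer_data = data;
  atomic_init (&c->writer_error, 0);
}

/* Called from the writer thread; records only the first error. */
static int
latch_writer_error (struct conn *c, int err)
{
  int expected = 0;

  if (err == 0) {           /* documented contract: must be non-zero */
    errno = EINVAL;
    return -1;
  }
  atomic_compare_exchange_strong (&c->writer_error, &expected, err);
  return 0;
}

/* Called in place of send(2) when a concurrent writer is set. */
static ssize_t
conn_send (struct conn *c, const void *buf, size_t len)
{
  int err = atomic_load (&c->writer_error);

  assert (c->writer != NULL);
  if (err != 0) {           /* surface the deferred failure now */
    errno = err;
    return -1;
  }
  c->writer (c->writer_data, buf, len);  /* enqueue; cannot fail here */
  return (ssize_t) len;
}

/* Demo callback: only counts the bytes it was handed. */
static void
count_writer (void *data, const void *buf, size_t len)
{
  (void) buf;
  *(size_t *) data += len;
}
```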
Eric Blake
2019-Jun-03  18:56 UTC
Re: [Libguestfs] [PATCH libnbd discussion only 5/5] examples: Add concurrent writer example.
On 6/3/19 10:29 AM, Richard W.M. Jones wrote:
> ---
>  .gitignore                           |   1 +
>  examples/Makefile.am                 |  12 +
>  examples/concurrent-writer.c         | 450 +++++++++++++++++++++++++++
>  examples/threaded-reads-and-writes.c |   2 +-

Obviously one is copied from the other; should the change to a larger
NR_CYCLES be pushed separately?

> +/* Number of simultaneous connections to the NBD server.  The number
> + * of threads is NR_MULTI_CONN * 2 because there is one thread reading
> + * plus a concurrent writer thread.  Note that some servers only
> + * support a limited number of simultaneous connections, and/or have a
> + * configurable thread pool internally, and if you exceed those limits
> + * then something will break.
> + */
> +#define NR_MULTI_CONN 8
> +

Do things actually break or deadlock, or does it merely reach a point
where performance levels off or starts to degrade from contention, but
where things still continue to work?  If a server only has support for
X parallel in-flight commands, it will merely quit read()ing further
requests until the earlier responses are flushed back to the client,
but since our state machine favors response over requests, about all we
can do is run out of memory by queuing up too many requests.

> +static void *
> +start_reader_thread (void *arg)
> +{
> +  struct nbd_handle *nbd;
> +  struct pollfd fds[1];
> +  struct reader_status *status = arg;
> +  struct writer_data writer_data;
> +  pthread_t writer_thread;
> +  int err;
> +  char buf[512];

Fixed-size buffer...

> +  for (i = 0; i < sizeof buf; ++i)
> +    buf[i] = rand ();
> +
> +  /* Start the concurrent writer thread, one per handle. */
> +  writer_data.i = status->i;
> +  writer_data.nbd = nbd;
> +  writer_data.q = writer_data.q_end = NULL;
> +  pthread_mutex_init (&writer_data.q_lock, NULL);
> +
> +  err = pthread_create (&writer_thread, NULL,
> +                        start_writer_thread, &writer_data);

The writer thread may or may not stall when it is only ever sending 512
bytes per command.  Should we rework this to have a variable-sized
buffer (maybe even just a binary choice of small or large, where large
is perhaps 256k, as that was the size where I was able to provoke a
blocked send() when developing the fix tested in batched-read-write)?
It may be that you're not seeing improvements in execution because you
haven't actually saturated the line with enough large writes.

> +  if (err != 0) {
> +    errno = err;
> +    perror ("pthread_create");
> +    exit (EXIT_FAILURE);
> +  }
> +
> +  if (nbd_set_concurrent_writer (nbd, &writer_data, writer) == -1) {
> +    fprintf (stderr, "%s\n", nbd_get_error ());
> +    exit (EXIT_FAILURE);
> +  }
> +
> +  /* Issue commands. */
> +  in_flight = 0;
> +  i = NR_CYCLES;
> +  while (i > 0 || in_flight > 0) {
> +    if (nbd_aio_is_dead (nbd) || nbd_aio_is_closed (nbd)) {
> +      fprintf (stderr, "thread %zu: connection is dead or closed\n",
> +               status->i);
> +      goto error;
> +    }
> +
> +    /* Do we want to send another request and there's room to issue it
> +     * and the connection is in the READY state so it can be used to
> +     * issue a request.
> +     */
> +    want_to_send =
> +      i > 0 && in_flight < MAX_IN_FLIGHT && nbd_aio_is_ready (nbd);

Do we still want to check nbd_aio_is_ready() here, or can we bypass that
and just issue our commands right away (since commit 6af72b87 allows for
a command queue)?

> +
> +    fds[0].fd = nbd_aio_get_fd (nbd);
> +    fds[0].events = want_to_send ? POLLOUT : 0;

Do we really want to be checking for POLLOUT here, or should we leave
that job to the writer thread?

> +    fds[0].revents = 0;
> +
> +    dir = nbd_aio_get_direction (nbd);
> +    if ((dir & LIBNBD_AIO_DIRECTION_WRITE) != 0) {
> +      /* The concurrent writer is always writable, we don't have to
> +       * test the socket in poll.  Since calling nbd_aio_notify_write
> +       * can change the state, after doing it we must restart the
> +       * loop.
> +       */

I'm still not convinced this code is even reachable.

> +      nbd_aio_notify_write (nbd);
> +      continue;
> +    }
> +
> +    if ((dir & LIBNBD_AIO_DIRECTION_READ) != 0)
> +      fds[0].events |= POLLIN;
> +
> +    if (poll (fds, 1, -1) == -1) {
> +      perror ("poll");
> +      goto error;
> +    }
> +
> +    if ((dir & LIBNBD_AIO_DIRECTION_READ) != 0 &&
> +        (fds[0].revents & POLLIN) != 0)
> +      nbd_aio_notify_read (nbd);
> +
> +    /* If we can issue another request, do so.  Note that we reuse the
> +     * same buffer for multiple in-flight requests.  It doesn't matter
> +     * here because we're just trying to write random stuff, but that
> +     * would be Very Bad in a real application.
> +     */
> +    if (want_to_send && (fds[0].revents & POLLOUT) != 0 &&
> +        nbd_aio_is_ready (nbd)) {
> +      offset = rand () % (exportsize - sizeof buf);
> +      cmd = rand () & 1;
> +      if (cmd == 0)
> +        handle = nbd_aio_pwrite (nbd, buf, sizeof buf, offset, 0);
> +      else
> +        handle = nbd_aio_pread (nbd, buf, sizeof buf, offset, 0);

Variable-size packets may make this more interesting or realistic.

> +
> +  nbd_close (nbd);
> +
> +  printf ("thread %zu: finished OK\n", status->i);
> +
> +  status->status = 0;
> +  pthread_exit (status);
> +
> + error:
> +  fprintf (stderr, "thread %zu: failed\n", status->i);
> +  status->status = -1;
> +  pthread_exit (status);

Should we join() the writer thread?

> +}
> +
> +/* This runs in the reader thread and enqueues the data which will be
> + * picked up by the writer thread.
> + */
> +static void
> +writer (void *data, const void *buf, size_t len)
> +{
> +  struct writer_data *writer_data = data;
> +  struct queue *item;
> +
> +  item = malloc (sizeof *item);
> +  if (!item) goto error;
> +  item->next = NULL;
> +  item->buf = malloc (len);
> +  if (item->buf == NULL) {
> +    free (item);
> +    goto error;
> +  }
> +  memcpy (item->buf, buf, len);
> +  item->len = len;
> +
> +  /* Enqueue the item and signal the writer thread. */
> +  pthread_mutex_lock (&writer_data->q_lock);
> +  if (writer_data->q_end == NULL)
> +    writer_data->q = writer_data->q_end = item;
> +  else {
> +    writer_data->q_end->next = item;
> +    writer_data->q_end = item;
> +  }
> +  pthread_cond_signal (&writer_data->q_cond);
> +  pthread_mutex_unlock (&writer_data->q_lock);

Good - you DO need correct locking to feed data to the writer thread in
a safe manner (perhaps there is a way to implement this queuing with
atomics rather than a full mutex, but the point remains that proper
multi-threaded access is a must).

> +  return;
> +
> + error:
> +  nbd_concurrent_writer_error (writer_data->nbd, errno);
> +}
> +
> +static void *
> +start_writer_thread (void *arg)
> +{
> +  struct writer_data *writer_data = arg;
> +  struct nbd_handle *nbd = writer_data->nbd;
> +  struct queue *item;
> +  int fd;
> +  struct pollfd fds[1];
> +  ssize_t r;
> +  void *p;
> +
> +  fd = nbd_aio_get_fd (nbd);
> +

No check for errors?

> +  for (;;) {
> +    /* Pick next job off the queue. */
> +    pthread_mutex_lock (&writer_data->q_lock);
> +    while (writer_data->q == NULL)
> +      pthread_cond_wait (&writer_data->q_cond, &writer_data->q_lock);
> +    item = writer_data->q;
> +    writer_data->q = item->next;
> +    if (writer_data->q == NULL)
> +      writer_data->q_end = NULL;
> +    pthread_mutex_unlock (&writer_data->q_lock);
> +
> +    p = item->buf;
> +    while (item->len > 0) {
> +      /* Wait for the socket to become ready to write. */
> +      fds[0].fd = fd;
> +      fds[0].events = POLLOUT;
> +      fds[0].revents = 0;
> +
> +      if (poll (fds, 1, -1) == -1) goto error;
> +
> +      r = send (fd, p, item->len, 0);
> +      if (r == -1) goto error;
> +
> +      p += r;
> +      item->len -= r;
> +    }
> +
> +    free (item->buf);
> +    free (item);
> +  }

No cleanup except on poll()/send() error?  Nothing join()s the thread?

> +
> + error:
> +  nbd_concurrent_writer_error (nbd, errno);
> +  return NULL;
> +}
> diff --git a/examples/threaded-reads-and-writes.c b/examples/threaded-reads-and-writes.c
> index 3e3fc32..a92e7b5 100644
> --- a/examples/threaded-reads-and-writes.c
> +++ b/examples/threaded-reads-and-writes.c
> @@ -52,7 +52,7 @@ static int64_t exportsize;
>  #define MAX_IN_FLIGHT 16
>
>  /* Number of commands we issue (per thread). */
> -#define NR_CYCLES 10000
> +#define NR_CYCLES 1000000
>
>  struct thread_status {
>    size_t i;                     /* Thread index, 0 .. NR_MULTI_CONN-1 */
>

-- 
Eric Blake, Principal Software Engineer
Red Hat, Inc. +1-919-301-3226
Virtualization: qemu.org | libvirt.org
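Two of Eric's points concern the writer thread's inner loop. First, `send` may transfer fewer bytes than requested. Second, 512-byte payloads may never fill the socket buffer enough to make `send` block. The posted loop also advances a `void *` with `p += r`, which relies on a GCC extension, since ISO C does not define arithmetic on `void *`. The sketch below is a portable version under those observations, using hypothetical `send_all` and `pick_request_size` helpers. It advances a `char *`, waits with `poll` like the original, retries on `EINTR` and `EAGAIN`, and picks randomly between a small payload and a 256 KiB one, following Eric's small-or-large suggestion.

```c
#include <assert.h>
#include <errno.h>
#include <poll.h>
#include <stdlib.h>
#include <sys/socket.h>
#include <sys/types.h>

/* Write all of buf to a (possibly non-blocking) stream socket, waiting
 * with poll(2) whenever it is full.  Returns 0, or -1 with errno set. */
static int
send_all (int fd, const void *buf, size_t len)
{
  const char *p = buf;     /* char * so the pointer arithmetic is ISO C */

  assert (buf != NULL || len == 0);
  while (len > 0) {
    struct pollfd pfd = { .fd = fd, .events = POLLOUT };
    ssize_t r;

    if (poll (&pfd, 1, -1) == -1) {
      if (errno == EINTR)
        continue;
      return -1;
    }
    r = send (fd, p, len, 0);
    if (r == -1) {
      if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK)
        continue;          /* poll again before retrying */
      return -1;
    }
    p += r;                /* partial send: advance past what went out */
    len -= (size_t) r;
  }
  return 0;
}

/* Hypothetical sizes for the small-or-large choice suggested in review. */
#define SMALL_REQUEST 512
#define LARGE_REQUEST (256 * 1024)

static size_t
pick_request_size (void)
{
  return (rand () & 1) ? LARGE_REQUEST : SMALL_REQUEST;
}
```

On a connection the peer has closed, `send` may raise `SIGPIPE`. A real writer thread would ignore that signal, or pass `MSG_NOSIGNAL` where the platform supports it, so that the failure reaches `nbd_concurrent_writer_error` as an `EPIPE` return instead of killing the process.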