Richard W.M. Jones
2019-Jul-17 13:00 UTC
[Libguestfs] [PATCH libnbd v2] examples: Include an example of integrating with glib main loop.
This is working now, and incorporates all of the changes in Eric's review, *except* that it still doesn't retire commands (although this seems to make no obvious difference, except possibly a performance and memory impact). Rich.
Richard W.M. Jones
2019-Jul-17 13:00 UTC
[Libguestfs] [PATCH libnbd v2] examples: Include an example of integrating with the glib main loop.
--- .gitignore | 1 + README | 2 + configure.ac | 9 + examples/Makefile.am | 22 ++ examples/glib-main-loop.c | 511 ++++++++++++++++++++++++++++++++++++++ 5 files changed, 545 insertions(+) diff --git a/.gitignore b/.gitignore index ce02aef..9a8ba37 100644 --- a/.gitignore +++ b/.gitignore @@ -41,6 +41,7 @@ Makefile.in /docs/libnbd-api.3 /docs/libnbd-api.pod /examples/batched-read-write +/examples/glib-main-loop /examples/threaded-reads-and-writes /examples/simple-fetch-first-sector /examples/simple-reads-and-writes diff --git a/README b/README index 3045312..443bb99 100644 --- a/README +++ b/README @@ -72,6 +72,8 @@ Optional: * qemu, qemu-io, qemu-img for interoperability testing. + * glib2 for examples that interoperate with the glib main loop. + * OCaml and ocamlfind are both needed to generate the OCaml bindings. * Perl Pod::Man and Pod::Simple to generate the documentation. diff --git a/configure.ac b/configure.ac index 4636c2c..6ed1f4a 100644 --- a/configure.ac +++ b/configure.ac @@ -153,6 +153,15 @@ AM_CONDITIONAL([HAVE_NBD_SERVER], [test "x$NBD_SERVER" != "x"]) AC_CHECK_PROG([QEMU_NBD], [qemu-nbd], [qemu-nbd]) AM_CONDITIONAL([HAVE_QEMU_NBD], [test "x$QEMU_NBD" != "x"]) +dnl glib2 main loop for examples that interoperate with the glib main loop. +PKG_CHECK_MODULES([GLIB], [glib-2.0], [ + AC_SUBST([GLIB_CFLAGS]) + AC_SUBST([GLIB_LIBS]) +],[ + AC_MSG_WARN([glib2 not found, some examples will not be compiled]) +]) +AM_CONDITIONAL([HAVE_GLIB], [test "x$GLIB_LIBS" != "x"]) + dnl Check we have enough to run podwrapper. AC_CHECK_PROG([PERL],[perl],[perl],[no]) AS_IF([test "x$PERL" != "xno"],[ diff --git a/examples/Makefile.am b/examples/Makefile.am index 7560855..de3f090 100644 --- a/examples/Makefile.am +++ b/examples/Makefile.am @@ -27,6 +27,11 @@ noinst_PROGRAMS = \ strict-structured-reads \ $(NULL) +if HAVE_GLIB +noinst_PROGRAMS += \ + glib-main-loop +endif + simple_fetch_first_sector_SOURCES = \ simple-fetch-first-sector.c \ $(NULL) @@ -93,3 +98,20 @@ batched_read_write_CFLAGS = \ batched_read_write_LDADD = \ $(top_builddir)/lib/libnbd.la \ $(NULL) + +if HAVE_GLIB +glib_main_loop_SOURCES = \ + glib-main-loop.c \ + $(NULL) +glib_main_loop_CPPFLAGS = \ + -I$(top_srcdir)/include \ + $(NULL) +glib_main_loop_CFLAGS = \ + $(WARNINGS_CFLAGS) \ + $(GLIB_CFLAGS) \ + $(NULL) +glib_main_loop_LDADD = \ + $(top_builddir)/lib/libnbd.la \ + $(GLIB_LIBS) \ + $(NULL) +endif diff --git a/examples/glib-main-loop.c b/examples/glib-main-loop.c new file mode 100644 index 0000000..c633c1d --- /dev/null +++ b/examples/glib-main-loop.c @@ -0,0 +1,511 @@ +/* This example shows you how to make libnbd interoperate with the + * glib main loop. For more information about glib main loop see: + * + * https://developer.gnome.org/glib/stable/glib-The-Main-Event-Loop.html + * + * To run it, simply do: + * + * ./examples/glib-main-loop + * + * For debugging, do: + * + * LIBNBD_DEBUG=1 ./examples/glib-main-loop + */ + +#include <stdio.h> +#include <stdlib.h> +#include <stdbool.h> +#include <stdint.h> +#include <inttypes.h> +#include <assert.h> + +#include <libnbd.h> + +#include <glib.h> + +struct NBDSource; +typedef void (*connecting_callback_t) (struct NBDSource *); +typedef void (*connected_callback_t) (struct NBDSource *); + +/* This is the derived GSource type. */ +struct NBDSource { + /* The base type. This MUST be the first element in this struct. */ + GSource source; + + /* The underlying libnbd handle. */ + struct nbd_handle *nbd; + bool debug; /* true if handle has debug set */ + + /* The poll file descriptor. We store the value here as well as in + * the GSource so we can see if it changes. + */ + int fd; + gpointer tag; + + /* You can optionally register callbacks to be called when the + * handle changes state: + * + * connected_callback is called once when the handle moves from + * connecting to connected (ready) state. + */ + connected_callback_t connected_callback; + bool called_connected_callback; + + /* Arbitrary pointer for use by caller. */ + gpointer user_data; +}; + +/* Print debug statements when debugging is set for the handle. */ +#define DEBUG(source, fs, ...) \ + do { \ + if ((source)->debug) \ + fprintf (stderr, "glib: debug: " fs "\n", ## __VA_ARGS__); \ + } while (0) + +/* These are the GSource functions for libnbd handles. */ +static inline int +events_from_nbd (struct nbd_handle *nbd) +{ + int dir = nbd_aio_get_direction (nbd); + int r = 0; + + if ((dir & LIBNBD_AIO_DIRECTION_READ) != 0) + r |= G_IO_IN; + if ((dir & LIBNBD_AIO_DIRECTION_WRITE) != 0) + r |= G_IO_OUT; + return r; +} + +static gboolean +prepare (GSource *sp, gint *timeout_) +{ + struct NBDSource *source = (struct NBDSource *) sp; + int new_fd; + int events; + + /* The poll file descriptor can change or become invalid at any + * time. + */ + new_fd = nbd_aio_get_fd (source->nbd); + if (source->fd != new_fd) { + if (source->tag != NULL) { + g_source_remove_unix_fd ((GSource *) source, source->tag); + source->fd = -1; + source->tag = NULL; + } + if (new_fd >= 0) { + source->fd = new_fd; + source->tag = g_source_add_unix_fd ((GSource *) source, new_fd, 0); + } + } + + if (!source->tag) + return FALSE; + + events = events_from_nbd (source->nbd); + g_source_modify_unix_fd ((GSource *) source, source->tag, events); + *timeout_ = -1; + + DEBUG (source, "prepare: events = 0x%x%s%s", + events, + events & G_IO_IN ? " G_IO_IN" : "", + events & G_IO_OUT ? " G_IO_OUT" : ""); + + if (source->connected_callback && + !source->called_connected_callback && + nbd_aio_is_ready (source->nbd)) { + DEBUG (source, "calling connected_callback"); + source->connected_callback (source); + source->called_connected_callback = true; + } + + return FALSE; +} + +static gboolean +check (GSource *sp) +{ + struct NBDSource *source = (struct NBDSource *) sp; + int dir; + int revents; + + if (!source->tag) + return FALSE; + + revents = g_source_query_unix_fd ((GSource *) source, source->tag); + dir = nbd_aio_get_direction (source->nbd); + + DEBUG (source, "check: direction = 0x%x%s%s, revents = 0x%x%s%s", + dir, + dir & LIBNBD_AIO_DIRECTION_READ ? " READ" : "", + dir & LIBNBD_AIO_DIRECTION_WRITE ? " WRITE" : "", + revents, + revents & G_IO_IN ? " G_IO_IN" : "", + revents & G_IO_OUT ? " G_IO_OUT" : ""); + + if ((revents & G_IO_IN) != 0 && (dir & LIBNBD_AIO_DIRECTION_READ) != 0) + return TRUE; + if ((revents & G_IO_OUT) != 0 && (dir & LIBNBD_AIO_DIRECTION_WRITE) != 0) + return TRUE; + + return FALSE; +} + +static gboolean +dispatch (GSource *sp, + GSourceFunc callback, + gpointer user_data) +{ + struct NBDSource *source = (struct NBDSource *) sp; + int revents; + int r; + + revents = g_source_query_unix_fd ((GSource *) source, source->tag); + + DEBUG (source, "dispatch: revents = 0x%x%s%s", + revents, + revents & G_IO_IN ? " G_IO_IN" : "", + revents & G_IO_OUT ? " G_IO_OUT" : ""); + + r = 0; + if ((revents & G_IO_IN) != 0) + r = nbd_aio_notify_read (source->nbd); + else if ((revents & G_IO_OUT) != 0) + r = nbd_aio_notify_write (source->nbd); + + if (r == -1) { + fprintf (stderr, "%s\n", nbd_get_error ()); + return G_SOURCE_REMOVE; + } + + return G_SOURCE_CONTINUE; +} + +static void +finalize (GSource *sp) +{ + struct NBDSource *source = (struct NBDSource *) sp; + + DEBUG (source, "finalize"); + + nbd_close (source->nbd); +} + +GSourceFuncs nbd_source_funcs = { + .prepare = prepare, + .check = check, + .dispatch = dispatch, + .finalize = finalize, +}; + +/* Create a libnbd GSource from a libnbd handle. + * + * Note that the return value is also a ‘GSource *’, you just have to + * cast the return value if you need a GSource pointer. + */ +static struct NBDSource * +create_libnbd_gsource (struct nbd_handle *nbd) +{ + struct NBDSource *source; + + source + (struct NBDSource *) g_source_new (&nbd_source_funcs, sizeof *source); + source->nbd = nbd; + source->debug = nbd_get_debug (nbd); + source->fd = -1; + + return source; +} + +/*----------------------------------------------------------------------*/ + +/* The rest of this file is an example showing how to use the GSource + * defined above to control two nbdkit subprocesses, copying from one + * to the other in parallel. + */ + +/* Source and destination nbdkit instances. */ +static struct NBDSource *gssrc, *gsdest; + +#define SIZE (1024*1024*1024) + +static const char *src_args[] = { + "nbdkit", "-s", "--exit-with-parent", "-r", "pattern", "size=1G", NULL +}; + +static const char *dest_args[] = { + "nbdkit", "-s", "--exit-with-parent", "memory", "size=1G", NULL +}; + +/* The list of buffers waiting to be written. Note that the source + * server can answer requests out of order so these buffers may not be + * sorted by offset. + */ +#define MAX_BUFFERS 16 +#define BUFFER_SIZE 65536 + +enum buffer_state { + BUFFER_READING, + BUFFER_READ_COMPLETED, + BUFFER_WRITING, +}; + +struct buffer { + uint64_t offset; + /* Note that command cookies are only unique per libnbd handle. + * Since we have two handles but we must look up completed commands + * in the buffer table by cookie we must maintain separate read and + * write cookies. + */ + int64_t rcookie; + int64_t wcookie; + enum buffer_state state; + char *data; +}; + +static struct buffer buffers[MAX_BUFFERS]; +static size_t nr_buffers; + +static bool finished, reader_paused; + +static GMainLoop *loop; + +static void connected (struct NBDSource *source); +static gboolean read_data (gpointer user_data); +static int finished_read (void *vp, int64_t rcookie, int *error); +static gboolean write_data (gpointer user_data); +static int finished_write (void *vp, int64_t wcookie, int *error); + +int +main (int argc, char *argv[]) +{ + struct nbd_handle *src, *dest; + GMainContext *loopctx = NULL; + + /* Create the main loop. */ + loop = g_main_loop_new (loopctx, FALSE); + + /* Create the two NBD handles and nbdkit instances. */ + src = nbd_create (); + if (!src) { + fprintf (stderr, "%s\n", nbd_get_error ()); + exit (EXIT_FAILURE); + } + dest = nbd_create (); + if (!dest) { + fprintf (stderr, "%s\n", nbd_get_error ()); + exit (EXIT_FAILURE); + } + + /* Create the GSource main loop sources from each handle. */ + gssrc = create_libnbd_gsource (src); + gsdest = create_libnbd_gsource (dest); + loopctx = g_main_loop_get_context (loop); + g_source_attach ((GSource *) gssrc, loopctx); + g_source_attach ((GSource *) gsdest, loopctx); + + /* Make sure we get called back when each handle connects. */ + gssrc->connected_callback = connected; + gsdest->connected_callback = connected; + + /* Asynchronously start each handle connecting. */ + if (nbd_aio_connect_command (src, (char **) src_args) == -1) { + fprintf (stderr, "%s\n", nbd_get_error ()); + exit (EXIT_FAILURE); + } + if (nbd_aio_connect_command (dest, (char **) dest_args) == -1) { + fprintf (stderr, "%s\n", nbd_get_error ()); + exit (EXIT_FAILURE); + } + + /* Run the main loop until quit. */ + g_main_loop_run (loop); + exit (EXIT_SUCCESS); +} + +/* This is called back when either handle becomes connected. By + * counting the number of times this happens (there are two handles) + * we can tell when both handles have finished connecting. + */ +static void +connected (struct NBDSource *source) +{ + static int count = 0; + + count++; + if (count == 2) { + DEBUG (source, "both handles are connected"); + + /* Now that both handles are connected, we can begin copying. + * Register an idle handler that will repeatedly read from the + * source. + */ + g_idle_add (read_data, NULL); + } +} + +/* This idle callback reads data from the source nbdkit until the ring + * is full. + */ +static gboolean +read_data (gpointer user_data) +{ + static uint64_t posn = 0; + const size_t i = nr_buffers; + + if (gssrc == NULL) + return FALSE; + + /* Finished reading from the source nbdkit? */ + if (posn >= SIZE) { + DEBUG (gssrc, "read_data: finished reading from source"); + finished = true; + return FALSE; + } + + /* If too many read requests are in flight, return FALSE so this + * idle callback is unregistered. It will be registered by the + * write callback when nr_buffers decreases. + */ + if (nr_buffers >= MAX_BUFFERS) { + DEBUG (gssrc, "read_data: buffer full, pausing reads from source"); + reader_paused = true; + return FALSE; + } + + /* Begin reading into the new buffer. */ + assert (buffers[i].data == NULL); + buffers[i].data = g_new (char, BUFFER_SIZE); + buffers[i].state = BUFFER_READING; + buffers[i].offset = posn; + nr_buffers++; + posn += BUFFER_SIZE; + + buffers[i].rcookie + nbd_aio_pread_callback (gssrc->nbd, buffers[i].data, + BUFFER_SIZE, buffers[i].offset, + finished_read, NULL, 0); + if (buffers[i].rcookie == -1) { + fprintf (stderr, "%s\n", nbd_get_error ()); + exit (EXIT_FAILURE); + } + + return TRUE; +} + +/* This callback is called from libnbd when any read command finishes. */ +static int +finished_read (void *vp, int64_t rcookie, int *error) +{ + size_t i; + + if (gssrc == NULL) + return 0; + + DEBUG (gssrc, "finished_read: read completed"); + + /* Find the corresponding buffer and mark it as completed. */ + for (i = 0; i < nr_buffers; ++i) { + if (buffers[i].rcookie == rcookie) + goto found; + } + /* This should never happen. */ + abort (); + + found: + buffers[i].state = BUFFER_READ_COMPLETED; + + /* Create a writer idle handler. */ + g_idle_add (write_data, NULL); + + return 0; +} + +/* This idle callback schedules a write. */ +static gboolean +write_data (gpointer user_data) +{ + size_t i; + + if (gsdest == NULL) + return FALSE; + + /* Find the first read-completed buffer and schedule it to be + * written. + */ + for (i = 0; i < nr_buffers; ++i) { + if (buffers[i].state == BUFFER_READ_COMPLETED) + goto found; + } + /* This should never happen. */ + abort (); + + found: + buffers[i].wcookie + nbd_aio_pwrite_callback (gsdest->nbd, buffers[i].data, + BUFFER_SIZE, buffers[i].offset, + finished_write, NULL, 0); + if (buffers[i].wcookie == -1) { + fprintf (stderr, "%s\n", nbd_get_error ()); + exit (EXIT_FAILURE); + } + buffers[i].state = BUFFER_WRITING; + + /* We always unregister this idle handler because the read side + * creates a new idle handler for every buffer that has to be + * written. + */ + return FALSE; +} + +/* This callback is called from libnbd when any write command finishes. */ +static int +finished_write (void *vp, int64_t wcookie, int *error) +{ + size_t i; + + if (gsdest == NULL) + return 0; + + DEBUG (gsdest, "finished_write: write completed"); + + /* Find the corresponding buffer and free it. */ + for (i = 0; i < nr_buffers; ++i) { + if (buffers[i].wcookie == wcookie) + goto found; + } + /* This should never happen. */ + abort (); + + found: + g_free (buffers[i].data); + memmove (&buffers[i], &buffers[i+1], + sizeof (struct buffer) * (nr_buffers-(i+1))); + nr_buffers--; + buffers[nr_buffers].data = NULL; + + /* If the number of buffers was MAX_BUFFERS and has now gone down to + * MAX_BUFFERS-1 then we need to restart the read handler. + */ + if (nr_buffers == MAX_BUFFERS-1 && reader_paused) { + DEBUG (gsdest, "finished_write: restarting reader"); + g_idle_add (read_data, NULL); + reader_paused = false; + } + + /* If the reader has finished and there are no more buffers then we + * have done. + */ + if (finished && nr_buffers == 0) { + DEBUG (gsdest, "finished_write: all finished"); + g_source_remove (g_source_get_id ((GSource *) gssrc)); + g_source_unref ((GSource *) gssrc); + gssrc = NULL; + g_source_remove (g_source_get_id ((GSource *) gsdest)); + g_source_unref ((GSource *) gsdest); + gsdest = NULL; + g_main_loop_quit (loop); + } + + return 0; +} -- 2.22.0
Eric Blake
2019-Jul-17 13:40 UTC
Re: [Libguestfs] [PATCH libnbd v2] examples: Include an example of integrating with the glib main loop.
On 7/17/19 8:00 AM, Richard W.M. Jones wrote:> --- > .gitignore | 1 + > README | 2 + > configure.ac | 9 + > examples/Makefile.am | 22 ++ > examples/glib-main-loop.c | 511 ++++++++++++++++++++++++++++++++++++++ > 5 files changed, 545 insertions(+)Looks good.> > + revents = g_source_query_unix_fd ((GSource *) source, source->tag); > + > + DEBUG (source, "dispatch: revents = 0x%x%s%s", > + revents, > + revents & G_IO_IN ? " G_IO_IN" : "", > + revents & G_IO_OUT ? " G_IO_OUT" : ""); > + > + r = 0; > + if ((revents & G_IO_IN) != 0) > + r = nbd_aio_notify_read (source->nbd); > + else if ((revents & G_IO_OUT) != 0) > + r = nbd_aio_notify_write (source->nbd); > +If revents ever contains G_IO_ERR/G_IO_HUP, and if we ever add nbd_aio_notify_error(), we should inform the state machine about that fd problem. I've mentioned that idea before; I guess it is more of an optimization - we will probably notice the problem eventually if the client ever tries to issue another command or detects EOF while reading a response; but tying up the resource until the next client command vs. identifying the problem immediately may matter to some clients. Also, there may still be a possible race:> +/* This idle callback reads data from the source nbdkit until the ring > + * is full. > + */ > +static gboolean > +read_data (gpointer user_data) > +{ > + static uint64_t posn = 0;> + buffers[i].rcookie > + nbd_aio_pread_callback (gssrc->nbd, buffers[i].data, > + BUFFER_SIZE, buffers[i].offset, > + finished_read, NULL, 0);It may be possible in rare situations that the libnbd state machine can send() the command AND see data ready to recv() (perhaps from a previous command still in flight), where it ends up read()ing until blocking and happens to get the server's reply to this command and fire off the callback, all prior to the nbd_aio_pread_callback() returning. If that ever happens, then buffers[i].rcookie will still be unset at the time the callback fires...> +/* This callback is called from libnbd when any read command finishes. */ > +static int > +finished_read (void *vp, int64_t rcookie, int *error) > +{ > + size_t i; > + > + if (gssrc == NULL) > + return 0; > + > + DEBUG (gssrc, "finished_read: read completed"); > + > + /* Find the corresponding buffer and mark it as completed. */ > + for (i = 0; i < nr_buffers; ++i) { > + if (buffers[i].rcookie == rcookie) > + goto found; > + } > + /* This should never happen. */ > + abort ();...such that we can hit this abort(). (Same for the write callback). But I don't know how likely that scenario is in practice, and don't see it as a reason to defer committing this patch as-is. I don't know if there is any way to rework the state machine to guarantee that a completion callback will not fire during the same aio_FOO_callback that registered the callback (that is, to guarantee that h->lock will be dropped and must be re-obtained prior to firing off a callback), but it's worth thinking about. -- Eric Blake, Principal Software Engineer Red Hat, Inc. +1-919-301-3226 Virtualization: qemu.org | libvirt.org
Apparently Analagous Threads
- Re: [PATCH libnbd v2] examples: Include an example of integrating with the glib main loop.
- [PATCH libnbd v2] examples: Include an example of integrating with the glib main loop.
- [PATCH libnbd v2] examples: Include an example of integrating with glib main loop.
- [PATCH libnbd] examples: Include an example of integrating with the glib main loop.
- Re: [PATCH libnbd] examples: Include an example of integrating with the glib main loop.