Richard W.M. Jones
2019-Jul-15 18:38 UTC
[Libguestfs] [PATCH libnbd] examples: Include an example of integrating with the glib main loop.
** NOT WORKING ** This patch shows how to integrate libnbd and the glib main loop. Posted mainly as a point of discussion as it doesn't quite work yet. Rich.
Richard W.M. Jones
2019-Jul-15 18:38 UTC
[Libguestfs] [PATCH libnbd] examples: Include an example of integrating with the glib main loop.
--- .gitignore | 1 + README | 2 + configure.ac | 9 + examples/Makefile.am | 22 ++ examples/glib-main-loop.c | 501 ++++++++++++++++++++++++++++++++++++++ 5 files changed, 535 insertions(+) diff --git a/.gitignore b/.gitignore index edbf941..79b95b5 100644 --- a/.gitignore +++ b/.gitignore @@ -41,6 +41,7 @@ Makefile.in /docs/libnbd-api.3 /docs/libnbd-api.pod /examples/batched-read-write +/examples/glib-main-loop /examples/threaded-reads-and-writes /examples/simple-fetch-first-sector /examples/simple-reads-and-writes diff --git a/README b/README index 3045312..443bb99 100644 --- a/README +++ b/README @@ -72,6 +72,8 @@ Optional: * qemu, qemu-io, qemu-img for interoperability testing. + * glib2 for examples that interoperate with the glib main loop. + * OCaml and ocamlfind are both needed to generate the OCaml bindings. * Perl Pod::Man and Pod::Simple to generate the documentation. diff --git a/configure.ac b/configure.ac index 9c706c9..ccb19b5 100644 --- a/configure.ac +++ b/configure.ac @@ -153,6 +153,15 @@ AM_CONDITIONAL([HAVE_NBD_SERVER], [test "x$NBD_SERVER" != "x"]) AC_CHECK_PROG([QEMU_NBD], [qemu-nbd], [qemu-nbd]) AM_CONDITIONAL([HAVE_QEMU_NBD], [test "x$QEMU_NBD" != "x"]) +dnl glib2 main loop for examples that interoperate with the glib main loop. +PKG_CHECK_MODULES([GLIB], [glib-2.0], [ + AC_SUBST([GLIB_CFLAGS]) + AC_SUBST([GLIB_LIBS]) +],[ + AC_MSG_WARN([glib2 not found, some examples will not be compiled]) +]) +AM_CONDITIONAL([HAVE_GLIB], [test "x$GLIB_LIBS" != "x"]) + dnl Check we have enough to run podwrapper. 
AC_CHECK_PROG([PERL],[perl],[perl],[no]) AS_IF([test "x$PERL" != "xno"],[ diff --git a/examples/Makefile.am b/examples/Makefile.am index 7560855..de3f090 100644 --- a/examples/Makefile.am +++ b/examples/Makefile.am @@ -27,6 +27,11 @@ noinst_PROGRAMS = \ strict-structured-reads \ $(NULL) +if HAVE_GLIB +noinst_PROGRAMS += \ + glib-main-loop +endif + simple_fetch_first_sector_SOURCES = \ simple-fetch-first-sector.c \ $(NULL) @@ -93,3 +98,20 @@ batched_read_write_CFLAGS = \ batched_read_write_LDADD = \ $(top_builddir)/lib/libnbd.la \ $(NULL) + +if HAVE_GLIB +glib_main_loop_SOURCES = \ + glib-main-loop.c \ + $(NULL) +glib_main_loop_CPPFLAGS = \ + -I$(top_srcdir)/include \ + $(NULL) +glib_main_loop_CFLAGS = \ + $(WARNINGS_CFLAGS) \ + $(GLIB_CFLAGS) \ + $(NULL) +glib_main_loop_LDADD = \ + $(top_builddir)/lib/libnbd.la \ + $(GLIB_LIBS) \ + $(NULL) +endif diff --git a/examples/glib-main-loop.c b/examples/glib-main-loop.c new file mode 100644 index 0000000..dea4666 --- /dev/null +++ b/examples/glib-main-loop.c @@ -0,0 +1,501 @@ +/* This example shows you how to make libnbd interoperate with the + * glib main loop. For more information about glib main loop see: + * + * https://developer.gnome.org/glib/stable/glib-The-Main-Event-Loop.html + * + * To run it, simply do: + * + * ./examples/glib-main-loop + * + * For debugging, do: + * + * LIBNBD_DEBUG=1 ./examples/glib-main-loop + */ + +#include <stdio.h> +#include <stdlib.h> +#include <stdbool.h> +#include <stdint.h> +#include <assert.h> + +#include <libnbd.h> + +#include <glib.h> + +struct NBDSource; +typedef void (*connecting_callback_t) (struct NBDSource *); +typedef void (*connected_callback_t) (struct NBDSource *); + +/* This is the derived GSource type. */ +struct NBDSource { + /* The base type. This MUST be the first element in this struct. */ + GSource source; + + /* The underlying libnbd handle. 
*/ + struct nbd_handle *nbd; + bool debug; /* true if handle has debug set */ + + /* The poll file descriptor, only valid when poll_registered == true. */ + bool poll_registered; + GPollFD pollfd; + + /* You can optionally register callbacks to be called when the + * handle changes state: + * + * connecting_callback is called once when the handle moves from + * created to connecting state. + * + * connected_callback is called once when the handle moves from + * connecting to connected (ready) state. + */ + connecting_callback_t connecting_callback; + connected_callback_t connected_callback; + bool called_connected_callback; + + /* Arbitrary pointer for use by caller. */ + gpointer user_data; +}; + +/* Print debug statements when debugging is set for the handle. */ +#define DEBUG(source, fs, ...) \ + do { \ + if ((source)->debug) \ + fprintf (stderr, "glib: debug: " fs "\n", ## __VA_ARGS__); \ + } while (0) + +/* These are the GSource functions for libnbd handles. */ +static inline int +events_from_nbd (struct nbd_handle *nbd) +{ + int dir = nbd_aio_get_direction (nbd); + int r = 0; + + if ((dir & LIBNBD_AIO_DIRECTION_READ) != 0) + r |= G_IO_IN; + if ((dir & LIBNBD_AIO_DIRECTION_WRITE) != 0) + r |= G_IO_OUT; + return r; +} + +static gboolean +prepare (GSource *sp, gint *timeout_) +{ + struct NBDSource *source = (struct NBDSource *) sp; + + /* When the NBD handle moves out of the created state (which means + * that it first has a socket associated with it) we must initialize + * and register the pollfd. 
+ */ + if (!source->poll_registered && !nbd_aio_is_created (source->nbd)) { + int fd; + + if (source->connecting_callback) { + DEBUG (source, "calling connecting_callback"); + source->connecting_callback (source); + } + + fd = nbd_aio_get_fd (source->nbd); + assert (fd >= 0); + + source->pollfd.fd = fd; + source->pollfd.events = events_from_nbd (source->nbd); + g_source_add_poll ((GSource *)source, &source->pollfd); + + DEBUG (source, "registered pollfd"); + + source->poll_registered = true; + } + + if (!source->poll_registered) + return FALSE; + + if (source->connected_callback && + !source->called_connected_callback && + nbd_aio_is_ready (source->nbd)) { + DEBUG (source, "calling connected_callback"); + source->connected_callback (source); + source->called_connected_callback = true; + } + + source->pollfd.events = events_from_nbd (source->nbd); + *timeout_ = -1; + + DEBUG (source, "prepare: events = 0x%x%s%s", + source->pollfd.events, + source->pollfd.events & G_IO_IN ? " G_IO_IN" : "", + source->pollfd.events & G_IO_OUT ? " G_IO_OUT" : ""); + + return FALSE; +} + +static gboolean +check (GSource *sp) +{ + struct NBDSource *source = (struct NBDSource *) sp; + int dir; + + if (!source->poll_registered) + return FALSE; + + dir = nbd_aio_get_direction (source->nbd); + + DEBUG (source, "check: direction = 0x%x%s%s, revents = 0x%x%s%s", + dir, + dir & LIBNBD_AIO_DIRECTION_READ ? " READ" : "", + dir & LIBNBD_AIO_DIRECTION_WRITE ? " WRITE" : "", + source->pollfd.revents, + source->pollfd.revents & G_IO_IN ? " G_IO_IN" : "", + source->pollfd.revents & G_IO_OUT ? 
" G_IO_OUT" : ""); + + if ((source->pollfd.revents & G_IO_IN) != 0 && + (dir & LIBNBD_AIO_DIRECTION_READ) != 0) + return TRUE; + if ((source->pollfd.revents & G_IO_OUT) != 0 && + (dir & LIBNBD_AIO_DIRECTION_WRITE) != 0) + return TRUE; + + return FALSE; +} + +static gboolean +dispatch (GSource *sp, + GSourceFunc callback, + gpointer user_data) +{ + struct NBDSource *source = (struct NBDSource *) sp; + + DEBUG (source, "dispatch: revents = 0x%x%s%s", + source->pollfd.revents, + source->pollfd.revents & G_IO_IN ? " G_IO_IN" : "", + source->pollfd.revents & G_IO_OUT ? " G_IO_OUT" : ""); + + if ((source->pollfd.revents & G_IO_IN) != 0) + nbd_aio_notify_read (source->nbd); + else if ((source->pollfd.revents & G_IO_OUT) != 0) + nbd_aio_notify_write (source->nbd); + + return TRUE; +} + +static void +finalize (GSource *sp) +{ + struct NBDSource *source = (struct NBDSource *) sp; + + DEBUG (source, "finalize"); + + nbd_close (source->nbd); +} + +GSourceFuncs nbd_source_funcs = { + .prepare = prepare, + .check = check, + .dispatch = dispatch, + .finalize = finalize, +}; + +/* Create a libnbd GSource from a libnbd handle. + * + * Note that the return value is also a ‘GSource *’, you just have to + * cast the return value if you need a GSource pointer. + */ +static struct NBDSource * +create_libnbd_gsource (struct nbd_handle *nbd) +{ + struct NBDSource *source; + + source + (struct NBDSource *) g_source_new (&nbd_source_funcs, sizeof *source); + source->nbd = nbd; + source->debug = nbd_get_debug (nbd); + source->poll_registered = false; + + return source; +} + +/*----------------------------------------------------------------------*/ + +/* The rest of this file is an example showing how to use the GSource + * defined above to control two nbdkit subprocesses, copying from one + * to the other in parallel. + */ + +/* Source and destination nbdkit instances. 
*/ +static struct NBDSource *gssrc, *gsdest; + +#define SIZE (1024*1024*1024) + +static const char *src_args[] = { + "nbdkit", "-s", "-r", "pattern", "size=1G", NULL +}; + +static const char *dest_args[] = { + "nbdkit", "-s", "memory", "size=1G", NULL +}; + +/* The list of buffers waiting to be written. Note that the source + * server can answer requests out of order so these buffers may not be + * sorted by offset. + */ +#define MAX_BUFFERS 16 +#define BUFFER_SIZE 65536 + +enum buffer_state { + BUFFER_READING, + BUFFER_READ_COMPLETED, + BUFFER_WRITING, +}; + +struct buffer { + uint64_t offset; + int64_t cookie; + enum buffer_state state; + char *data; +}; + +static struct buffer buffers[MAX_BUFFERS]; +static size_t nr_buffers; + +static bool finished, reader_paused; + +static GMainLoop *loop; + +static void connected (struct NBDSource *source); +static gboolean read_data (gpointer user_data); +static int finished_read (void *vp, int64_t cookie, int *error); +static gboolean write_data (gpointer user_data); +static int finished_write (void *vp, int64_t cookie, int *error); + +int +main (int argc, char *argv[]) +{ + struct nbd_handle *src, *dest; + GMainContext *loopctx = NULL; + + /* Create the main loop. */ + loop = g_main_loop_new (loopctx, FALSE); + + /* Create the two NBD handles and nbdkit instances. */ + src = nbd_create (); + if (!src) { + fprintf (stderr, "%s\n", nbd_get_error ()); + exit (EXIT_FAILURE); + } + dest = nbd_create (); + if (!dest) { + fprintf (stderr, "%s\n", nbd_get_error ()); + exit (EXIT_FAILURE); + } + + /* Create the GSource main loop sources from each handle. */ + gssrc = create_libnbd_gsource (src); + gsdest = create_libnbd_gsource (dest); + loopctx = g_main_loop_get_context (loop); + g_source_attach ((GSource *) gssrc, loopctx); + g_source_attach ((GSource *) gsdest, loopctx); + + /* Make sure we get called back when each handle connects. 
*/ + gssrc->connected_callback = connected; + gsdest->connected_callback = connected; + + /* Asynchronously start each handle connecting. */ + if (nbd_aio_connect_command (src, (char **) src_args) == -1) { + fprintf (stderr, "%s\n", nbd_get_error ()); + exit (EXIT_FAILURE); + } + if (nbd_aio_connect_command (dest, (char **) dest_args) == -1) { + fprintf (stderr, "%s\n", nbd_get_error ()); + exit (EXIT_FAILURE); + } + + /* Run the main loop until quit. */ + g_main_loop_run (loop); + exit (EXIT_SUCCESS); +} + +/* This is called back when either handle becomes connected. By + * counting the number of times this happens (there are two handles) + * we can tell when both handles have finished connecting. + */ +static void +connected (struct NBDSource *source) +{ + static int count = 0; + + count++; + if (count == 2) { + DEBUG (source, "both handles are connected"); + + /* Now that both handles are connected, we can begin copying. + * Register an idle handler that will repeatedly read from the + * source. + */ + g_idle_add (read_data, NULL); + } +} + +/* This idle callback reads data from the source nbdkit until the ring + * is full. + */ +static gboolean +read_data (gpointer user_data) +{ + static uint64_t posn = 0; + const size_t i = nr_buffers; + + if (gssrc == NULL) + return FALSE; + + /* Finished reading from the source nbdkit? */ + if (posn >= SIZE) { + DEBUG (gssrc, "read_data: finished reading from source"); + finished = true; + return FALSE; + } + + /* If too many read requests are in flight, return FALSE so this + * idle callback is unregistered. It will be registered by the + * write callback when nr_buffers decreases. + */ + if (nr_buffers >= MAX_BUFFERS) { + DEBUG (gssrc, "read_data: buffer full, pausing reads from source"); + reader_paused = true; + return FALSE; + } + + /* Begin reading into the new buffer. 
*/ + assert (buffers[i].data == NULL); + buffers[i].data = g_new (char, BUFFER_SIZE); + buffers[i].state = BUFFER_READING; + buffers[i].offset = posn; + nr_buffers++; + posn += BUFFER_SIZE; + + buffers[i].cookie + nbd_aio_pread_callback (gssrc->nbd, buffers[i].data, + BUFFER_SIZE, buffers[i].offset, + NULL, finished_read, 0); + if (buffers[i].cookie == -1) { + fprintf (stderr, "%s\n", nbd_get_error ()); + exit (EXIT_FAILURE); + } + + return TRUE; +} + +/* This callback is called from libnbd when any read command finishes. */ +static int +finished_read (void *vp, int64_t cookie, int *error) +{ + size_t i; + + if (gssrc == NULL) + return 0; + + DEBUG (gssrc, "finished_read: read completed"); + + /* Find the corresponding buffer and mark it as completed. */ + for (i = 0; i < nr_buffers; ++i) { + if (buffers[i].cookie == cookie) + goto found; + } + /* This should never happen. */ + abort (); + + found: + buffers[i].state = BUFFER_READ_COMPLETED; + + /* Create a writer idle handler. */ + g_idle_add (write_data, NULL); + + return 0; +} + +/* This idle callback schedules a write. */ +static gboolean +write_data (gpointer user_data) +{ + size_t i; + + if (gsdest == NULL) + return FALSE; + + /* Find the first read-completed buffer and schedule it to be + * written. + */ + for (i = 0; i < nr_buffers; ++i) { + if (buffers[i].state == BUFFER_READ_COMPLETED) + goto found; + } + goto out; + + found: + buffers[i].cookie + nbd_aio_pwrite_callback (gsdest->nbd, buffers[i].data, + BUFFER_SIZE, buffers[i].offset, + NULL, finished_write, 0); + if (buffers[i].cookie == -1) { + fprintf (stderr, "%s\n", nbd_get_error ()); + exit (EXIT_FAILURE); + } + buffers[i].state = BUFFER_WRITING; + + out: + /* We always unregister this idle handler because the read side + * creates a new idle handler for every buffer that has to be + * written. + */ + return FALSE; +} + +/* This callback is called from libnbd when any write command finishes. 
*/ +static int +finished_write (void *vp, int64_t cookie, int *error) +{ + size_t i; + + if (gsdest == NULL) + return 0; + + DEBUG (gsdest, "finished_write: write completed"); + + /* Find the corresponding buffer and free it. */ + for (i = 0; i < nr_buffers; ++i) { + if (buffers[i].cookie == cookie) + goto found; + } + /* This should never happen. */ + abort (); + + found: + g_free (buffers[i].data); + memmove (&buffers[i], &buffers[i+1], + sizeof (struct buffer) * (nr_buffers-(i+1))); + nr_buffers--; + buffers[nr_buffers].data = NULL; + + /* If the number of buffers was MAX_BUFFERS and has now gone down to + * MAX_BUFFERS-1 then we need to restart the read handler. + */ + if (nr_buffers == MAX_BUFFERS-1 && reader_paused) { + DEBUG (gsdest, "finished_write: restarting reader"); + g_idle_add (read_data, NULL); + reader_paused = false; + } + + /* If the reader has finished and there are no more buffers then we + * have done. + */ + if (finished && nr_buffers == 0) { + DEBUG (gsdest, "finished_write: all finished"); + g_source_remove (g_source_get_id ((GSource *) gssrc)); + g_source_unref ((GSource *) gssrc); + gssrc = NULL; + g_source_remove (g_source_get_id ((GSource *) gsdest)); + g_source_unref ((GSource *) gsdest); + gsdest = NULL; + g_main_loop_quit (loop); + } + + return 0; +} -- 2.22.0
Eric Blake
2019-Jul-17 03:24 UTC
Re: [Libguestfs] [PATCH libnbd] examples: Include an example of integrating with the glib main loop.
On 7/15/19 1:38 PM, Richard W.M. Jones wrote: > --- > .gitignore | 1 + > README | 2 + > configure.ac | 9 + > examples/Makefile.am | 22 ++ > examples/glib-main-loop.c | 501 ++++++++++++++++++++++++++++++++++++++ > 5 files changed, 535 insertions(+) > Rough review (since I've never used a glib main loop before)... > +static gboolean > +prepare (GSource *sp, gint *timeout_) > +{ > + struct NBDSource *source = (struct NBDSource *) sp; > + > + /* When the NBD handle moves out of the created state (which means > + * that it first has a socket associated with it) we must initialize > + * and register the pollfd. > + */ > + if (!source->poll_registered && !nbd_aio_is_created (source->nbd)) { > + int fd; > + > + if (source->connecting_callback) { > + DEBUG (source, "calling connecting_callback"); > + source->connecting_callback (source); > + } If I understand nbd_aio_connect_tcp properly, there are cases where we end up trying several options presented from getaddrinfo() with a possible reset back to START, at least until we get an fd that sticks. Does this need to take into account that an fd might change while still trying to do the initial connect? > + > + fd = nbd_aio_get_fd (source->nbd); > + assert (fd >= 0); > + > + source->pollfd.fd = fd; > + source->pollfd.events = events_from_nbd (source->nbd); > + g_source_add_poll ((GSource *)source, &source->pollfd); https://developer.gnome.org/glib/stable/glib-The-Main-Event-Loop.html#g-source-add-poll states that g_source_add_unix_fd is preferred these days; do you have a minimum glib version in mind for targeting? > +static gboolean > +dispatch (GSource *sp, > + GSourceFunc callback, > + gpointer user_data) > +{ > + struct NBDSource *source = (struct NBDSource *) sp; > + > + DEBUG (source, "dispatch: revents = 0x%x%s%s", > + source->pollfd.revents, > + source->pollfd.revents & G_IO_IN ? " G_IO_IN" : "", > + source->pollfd.revents & G_IO_OUT ? 
" G_IO_OUT" : ""); > + > + if ((source->pollfd.revents & G_IO_IN) != 0) > + nbd_aio_notify_read (source->nbd); > + else if ((source->pollfd.revents & G_IO_OUT) != 0) > + nbd_aio_notify_write (source->nbd);No error checking?> + > + return TRUE;https://developer.gnome.org/glib/stable/glib-The-Main-Event-Loop.html#GSourceFuncs says dispatch() should return G_SOURCE_CONTINUE (which I assume is an alias of TRUE?). Should this ever return G_SOURCE_REMOVE (perhaps when we detect that the connection has moved to CLOSED)?> +} > + > +static void > +finalize (GSource *sp) > +{ > + struct NBDSource *source = (struct NBDSource *) sp; > + > + DEBUG (source, "finalize"); > + > + nbd_close (source->nbd); > +} > + > +GSourceFuncs nbd_source_funcs = { > + .prepare = prepare, > + .check = check, > + .dispatch = dispatch, > + .finalize = finalize, > +}; > + > +/* Create a libnbd GSource from a libnbd handle. > + * > + * Note that the return value is also a ‘GSource *’, you just have to > + * cast the return value if you need a GSource pointer. > + */ > +static struct NBDSource * > +create_libnbd_gsource (struct nbd_handle *nbd) > +{ > + struct NBDSource *source; > + > + source > + (struct NBDSource *) g_source_new (&nbd_source_funcs, sizeof *source); > + source->nbd = nbd; > + source->debug = nbd_get_debug (nbd); > + source->poll_registered = false; > + > + return source; > +} > + > +/*----------------------------------------------------------------------*/ > + > +/* The rest of this file is an example showing how to use the GSource > + * defined above to control two nbdkit subprocesses, copying from one > + * to the other in parallel. > + */ > + > +/* Source and destination nbdkit instances. 
*/ > +static struct NBDSource *gssrc, *gsdest; > + > +#define SIZE (1024*1024*1024) > + > +static const char *src_args[] = { > + "nbdkit", "-s", "-r", "pattern", "size=1G", NULL > +}; > + > +static const char *dest_args[] = { > + "nbdkit", "-s", "memory", "size=1G", NULL > +};Do you want --exit-with-parent?> + > +/* The list of buffers waiting to be written. Note that the source > + * server can answer requests out of order so these buffers may not be > + * sorted by offset. > + */ > +#define MAX_BUFFERS 16 > +#define BUFFER_SIZE 65536 > + > +enum buffer_state { > + BUFFER_READING, > + BUFFER_READ_COMPLETED, > + BUFFER_WRITING, > +}; > + > +struct buffer { > + uint64_t offset; > + int64_t cookie; > + enum buffer_state state; > + char *data; > +}; > + > +static struct buffer buffers[MAX_BUFFERS]; > +static size_t nr_buffers; > + > +static bool finished, reader_paused; > + > +static GMainLoop *loop; > + > +static void connected (struct NBDSource *source); > +static gboolean read_data (gpointer user_data); > +static int finished_read (void *vp, int64_t cookie, int *error); > +static gboolean write_data (gpointer user_data); > +static int finished_write (void *vp, int64_t cookie, int *error); > + > +int > +main (int argc, char *argv[]) > +{ > + struct nbd_handle *src, *dest; > + GMainContext *loopctx = NULL; > + > + /* Create the main loop. */ > + loop = g_main_loop_new (loopctx, FALSE); > + > + /* Create the two NBD handles and nbdkit instances. */ > + src = nbd_create (); > + if (!src) { > + fprintf (stderr, "%s\n", nbd_get_error ()); > + exit (EXIT_FAILURE); > + } > + dest = nbd_create (); > + if (!dest) { > + fprintf (stderr, "%s\n", nbd_get_error ()); > + exit (EXIT_FAILURE); > + } > + > + /* Create the GSource main loop sources from each handle. 
*/ > + gssrc = create_libnbd_gsource (src); > + gsdest = create_libnbd_gsource (dest); > + loopctx = g_main_loop_get_context (loop); > + g_source_attach ((GSource *) gssrc, loopctx); > + g_source_attach ((GSource *) gsdest, loopctx); > + > + /* Make sure we get called back when each handle connects. */ > + gssrc->connected_callback = connected; > + gsdest->connected_callback = connected; > + > + /* Asynchronously start each handle connecting. */ > + if (nbd_aio_connect_command (src, (char **) src_args) == -1) { > + fprintf (stderr, "%s\n", nbd_get_error ()); > + exit (EXIT_FAILURE); > + } > + if (nbd_aio_connect_command (dest, (char **) dest_args) == -1) { > + fprintf (stderr, "%s\n", nbd_get_error ()); > + exit (EXIT_FAILURE); > + } > + > + /* Run the main loop until quit. */ > + g_main_loop_run (loop); > + exit (EXIT_SUCCESS);I guess the condition for causing the main loop to run is later on in one of the callbacks?> +} > + > +/* This is called back when either handle becomes connected. By > + * counting the number of times this happens (there are two handles) > + * we can tell when both handles have finished connecting. > + */ > +static void > +connected (struct NBDSource *source) > +{ > + static int count = 0; > + > + count++; > + if (count == 2) { > + DEBUG (source, "both handles are connected"); > + > + /* Now that both handles are connected, we can begin copying. > + * Register an idle handler that will repeatedly read from the > + * source. > + */ > + g_idle_add (read_data, NULL); > + } > +} > + > +/* This idle callback reads data from the source nbdkit until the ring > + * is full. > + */ > +static gboolean > +read_data (gpointer user_data) > +{ > + static uint64_t posn = 0; > + const size_t i = nr_buffers; > + > + if (gssrc == NULL) > + return FALSE; > + > + /* Finished reading from the source nbdkit? 
*/ > + if (posn >= SIZE) { > + DEBUG (gssrc, "read_data: finished reading from source"); > + finished = true; > + return FALSE; > + } > + > + /* If too many read requests are in flight, return FALSE so this > + * idle callback is unregistered. It will be registered by the > + * write callback when nr_buffers decreases. > + */ > + if (nr_buffers >= MAX_BUFFERS) { > + DEBUG (gssrc, "read_data: buffer full, pausing reads from source"); > + reader_paused = true; > + return FALSE; > + } > + > + /* Begin reading into the new buffer. */ > + assert (buffers[i].data == NULL); > + buffers[i].data = g_new (char, BUFFER_SIZE); > + buffers[i].state = BUFFER_READING; > + buffers[i].offset = posn; > + nr_buffers++; > + posn += BUFFER_SIZE; > + > + buffers[i].cookie > + nbd_aio_pread_callback (gssrc->nbd, buffers[i].data, > + BUFFER_SIZE, buffers[i].offset, > + NULL, finished_read, 0);Of course, if you like my parameter reordering patch for 0.1.8, this gets a tweak.> + if (buffers[i].cookie == -1) { > + fprintf (stderr, "%s\n", nbd_get_error ()); > + exit (EXIT_FAILURE); > + } > + > + return TRUE; > +} > + > +/* This callback is called from libnbd when any read command finishes. */ > +static int > +finished_read (void *vp, int64_t cookie, int *error) > +{ > + size_t i; > + > + if (gssrc == NULL) > + return 0; > + > + DEBUG (gssrc, "finished_read: read completed"); > + > + /* Find the corresponding buffer and mark it as completed. */ > + for (i = 0; i < nr_buffers; ++i) { > + if (buffers[i].cookie == cookie) > + goto found; > + } > + /* This should never happen. 
*/ > + abort (); > + > + found: > + buffers[i].state = BUFFER_READ_COMPLETED;You asked on IRC if we should change the semantics of the *_callback variants to auto-retire the command (right now, the commands are not retired until nbd_aio_command_completed, but you can't call that from this callback because of the nbd handle lock - and calling it shouldn't reveal any more information than what you already have access to here). That may be a good idea to play with.> + > + /* Create a writer idle handler. */ > + g_idle_add (write_data, NULL); > + > + return 0; > +} > + > +/* This idle callback schedules a write. */ > +static gboolean > +write_data (gpointer user_data) > +{ > + size_t i; > + > + if (gsdest == NULL) > + return FALSE; > + > + /* Find the first read-completed buffer and schedule it to be > + * written. > + */ > + for (i = 0; i < nr_buffers; ++i) { > + if (buffers[i].state == BUFFER_READ_COMPLETED) > + goto found; > + } > + goto out; > + > + found: > + buffers[i].cookie > + nbd_aio_pwrite_callback (gsdest->nbd, buffers[i].data, > + BUFFER_SIZE, buffers[i].offset, > + NULL, finished_write, 0); > + if (buffers[i].cookie == -1) { > + fprintf (stderr, "%s\n", nbd_get_error ()); > + exit (EXIT_FAILURE); > + } > + buffers[i].state = BUFFER_WRITING; > + > + out: > + /* We always unregister this idle handler because the read side > + * creates a new idle handler for every buffer that has to be > + * written. > + */ > + return FALSE; > +} > + > +/* This callback is called from libnbd when any write command finishes. */ > +static int > +finished_write (void *vp, int64_t cookie, int *error) > +{ > + size_t i; > + > + if (gsdest == NULL) > + return 0; > + > + DEBUG (gsdest, "finished_write: write completed"); > + > + /* Find the corresponding buffer and free it. */ > + for (i = 0; i < nr_buffers; ++i) { > + if (buffers[i].cookie == cookie) > + goto found; > + } > + /* This should never happen. 
*/ > + abort (); > + > + found: > + g_free (buffers[i].data); > + memmove (&buffers[i], &buffers[i+1], > + sizeof (struct buffer) * (nr_buffers-(i+1))); > + nr_buffers--; > + buffers[nr_buffers].data = NULL; > + > + /* If the number of buffers was MAX_BUFFERS and has now gone down to > + * MAX_BUFFERS-1 then we need to restart the read handler. > + */ > + if (nr_buffers == MAX_BUFFERS-1 && reader_paused) { > + DEBUG (gsdest, "finished_write: restarting reader"); > + g_idle_add (read_data, NULL); > + reader_paused = false; > + } > + > + /* If the reader has finished and there are no more buffers then we > + * have done. > + */ > + if (finished && nr_buffers == 0) { > + DEBUG (gsdest, "finished_write: all finished"); > + g_source_remove (g_source_get_id ((GSource *) gssrc)); > + g_source_unref ((GSource *) gssrc); > + gssrc = NULL; > + g_source_remove (g_source_get_id ((GSource *) gsdest)); > + g_source_unref ((GSource *) gsdest); > + gsdest = NULL; > + g_main_loop_quit (loop); > + } > + > + return 0; > +} >-- Eric Blake, Principal Software Engineer Red Hat, Inc. +1-919-301-3226 Virtualization: qemu.org | libvirt.org
Seemingly Similar Threads
- [PATCH libnbd] examples: Include an example of integrating with the glib main loop.
- [PATCH libnbd v2] examples: Include an example of integrating with the glib main loop.
- [PATCH libnbd] examples: Fix theoretical cookie race in example.
- [PATCH libnbd] examples: Include an example of integrating with the glibc main loop.
- [PATCH libnbd v2] examples: Include an example of integrating with glib main loop.