Nir Soffer
2022-Feb-20 12:14 UTC
[Libguestfs] [PATCH libnbd 7/8] copy: Track worker queue size
Tracking the number of queued bytes per worker will allow optimizing the number of in flight requests based on the actual requests size. The goal is to allow large number of small requests, required to get good performance, and in the same time limit the number of large requests, that can be faster with lower number of requests. Signed-off-by: Nir Soffer <nsoffer at redhat.com> --- copy/multi-thread-copying.c | 33 +++++++++++++++++++++++++++++++++ copy/nbdcopy.h | 6 ++++++ 2 files changed, 39 insertions(+) diff --git a/copy/multi-thread-copying.c b/copy/multi-thread-copying.c index 8ba721fe..620dc571 100644 --- a/copy/multi-thread-copying.c +++ b/copy/multi-thread-copying.c @@ -133,20 +133,45 @@ multi_thread_copying (void) static void wait_for_request_slots (size_t index); static unsigned in_flight (size_t index); static void poll_both_ends (size_t index); static int finished_read (void *vp, int *error); static int finished_command (void *vp, int *error); static void free_command (struct command *command); static void fill_dst_range_with_zeroes (struct command *command); static struct command *create_command (uint64_t offset, size_t len, bool zero, struct worker *worker); +/* Tracking worker queue size. + * + * The queue size is increased when starting a read command. + * + * The queue size is decreased when a read command is converted to zero + * subcommand in finished_read(), or when a write command completes in + * finished_command(). + * + * Zero commands are not considered in the queue size since they have no + * payload. + */ + +static inline void +increase_queue_size(struct worker *worker, size_t len) +{ + worker->queue_size += len; +} + +static inline void +decrease_queue_size(struct worker *worker, size_t len) +{ + assert (worker->queue_size >= len); + worker->queue_size -= len; +} + /* There are 'threads' worker threads, each copying work ranges from * src to dst until there are no more work ranges. 
*/ static void * worker_thread (void *wp) { struct worker *w = wp; uint64_t offset, count; extent_list exts = empty_vector; @@ -180,20 +205,23 @@ worker_thread (void *wp) while (exts.ptr[i].length > 0) { len = exts.ptr[i].length; if (len > request_size) len = request_size; command = create_command (exts.ptr[i].offset, len, false, w); wait_for_request_slots (w->index); + /* NOTE: Must increase the queue size after waiting. */ + increase_queue_size (w, len); + /* Begin the asynch read operation. */ src->ops->asynch_read (src, command, (nbd_completion_callback) { .callback = finished_read, .user_data = command, }); exts.ptr[i].offset += len; exts.ptr[i].length -= len; } @@ -455,20 +483,21 @@ finished_read (void *vp, int *error) /* It's data. If the last was data too, do nothing => * coalesce. Otherwise write the last zero range and start a * new data. */ if (last_is_zero) { /* Write the last zero range (if any). */ if (i - last_offset > 0) { newcommand = create_subcommand (command, last_offset, i - last_offset, true); + decrease_queue_size (command->worker, newcommand->slice.len); fill_dst_range_with_zeroes (newcommand); } /* Start the new data. */ last_offset = i; last_is_zero = false; } } } /* for i */ /* Write the last_offset up to i. */ @@ -480,20 +509,21 @@ finished_read (void *vp, int *error) dst->ops->asynch_write (dst, newcommand, (nbd_completion_callback) { .callback = finished_command, .user_data = newcommand, }); } else { newcommand = create_subcommand (command, last_offset, i - last_offset, true); + decrease_queue_size (command->worker, newcommand->slice.len); fill_dst_range_with_zeroes (newcommand); } } /* There may be an unaligned tail, so write that. 
*/ if (end - i > 0) { newcommand = create_subcommand (command, i, end - i, false); dst->ops->asynch_write (dst, newcommand, (nbd_completion_callback) { .callback = finished_command, @@ -573,20 +603,23 @@ static int finished_command (void *vp, int *error) { struct command *command = vp; if (*error) { fprintf (stderr, "write at offset %" PRId64 " failed: %s\n", command->offset, strerror (*error)); exit (EXIT_FAILURE); } + if (command->slice.buffer) + decrease_queue_size (command->worker, command->slice.len); + free_command (command); return 1; /* auto-retires the command */ } static void free_command (struct command *command) { struct buffer *buffer = command->slice.buffer; diff --git a/copy/nbdcopy.h b/copy/nbdcopy.h index 8027836b..d6bd63f0 100644 --- a/copy/nbdcopy.h +++ b/copy/nbdcopy.h @@ -75,20 +75,26 @@ struct slice { * or NULL). */ }; #define slice_ptr(slice) ((slice).buffer->data + (slice).base) /* Worker state used by multi-threaded copying. */ struct worker { pthread_t thread; size_t index; + + /* The number of bytes queued for in flight read and write requests. + * Tracking this allows issuing many small requests, but limiting the + * number of large requests. + */ + size_t queue_size; }; /* Commands for asynchronous operations in flight. * * We don't store the command type (read/write/zero/etc) because it is * implicit in the function being called and because commands * naturally change from read -> write/zero/etc as they progress. * * slice.buffer may be NULL for commands (like zero) that have no * associated data. -- 2.35.1
Richard W.M. Jones
2022-Feb-20 18:52 UTC
[Libguestfs] [PATCH libnbd 7/8] copy: Track worker queue size
On Sun, Feb 20, 2022 at 02:14:02PM +0200, Nir Soffer wrote:
> +static inline void
> +increase_queue_size(struct worker *worker, size_t len)
                      ^ space

and the same in the next function:

> +{
> +  worker->queue_size += len;
> +}
> +
> +static inline void
> +decrease_queue_size(struct worker *worker, size_t len)
> +{
> +  assert (worker->queue_size >= len);
> +  worker->queue_size -= len;
> +}

Do we not need any locking here?

Rich.

--
Richard Jones, Virtualization Group, Red Hat http://people.redhat.com/~rjones
Read my programming and virtualization blog: http://rwmj.wordpress.com
libguestfs lets you edit virtual machines. Supports shell scripting,
bindings from many languages. http://libguestfs.org