Nir Soffer
2022-Feb-20 12:14 UTC
[Libguestfs] [PATCH libnbd 8/8] copy: Adaptive queue size
Limit the size of the copy queue also by the number of queued bytes.
This allows using many concurrent small requests, required to get good
performance, but limiting the number of large requests that are actually
faster with lower concurrency.

New --queue-size option added to control the maximum queue size. With 2
MiB we can have 8 256 KiB requests per connection. The default queue
size is 16 MiB, to match the default --requests value (64) with the
default --request-size (256 KiB). Testing show that using more than 16
256 KiB requests with one connection do not improve anything.

The new option will simplify limiting memory usage when using large
requests, like this change in virt-v2v:
https://github.com/libguestfs/virt-v2v/commit/c943420219fa0ee971fc228aa4d9127c5ce973f7

I tested this change with 3 images:

- Fedora 35 + 3g of random data - hopefully simulates a real image
- Fully allocated image - the special case when every read command is
  converted to a write command.
- Zero image - the special case when every read command is converted to
  a zero command.

On 2 machines:

- laptop: Intel(R) Core(TM) i7-10850H CPU @ 2.70GHz, 12 cpus,
  1.5 MiB L2 cache per 2 cpus, 12 MiB L3 cache.
- server: Intel(R) Xeon(R) Gold 5218R CPU @ 2.10GHz, 80 cpus,
  1 MiB L2 cache per cpu, 27.5 MiB L3 cache.

In all cases, both source and destination are served by qemu-nbd, using
--cache=none --aio=native. Because qemu-nbd does not support MULTI_CON
for writing, we are testing a single connection when copying an to
qemu-nbd. I tested also copying to null: since in this case we use 4
connections (these tests are marked with /ro).

Results for copying all images on all machines with nbdcopy v1.11.0 and
this change. "before" and "after" are average time of 10 runs.

image       machine    before    after    queue size    improvement
===================================================================
fedora      laptop      3.044    2.129    2m                   +43%
full        laptop      4.900    3.136    2m                   +56%
zero        laptop      3.147    2.624    2m                   +20%
-------------------------------------------------------------------
fedora      server      2.324    2.189    16m                   +6%
full        server      3.521    3.380    8m                    +4%
zero        server      2.297    2.338    16m                   -2%
-------------------------------------------------------------------
fedora/ro   laptop      2.040    1.663    1m                   +23%
fedora/ro   server      1.585    1.393    2m                   +14%

Signed-off-by: Nir Soffer <nsoffer at redhat.com>
---
 copy/main.c                 | 52 ++++++++++++++++++++++++-------------
 copy/multi-thread-copying.c | 18 +++++++------
 copy/nbdcopy.h              |  1 +
 copy/nbdcopy.pod            | 12 +++++++--
 4 files changed, 55 insertions(+), 28 deletions(-)

diff --git a/copy/main.c b/copy/main.c
index 390de1eb..832f99da 100644
--- a/copy/main.c
+++ b/copy/main.c
@@ -36,53 +36,55 @@

 #include <pthread.h>

 #include <libnbd.h>

 #include "ispowerof2.h"
 #include "human-size.h"
 #include "version.h"
 #include "nbdcopy.h"

-bool allocated;                 /* --allocated flag */
-unsigned connections = 4;       /* --connections */
-bool destination_is_zero;       /* --destination-is-zero flag */
-bool extents = true;            /* ! --no-extents flag */
-bool flush;                     /* --flush flag */
-unsigned max_requests = 64;     /* --requests */
-bool progress;                  /* -p flag */
-int progress_fd = -1;           /* --progress=FD */
-unsigned request_size = 1<<18;  /* --request-size */
-unsigned sparse_size = 4096;    /* --sparse */
-bool synchronous;               /* --synchronous flag */
-unsigned threads;               /* --threads */
-struct rw *src, *dst;           /* The source and destination. */
-bool verbose;                   /* --verbose flag */
-
-const char *prog;               /* program name (== basename argv[0]) */
+bool allocated;                     /* --allocated flag */
+unsigned connections = 4;           /* --connections */
+bool destination_is_zero;           /* --destination-is-zero flag */
+bool extents = true;                /* ! --no-extents flag */
+bool flush;                         /* --flush flag */
+unsigned max_requests = 64;         /* --requests */
+bool progress;                      /* -p flag */
+int progress_fd = -1;               /* --progress=FD */
+unsigned request_size = 1<<18;      /* --request-size */
+unsigned queue_size = 16<<20;       /* --queue-size */
+unsigned sparse_size = 4096;        /* --sparse */
+bool synchronous;                   /* --synchronous flag */
+unsigned threads;                   /* --threads */
+struct rw *src, *dst;               /* The source and destination. */
+bool verbose;                       /* --verbose flag */
+
+const char *prog;                   /* program name (== basename argv[0]) */

 static bool is_nbd_uri (const char *s);
 static struct rw *open_local (const char *filename, direction d);
 static void print_rw (struct rw *rw, const char *prefix, FILE *fp);

 static void __attribute__((noreturn))
 usage (FILE *fp, int exitcode)
 {
   fprintf (fp,
 "\n"
 "Copy to and from an NBD server:\n"
 "\n"
 "    nbdcopy [--allocated] [-C N|--connections=N]\n"
 "            [--destination-is-zero|--target-is-zero] [--flush]\n"
 "            [--no-extents] [-p|--progress|--progress=FD]\n"
-"            [--request-size=N] [-R N|--requests=N] [-S N|--sparse=N]\n"
-"            [--synchronous] [-T N|--threads=N] [-v|--verbose]\n"
+"            [--request-size=N] [--queue-size=N] [-R N|--requests=N]\n"
+"            [-S N|--sparse=N] [--synchronous] [-T N|--threads=N] \n"
+"            [-v|--verbose]\n"
 "            SOURCE DESTINATION\n"
 "\n"
 "    SOURCE, DESTINATION := - | FILE | DEVICE | NBD-URI | [ CMD ARGS ... ]\n"
 "    DESTINATION += null:\n"
 "\n"
 "Other options:\n"
 "\n"
 "    nbdcopy --help\n"
 "    nbdcopy -V|--version\n"
 "\n"
@@ -104,33 +106,35 @@ main (int argc, char *argv[])
 {
   enum {
     HELP_OPTION = CHAR_MAX + 1,
     LONG_OPTIONS,
     SHORT_OPTIONS,
     ALLOCATED_OPTION,
     DESTINATION_IS_ZERO_OPTION,
     FLUSH_OPTION,
     NO_EXTENTS_OPTION,
     REQUEST_SIZE_OPTION,
+    QUEUE_SIZE_OPTION,
     SYNCHRONOUS_OPTION,
   };
   const char *short_options = "C:pR:S:T:vV";
   const struct option long_options[] = {
     { "help",               no_argument,       NULL, HELP_OPTION },
     { "long-options",       no_argument,       NULL, LONG_OPTIONS },
     { "allocated",          no_argument,       NULL, ALLOCATED_OPTION },
     { "connections",        required_argument, NULL, 'C' },
     { "destination-is-zero",no_argument,       NULL, DESTINATION_IS_ZERO_OPTION },
     { "flush",              no_argument,       NULL, FLUSH_OPTION },
     { "no-extents",         no_argument,       NULL, NO_EXTENTS_OPTION },
     { "progress",           optional_argument, NULL, 'p' },
     { "request-size",       required_argument, NULL, REQUEST_SIZE_OPTION },
+    { "queue-size",         required_argument, NULL, QUEUE_SIZE_OPTION },
     { "requests",           required_argument, NULL, 'R' },
     { "short-options",      no_argument,       NULL, SHORT_OPTIONS },
     { "sparse",             required_argument, NULL, 'S' },
     { "synchronous",        no_argument,       NULL, SYNCHRONOUS_OPTION },
     { "target-is-zero",     no_argument,       NULL, DESTINATION_IS_ZERO_OPTION },
     { "threads",            required_argument, NULL, 'T' },
     { "verbose",            no_argument,       NULL, 'v' },
     { "version",            no_argument,       NULL, 'V' },
     { NULL }
   };
@@ -212,20 +216,28 @@ main (int argc, char *argv[])
       }
       if (request_size < MIN_REQUEST_SIZE || request_size > MAX_REQUEST_SIZE ||
           !is_power_of_2 (request_size)) {
         fprintf (stderr,
                  "%s: --request-size: must be a power of 2 within %d-%d\n",
                  prog, MIN_REQUEST_SIZE, MAX_REQUEST_SIZE);
         exit (EXIT_FAILURE);
       }
       break;

+    case QUEUE_SIZE_OPTION:
+      if (sscanf (optarg, "%u", &queue_size) != 1) {
+        fprintf (stderr, "%s: --queue-size: could not parse: %s\n",
+                 prog, optarg);
+        exit (EXIT_FAILURE);
+      }
+      break;
+
     case 'R':
       if (sscanf (optarg, "%u", &max_requests) != 1 || max_requests == 0) {
         fprintf (stderr, "%s: --requests: could not parse: %s\n",
                  prog, optarg);
         exit (EXIT_FAILURE);
       }
       break;

     case 'S':
       if (sscanf (optarg, "%u", &sparse_size) != 1) {
@@ -360,20 +372,24 @@ main (int argc, char *argv[])
   }

   if (synchronous)
     connections = 1;

   if (connections < threads)
     threads = connections;
   if (threads < connections)
     connections = threads;

+  /* Adapt queue to size to request size if needed. */
+  if (request_size > queue_size)
+    queue_size = request_size;
+
   /* Truncate the destination to the same size as the source.  Only
    * has an effect on regular files.
    */
   if (dst->ops->truncate)
     dst->ops->truncate (dst, src->size);

   /* Check if the source is bigger than the destination, since that
    * would truncate (ie. lose) data.  Copying from smaller to larger
    * is OK.
    */
diff --git a/copy/multi-thread-copying.c b/copy/multi-thread-copying.c
index 620dc571..d9f17285 100644
--- a/copy/multi-thread-copying.c
+++ b/copy/multi-thread-copying.c
@@ -123,21 +123,21 @@ multi_thread_copying (void)
     if (err != 0) {
       errno = err;
       perror ("pthread_join");
       exit (EXIT_FAILURE);
     }
   }

   free (workers);
 }

-static void wait_for_request_slots (size_t index);
+static void wait_for_request_slots (struct worker *worker);
 static unsigned in_flight (size_t index);
 static void poll_both_ends (size_t index);
 static int finished_read (void *vp, int *error);
 static int finished_command (void *vp, int *error);
 static void free_command (struct command *command);
 static void fill_dst_range_with_zeroes (struct command *command);
 static struct command *create_command (uint64_t offset, size_t len, bool zero,
                                        struct worker *worker);

 /* Tracking worker queue size.
@@ -148,20 +148,21 @@ static struct command *create_command (uint64_t offset, size_t len, bool zero,
  * subcommand in finished_read(), or when a write command completes in
  * finished_command().
  *
  * Zero commands are not considered in the queue size since they have no
  * payload.
  */

 static inline void
 increase_queue_size(struct worker *worker, size_t len)
 {
+  assert (worker->queue_size < queue_size);
   worker->queue_size += len;
 }

 static inline void
 decrease_queue_size(struct worker *worker, size_t len)
 {
   assert (worker->queue_size >= len);
   worker->queue_size -= len;
 }

@@ -203,21 +204,21 @@ worker_thread (void *wp)
          * requests.
          */
         while (exts.ptr[i].length > 0) {
           len = exts.ptr[i].length;
           if (len > request_size)
             len = request_size;

           command = create_command (exts.ptr[i].offset, len,
                                     false, w);

-          wait_for_request_slots (w->index);
+          wait_for_request_slots (w);

           /* NOTE: Must increase the queue size after waiting. */
           increase_queue_size (w, len);

           /* Begin the asynch read operation. */
           src->ops->asynch_read (src, command,
                                  (nbd_completion_callback) {
                                    .callback = finished_read,
                                    .user_data = command,
                                  });
@@ -233,36 +234,37 @@ worker_thread (void *wp)
   }

   /* Wait for in flight NBD requests to finish. */
   while (in_flight (w->index) > 0)
     poll_both_ends (w->index);

   free (exts.ptr);
   return NULL;
 }

-/* If the number of requests in flight exceeds the limit, poll
- * waiting for at least one request to finish.  This enforces
- * the user --requests option.
+/* If the number of requests in flight or the number of queued bytes
+ * exceed the limit, poll waiting for at least one request to finish.
+ * This enforces the user --requests and --queue-size options.
  *
  * NB: Unfortunately it's not possible to call this from a callback,
  * since it will deadlock trying to grab the libnbd handle lock.  This
  * means that although the worker thread calls this and enforces the
  * limit, when we split up requests into subrequests (eg. doing
  * sparseness detection) we will probably exceed the user request
  * limit. XXX
  */
 static void
-wait_for_request_slots (size_t index)
+wait_for_request_slots (struct worker *worker)
 {
-  while (in_flight (index) >= max_requests)
-    poll_both_ends (index);
+  while (in_flight (worker->index) >= max_requests ||
+         worker->queue_size >= queue_size)
+    poll_both_ends (worker->index);
 }

 /* Count the number of asynchronous commands in flight. */
 static unsigned
 in_flight (size_t index)
 {
   return src->ops->in_flight (src, index) + dst->ops->in_flight (dst, index);
 }

 /* Poll (optional) NBD src and NBD dst, moving the state machine(s)
diff --git a/copy/nbdcopy.h b/copy/nbdcopy.h
index d6bd63f0..19797dfd 100644
--- a/copy/nbdcopy.h
+++ b/copy/nbdcopy.h
@@ -226,20 +226,21 @@ extern void asynch_notify_read_write_not_supported (struct rw *rw,

 extern bool allocated;
 extern unsigned connections;
 extern bool destination_is_zero;
 extern bool extents;
 extern bool flush;
 extern unsigned max_requests;
 extern bool progress;
 extern int progress_fd;
 extern unsigned request_size;
+extern unsigned queue_size;
 extern unsigned sparse_size;
 extern bool synchronous;
 extern unsigned threads;
 extern bool verbose;

 extern const char *prog;

 extern void progress_bar (off_t pos, int64_t size);
 extern void synch_copying (void);
 extern void multi_thread_copying (void);
diff --git a/copy/nbdcopy.pod b/copy/nbdcopy.pod
index f7100935..22ceca8e 100644
--- a/copy/nbdcopy.pod
+++ b/copy/nbdcopy.pod
@@ -1,21 +1,22 @@
 =head1 NAME

 nbdcopy - copy to and from an NBD server

 =head1 SYNOPSIS

  nbdcopy [--allocated] [-C N|--connections=N]
          [--destination-is-zero|--target-is-zero] [--flush]
          [--no-extents] [-p|--progress|--progress=FD]
-         [--request-size=N] [-R N|--requests=N] [-S N|--sparse=N]
-         [--synchronous] [-T N|--threads=N] [-v|--verbose]
+         [--request-size=N] [--queue-size=N] [-R N|--requests=N]
+         [-S N|--sparse=N] [--synchronous] [-T N|--threads=N]
+         [-v|--verbose]
          SOURCE DESTINATION

  SOURCE, DESTINATION := - | FILE | DEVICE | NBD-URI | [ CMD ARGS ... ]
  DESTINATION += null:

  nbdcopy --help

  nbdcopy -V|--version

 =head1 EXAMPLES
@@ -156,20 +157,27 @@ following shell commands:

 Set the maximum request size in bytes.  The maximum value is 32 MiB,
 specified by the NBD protocol.

 =item B<-R> N

 =item B<--requests=>N

 Set the maximum number of requests in flight per NBD connection.

+=item B<--queue-size=>N
+
+Set the maximum number of bytes to queue for in flight requests.  The
+default value is 16 MiB, allowing up to 64 256k requests per NBD
+connection.  If you use larger B<--request-size> you may want to increase
+this value.
+
 =item B<-S> N

 =item B<--sparse=>N

 Detect all zero blocks of size N (bytes) and make them sparse on the
 output.  You can also turn off sparse detection using S<I<-S 0>>.
 The default is 4096 bytes.

 =item B<--synchronous>

-- 
2.35.1
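
As a rough illustration of how the two limits in this patch interact, the sketch below replays the admission check from wait_for_request_slots() for a single worker issuing equally sized reads. It is not code from nbdcopy: admitted_requests() is a hypothetical helper, and it assumes no request completes while the queue fills and ignores the extra in-flight commands counted on the destination side.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical helper, not part of nbdcopy: count how many requests of
 * request_size bytes one worker can start before it would have to wait,
 * assuming none of them completes in the meantime.
 */
static unsigned
admitted_requests (unsigned max_requests, size_t queue_size,
                   size_t request_size)
{
  unsigned in_flight = 0;
  size_t queued = 0;

  assert (request_size > 0);

  /* Same adjustment the patch makes in main(): the queue must be able
   * to hold at least one request.
   */
  if (request_size > queue_size)
    queue_size = request_size;

  /* Mirrors the loop condition in wait_for_request_slots(): stop as
   * soon as either the request count or the byte count reaches its
   * limit.
   */
  while (in_flight < max_requests && queued < queue_size) {
    queued += request_size;
    in_flight++;
  }

  return in_flight;
}
```

With the defaults (64 requests, 16 MiB queue, 256 KiB requests) both limits are reached together at 64 requests. With a 2 MiB queue the byte limit wins and only 8 requests are admitted, matching the figure in the commit message.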
Richard W.M. Jones
2022-Feb-20 18:56 UTC
[Libguestfs] [PATCH libnbd 8/8] copy: Adaptive queue size
On Sun, Feb 20, 2022 at 02:14:03PM +0200, Nir Soffer wrote:
> Limit the size of the copy queue also by the number of queued bytes.
> [...]

Patch series looks pretty sensible to me. Let's see if Eric has any
comments.

Should the queue size be represented by a size_t? While it seems
unlikely now you'd want a queue of larger than 4GB, I guess there
might be a niche case for it (and maybe in 10 years we'll all have
massive laptops!)

Rich.

-- 
Richard Jones, Virtualization Group, Red Hat http://people.redhat.com/~rjones
Read my programming and virtualization blog: http://rwmj.wordpress.com
libguestfs lets you edit virtual machines. Supports shell scripting,
bindings from many languages. http://libguestfs.org
Eric Blake
2022-Feb-21 15:40 UTC
[Libguestfs] [PATCH libnbd 8/8] copy: Adaptive queue size
On Sun, Feb 20, 2022 at 02:14:03PM +0200, Nir Soffer wrote:> Limit the size of the copy queue also by the number of queued bytes. > This allows using many concurrent small requests, required to get good > performance, but limiting the number of large requests that are actually > faster with lower concurrency. > > New --queue-size option added to control the maximum queue size. With 2 > MiB we can have 8 256 KiB requests per connection. The default queue > size is 16 MiB, to match the default --requests value (64) with the > default --request-size (256 KiB). Testing show that using more than 16 > 256 KiB requests with one connection do not improve anything.s/do/does/> > The new option will simplify limiting memory usage when using large > requests, like this change in virt-v2v: > https://github.com/libguestfs/virt-v2v/commit/c943420219fa0ee971fc228aa4d9127c5ce973f7 > > I tested this change with 3 images: > > - Fedora 35 + 3g of random data - hopefully simulates a real image > - Fully allocated image - the special case when every read command is > converted to a write command. > - Zero image - the special case when every read command is converted to > a zero command. > > On 2 machines: > > - laptop: Intel(R) Core(TM) i7-10850H CPU @ 2.70GHz, 12 cpus, > 1.5 MiB L2 cache per 2 cpus, 12 MiB L3 cache. > - server: Intel(R) Xeon(R) Gold 5218R CPU @ 2.10GHz, 80 cpus, > 1 MiB L2 cache per cpu, 27.5 MiB L3 cache. > > In all cases, both source and destination are served by qemu-nbd, using > --cache=none --aio=native. Because qemu-nbd does not support MULTI_CONMULTI_CONN> for writing, we are testing a single connection when copying an toDid you mean 'copying an image to'?> qemu-nbd. I tested also copying to null: since in this case we use 4 > connections (these tests are marked with /ro). > > Results for copying all images on all machines with nbdcopy v1.11.0 and > this change. "before" and "after" are average time of 10 runs. 
>
> image       machine   before   after   queue size   improvement
> ===============================================================
> fedora      laptop    3.044    2.129   2m           +43%
> full        laptop    4.900    3.136   2m           +56%
> zero        laptop    3.147    2.624   2m           +20%
> ---------------------------------------------------------------
> fedora      server    2.324    2.189   16m          +6%
> full        server    3.521    3.380   8m           +4%
> zero        server    2.297    2.338   16m          -2%
> ---------------------------------------------------------------
> fedora/ro   laptop    2.040    1.663   1m           +23%
> fedora/ro   server    1.585    1.393   2m           +14%
>
> Signed-off-by: Nir Soffer <nsoffer at redhat.com>
> ---
>  copy/main.c                 | 52 ++++++++++++++++++++++++-------------
>  copy/multi-thread-copying.c | 18 +++++++------
>  copy/nbdcopy.h              |  1 +
>  copy/nbdcopy.pod            | 12 +++++++--
>  4 files changed, 55 insertions(+), 28 deletions(-)
>
>  static void __attribute__((noreturn))
>  usage (FILE *fp, int exitcode)
>  {
>    fprintf (fp,
>  "\n"
>  "Copy to and from an NBD server:\n"
>  "\n"
>  " nbdcopy [--allocated] [-C N|--connections=N]\n"
>  " [--destination-is-zero|--target-is-zero] [--flush]\n"
>  " [--no-extents] [-p|--progress|--progress=FD]\n"
> -" [--request-size=N] [-R N|--requests=N] [-S N|--sparse=N]\n"
> -" [--synchronous] [-T N|--threads=N] [-v|--verbose]\n"
> +" [--request-size=N] [--queue-size=N] [-R N|--requests=N]\n"

The options are already listed in mostly alphabetic order, so putting
--queue-size before --request-size makes more sense to me.

> @@ -104,33 +106,35 @@ main (int argc, char *argv[])
>  {
>    enum {
>      HELP_OPTION = CHAR_MAX + 1,
>      LONG_OPTIONS,
>      SHORT_OPTIONS,
>      ALLOCATED_OPTION,
>      DESTINATION_IS_ZERO_OPTION,
>      FLUSH_OPTION,
>      NO_EXTENTS_OPTION,
>      REQUEST_SIZE_OPTION,
> +    QUEUE_SIZE_OPTION,

Likewise here.

>      SYNCHRONOUS_OPTION,
>    };
>    const char *short_options = "C:pR:S:T:vV";
>    const struct option long_options[] = {
>      { "help",               no_argument,       NULL, HELP_OPTION },
>      { "long-options",       no_argument,       NULL, LONG_OPTIONS },
>      { "allocated",          no_argument,       NULL, ALLOCATED_OPTION },
>      { "connections",        required_argument, NULL, 'C' },
>      { "destination-is-zero",no_argument,       NULL, DESTINATION_IS_ZERO_OPTION },
>      { "flush",              no_argument,       NULL, FLUSH_OPTION },
>      { "no-extents",         no_argument,       NULL, NO_EXTENTS_OPTION },
>      { "progress",           optional_argument, NULL, 'p' },
>      { "request-size",       required_argument, NULL, REQUEST_SIZE_OPTION },
> +    { "queue-size",         required_argument, NULL, QUEUE_SIZE_OPTION },

and here.

>      { "requests",           required_argument, NULL, 'R' },
>      { "short-options",      no_argument,       NULL, SHORT_OPTIONS },
>      { "sparse",             required_argument, NULL, 'S' },
>      { "synchronous",        no_argument,       NULL, SYNCHRONOUS_OPTION },
>      { "target-is-zero",     no_argument,       NULL, DESTINATION_IS_ZERO_OPTION },
>      { "threads",            required_argument, NULL, 'T' },
>      { "verbose",            no_argument,       NULL, 'v' },
>      { "version",            no_argument,       NULL, 'V' },
>      { NULL }
>    };
> @@ -212,20 +216,28 @@ main (int argc, char *argv[])
>        }
>        if (request_size < MIN_REQUEST_SIZE || request_size > MAX_REQUEST_SIZE ||
>            !is_power_of_2 (request_size)) {
>          fprintf (stderr,
>                   "%s: --request-size: must be a power of 2 within %d-%d\n",
>                   prog, MIN_REQUEST_SIZE, MAX_REQUEST_SIZE);
>          exit (EXIT_FAILURE);
>        }
>        break;
>
> +    case QUEUE_SIZE_OPTION:

and here.

> +      if (sscanf (optarg, "%u", &queue_size) != 1) {

Matches pre-existing use, but *scanf("%u") cannot portably handle
overflow: the C standard leaves the behavior undefined when the input
does not fit in the target type. It is better to use the strtol* family
of functions when parsing user input into an integer, but that belongs
in a separate patch.

>
> -/* If the number of requests in flight exceeds the limit, poll
> - * waiting for at least one request to finish. This enforces
> - * the user --requests option.
> +/* If the number of requests in flight or the number of queued bytes
> + * exceed the limit, poll waiting for at least one request to finish.

Pre-existing difficulty in reading, made worse by more words.
I would suggest:

  If the number of requests or queued bytes in flight exceed limits,
  then poll until enough requests finish.

> + * This enforces the user --requests and --queue-size options.
>   *
>   * NB: Unfortunately it's not possible to call this from a callback,
>   * since it will deadlock trying to grab the libnbd handle lock. This
>   * means that although the worker thread calls this and enforces the
>   * limit, when we split up requests into subrequests (eg. doing
>   * sparseness detection) we will probably exceed the user request
>   * limit. XXX
>   */
>  static void
> -wait_for_request_slots (size_t index)
> +wait_for_request_slots (struct worker *worker)
>  {
> -  while (in_flight (index) >= max_requests)
> -    poll_both_ends (index);
> +  while (in_flight (worker->index) >= max_requests ||
> +         worker->queue_size >= queue_size)
> +    poll_both_ends (worker->index);
>  }

Looks like a nice improvement.

>  =head1 SYNOPSIS
>
>   nbdcopy [--allocated] [-C N|--connections=N]
>   [--destination-is-zero|--target-is-zero] [--flush]
>   [--no-extents] [-p|--progress|--progress=FD]
> - [--request-size=N] [-R N|--requests=N] [-S N|--sparse=N]
> - [--synchronous] [-T N|--threads=N] [-v|--verbose]
> + [--request-size=N] [--queue-size=N] [-R N|--requests=N]

Another spot for my alphabetic sorting request.

> @@ -156,20 +157,27 @@ following shell commands:
>
>  Set the maximum request size in bytes. The maximum value is 32 MiB,
>  specified by the NBD protocol.
>
>  =item B<-R> N
>
>  =item B<--requests=>N
>
>  Set the maximum number of requests in flight per NBD connection.
>
> +=item B<--queue-size=>N

And again.

I'm okay with you making the fixes that we pointed out, without
necessarily needing to see a v2 post.

-- 
Eric Blake, Principal Software Engineer
Red Hat, Inc. +1-919-301-3266
Virtualization: qemu.org | libvirt.org
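
For reference, the strtol*-based parsing suggested in the review could
look roughly like the sketch below. This is only an illustration under
stated assumptions: the helper name parse_unsigned is hypothetical and
does not exist in libnbd, and the follow-up patch may well choose a
different approach. It shows how strtoul lets the caller detect
overflow, trailing garbage and negative input, which sscanf("%u")
cannot do portably.

```c
#include <ctype.h>
#include <errno.h>
#include <limits.h>
#include <stdbool.h>
#include <stdlib.h>

/* Hypothetical helper (not part of libnbd): parse a decimal unsigned
 * integer from user input.  Returns false on empty input, trailing
 * garbage, a negative number, or a value that does not fit in
 * 'unsigned'.  On success the value is stored in *result.
 */
static bool
parse_unsigned (const char *str, unsigned *result)
{
  char *end;
  unsigned long value;

  /* strtoul accepts a leading '-' and negates the result, so skip
   * leading whitespace ourselves and reject the sign explicitly.
   */
  while (isspace ((unsigned char) *str))
    str++;
  if (*str == '-' || *str == '\0')
    return false;

  errno = 0;
  value = strtoul (str, &end, 10);

  /* errno catches ERANGE overflow, end == str catches "no digits",
   * *end != '\0' catches trailing garbage, and the UINT_MAX check
   * covers platforms where unsigned long is wider than unsigned.
   */
  if (errno != 0 || end == str || *end != '\0' || value > UINT_MAX)
    return false;

  *result = (unsigned) value;
  return true;
}
```

With a helper along these lines, the QUEUE_SIZE_OPTION case could test
"if (!parse_unsigned (optarg, &queue_size))" instead of the sscanf
return value, keeping the existing error message and exit path.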