Nir Soffer
2022-Feb-20 12:13 UTC
[Libguestfs] [PATCH libnbd 0/8] nbdcopy: Adaptive queue size
This series adds an adaptive queue size feature, which gives a great
performance improvement on my laptop, but less exciting results on a
real server. When qemu-nbd supports MULTI_CONN for writes, this should
become more interesting.

To implement this I added a worker struct for keeping worker state, and
cleaned up the completion flow and other code. I think these cleanups
are a good idea even if we do not add the adaptive queue size.

Nir Soffer (8):
  copy: Remove wrong references to holes
  copy: Rename copy_subcommand to create_subcommand
  copy: Extract create_command and create_buffer helpers
  copy: Separate finishing a command from freeing it
  copy: Introduce worker struct
  copy: Keep worker pointer in command
  copy: Track worker queue size
  copy: Adaptive queue size

 copy/file-ops.c             |   4 +-
 copy/main.c                 |  58 +++++---
 copy/multi-thread-copying.c | 270 ++++++++++++++++++++++--------------
 copy/nbd-ops.c              |  16 +--
 copy/nbdcopy.h              |  31 +++--
 copy/nbdcopy.pod            |  12 +-
 copy/null-ops.c             |   4 +-
 copy/pipe-ops.c             |   2 +-
 8 files changed, 248 insertions(+), 149 deletions(-)

-- 
2.35.1
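The core mechanism of the series, condensed into a few lines: a worker
stops issuing new requests when either the request count or the number
of queued bytes reaches its limit. A minimal sketch of that logic,
simplified from the final patch in the series:

    /* Sketch: block until there is room under both limits.
     * max_requests enforces --requests; queue_size enforces the new
     * --queue-size limit on queued bytes per worker.
     */
    static void
    wait_for_request_slots (struct worker *w)
    {
      while (in_flight (w->index) >= max_requests ||
             w->queue_size >= queue_size)
        poll_both_ends (w->index);
    }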
Nir Soffer
2022-Feb-20 12:13 UTC
[Libguestfs] [PATCH libnbd 1/8] copy: Remove wrong references to holes
In the past nbdcopy was looking for hole extents instead of zero
extents. When we fixed this, we forgot to update some comments and
variable names that referenced holes instead of zeroes.

Signed-off-by: Nir Soffer <nsoffer at redhat.com>
---
 copy/multi-thread-copying.c | 34 +++++++++++++++++-----------------
 1 file changed, 17 insertions(+), 17 deletions(-)

diff --git a/copy/multi-thread-copying.c b/copy/multi-thread-copying.c
index 7459b446..632d7006 100644
--- a/copy/multi-thread-copying.c
+++ b/copy/multi-thread-copying.c
@@ -337,36 +337,36 @@ poll_both_ends (uintptr_t index)
       }
     }
   }
 }

 /* Create a sub-command of an existing command.  This creates a slice
  * referencing the buffer of the existing command in order to avoid
  * copying.
  */
 static struct command *
 copy_subcommand (struct command *command, uint64_t offset, size_t len,
-                 bool hole)
+                 bool zero)
 {
   const uint64_t end = command->offset + command->slice.len;
   struct command *newcommand;

   assert (command->offset <= offset && offset < end);
   assert (offset + len <= end);

   newcommand = calloc (1, sizeof *newcommand);
   if (newcommand == NULL) {
     perror ("calloc");
     exit (EXIT_FAILURE);
   }
   newcommand->offset = offset;
   newcommand->slice.len = len;
-  if (!hole) {
+  if (!zero) {
     newcommand->slice.buffer = command->slice.buffer;
     newcommand->slice.buffer->refs++;
     newcommand->slice.base = offset - command->offset;
   }
   newcommand->index = command->index;

   return newcommand;
 }

 /* Callback called when src has finished one read command.  This
@@ -390,76 +390,76 @@ finished_read (void *vp, int *error)
     dst->ops->asynch_write (dst, command,
                             (nbd_completion_callback) {
                               .callback = free_command,
                               .user_data = command,
                             });
   }
   else {
     /* Sparseness detection. */
     const uint64_t start = command->offset;
     const uint64_t end = start + command->slice.len;
     uint64_t last_offset = start;
-    bool last_is_hole = false;
+    bool last_is_zero = false;
     uint64_t i;
     struct command *newcommand;
     int dummy = 0;

     /* Iterate over whole blocks in the command, starting on a block
      * boundary.
      */
     for (i = MIN (ROUND_UP (start, sparse_size), end);
          i + sparse_size <= end;
          i += sparse_size) {
       if (is_zero (slice_ptr (command->slice) + i-start, sparse_size)) {
-        /* It's a hole.  If the last was a hole too then we do nothing
-         * here which coalesces.  Otherwise write the last data and
-         * start a new hole.
+        /* It's a zero range.  If the last was a zero too then we do
+         * nothing here which coalesces.  Otherwise write the last data
+         * and start a new zero range.
          */
-        if (!last_is_hole) {
+        if (!last_is_zero) {
           /* Write the last data (if any). */
           if (i - last_offset > 0) {
             newcommand = copy_subcommand (command,
                                           last_offset, i - last_offset,
                                           false);
             dst->ops->asynch_write (dst, newcommand,
                                     (nbd_completion_callback) {
                                       .callback = free_command,
                                       .user_data = newcommand,
                                     });
           }
-          /* Start the new hole. */
+          /* Start the new zero range. */
           last_offset = i;
-          last_is_hole = true;
+          last_is_zero = true;
         }
       }
       else {
         /* It's data.  If the last was data too, do nothing =>
-         * coalesce.  Otherwise write the last hole and start a new
-         * data.
+         * coalesce.  Otherwise write the last zero range and start a
+         * new data.
          */
-        if (last_is_hole) {
-          /* Write the last hole (if any). */
+        if (last_is_zero) {
+          /* Write the last zero range (if any). */
           if (i - last_offset > 0) {
             newcommand = copy_subcommand (command,
                                           last_offset, i - last_offset,
                                           true);
             fill_dst_range_with_zeroes (newcommand);
           }
           /* Start the new data. */
           last_offset = i;
-          last_is_hole = false;
+          last_is_zero = false;
         }
       }
     } /* for i */

     /* Write the last_offset up to i.
      */
     if (i - last_offset > 0) {
-      if (!last_is_hole) {
+      if (!last_is_zero) {
         newcommand = copy_subcommand (command,
                                       last_offset, i - last_offset,
                                       false);
         dst->ops->asynch_write (dst, newcommand,
                                 (nbd_completion_callback) {
                                   .callback = free_command,
                                   .user_data = newcommand,
                                 });
       }
       else {
@@ -483,22 +483,22 @@ finished_read (void *vp, int *error)
     /* Free the original command since it has been split into
      * subcommands and the original is no longer needed.
      */
     free_command (command, &dummy);
   }

   return 1; /* auto-retires the command */
 }

 /* Fill a range in dst with zeroes.  This is called from the copying
- * loop when we see a hole in the source.  Depending on the command
- * line flags this could mean:
+ * loop when we see a zero range in the source.  Depending on the
+ * command line flags this could mean:
  *
  * --destination-is-zero:
  *                 do nothing
  *
  * --allocated:    write zeroes allocating space using an efficient
  *                 zeroing command or writing a command of zeroes
  *
  * (neither flag)  write zeroes punching a hole using an efficient
  *                 zeroing command or fallback to writing a command
  *                 of zeroes.
-- 
2.35.1
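For readers new to this code, the sparseness detection touched by this
patch scans each read buffer in sparse_size blocks and coalesces
adjacent blocks of the same kind into a single range. A reduced sketch
of the loop shape (writes and error handling omitted):

    /* Sketch: classify each sparse_size block as zero or data; a
     * change of kind flushes the accumulated range [last_offset, i).
     */
    for (i = MIN (ROUND_UP (start, sparse_size), end);
         i + sparse_size <= end;
         i += sparse_size) {
      bool zero = is_zero (slice_ptr (command->slice) + i - start,
                           sparse_size);
      if (zero != last_is_zero) {
        /* ... write [last_offset, i) as data or zeroes ... */
        last_offset = i;
        last_is_zero = zero;
      }
    }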
Nir Soffer
2022-Feb-20 12:13 UTC
[Libguestfs] [PATCH libnbd 2/8] copy: Rename copy_subcommand to create_subcommand
copy_subcommand creates a new command without copying the original
command. Rename the function to make this clearer.

Signed-off-by: Nir Soffer <nsoffer at redhat.com>
---
 copy/multi-thread-copying.c | 29 ++++++++++++++---------------
 1 file changed, 14 insertions(+), 15 deletions(-)

diff --git a/copy/multi-thread-copying.c b/copy/multi-thread-copying.c
index 632d7006..2d16d2df 100644
--- a/copy/multi-thread-copying.c
+++ b/copy/multi-thread-copying.c
@@ -332,26 +332,25 @@ poll_both_ends (uintptr_t index)
     dst->ops->asynch_notify_write (dst, index);
   else if ((fds[1].revents & (POLLERR | POLLNVAL)) != 0) {
     errno = ENOTCONN;
     perror (dst->name);
     exit (EXIT_FAILURE);
   }
  }
 }

 /* Create a sub-command of an existing command.  This creates a slice
- * referencing the buffer of the existing command in order to avoid
- * copying.
+ * referencing the buffer of the existing command without copying.
  */
 static struct command *
-copy_subcommand (struct command *command, uint64_t offset, size_t len,
-                 bool zero)
+create_subcommand (struct command *command, uint64_t offset, size_t len,
+                   bool zero)
 {
   const uint64_t end = command->offset + command->slice.len;
   struct command *newcommand;

   assert (command->offset <= offset && offset < end);
   assert (offset + len <= end);

   newcommand = calloc (1, sizeof *newcommand);
   if (newcommand == NULL) {
     perror ("calloc");
@@ -409,21 +408,21 @@ finished_read (void *vp, int *error)
          i + sparse_size <= end;
          i += sparse_size) {
       if (is_zero (slice_ptr (command->slice) + i-start, sparse_size)) {
         /* It's a zero range.  If the last was a zero too then we do
          * nothing here which coalesces.  Otherwise write the last data
          * and start a new zero range.
          */
         if (!last_is_zero) {
           /* Write the last data (if any). */
           if (i - last_offset > 0) {
-            newcommand = copy_subcommand (command,
+            newcommand = create_subcommand (command,
                                           last_offset, i - last_offset,
                                           false);
             dst->ops->asynch_write (dst, newcommand,
                                     (nbd_completion_callback) {
                                       .callback = free_command,
                                       .user_data = newcommand,
                                     });
           }
           /* Start the new zero range. */
           last_offset = i;
@@ -431,55 +430,55 @@ finished_read (void *vp, int *error)
         }
       }
       else {
         /* It's data.  If the last was data too, do nothing =>
          * coalesce.  Otherwise write the last zero range and start a
          * new data.
          */
         if (last_is_zero) {
           /* Write the last zero range (if any). */
           if (i - last_offset > 0) {
-            newcommand = copy_subcommand (command,
-                                          last_offset, i - last_offset,
-                                          true);
+            newcommand = create_subcommand (command,
+                                            last_offset, i - last_offset,
+                                            true);
             fill_dst_range_with_zeroes (newcommand);
           }
           /* Start the new data. */
           last_offset = i;
           last_is_zero = false;
         }
       }
     } /* for i */

     /* Write the last_offset up to i. */
     if (i - last_offset > 0) {
       if (!last_is_zero) {
-        newcommand = copy_subcommand (command,
-                                      last_offset, i - last_offset,
-                                      false);
+        newcommand = create_subcommand (command,
+                                        last_offset, i - last_offset,
+                                        false);
         dst->ops->asynch_write (dst, newcommand,
                                 (nbd_completion_callback) {
                                   .callback = free_command,
                                   .user_data = newcommand,
                                 });
       }
       else {
-        newcommand = copy_subcommand (command,
-                                      last_offset, i - last_offset,
-                                      true);
+        newcommand = create_subcommand (command,
+                                        last_offset, i - last_offset,
+                                        true);
         fill_dst_range_with_zeroes (newcommand);
       }
     }

     /* There may be an unaligned tail, so write that.
      */
     if (end - i > 0) {
-      newcommand = copy_subcommand (command, i, end - i, false);
+      newcommand = create_subcommand (command, i, end - i, false);
       dst->ops->asynch_write (dst, newcommand,
                               (nbd_completion_callback) {
                                 .callback = free_command,
                                 .user_data = newcommand,
                               });
     }

     /* Free the original command since it has been split into
      * subcommands and the original is no longer needed.
      */
-- 
2.35.1
Nir Soffer
2022-Feb-20 12:13 UTC
[Libguestfs] [PATCH libnbd 3/8] copy: Extract create_command and create_buffer helpers
Creating a new command requires a lot of boilerplate that makes it
harder to focus on the interesting code. Extract helpers to create a
command and the command slice buffer.

create_buffer is called only once, but the compiler is smart enough to
inline it, and adding it makes the code much simpler.

This change is a pure refactoring, except for fixing the perror()
message for a calloc() failure.

Signed-off-by: Nir Soffer <nsoffer at redhat.com>
---
 copy/multi-thread-copying.c | 87 +++++++++++++++++++++++--------------
 1 file changed, 54 insertions(+), 33 deletions(-)

diff --git a/copy/multi-thread-copying.c b/copy/multi-thread-copying.c
index 2d16d2df..855d1ba4 100644
--- a/copy/multi-thread-copying.c
+++ b/copy/multi-thread-copying.c
@@ -128,20 +128,22 @@ multi_thread_copying (void)
   free (workers);
 }

 static void wait_for_request_slots (uintptr_t index);
 static unsigned in_flight (uintptr_t index);
 static void poll_both_ends (uintptr_t index);
 static int finished_read (void *vp, int *error);
 static int free_command (void *vp, int *error);
 static void fill_dst_range_with_zeroes (struct command *command);
+static struct command *create_command (uint64_t offset, size_t len, bool zero,
+                                       uintptr_t index);

 /* There are 'threads' worker threads, each copying work ranges from
  * src to dst until there are no more work ranges.
  */
 static void *
 worker_thread (void *indexp)
 {
   uintptr_t index = (uintptr_t) indexp;
   uint64_t offset, count;
   extent_list exts = empty_vector;
@@ -150,71 +152,43 @@ worker_thread (void *indexp)
     size_t i;

     assert (0 < count && count <= THREAD_WORK_SIZE);
     if (extents)
       src->ops->get_extents (src, index, offset, count, &exts);
     else
       default_get_extents (src, index, offset, count, &exts);

     for (i = 0; i < exts.len; ++i) {
       struct command *command;
-      struct buffer *buffer;
-      char *data;
       size_t len;

       if (exts.ptr[i].zero) {
         /* The source is zero so we can proceed directly to skipping,
          * fast zeroing, or writing zeroes at the destination.
          */
-        command = calloc (1, sizeof *command);
-        if (command == NULL) {
-          perror ("malloc");
-          exit (EXIT_FAILURE);
-        }
-        command->offset = exts.ptr[i].offset;
-        command->slice.len = exts.ptr[i].length;
-        command->slice.base = 0;
-        command->index = index;
+        command = create_command (exts.ptr[i].offset, exts.ptr[i].length,
+                                  true, index);
         fill_dst_range_with_zeroes (command);
       }

       else /* data */ {
         /* As the extent might be larger than permitted for a single
          * command, we may have to split this into multiple read
          * requests.
          */
         while (exts.ptr[i].length > 0) {
           len = exts.ptr[i].length;
           if (len > request_size)
             len = request_size;

-          data = malloc (len);
-          if (data == NULL) {
-            perror ("malloc");
-            exit (EXIT_FAILURE);
-          }
-          buffer = calloc (1, sizeof *buffer);
-          if (buffer == NULL) {
-            perror ("malloc");
-            exit (EXIT_FAILURE);
-          }
-          buffer->data = data;
-          buffer->refs = 1;
-          command = calloc (1, sizeof *command);
-          if (command == NULL) {
-            perror ("malloc");
-            exit (EXIT_FAILURE);
-          }
-          command->offset = exts.ptr[i].offset;
-          command->slice.len = len;
-          command->slice.base = 0;
-          command->slice.buffer = buffer;
-          command->index = index;
+
+          command = create_command (exts.ptr[i].offset, len,
+                                    false, index);

           wait_for_request_slots (index);

           /* Begin the asynch read operation. */
           src->ops->asynch_read (src, command,
                                  (nbd_completion_callback) {
                                    .callback = finished_read,
                                    .user_data = command,
                                  });
@@ -331,20 +305,67 @@ poll_both_ends (uintptr_t index)
   else if ((fds[1].revents & POLLOUT) != 0)
     dst->ops->asynch_notify_write (dst, index);
   else if ((fds[1].revents & (POLLERR | POLLNVAL)) != 0) {
     errno = ENOTCONN;
     perror (dst->name);
     exit (EXIT_FAILURE);
   }
  }
 }

+/* Create a new buffer. */
+static struct buffer*
+create_buffer (size_t len)
+{
+  struct buffer *buffer;
+
+  buffer = calloc (1, sizeof *buffer);
+  if (buffer == NULL) {
+    perror ("calloc");
+    exit (EXIT_FAILURE);
+  }
+
+  buffer->data = malloc (len);
+  if (buffer->data == NULL) {
+    perror ("malloc");
+    exit (EXIT_FAILURE);
+  }
+
+  buffer->refs = 1;
+
+  return buffer;
+}
+
+/* Create a new command for read or zero. */
+static struct command *
+create_command (uint64_t offset, size_t len, bool zero, uintptr_t index)
+{
+  struct command *command;
+
+  command = calloc (1, sizeof *command);
+  if (command == NULL) {
+    perror ("calloc");
+    exit (EXIT_FAILURE);
+  }
+
+  command->offset = offset;
+  command->slice.len = len;
+  command->slice.base = 0;
+
+  if (!zero)
+    command->slice.buffer = create_buffer (len);
+
+  command->index = index;
+
+  return command;
+}
+
 /* Create a sub-command of an existing command.  This creates a slice
  * referencing the buffer of the existing command without copying.
  */
 static struct command *
 create_subcommand (struct command *command, uint64_t offset, size_t len,
                    bool zero)
 {
   const uint64_t end = command->offset + command->slice.len;
   struct command *newcommand;
-- 
2.35.1
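A note on how these helpers interact: the buffer created here is shared
by reference counting. create_buffer() starts at refs = 1,
create_subcommand() (previous patch) bumps refs when it slices into the
same buffer, and the last free releases the data. A compressed sketch of
the lifetime, using the struct fields shown in the diffs:

    struct buffer *b = create_buffer (len);   /* refs == 1 */
    /* ... a subcommand slices into the same buffer ... */
    b->refs++;                                /* refs == 2 */
    /* ... each command free then does ... */
    if (--b->refs == 0) {
      free (b->data);
      free (b);
    }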
Nir Soffer
2022-Feb-20 12:13 UTC
[Libguestfs] [PATCH libnbd 4/8] copy: Separate finishing a command from freeing it
free_command() was abused as a completion callback. Introduce a
finished_command() completion callback, so code that wants to free a
command does not have to add dummy errors. This will make it easier to
manage worker state when a command completes.

Signed-off-by: Nir Soffer <nsoffer at redhat.com>
---
 copy/multi-thread-copying.c | 34 ++++++++++++++++++++--------------
 1 file changed, 20 insertions(+), 14 deletions(-)

diff --git a/copy/multi-thread-copying.c b/copy/multi-thread-copying.c
index 855d1ba4..aa6a9f41 100644
--- a/copy/multi-thread-copying.c
+++ b/copy/multi-thread-copying.c
@@ -126,21 +126,22 @@ multi_thread_copying (void)
   }
  }

  free (workers);
 }

 static void wait_for_request_slots (uintptr_t index);
 static unsigned in_flight (uintptr_t index);
 static void poll_both_ends (uintptr_t index);
 static int finished_read (void *vp, int *error);
-static int free_command (void *vp, int *error);
+static int finished_command (void *vp, int *error);
+static void free_command (struct command *command);
 static void fill_dst_range_with_zeroes (struct command *command);
 static struct command *create_command (uint64_t offset, size_t len, bool zero,
                                        uintptr_t index);

 /* There are 'threads' worker threads, each copying work ranges from
  * src to dst until there are no more work ranges.
  */
 static void *
 worker_thread (void *indexp)
 {
@@ -402,53 +403,52 @@ finished_read (void *vp, int *error)
              command->offset, strerror (*error));
     exit (EXIT_FAILURE);
   }

   if (allocated || sparse_size == 0) {
     /* If sparseness detection (see below) is turned off then we write
      * the whole command.
      */
     dst->ops->asynch_write (dst, command,
                             (nbd_completion_callback) {
-                              .callback = free_command,
+                              .callback = finished_command,
                               .user_data = command,
                             });
   }
   else {
     /* Sparseness detection. */
     const uint64_t start = command->offset;
     const uint64_t end = start + command->slice.len;
     uint64_t last_offset = start;
     bool last_is_zero = false;
     uint64_t i;
     struct command *newcommand;
-    int dummy = 0;

     /* Iterate over whole blocks in the command, starting on a block
      * boundary.
      */
     for (i = MIN (ROUND_UP (start, sparse_size), end);
          i + sparse_size <= end;
          i += sparse_size) {
       if (is_zero (slice_ptr (command->slice) + i-start, sparse_size)) {
         /* It's a zero range.  If the last was a zero too then we do
          * nothing here which coalesces.  Otherwise write the last data
          * and start a new zero range.
          */
         if (!last_is_zero) {
           /* Write the last data (if any). */
           if (i - last_offset > 0) {
             newcommand = create_subcommand (command,
                                             last_offset, i - last_offset,
                                             false);
             dst->ops->asynch_write (dst, newcommand,
                                     (nbd_completion_callback) {
-                                      .callback = free_command,
+                                      .callback = finished_command,
                                       .user_data = newcommand,
                                     });
           }
           /* Start the new zero range. */
           last_offset = i;
           last_is_zero = true;
         }
       }
       else {
         /* It's data.  If the last was data too, do nothing =>
@@ -471,46 +471,46 @@ finished_read (void *vp, int *error)
     } /* for i */

     /* Write the last_offset up to i. */
     if (i - last_offset > 0) {
       if (!last_is_zero) {
         newcommand = create_subcommand (command,
                                         last_offset, i - last_offset,
                                         false);
         dst->ops->asynch_write (dst, newcommand,
                                 (nbd_completion_callback) {
-                                  .callback = free_command,
+                                  .callback = finished_command,
                                   .user_data = newcommand,
                                 });
       }
       else {
         newcommand = create_subcommand (command,
                                         last_offset, i - last_offset,
                                         true);
         fill_dst_range_with_zeroes (newcommand);
       }
     }

     /* There may be an unaligned tail, so write that.
      */
     if (end - i > 0) {
       newcommand = create_subcommand (command, i, end - i, false);
       dst->ops->asynch_write (dst, newcommand,
                               (nbd_completion_callback) {
-                                .callback = free_command,
+                                .callback = finished_command,
                                 .user_data = newcommand,
                               });
     }

     /* Free the original command since it has been split into
      * subcommands and the original is no longer needed.
      */
-    free_command (command, &dummy);
+    free_command (command);
   }

   return 1; /* auto-retires the command */
 }

 /* Fill a range in dst with zeroes.  This is called from the copying
  * loop when we see a zero range in the source.  Depending on the
  * command line flags this could mean:
  *
  * --destination-is-zero:
@@ -523,29 +523,28 @@ finished_read (void *vp, int *error)
  *                 zeroing command or fallback to writing a command
  *                 of zeroes.
  *
  * This takes over ownership of the command and frees it eventually.
  */
 static void
 fill_dst_range_with_zeroes (struct command *command)
 {
   char *data;
   size_t data_size;
-  int dummy = 0;

   if (destination_is_zero)
     goto free_and_return;

   /* Try efficient zeroing. */
   if (dst->ops->asynch_zero (dst, command,
                              (nbd_completion_callback) {
-                               .callback = free_command,
+                               .callback = finished_command,
                                .user_data = command,
                              },
                              allocated))
     return;

   /* Fall back to loop writing zeroes.  This is going to be slow
    * anyway, so do it synchronously. XXX
    */
   data_size = MIN (request_size, command->slice.len);
   data = calloc (1, data_size);
@@ -559,36 +558,43 @@ fill_dst_range_with_zeroes (struct command *command)
     if (len > data_size)
       len = data_size;

     dst->ops->synch_write (dst, data, len, command->offset);
     command->slice.len -= len;
     command->offset += len;
   }
   free (data);

 free_and_return:
-  free_command (command, &dummy);
+  free_command (command);
 }

 static int
-free_command (void *vp, int *error)
+finished_command (void *vp, int *error)
 {
   struct command *command = vp;
-  struct buffer *buffer = command->slice.buffer;

   if (*error) {
     fprintf (stderr, "write at offset %" PRId64 " failed: %s\n",
              command->offset, strerror (*error));
     exit (EXIT_FAILURE);
   }

+  free_command (command);
+
+  return 1; /* auto-retires the command */
+}
+
+static void
+free_command (struct command *command)
+{
+  struct buffer *buffer = command->slice.buffer;
+
   if (buffer != NULL) {
     if (--buffer->refs == 0) {
       free (buffer->data);
       free (buffer);
     }
   }
   free (command);
-
-  return 1; /* auto-retires the command */
 }
-- 
2.35.1
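The split mirrors libnbd's completion callback convention: the callback
receives the user_data pointer and an error pointer, and returning 1
auto-retires the command. With finished_command() owning that contract,
free_command() becomes an ordinary destructor callable from synchronous
paths. Schematically:

    /* Completion path, invoked by libnbd when the write completes: */
    static int
    finished_command (void *vp, int *error)   /* vp == the command */
    {
      /* ... check *error, update per-worker state ... */
      free_command (vp);
      return 1;                               /* auto-retire */
    }

    /* Synchronous path: no fake error pointer needed any more. */
    free_command (command);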
Nir Soffer
2022-Feb-20 12:14 UTC
[Libguestfs] [PATCH libnbd 5/8] copy: Introduce worker struct
I want to keep more info per worker, and using a worker struct is the
natural way to do this.

This also allows cleaning up the ops-* interface, which accepted a
uintptr_t index although the index is never a pointer; the uintptr_t is
likely a result of passing the index to the thread via the void *
argument.

The worker struct is used only by the multi-thread-copying module, but
in a future patch I want to keep the worker pointer in the command, to
allow commands to update worker state when they finish.

Signed-off-by: Nir Soffer <nsoffer at redhat.com>
---
 copy/file-ops.c             |  4 +--
 copy/main.c                 |  6 ++---
 copy/multi-thread-copying.c | 49 +++++++++++++++++++------------------
 copy/nbd-ops.c              | 10 ++++----
 copy/nbdcopy.h              | 24 +++++++++++-------
 copy/null-ops.c             |  4 +--
 copy/pipe-ops.c             |  2 +-
 7 files changed, 53 insertions(+), 46 deletions(-)

diff --git a/copy/file-ops.c b/copy/file-ops.c
index aaf04ade..ab378754 100644
--- a/copy/file-ops.c
+++ b/copy/file-ops.c
@@ -614,27 +614,27 @@ file_asynch_zero (struct rw *rw, struct command *command,
 {
   int dummy = 0;

   if (!file_synch_zero (rw, command->offset, command->slice.len, allocate))
     return false;
   cb.callback (cb.user_data, &dummy);
   return true;
 }

 static unsigned
-file_in_flight (struct rw *rw, uintptr_t index)
+file_in_flight (struct rw *rw, size_t index)
 {
   return 0;
 }

 static void
-file_get_extents (struct rw *rw, uintptr_t index,
+file_get_extents (struct rw *rw, size_t index,
                   uint64_t offset, uint64_t count,
                   extent_list *ret)
 {
   ret->len = 0;

 #ifdef SEEK_HOLE
   struct rw_file *rwf = (struct rw_file *)rw;
   static pthread_mutex_t lseek_lock = PTHREAD_MUTEX_INITIALIZER;

   if (rwf->seek_hole_supported) {
diff --git a/copy/main.c b/copy/main.c
index 67788b5d..390de1eb 100644
--- a/copy/main.c
+++ b/copy/main.c
@@ -513,44 +513,44 @@ print_rw (struct rw *rw, const char *prefix, FILE *fp)
   fprintf (fp, "%s: %s \"%s\"\n", prefix, rw->ops->ops_name, rw->name);
   fprintf (fp, "%s: size=%" PRIi64 " (%s)\n",
            prefix, rw->size, human_size (buf, rw->size, NULL));
 }

 /* Default implementation of rw->ops->get_extents for backends which
  * don't/can't support extents.  Also used for the --no-extents case.
  */
 void
-default_get_extents (struct rw *rw, uintptr_t index,
+default_get_extents (struct rw *rw, size_t index,
                      uint64_t offset, uint64_t count,
                      extent_list *ret)
 {
   struct extent e;

   ret->len = 0;

   e.offset = offset;
   e.length = count;
   e.zero = false;
   if (extent_list_append (ret, e) == -1) {
     perror ("realloc");
     exit (EXIT_FAILURE);
   }
 }

 /* Implementations of get_polling_fd and asynch_notify_* for backends
  * which don't support polling.
  */
 void
-get_polling_fd_not_supported (struct rw *rw, uintptr_t index,
+get_polling_fd_not_supported (struct rw *rw, size_t index,
                               int *fd_rtn, int *direction_rtn)
 {
   /* Not an error, this causes poll to ignore the fd. */
   *fd_rtn = -1;
   *direction_rtn = LIBNBD_AIO_DIRECTION_READ;
 }

 void
-asynch_notify_read_write_not_supported (struct rw *rw, uintptr_t index)
+asynch_notify_read_write_not_supported (struct rw *rw, size_t index)
 {
   /* nothing */
 }
diff --git a/copy/multi-thread-copying.c b/copy/multi-thread-copying.c
index aa6a9f41..a1a8d09c 100644
--- a/copy/multi-thread-copying.c
+++ b/copy/multi-thread-copying.c
@@ -70,184 +70,185 @@ get_next_offset (uint64_t *offset, uint64_t *count)
      * the commands.  We might move this into a callback, but those
      * are called from threads and not necessarily in monotonic order
      * so the progress bar would move erratically.
      */
     progress_bar (*offset, src->size);
   }
   pthread_mutex_unlock (&lock);
   return r;
 }

-static void *worker_thread (void *ip);
+static void *worker_thread (void *wp);

 void
 multi_thread_copying (void)
 {
-  pthread_t *workers;
+  struct worker *workers;
   size_t i;
   int err;

   /* Some invariants that should be true if the main program called us
    * correctly.
    */
   assert (threads > 0);
   assert (threads == connections);
 /*
   if (src.ops == &nbd_ops)
     assert (src.u.nbd.handles.size == connections);
   if (dst.ops == &nbd_ops)
     assert (dst.u.nbd.handles.size == connections);
 */
   assert (src->size != -1);

-  workers = malloc (sizeof (pthread_t) * threads);
+  workers = calloc (threads, sizeof *workers);
   if (workers == NULL) {
-    perror ("malloc");
+    perror ("calloc");
     exit (EXIT_FAILURE);
   }

   /* Start the worker threads. */
   for (i = 0; i < threads; ++i) {
-    err = pthread_create (&workers[i], NULL, worker_thread,
-                          (void *)(uintptr_t)i);
+    workers[i].index = i;
+    err = pthread_create (&workers[i].thread, NULL, worker_thread,
+                          &workers[i]);
     if (err != 0) {
       errno = err;
       perror ("pthread_create");
       exit (EXIT_FAILURE);
     }
   }

   /* Wait until all worker threads exit. */
   for (i = 0; i < threads; ++i) {
-    err = pthread_join (workers[i], NULL);
+    err = pthread_join (workers[i].thread, NULL);
     if (err != 0) {
       errno = err;
       perror ("pthread_join");
       exit (EXIT_FAILURE);
     }
   }

   free (workers);
 }

-static void wait_for_request_slots (uintptr_t index);
-static unsigned in_flight (uintptr_t index);
-static void poll_both_ends (uintptr_t index);
+static void wait_for_request_slots (size_t index);
+static unsigned in_flight (size_t index);
+static void poll_both_ends (size_t index);
 static int finished_read (void *vp, int *error);
 static int finished_command (void *vp, int *error);
 static void free_command (struct command *command);
 static void fill_dst_range_with_zeroes (struct command *command);
 static struct command *create_command (uint64_t offset, size_t len, bool zero,
-                                       uintptr_t index);
+                                       size_t index);

 /* There are 'threads' worker threads, each copying work ranges from
  * src to dst until there are no more work ranges.
  */
 static void *
-worker_thread (void *indexp)
+worker_thread (void *wp)
 {
-  uintptr_t index = (uintptr_t) indexp;
+  struct worker *w = wp;
   uint64_t offset, count;
   extent_list exts = empty_vector;

   while (get_next_offset (&offset, &count)) {
     size_t i;

     assert (0 < count && count <= THREAD_WORK_SIZE);
     if (extents)
-      src->ops->get_extents (src, index, offset, count, &exts);
+      src->ops->get_extents (src, w->index, offset, count, &exts);
     else
-      default_get_extents (src, index, offset, count, &exts);
+      default_get_extents (src, w->index, offset, count, &exts);

     for (i = 0; i < exts.len; ++i) {
       struct command *command;
       size_t len;

       if (exts.ptr[i].zero) {
         /* The source is zero so we can proceed directly to skipping,
          * fast zeroing, or writing zeroes at the destination.
          */
         command = create_command (exts.ptr[i].offset, exts.ptr[i].length,
-                                  true, index);
+                                  true, w->index);
         fill_dst_range_with_zeroes (command);
       }

       else /* data */ {
         /* As the extent might be larger than permitted for a single
          * command, we may have to split this into multiple read
          * requests.
          */
         while (exts.ptr[i].length > 0) {
           len = exts.ptr[i].length;
           if (len > request_size)
             len = request_size;

           command = create_command (exts.ptr[i].offset, len,
-                                    false, index);
+                                    false, w->index);

-          wait_for_request_slots (index);
+          wait_for_request_slots (w->index);

           /* Begin the asynch read operation. */
           src->ops->asynch_read (src, command,
                                  (nbd_completion_callback) {
                                    .callback = finished_read,
                                    .user_data = command,
                                  });

           exts.ptr[i].offset += len;
           exts.ptr[i].length -= len;
         }
       }

       offset += count;
       count = 0;
     } /* for extents */
   }

   /* Wait for in flight NBD requests to finish. */
-  while (in_flight (index) > 0)
-    poll_both_ends (index);
+  while (in_flight (w->index) > 0)
+    poll_both_ends (w->index);

   free (exts.ptr);
   return NULL;
 }

 /* If the number of requests in flight exceeds the limit, poll
  * waiting for at least one request to finish.  This enforces
  * the user --requests option.
  *
  * NB: Unfortunately it's not possible to call this from a callback,
  * since it will deadlock trying to grab the libnbd handle lock.  This
  * means that although the worker thread calls this and enforces the
  * limit, when we split up requests into subrequests (eg. doing
  * sparseness detection) we will probably exceed the user request
  * limit. XXX
  */
 static void
-wait_for_request_slots (uintptr_t index)
+wait_for_request_slots (size_t index)
 {
   while (in_flight (index) >= max_requests)
     poll_both_ends (index);
 }

 /* Count the number of asynchronous commands in flight. */
 static unsigned
-in_flight (uintptr_t index)
+in_flight (size_t index)
 {
   return src->ops->in_flight (src, index) + dst->ops->in_flight (dst, index);
 }

 /* Poll (optional) NBD src and NBD dst, moving the state machine(s)
  * along.  This is a lightly modified nbd_poll.
  */
 static void
-poll_both_ends (uintptr_t index)
+poll_both_ends (size_t index)
 {
   struct pollfd fds[2];
   int r, direction;

   memset (fds, 0, sizeof fds);

   /* Note: if polling is not supported, this function will
    * set fd == -1 which poll ignores.
    */
   src->ops->get_polling_fd (src, index, &fds[0].fd, &direction);
@@ -331,21 +332,21 @@ create_buffer (size_t len)
     exit (EXIT_FAILURE);
   }

   buffer->refs = 1;

   return buffer;
 }

 /* Create a new command for read or zero. */
 static struct command *
-create_command (uint64_t offset, size_t len, bool zero, uintptr_t index)
+create_command (uint64_t offset, size_t len, bool zero, size_t index)
 {
   struct command *command;

   command = calloc (1, sizeof *command);
   if (command == NULL) {
     perror ("calloc");
     exit (EXIT_FAILURE);
   }

   command->offset = offset;
diff --git a/copy/nbd-ops.c b/copy/nbd-ops.c
index 10551d3a..dca86e88 100644
--- a/copy/nbd-ops.c
+++ b/copy/nbd-ops.c
@@ -377,32 +377,32 @@ add_extent (void *vp, const char *metacontext,
       exit (EXIT_FAILURE);
     }

     offset += entries[i];
   }

   return 0;
 }

 static unsigned
-nbd_ops_in_flight (struct rw *rw, uintptr_t index)
+nbd_ops_in_flight (struct rw *rw, size_t index)
 {
   struct rw_nbd *rwn = (struct rw_nbd *) rw;

   /* Since the commands are auto-retired in the callbacks we don't
    * need to count "done" commands.
    */
   return nbd_aio_in_flight (rwn->handles.ptr[index]);
 }

 static void
-nbd_ops_get_polling_fd (struct rw *rw, uintptr_t index,
+nbd_ops_get_polling_fd (struct rw *rw, size_t index,
                         int *fd, int *direction)
 {
   struct rw_nbd *rwn = (struct rw_nbd *) rw;
   struct nbd_handle *nbd;

   nbd = rwn->handles.ptr[index];

   *fd = nbd_aio_get_fd (nbd);
   if (*fd == -1)
     goto error;
@@ -412,47 +412,47 @@ nbd_ops_get_polling_fd (struct rw *rw, uintptr_t index,
     goto error;

   return;

 error:
   fprintf (stderr, "%s: %s\n", rw->name, nbd_get_error ());
   exit (EXIT_FAILURE);
 }

 static void
-nbd_ops_asynch_notify_read (struct rw *rw, uintptr_t index)
+nbd_ops_asynch_notify_read (struct rw *rw, size_t index)
 {
   struct rw_nbd *rwn = (struct rw_nbd *) rw;

   if (nbd_aio_notify_read (rwn->handles.ptr[index]) == -1) {
     fprintf (stderr, "%s: %s\n", rw->name, nbd_get_error ());
     exit (EXIT_FAILURE);
   }
 }

 static void
-nbd_ops_asynch_notify_write (struct rw *rw, uintptr_t index)
+nbd_ops_asynch_notify_write (struct rw *rw, size_t index)
 {
   struct rw_nbd *rwn = (struct rw_nbd *) rw;

   if (nbd_aio_notify_write (rwn->handles.ptr[index]) == -1) {
     fprintf (stderr, "%s: %s\n", rw->name, nbd_get_error ());
     exit (EXIT_FAILURE);
   }
 }

 /* This is done synchronously, but that's fine because commands from
  * the previous work range in flight continue to run, it's difficult
  * to (sanely) start new work until we have the full list of extents,
  * and in almost every case the remote NBD server can answer our
  * request for extents in a single round trip.
  */
 static void
-nbd_ops_get_extents (struct rw *rw, uintptr_t index,
+nbd_ops_get_extents (struct rw *rw, size_t index,
                      uint64_t offset, uint64_t count,
                      extent_list *ret)
 {
   struct rw_nbd *rwn = (struct rw_nbd *) rw;
   extent_list exts = empty_vector;
   struct nbd_handle *nbd;

   nbd = rwn->handles.ptr[index];

   ret->len = 0;
diff --git a/copy/nbdcopy.h b/copy/nbdcopy.h
index c070f8d7..4fe8bee6 100644
--- a/copy/nbdcopy.h
+++ b/copy/nbdcopy.h
@@ -71,36 +71,42 @@ struct buffer {
 struct slice {
   size_t len;                  /* Length of slice. */
   size_t base;                 /* Start of slice relative to buffer. */
   struct buffer *buffer;       /* Underlying allocation (may be shared
                                 * or NULL).
                                 */
 };

 #define slice_ptr(slice) ((slice).buffer->data + (slice).base)

+/* Worker state used by multi-threaded copying. */
+struct worker {
+  pthread_t thread;
+  size_t index;
+};
+
 /* Commands for asynchronous operations in flight.
  *
  * We don't store the command type (read/write/zero/etc) because it is
  * implicit in the function being called and because commands
  * naturally change from read -> write/zero/etc as they progress.
  *
  * slice.buffer may be NULL for commands (like zero) that have no
  * associated data.
  *
  * A separate set of commands, slices and buffers is maintained per
  * thread so no locking is necessary.
  */
 struct command {
   uint64_t offset;             /* Offset relative to start of disk. */
   struct slice slice;          /* Data slice. */
-  uintptr_t index;             /* Thread number. */
+  size_t index;                /* Thread number. */
 };

 /* List of extents for rw->ops->get_extents. */
 struct extent {
   uint64_t offset;
   uint64_t length;
   bool zero;
 };
 DEFINE_VECTOR_TYPE(extent_list, struct extent);
@@ -173,51 +179,51 @@ struct rw_ops {
                         struct command *command,
                         nbd_completion_callback cb);

   /* Asynchronously zero.  command->slice.buffer is not used.  If not possible,
    * returns false.  'cb' must be called only if returning true.
    */
   bool (*asynch_zero) (struct rw *rw, struct command *command,
                        nbd_completion_callback cb, bool allocate);

   /* Number of asynchronous commands in flight for a particular thread. */
-  unsigned (*in_flight) (struct rw *rw, uintptr_t index);
+  unsigned (*in_flight) (struct rw *rw, size_t index);

   /* Get polling file descriptor and direction, and notify read/write.
    * For sources which cannot be polled (such as files and pipes)
    * get_polling_fd returns fd == -1 (NOT an error), and the
    * asynch_notify_* functions are no-ops.
    */
-  void (*get_polling_fd) (struct rw *rw, uintptr_t index,
+  void (*get_polling_fd) (struct rw *rw, size_t index,
                           int *fd_rtn, int *direction_rtn);
-  void (*asynch_notify_read) (struct rw *rw, uintptr_t index);
-  void (*asynch_notify_write) (struct rw *rw, uintptr_t index);
+  void (*asynch_notify_read) (struct rw *rw, size_t index);
+  void (*asynch_notify_write) (struct rw *rw, size_t index);

   /* Read base:allocation extents metadata for a region of the source.
    * For local files the same information is read from the kernel.
    *
    * Note that qemu-img fetches extents for the entire disk up front,
    * and we want to avoid doing that because it had very negative
    * behaviour for certain sources (ie. VDDK).
    */
-  void (*get_extents) (struct rw *rw, uintptr_t index,
+  void (*get_extents) (struct rw *rw, size_t index,
                        uint64_t offset, uint64_t count,
                        extent_list *ret);
 };

-extern void default_get_extents (struct rw *rw, uintptr_t index,
+extern void default_get_extents (struct rw *rw, size_t index,
                                  uint64_t offset, uint64_t count,
                                  extent_list *ret);
-extern void get_polling_fd_not_supported (struct rw *rw, uintptr_t index,
+extern void get_polling_fd_not_supported (struct rw *rw, size_t index,
                                           int *fd_rtn, int *direction_rtn);
 extern void asynch_notify_read_write_not_supported (struct rw *rw,
-                                                    uintptr_t index);
+                                                    size_t index);

 extern bool allocated;
 extern unsigned connections;
 extern bool destination_is_zero;
 extern bool extents;
 extern bool flush;
 extern unsigned max_requests;
 extern bool progress;
 extern int progress_fd;
 extern unsigned request_size;
diff --git a/copy/null-ops.c b/copy/null-ops.c
index 5f1fda50..1218a623 100644
--- a/copy/null-ops.c
+++ b/copy/null-ops.c
@@ -126,27 +126,27 @@ static bool
 null_asynch_zero (struct rw *rw, struct command *command,
                   nbd_completion_callback cb, bool allocate)
 {
   int dummy = 0;

   cb.callback (cb.user_data, &dummy);
   return true;
 }

 static unsigned
-null_in_flight (struct rw *rw, uintptr_t index)
+null_in_flight (struct rw *rw, size_t index)
 {
   return 0;
 }

 static void
-null_get_extents (struct rw *rw, uintptr_t index,
+null_get_extents (struct rw *rw, size_t index,
                   uint64_t offset, uint64_t count,
                   extent_list *ret)
 {
   abort ();
 }

 static struct rw_ops null_ops = {
   .ops_name = "null_ops",
   .close = null_close,
   .is_read_only = null_is_read_only,
diff --git a/copy/pipe-ops.c b/copy/pipe-ops.c
index f9b8599a..3c8b6c2b 100644
--- a/copy/pipe-ops.c
+++ b/copy/pipe-ops.c
@@ -147,21 +147,21 @@ pipe_asynch_write (struct rw *rw,
 }

 static bool
 pipe_asynch_zero (struct rw *rw, struct command *command,
                   nbd_completion_callback cb, bool allocate)
 {
   return false; /* not supported by pipes */
 }

 static unsigned
-pipe_in_flight (struct rw *rw, uintptr_t index)
+pipe_in_flight (struct rw *rw, size_t index)
 {
   return 0;
 }

 static struct rw_ops pipe_ops = {
   .ops_name = "pipe_ops",
   .close = pipe_close,
   .is_read_only = pipe_is_read_only,
-- 
2.35.1
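Passing a pointer to a per-thread struct, instead of casting an integer
through void *, is the standard pthreads idiom this patch adopts, and it
is what removes the need for uintptr_t in the ops interface. In
isolation (start_workers is a hypothetical helper for illustration):

    #include <pthread.h>
    #include <stddef.h>

    struct worker {
      pthread_t thread;
      size_t index;
    };

    static void *
    worker_thread (void *wp)
    {
      struct worker *w = wp;   /* no integer/pointer casts needed */
      /* ... use w->index, and later other per-worker state ... */
      return NULL;
    }

    static void
    start_workers (struct worker *workers, size_t n)
    {
      for (size_t i = 0; i < n; ++i) {
        workers[i].index = i;
        /* Each thread receives its own struct worker *. */
        pthread_create (&workers[i].thread, NULL, worker_thread,
                        &workers[i]);
      }
    }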
Nir Soffer
2022-Feb-20 12:14 UTC
[Libguestfs] [PATCH libnbd 6/8] copy: Keep worker pointer in command
Replace the command index with a worker pointer. The nbd-ops now access
the index via the worker pointer. This allows commands to modify worker
state during processing.

Signed-off-by: Nir Soffer <nsoffer at redhat.com>
---
 copy/multi-thread-copying.c | 12 ++++++------
 copy/nbd-ops.c              |  6 +++---
 copy/nbdcopy.h              |  2 +-
 3 files changed, 10 insertions(+), 10 deletions(-)

diff --git a/copy/multi-thread-copying.c b/copy/multi-thread-copying.c
index a1a8d09c..8ba721fe 100644
--- a/copy/multi-thread-copying.c
+++ b/copy/multi-thread-copying.c
@@ -131,21 +131,21 @@ multi_thread_copying (void)
 }

 static void wait_for_request_slots (size_t index);
 static unsigned in_flight (size_t index);
 static void poll_both_ends (size_t index);
 static int finished_read (void *vp, int *error);
 static int finished_command (void *vp, int *error);
 static void free_command (struct command *command);
 static void fill_dst_range_with_zeroes (struct command *command);
 static struct command *create_command (uint64_t offset, size_t len, bool zero,
-                                       size_t index);
+                                       struct worker *worker);

 /* There are 'threads' worker threads, each copying work ranges from
  * src to dst until there are no more work ranges.
  */
 static void *
 worker_thread (void *wp)
 {
   struct worker *w = wp;
   uint64_t offset, count;
   extent_list exts = empty_vector;
@@ -161,36 +161,36 @@ worker_thread (void *wp)
     for (i = 0; i < exts.len; ++i) {
       struct command *command;
       size_t len;

       if (exts.ptr[i].zero) {
         /* The source is zero so we can proceed directly to skipping,
          * fast zeroing, or writing zeroes at the destination.
          */
         command = create_command (exts.ptr[i].offset, exts.ptr[i].length,
-                                  true, w->index);
+                                  true, w);
         fill_dst_range_with_zeroes (command);
       }

       else /* data */ {
         /* As the extent might be larger than permitted for a single
          * command, we may have to split this into multiple read
          * requests.
          */
         while (exts.ptr[i].length > 0) {
           len = exts.ptr[i].length;
           if (len > request_size)
             len = request_size;

           command = create_command (exts.ptr[i].offset, len,
-                                    false, w->index);
+                                    false, w);

           wait_for_request_slots (w->index);

           /* Begin the asynch read operation. */
           src->ops->asynch_read (src, command,
                                  (nbd_completion_callback) {
                                    .callback = finished_read,
                                    .user_data = command,
                                  });
@@ -332,38 +332,38 @@ create_buffer (size_t len)
     exit (EXIT_FAILURE);
   }

   buffer->refs = 1;

   return buffer;
 }

 /* Create a new command for read or zero. */
 static struct command *
-create_command (uint64_t offset, size_t len, bool zero, size_t index)
+create_command (uint64_t offset, size_t len, bool zero, struct worker *worker)
 {
   struct command *command;

   command = calloc (1, sizeof *command);
   if (command == NULL) {
     perror ("calloc");
     exit (EXIT_FAILURE);
   }

   command->offset = offset;
   command->slice.len = len;
   command->slice.base = 0;

   if (!zero)
     command->slice.buffer = create_buffer (len);

-  command->index = index;
+  command->worker = worker;

   return command;
 }

 /* Create a sub-command of an existing command.  This creates a slice
  * referencing the buffer of the existing command without copying.
  */
 static struct command *
 create_subcommand (struct command *command, uint64_t offset, size_t len,
                    bool zero)
@@ -379,21 +379,21 @@ create_subcommand (struct command *command, uint64_t offset, size_t len,
     perror ("calloc");
     exit (EXIT_FAILURE);
   }
   newcommand->offset = offset;
   newcommand->slice.len = len;
   if (!zero) {
     newcommand->slice.buffer = command->slice.buffer;
     newcommand->slice.buffer->refs++;
     newcommand->slice.base = offset - command->offset;
   }
-  newcommand->index = command->index;
+  newcommand->worker = command->worker;

   return newcommand;
 }

 /* Callback called when src has finished one read command.  This
  * initiates a write.
  */
 static int
 finished_read (void *vp, int *error)
 {
diff --git a/copy/nbd-ops.c b/copy/nbd-ops.c
index dca86e88..adfe4de5 100644
--- a/copy/nbd-ops.c
+++ b/copy/nbd-ops.c
@@ -296,57 +296,57 @@ nbd_ops_synch_zero (struct rw *rw, uint64_t offset, uint64_t count,
   return true;
 }

 static void
 nbd_ops_asynch_read (struct rw *rw,
                      struct command *command,
                      nbd_completion_callback cb)
 {
   struct rw_nbd *rwn = (struct rw_nbd *) rw;

-  if (nbd_aio_pread (rwn->handles.ptr[command->index],
+  if (nbd_aio_pread (rwn->handles.ptr[command->worker->index],
                      slice_ptr (command->slice),
                      command->slice.len, command->offset, cb, 0) == -1) {
     fprintf (stderr, "%s: %s\n", rw->name, nbd_get_error ());
     exit (EXIT_FAILURE);
   }
 }

 static void
 nbd_ops_asynch_write (struct rw *rw,
                       struct command *command,
                       nbd_completion_callback cb)
 {
   struct rw_nbd *rwn = (struct rw_nbd *) rw;

-  if (nbd_aio_pwrite (rwn->handles.ptr[command->index],
+  if (nbd_aio_pwrite (rwn->handles.ptr[command->worker->index],
                       slice_ptr (command->slice),
                       command->slice.len, command->offset, cb, 0) == -1) {
     fprintf (stderr, "%s: %s\n", rw->name, nbd_get_error ());
     exit (EXIT_FAILURE);
   }
 }

 static bool
 nbd_ops_asynch_zero (struct rw *rw, struct command *command,
                      nbd_completion_callback cb, bool allocate)
 {
   struct rw_nbd *rwn = (struct rw_nbd *) rw;

   if (!rwn->can_zero)
     return false;

   assert (command->slice.len <= UINT32_MAX);

-  if (nbd_aio_zero (rwn->handles.ptr[command->index],
+  if (nbd_aio_zero (rwn->handles.ptr[command->worker->index],
                     command->slice.len, command->offset,
                     cb, allocate ? LIBNBD_CMD_FLAG_NO_HOLE : 0) == -1) {
     fprintf (stderr, "%s: %s\n", rw->name, nbd_get_error ());
     exit (EXIT_FAILURE);
   }

   return true;
 }

 static int
 add_extent (void *vp, const char *metacontext,
diff --git a/copy/nbdcopy.h b/copy/nbdcopy.h
index 4fe8bee6..8027836b 100644
--- a/copy/nbdcopy.h
+++ b/copy/nbdcopy.h
@@ -92,21 +92,21 @@ struct worker {
  *
  * slice.buffer may be NULL for commands (like zero) that have no
  * associated data.
  *
  * A separate set of commands, slices and buffers is maintained per
  * thread so no locking is necessary.
  */
 struct command {
   uint64_t offset;             /* Offset relative to start of disk. */
   struct slice slice;          /* Data slice. */
-  size_t index;                /* Thread number. */
+  struct worker *worker;       /* The worker owning this command. */
 };

 /* List of extents for rw->ops->get_extents. */
 struct extent {
   uint64_t offset;
   uint64_t length;
   bool zero;
 };
 DEFINE_VECTOR_TYPE(extent_list, struct extent);
-- 
2.35.1
Nir Soffer
2022-Feb-20 12:14 UTC
[Libguestfs] [PATCH libnbd 7/8] copy: Track worker queue size
Tracking the number of queued bytes per worker will allow optimizing the
number of in-flight requests based on the actual request size. The goal
is to allow a large number of small requests, required to get good
performance, and at the same time limit the number of large requests,
which are actually faster with lower concurrency.

Signed-off-by: Nir Soffer <nsoffer at redhat.com>
---
 copy/multi-thread-copying.c | 33 +++++++++++++++++++++++++++++++++
 copy/nbdcopy.h              |  6 ++++++
 2 files changed, 39 insertions(+)

diff --git a/copy/multi-thread-copying.c b/copy/multi-thread-copying.c
index 8ba721fe..620dc571 100644
--- a/copy/multi-thread-copying.c
+++ b/copy/multi-thread-copying.c
@@ -133,20 +133,45 @@ multi_thread_copying (void)
 static void wait_for_request_slots (size_t index);
 static unsigned in_flight (size_t index);
 static void poll_both_ends (size_t index);
 static int finished_read (void *vp, int *error);
 static int finished_command (void *vp, int *error);
 static void free_command (struct command *command);
 static void fill_dst_range_with_zeroes (struct command *command);
 static struct command *create_command (uint64_t offset, size_t len, bool zero,
                                        struct worker *worker);

+/* Tracking worker queue size.
+ *
+ * The queue size is increased when starting a read command.
+ *
+ * The queue size is decreased when a read command is converted to zero
+ * subcommand in finished_read(), or when a write command completes in
+ * finished_command().
+ *
+ * Zero commands are not considered in the queue size since they have no
+ * payload.
+ */
+
+static inline void
+increase_queue_size(struct worker *worker, size_t len)
+{
+  worker->queue_size += len;
+}
+
+static inline void
+decrease_queue_size(struct worker *worker, size_t len)
+{
+  assert (worker->queue_size >= len);
+  worker->queue_size -= len;
+}
+
 /* There are 'threads' worker threads, each copying work ranges from
  * src to dst until there are no more work ranges.
  */
 static void *
 worker_thread (void *wp)
 {
   struct worker *w = wp;
   uint64_t offset, count;
   extent_list exts = empty_vector;
@@ -180,20 +205,23 @@ worker_thread (void *wp)
         while (exts.ptr[i].length > 0) {
           len = exts.ptr[i].length;
           if (len > request_size)
             len = request_size;

           command = create_command (exts.ptr[i].offset, len,
                                     false, w);

           wait_for_request_slots (w->index);

+          /* NOTE: Must increase the queue size after waiting. */
+          increase_queue_size (w, len);
+
           /* Begin the asynch read operation. */
           src->ops->asynch_read (src, command,
                                  (nbd_completion_callback) {
                                    .callback = finished_read,
                                    .user_data = command,
                                  });

           exts.ptr[i].offset += len;
           exts.ptr[i].length -= len;
         }
@@ -455,20 +483,21 @@ finished_read (void *vp, int *error)
         /* It's data.  If the last was data too, do nothing =>
          * coalesce.  Otherwise write the last zero range and start a
          * new data.
          */
         if (last_is_zero) {
           /* Write the last zero range (if any). */
           if (i - last_offset > 0) {
             newcommand = create_subcommand (command,
                                             last_offset, i - last_offset,
                                             true);
+            decrease_queue_size (command->worker, newcommand->slice.len);
             fill_dst_range_with_zeroes (newcommand);
           }
           /* Start the new data. */
           last_offset = i;
           last_is_zero = false;
         }
       }
     } /* for i */

     /* Write the last_offset up to i. */
@@ -480,20 +509,21 @@ finished_read (void *vp, int *error)
         dst->ops->asynch_write (dst, newcommand,
                                 (nbd_completion_callback) {
                                   .callback = finished_command,
                                   .user_data = newcommand,
                                 });
       }
       else {
         newcommand = create_subcommand (command,
                                         last_offset, i - last_offset,
                                         true);
+        decrease_queue_size (command->worker, newcommand->slice.len);
         fill_dst_range_with_zeroes (newcommand);
       }
     }

     /* There may be an unaligned tail, so write that. */
     if (end - i > 0) {
       newcommand = create_subcommand (command, i, end - i, false);
       dst->ops->asynch_write (dst, newcommand,
                               (nbd_completion_callback) {
                                 .callback = finished_command,
@@ -573,20 +603,23 @@ static int
 finished_command (void *vp, int *error)
 {
   struct command *command = vp;

   if (*error) {
     fprintf (stderr, "write at offset %" PRId64 " failed: %s\n",
              command->offset, strerror (*error));
     exit (EXIT_FAILURE);
   }

+  if (command->slice.buffer)
+    decrease_queue_size (command->worker, command->slice.len);
+
   free_command (command);

   return 1; /* auto-retires the command */
 }

 static void
 free_command (struct command *command)
 {
   struct buffer *buffer = command->slice.buffer;
diff --git a/copy/nbdcopy.h b/copy/nbdcopy.h
index 8027836b..d6bd63f0 100644
--- a/copy/nbdcopy.h
+++ b/copy/nbdcopy.h
@@ -75,20 +75,26 @@ struct slice {
                                 * or NULL).
                                 */
 };

 #define slice_ptr(slice) ((slice).buffer->data + (slice).base)

 /* Worker state used by multi-threaded copying. */
 struct worker {
   pthread_t thread;
   size_t index;
+
+  /* The number of bytes queued for in flight read and write requests.
+   * Tracking this allows issuing many small requests, but limiting the
+   * number of large requests.
+   */
+  size_t queue_size;
 };

 /* Commands for asynchronous operations in flight.
  *
  * We don't store the command type (read/write/zero/etc) because it is
  * implicit in the function being called and because commands
  * naturally change from read -> write/zero/etc as they progress.
  *
  * slice.buffer may be NULL for commands (like zero) that have no
  * associated data.
-- 
2.35.1
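The accounting rules can be sanity-checked with a little arithmetic:
with the defaults used later in this series (--requests=64,
--request-size=256 KiB) a full queue holds 64 * 256 KiB = 16 MiB, which
is exactly the default --queue-size chosen in the next patch. A toy
trace of one command, with hypothetical numbers:

    /* One 256 KiB read that splits into 192 KiB of data plus 64 KiB
     * of zeroes:
     *
     *   increase_queue_size (w, 262144);  read issued, +256 KiB
     *   decrease_queue_size (w,  65536);  zero subcommand, no payload
     *   decrease_queue_size (w, 196608);  write subcommand completes
     *
     * Net effect: queue_size returns to its previous value.
     */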
Nir Soffer
2022-Feb-20 12:14 UTC
[Libguestfs] [PATCH libnbd 8/8] copy: Adaptive queue size
Limit the size of the copy queue also by the number of queued bytes.
This allows using many concurrent small requests, required to get good
performance, but limits the number of large requests that are actually
faster with lower concurrency.

A new --queue-size option controls the maximum queue size. With 2 MiB
we can have 8 in-flight 256 KiB requests per connection. The default
queue size is 16 MiB, to match the default --requests value (64) with
the default --request-size (256 KiB). Testing shows that using more
than 16 in-flight 256 KiB requests with one connection does not improve
anything.

The new option will simplify limiting memory usage when using large
requests, like this change in virt-v2v:
https://github.com/libguestfs/virt-v2v/commit/c943420219fa0ee971fc228aa4d9127c5ce973f7

I tested this change with 3 images:

- Fedora 35 + 3g of random data - hopefully simulates a real image
- Fully allocated image - the special case when every read command is
  converted to a write command.
- Zero image - the special case when every read command is converted to
  a zero command.

On 2 machines:

- laptop: Intel(R) Core(TM) i7-10850H CPU @ 2.70GHz, 12 cpus, 1.5 MiB
  L2 cache per 2 cpus, 12 MiB L3 cache.
- server: Intel(R) Xeon(R) Gold 5218R CPU @ 2.10GHz, 80 cpus, 1 MiB L2
  cache per cpu, 27.5 MiB L3 cache.

In all cases, both source and destination are served by qemu-nbd, using
--cache=none --aio=native. Because qemu-nbd does not support MULTI_CONN
for writing, we are testing a single connection when copying to
qemu-nbd. I also tested copying to null:, since in this case we use 4
connections (these tests are marked with /ro).

Results for copying all images on all machines with nbdcopy v1.11.0 and
this change. "before" and "after" are the average time of 10 runs.

image      machine  before  after  queue size  improvement
===========================================================
fedora     laptop   3.044   2.129  2m          +43%
full       laptop   4.900   3.136  2m          +56%
zero       laptop   3.147   2.624  2m          +20%
-----------------------------------------------------------
fedora     server   2.324   2.189  16m         +6%
full       server   3.521   3.380  8m          +4%
zero       server   2.297   2.338  16m         -2%
-----------------------------------------------------------
fedora/ro  laptop   2.040   1.663  1m          +23%
fedora/ro  server   1.585   1.393  2m          +14%

Signed-off-by: Nir Soffer <nsoffer at redhat.com>
---
 copy/main.c                 | 52 ++++++++++++++++++++++++-------------
 copy/multi-thread-copying.c | 18 +++++++-------
 copy/nbdcopy.h              |  1 +
 copy/nbdcopy.pod            | 12 +++++++--
 4 files changed, 55 insertions(+), 28 deletions(-)

diff --git a/copy/main.c b/copy/main.c
index 390de1eb..832f99da 100644
--- a/copy/main.c
+++ b/copy/main.c
@@ -36,53 +36,55 @@
 #include <pthread.h>

 #include <libnbd.h>

 #include "ispowerof2.h"
 #include "human-size.h"
 #include "version.h"
 #include "nbdcopy.h"

-bool allocated;                 /* --allocated flag */
-unsigned connections = 4;       /* --connections */
-bool destination_is_zero;       /* --destination-is-zero flag */
-bool extents = true;            /* ! --no-extents flag */
-bool flush;                     /* --flush flag */
-unsigned max_requests = 64;     /* --requests */
-bool progress;                  /* -p flag */
-int progress_fd = -1;           /* --progress=FD */
-unsigned request_size = 1<<18;  /* --request-size */
-unsigned sparse_size = 4096;    /* --sparse */
-bool synchronous;               /* --synchronous flag */
-unsigned threads;               /* --threads */
-struct rw *src, *dst;           /* The source and destination. */
-bool verbose;                   /* --verbose flag */
-
-const char *prog;               /* program name (== basename argv[0]) */
+bool allocated;                 /* --allocated flag */
+unsigned connections = 4;       /* --connections */
+bool destination_is_zero;       /* --destination-is-zero flag */
+bool extents = true;            /* ! --no-extents flag */
+bool flush;                     /* --flush flag */
+unsigned max_requests = 64;     /* --requests */
+bool progress;                  /* -p flag */
+int progress_fd = -1;           /* --progress=FD */
+unsigned request_size = 1<<18;  /* --request-size */
+unsigned queue_size = 16<<20;   /* --queue-size */
+unsigned sparse_size = 4096;    /* --sparse */
+bool synchronous;               /* --synchronous flag */
+unsigned threads;               /* --threads */
+struct rw *src, *dst;           /* The source and destination. */
+bool verbose;                   /* --verbose flag */
+
+const char *prog;               /* program name (== basename argv[0]) */

 static bool is_nbd_uri (const char *s);
 static struct rw *open_local (const char *filename, direction d);
 static void print_rw (struct rw *rw, const char *prefix, FILE *fp);

 static void __attribute__((noreturn))
 usage (FILE *fp, int exitcode)
 {
   fprintf (fp,
 "\n"
 "Copy to and from an NBD server:\n"
 "\n"
 "    nbdcopy [--allocated] [-C N|--connections=N]\n"
 "            [--destination-is-zero|--target-is-zero] [--flush]\n"
 "            [--no-extents] [-p|--progress|--progress=FD]\n"
-"            [--request-size=N] [-R N|--requests=N] [-S N|--sparse=N]\n"
-"            [--synchronous] [-T N|--threads=N] [-v|--verbose]\n"
+"            [--request-size=N] [--queue-size=N] [-R N|--requests=N]\n"
+"            [-S N|--sparse=N] [--synchronous] [-T N|--threads=N] \n"
+"            [-v|--verbose]\n"
 "            SOURCE DESTINATION\n"
 "\n"
 "    SOURCE, DESTINATION := - | FILE | DEVICE | NBD-URI | [ CMD ARGS ... ]\n"
 "    DESTINATION += null:\n"
 "\n"
 "Other options:\n"
 "\n"
 "    nbdcopy --help\n"
 "    nbdcopy -V|--version\n"
 "\n"
@@ -104,33 +106,35 @@ main (int argc, char *argv[])
 {
   enum {
     HELP_OPTION = CHAR_MAX + 1,
     LONG_OPTIONS,
     SHORT_OPTIONS,
     ALLOCATED_OPTION,
     DESTINATION_IS_ZERO_OPTION,
     FLUSH_OPTION,
     NO_EXTENTS_OPTION,
     REQUEST_SIZE_OPTION,
+    QUEUE_SIZE_OPTION,
     SYNCHRONOUS_OPTION,
   };
   const char *short_options = "C:pR:S:T:vV";
   const struct option long_options[] = {
     { "help",               no_argument,       NULL, HELP_OPTION },
     { "long-options",       no_argument,       NULL, LONG_OPTIONS },
     { "allocated",          no_argument,       NULL, ALLOCATED_OPTION },
     { "connections",        required_argument, NULL, 'C' },
     { "destination-is-zero",no_argument,       NULL, DESTINATION_IS_ZERO_OPTION },
     { "flush",              no_argument,       NULL, FLUSH_OPTION },
     { "no-extents",         no_argument,       NULL, NO_EXTENTS_OPTION },
     { "progress",           optional_argument, NULL, 'p' },
     { "request-size",       required_argument, NULL, REQUEST_SIZE_OPTION },
+    { "queue-size",         required_argument, NULL, QUEUE_SIZE_OPTION },
     { "requests",           required_argument, NULL, 'R' },
     { "short-options",      no_argument,       NULL, SHORT_OPTIONS },
     { "sparse",             required_argument, NULL, 'S' },
     { "synchronous",        no_argument,       NULL, SYNCHRONOUS_OPTION },
     { "target-is-zero",     no_argument,       NULL, DESTINATION_IS_ZERO_OPTION },
     { "threads",            required_argument, NULL, 'T' },
     { "verbose",            no_argument,       NULL, 'v' },
     { "version",            no_argument,       NULL, 'V' },
     { NULL }
   };
@@ -212,20 +216,28 @@ main (int argc, char *argv[])
       }
       if (request_size < MIN_REQUEST_SIZE || request_size > MAX_REQUEST_SIZE ||
           !is_power_of_2 (request_size)) {
         fprintf (stderr,
                  "%s: --request-size: must be a power of 2 within %d-%d\n",
                  prog, MIN_REQUEST_SIZE, MAX_REQUEST_SIZE);
         exit (EXIT_FAILURE);
       }
       break;

+    case QUEUE_SIZE_OPTION:
+      if (sscanf (optarg, "%u", &queue_size) != 1) {
+        fprintf (stderr, "%s: --queue-size: could not parse: %s\n",
+                 prog, optarg);
+        exit (EXIT_FAILURE);
+      }
+      break;
+
     case 'R':
       if (sscanf (optarg, "%u", &max_requests) != 1 || max_requests == 0) {
         fprintf (stderr, "%s: --requests: could not parse: %s\n",
                  prog, optarg);
         exit (EXIT_FAILURE);
       }
       break;

     case 'S':
       if (sscanf (optarg, "%u", &sparse_size) != 1) {
@@ -360,20 +372,24 @@ main (int argc, char *argv[])
   }

   if (synchronous)
     connections = 1;

   if (connections < threads)
     threads = connections;
   if (threads < connections)
     connections = threads;

+  /* Adapt queue size to request size if needed. */
+  if (request_size > queue_size)
+    queue_size = request_size;
+
   /* Truncate the destination to the same size as the source.  Only
    * has an effect on regular files.
    */
   if (dst->ops->truncate)
     dst->ops->truncate (dst, src->size);

   /* Check if the source is bigger than the destination, since that
    * would truncate (ie. lose) data.  Copying from smaller to larger
    * is OK.
    */
diff --git a/copy/multi-thread-copying.c b/copy/multi-thread-copying.c
index 620dc571..d9f17285 100644
--- a/copy/multi-thread-copying.c
+++ b/copy/multi-thread-copying.c
@@ -123,21 +123,21 @@ multi_thread_copying (void)
     if (err != 0) {
       errno = err;
       perror ("pthread_join");
       exit (EXIT_FAILURE);
     }
   }

   free (workers);
 }

-static void wait_for_request_slots (size_t index);
+static void wait_for_request_slots (struct worker *worker);
 static unsigned in_flight (size_t index);
 static void poll_both_ends (size_t index);
 static int finished_read (void *vp, int *error);
 static int finished_command (void *vp, int *error);
 static void free_command (struct command *command);
 static void fill_dst_range_with_zeroes (struct command *command);
 static struct command *create_command (uint64_t offset, size_t len, bool zero,
                                        struct worker *worker);

 /* Tracking worker queue size.
@@ -148,20 +148,21 @@ static struct command *create_command (uint64_t offset, size_t len, bool zero,
  * subcommand in finished_read(), or when a write command completes in
  * finished_command().
  *
  * Zero commands are not considered in the queue size since they have no
  * payload.
  */

 static inline void
 increase_queue_size(struct worker *worker, size_t len)
 {
+  assert (worker->queue_size < queue_size);
   worker->queue_size += len;
 }

 static inline void
 decrease_queue_size(struct worker *worker, size_t len)
 {
   assert (worker->queue_size >= len);
   worker->queue_size -= len;
 }

@@ -203,21 +204,21 @@ worker_thread (void *wp)
          * requests.
          */
         while (exts.ptr[i].length > 0) {
           len = exts.ptr[i].length;
           if (len > request_size)
             len = request_size;

           command = create_command (exts.ptr[i].offset, len,
                                     false, w);

-          wait_for_request_slots (w->index);
+          wait_for_request_slots (w);

           /* NOTE: Must increase the queue size after waiting. */
           increase_queue_size (w, len);

           /* Begin the asynch read operation. */
           src->ops->asynch_read (src, command,
                                  (nbd_completion_callback) {
                                    .callback = finished_read,
                                    .user_data = command,
                                  });
@@ -233,36 +234,37 @@ worker_thread (void *wp)
   }

   /* Wait for in flight NBD requests to finish. */
   while (in_flight (w->index) > 0)
     poll_both_ends (w->index);

   free (exts.ptr);
   return NULL;
 }

-/* If the number of requests in flight exceeds the limit, poll
- * waiting for at least one request to finish.  This enforces
- * the user --requests option.
+/* If the number of requests in flight or the number of queued bytes
+ * exceed the limit, poll waiting for at least one request to finish.
+ * This enforces the user --requests and --queue-size options.
  *
  * NB: Unfortunately it's not possible to call this from a callback,
  * since it will deadlock trying to grab the libnbd handle lock.  This
  * means that although the worker thread calls this and enforces the
  * limit, when we split up requests into subrequests (eg. doing
  * sparseness detection) we will probably exceed the user request
  * limit. XXX
  */
 static void
-wait_for_request_slots (size_t index)
+wait_for_request_slots (struct worker *worker)
 {
-  while (in_flight (index) >= max_requests)
-    poll_both_ends (index);
+  while (in_flight (worker->index) >= max_requests ||
+         worker->queue_size >= queue_size)
+    poll_both_ends (worker->index);
 }

 /* Count the number of asynchronous commands in flight. */
 static unsigned
 in_flight (size_t index)
 {
   return src->ops->in_flight (src, index) + dst->ops->in_flight (dst, index);
 }

 /* Poll (optional) NBD src and NBD dst, moving the state machine(s)
diff --git a/copy/nbdcopy.h b/copy/nbdcopy.h
index d6bd63f0..19797dfd 100644
--- a/copy/nbdcopy.h
+++ b/copy/nbdcopy.h
@@ -226,20 +226,21 @@ extern void asynch_notify_read_write_not_supported (struct rw *rw,
 extern bool allocated;
 extern unsigned connections;
 extern bool destination_is_zero;
 extern bool extents;
 extern bool flush;
 extern unsigned max_requests;
 extern bool progress;
 extern int progress_fd;
 extern unsigned request_size;
+extern unsigned queue_size;
 extern unsigned sparse_size;
 extern bool synchronous;
 extern unsigned threads;
 extern bool verbose;

 extern const char *prog;

 extern void progress_bar (off_t pos, int64_t size);
 extern void synch_copying (void);
 extern void multi_thread_copying (void);
diff --git a/copy/nbdcopy.pod b/copy/nbdcopy.pod
index f7100935..22ceca8e 100644
--- a/copy/nbdcopy.pod
+++ b/copy/nbdcopy.pod
@@ -1,21 +1,22 @@
 =head1 NAME

 nbdcopy - copy to and from an NBD server

 =head1 SYNOPSIS

  nbdcopy [--allocated] [-C N|--connections=N]
          [--destination-is-zero|--target-is-zero] [--flush]
          [--no-extents] [-p|--progress|--progress=FD]
-         [--request-size=N] [-R N|--requests=N] [-S N|--sparse=N]
-         [--synchronous] [-T N|--threads=N] [-v|--verbose]
+         [--request-size=N] [--queue-size=N] [-R N|--requests=N]
+         [-S N|--sparse=N] [--synchronous] [-T N|--threads=N]
+         [-v|--verbose]
          SOURCE DESTINATION

  SOURCE, DESTINATION := - | FILE | DEVICE | NBD-URI | [ CMD ARGS ... ]
  DESTINATION += null:

  nbdcopy --help
  nbdcopy -V|--version

 =head1 EXAMPLES
@@ -156,20 +157,27 @@ following shell commands:
 Set the maximum request size in bytes.  The maximum value is 32 MiB,
 specified by the NBD protocol.

 =item B<-R> N

 =item B<--requests=>N

 Set the maximum number of requests in flight per NBD connection.

+=item B<--queue-size=>N
+
+Set the maximum number of bytes to queue for in flight requests.  The
+default value is 16 MiB, allowing up to 64 256k requests per NBD
+connection.  If you use larger B<--request-size> you may want to
+increase this value.
+
 =item B<-S> N

 =item B<--sparse=>N

 Detect all zero blocks of size N (bytes) and make them sparse on the
 output.  You can also turn off sparse detection using S<I<-S 0>>.
 The default is 4096 bytes.

 =item B<--synchronous>
-- 
2.35.1
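For illustration, assuming the option lands as documented above, a copy
tuned for large requests might look like this (the sockets and sizes
are examples only; --queue-size takes plain bytes since it is parsed
with sscanf "%u"):

    # 4 MiB requests; cap each connection's queue at 32 MiB instead of
    # letting 64 in-flight 4 MiB requests queue 256 MiB:
    nbdcopy --request-size=$((4*1024*1024)) \
            --queue-size=$((32*1024*1024)) \
            'nbd+unix:///?socket=/tmp/src.sock' \
            'nbd+unix:///?socket=/tmp/dst.sock'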