Richard W.M. Jones
2021-Feb-22 15:42 UTC
[Libguestfs] [PATCH libnbd] copy: Refactor ‘struct rw’.
Make this a (fairly) abstract structure. At least hide the subtype fields from the main program. This change is pure refactoring and doesn’t change the semantics. --- copy/file-ops.c | 164 +++++++++++++++++-- copy/main.c | 315 ++++++++---------------------------- copy/multi-thread-copying.c | 104 ++++++------ copy/nbd-ops.c | 248 +++++++++++++++++++++++++--- copy/nbdcopy.h | 53 +++--- copy/null-ops.c | 50 +++++- copy/pipe-ops.c | 64 +++++++- copy/synch-copying.c | 30 ++-- 8 files changed, 649 insertions(+), 379 deletions(-) diff --git a/copy/file-ops.c b/copy/file-ops.c index 2a239d0..81b08ce 100644 --- a/copy/file-ops.c +++ b/copy/file-ops.c @@ -36,35 +36,159 @@ #include "isaligned.h" #include "nbdcopy.h" +static struct rw_ops file_ops; + +struct rw_file { + struct rw rw; + int fd; + struct stat stat; + bool seek_hole_supported; + int sector_size; +}; + +static bool +seek_hole_supported (int fd) +{ +#ifndef SEEK_HOLE + return false; +#else + off_t r = lseek (fd, 0, SEEK_HOLE); + return r >= 0; +#endif +} + +struct rw * +file_create (const char *name, const struct stat *stat, int fd) +{ + struct rw_file *rwf = calloc (1, sizeof *rwf); + if (rwf == NULL) { perror ("calloc"); exit (EXIT_FAILURE); } + + rwf->rw.ops = &file_ops; + rwf->rw.name = name; + rwf->stat = *stat; + rwf->fd = fd; + + if (S_ISBLK (stat->st_mode)) { + /* Block device. */ + rwf->rw.size = lseek (fd, 0, SEEK_END); + if (rwf->rw.size == -1) { + perror ("lseek"); + exit (EXIT_FAILURE); + } + if (lseek (fd, 0, SEEK_SET) == -1) { + perror ("lseek"); + exit (EXIT_FAILURE); + } + rwf->seek_hole_supported = seek_hole_supported (fd); + rwf->sector_size = 4096; +#ifdef BLKSSZGET + if (ioctl (fd, BLKSSZGET, &rwf->sector_size)) + fprintf (stderr, "warning: cannot get sector size: %s: %m", name); +#endif + } + else if (S_ISREG (stat->st_mode)) { + /* Regular file.
*/ + rwf->rw.size = stat->st_size; + rwf->seek_hole_supported = seek_hole_supported (fd); + } + else + abort (); + + return (struct rw *) rwf; +} + static void file_close (struct rw *rw) { - if (close (rw->u.local.fd) == -1) { + struct rw_file *rwf = (struct rw_file *)rw; + + if (close (rwf->fd) == -1) { fprintf (stderr, "%s: close: %m\n", rw->name); exit (EXIT_FAILURE); } + free (rw); +} + +static void +file_truncate (struct rw *rw, int64_t size) +{ + struct rw_file *rwf = (struct rw_file *) rw; + + /* If the destination is an ordinary file then the original file + * size doesn't matter. Truncate it to the source size. But + * truncate it to zero first so the file is completely empty and + * sparse. + */ + if (! S_ISREG (rwf->stat.st_mode)) + return; + + if (ftruncate (rwf->fd, 0) == -1 || + ftruncate (rwf->fd, size) == -1) { + perror ("truncate"); + exit (EXIT_FAILURE); + } + rwf->rw.size = size; + + /* We can assume the destination is zero. */ + destination_is_zero = true; } static void file_flush (struct rw *rw) { - if ((S_ISREG (rw->u.local.stat.st_mode) || - S_ISBLK (rw->u.local.stat.st_mode)) && - fsync (rw->u.local.fd) == -1) { + struct rw_file *rwf = (struct rw_file *)rw; + + if ((S_ISREG (rwf->stat.st_mode) || + S_ISBLK (rwf->stat.st_mode)) && + fsync (rwf->fd) == -1) { perror (rw->name); exit (EXIT_FAILURE); } } +static bool +file_is_read_only (struct rw *rw) +{ + /* Permissions are hard, and this is only used as an early check + * before the copy. Proceed with the copy and fail if it fails. + */ + return false; +} + +static bool +file_can_extents (struct rw *rw) +{ +#ifdef SEEK_HOLE + return true; +#else + return false; +#endif +} + +static bool +file_can_multi_conn (struct rw *rw) +{ + return true; +} + +static void +file_start_multi_conn (struct rw *rw) +{ + /* Don't need to do anything for files since we can read/write on a + * single file descriptor. 
+ */ +} + static size_t file_synch_read (struct rw *rw, void *data, size_t len, uint64_t offset) { + struct rw_file *rwf = (struct rw_file *)rw; size_t n = 0; ssize_t r; while (len > 0) { - r = pread (rw->u.local.fd, data, len, offset); + r = pread (rwf->fd, data, len, offset); if (r == -1) { perror (rw->name); exit (EXIT_FAILURE); @@ -85,10 +209,11 @@ static void file_synch_write (struct rw *rw, const void *data, size_t len, uint64_t offset) { + struct rw_file *rwf = (struct rw_file *)rw; ssize_t r; while (len > 0) { - r = pwrite (rw->u.local.fd, data, len, offset); + r = pwrite (rwf->fd, data, len, offset); if (r == -1) { perror (rw->name); exit (EXIT_FAILURE); @@ -103,7 +228,8 @@ static bool file_synch_trim (struct rw *rw, uint64_t offset, uint64_t count) { #ifdef FALLOC_FL_PUNCH_HOLE - int fd = rw->u.local.fd; + struct rw_file *rwf = (struct rw_file *)rw; + int fd = rwf->fd; int r; r = fallocate (fd, FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE, @@ -121,9 +247,11 @@ file_synch_trim (struct rw *rw, uint64_t offset, uint64_t count) static bool file_synch_zero (struct rw *rw, uint64_t offset, uint64_t count) { - if (S_ISREG (rw->u.local.stat.st_mode)) { + struct rw_file *rwf = (struct rw_file *)rw; + + if (S_ISREG (rwf->stat.st_mode)) { #ifdef FALLOC_FL_ZERO_RANGE - int fd = rw->u.local.fd; + int fd = rwf->fd; int r; r = fallocate (fd, FALLOC_FL_ZERO_RANGE, offset, count); @@ -134,10 +262,10 @@ file_synch_zero (struct rw *rw, uint64_t offset, uint64_t count) return true; #endif } - else if (S_ISBLK (rw->u.local.stat.st_mode) && - IS_ALIGNED (offset | count, rw->u.local.sector_size)) { + else if (S_ISBLK (rwf->stat.st_mode) && + IS_ALIGNED (offset | count, rwf->sector_size)) { #ifdef BLKZEROOUT - int fd = rw->u.local.fd; + int fd = rwf->fd; int r; uint64_t range[2] = {offset, count}; @@ -223,11 +351,12 @@ file_get_extents (struct rw *rw, uintptr_t index, ret->size = 0; #ifdef SEEK_HOLE + struct rw_file *rwf = (struct rw_file *)rw; static pthread_mutex_t lseek_lock = 
PTHREAD_MUTEX_INITIALIZER; - if (rw->u.local.seek_hole_supported) { + if (rwf->seek_hole_supported) { uint64_t end = offset + count; - int fd = rw->u.local.fd; + int fd = rwf->fd; off_t pos; struct extent e; size_t last; @@ -302,9 +431,14 @@ file_get_extents (struct rw *rw, uintptr_t index, default_get_extents (rw, index, offset, count, ret); } -struct rw_ops file_ops = { +static struct rw_ops file_ops = { .ops_name = "file_ops", .close = file_close, + .is_read_only = file_is_read_only, + .can_extents = file_can_extents, + .can_multi_conn = file_can_multi_conn, + .start_multi_conn = file_start_multi_conn, + .truncate = file_truncate, .flush = file_flush, .synch_read = file_synch_read, .synch_write = file_synch_write, diff --git a/copy/main.c b/copy/main.c index 68a6030..6fdc6fd 100644 --- a/copy/main.c +++ b/copy/main.c @@ -53,19 +53,11 @@ int progress_fd = -1; /* --progress=FD */ unsigned sparse_size = 4096; /* --sparse */ bool synchronous; /* --synchronous flag */ unsigned threads; /* --threads */ -struct rw src, dst; /* The source and destination. */ +struct rw *src, *dst; /* The source and destination. 
*/ bool verbose; /* --verbose flag */ static bool is_nbd_uri (const char *s); -static bool seek_hole_supported (int fd); -static void open_null (struct rw *rw); -static int open_local (const char *prog, - const char *filename, bool writing, struct rw *rw); -static void open_nbd_uri (const char *prog, - const char *uri, bool writing, struct rw *rw); -static void open_nbd_subprocess (const char *prog, - const char **argv, size_t argc, - bool writing, struct rw *rw); +static struct rw *open_local (const char *filename, bool writing); static void print_rw (struct rw *rw, const char *prefix, FILE *fp); static void __attribute__((noreturn)) @@ -242,18 +234,18 @@ main (int argc, char *argv[]) found1: connections = 1; /* multi-conn not supported */ - src.name = argv[optind+1]; - open_nbd_subprocess (argv[0], - (const char **) &argv[optind+1], i-optind-1, - false, &src); + src = + nbd_rw_create_subprocess ((const char **) &argv[optind+1], i-optind-1, + false); optind = i+1; } else { /* Source is not [...]. */ - src.name = argv[optind++]; - if (! is_nbd_uri (src.name)) - src.u.local.fd = open_local (argv[0], src.name, false, &src); + const char *src_name = argv[optind++]; + + if (! is_nbd_uri (src_name)) + src = open_local (src_name, false); else - open_nbd_uri (argv[0], src.name, false, &src); + src = nbd_rw_create_uri (src_name, src_name, false); } if (optind >= argc) @@ -267,48 +259,46 @@ main (int argc, char *argv[]) found2: connections = 1; /* multi-conn not supported */ - dst.name = argv[optind+1]; - open_nbd_subprocess (argv[0], - (const char **) &argv[optind+1], i-optind-1, - true, &dst); + dst = + nbd_rw_create_subprocess ((const char **) &argv[optind+1], i-optind-1, + true); optind = i+1; } else { /* Destination is not [...] */ - dst.name = argv[optind++]; - if (strcmp (dst.name, "null:") == 0) - open_null (&dst); - else if (!
is_nbd_uri (dst.name)) - dst.u.local.fd = open_local (argv[0], dst.name, true /* writing */, &dst); - else { - open_nbd_uri (argv[0], dst.name, true, &dst); + const char *dst_name = argv[optind++]; - /* Obviously this is not going to work if the server is - * advertising read-only, so fail early with a nice error message. - */ - if (nbd_is_read_only (dst.u.nbd.handles.ptr[0])) { - fprintf (stderr, "%s: %s: " - "this NBD server is read-only, cannot write to it\n", - argv[0], dst.name); - exit (EXIT_FAILURE); - } - } + if (strcmp (dst_name, "null:") == 0) + dst = null_create (dst_name); + else if (! is_nbd_uri (dst_name)) + dst = open_local (dst_name, true /* writing */); + else + dst = nbd_rw_create_uri (dst_name, dst_name, true); } /* There must be no extra parameters. */ if (optind != argc) usage (stderr, EXIT_FAILURE); - /* Check we've set the fields of src and dst. */ - assert (src.ops); - assert (src.name); - assert (dst.ops); - assert (dst.name); + /* Check we've created src and dst and set the expected fields. */ + assert (src != NULL); + assert (dst != NULL); + assert (src->ops != NULL); + assert (src->name != NULL); + assert (dst->ops != NULL); + assert (dst->name != NULL); + + /* Obviously this is not going to work if the destination is + * read-only, so fail early with a nice error message. + */ + if (dst->ops->is_read_only (dst)) { + fprintf (stderr, "%s: %s: " + "the destination is read-only, cannot write to it\n", + argv[0], dst->name); + exit (EXIT_FAILURE); + } /* If multi-conn is not supported, force connections to 1. */ - if ((src.ops == &nbd_ops && - ! nbd_can_multi_conn (src.u.nbd.handles.ptr[0])) || - (dst.ops == &nbd_ops && - ! nbd_can_multi_conn (dst.u.nbd.handles.ptr[0]))) + if (! src->ops->can_multi_conn (src) || ! dst->ops->can_multi_conn (dst)) connections = 1; /* Calculate the number of threads from the number of connections. 
*/ @@ -335,44 +325,17 @@ main (int argc, char *argv[]) if (threads < connections) connections = threads; - /* Calculate the source and destination sizes. We set these to -1 - * if the size is not known (because it's a stream). Note that for - * local types, open_local set something in *.size already. + /* Truncate the destination to the same size as the source. Only + * has an effect on regular files. */ - if (src.ops == &nbd_ops) { - src.size = nbd_get_size (src.u.nbd.handles.ptr[0]); - if (src.size == -1) { - fprintf (stderr, "%s: %s: %s\n", argv[0], src.name, nbd_get_error ()); - exit (EXIT_FAILURE); - } - } - if (dst.ops != &nbd_ops && S_ISREG (dst.u.local.stat.st_mode)) { - /* If the destination is an ordinary file then the original file - * size doesn't matter. Truncate it to the source size. But - * truncate it to zero first so the file is completely empty and - * sparse. - */ - dst.size = src.size; - if (ftruncate (dst.u.local.fd, 0) == -1 || - ftruncate (dst.u.local.fd, dst.size) == -1) { - perror ("truncate"); - exit (EXIT_FAILURE); - } - destination_is_zero = true; - } - else if (dst.ops == &nbd_ops) { - dst.size = nbd_get_size (dst.u.nbd.handles.ptr[0]); - if (dst.size == -1) { - fprintf (stderr, "%s: %s: %s\n", argv[0], dst.name, nbd_get_error ()); - exit (EXIT_FAILURE); - } - } + if (dst->ops->truncate) + dst->ops->truncate (dst, src->size); /* Check if the source is bigger than the destination, since that * would truncate (ie. lose) data. Copying from smaller to larger * is OK. */ - if (src.size >= 0 && dst.size >= 0 && src.size > dst.size) { + if (src->size >= 0 && dst->size >= 0 && src->size > dst->size) { fprintf (stderr, "nbdcopy: error: destination size is smaller than source size\n"); exit (EXIT_FAILURE); @@ -383,37 +346,29 @@ main (int argc, char *argv[]) * settings. 
*/ if (verbose) { - print_rw (&src, "nbdcopy: src", stderr); - print_rw (&dst, "nbdcopy: dst", stderr); + print_rw (src, "nbdcopy: src", stderr); + print_rw (dst, "nbdcopy: dst", stderr); fprintf (stderr, "nbdcopy: connections=%u requests=%u threads=%u " "synchronous=%s\n", connections, max_requests, threads, synchronous ? "true" : "false"); } - /* If #connections > 1 then multi-conn is enabled at both ends and - * we need to open further connections. + /* If multi-conn is enabled on either side, then at this point we + * need to ask the backend to open the extra connections. */ if (connections > 1) { assert (threads == connections); - - if (src.ops == &nbd_ops) { - for (i = 1; i < connections; ++i) - open_nbd_uri (argv[0], src.name, false, &src); - assert (src.u.nbd.handles.size == connections); - } - if (dst.ops == &nbd_ops) { - for (i = 1; i < connections; ++i) - open_nbd_uri (argv[0], dst.name, true, &dst); - assert (dst.u.nbd.handles.size == connections); - } + if (src->ops->can_multi_conn (src)) + src->ops->start_multi_conn (src); + if (dst->ops->can_multi_conn (dst)) + dst->ops->start_multi_conn (dst); } /* If the source is NBD and we couldn't negotiate meta * base:allocation then turn off extents. */ - if (src.ops == &nbd_ops && - !nbd_can_meta_context (src.u.nbd.handles.ptr[0], "base:allocation")) + if (! src->ops->can_extents (src)) extents = false; /* Always set the progress bar to 0% at the start of the copy. */ @@ -429,12 +384,12 @@ main (int argc, char *argv[]) progress_bar (1, 1); /* Shut down the source side. */ - src.ops->close (&src); + src->ops->close (src); /* Shut down the destination side. */ if (flush) - dst.ops->flush (&dst); - dst.ops->close (&dst); + dst->ops->flush (dst); + dst->ops->close (dst); exit (EXIT_SUCCESS); } @@ -452,33 +407,25 @@ is_nbd_uri (const char *s) strncmp (s, "nbds+vsock:", 11) == 0; } -/* Open null: (destination only). 
*/ -static void -open_null (struct rw *rw) -{ - rw->ops = &null_ops; - rw->size = INT64_MAX; -} - /* Open a local (non-NBD) file, ie. a file, device, or "-" for stdio. - * Returns the open file descriptor which the caller must close. + * Returns the struct rw * which the caller must close. * * ‘writing’ is true if this is the destination parameter. * ‘rw->u.local.stat’ and ‘rw->size’ return the file stat and size, * but size can be returned as -1 if we don't know the size (if it's a * pipe or stdio). */ -static int -open_local (const char *prog, - const char *filename, bool writing, struct rw *rw) +static struct rw * +open_local (const char *filename, bool writing) { int flags, fd; + struct stat stat; if (strcmp (filename, "-") == 0) { synchronous = true; fd = writing ? STDOUT_FILENO : STDIN_FILENO; if (writing && isatty (fd)) { - fprintf (stderr, "%s: refusing to write to tty\n", prog); + fprintf (stderr, "%s: refusing to write to tty\n", "nbdcopy"); exit (EXIT_FAILURE); } } @@ -502,146 +449,17 @@ open_local (const char *prog, } } - if (fstat (fd, &rw->u.local.stat) == -1) { + if (fstat (fd, &stat) == -1) { perror (filename); exit (EXIT_FAILURE); } - if (S_ISBLK (rw->u.local.stat.st_mode)) { - /* Block device. */ - rw->ops = &file_ops; - rw->size = lseek (fd, 0, SEEK_END); - if (rw->size == -1) { - perror ("lseek"); - exit (EXIT_FAILURE); - } - if (lseek (fd, 0, SEEK_SET) == -1) { - perror ("lseek"); - exit (EXIT_FAILURE); - } - rw->u.local.seek_hole_supported = seek_hole_supported (fd); - rw->u.local.sector_size = 4096; -#ifdef BLKSSZGET - if (ioctl (fd, BLKSSZGET, &rw->u.local.sector_size)) - fprintf (stderr, "warning: cannot get sector size: %s: %m", rw->name); -#endif - } - else if (S_ISREG (rw->u.local.stat.st_mode)) { - /* Regular file.
*/ - rw->ops = &file_ops; - rw->size = rw->u.local.stat.st_size; - rw->u.local.seek_hole_supported = seek_hole_supported (fd); - } + if (S_ISBLK (stat.st_mode) || S_ISREG (stat.st_mode)) + return file_create (filename, &stat, fd); else { - /* Probably stdin/stdout, a pipe or a socket. Set size == -1 - * which means don't know, and force synchronous mode. - */ - synchronous = true; - rw->ops = &pipe_ops; - rw->size = -1; - rw->u.local.seek_hole_supported = false; + /* Probably stdin/stdout, a pipe or a socket. */ + synchronous = true; /* Force synchronous mode for pipes. */ + return pipe_create (filename, fd); } - - return fd; -} - -static bool -seek_hole_supported (int fd) -{ -#ifndef SEEK_HOLE - return false; -#else - off_t r = lseek (fd, 0, SEEK_HOLE); - return r >= 0; -#endif -} - -static void -open_nbd_uri (const char *prog, - const char *uri, bool writing, struct rw *rw) -{ - struct nbd_handle *nbd; - - rw->ops = &nbd_ops; - nbd = nbd_create (); - if (nbd == NULL) { - fprintf (stderr, "%s: %s\n", prog, nbd_get_error ()); - exit (EXIT_FAILURE); - } - nbd_set_debug (nbd, verbose); - nbd_set_uri_allow_local_file (nbd, true); /* Allow ?tls-psk-file. */ - if (extents && !writing && - nbd_add_meta_context (nbd, "base:allocation") == -1) { - fprintf (stderr, "%s: %s\n", prog, nbd_get_error ()); - exit (EXIT_FAILURE); - } - - if (handles_append (&rw->u.nbd.handles, nbd) == -1) { - perror ("realloc"); - exit (EXIT_FAILURE); - } - - if (nbd_connect_uri (nbd, uri) == -1) { - fprintf (stderr, "%s: %s: %s\n", prog, uri, nbd_get_error ()); - exit (EXIT_FAILURE); - } - - /* Cache these. We assume with multi-conn that each handle will act - * the same way. 
- */ - rw->u.nbd.can_trim = nbd_can_trim (nbd) > 0; - rw->u.nbd.can_zero = nbd_can_zero (nbd) > 0; -} - -DEFINE_VECTOR_TYPE (const_string_vector, const char *); - -static void -open_nbd_subprocess (const char *prog, - const char **argv, size_t argc, - bool writing, struct rw *rw) -{ - struct nbd_handle *nbd; - const_string_vector copy = empty_vector; - size_t i; - - rw->ops = &nbd_ops; - nbd = nbd_create (); - if (nbd == NULL) { - fprintf (stderr, "%s: %s\n", prog, nbd_get_error ()); - exit (EXIT_FAILURE); - } - nbd_set_debug (nbd, verbose); - if (extents && !writing && - nbd_add_meta_context (nbd, "base:allocation") == -1) { - fprintf (stderr, "%s: %s\n", prog, nbd_get_error ()); - exit (EXIT_FAILURE); - } - - if (handles_append (&rw->u.nbd.handles, nbd) == -1) { - memory_error: - perror ("realloc"); - exit (EXIT_FAILURE); - } - - /* We have to copy the args so we can null-terminate them. */ - for (i = 0; i < argc; ++i) { - if (const_string_vector_append (&copy, argv[i]) == -1) - goto memory_error; - } - if (const_string_vector_append (&copy, NULL) == -1) - goto memory_error; - - if (nbd_connect_systemd_socket_activation (nbd, (char **) copy.ptr) == -1) { - fprintf (stderr, "%s: %s: %s\n", prog, argv[0], nbd_get_error ()); - exit (EXIT_FAILURE); - } - - /* Cache these. We assume with multi-conn that each handle will act - * the same way. - */ - rw->u.nbd.can_trim = nbd_can_trim (nbd) > 0; - rw->u.nbd.can_zero = nbd_can_zero (nbd) > 0; - - free (copy.ptr); } /* Print an rw struct, used in --verbose mode. */ @@ -650,7 +468,6 @@ print_rw (struct rw *rw, const char *prefix, FILE *fp) { fprintf (fp, "%s: %s \"%s\"\n", prefix, rw->ops->ops_name, rw->name); fprintf (fp, "%s: size=%" PRIi64 "\n", prefix, rw->size); - /* Could print other stuff here, but that's enough for debugging.
*/ } /* Default implementation of rw->ops->get_extents for backends which diff --git a/copy/multi-thread-copying.c b/copy/multi-thread-copying.c index 98b4056..4f57054 100644 --- a/copy/multi-thread-copying.c +++ b/copy/multi-thread-copying.c @@ -50,13 +50,13 @@ get_next_offset (uint64_t *offset, uint64_t *count) bool r = false; /* returning false means no more work */ pthread_mutex_lock (&lock); - if (next_offset < src.size) { + if (next_offset < src->size) { *offset = next_offset; /* Work out how large this range is. The last range may be * smaller than THREAD_WORK_SIZE. */ - *count = src.size - *offset; + *count = src->size - *offset; if (*count > THREAD_WORK_SIZE) *count = THREAD_WORK_SIZE; @@ -69,7 +69,7 @@ get_next_offset (uint64_t *offset, uint64_t *count) * are called from threads and not necessarily in monotonic order * so the progress bar would move erratically. */ - progress_bar (*offset, dst.size); + progress_bar (*offset, dst->size); } pthread_mutex_unlock (&lock); return r; @@ -89,11 +89,13 @@ multi_thread_copying (void) */ assert (threads > 0); assert (threads == connections); +/* if (src.ops == &nbd_ops) assert (src.u.nbd.handles.size == connections); if (dst.ops == &nbd_ops) assert (dst.u.nbd.handles.size == connections); - assert (src.size != -1); +*/ + assert (src->size != -1); workers = malloc (sizeof (pthread_t) * threads); if (workers == NULL) { @@ -147,9 +149,9 @@ worker_thread (void *indexp) assert (0 < count && count <= THREAD_WORK_SIZE); if (extents) - src.ops->get_extents (&src, index, offset, count, &exts); + src->ops->get_extents (src, index, offset, count, &exts); else - default_get_extents (&src, index, offset, count, &exts); + default_get_extents (src, index, offset, count, &exts); for (i = 0; i < exts.size; ++i) { struct command *command; @@ -208,11 +210,11 @@ worker_thread (void *indexp) wait_for_request_slots (index); /* Begin the asynch read operation. 
*/ - src.ops->asynch_read (&src, command, - (nbd_completion_callback) { - .callback = finished_read, - .user_data = command, - }); + src->ops->asynch_read (src, command, + (nbd_completion_callback) { + .callback = finished_read, + .user_data = command, + }); exts.ptr[i].offset += len; exts.ptr[i].length -= len; @@ -254,7 +256,7 @@ wait_for_request_slots (uintptr_t index) static unsigned in_flight (uintptr_t index) { - return src.ops->in_flight (&src, index) + dst.ops->in_flight (&dst, index); + return src->ops->in_flight (src, index) + dst->ops->in_flight (dst, index); } /* Poll (optional) NBD src and NBD dst, moving the state machine(s) @@ -271,7 +273,7 @@ poll_both_ends (uintptr_t index) /* Note: if polling is not supported, this function will * set fd == -1 which poll ignores. */ - src.ops->get_polling_fd (&src, index, &fds[0].fd, &direction); + src->ops->get_polling_fd (src, index, &fds[0].fd, &direction); if (fds[0].fd >= 0) { switch (direction) { case LIBNBD_AIO_DIRECTION_READ: @@ -286,7 +288,7 @@ poll_both_ends (uintptr_t index) } } - dst.ops->get_polling_fd (&dst, index, &fds[1].fd, &direction); + dst->ops->get_polling_fd (dst, index, &fds[1].fd, &direction); if (fds[1].fd >= 0) { switch (direction) { case LIBNBD_AIO_DIRECTION_READ: @@ -311,24 +313,24 @@ poll_both_ends (uintptr_t index) if (fds[0].fd >= 0) { if ((fds[0].revents & (POLLIN | POLLHUP)) != 0) - src.ops->asynch_notify_read (&src, index); + src->ops->asynch_notify_read (src, index); else if ((fds[0].revents & POLLOUT) != 0) - src.ops->asynch_notify_write (&src, index); + src->ops->asynch_notify_write (src, index); else if ((fds[0].revents & (POLLERR | POLLNVAL)) != 0) { errno = ENOTCONN; - perror (src.name); + perror (src->name); exit (EXIT_FAILURE); } } if (fds[1].fd >= 0) { if ((fds[1].revents & (POLLIN | POLLHUP)) != 0) - dst.ops->asynch_notify_read (&dst, index); + dst->ops->asynch_notify_read (dst, index); else if ((fds[1].revents & POLLOUT) != 0) - dst.ops->asynch_notify_write (&dst, 
index); + dst->ops->asynch_notify_write (dst, index); else if ((fds[1].revents & (POLLERR | POLLNVAL)) != 0) { errno = ENOTCONN; - perror (dst.name); + perror (dst->name); exit (EXIT_FAILURE); } } @@ -377,11 +379,11 @@ finished_read (void *vp, int *error) /* If sparseness detection (see below) is turned off then we write * the whole command. */ - dst.ops->asynch_write (&dst, command, - (nbd_completion_callback) { - .callback = free_command, - .user_data = command, - }); + dst->ops->asynch_write (dst, command, + (nbd_completion_callback) { + .callback = free_command, + .user_data = command, + }); } else { /* Sparseness detection. */ const uint64_t start = command->offset; @@ -408,11 +410,11 @@ finished_read (void *vp, int *error) newcommand = copy_subcommand (command, last_offset, i - last_offset, false); - dst.ops->asynch_write (&dst, newcommand, - (nbd_completion_callback) { - .callback = free_command, - .user_data = newcommand, - }); + dst->ops->asynch_write (dst, newcommand, + (nbd_completion_callback) { + .callback = free_command, + .user_data = newcommand, + }); } /* Start the new hole. */ last_offset = i; @@ -445,11 +447,11 @@ finished_read (void *vp, int *error) newcommand = copy_subcommand (command, last_offset, i - last_offset, false); - dst.ops->asynch_write (&dst, newcommand, - (nbd_completion_callback) { - .callback = free_command, - .user_data = newcommand, - }); + dst->ops->asynch_write (dst, newcommand, + (nbd_completion_callback) { + .callback = free_command, + .user_data = newcommand, + }); } else { newcommand = copy_subcommand (command, @@ -462,11 +464,11 @@ finished_read (void *vp, int *error) /* There may be an unaligned tail, so write that. 
*/ if (end - i > 0) { newcommand = copy_subcommand (command, i, end - i, false); - dst.ops->asynch_write (&dst, newcommand, - (nbd_completion_callback) { - .callback = free_command, - .user_data = newcommand, - }); + dst->ops->asynch_write (dst, newcommand, + (nbd_completion_callback) { + .callback = free_command, + .user_data = newcommand, + }); } /* Free the original command since it has been split into @@ -503,20 +505,20 @@ fill_dst_range_with_zeroes (struct command *command) if (!allocated) { /* Try trimming. */ - if (dst.ops->asynch_trim (&dst, command, - (nbd_completion_callback) { - .callback = free_command, - .user_data = command, - })) + if (dst->ops->asynch_trim (dst, command, + (nbd_completion_callback) { + .callback = free_command, + .user_data = command, + })) return; } /* Try efficient zeroing. */ - if (dst.ops->asynch_zero (&dst, command, - (nbd_completion_callback) { - .callback = free_command, - .user_data = command, - })) + if (dst->ops->asynch_zero (dst, command, + (nbd_completion_callback) { + .callback = free_command, + .user_data = command, + })) return; /* Fall back to loop writing zeroes. This is going to be slow @@ -533,7 +535,7 @@ fill_dst_range_with_zeroes (struct command *command) if (len > MAX_REQUEST_SIZE) len = MAX_REQUEST_SIZE; - dst.ops->synch_write (&dst, data, len, command->offset); + dst->ops->synch_write (dst, data, len, command->offset); command->slice.len -= len; command->offset += len; } diff --git a/copy/nbd-ops.c b/copy/nbd-ops.c index 7b48cbc..24970c2 100644 --- a/copy/nbd-ops.c +++ b/copy/nbd-ops.c @@ -27,45 +27,221 @@ #include "nbdcopy.h" +static struct rw_ops nbd_ops; + +DEFINE_VECTOR_TYPE (handles, struct nbd_handle *) +DEFINE_VECTOR_TYPE (const_string_vector, const char *); + +struct rw_nbd { + struct rw rw; + + /* Because of multi-conn we have to remember enough state in this + * handle in order to be able to open another connection with the + * same parameters after nbd_rw_create* has been called once. 
+ */ + enum { CREATE_URI, CREATE_SUBPROCESS } create_t; + const char *uri; /* For CREATE_URI */ + const_string_vector argv; /* For CREATE_SUBPROCESS */ + bool writing; + + handles handles; /* One handle per connection. */ + bool can_trim, can_zero; /* Cached nbd_can_trim, nbd_can_zero. */ +}; + +static void +open_one_nbd_handle (struct rw_nbd *rwn) +{ + struct nbd_handle *nbd; + + nbd = nbd_create (); + if (nbd == NULL) { + fprintf (stderr, "%s: %s\n", "nbdcopy", nbd_get_error ()); + exit (EXIT_FAILURE); + } + + nbd_set_debug (nbd, verbose); + + if (extents && !rwn->writing && + nbd_add_meta_context (nbd, "base:allocation") == -1) { + fprintf (stderr, "%s: %s\n", "nbdcopy", nbd_get_error ()); + exit (EXIT_FAILURE); + } + + switch (rwn->create_t) { + case CREATE_URI: + nbd_set_uri_allow_local_file (nbd, true); /* Allow ?tls-psk-file. */ + + if (nbd_connect_uri (nbd, rwn->uri) == -1) { + fprintf (stderr, "%s: %s: %s\n", "nbdcopy", rwn->uri, nbd_get_error ()); + exit (EXIT_FAILURE); + } + break; + + case CREATE_SUBPROCESS: + if (nbd_connect_systemd_socket_activation (nbd, + (char **) rwn->argv.ptr) + == -1) { + fprintf (stderr, "%s: %s: %s\n", "nbdcopy", rwn->argv.ptr[0], + nbd_get_error ()); + exit (EXIT_FAILURE); + } + } + + /* Cache these. We assume with multi-conn that each handle will act + * the same way. 
+ */ + if (rwn->handles.size == 0) { + rwn->can_trim = nbd_can_trim (nbd) > 0; + rwn->can_zero = nbd_can_zero (nbd) > 0; + rwn->rw.size = nbd_get_size (nbd); + if (rwn->rw.size == -1) { + fprintf (stderr, "%s: %s: %s\n", "nbdcopy", rwn->rw.name, + nbd_get_error ()); + exit (EXIT_FAILURE); + } + } + + if (handles_append (&rwn->handles, nbd) == -1) { + perror ("realloc"); + exit (EXIT_FAILURE); + } +} + +struct rw * +nbd_rw_create_uri (const char *name, const char *uri, bool writing) +{ + struct rw_nbd *rwn = calloc (1, sizeof *rwn); + if (rwn == NULL) { perror ("calloc"); exit (EXIT_FAILURE); } + + rwn->rw.ops = &nbd_ops; + rwn->rw.name = name; + rwn->create_t = CREATE_URI; + rwn->uri = uri; + rwn->writing = writing; + + open_one_nbd_handle (rwn); + + return (struct rw *) rwn; +} + +struct rw * +nbd_rw_create_subprocess (const char **argv, size_t argc, bool writing) +{ + size_t i; + struct rw_nbd *rwn = calloc (1, sizeof *rwn); + if (rwn == NULL) { perror ("calloc"); exit (EXIT_FAILURE); } + + rwn->rw.ops = &nbd_ops; + rwn->rw.name = argv[0]; + rwn->create_t = CREATE_SUBPROCESS; + rwn->writing = writing; + + /* We have to copy the args so we can null-terminate them. 
*/ + for (i = 0; i < argc; ++i) { + if (const_string_vector_append (&rwn->argv, argv[i]) == -1) { + memory_error: + perror ("realloc"); + exit (EXIT_FAILURE); + } + } + if (const_string_vector_append (&rwn->argv, NULL) == -1) + goto memory_error; + + open_one_nbd_handle (rwn); + + return (struct rw *) rwn; +} + static void nbd_ops_close (struct rw *rw) { + struct rw_nbd *rwn = (struct rw_nbd *) rw; size_t i; - for (i = 0; i < rw->u.nbd.handles.size; ++i) { - if (nbd_shutdown (rw->u.nbd.handles.ptr[i], 0) == -1) { + for (i = 0; i < rwn->handles.size; ++i) { + if (nbd_shutdown (rwn->handles.ptr[i], 0) == -1) { fprintf (stderr, "%s: %s\n", rw->name, nbd_get_error ()); exit (EXIT_FAILURE); } - nbd_close (rw->u.nbd.handles.ptr[i]); + nbd_close (rwn->handles.ptr[i]); } - handles_reset (&rw->u.nbd.handles); + handles_reset (&rwn->handles); + const_string_vector_reset (&rwn->argv); + free (rw); } static void nbd_ops_flush (struct rw *rw) { + struct rw_nbd *rwn = (struct rw_nbd *) rw; size_t i; - for (i = 0; i < rw->u.nbd.handles.size; ++i) { - if (nbd_flush (rw->u.nbd.handles.ptr[i], 0) == -1) { + for (i = 0; i < rwn->handles.size; ++i) { + if (nbd_flush (rwn->handles.ptr[i], 0) == -1) { fprintf (stderr, "%s: %s\n", rw->name, nbd_get_error ()); exit (EXIT_FAILURE); } } } +static bool +nbd_ops_is_read_only (struct rw *rw) +{ + struct rw_nbd *rwn = (struct rw_nbd *) rw; + + if (rwn->handles.size > 0) + return nbd_is_read_only (rwn->handles.ptr[0]); + else + return false; +} + +static bool +nbd_ops_can_extents (struct rw *rw) +{ + struct rw_nbd *rwn = (struct rw_nbd *) rw; + + if (rwn->handles.size > 0) + return nbd_can_meta_context (rwn->handles.ptr[0], "base:allocation"); + else + return false; +} + +static bool +nbd_ops_can_multi_conn (struct rw *rw) +{ + struct rw_nbd *rwn = (struct rw_nbd *) rw; + + if (rwn->handles.size > 0) + return nbd_can_multi_conn (rwn->handles.ptr[0]); + else + return false; +} + +static void +nbd_ops_start_multi_conn (struct rw *rw) +{ + struct 
rw_nbd *rwn = (struct rw_nbd *) rw; + size_t i; + + for (i = 1; i < connections; ++i) + open_one_nbd_handle (rwn); + + assert (rwn->handles.size == connections); +} + static size_t nbd_ops_synch_read (struct rw *rw, void *data, size_t len, uint64_t offset) { + struct rw_nbd *rwn = (struct rw_nbd *) rw; + if (len > rw->size - offset) len = rw->size - offset; if (len == 0) return 0; - if (nbd_pread (rw->u.nbd.handles.ptr[0], data, len, offset, 0) == -1) { + if (nbd_pread (rwn->handles.ptr[0], data, len, offset, 0) == -1) { fprintf (stderr, "%s: %s\n", rw->name, nbd_get_error ()); exit (EXIT_FAILURE); } @@ -77,7 +253,9 @@ static void nbd_ops_synch_write (struct rw *rw, const void *data, size_t len, uint64_t offset) { - if (nbd_pwrite (rw->u.nbd.handles.ptr[0], data, len, offset, 0) == -1) { + struct rw_nbd *rwn = (struct rw_nbd *) rw; + + if (nbd_pwrite (rwn->handles.ptr[0], data, len, offset, 0) == -1) { fprintf (stderr, "%s: %s\n", rw->name, nbd_get_error ()); exit (EXIT_FAILURE); } @@ -86,10 +264,12 @@ nbd_ops_synch_write (struct rw *rw, static bool nbd_ops_synch_trim (struct rw *rw, uint64_t offset, uint64_t count) { - if (!rw->u.nbd.can_trim) + struct rw_nbd *rwn = (struct rw_nbd *) rw; + + if (!rwn->can_trim) return false; - if (nbd_trim (rw->u.nbd.handles.ptr[0], count, offset, 0) == -1) { + if (nbd_trim (rwn->handles.ptr[0], count, offset, 0) == -1) { fprintf (stderr, "%s: %s\n", rw->name, nbd_get_error ()); exit (EXIT_FAILURE); } @@ -99,10 +279,12 @@ nbd_ops_synch_trim (struct rw *rw, uint64_t offset, uint64_t count) static bool nbd_ops_synch_zero (struct rw *rw, uint64_t offset, uint64_t count) { - if (!rw->u.nbd.can_zero) + struct rw_nbd *rwn = (struct rw_nbd *) rw; + + if (!rwn->can_zero) return false; - if (nbd_zero (rw->u.nbd.handles.ptr[0], + if (nbd_zero (rwn->handles.ptr[0], count, offset, LIBNBD_CMD_FLAG_NO_HOLE) == -1) { fprintf (stderr, "%s: %s\n", rw->name, nbd_get_error ()); exit (EXIT_FAILURE); @@ -115,7 +297,9 @@ nbd_ops_asynch_read (struct rw 
*rw, struct command *command, nbd_completion_callback cb) { - if (nbd_aio_pread (rw->u.nbd.handles.ptr[command->index], + struct rw_nbd *rwn = (struct rw_nbd *) rw; + + if (nbd_aio_pread (rwn->handles.ptr[command->index], slice_ptr (command->slice), command->slice.len, command->offset, cb, 0) == -1) { @@ -129,7 +313,9 @@ nbd_ops_asynch_write (struct rw *rw, struct command *command, nbd_completion_callback cb) { - if (nbd_aio_pwrite (rw->u.nbd.handles.ptr[command->index], + struct rw_nbd *rwn = (struct rw_nbd *) rw; + + if (nbd_aio_pwrite (rwn->handles.ptr[command->index], slice_ptr (command->slice), command->slice.len, command->offset, cb, 0) == -1) { @@ -142,12 +328,14 @@ static bool nbd_ops_asynch_trim (struct rw *rw, struct command *command, nbd_completion_callback cb) { - if (!rw->u.nbd.can_trim) + struct rw_nbd *rwn = (struct rw_nbd *) rw; + + if (!rwn->can_trim) return false; assert (command->slice.len <= UINT32_MAX); - if (nbd_aio_trim (rw->u.nbd.handles.ptr[command->index], + if (nbd_aio_trim (rwn->handles.ptr[command->index], command->slice.len, command->offset, cb, 0) == -1) { fprintf (stderr, "%s: %s\n", rw->name, nbd_get_error ()); @@ -160,12 +348,14 @@ static bool nbd_ops_asynch_zero (struct rw *rw, struct command *command, nbd_completion_callback cb) { - if (!rw->u.nbd.can_zero) + struct rw_nbd *rwn = (struct rw_nbd *) rw; + + if (!rwn->can_zero) return false; assert (command->slice.len <= UINT32_MAX); - if (nbd_aio_zero (rw->u.nbd.handles.ptr[command->index], + if (nbd_aio_zero (rwn->handles.ptr[command->index], command->slice.len, command->offset, cb, LIBNBD_CMD_FLAG_NO_HOLE) == -1) { fprintf (stderr, "%s: %s\n", rw->name, nbd_get_error ()); @@ -212,19 +402,22 @@ add_extent (void *vp, const char *metacontext, static unsigned nbd_ops_in_flight (struct rw *rw, uintptr_t index) { + struct rw_nbd *rwn = (struct rw_nbd *) rw; + /* Since the commands are auto-retired in the callbacks we don't * need to count "done" commands. 
*/ - return nbd_aio_in_flight (rw->u.nbd.handles.ptr[index]); + return nbd_aio_in_flight (rwn->handles.ptr[index]); } static void nbd_ops_get_polling_fd (struct rw *rw, uintptr_t index, int *fd, int *direction) { + struct rw_nbd *rwn = (struct rw_nbd *) rw; struct nbd_handle *nbd; - nbd = rw->u.nbd.handles.ptr[index]; + nbd = rwn->handles.ptr[index]; *fd = nbd_aio_get_fd (nbd); if (*fd == -1) { @@ -240,7 +433,8 @@ nbd_ops_get_polling_fd (struct rw *rw, uintptr_t index, static void nbd_ops_asynch_notify_read (struct rw *rw, uintptr_t index) { - if (nbd_aio_notify_read (rw->u.nbd.handles.ptr[index]) == -1) { + struct rw_nbd *rwn = (struct rw_nbd *) rw; + if (nbd_aio_notify_read (rwn->handles.ptr[index]) == -1) { fprintf (stderr, "%s: %s\n", rw->name, nbd_get_error ()); exit (EXIT_FAILURE); } @@ -249,7 +443,8 @@ nbd_ops_asynch_notify_read (struct rw *rw, uintptr_t index) static void nbd_ops_asynch_notify_write (struct rw *rw, uintptr_t index) { - if (nbd_aio_notify_write (rw->u.nbd.handles.ptr[index]) == -1) { + struct rw_nbd *rwn = (struct rw_nbd *) rw; + if (nbd_aio_notify_write (rwn->handles.ptr[index]) == -1) { fprintf (stderr, "%s: %s\n", rw->name, nbd_get_error ()); exit (EXIT_FAILURE); } @@ -266,10 +461,11 @@ nbd_ops_get_extents (struct rw *rw, uintptr_t index, uint64_t offset, uint64_t count, extent_list *ret) { + struct rw_nbd *rwn = (struct rw_nbd *) rw; extent_list exts = empty_vector; struct nbd_handle *nbd; - nbd = rw->u.nbd.handles.ptr[index]; + nbd = rwn->handles.ptr[index]; ret->size = 0; @@ -331,9 +527,13 @@ nbd_ops_get_extents (struct rw *rw, uintptr_t index, free (exts.ptr); } -struct rw_ops nbd_ops = { +static struct rw_ops nbd_ops = { .ops_name = "nbd_ops", .close = nbd_ops_close, + .is_read_only = nbd_ops_is_read_only, + .can_extents = nbd_ops_can_extents, + .can_multi_conn = nbd_ops_can_multi_conn, + .start_multi_conn = nbd_ops_start_multi_conn, .flush = nbd_ops_flush, .synch_read = nbd_ops_synch_read, .synch_write = nbd_ops_synch_write, diff 
--git a/copy/nbdcopy.h b/copy/nbdcopy.h index 69fac2a..94fbdeb 100644 --- a/copy/nbdcopy.h +++ b/copy/nbdcopy.h @@ -36,8 +36,6 @@ */ #define THREAD_WORK_SIZE (128 * 1024 * 1024) -DEFINE_VECTOR_TYPE (handles, struct nbd_handle *) - /* Abstracts the input (src) and output (dst) parameters on the * command line. */ @@ -45,21 +43,20 @@ struct rw { struct rw_ops *ops; /* Operations. */ const char *name; /* Printable name, for error messages etc. */ int64_t size; /* May be -1 for streams. */ - union { - struct { /* For files and pipes. */ - int fd; - struct stat stat; - bool seek_hole_supported; - int sector_size; - } local; - struct { - handles handles; /* For NBD, one handle per connection. */ - bool can_trim, can_zero; /* Cached nbd_can_trim, nbd_can_zero. */ - } nbd; - } u; + /* Followed by private data for the particular subtype. */ }; -extern struct rw src, dst; +extern struct rw *src, *dst; + +/* Create subtypes. */ +extern struct rw *file_create (const char *name, + const struct stat *stat, int fd); +extern struct rw *nbd_rw_create_uri (const char *name, + const char *uri, bool writing); +extern struct rw *nbd_rw_create_subprocess (const char **argv, size_t argc, + bool writing); +extern struct rw *null_create (const char *name); +extern struct rw *pipe_create (const char *name, int fd); /* Underlying data buffers. */ struct buffer { @@ -117,6 +114,28 @@ struct rw_ops { /* Close the connection and free up associated resources. */ void (*close) (struct rw *rw); + /* Return true if this is a read-only connection. */ + bool (*is_read_only) (struct rw *rw); + + /* For source only, does it support reading extents? */ + bool (*can_extents) (struct rw *rw); + + /* Return true if the connection can do multi-conn. This is true + * for files, false for streams, and passed through for NBD. + */ + bool (*can_multi_conn) (struct rw *rw); + + /* For multi-conn capable backends, before copying we must call this + * to begin multi-conn. 
For NBD this means opening the additional + * connections. + */ + void (*start_multi_conn) (struct rw *rw); + + /* Truncate, only called on output files. This callback can be NULL + * for types that don't support this. + */ + void (*truncate) (struct rw *rw, int64_t size); + /* Flush pending writes to permanent storage. */ void (*flush) (struct rw *rw); @@ -188,10 +207,6 @@ struct rw_ops { uint64_t offset, uint64_t count, extent_list *ret); }; -extern struct rw_ops file_ops; -extern struct rw_ops nbd_ops; -extern struct rw_ops pipe_ops; -extern struct rw_ops null_ops; extern void default_get_extents (struct rw *rw, uintptr_t index, uint64_t offset, uint64_t count, diff --git a/copy/null-ops.c b/copy/null-ops.c index b2ca66f..3262fb5 100644 --- a/copy/null-ops.c +++ b/copy/null-ops.c @@ -30,10 +30,28 @@ * and fast zeroing. */ +static struct rw_ops null_ops; + +struct rw_null { + struct rw rw; +}; + +struct rw * +null_create (const char *name) +{ + struct rw_null *rw = calloc (1, sizeof *rw); + if (rw == NULL) { perror ("calloc"); exit (EXIT_FAILURE); } + + rw->rw.ops = &null_ops; + rw->rw.name = name; + rw->rw.size = INT64_MAX; + return (struct rw *) rw; +} + static void null_close (struct rw *rw) { - /* nothing */ + free (rw); } static void @@ -42,6 +60,30 @@ null_flush (struct rw *rw) /* nothing */ } +static bool +null_is_read_only (struct rw *rw) +{ + return false; +} + +static bool +null_can_extents (struct rw *rw) +{ + return false; +} + +static bool +null_can_multi_conn (struct rw *rw) +{ + return true; +} + +static void +null_start_multi_conn (struct rw *rw) +{ + /* nothing */ +} + static size_t null_synch_read (struct rw *rw, void *data, size_t len, uint64_t offset) @@ -126,9 +168,13 @@ null_get_extents (struct rw *rw, uintptr_t index, abort (); } -struct rw_ops null_ops = { +static struct rw_ops null_ops = { .ops_name = "null_ops", .close = null_close, + .is_read_only = null_is_read_only, + .can_extents = null_can_extents, + .can_multi_conn = 
null_can_multi_conn, + .start_multi_conn = null_start_multi_conn, .flush = null_flush, .synch_read = null_synch_read, .synch_write = null_synch_write, diff --git a/copy/pipe-ops.c b/copy/pipe-ops.c index d127dad..286e6c0 100644 --- a/copy/pipe-ops.c +++ b/copy/pipe-ops.c @@ -26,10 +26,33 @@ #include "nbdcopy.h" +static struct rw_ops pipe_ops; + +struct rw_pipe { + struct rw rw; + int fd; +}; + +struct rw * +pipe_create (const char *name, int fd) +{ + struct rw_pipe *rwp = calloc (1, sizeof *rwp); + if (rwp == NULL) { perror ("calloc"); exit (EXIT_FAILURE); } + + /* Set size == -1 which means don't know. */ + rwp->rw.ops = &pipe_ops; + rwp->rw.name = name; + rwp->rw.size = -1; + rwp->fd = fd; + return (struct rw *) rwp; +} + static void pipe_close (struct rw *rw) { - if (close (rw->u.local.fd) == -1) { + struct rw_pipe *rwp = (struct rw_pipe *) rw; + + if (close (rwp->fd) == -1) { fprintf (stderr, "%s: close: %m\n", rw->name); exit (EXIT_FAILURE); } @@ -43,13 +66,39 @@ pipe_flush (struct rw *rw) */ } +static bool +pipe_is_read_only (struct rw *rw) +{ + return false; +} + +static bool +pipe_can_extents (struct rw *rw) +{ + return false; +} + +static bool +pipe_can_multi_conn (struct rw *rw) +{ + return false; +} + +static void +pipe_start_multi_conn (struct rw *rw) +{ + /* Should never be called. 
*/ + abort (); +} + static size_t pipe_synch_read (struct rw *rw, void *data, size_t len, uint64_t offset) { + struct rw_pipe *rwp = (struct rw_pipe *) rw; ssize_t r; - r = read (rw->u.local.fd, data, len); + r = read (rwp->fd, data, len); if (r == -1) { perror (rw->name); exit (EXIT_FAILURE); @@ -61,10 +110,11 @@ static void pipe_synch_write (struct rw *rw, const void *data, size_t len, uint64_t offset) { + struct rw_pipe *rwp = (struct rw_pipe *) rw; ssize_t r; while (len > 0) { - r = write (rw->u.local.fd, data, len); + r = write (rwp->fd, data, len); if (r == -1) { perror (rw->name); exit (EXIT_FAILURE); @@ -109,10 +159,16 @@ pipe_in_flight (struct rw *rw, uintptr_t index) return 0; } -struct rw_ops pipe_ops = { +static struct rw_ops pipe_ops = { .ops_name = "pipe_ops", .close = pipe_close, + + .is_read_only = pipe_is_read_only, + .can_extents = pipe_can_extents, + .can_multi_conn = pipe_can_multi_conn, + .start_multi_conn = pipe_start_multi_conn, + .flush = pipe_flush, .synch_read = pipe_synch_read, diff --git a/copy/synch-copying.c b/copy/synch-copying.c index 2712c10..985f005 100644 --- a/copy/synch-copying.c +++ b/copy/synch-copying.c @@ -38,13 +38,13 @@ synch_copying (void) /* If the source size is unknown then we copy data and cannot use * extent information. */ - if (src.size == -1) { + if (src->size == -1) { size_t r; - while ((r = src.ops->synch_read (&src, buf, sizeof buf, offset)) > 0) { - dst.ops->synch_write (&dst, buf, r, offset); + while ((r = src->ops->synch_read (src, buf, sizeof buf, offset)) > 0) { + dst->ops->synch_write (dst, buf, r, offset); offset += r; - progress_bar (offset, src.size); + progress_bar (offset, src->size); } } @@ -52,47 +52,47 @@ synch_copying (void) * blocks and use extent information to optimize the case. 
*/ else { - while (offset < src.size) { + while (offset < src->size) { extent_list exts = empty_vector; - uint64_t count = src.size - offset; + uint64_t count = src->size - offset; size_t i, r; if (count > sizeof buf) count = sizeof buf; if (extents) - src.ops->get_extents (&src, 0, offset, count, &exts); + src->ops->get_extents (src, 0, offset, count, &exts); else - default_get_extents (&src, 0, offset, count, &exts); + default_get_extents (src, 0, offset, count, &exts); for (i = 0; i < exts.size; ++i) { assert (exts.ptr[i].length <= count); if (exts.ptr[i].zero) { - if (!dst.ops->synch_trim (&dst, offset, exts.ptr[i].length) && - !dst.ops->synch_zero (&dst, offset, exts.ptr[i].length)) { + if (!dst->ops->synch_trim (dst, offset, exts.ptr[i].length) && + !dst->ops->synch_zero (dst, offset, exts.ptr[i].length)) { /* If neither trimming nor efficient zeroing are possible, * write zeroes the hard way. */ memset (buf, 0, exts.ptr[i].length); - dst.ops->synch_write (&dst, buf, exts.ptr[i].length, offset); + dst->ops->synch_write (dst, buf, exts.ptr[i].length, offset); } offset += exts.ptr[i].length; } else /* data */ { - r = src.ops->synch_read (&src, buf, exts.ptr[i].length, offset); + r = src->ops->synch_read (src, buf, exts.ptr[i].length, offset); /* These cases should never happen unless the file is * truncated underneath us. */ if (r == 0 || r < exts.ptr[i].length) { - fprintf (stderr, "%s: unexpected end of file\n", src.name); + fprintf (stderr, "%s: unexpected end of file\n", src->name); exit (EXIT_FAILURE); } - dst.ops->synch_write (&dst, buf, r, offset); + dst->ops->synch_write (dst, buf, r, offset); offset += r; - progress_bar (offset, src.size); + progress_bar (offset, src->size); } } -- 2.29.0.rc2
On Mon, Feb 22, 2021 at 5:42 PM Richard W.M. Jones <rjones at redhat.com> wrote:> > Make this a (fairly) abstract structure. At least hide the subtype > fields from the main program. This change is pure refactoring and > doesn't change the semantics. Nicer this way, although a little less type safe. > --- > copy/file-ops.c | 164 +++++++++++++++++-- > copy/main.c | 315 ++++++++---------------------------- > copy/multi-thread-copying.c | 104 ++++++------ > copy/nbd-ops.c | 248 +++++++++++++++++++++++++--- > copy/nbdcopy.h | 53 +++--- > copy/null-ops.c | 50 +++++- > copy/pipe-ops.c | 64 +++++++- > copy/synch-copying.c | 30 ++-- > 8 files changed, 649 insertions(+), 379 deletions(-) > > diff --git a/copy/file-ops.c b/copy/file-ops.c > index 2a239d0..81b08ce 100644 > --- a/copy/file-ops.c > +++ b/copy/file-ops.c > @@ -36,35 +36,159 @@ > #include "isaligned.h" > #include "nbdcopy.h" > > +static struct rw_ops file_ops; > + > +struct rw_file { > + struct rw rw; > + int fd; > + struct stat stat; > + bool seek_hole_supported; > + int sector_size; > +}; > + > +static bool > +seek_hole_supported (int fd) > +{ > +#ifndef SEEK_HOLE > + return false; > +#else > + off_t r = lseek (fd, 0, SEEK_HOLE); > + return r >= 0; > +#endif > +} > + > +struct rw * > +file_create (const char *name, const struct stat *stat, int fd) > +{ > + struct rw_file *rwf = calloc (1, sizeof *rwf); > + if (rwf == NULL) { perror ("calloc"); exit (EXIT_FAILURE); } > + > + rwf->rw.ops = &file_ops; > + rwf->rw.name = name; > + rwf->stat = *stat; > + rwf->fd = fd; > + > + if (S_ISBLK (stat->st_mode)) { > + /* Block device. 
*/ > + rwf->rw.size = lseek (fd, 0, SEEK_END); > + if (rwf->rw.size == -1) { > + perror ("lseek"); > + exit (EXIT_FAILURE); > + } > + if (lseek (fd, 0, SEEK_SET) == -1) { > + perror ("lseek"); > + exit (EXIT_FAILURE); > + } > + rwf->seek_hole_supported = seek_hole_supported (fd); > + rwf->sector_size = 4096; > +#ifdef BLKSSZGET > + if (ioctl (fd, BLKSSZGET, &rwf->sector_size)) > + fprintf (stderr, "warning: cannot get sector size: %s: %m", name); > +#endif > + } > + else if (S_ISREG (stat->st_mode)) { > + /* Regular file. */ > + rwf->rw.size = stat->st_size; > + rwf->seek_hole_supported = seek_hole_supported (fd); > + } > + else > + abort (); > + > + return (struct rw *) rwf;We can avoid the cast here by returning &(rwf->rw)> +} > + > static void > file_close (struct rw *rw) > { > - if (close (rw->u.local.fd) == -1) { > + struct rw_file *rwf = (struct rw_file *)rw; > + > + if (close (rwf->fd) == -1) { > fprintf (stderr, "%s: close: %m\n", rw->name); > exit (EXIT_FAILURE); > } > + free (rw); > +} > + > +static void > +file_truncate (struct rw *rw, int64_t size) > +{ > + struct rw_file *rwf = (struct rw_file *) rw; > + > + /* If the destination is an ordinary file then the original file > + * size doesn't matter. Truncate it to the source size. But > + * truncate it to zero first so the file is completely empty and > + * sparse. > + */ > + if (! S_ISREG (rwf->stat.st_mode)) > + return; > + > + if (ftruncate (rwf->fd, 0) == -1 || > + ftruncate (rwf->fd, size) == -1) { > + perror ("truncate"); > + exit (EXIT_FAILURE); > + } > + rwf->rw.size = size; > + > + /* We can assume the destination is zero. 
*/ > + destination_is_zero = true; > } > > static void > file_flush (struct rw *rw) > { > - if ((S_ISREG (rw->u.local.stat.st_mode) || > - S_ISBLK (rw->u.local.stat.st_mode)) && > - fsync (rw->u.local.fd) == -1) { > + struct rw_file *rwf = (struct rw_file *)rw; > + > + if ((S_ISREG (rwf->stat.st_mode) || > + S_ISBLK (rwf->stat.st_mode)) && > + fsync (rwf->fd) == -1) { > perror (rw->name); > exit (EXIT_FAILURE); > } > } > > +static bool > +file_is_read_only (struct rw *rw) > +{ > + /* Permissions are hard, and this is only used as an early check > + * before the copy. Proceed with the copy and fail if it fails. > + */ > + return false; > +} > + > +static bool > +file_can_extents (struct rw *rw) > +{ > +#ifdef SEEK_HOLE > + return true; > +#else > + return false; > +#endif > +} > + > +static bool > +file_can_multi_conn (struct rw *rw) > +{ > + return true; > +} > + > +static void > +file_start_multi_conn (struct rw *rw) > +{ > + /* Don't need to do anything for files since we can read/write on a > + * single file descriptor. 
> + */ > +} > + > static size_t > file_synch_read (struct rw *rw, > void *data, size_t len, uint64_t offset) > { > + struct rw_file *rwf = (struct rw_file *)rw; > size_t n = 0; > ssize_t r; > > while (len > 0) { > - r = pread (rw->u.local.fd, data, len, offset); > + r = pread (rwf->fd, data, len, offset); > if (r == -1) { > perror (rw->name); > exit (EXIT_FAILURE); > @@ -85,10 +209,11 @@ static void > file_synch_write (struct rw *rw, > const void *data, size_t len, uint64_t offset) > { > + struct rw_file *rwf = (struct rw_file *)rw; > ssize_t r; > > while (len > 0) { > - r = pwrite (rw->u.local.fd, data, len, offset); > + r = pwrite (rwf->fd, data, len, offset); > if (r == -1) { > perror (rw->name); > exit (EXIT_FAILURE); > @@ -103,7 +228,8 @@ static bool > file_synch_trim (struct rw *rw, uint64_t offset, uint64_t count) > { > #ifdef FALLOC_FL_PUNCH_HOLE > - int fd = rw->u.local.fd; > + struct rw_file *rwf = (struct rw_file *)rw; > + int fd = rwf->fd; > int r; > > r = fallocate (fd, FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE, > @@ -121,9 +247,11 @@ file_synch_trim (struct rw *rw, uint64_t offset, uint64_t count) > static bool > file_synch_zero (struct rw *rw, uint64_t offset, uint64_t count) > { > - if (S_ISREG (rw->u.local.stat.st_mode)) { > + struct rw_file *rwf = (struct rw_file *)rw; > + > + if (S_ISREG (rwf->stat.st_mode)) { > #ifdef FALLOC_FL_ZERO_RANGE > - int fd = rw->u.local.fd; > + int fd = rwf->fd; > int r; > > r = fallocate (fd, FALLOC_FL_ZERO_RANGE, offset, count); > @@ -134,10 +262,10 @@ file_synch_zero (struct rw *rw, uint64_t offset, uint64_t count) > return true; > #endif > } > - else if (S_ISBLK (rw->u.local.stat.st_mode) && > - IS_ALIGNED (offset | count, rw->u.local.sector_size)) { > + else if (S_ISBLK (rwf->stat.st_mode) && > + IS_ALIGNED (offset | count, rwf->sector_size)) { > #ifdef BLKZEROOUT > - int fd = rw->u.local.fd; > + int fd = rwf->fd; > int r; > uint64_t range[2] = {offset, count}; > > @@ -223,11 +351,12 @@ file_get_extents (struct 
rw *rw, uintptr_t index, > ret->size = 0; > > #ifdef SEEK_HOLE > + struct rw_file *rwf = (struct rw_file *)rw; > static pthread_mutex_t lseek_lock = PTHREAD_MUTEX_INITIALIZER; > > - if (rw->u.local.seek_hole_supported) { > + if (rwf->seek_hole_supported) { > uint64_t end = offset + count; > - int fd = rw->u.local.fd; > + int fd = rwf->fd; > off_t pos; > struct extent e; > size_t last; > @@ -302,9 +431,14 @@ file_get_extents (struct rw *rw, uintptr_t index, > default_get_extents (rw, index, offset, count, ret); > } > > -struct rw_ops file_ops = { > +static struct rw_ops file_ops = { > .ops_name = "file_ops", > .close = file_close, > + .is_read_only = file_is_read_only, > + .can_extents = file_can_extents, > + .can_multi_conn = file_can_multi_conn, > + .start_multi_conn = file_start_multi_conn, > + .truncate = file_truncate, > .flush = file_flush, > .synch_read = file_synch_read, > .synch_write = file_synch_write, > diff --git a/copy/main.c b/copy/main.c > index 68a6030..6fdc6fd 100644 > --- a/copy/main.c > +++ b/copy/main.c > @@ -53,19 +53,11 @@ int progress_fd = -1; /* --progress=FD */ > unsigned sparse_size = 4096; /* --sparse */ > bool synchronous; /* --synchronous flag */ > unsigned threads; /* --threads */ > -struct rw src, dst; /* The source and destination. */ > +struct rw *src, *dst; /* The source and destination. 
*/ > bool verbose; /* --verbose flag */ > > static bool is_nbd_uri (const char *s); > -static bool seek_hole_supported (int fd); > -static void open_null (struct rw *rw); > -static int open_local (const char *prog, > - const char *filename, bool writing, struct rw *rw); > -static void open_nbd_uri (const char *prog, > - const char *uri, bool writing, struct rw *rw); > -static void open_nbd_subprocess (const char *prog, > - const char **argv, size_t argc, > - bool writing, struct rw *rw); > +static struct rw *open_local (const char *filename, bool writing); > static void print_rw (struct rw *rw, const char *prefix, FILE *fp); > > static void __attribute__((noreturn)) > @@ -242,18 +234,18 @@ main (int argc, char *argv[]) > > found1: > connections = 1; /* multi-conn not supported */ > - src.name = argv[optind+1]; > - open_nbd_subprocess (argv[0], > - (const char **) &argv[optind+1], i-optind-1, > - false, &src); > + src = > + nbd_rw_create_subprocess ((const char **) &argv[optind+1], i-optind-1, > + false); > optind = i+1; > } > else { /* Source is not [...]. */ > - src.name = argv[optind++]; > - if (! is_nbd_uri (src.name)) > - src.u.local.fd = open_local (argv[0], src.name, false, &src); > + const char *src_name = argv[optind++]; > + > + if (! is_nbd_uri (src_name)) > + src = open_local (src_name, false); > else > - open_nbd_uri (argv[0], src.name, false, &src); > + src = nbd_rw_create_uri (src_name, src_name, false); > } > > if (optind >= argc) > @@ -267,48 +259,46 @@ main (int argc, char *argv[]) > > found2: > connections = 1; /* multi-conn not supported */ > - dst.name = argv[optind+1]; > - open_nbd_subprocess (argv[0], > - (const char **) &argv[optind+1], i-optind-1, > - true, &dst); > + dst = > + nbd_rw_create_subprocess ((const char **) &argv[optind+1], i-optind-1, > + true); > optind = i+1; > } > else { /* Destination is not [...] */ > - dst.name = argv[optind++]; > - if (strcmp (dst.name, "null:") == 0) > - open_null (&dst); > - else if (! 
is_nbd_uri (dst.name)) > - dst.u.local.fd = open_local (argv[0], dst.name, true /* writing */, &dst); > - else { > - open_nbd_uri (argv[0], dst.name, true, &dst); > + const char *dst_name = argv[optind++]; > > - /* Obviously this is not going to work if the server is > - * advertising read-only, so fail early with a nice error message. > - */ > - if (nbd_is_read_only (dst.u.nbd.handles.ptr[0])) { > - fprintf (stderr, "%s: %s: " > - "this NBD server is read-only, cannot write to it\n", > - argv[0], dst.name); > - exit (EXIT_FAILURE); > - } > - } > + if (strcmp (dst_name, "null:") == 0) > + dst = null_create (dst_name); > + else if (! is_nbd_uri (dst_name)) > + dst = open_local (dst_name, true /* writing */); > + else > + dst = nbd_rw_create_uri (dst_name, dst_name, true); > } > > /* There must be no extra parameters. */ > if (optind != argc) > usage (stderr, EXIT_FAILURE); > > - /* Check we've set the fields of src and dst. */ > - assert (src.ops); > - assert (src.name); > - assert (dst.ops); > - assert (dst.name); > + /* Check we've created src and dst and set the expected fields. */ > + assert (src != NULL); > + assert (dst != NULL); > + assert (src->ops != NULL); > + assert (src->name != NULL); > + assert (dst->ops != NULL); > + assert (dst->name != NULL); > + > + /* Obviously this is not going to work if the destination is > + * read-only, so fail early with a nice error message. > + */ > + if (dst->ops->is_read_only (dst)) { > + fprintf (stderr, "%s: %s: " > + "the destination is read-only, cannot write to it\n", > + argv[0], dst->name); > + exit (EXIT_FAILURE); > + } > > /* If multi-conn is not supported, force connections to 1. */ > - if ((src.ops == &nbd_ops && > - ! nbd_can_multi_conn (src.u.nbd.handles.ptr[0])) || > - (dst.ops == &nbd_ops && > - ! nbd_can_multi_conn (dst.u.nbd.handles.ptr[0]))) > + if (! src->ops->can_multi_conn (src) || ! 
dst->ops->can_multi_conn (dst)) > connections = 1; > > /* Calculate the number of threads from the number of connections. */ > @@ -335,44 +325,17 @@ main (int argc, char *argv[]) > if (threads < connections) > connections = threads; > > - /* Calculate the source and destination sizes. We set these to -1 > - * if the size is not known (because it's a stream). Note that for > - * local types, open_local set something in *.size already. > + /* Truncate the destination to the same size as the source. Only > + * has an effect on regular files. > */ > - if (src.ops == &nbd_ops) { > - src.size = nbd_get_size (src.u.nbd.handles.ptr[0]); > - if (src.size == -1) { > - fprintf (stderr, "%s: %s: %s\n", argv[0], src.name, nbd_get_error ()); > - exit (EXIT_FAILURE); > - } > - } > - if (dst.ops != &nbd_ops && S_ISREG (dst.u.local.stat.st_mode)) { > - /* If the destination is an ordinary file then the original file > - * size doesn't matter. Truncate it to the source size. But > - * truncate it to zero first so the file is completely empty and > - * sparse. > - */ > - dst.size = src.size; > - if (ftruncate (dst.u.local.fd, 0) == -1 || > - ftruncate (dst.u.local.fd, dst.size) == -1) { > - perror ("truncate"); > - exit (EXIT_FAILURE); > - } > - destination_is_zero = true; > - } > - else if (dst.ops == &nbd_ops) { > - dst.size = nbd_get_size (dst.u.nbd.handles.ptr[0]); > - if (dst.size == -1) { > - fprintf (stderr, "%s: %s: %s\n", argv[0], dst.name, nbd_get_error ()); > - exit (EXIT_FAILURE); > - } > - } > + if (dst->ops->truncate) > + dst->ops->truncate (dst, src->size); > > /* Check if the source is bigger than the destination, since that > * would truncate (ie. lose) data. Copying from smaller to larger > * is OK. 
> */ > - if (src.size >= 0 && dst.size >= 0 && src.size > dst.size) { > + if (src->size >= 0 && dst->size >= 0 && src->size > dst->size) { > fprintf (stderr, > "nbdcopy: error: destination size is smaller than source size\n"); > exit (EXIT_FAILURE); > @@ -383,37 +346,29 @@ main (int argc, char *argv[]) > * settings. > */ > if (verbose) { > - print_rw (&src, "nbdcopy: src", stderr); > - print_rw (&dst, "nbdcopy: dst", stderr); > + print_rw (src, "nbdcopy: src", stderr); > + print_rw (dst, "nbdcopy: dst", stderr); > fprintf (stderr, "nbdcopy: connections=%u requests=%u threads=%u " > "synchronous=%s\n", > connections, max_requests, threads, > synchronous ? "true" : "false"); > } > > - /* If #connections > 1 then multi-conn is enabled at both ends and > - * we need to open further connections. > + /* If multi-conn is enabled on either side, then at this point we > + * need to ask the backend to open the extra connections. > */ > if (connections > 1) { > assert (threads == connections); > - > - if (src.ops == &nbd_ops) { > - for (i = 1; i < connections; ++i) > - open_nbd_uri (argv[0], src.name, false, &src); > - assert (src.u.nbd.handles.size == connections); > - } > - if (dst.ops == &nbd_ops) { > - for (i = 1; i < connections; ++i) > - open_nbd_uri (argv[0], dst.name, true, &dst); > - assert (dst.u.nbd.handles.size == connections); > - } > + if (src->ops->can_multi_conn (src)) > + src->ops->start_multi_conn (src); > + if (dst->ops->can_multi_conn (dst)) > + dst->ops->start_multi_conn (dst); > } > > /* If the source is NBD and we couldn't negotiate meta > * base:allocation then turn off extents. > */ > - if (src.ops == &nbd_ops && > - !nbd_can_meta_context (src.u.nbd.handles.ptr[0], "base:allocation")) > + if (! src->ops->can_extents (src)) > extents = false; > > /* Always set the progress bar to 0% at the start of the copy. */ > @@ -429,12 +384,12 @@ main (int argc, char *argv[]) > progress_bar (1, 1); > > /* Shut down the source side. 
*/ > - src.ops->close (&src); > + src->ops->close (src); > > /* Shut down the destination side. */ > if (flush) > - dst.ops->flush (&dst); > - dst.ops->close (&dst); > + dst->ops->flush (dst); > + dst->ops->close (dst); > > exit (EXIT_SUCCESS); > } > @@ -452,33 +407,25 @@ is_nbd_uri (const char *s) > strncmp (s, "nbds+vsock:", 11) == 0; > } > > -/* Open null: (destination only). */ > -static void > -open_null (struct rw *rw) > -{ > - rw->ops = &null_ops; > - rw->size = INT64_MAX; > -} > - > /* Open a local (non-NBD) file, ie. a file, device, or "-" for stdio. > - * Returns the open file descriptor which the caller must close. > + * Returns the struct rw * which the caller must close. > * > * ‘writing’ is true if this is the destination parameter. > * ‘rw->u.local.stat’ and ‘rw->size’ return the file stat and size, > * but size can be returned as -1 if we don't know the size (if it's a > * pipe or stdio). > */ > -static int > -open_local (const char *prog, > - const char *filename, bool writing, struct rw *rw) > +static struct rw * > +open_local (const char *filename, bool writing) > { > int flags, fd; > + struct stat stat; > > if (strcmp (filename, "-") == 0) { > synchronous = true; > fd = writing ? STDOUT_FILENO : STDIN_FILENO; > if (writing && isatty (fd)) { > - fprintf (stderr, "%s: refusing to write to tty\n", prog); > + fprintf (stderr, "%s: refusing to write to tty\n", "nbdcopy"); Is it intended to replace prog with "nbdcopy"? > exit (EXIT_FAILURE); > } > } > @@ -502,146 +449,17 @@ open_local (const char *prog, > } > } > > - if (fstat (fd, &rw->u.local.stat) == -1) { > + if (fstat (fd, &stat) == -1) { > perror (filename); > exit (EXIT_FAILURE); > } > - if (S_ISBLK (rw->u.local.stat.st_mode)) { > - /* Block device. 
*/ > - rw->ops = &file_ops; > - rw->size = lseek (fd, 0, SEEK_END); > - if (rw->size == -1) { > - perror ("lseek"); > - exit (EXIT_FAILURE); > - } > - if (lseek (fd, 0, SEEK_SET) == -1) { > - perror ("lseek"); > - exit (EXIT_FAILURE); > - } > - rw->u.local.seek_hole_supported = seek_hole_supported (fd); > - rw->u.local.sector_size = 4096; > -#ifdef BLKSSZGET > - if (ioctl (fd, BLKSSZGET, &rw->u.local.sector_size)) > - fprintf (stderr, "warning: cannot get sector size: %s: %m", rw->name); > -#endif > - } > - else if (S_ISREG (rw->u.local.stat.st_mode)) { > - /* Regular file. */ > - rw->ops = &file_ops; > - rw->size = rw->u.local.stat.st_size; > - rw->u.local.seek_hole_supported = seek_hole_supported (fd); > - } > + if (S_ISBLK (stat.st_mode) || S_ISREG (stat.st_mode)) > + return file_create (filename, &stat, fd); > else { > - /* Probably stdin/stdout, a pipe or a socket. Set size == -1 > - * which means don't know, and force synchronous mode. > - */ > - synchronous = true; > - rw->ops = &pipe_ops; > - rw->size = -1; > - rw->u.local.seek_hole_supported = false; > + /* Probably stdin/stdout, a pipe or a socket. */ > + synchronous = true; /* Force synchronous mode for pipes. */ > + return pipe_create (filename, fd); > } > - > - return fd; > -} > - > -static bool > -seek_hole_supported (int fd) > -{ > -#ifndef SEEK_HOLE > - return false; > -#else > - off_t r = lseek (fd, 0, SEEK_HOLE); > - return r >= 0; > -#endif > -} > - > -static void > -open_nbd_uri (const char *prog, > - const char *uri, bool writing, struct rw *rw) > -{ > - struct nbd_handle *nbd; > - > - rw->ops = &nbd_ops; > - nbd = nbd_create (); > - if (nbd == NULL) { > - fprintf (stderr, "%s: %s\n", prog, nbd_get_error ()); > - exit (EXIT_FAILURE); > - } > - nbd_set_debug (nbd, verbose); > - nbd_set_uri_allow_local_file (nbd, true); /* Allow ?tls-psk-file. 
*/ > - if (extents && !writing && > - nbd_add_meta_context (nbd, "base:allocation") == -1) { > - fprintf (stderr, "%s: %s\n", prog, nbd_get_error ()); > - exit (EXIT_FAILURE); > - } > - > - if (handles_append (&rw->u.nbd.handles, nbd) == -1) { > - perror ("realloc"); > - exit (EXIT_FAILURE); > - } > - > - if (nbd_connect_uri (nbd, uri) == -1) { > - fprintf (stderr, "%s: %s: %s\n", prog, uri, nbd_get_error ()); > - exit (EXIT_FAILURE); > - } > - > - /* Cache these. We assume with multi-conn that each handle will act > - * the same way. > - */ > - rw->u.nbd.can_trim = nbd_can_trim (nbd) > 0; > - rw->u.nbd.can_zero = nbd_can_zero (nbd) > 0; > -} > - > -DEFINE_VECTOR_TYPE (const_string_vector, const char *); > - > -static void > -open_nbd_subprocess (const char *prog, > - const char **argv, size_t argc, > - bool writing, struct rw *rw) > -{ > - struct nbd_handle *nbd; > - const_string_vector copy = empty_vector; > - size_t i; > - > - rw->ops = &nbd_ops; > - nbd = nbd_create (); > - if (nbd == NULL) { > - fprintf (stderr, "%s: %s\n", prog, nbd_get_error ()); > - exit (EXIT_FAILURE); > - } > - nbd_set_debug (nbd, verbose); > - if (extents && !writing && > - nbd_add_meta_context (nbd, "base:allocation") == -1) { > - fprintf (stderr, "%s: %s\n", prog, nbd_get_error ()); > - exit (EXIT_FAILURE); > - } > - > - if (handles_append (&rw->u.nbd.handles, nbd) == -1) { > - memory_error: > - perror ("realloc"); > - exit (EXIT_FAILURE); > - } > - > - /* We have to copy the args so we can null-terminate them. */ > - for (i = 0; i < argc; ++i) { > - if (const_string_vector_append (&copy, argv[i]) == -1) > - goto memory_error; > - } > - if (const_string_vector_append (&copy, NULL) == -1) > - goto memory_error; > - > - if (nbd_connect_systemd_socket_activation (nbd, (char **) copy.ptr) == -1) { > - fprintf (stderr, "%s: %s: %s\n", prog, argv[0], nbd_get_error ()); > - exit (EXIT_FAILURE); > - } > - > - /* Cache these. We assume with multi-conn that each handle will act > - * the same way. 
> - */ > - rw->u.nbd.can_trim = nbd_can_trim (nbd) > 0; > - rw->u.nbd.can_zero = nbd_can_zero (nbd) > 0; > - > - free (copy.ptr); > } > > /* Print an rw struct, used in --verbose mode. */ > @@ -650,7 +468,6 @@ print_rw (struct rw *rw, const char *prefix, FILE *fp) > { > fprintf (fp, "%s: %s \"%s\"\n", prefix, rw->ops->ops_name, rw->name); > fprintf (fp, "%s: size=%" PRIi64 "\n", prefix, rw->size); > - /* Could print other stuff here, but that's enough for debugging. */ > } > > /* Default implementation of rw->ops->get_extents for backends which > diff --git a/copy/multi-thread-copying.c b/copy/multi-thread-copying.c > index 98b4056..4f57054 100644 > --- a/copy/multi-thread-copying.c > +++ b/copy/multi-thread-copying.c > @@ -50,13 +50,13 @@ get_next_offset (uint64_t *offset, uint64_t *count) > bool r = false; /* returning false means no more work */ > > pthread_mutex_lock (&lock); > - if (next_offset < src.size) { > + if (next_offset < src->size) { > *offset = next_offset; > > /* Work out how large this range is. The last range may be > * smaller than THREAD_WORK_SIZE. > */ > - *count = src.size - *offset; > + *count = src->size - *offset; > if (*count > THREAD_WORK_SIZE) > *count = THREAD_WORK_SIZE; > > @@ -69,7 +69,7 @@ get_next_offset (uint64_t *offset, uint64_t *count) > * are called from threads and not necessarily in monotonic order > * so the progress bar would move erratically. 
> */ > - progress_bar (*offset, dst.size); > + progress_bar (*offset, dst->size); > } > pthread_mutex_unlock (&lock); > return r; > @@ -89,11 +89,13 @@ multi_thread_copying (void) > */ > assert (threads > 0); > assert (threads == connections); > +/* > if (src.ops == &nbd_ops) > assert (src.u.nbd.handles.size == connections); > if (dst.ops == &nbd_ops) > assert (dst.u.nbd.handles.size == connections); > - assert (src.size != -1); > +*/ > + assert (src->size != -1); > > workers = malloc (sizeof (pthread_t) * threads); > if (workers == NULL) { > @@ -147,9 +149,9 @@ worker_thread (void *indexp) > > assert (0 < count && count <= THREAD_WORK_SIZE); > if (extents) > - src.ops->get_extents (&src, index, offset, count, &exts); > + src->ops->get_extents (src, index, offset, count, &exts); > else > - default_get_extents (&src, index, offset, count, &exts); > + default_get_extents (src, index, offset, count, &exts); > > for (i = 0; i < exts.size; ++i) { > struct command *command; > @@ -208,11 +210,11 @@ worker_thread (void *indexp) > wait_for_request_slots (index); > > /* Begin the asynch read operation. */ > - src.ops->asynch_read (&src, command, > - (nbd_completion_callback) { > - .callback = finished_read, > - .user_data = command, > - }); > + src->ops->asynch_read (src, command, > + (nbd_completion_callback) { > + .callback = finished_read, > + .user_data = command, > + }); > > exts.ptr[i].offset += len; > exts.ptr[i].length -= len; > @@ -254,7 +256,7 @@ wait_for_request_slots (uintptr_t index) > static unsigned > in_flight (uintptr_t index) > { > - return src.ops->in_flight (&src, index) + dst.ops->in_flight (&dst, index); > + return src->ops->in_flight (src, index) + dst->ops->in_flight (dst, index); > } > > /* Poll (optional) NBD src and NBD dst, moving the state machine(s) > @@ -271,7 +273,7 @@ poll_both_ends (uintptr_t index) > /* Note: if polling is not supported, this function will > * set fd == -1 which poll ignores. 
> */ > - src.ops->get_polling_fd (&src, index, &fds[0].fd, &direction); > + src->ops->get_polling_fd (src, index, &fds[0].fd, &direction); > if (fds[0].fd >= 0) { > switch (direction) { > case LIBNBD_AIO_DIRECTION_READ: > @@ -286,7 +288,7 @@ poll_both_ends (uintptr_t index) > } > } > > - dst.ops->get_polling_fd (&dst, index, &fds[1].fd, &direction); > + dst->ops->get_polling_fd (dst, index, &fds[1].fd, &direction); > if (fds[1].fd >= 0) { > switch (direction) { > case LIBNBD_AIO_DIRECTION_READ: > @@ -311,24 +313,24 @@ poll_both_ends (uintptr_t index) > > if (fds[0].fd >= 0) { > if ((fds[0].revents & (POLLIN | POLLHUP)) != 0) > - src.ops->asynch_notify_read (&src, index); > + src->ops->asynch_notify_read (src, index); > else if ((fds[0].revents & POLLOUT) != 0) > - src.ops->asynch_notify_write (&src, index); > + src->ops->asynch_notify_write (src, index); > else if ((fds[0].revents & (POLLERR | POLLNVAL)) != 0) { > errno = ENOTCONN; > - perror (src.name); > + perror (src->name); > exit (EXIT_FAILURE); > } > } > > if (fds[1].fd >= 0) { > if ((fds[1].revents & (POLLIN | POLLHUP)) != 0) > - dst.ops->asynch_notify_read (&dst, index); > + dst->ops->asynch_notify_read (dst, index); > else if ((fds[1].revents & POLLOUT) != 0) > - dst.ops->asynch_notify_write (&dst, index); > + dst->ops->asynch_notify_write (dst, index); > else if ((fds[1].revents & (POLLERR | POLLNVAL)) != 0) { > errno = ENOTCONN; > - perror (dst.name); > + perror (dst->name); > exit (EXIT_FAILURE); > } > } > @@ -377,11 +379,11 @@ finished_read (void *vp, int *error) > /* If sparseness detection (see below) is turned off then we write > * the whole command. > */ > - dst.ops->asynch_write (&dst, command, > - (nbd_completion_callback) { > - .callback = free_command, > - .user_data = command, > - }); > + dst->ops->asynch_write (dst, command, > + (nbd_completion_callback) { > + .callback = free_command, > + .user_data = command, > + }); > } > else { /* Sparseness detection. 
*/ > const uint64_t start = command->offset; > @@ -408,11 +410,11 @@ finished_read (void *vp, int *error) > newcommand = copy_subcommand (command, > last_offset, i - last_offset, > false); > - dst.ops->asynch_write (&dst, newcommand, > - (nbd_completion_callback) { > - .callback = free_command, > - .user_data = newcommand, > - }); > + dst->ops->asynch_write (dst, newcommand, > + (nbd_completion_callback) { > + .callback = free_command, > + .user_data = newcommand, > + }); > } > /* Start the new hole. */ > last_offset = i; > @@ -445,11 +447,11 @@ finished_read (void *vp, int *error) > newcommand = copy_subcommand (command, > last_offset, i - last_offset, > false); > - dst.ops->asynch_write (&dst, newcommand, > - (nbd_completion_callback) { > - .callback = free_command, > - .user_data = newcommand, > - }); > + dst->ops->asynch_write (dst, newcommand, > + (nbd_completion_callback) { > + .callback = free_command, > + .user_data = newcommand, > + }); > } > else { > newcommand = copy_subcommand (command, > @@ -462,11 +464,11 @@ finished_read (void *vp, int *error) > /* There may be an unaligned tail, so write that. */ > if (end - i > 0) { > newcommand = copy_subcommand (command, i, end - i, false); > - dst.ops->asynch_write (&dst, newcommand, > - (nbd_completion_callback) { > - .callback = free_command, > - .user_data = newcommand, > - }); > + dst->ops->asynch_write (dst, newcommand, > + (nbd_completion_callback) { > + .callback = free_command, > + .user_data = newcommand, > + }); > } > > /* Free the original command since it has been split into > @@ -503,20 +505,20 @@ fill_dst_range_with_zeroes (struct command *command) > > if (!allocated) { > /* Try trimming. 
*/ > - if (dst.ops->asynch_trim (&dst, command, > - (nbd_completion_callback) { > - .callback = free_command, > - .user_data = command, > - })) > + if (dst->ops->asynch_trim (dst, command, > + (nbd_completion_callback) { > + .callback = free_command, > + .user_data = command, > + })) > return; > } > > /* Try efficient zeroing. */ > - if (dst.ops->asynch_zero (&dst, command, > - (nbd_completion_callback) { > - .callback = free_command, > - .user_data = command, > - })) > + if (dst->ops->asynch_zero (dst, command, > + (nbd_completion_callback) { > + .callback = free_command, > + .user_data = command, > + })) > return; > > /* Fall back to loop writing zeroes. This is going to be slow > @@ -533,7 +535,7 @@ fill_dst_range_with_zeroes (struct command *command) > if (len > MAX_REQUEST_SIZE) > len = MAX_REQUEST_SIZE; > > - dst.ops->synch_write (&dst, data, len, command->offset); > + dst->ops->synch_write (dst, data, len, command->offset); > command->slice.len -= len; > command->offset += len; > } > diff --git a/copy/nbd-ops.c b/copy/nbd-ops.c > index 7b48cbc..24970c2 100644 > --- a/copy/nbd-ops.c > +++ b/copy/nbd-ops.c > @@ -27,45 +27,221 @@ > > #include "nbdcopy.h" > > +static struct rw_ops nbd_ops; > + > +DEFINE_VECTOR_TYPE (handles, struct nbd_handle *) > +DEFINE_VECTOR_TYPE (const_string_vector, const char *); > + > +struct rw_nbd { > + struct rw rw; > + > + /* Because of multi-conn we have to remember enough state in this > + * handle in order to be able to open another connection with the > + * same parameters after nbd_rw_create* has been called once. > + */ > + enum { CREATE_URI, CREATE_SUBPROCESS } create_t; > + const char *uri; /* For CREATE_URI */ > + const_string_vector argv; /* For CREATE_SUBPROCESS */ > + bool writing; > + > + handles handles; /* One handle per connection. */ > + bool can_trim, can_zero; /* Cached nbd_can_trim, nbd_can_zero. 
*/ > +}; > + > +static void > +open_one_nbd_handle (struct rw_nbd *rwn) > +{ > + struct nbd_handle *nbd; > + > + nbd = nbd_create (); > + if (nbd == NULL) { > + fprintf (stderr, "%s: %s\n", "nbdcopy", nbd_get_error ()); > + exit (EXIT_FAILURE); > + } > + > + nbd_set_debug (nbd, verbose); > + > + if (extents && !rwn->writing && > + nbd_add_meta_context (nbd, "base:allocation") == -1) { > + fprintf (stderr, "%s: %s\n", "nbdcopy", nbd_get_error ()); > + exit (EXIT_FAILURE); > + } > + > + switch (rwn->create_t) { > + case CREATE_URI: > + nbd_set_uri_allow_local_file (nbd, true); /* Allow ?tls-psk-file. */ > + > + if (nbd_connect_uri (nbd, rwn->uri) == -1) { > + fprintf (stderr, "%s: %s: %s\n", "nbdcopy", rwn->uri, nbd_get_error ()); > + exit (EXIT_FAILURE); > + } > + break; > + > + case CREATE_SUBPROCESS: > + if (nbd_connect_systemd_socket_activation (nbd, > + (char **) rwn->argv.ptr) > + == -1) { > + fprintf (stderr, "%s: %s: %s\n", "nbdcopy", rwn->argv.ptr[0], > + nbd_get_error ()); > + exit (EXIT_FAILURE); > + } > + } > + > + /* Cache these. We assume with multi-conn that each handle will act > + * the same way. 
> + */ > + if (rwn->handles.size == 0) { > + rwn->can_trim = nbd_can_trim (nbd) > 0; > + rwn->can_zero = nbd_can_zero (nbd) > 0; > + rwn->rw.size = nbd_get_size (nbd); > + if (rwn->rw.size == -1) { > + fprintf (stderr, "%s: %s: %s\n", "nbdcopy", rwn->rw.name, > + nbd_get_error ()); > + exit (EXIT_FAILURE); > + } > + } > + > + if (handles_append (&rwn->handles, nbd) == -1) { > + perror ("realloc"); > + exit (EXIT_FAILURE); > + } > +} > + > +struct rw * > +nbd_rw_create_uri (const char *name, const char *uri, bool writing) > +{ > + struct rw_nbd *rwn = calloc (1, sizeof *rwn); > + if (rwn == NULL) { perror ("calloc"); exit (EXIT_FAILURE); } > + > + rwn->rw.ops = &nbd_ops; > + rwn->rw.name = name; > + rwn->create_t = CREATE_URI; > + rwn->uri = uri; > + rwn->writing = writing; > + > + open_one_nbd_handle (rwn); > + > + return (struct rw *) rwn; > +} > + > +struct rw * > +nbd_rw_create_subprocess (const char **argv, size_t argc, bool writing) > +{ > + size_t i; > + struct rw_nbd *rwn = calloc (1, sizeof *rwn); > + if (rwn == NULL) { perror ("calloc"); exit (EXIT_FAILURE); } > + > + rwn->rw.ops = &nbd_ops; > + rwn->rw.name = argv[0]; > + rwn->create_t = CREATE_SUBPROCESS; > + rwn->writing = writing; > + > + /* We have to copy the args so we can null-terminate them. 
*/ > + for (i = 0; i < argc; ++i) { > + if (const_string_vector_append (&rwn->argv, argv[i]) == -1) { > + memory_error: > + perror ("realloc"); > + exit (EXIT_FAILURE); > + } > + } > + if (const_string_vector_append (&rwn->argv, NULL) == -1) > + goto memory_error; > + > + open_one_nbd_handle (rwn); > + > + return (struct rw *) rwn; > +} > + > static void > nbd_ops_close (struct rw *rw) > { > + struct rw_nbd *rwn = (struct rw_nbd *) rw; > size_t i; > > - for (i = 0; i < rw->u.nbd.handles.size; ++i) { > - if (nbd_shutdown (rw->u.nbd.handles.ptr[i], 0) == -1) { > + for (i = 0; i < rwn->handles.size; ++i) { > + if (nbd_shutdown (rwn->handles.ptr[i], 0) == -1) { > fprintf (stderr, "%s: %s\n", rw->name, nbd_get_error ()); > exit (EXIT_FAILURE); > } > - nbd_close (rw->u.nbd.handles.ptr[i]); > + nbd_close (rwn->handles.ptr[i]); > } > > - handles_reset (&rw->u.nbd.handles); > + handles_reset (&rwn->handles); > + const_string_vector_reset (&rwn->argv); > + free (rw); > } > > static void > nbd_ops_flush (struct rw *rw) > { > + struct rw_nbd *rwn = (struct rw_nbd *) rw; > size_t i; > > - for (i = 0; i < rw->u.nbd.handles.size; ++i) { > - if (nbd_flush (rw->u.nbd.handles.ptr[i], 0) == -1) { > + for (i = 0; i < rwn->handles.size; ++i) { > + if (nbd_flush (rwn->handles.ptr[i], 0) == -1) { > fprintf (stderr, "%s: %s\n", rw->name, nbd_get_error ()); > exit (EXIT_FAILURE); > } > } > } > > +static bool > +nbd_ops_is_read_only (struct rw *rw) > +{ > + struct rw_nbd *rwn = (struct rw_nbd *) rw; > + > + if (rwn->handles.size > 0) > + return nbd_is_read_only (rwn->handles.ptr[0]); > + else > + return false; > +} > + > +static bool > +nbd_ops_can_extents (struct rw *rw) > +{ > + struct rw_nbd *rwn = (struct rw_nbd *) rw; > + > + if (rwn->handles.size > 0) > + return nbd_can_meta_context (rwn->handles.ptr[0], "base:allocation"); > + else > + return false; > +} > + > +static bool > +nbd_ops_can_multi_conn (struct rw *rw) > +{ > + struct rw_nbd *rwn = (struct rw_nbd *) rw; > + > + if 
(rwn->handles.size > 0) > + return nbd_can_multi_conn (rwn->handles.ptr[0]); > + else > + return false; > +} > + > +static void > +nbd_ops_start_multi_conn (struct rw *rw) > +{ > + struct rw_nbd *rwn = (struct rw_nbd *) rw; > + size_t i; > + > + for (i = 1; i < connections; ++i) > + open_one_nbd_handle (rwn); > + > + assert (rwn->handles.size == connections); > +} > + > static size_t > nbd_ops_synch_read (struct rw *rw, > void *data, size_t len, uint64_t offset) > { > + struct rw_nbd *rwn = (struct rw_nbd *) rw; > + > if (len > rw->size - offset) > len = rw->size - offset; > if (len == 0) > return 0; > > - if (nbd_pread (rw->u.nbd.handles.ptr[0], data, len, offset, 0) == -1) { > + if (nbd_pread (rwn->handles.ptr[0], data, len, offset, 0) == -1) { > fprintf (stderr, "%s: %s\n", rw->name, nbd_get_error ()); > exit (EXIT_FAILURE); > } > @@ -77,7 +253,9 @@ static void > nbd_ops_synch_write (struct rw *rw, > const void *data, size_t len, uint64_t offset) > { > - if (nbd_pwrite (rw->u.nbd.handles.ptr[0], data, len, offset, 0) == -1) { > + struct rw_nbd *rwn = (struct rw_nbd *) rw; > + > + if (nbd_pwrite (rwn->handles.ptr[0], data, len, offset, 0) == -1) { > fprintf (stderr, "%s: %s\n", rw->name, nbd_get_error ()); > exit (EXIT_FAILURE); > } > @@ -86,10 +264,12 @@ nbd_ops_synch_write (struct rw *rw, > static bool > nbd_ops_synch_trim (struct rw *rw, uint64_t offset, uint64_t count) > { > - if (!rw->u.nbd.can_trim) > + struct rw_nbd *rwn = (struct rw_nbd *) rw; > + > + if (!rwn->can_trim) > return false; > > - if (nbd_trim (rw->u.nbd.handles.ptr[0], count, offset, 0) == -1) { > + if (nbd_trim (rwn->handles.ptr[0], count, offset, 0) == -1) { > fprintf (stderr, "%s: %s\n", rw->name, nbd_get_error ()); > exit (EXIT_FAILURE); > } > @@ -99,10 +279,12 @@ nbd_ops_synch_trim (struct rw *rw, uint64_t offset, uint64_t count) > static bool > nbd_ops_synch_zero (struct rw *rw, uint64_t offset, uint64_t count) > { > - if (!rw->u.nbd.can_zero) > + struct rw_nbd *rwn = (struct rw_nbd *) 
rw; > + > + if (!rwn->can_zero) > return false; > > - if (nbd_zero (rw->u.nbd.handles.ptr[0], > + if (nbd_zero (rwn->handles.ptr[0], > count, offset, LIBNBD_CMD_FLAG_NO_HOLE) == -1) { > fprintf (stderr, "%s: %s\n", rw->name, nbd_get_error ()); > exit (EXIT_FAILURE); > @@ -115,7 +297,9 @@ nbd_ops_asynch_read (struct rw *rw, > struct command *command, > nbd_completion_callback cb) > { > - if (nbd_aio_pread (rw->u.nbd.handles.ptr[command->index], > + struct rw_nbd *rwn = (struct rw_nbd *) rw; > + > + if (nbd_aio_pread (rwn->handles.ptr[command->index], > slice_ptr (command->slice), > command->slice.len, command->offset, > cb, 0) == -1) { > @@ -129,7 +313,9 @@ nbd_ops_asynch_write (struct rw *rw, > struct command *command, > nbd_completion_callback cb) > { > - if (nbd_aio_pwrite (rw->u.nbd.handles.ptr[command->index], > + struct rw_nbd *rwn = (struct rw_nbd *) rw; > + > + if (nbd_aio_pwrite (rwn->handles.ptr[command->index], > slice_ptr (command->slice), > command->slice.len, command->offset, > cb, 0) == -1) { > @@ -142,12 +328,14 @@ static bool > nbd_ops_asynch_trim (struct rw *rw, struct command *command, > nbd_completion_callback cb) > { > - if (!rw->u.nbd.can_trim) > + struct rw_nbd *rwn = (struct rw_nbd *) rw; > + > + if (!rwn->can_trim) > return false; > > assert (command->slice.len <= UINT32_MAX); > > - if (nbd_aio_trim (rw->u.nbd.handles.ptr[command->index], > + if (nbd_aio_trim (rwn->handles.ptr[command->index], > command->slice.len, command->offset, > cb, 0) == -1) { > fprintf (stderr, "%s: %s\n", rw->name, nbd_get_error ()); > @@ -160,12 +348,14 @@ static bool > nbd_ops_asynch_zero (struct rw *rw, struct command *command, > nbd_completion_callback cb) > { > - if (!rw->u.nbd.can_zero) > + struct rw_nbd *rwn = (struct rw_nbd *) rw; > + > + if (!rwn->can_zero) > return false; > > assert (command->slice.len <= UINT32_MAX); > > - if (nbd_aio_zero (rw->u.nbd.handles.ptr[command->index], > + if (nbd_aio_zero (rwn->handles.ptr[command->index], > command->slice.len, 
command->offset, > cb, LIBNBD_CMD_FLAG_NO_HOLE) == -1) { > fprintf (stderr, "%s: %s\n", rw->name, nbd_get_error ()); > @@ -212,19 +402,22 @@ add_extent (void *vp, const char *metacontext, > static unsigned > nbd_ops_in_flight (struct rw *rw, uintptr_t index) > { > + struct rw_nbd *rwn = (struct rw_nbd *) rw; > + > /* Since the commands are auto-retired in the callbacks we don't > * need to count "done" commands. > */ > - return nbd_aio_in_flight (rw->u.nbd.handles.ptr[index]); > + return nbd_aio_in_flight (rwn->handles.ptr[index]); > } > > static void > nbd_ops_get_polling_fd (struct rw *rw, uintptr_t index, > int *fd, int *direction) > { > + struct rw_nbd *rwn = (struct rw_nbd *) rw; > struct nbd_handle *nbd; > > - nbd = rw->u.nbd.handles.ptr[index]; > + nbd = rwn->handles.ptr[index]; > > *fd = nbd_aio_get_fd (nbd); > if (*fd == -1) { > @@ -240,7 +433,8 @@ nbd_ops_get_polling_fd (struct rw *rw, uintptr_t index, > static void > nbd_ops_asynch_notify_read (struct rw *rw, uintptr_t index) > { > - if (nbd_aio_notify_read (rw->u.nbd.handles.ptr[index]) == -1) { > + struct rw_nbd *rwn = (struct rw_nbd *) rw; > + if (nbd_aio_notify_read (rwn->handles.ptr[index]) == -1) { > fprintf (stderr, "%s: %s\n", rw->name, nbd_get_error ()); > exit (EXIT_FAILURE); > } > @@ -249,7 +443,8 @@ nbd_ops_asynch_notify_read (struct rw *rw, uintptr_t index) > static void > nbd_ops_asynch_notify_write (struct rw *rw, uintptr_t index) > { > - if (nbd_aio_notify_write (rw->u.nbd.handles.ptr[index]) == -1) { > + struct rw_nbd *rwn = (struct rw_nbd *) rw; > + if (nbd_aio_notify_write (rwn->handles.ptr[index]) == -1) { > fprintf (stderr, "%s: %s\n", rw->name, nbd_get_error ()); > exit (EXIT_FAILURE); > } > @@ -266,10 +461,11 @@ nbd_ops_get_extents (struct rw *rw, uintptr_t index, > uint64_t offset, uint64_t count, > extent_list *ret) > { > + struct rw_nbd *rwn = (struct rw_nbd *) rw; > extent_list exts = empty_vector; > struct nbd_handle *nbd; > > - nbd = rw->u.nbd.handles.ptr[index]; > + nbd = 
rwn->handles.ptr[index]; > > ret->size = 0; > > @@ -331,9 +527,13 @@ nbd_ops_get_extents (struct rw *rw, uintptr_t index, > free (exts.ptr); > } > > -struct rw_ops nbd_ops = { > +static struct rw_ops nbd_ops = { > .ops_name = "nbd_ops", > .close = nbd_ops_close, > + .is_read_only = nbd_ops_is_read_only, > + .can_extents = nbd_ops_can_extents, > + .can_multi_conn = nbd_ops_can_multi_conn, > + .start_multi_conn = nbd_ops_start_multi_conn, > .flush = nbd_ops_flush, > .synch_read = nbd_ops_synch_read, > .synch_write = nbd_ops_synch_write, > diff --git a/copy/nbdcopy.h b/copy/nbdcopy.h > index 69fac2a..94fbdeb 100644 > --- a/copy/nbdcopy.h > +++ b/copy/nbdcopy.h > @@ -36,8 +36,6 @@ > */ > #define THREAD_WORK_SIZE (128 * 1024 * 1024) > > -DEFINE_VECTOR_TYPE (handles, struct nbd_handle *) > - > /* Abstracts the input (src) and output (dst) parameters on the > * command line. > */ > @@ -45,21 +43,20 @@ struct rw { > struct rw_ops *ops; /* Operations. */ > const char *name; /* Printable name, for error messages etc. */ > int64_t size; /* May be -1 for streams. */ > - union { > - struct { /* For files and pipes. */ > - int fd; > - struct stat stat; > - bool seek_hole_supported; > - int sector_size; > - } local; > - struct { > - handles handles; /* For NBD, one handle per connection. */ > - bool can_trim, can_zero; /* Cached nbd_can_trim, nbd_can_zero. */ > - } nbd; > - } u; > + /* Followed by private data for the particular subtype. */ > }; > > -extern struct rw src, dst; > +extern struct rw *src, *dst; > + > +/* Create subtypes. */ > +extern struct rw *file_create (const char *name, > + const struct stat *stat, int fd); > +extern struct rw *nbd_rw_create_uri (const char *name, > + const char *uri, bool writing); > +extern struct rw *nbd_rw_create_subprocess (const char **argv, size_t argc, > + bool writing); > +extern struct rw *null_create (const char *name); > +extern struct rw *pipe_create (const char *name, int fd); > > /* Underlying data buffers. 
*/ > struct buffer { > @@ -117,6 +114,28 @@ struct rw_ops { > /* Close the connection and free up associated resources. */ > void (*close) (struct rw *rw); > > + /* Return true if this is a read-only connection. */ > + bool (*is_read_only) (struct rw *rw); > + > + /* For source only, does it support reading extents? */ > + bool (*can_extents) (struct rw *rw); > + > + /* Return true if the connection can do multi-conn. This is true > + * for files, false for streams, and passed through for NBD. > + */ > + bool (*can_multi_conn) (struct rw *rw); > + > + /* For multi-conn capable backends, before copying we must call this > + * to begin multi-conn. For NBD this means opening the additional > + * connections. > + */ > + void (*start_multi_conn) (struct rw *rw); > + > + /* Truncate, only called on output files. This callback can be NULL > + * for types that don't support this. > + */ > + void (*truncate) (struct rw *rw, int64_t size); > + > /* Flush pending writes to permanent storage. */ > void (*flush) (struct rw *rw); > > @@ -188,10 +207,6 @@ struct rw_ops { > uint64_t offset, uint64_t count, > extent_list *ret); > }; > -extern struct rw_ops file_ops; > -extern struct rw_ops nbd_ops; > -extern struct rw_ops pipe_ops; > -extern struct rw_ops null_ops; > > extern void default_get_extents (struct rw *rw, uintptr_t index, > uint64_t offset, uint64_t count, > diff --git a/copy/null-ops.c b/copy/null-ops.c > index b2ca66f..3262fb5 100644 > --- a/copy/null-ops.c > +++ b/copy/null-ops.c > @@ -30,10 +30,28 @@ > * and fast zeroing. 
> */ > > +static struct rw_ops null_ops; > + > +struct rw_null { > + struct rw rw; > +}; > + > +struct rw * > +null_create (const char *name) > +{ > + struct rw_null *rw = calloc (1, sizeof *rw); > + if (rw == NULL) { perror ("calloc"); exit (EXIT_FAILURE); } > + > + rw->rw.ops = &null_ops; > + rw->rw.name = name; > + rw->rw.size = INT64_MAX; > + return (struct rw *) rw; > +} > + > static void > null_close (struct rw *rw) > { > - /* nothing */ > + free (rw); > } > > static void > @@ -42,6 +60,30 @@ null_flush (struct rw *rw) > /* nothing */ > } > > +static bool > +null_is_read_only (struct rw *rw) > +{ > + return false; > +} > + > +static bool > +null_can_extents (struct rw *rw) > +{ > + return false; > +} > + > +static bool > +null_can_multi_conn (struct rw *rw) > +{ > + return true; > +} > + > +static void > +null_start_multi_conn (struct rw *rw) > +{ > + /* nothing */ > +} > + > static size_t > null_synch_read (struct rw *rw, > void *data, size_t len, uint64_t offset) > @@ -126,9 +168,13 @@ null_get_extents (struct rw *rw, uintptr_t index, > abort (); > } > > -struct rw_ops null_ops = { > +static struct rw_ops null_ops = { > .ops_name = "null_ops", > .close = null_close, > + .is_read_only = null_is_read_only, > + .can_extents = null_can_extents, > + .can_multi_conn = null_can_multi_conn, > + .start_multi_conn = null_start_multi_conn, > .flush = null_flush, > .synch_read = null_synch_read, > .synch_write = null_synch_write, > diff --git a/copy/pipe-ops.c b/copy/pipe-ops.c > index d127dad..286e6c0 100644 > --- a/copy/pipe-ops.c > +++ b/copy/pipe-ops.c > @@ -26,10 +26,33 @@ > > #include "nbdcopy.h" > > +static struct rw_ops pipe_ops; > + > +struct rw_pipe { > + struct rw rw; > + int fd; > +}; > + > +struct rw * > +pipe_create (const char *name, int fd) > +{ > + struct rw_pipe *rwp = calloc (1, sizeof *rwp); > + if (rwp == NULL) { perror ("calloc"); exit (EXIT_FAILURE); } > + > + /* Set size == -1 which means don't know. 
*/ > + rwp->rw.ops = &pipe_ops; > + rwp->rw.name = name; > + rwp->rw.size = -1; > + rwp->fd = fd; > + return (struct rw *) rwp; > +} > + > static void > pipe_close (struct rw *rw) > { > - if (close (rw->u.local.fd) == -1) { > + struct rw_pipe *rwp = (struct rw_pipe *) rw; > + > + if (close (rwp->fd) == -1) { > fprintf (stderr, "%s: close: %m\n", rw->name); > exit (EXIT_FAILURE); > } > @@ -43,13 +66,39 @@ pipe_flush (struct rw *rw) > */ > } > > +static bool > +pipe_is_read_only (struct rw *rw) > +{ > + return false; > +} > + > +static bool > +pipe_can_extents (struct rw *rw) > +{ > + return false; > +} > + > +static bool > +pipe_can_multi_conn (struct rw *rw) > +{ > + return false; > +} > + > +static void > +pipe_start_multi_conn (struct rw *rw) > +{ > + /* Should never be called. */ > + abort (); > +} > + > static size_t > pipe_synch_read (struct rw *rw, > void *data, size_t len, uint64_t offset) > { > + struct rw_pipe *rwp = (struct rw_pipe *) rw; > ssize_t r; > > - r = read (rw->u.local.fd, data, len); > + r = read (rwp->fd, data, len); > if (r == -1) { > perror (rw->name); > exit (EXIT_FAILURE); > @@ -61,10 +110,11 @@ static void > pipe_synch_write (struct rw *rw, > const void *data, size_t len, uint64_t offset) > { > + struct rw_pipe *rwp = (struct rw_pipe *) rw; > ssize_t r; > > while (len > 0) { > - r = write (rw->u.local.fd, data, len); > + r = write (rwp->fd, data, len); > if (r == -1) { > perror (rw->name); > exit (EXIT_FAILURE); > @@ -109,10 +159,16 @@ pipe_in_flight (struct rw *rw, uintptr_t index) > return 0; > } > > -struct rw_ops pipe_ops = { > +static struct rw_ops pipe_ops = { > .ops_name = "pipe_ops", > > .close = pipe_close, > + > + .is_read_only = pipe_is_read_only, > + .can_extents = pipe_can_extents, > + .can_multi_conn = pipe_can_multi_conn, > + .start_multi_conn = pipe_start_multi_conn, > + > .flush = pipe_flush, > > .synch_read = pipe_synch_read, > diff --git a/copy/synch-copying.c b/copy/synch-copying.c > index 2712c10..985f005 100644 > --- 
a/copy/synch-copying.c > +++ b/copy/synch-copying.c > @@ -38,13 +38,13 @@ synch_copying (void) > /* If the source size is unknown then we copy data and cannot use > * extent information. > */ > - if (src.size == -1) { > + if (src->size == -1) { > size_t r; > > - while ((r = src.ops->synch_read (&src, buf, sizeof buf, offset)) > 0) { > - dst.ops->synch_write (&dst, buf, r, offset); > + while ((r = src->ops->synch_read (src, buf, sizeof buf, offset)) > 0) { > + dst->ops->synch_write (dst, buf, r, offset); > offset += r; > - progress_bar (offset, src.size); > + progress_bar (offset, src->size); > } > } > > @@ -52,47 +52,47 @@ synch_copying (void) > * blocks and use extent information to optimize the case. > */ > else { > - while (offset < src.size) { > + while (offset < src->size) { > extent_list exts = empty_vector; > - uint64_t count = src.size - offset; > + uint64_t count = src->size - offset; > size_t i, r; > > if (count > sizeof buf) > count = sizeof buf; > > if (extents) > - src.ops->get_extents (&src, 0, offset, count, &exts); > + src->ops->get_extents (src, 0, offset, count, &exts); > else > - default_get_extents (&src, 0, offset, count, &exts); > + default_get_extents (src, 0, offset, count, &exts); > > for (i = 0; i < exts.size; ++i) { > assert (exts.ptr[i].length <= count); > > if (exts.ptr[i].zero) { > - if (!dst.ops->synch_trim (&dst, offset, exts.ptr[i].length) && > - !dst.ops->synch_zero (&dst, offset, exts.ptr[i].length)) { > + if (!dst->ops->synch_trim (dst, offset, exts.ptr[i].length) && > + !dst->ops->synch_zero (dst, offset, exts.ptr[i].length)) { > /* If neither trimming nor efficient zeroing are possible, > * write zeroes the hard way. 
> */ > memset (buf, 0, exts.ptr[i].length); > - dst.ops->synch_write (&dst, buf, exts.ptr[i].length, offset); > + dst->ops->synch_write (dst, buf, exts.ptr[i].length, offset); > } > offset += exts.ptr[i].length; > } > else /* data */ { > - r = src.ops->synch_read (&src, buf, exts.ptr[i].length, offset); > + r = src->ops->synch_read (src, buf, exts.ptr[i].length, offset); > > /* These cases should never happen unless the file is > * truncated underneath us. > */ > if (r == 0 || r < exts.ptr[i].length) { > - fprintf (stderr, "%s: unexpected end of file\n", src.name); > + fprintf (stderr, "%s: unexpected end of file\n", src->name); > exit (EXIT_FAILURE); > } > > - dst.ops->synch_write (&dst, buf, r, offset); > + dst->ops->synch_write (dst, buf, r, offset); > offset += r; > - progress_bar (offset, src.size); > + progress_bar (offset, src->size); > } > } > > -- > 2.29.0.rc2 >I looked at it briefly, this is a large change, but generally it looks good. Nir
On 2/22/21 9:42 AM, Richard W.M. Jones wrote: > Make this a (fairly) abstract structure. At least hide the subtype > fields from the main program. This change is pure refactoring and > doesn't change the semantics. > --- > copy/file-ops.c | 164 +++++++++++++++++-- > copy/main.c | 315 ++++++++---------------------------- > copy/multi-thread-copying.c | 104 ++++++------ > copy/nbd-ops.c | 248 +++++++++++++++++++++++++--- > copy/nbdcopy.h | 53 +++--- > copy/null-ops.c | 50 +++++- > copy/pipe-ops.c | 64 +++++++- > copy/synch-copying.c | 30 ++-- > 8 files changed, 649 insertions(+), 379 deletions(-) Adds more than it removes, but I agree that the new layout looks easier to maintain. > +++ b/copy/nbdcopy.h > @@ -36,8 +36,6 @@ > */ > #define THREAD_WORK_SIZE (128 * 1024 * 1024) > > -DEFINE_VECTOR_TYPE (handles, struct nbd_handle *) > - > /* Abstracts the input (src) and output (dst) parameters on the > * command line. > */ > @@ -45,21 +43,20 @@ struct rw { > struct rw_ops *ops; /* Operations. */ > const char *name; /* Printable name, for error messages etc. */ > int64_t size; /* May be -1 for streams. */ > - union { > - struct { /* For files and pipes. */ > - int fd; > - struct stat stat; > - bool seek_hole_supported; > - int sector_size; > - } local; > - struct { > - handles handles; /* For NBD, one handle per connection. */ > - bool can_trim, can_zero; /* Cached nbd_can_trim, nbd_can_zero. */ > - } nbd; > - } u; > + /* Followed by private data for the particular subtype. */ > }; > > -extern struct rw src, dst; > +extern struct rw *src, *dst; > + > +/* Create subtypes. 
*/ > +extern struct rw *file_create (const char *name, > + const struct stat *stat, int fd); > +extern struct rw *nbd_rw_create_uri (const char *name, > + const char *uri, bool writing); > +extern struct rw *nbd_rw_create_subprocess (const char **argv, size_t argc, > + bool writing); > +extern struct rw *null_create (const char *name); > +extern struct rw *pipe_create (const char *name, int fd); > > /* Underlying data buffers. */ > struct buffer { > @@ -117,6 +114,28 @@ struct rw_ops { > /* Close the connection and free up associated resources. */ > void (*close) (struct rw *rw); > > + /* Return true if this is a read-only connection. */ > + bool (*is_read_only) (struct rw *rw); > + > + /* For source only, does it support reading extents? */ > + bool (*can_extents) (struct rw *rw); > + > + /* Return true if the connection can do multi-conn. This is true > + * for files, false for streams, and passed through for NBD. > + */ > + bool (*can_multi_conn) (struct rw *rw); > + > + /* For multi-conn capable backends, before copying we must call this > + * to begin multi-conn. For NBD this means opening the additional > + * connections. > + */ > + void (*start_multi_conn) (struct rw *rw); > + > + /* Truncate, only called on output files. This callback can be NULL > + * for types that don't support this. > + */ > + void (*truncate) (struct rw *rw, int64_t size); > + > /* Flush pending writes to permanent storage. */ > void (*flush) (struct rw *rw);Looks nice, beyond what Nir already pointed out. -- Eric Blake, Principal Software Engineer Red Hat, Inc. +1-919-301-3226 Virtualization: qemu.org | libvirt.org