Richard W.M. Jones
2021-Feb-22 15:42 UTC
[Libguestfs] [PATCH libnbd] copy: Refactor struct rw
This is a large refactoring change which tries to clean up the internals of nbdcopy by making the various "ops" truly encapsulated. It obviously will create massive conflicts with Nir's patch series (although almost completely mechanical to fix). I promise to resolve those conflicts for you if you get me an updated version of your patch series. Rich.
Richard W.M. Jones
2021-Feb-22 15:42 UTC
[Libguestfs] [PATCH libnbd] copy: Refactor ‘struct rw’.
Make this a (fairly) abstract structure. At least hide the subtype fields from the main program. This change is pure refactoring and doesn?t change the semantics. --- copy/file-ops.c | 164 +++++++++++++++++-- copy/main.c | 315 ++++++++---------------------------- copy/multi-thread-copying.c | 104 ++++++------ copy/nbd-ops.c | 248 +++++++++++++++++++++++++--- copy/nbdcopy.h | 53 +++--- copy/null-ops.c | 50 +++++- copy/pipe-ops.c | 64 +++++++- copy/synch-copying.c | 30 ++-- 8 files changed, 649 insertions(+), 379 deletions(-) diff --git a/copy/file-ops.c b/copy/file-ops.c index 2a239d0..81b08ce 100644 --- a/copy/file-ops.c +++ b/copy/file-ops.c @@ -36,35 +36,159 @@ #include "isaligned.h" #include "nbdcopy.h" +static struct rw_ops file_ops; + +struct rw_file { + struct rw rw; + int fd; + struct stat stat; + bool seek_hole_supported; + int sector_size; +}; + +static bool +seek_hole_supported (int fd) +{ +#ifndef SEEK_HOLE + return false; +#else + off_t r = lseek (fd, 0, SEEK_HOLE); + return r >= 0; +#endif +} + +struct rw * +file_create (const char *name, const struct stat *stat, int fd) +{ + struct rw_file *rwf = calloc (1, sizeof *rwf); + if (rwf == NULL) { perror ("calloc"); exit (EXIT_FAILURE); } + + rwf->rw.ops = &file_ops; + rwf->rw.name = name; + rwf->stat = *stat; + rwf->fd = fd; + + if (S_ISBLK (stat->st_mode)) { + /* Block device. */ + rwf->rw.size = lseek (fd, 0, SEEK_END); + if (rwf->rw.size == -1) { + perror ("lseek"); + exit (EXIT_FAILURE); + } + if (lseek (fd, 0, SEEK_SET) == -1) { + perror ("lseek"); + exit (EXIT_FAILURE); + } + rwf->seek_hole_supported = seek_hole_supported (fd); + rwf->sector_size = 4096; +#ifdef BLKSSZGET + if (ioctl (fd, BLKSSZGET, &rwf->sector_size)) + fprintf (stderr, "warning: cannot get sector size: %s: %m", name); +#endif + } + else if (S_ISREG (stat->st_mode)) { + /* Regular file. */ + rwf->rw.size = stat->st_size; + rwf->seek_hole_supported = seek_hole_supported (fd); + } + else + abort (); + + return (struct rw *) rwf; +} + static void file_close (struct rw *rw) { - if (close (rw->u.local.fd) == -1) { + struct rw_file *rwf = (struct rw_file *)rw; + + if (close (rwf->fd) == -1) { fprintf (stderr, "%s: close: %m\n", rw->name); exit (EXIT_FAILURE); } + free (rw); +} + +static void +file_truncate (struct rw *rw, int64_t size) +{ + struct rw_file *rwf = (struct rw_file *) rw; + + /* If the destination is an ordinary file then the original file + * size doesn't matter. Truncate it to the source size. But + * truncate it to zero first so the file is completely empty and + * sparse. + */ + if (! S_ISREG (rwf->stat.st_mode)) + return; + + if (ftruncate (rwf->fd, 0) == -1 || + ftruncate (rwf->fd, size) == -1) { + perror ("truncate"); + exit (EXIT_FAILURE); + } + rwf->rw.size = size; + + /* We can assume the destination is zero. */ + destination_is_zero = true; } static void file_flush (struct rw *rw) { - if ((S_ISREG (rw->u.local.stat.st_mode) || - S_ISBLK (rw->u.local.stat.st_mode)) && - fsync (rw->u.local.fd) == -1) { + struct rw_file *rwf = (struct rw_file *)rw; + + if ((S_ISREG (rwf->stat.st_mode) || + S_ISBLK (rwf->stat.st_mode)) && + fsync (rwf->fd) == -1) { perror (rw->name); exit (EXIT_FAILURE); } } +static bool +file_is_read_only (struct rw *rw) +{ + /* Permissions are hard, and this is only used as an early check + * before the copy. Proceed with the copy and fail if it fails. + */ + return false; +} + +static bool +file_can_extents (struct rw *rw) +{ +#ifdef SEEK_HOLE + return true; +#else + return false; +#endif +} + +static bool +file_can_multi_conn (struct rw *rw) +{ + return true; +} + +static void +file_start_multi_conn (struct rw *rw) +{ + /* Don't need to do anything for files since we can read/write on a + * single file descriptor. + */ +} + static size_t file_synch_read (struct rw *rw, void *data, size_t len, uint64_t offset) { + struct rw_file *rwf = (struct rw_file *)rw; size_t n = 0; ssize_t r; while (len > 0) { - r = pread (rw->u.local.fd, data, len, offset); + r = pread (rwf->fd, data, len, offset); if (r == -1) { perror (rw->name); exit (EXIT_FAILURE); @@ -85,10 +209,11 @@ static void file_synch_write (struct rw *rw, const void *data, size_t len, uint64_t offset) { + struct rw_file *rwf = (struct rw_file *)rw; ssize_t r; while (len > 0) { - r = pwrite (rw->u.local.fd, data, len, offset); + r = pwrite (rwf->fd, data, len, offset); if (r == -1) { perror (rw->name); exit (EXIT_FAILURE); @@ -103,7 +228,8 @@ static bool file_synch_trim (struct rw *rw, uint64_t offset, uint64_t count) { #ifdef FALLOC_FL_PUNCH_HOLE - int fd = rw->u.local.fd; + struct rw_file *rwf = (struct rw_file *)rw; + int fd = rwf->fd; int r; r = fallocate (fd, FALLOC_FL_PUNCH_HOLE | FALLOC_FL_KEEP_SIZE, @@ -121,9 +247,11 @@ file_synch_trim (struct rw *rw, uint64_t offset, uint64_t count) static bool file_synch_zero (struct rw *rw, uint64_t offset, uint64_t count) { - if (S_ISREG (rw->u.local.stat.st_mode)) { + struct rw_file *rwf = (struct rw_file *)rw; + + if (S_ISREG (rwf->stat.st_mode)) { #ifdef FALLOC_FL_ZERO_RANGE - int fd = rw->u.local.fd; + int fd = rwf->fd; int r; r = fallocate (fd, FALLOC_FL_ZERO_RANGE, offset, count); @@ -134,10 +262,10 @@ file_synch_zero (struct rw *rw, uint64_t offset, uint64_t count) return true; #endif } - else if (S_ISBLK (rw->u.local.stat.st_mode) && - IS_ALIGNED (offset | count, rw->u.local.sector_size)) { + else if (S_ISBLK (rwf->stat.st_mode) && + IS_ALIGNED (offset | count, rwf->sector_size)) { #ifdef BLKZEROOUT - int fd = rw->u.local.fd; + int fd = rwf->fd; int r; uint64_t range[2] = {offset, count}; @@ -223,11 +351,12 @@ file_get_extents (struct rw *rw, uintptr_t index, ret->size = 0; #ifdef SEEK_HOLE + struct rw_file *rwf = (struct rw_file *)rw; static pthread_mutex_t lseek_lock = PTHREAD_MUTEX_INITIALIZER; - if (rw->u.local.seek_hole_supported) { + if (rwf->seek_hole_supported) { uint64_t end = offset + count; - int fd = rw->u.local.fd; + int fd = rwf->fd; off_t pos; struct extent e; size_t last; @@ -302,9 +431,14 @@ file_get_extents (struct rw *rw, uintptr_t index, default_get_extents (rw, index, offset, count, ret); } -struct rw_ops file_ops = { +static struct rw_ops file_ops = { .ops_name = "file_ops", .close = file_close, + .is_read_only = file_is_read_only, + .can_extents = file_can_extents, + .can_multi_conn = file_can_multi_conn, + .start_multi_conn = file_start_multi_conn, + .truncate = file_truncate, .flush = file_flush, .synch_read = file_synch_read, .synch_write = file_synch_write, diff --git a/copy/main.c b/copy/main.c index 68a6030..6fdc6fd 100644 --- a/copy/main.c +++ b/copy/main.c @@ -53,19 +53,11 @@ int progress_fd = -1; /* --progress=FD */ unsigned sparse_size = 4096; /* --sparse */ bool synchronous; /* --synchronous flag */ unsigned threads; /* --threads */ -struct rw src, dst; /* The source and destination. */ +struct rw *src, *dst; /* The source and destination. */ bool verbose; /* --verbose flag */ static bool is_nbd_uri (const char *s); -static bool seek_hole_supported (int fd); -static void open_null (struct rw *rw); -static int open_local (const char *prog, - const char *filename, bool writing, struct rw *rw); -static void open_nbd_uri (const char *prog, - const char *uri, bool writing, struct rw *rw); -static void open_nbd_subprocess (const char *prog, - const char **argv, size_t argc, - bool writing, struct rw *rw); +static struct rw *open_local (const char *filename, bool writing); static void print_rw (struct rw *rw, const char *prefix, FILE *fp); static void __attribute__((noreturn)) @@ -242,18 +234,18 @@ main (int argc, char *argv[]) found1: connections = 1; /* multi-conn not supported */ - src.name = argv[optind+1]; - open_nbd_subprocess (argv[0], - (const char **) &argv[optind+1], i-optind-1, - false, &src); + src + nbd_rw_create_subprocess ((const char **) &argv[optind+1], i-optind-1, + false); optind = i+1; } else { /* Source is not [...]. */ - src.name = argv[optind++]; - if (! is_nbd_uri (src.name)) - src.u.local.fd = open_local (argv[0], src.name, false, &src); + const char *src_name = argv[optind++]; + + if (! is_nbd_uri (src_name)) + src = open_local (src_name, false); else - open_nbd_uri (argv[0], src.name, false, &src); + src = nbd_rw_create_uri (src_name, src_name, false); } if (optind >= argc) @@ -267,48 +259,46 @@ main (int argc, char *argv[]) found2: connections = 1; /* multi-conn not supported */ - dst.name = argv[optind+1]; - open_nbd_subprocess (argv[0], - (const char **) &argv[optind+1], i-optind-1, - true, &dst); + dst + nbd_rw_create_subprocess ((const char **) &argv[optind+1], i-optind-1, + true); optind = i+1; } else { /* Destination is not [...] */ - dst.name = argv[optind++]; - if (strcmp (dst.name, "null:") == 0) - open_null (&dst); - else if (! is_nbd_uri (dst.name)) - dst.u.local.fd = open_local (argv[0], dst.name, true /* writing */, &dst); - else { - open_nbd_uri (argv[0], dst.name, true, &dst); + const char *dst_name = argv[optind++]; - /* Obviously this is not going to work if the server is - * advertising read-only, so fail early with a nice error message. - */ - if (nbd_is_read_only (dst.u.nbd.handles.ptr[0])) { - fprintf (stderr, "%s: %s: " - "this NBD server is read-only, cannot write to it\n", - argv[0], dst.name); - exit (EXIT_FAILURE); - } - } + if (strcmp (dst_name, "null:") == 0) + dst = null_create (dst_name); + else if (! is_nbd_uri (dst_name)) + dst = open_local (dst_name, true /* writing */); + else + dst = nbd_rw_create_uri (dst_name, dst_name, true); } /* There must be no extra parameters. */ if (optind != argc) usage (stderr, EXIT_FAILURE); - /* Check we've set the fields of src and dst. */ - assert (src.ops); - assert (src.name); - assert (dst.ops); - assert (dst.name); + /* Check we've created src and dst and set the expected fields. */ + assert (src != NULL); + assert (dst != NULL); + assert (src->ops != NULL); + assert (src->name != NULL); + assert (dst->ops != NULL); + assert (dst->name != NULL); + + /* Obviously this is not going to work if the destination is + * read-only, so fail early with a nice error message. + */ + if (dst->ops->is_read_only (dst)) { + fprintf (stderr, "%s: %s: " + "the destination is read-only, cannot write to it\n", + argv[0], dst->name); + exit (EXIT_FAILURE); + } /* If multi-conn is not supported, force connections to 1. */ - if ((src.ops == &nbd_ops && - ! nbd_can_multi_conn (src.u.nbd.handles.ptr[0])) || - (dst.ops == &nbd_ops && - ! nbd_can_multi_conn (dst.u.nbd.handles.ptr[0]))) + if (! src->ops->can_multi_conn (src) || ! dst->ops->can_multi_conn (dst)) connections = 1; /* Calculate the number of threads from the number of connections. */ @@ -335,44 +325,17 @@ main (int argc, char *argv[]) if (threads < connections) connections = threads; - /* Calculate the source and destination sizes. We set these to -1 - * if the size is not known (because it's a stream). Note that for - * local types, open_local set something in *.size already. + /* Truncate the destination to the same size as the source. Only + * has an effect on regular files. */ - if (src.ops == &nbd_ops) { - src.size = nbd_get_size (src.u.nbd.handles.ptr[0]); - if (src.size == -1) { - fprintf (stderr, "%s: %s: %s\n", argv[0], src.name, nbd_get_error ()); - exit (EXIT_FAILURE); - } - } - if (dst.ops != &nbd_ops && S_ISREG (dst.u.local.stat.st_mode)) { - /* If the destination is an ordinary file then the original file - * size doesn't matter. Truncate it to the source size. But - * truncate it to zero first so the file is completely empty and - * sparse. - */ - dst.size = src.size; - if (ftruncate (dst.u.local.fd, 0) == -1 || - ftruncate (dst.u.local.fd, dst.size) == -1) { - perror ("truncate"); - exit (EXIT_FAILURE); - } - destination_is_zero = true; - } - else if (dst.ops == &nbd_ops) { - dst.size = nbd_get_size (dst.u.nbd.handles.ptr[0]); - if (dst.size == -1) { - fprintf (stderr, "%s: %s: %s\n", argv[0], dst.name, nbd_get_error ()); - exit (EXIT_FAILURE); - } - } + if (dst->ops->truncate) + dst->ops->truncate (dst, src->size); /* Check if the source is bigger than the destination, since that * would truncate (ie. lose) data. Copying from smaller to larger * is OK. */ - if (src.size >= 0 && dst.size >= 0 && src.size > dst.size) { + if (src->size >= 0 && dst->size >= 0 && src->size > dst->size) { fprintf (stderr, "nbdcopy: error: destination size is smaller than source size\n"); exit (EXIT_FAILURE); @@ -383,37 +346,29 @@ main (int argc, char *argv[]) * settings. */ if (verbose) { - print_rw (&src, "nbdcopy: src", stderr); - print_rw (&dst, "nbdcopy: dst", stderr); + print_rw (src, "nbdcopy: src", stderr); + print_rw (dst, "nbdcopy: dst", stderr); fprintf (stderr, "nbdcopy: connections=%u requests=%u threads=%u " "synchronous=%s\n", connections, max_requests, threads, synchronous ? "true" : "false"); } - /* If #connections > 1 then multi-conn is enabled at both ends and - * we need to open further connections. + /* If multi-conn is enabled on either side, then at this point we + * need to ask the backend to open the extra connections. */ if (connections > 1) { assert (threads == connections); - - if (src.ops == &nbd_ops) { - for (i = 1; i < connections; ++i) - open_nbd_uri (argv[0], src.name, false, &src); - assert (src.u.nbd.handles.size == connections); - } - if (dst.ops == &nbd_ops) { - for (i = 1; i < connections; ++i) - open_nbd_uri (argv[0], dst.name, true, &dst); - assert (dst.u.nbd.handles.size == connections); - } + if (src->ops->can_multi_conn (src)) + src->ops->start_multi_conn (src); + if (dst->ops->can_multi_conn (dst)) + dst->ops->start_multi_conn (dst); } /* If the source is NBD and we couldn't negotiate meta * base:allocation then turn off extents. */ - if (src.ops == &nbd_ops && - !nbd_can_meta_context (src.u.nbd.handles.ptr[0], "base:allocation")) + if (! src->ops->can_extents (src)) extents = false; /* Always set the progress bar to 0% at the start of the copy. */ @@ -429,12 +384,12 @@ main (int argc, char *argv[]) progress_bar (1, 1); /* Shut down the source side. */ - src.ops->close (&src); + src->ops->close (src); /* Shut down the destination side. */ if (flush) - dst.ops->flush (&dst); - dst.ops->close (&dst); + dst->ops->flush (dst); + dst->ops->close (dst); exit (EXIT_SUCCESS); } @@ -452,33 +407,25 @@ is_nbd_uri (const char *s) strncmp (s, "nbds+vsock:", 11) == 0; } -/* Open null: (destination only). */ -static void -open_null (struct rw *rw) -{ - rw->ops = &null_ops; - rw->size = INT64_MAX; -} - /* Open a local (non-NBD) file, ie. a file, device, or "-" for stdio. - * Returns the open file descriptor which the caller must close. + * Returns the struct rw * which the caller must close. * * ?writing? is true if this is the destination parameter. * ?rw->u.local.stat? and ?rw->size? return the file stat and size, * but size can be returned as -1 if we don't know the size (if it's a * pipe or stdio). */ -static int -open_local (const char *prog, - const char *filename, bool writing, struct rw *rw) +static struct rw * +open_local (const char *filename, bool writing) { int flags, fd; + struct stat stat; if (strcmp (filename, "-") == 0) { synchronous = true; fd = writing ? STDOUT_FILENO : STDIN_FILENO; if (writing && isatty (fd)) { - fprintf (stderr, "%s: refusing to write to tty\n", prog); + fprintf (stderr, "%s: refusing to write to tty\n", "nbdcopy"); exit (EXIT_FAILURE); } } @@ -502,146 +449,17 @@ open_local (const char *prog, } } - if (fstat (fd, &rw->u.local.stat) == -1) { + if (fstat (fd, &stat) == -1) { perror (filename); exit (EXIT_FAILURE); } - if (S_ISBLK (rw->u.local.stat.st_mode)) { - /* Block device. */ - rw->ops = &file_ops; - rw->size = lseek (fd, 0, SEEK_END); - if (rw->size == -1) { - perror ("lseek"); - exit (EXIT_FAILURE); - } - if (lseek (fd, 0, SEEK_SET) == -1) { - perror ("lseek"); - exit (EXIT_FAILURE); - } - rw->u.local.seek_hole_supported = seek_hole_supported (fd); - rw->u.local.sector_size = 4096; -#ifdef BLKSSZGET - if (ioctl (fd, BLKSSZGET, &rw->u.local.sector_size)) - fprintf (stderr, "warning: cannot get sector size: %s: %m", rw->name); -#endif - } - else if (S_ISREG (rw->u.local.stat.st_mode)) { - /* Regular file. */ - rw->ops = &file_ops; - rw->size = rw->u.local.stat.st_size; - rw->u.local.seek_hole_supported = seek_hole_supported (fd); - } + if (S_ISBLK (stat.st_mode) || S_ISREG (stat.st_mode)) + return file_create (filename, &stat, fd); else { - /* Probably stdin/stdout, a pipe or a socket. Set size == -1 - * which means don't know, and force synchronous mode. - */ - synchronous = true; - rw->ops = &pipe_ops; - rw->size = -1; - rw->u.local.seek_hole_supported = false; + /* Probably stdin/stdout, a pipe or a socket. */ + synchronous = true; /* Force synchronous mode for pipes. */ + return pipe_create (filename, fd); } - - return fd; -} - -static bool -seek_hole_supported (int fd) -{ -#ifndef SEEK_HOLE - return false; -#else - off_t r = lseek (fd, 0, SEEK_HOLE); - return r >= 0; -#endif -} - -static void -open_nbd_uri (const char *prog, - const char *uri, bool writing, struct rw *rw) -{ - struct nbd_handle *nbd; - - rw->ops = &nbd_ops; - nbd = nbd_create (); - if (nbd == NULL) { - fprintf (stderr, "%s: %s\n", prog, nbd_get_error ()); - exit (EXIT_FAILURE); - } - nbd_set_debug (nbd, verbose); - nbd_set_uri_allow_local_file (nbd, true); /* Allow ?tls-psk-file. */ - if (extents && !writing && - nbd_add_meta_context (nbd, "base:allocation") == -1) { - fprintf (stderr, "%s: %s\n", prog, nbd_get_error ()); - exit (EXIT_FAILURE); - } - - if (handles_append (&rw->u.nbd.handles, nbd) == -1) { - perror ("realloc"); - exit (EXIT_FAILURE); - } - - if (nbd_connect_uri (nbd, uri) == -1) { - fprintf (stderr, "%s: %s: %s\n", prog, uri, nbd_get_error ()); - exit (EXIT_FAILURE); - } - - /* Cache these. We assume with multi-conn that each handle will act - * the same way. - */ - rw->u.nbd.can_trim = nbd_can_trim (nbd) > 0; - rw->u.nbd.can_zero = nbd_can_zero (nbd) > 0; -} - -DEFINE_VECTOR_TYPE (const_string_vector, const char *); - -static void -open_nbd_subprocess (const char *prog, - const char **argv, size_t argc, - bool writing, struct rw *rw) -{ - struct nbd_handle *nbd; - const_string_vector copy = empty_vector; - size_t i; - - rw->ops = &nbd_ops; - nbd = nbd_create (); - if (nbd == NULL) { - fprintf (stderr, "%s: %s\n", prog, nbd_get_error ()); - exit (EXIT_FAILURE); - } - nbd_set_debug (nbd, verbose); - if (extents && !writing && - nbd_add_meta_context (nbd, "base:allocation") == -1) { - fprintf (stderr, "%s: %s\n", prog, nbd_get_error ()); - exit (EXIT_FAILURE); - } - - if (handles_append (&rw->u.nbd.handles, nbd) == -1) { - memory_error: - perror ("realloc"); - exit (EXIT_FAILURE); - } - - /* We have to copy the args so we can null-terminate them. */ - for (i = 0; i < argc; ++i) { - if (const_string_vector_append (©, argv[i]) == -1) - goto memory_error; - } - if (const_string_vector_append (©, NULL) == -1) - goto memory_error; - - if (nbd_connect_systemd_socket_activation (nbd, (char **) copy.ptr) == -1) { - fprintf (stderr, "%s: %s: %s\n", prog, argv[0], nbd_get_error ()); - exit (EXIT_FAILURE); - } - - /* Cache these. We assume with multi-conn that each handle will act - * the same way. - */ - rw->u.nbd.can_trim = nbd_can_trim (nbd) > 0; - rw->u.nbd.can_zero = nbd_can_zero (nbd) > 0; - - free (copy.ptr); } /* Print an rw struct, used in --verbose mode. */ @@ -650,7 +468,6 @@ print_rw (struct rw *rw, const char *prefix, FILE *fp) { fprintf (fp, "%s: %s \"%s\"\n", prefix, rw->ops->ops_name, rw->name); fprintf (fp, "%s: size=%" PRIi64 "\n", prefix, rw->size); - /* Could print other stuff here, but that's enough for debugging. */ } /* Default implementation of rw->ops->get_extents for backends which diff --git a/copy/multi-thread-copying.c b/copy/multi-thread-copying.c index 98b4056..4f57054 100644 --- a/copy/multi-thread-copying.c +++ b/copy/multi-thread-copying.c @@ -50,13 +50,13 @@ get_next_offset (uint64_t *offset, uint64_t *count) bool r = false; /* returning false means no more work */ pthread_mutex_lock (&lock); - if (next_offset < src.size) { + if (next_offset < src->size) { *offset = next_offset; /* Work out how large this range is. The last range may be * smaller than THREAD_WORK_SIZE. */ - *count = src.size - *offset; + *count = src->size - *offset; if (*count > THREAD_WORK_SIZE) *count = THREAD_WORK_SIZE; @@ -69,7 +69,7 @@ get_next_offset (uint64_t *offset, uint64_t *count) * are called from threads and not necessarily in monotonic order * so the progress bar would move erratically. */ - progress_bar (*offset, dst.size); + progress_bar (*offset, dst->size); } pthread_mutex_unlock (&lock); return r; @@ -89,11 +89,13 @@ multi_thread_copying (void) */ assert (threads > 0); assert (threads == connections); +/* if (src.ops == &nbd_ops) assert (src.u.nbd.handles.size == connections); if (dst.ops == &nbd_ops) assert (dst.u.nbd.handles.size == connections); - assert (src.size != -1); +*/ + assert (src->size != -1); workers = malloc (sizeof (pthread_t) * threads); if (workers == NULL) { @@ -147,9 +149,9 @@ worker_thread (void *indexp) assert (0 < count && count <= THREAD_WORK_SIZE); if (extents) - src.ops->get_extents (&src, index, offset, count, &exts); + src->ops->get_extents (src, index, offset, count, &exts); else - default_get_extents (&src, index, offset, count, &exts); + default_get_extents (src, index, offset, count, &exts); for (i = 0; i < exts.size; ++i) { struct command *command; @@ -208,11 +210,11 @@ worker_thread (void *indexp) wait_for_request_slots (index); /* Begin the asynch read operation. */ - src.ops->asynch_read (&src, command, - (nbd_completion_callback) { - .callback = finished_read, - .user_data = command, - }); + src->ops->asynch_read (src, command, + (nbd_completion_callback) { + .callback = finished_read, + .user_data = command, + }); exts.ptr[i].offset += len; exts.ptr[i].length -= len; @@ -254,7 +256,7 @@ wait_for_request_slots (uintptr_t index) static unsigned in_flight (uintptr_t index) { - return src.ops->in_flight (&src, index) + dst.ops->in_flight (&dst, index); + return src->ops->in_flight (src, index) + dst->ops->in_flight (dst, index); } /* Poll (optional) NBD src and NBD dst, moving the state machine(s) @@ -271,7 +273,7 @@ poll_both_ends (uintptr_t index) /* Note: if polling is not supported, this function will * set fd == -1 which poll ignores. */ - src.ops->get_polling_fd (&src, index, &fds[0].fd, &direction); + src->ops->get_polling_fd (src, index, &fds[0].fd, &direction); if (fds[0].fd >= 0) { switch (direction) { case LIBNBD_AIO_DIRECTION_READ: @@ -286,7 +288,7 @@ poll_both_ends (uintptr_t index) } } - dst.ops->get_polling_fd (&dst, index, &fds[1].fd, &direction); + dst->ops->get_polling_fd (dst, index, &fds[1].fd, &direction); if (fds[1].fd >= 0) { switch (direction) { case LIBNBD_AIO_DIRECTION_READ: @@ -311,24 +313,24 @@ poll_both_ends (uintptr_t index) if (fds[0].fd >= 0) { if ((fds[0].revents & (POLLIN | POLLHUP)) != 0) - src.ops->asynch_notify_read (&src, index); + src->ops->asynch_notify_read (src, index); else if ((fds[0].revents & POLLOUT) != 0) - src.ops->asynch_notify_write (&src, index); + src->ops->asynch_notify_write (src, index); else if ((fds[0].revents & (POLLERR | POLLNVAL)) != 0) { errno = ENOTCONN; - perror (src.name); + perror (src->name); exit (EXIT_FAILURE); } } if (fds[1].fd >= 0) { if ((fds[1].revents & (POLLIN | POLLHUP)) != 0) - dst.ops->asynch_notify_read (&dst, index); + dst->ops->asynch_notify_read (dst, index); else if ((fds[1].revents & POLLOUT) != 0) - dst.ops->asynch_notify_write (&dst, index); + dst->ops->asynch_notify_write (dst, index); else if ((fds[1].revents & (POLLERR | POLLNVAL)) != 0) { errno = ENOTCONN; - perror (dst.name); + perror (dst->name); exit (EXIT_FAILURE); } } @@ -377,11 +379,11 @@ finished_read (void *vp, int *error) /* If sparseness detection (see below) is turned off then we write * the whole command. */ - dst.ops->asynch_write (&dst, command, - (nbd_completion_callback) { - .callback = free_command, - .user_data = command, - }); + dst->ops->asynch_write (dst, command, + (nbd_completion_callback) { + .callback = free_command, + .user_data = command, + }); } else { /* Sparseness detection. */ const uint64_t start = command->offset; @@ -408,11 +410,11 @@ finished_read (void *vp, int *error) newcommand = copy_subcommand (command, last_offset, i - last_offset, false); - dst.ops->asynch_write (&dst, newcommand, - (nbd_completion_callback) { - .callback = free_command, - .user_data = newcommand, - }); + dst->ops->asynch_write (dst, newcommand, + (nbd_completion_callback) { + .callback = free_command, + .user_data = newcommand, + }); } /* Start the new hole. */ last_offset = i; @@ -445,11 +447,11 @@ finished_read (void *vp, int *error) newcommand = copy_subcommand (command, last_offset, i - last_offset, false); - dst.ops->asynch_write (&dst, newcommand, - (nbd_completion_callback) { - .callback = free_command, - .user_data = newcommand, - }); + dst->ops->asynch_write (dst, newcommand, + (nbd_completion_callback) { + .callback = free_command, + .user_data = newcommand, + }); } else { newcommand = copy_subcommand (command, @@ -462,11 +464,11 @@ finished_read (void *vp, int *error) /* There may be an unaligned tail, so write that. */ if (end - i > 0) { newcommand = copy_subcommand (command, i, end - i, false); - dst.ops->asynch_write (&dst, newcommand, - (nbd_completion_callback) { - .callback = free_command, - .user_data = newcommand, - }); + dst->ops->asynch_write (dst, newcommand, + (nbd_completion_callback) { + .callback = free_command, + .user_data = newcommand, + }); } /* Free the original command since it has been split into @@ -503,20 +505,20 @@ fill_dst_range_with_zeroes (struct command *command) if (!allocated) { /* Try trimming. */ - if (dst.ops->asynch_trim (&dst, command, - (nbd_completion_callback) { - .callback = free_command, - .user_data = command, - })) + if (dst->ops->asynch_trim (dst, command, + (nbd_completion_callback) { + .callback = free_command, + .user_data = command, + })) return; } /* Try efficient zeroing. */ - if (dst.ops->asynch_zero (&dst, command, - (nbd_completion_callback) { - .callback = free_command, - .user_data = command, - })) + if (dst->ops->asynch_zero (dst, command, + (nbd_completion_callback) { + .callback = free_command, + .user_data = command, + })) return; /* Fall back to loop writing zeroes. This is going to be slow @@ -533,7 +535,7 @@ fill_dst_range_with_zeroes (struct command *command) if (len > MAX_REQUEST_SIZE) len = MAX_REQUEST_SIZE; - dst.ops->synch_write (&dst, data, len, command->offset); + dst->ops->synch_write (dst, data, len, command->offset); command->slice.len -= len; command->offset += len; } diff --git a/copy/nbd-ops.c b/copy/nbd-ops.c index 7b48cbc..24970c2 100644 --- a/copy/nbd-ops.c +++ b/copy/nbd-ops.c @@ -27,45 +27,221 @@ #include "nbdcopy.h" +static struct rw_ops nbd_ops; + +DEFINE_VECTOR_TYPE (handles, struct nbd_handle *) +DEFINE_VECTOR_TYPE (const_string_vector, const char *); + +struct rw_nbd { + struct rw rw; + + /* Because of multi-conn we have to remember enough state in this + * handle in order to be able to open another connection with the + * same parameters after nbd_rw_create* has been called once. + */ + enum { CREATE_URI, CREATE_SUBPROCESS } create_t; + const char *uri; /* For CREATE_URI */ + const_string_vector argv; /* For CREATE_SUBPROCESS */ + bool writing; + + handles handles; /* One handle per connection. */ + bool can_trim, can_zero; /* Cached nbd_can_trim, nbd_can_zero. */ +}; + +static void +open_one_nbd_handle (struct rw_nbd *rwn) +{ + struct nbd_handle *nbd; + + nbd = nbd_create (); + if (nbd == NULL) { + fprintf (stderr, "%s: %s\n", "nbdcopy", nbd_get_error ()); + exit (EXIT_FAILURE); + } + + nbd_set_debug (nbd, verbose); + + if (extents && !rwn->writing && + nbd_add_meta_context (nbd, "base:allocation") == -1) { + fprintf (stderr, "%s: %s\n", "nbdcopy", nbd_get_error ()); + exit (EXIT_FAILURE); + } + + switch (rwn->create_t) { + case CREATE_URI: + nbd_set_uri_allow_local_file (nbd, true); /* Allow ?tls-psk-file. */ + + if (nbd_connect_uri (nbd, rwn->uri) == -1) { + fprintf (stderr, "%s: %s: %s\n", "nbdcopy", rwn->uri, nbd_get_error ()); + exit (EXIT_FAILURE); + } + break; + + case CREATE_SUBPROCESS: + if (nbd_connect_systemd_socket_activation (nbd, + (char **) rwn->argv.ptr) + == -1) { + fprintf (stderr, "%s: %s: %s\n", "nbdcopy", rwn->argv.ptr[0], + nbd_get_error ()); + exit (EXIT_FAILURE); + } + } + + /* Cache these. We assume with multi-conn that each handle will act + * the same way. + */ + if (rwn->handles.size == 0) { + rwn->can_trim = nbd_can_trim (nbd) > 0; + rwn->can_zero = nbd_can_zero (nbd) > 0; + rwn->rw.size = nbd_get_size (nbd); + if (rwn->rw.size == -1) { + fprintf (stderr, "%s: %s: %s\n", "nbdcopy", rwn->rw.name, + nbd_get_error ()); + exit (EXIT_FAILURE); + } + } + + if (handles_append (&rwn->handles, nbd) == -1) { + perror ("realloc"); + exit (EXIT_FAILURE); + } +} + +struct rw * +nbd_rw_create_uri (const char *name, const char *uri, bool writing) +{ + struct rw_nbd *rwn = calloc (1, sizeof *rwn); + if (rwn == NULL) { perror ("calloc"); exit (EXIT_FAILURE); } + + rwn->rw.ops = &nbd_ops; + rwn->rw.name = name; + rwn->create_t = CREATE_URI; + rwn->uri = uri; + rwn->writing = writing; + + open_one_nbd_handle (rwn); + + return (struct rw *) rwn; +} + +struct rw * +nbd_rw_create_subprocess (const char **argv, size_t argc, bool writing) +{ + size_t i; + struct rw_nbd *rwn = calloc (1, sizeof *rwn); + if (rwn == NULL) { perror ("calloc"); exit (EXIT_FAILURE); } + + rwn->rw.ops = &nbd_ops; + rwn->rw.name = argv[0]; + rwn->create_t = CREATE_SUBPROCESS; + rwn->writing = writing; + + /* We have to copy the args so we can null-terminate them. */ + for (i = 0; i < argc; ++i) { + if (const_string_vector_append (&rwn->argv, argv[i]) == -1) { + memory_error: + perror ("realloc"); + exit (EXIT_FAILURE); + } + } + if (const_string_vector_append (&rwn->argv, NULL) == -1) + goto memory_error; + + open_one_nbd_handle (rwn); + + return (struct rw *) rwn; +} + static void nbd_ops_close (struct rw *rw) { + struct rw_nbd *rwn = (struct rw_nbd *) rw; size_t i; - for (i = 0; i < rw->u.nbd.handles.size; ++i) { - if (nbd_shutdown (rw->u.nbd.handles.ptr[i], 0) == -1) { + for (i = 0; i < rwn->handles.size; ++i) { + if (nbd_shutdown (rwn->handles.ptr[i], 0) == -1) { fprintf (stderr, "%s: %s\n", rw->name, nbd_get_error ()); exit (EXIT_FAILURE); } - nbd_close (rw->u.nbd.handles.ptr[i]); + nbd_close (rwn->handles.ptr[i]); } - handles_reset (&rw->u.nbd.handles); + handles_reset (&rwn->handles); + const_string_vector_reset (&rwn->argv); + free (rw); } static void nbd_ops_flush (struct rw *rw) { + struct rw_nbd *rwn = (struct rw_nbd *) rw; size_t i; - for (i = 0; i < rw->u.nbd.handles.size; ++i) { - if (nbd_flush (rw->u.nbd.handles.ptr[i], 0) == -1) { + for (i = 0; i < rwn->handles.size; ++i) { + if (nbd_flush (rwn->handles.ptr[i], 0) == -1) { fprintf (stderr, "%s: %s\n", rw->name, nbd_get_error ()); exit (EXIT_FAILURE); } } } +static bool +nbd_ops_is_read_only (struct rw *rw) +{ + struct rw_nbd *rwn = (struct rw_nbd *) rw; + + if (rwn->handles.size > 0) + return nbd_is_read_only (rwn->handles.ptr[0]); + else + return false; +} + +static bool +nbd_ops_can_extents (struct rw *rw) +{ + struct rw_nbd *rwn = (struct rw_nbd *) rw; + + if (rwn->handles.size > 0) + return nbd_can_meta_context (rwn->handles.ptr[0], "base:allocation"); + else + return false; +} + +static bool +nbd_ops_can_multi_conn (struct rw *rw) +{ + struct rw_nbd *rwn = (struct rw_nbd *) rw; + + if (rwn->handles.size > 0) + return nbd_can_multi_conn (rwn->handles.ptr[0]); + else + return false; +} + +static void +nbd_ops_start_multi_conn (struct rw *rw) +{ + struct rw_nbd *rwn = (struct rw_nbd *) rw; + size_t i; + + for (i = 1; i < connections; ++i) + open_one_nbd_handle (rwn); + + assert (rwn->handles.size == connections); +} + static size_t nbd_ops_synch_read (struct rw *rw, void *data, size_t len, uint64_t offset) { + struct rw_nbd *rwn = (struct rw_nbd *) rw; + if (len > rw->size - offset) len = rw->size - offset; if (len == 0) return 0; - if (nbd_pread (rw->u.nbd.handles.ptr[0], data, len, offset, 0) == -1) { + if (nbd_pread (rwn->handles.ptr[0], data, len, offset, 0) == -1) { fprintf (stderr, "%s: %s\n", rw->name, nbd_get_error ()); exit (EXIT_FAILURE); } @@ -77,7 +253,9 @@ static void nbd_ops_synch_write (struct rw *rw, const void *data, size_t len, uint64_t offset) { - if (nbd_pwrite (rw->u.nbd.handles.ptr[0], data, len, offset, 0) == -1) { + struct rw_nbd *rwn = (struct rw_nbd *) rw; + + if (nbd_pwrite (rwn->handles.ptr[0], data, len, offset, 0) == -1) { fprintf (stderr, "%s: %s\n", rw->name, nbd_get_error ()); exit (EXIT_FAILURE); } @@ -86,10 +264,12 @@ nbd_ops_synch_write (struct rw *rw, static bool nbd_ops_synch_trim (struct rw *rw, uint64_t offset, uint64_t count) { - if (!rw->u.nbd.can_trim) + struct rw_nbd *rwn = (struct rw_nbd *) rw; + + if (!rwn->can_trim) return false; - if (nbd_trim (rw->u.nbd.handles.ptr[0], count, offset, 0) == -1) { + if (nbd_trim (rwn->handles.ptr[0], count, offset, 0) == -1) { fprintf (stderr, "%s: %s\n", rw->name, nbd_get_error ()); exit (EXIT_FAILURE); } @@ -99,10 +279,12 @@ nbd_ops_synch_trim (struct rw *rw, uint64_t offset, uint64_t count) static bool nbd_ops_synch_zero (struct rw *rw, uint64_t offset, uint64_t count) { - if (!rw->u.nbd.can_zero) + struct rw_nbd *rwn = (struct rw_nbd *) rw; + + if (!rwn->can_zero) return false; - if (nbd_zero (rw->u.nbd.handles.ptr[0], + if (nbd_zero (rwn->handles.ptr[0], count, offset, LIBNBD_CMD_FLAG_NO_HOLE) == -1) { fprintf (stderr, "%s: %s\n", rw->name, nbd_get_error ()); exit (EXIT_FAILURE); @@ -115,7 +297,9 @@ nbd_ops_asynch_read (struct rw *rw, struct command *command, nbd_completion_callback cb) { - if (nbd_aio_pread (rw->u.nbd.handles.ptr[command->index], + struct rw_nbd *rwn = (struct rw_nbd *) rw; + + if (nbd_aio_pread (rwn->handles.ptr[command->index], slice_ptr (command->slice), command->slice.len, command->offset, cb, 0) == -1) { @@ -129,7 +313,9 @@ nbd_ops_asynch_write (struct rw *rw, struct command *command, nbd_completion_callback cb) { - if (nbd_aio_pwrite (rw->u.nbd.handles.ptr[command->index], + struct rw_nbd *rwn = (struct rw_nbd *) rw; + + if (nbd_aio_pwrite (rwn->handles.ptr[command->index], slice_ptr (command->slice), command->slice.len, command->offset, cb, 0) == -1) { @@ -142,12 +328,14 @@ static bool nbd_ops_asynch_trim (struct rw *rw, struct command *command, nbd_completion_callback cb) { - if (!rw->u.nbd.can_trim) + struct rw_nbd *rwn = (struct rw_nbd *) rw; + + if (!rwn->can_trim) return false; assert (command->slice.len <= UINT32_MAX); - if (nbd_aio_trim (rw->u.nbd.handles.ptr[command->index], + if (nbd_aio_trim (rwn->handles.ptr[command->index], command->slice.len, command->offset, cb, 0) == -1) { fprintf (stderr, "%s: %s\n", rw->name, nbd_get_error ()); @@ -160,12 +348,14 @@ static bool nbd_ops_asynch_zero (struct rw *rw, struct command *command, nbd_completion_callback cb) { - if (!rw->u.nbd.can_zero) + struct rw_nbd *rwn = (struct rw_nbd *) rw; + + if (!rwn->can_zero) return false; assert (command->slice.len <= UINT32_MAX); - if (nbd_aio_zero (rw->u.nbd.handles.ptr[command->index], + if (nbd_aio_zero (rwn->handles.ptr[command->index], command->slice.len, command->offset, cb, LIBNBD_CMD_FLAG_NO_HOLE) == -1) { fprintf (stderr, "%s: %s\n", rw->name, nbd_get_error ()); @@ -212,19 +402,22 @@ add_extent (void *vp, const char *metacontext, static unsigned nbd_ops_in_flight (struct rw *rw, uintptr_t index) { + struct rw_nbd *rwn = (struct rw_nbd *) rw; + /* Since the commands are auto-retired in the callbacks we don't * need to count "done" commands. */ - return nbd_aio_in_flight (rw->u.nbd.handles.ptr[index]); + return nbd_aio_in_flight (rwn->handles.ptr[index]); } static void nbd_ops_get_polling_fd (struct rw *rw, uintptr_t index, int *fd, int *direction) { + struct rw_nbd *rwn = (struct rw_nbd *) rw; struct nbd_handle *nbd; - nbd = rw->u.nbd.handles.ptr[index]; + nbd = rwn->handles.ptr[index]; *fd = nbd_aio_get_fd (nbd); if (*fd == -1) { @@ -240,7 +433,8 @@ nbd_ops_get_polling_fd (struct rw *rw, uintptr_t index, static void nbd_ops_asynch_notify_read (struct rw *rw, uintptr_t index) { - if (nbd_aio_notify_read (rw->u.nbd.handles.ptr[index]) == -1) { + struct rw_nbd *rwn = (struct rw_nbd *) rw; + if (nbd_aio_notify_read (rwn->handles.ptr[index]) == -1) { fprintf (stderr, "%s: %s\n", rw->name, nbd_get_error ()); exit (EXIT_FAILURE); } @@ -249,7 +443,8 @@ nbd_ops_asynch_notify_read (struct rw *rw, uintptr_t index) static void nbd_ops_asynch_notify_write (struct rw *rw, uintptr_t index) { - if (nbd_aio_notify_write (rw->u.nbd.handles.ptr[index]) == -1) { + struct rw_nbd *rwn = (struct rw_nbd *) rw; + if (nbd_aio_notify_write (rwn->handles.ptr[index]) == -1) { fprintf (stderr, "%s: %s\n", rw->name, nbd_get_error ()); exit (EXIT_FAILURE); } @@ -266,10 +461,11 @@ nbd_ops_get_extents (struct rw *rw, uintptr_t index, uint64_t offset, uint64_t count, extent_list *ret) { + struct rw_nbd *rwn = (struct rw_nbd *) rw; extent_list exts = empty_vector; struct nbd_handle *nbd; - nbd = rw->u.nbd.handles.ptr[index]; + nbd = rwn->handles.ptr[index]; ret->size = 0; @@ -331,9 +527,13 @@ nbd_ops_get_extents (struct rw *rw, uintptr_t index, free (exts.ptr); } -struct rw_ops nbd_ops = { +static struct rw_ops nbd_ops = { .ops_name = "nbd_ops", .close = nbd_ops_close, + .is_read_only = nbd_ops_is_read_only, + .can_extents = nbd_ops_can_extents, + .can_multi_conn = nbd_ops_can_multi_conn, + .start_multi_conn = nbd_ops_start_multi_conn, .flush = nbd_ops_flush, .synch_read = nbd_ops_synch_read, .synch_write = nbd_ops_synch_write, diff --git a/copy/nbdcopy.h b/copy/nbdcopy.h index 69fac2a..94fbdeb 100644 --- a/copy/nbdcopy.h +++ b/copy/nbdcopy.h @@ -36,8 +36,6 @@ */ #define THREAD_WORK_SIZE (128 * 1024 * 1024) -DEFINE_VECTOR_TYPE (handles, struct nbd_handle *) - /* Abstracts the input (src) and output (dst) parameters on the * command line. */ @@ -45,21 +43,20 @@ struct rw { struct rw_ops *ops; /* Operations. */ const char *name; /* Printable name, for error messages etc. */ int64_t size; /* May be -1 for streams. */ - union { - struct { /* For files and pipes. */ - int fd; - struct stat stat; - bool seek_hole_supported; - int sector_size; - } local; - struct { - handles handles; /* For NBD, one handle per connection. */ - bool can_trim, can_zero; /* Cached nbd_can_trim, nbd_can_zero. */ - } nbd; - } u; + /* Followed by private data for the particular subtype. */ }; -extern struct rw src, dst; +extern struct rw *src, *dst; + +/* Create subtypes. */ +extern struct rw *file_create (const char *name, + const struct stat *stat, int fd); +extern struct rw *nbd_rw_create_uri (const char *name, + const char *uri, bool writing); +extern struct rw *nbd_rw_create_subprocess (const char **argv, size_t argc, + bool writing); +extern struct rw *null_create (const char *name); +extern struct rw *pipe_create (const char *name, int fd); /* Underlying data buffers. */ struct buffer { @@ -117,6 +114,28 @@ struct rw_ops { /* Close the connection and free up associated resources. */ void (*close) (struct rw *rw); + /* Return true if this is a read-only connection. */ + bool (*is_read_only) (struct rw *rw); + + /* For source only, does it support reading extents? */ + bool (*can_extents) (struct rw *rw); + + /* Return true if the connection can do multi-conn. This is true + * for files, false for streams, and passed through for NBD. + */ + bool (*can_multi_conn) (struct rw *rw); + + /* For multi-conn capable backends, before copying we must call this + * to begin multi-conn. For NBD this means opening the additional + * connections. + */ + void (*start_multi_conn) (struct rw *rw); + + /* Truncate, only called on output files. This callback can be NULL + * for types that don't support this. + */ + void (*truncate) (struct rw *rw, int64_t size); + /* Flush pending writes to permanent storage. */ void (*flush) (struct rw *rw); @@ -188,10 +207,6 @@ struct rw_ops { uint64_t offset, uint64_t count, extent_list *ret); }; -extern struct rw_ops file_ops; -extern struct rw_ops nbd_ops; -extern struct rw_ops pipe_ops; -extern struct rw_ops null_ops; extern void default_get_extents (struct rw *rw, uintptr_t index, uint64_t offset, uint64_t count, diff --git a/copy/null-ops.c b/copy/null-ops.c index b2ca66f..3262fb5 100644 --- a/copy/null-ops.c +++ b/copy/null-ops.c @@ -30,10 +30,28 @@ * and fast zeroing. */ +static struct rw_ops null_ops; + +struct rw_null { + struct rw rw; +}; + +struct rw * +null_create (const char *name) +{ + struct rw_null *rw = calloc (1, sizeof *rw); + if (rw == NULL) { perror ("calloc"); exit (EXIT_FAILURE); } + + rw->rw.ops = &null_ops; + rw->rw.name = name; + rw->rw.size = INT64_MAX; + return (struct rw *) rw; +} + static void null_close (struct rw *rw) { - /* nothing */ + free (rw); } static void @@ -42,6 +60,30 @@ null_flush (struct rw *rw) /* nothing */ } +static bool +null_is_read_only (struct rw *rw) +{ + return false; +} + +static bool +null_can_extents (struct rw *rw) +{ + return false; +} + +static bool +null_can_multi_conn (struct rw *rw) +{ + return true; +} + +static void +null_start_multi_conn (struct rw *rw) +{ + /* nothing */ +} + static size_t null_synch_read (struct rw *rw, void *data, size_t len, uint64_t offset) @@ -126,9 +168,13 @@ null_get_extents (struct rw *rw, uintptr_t index, abort (); } -struct rw_ops null_ops = { +static struct rw_ops null_ops = { .ops_name = "null_ops", .close = null_close, + .is_read_only = null_is_read_only, + .can_extents = null_can_extents, + .can_multi_conn = null_can_multi_conn, + .start_multi_conn = null_start_multi_conn, .flush = null_flush, .synch_read = null_synch_read, .synch_write = null_synch_write, diff --git a/copy/pipe-ops.c b/copy/pipe-ops.c index d127dad..286e6c0 100644 --- a/copy/pipe-ops.c +++ b/copy/pipe-ops.c @@ -26,10 +26,33 @@ #include "nbdcopy.h" +static struct rw_ops pipe_ops; + +struct rw_pipe { + struct rw rw; + int fd; +}; + +struct rw * +pipe_create (const char *name, int fd) +{ + struct rw_pipe *rwp = calloc (1, sizeof *rwp); + if (rwp == NULL) { perror ("calloc"); exit (EXIT_FAILURE); } + + /* Set size == -1 which means don't know. */ + rwp->rw.ops = &pipe_ops; + rwp->rw.name = name; + rwp->rw.size = -1; + rwp->fd = fd; + return (struct rw *) rwp; +} + static void pipe_close (struct rw *rw) { - if (close (rw->u.local.fd) == -1) { + struct rw_pipe *rwp = (struct rw_pipe *) rw; + + if (close (rwp->fd) == -1) { fprintf (stderr, "%s: close: %m\n", rw->name); exit (EXIT_FAILURE); } @@ -43,13 +66,39 @@ pipe_flush (struct rw *rw) */ } +static bool +pipe_is_read_only (struct rw *rw) +{ + return false; +} + +static bool +pipe_can_extents (struct rw *rw) +{ + return false; +} + +static bool +pipe_can_multi_conn (struct rw *rw) +{ + return false; +} + +static void +pipe_start_multi_conn (struct rw *rw) +{ + /* Should never be called. */ + abort (); +} + static size_t pipe_synch_read (struct rw *rw, void *data, size_t len, uint64_t offset) { + struct rw_pipe *rwp = (struct rw_pipe *) rw; ssize_t r; - r = read (rw->u.local.fd, data, len); + r = read (rwp->fd, data, len); if (r == -1) { perror (rw->name); exit (EXIT_FAILURE); @@ -61,10 +110,11 @@ static void pipe_synch_write (struct rw *rw, const void *data, size_t len, uint64_t offset) { + struct rw_pipe *rwp = (struct rw_pipe *) rw; ssize_t r; while (len > 0) { - r = write (rw->u.local.fd, data, len); + r = write (rwp->fd, data, len); if (r == -1) { perror (rw->name); exit (EXIT_FAILURE); @@ -109,10 +159,16 @@ pipe_in_flight (struct rw *rw, uintptr_t index) return 0; } -struct rw_ops pipe_ops = { +static struct rw_ops pipe_ops = { .ops_name = "pipe_ops", .close = pipe_close, + + .is_read_only = pipe_is_read_only, + .can_extents = pipe_can_extents, + .can_multi_conn = pipe_can_multi_conn, + .start_multi_conn = pipe_start_multi_conn, + .flush = pipe_flush, .synch_read = pipe_synch_read, diff --git a/copy/synch-copying.c b/copy/synch-copying.c index 2712c10..985f005 100644 --- a/copy/synch-copying.c +++ b/copy/synch-copying.c @@ -38,13 +38,13 @@ synch_copying (void) /* If the source size is unknown then we copy data and cannot use * extent information. */ - if (src.size == -1) { + if (src->size == -1) { size_t r; - while ((r = src.ops->synch_read (&src, buf, sizeof buf, offset)) > 0) { - dst.ops->synch_write (&dst, buf, r, offset); + while ((r = src->ops->synch_read (src, buf, sizeof buf, offset)) > 0) { + dst->ops->synch_write (dst, buf, r, offset); offset += r; - progress_bar (offset, src.size); + progress_bar (offset, src->size); } } @@ -52,47 +52,47 @@ synch_copying (void) * blocks and use extent information to optimize the case. */ else { - while (offset < src.size) { + while (offset < src->size) { extent_list exts = empty_vector; - uint64_t count = src.size - offset; + uint64_t count = src->size - offset; size_t i, r; if (count > sizeof buf) count = sizeof buf; if (extents) - src.ops->get_extents (&src, 0, offset, count, &exts); + src->ops->get_extents (src, 0, offset, count, &exts); else - default_get_extents (&src, 0, offset, count, &exts); + default_get_extents (src, 0, offset, count, &exts); for (i = 0; i < exts.size; ++i) { assert (exts.ptr[i].length <= count); if (exts.ptr[i].zero) { - if (!dst.ops->synch_trim (&dst, offset, exts.ptr[i].length) && - !dst.ops->synch_zero (&dst, offset, exts.ptr[i].length)) { + if (!dst->ops->synch_trim (dst, offset, exts.ptr[i].length) && + !dst->ops->synch_zero (dst, offset, exts.ptr[i].length)) { /* If neither trimming nor efficient zeroing are possible, * write zeroes the hard way. */ memset (buf, 0, exts.ptr[i].length); - dst.ops->synch_write (&dst, buf, exts.ptr[i].length, offset); + dst->ops->synch_write (dst, buf, exts.ptr[i].length, offset); } offset += exts.ptr[i].length; } else /* data */ { - r = src.ops->synch_read (&src, buf, exts.ptr[i].length, offset); + r = src->ops->synch_read (src, buf, exts.ptr[i].length, offset); /* These cases should never happen unless the file is * truncated underneath us. */ if (r == 0 || r < exts.ptr[i].length) { - fprintf (stderr, "%s: unexpected end of file\n", src.name); + fprintf (stderr, "%s: unexpected end of file\n", src->name); exit (EXIT_FAILURE); } - dst.ops->synch_write (&dst, buf, r, offset); + dst->ops->synch_write (dst, buf, r, offset); offset += r; - progress_bar (offset, src.size); + progress_bar (offset, src->size); } } -- 2.29.0.rc2