On Tue, Aug 30, 2022 at 10:32:02AM +0800, Ming Lei wrote:
> Hi Jones,
>
> On Thu, Aug 25, 2022 at 01:10:55PM +0100, Richard W.M. Jones wrote:
> > This patch adds simple support for a ublk-based NBD client.
> > It is also available here:
> > https://gitlab.com/rwmjones/libnbd/-/tree/nbdublk/ublk
> >
> > ublk is a way to write Linux block device drivers in userspace:
>
> I just looked at your nbdublk implementation a bit. Basically it is good,
> and really nice work.
>
> I also have two suggestions:
>
> 1) the io_uring context is multiplexed with ublk io command handling, so
> we should avoid blocking in both ->handle_io_async() and
> ->handle_event(), otherwise performance may be bad
>
> 2) in the implementation of the nbd worker thread, there are two sleep
> points (waiting for an incoming io command, and waiting on the network
> FD); I'd suggest using poll to wait on either of them
>
> Recently I have been working on adding ublksrv io offloading (aio)
> interfaces for this sort of case, in which io_uring can't be used; they
> may simplify this area. Please see the attached patch, which applies the
> above two points against your patch. An obvious improvement can be
> observed in my simple fio test (randread, io, 4k bs, libaio) against a
> backend of 'nbdkit file'.
>
> These interfaces aren't merged into the ublksrv github tree yet, but you
> can find them in the aio branch; demo_event.c is one example of how to
> use them:
>
> https://github.com/ming1/ubdsrv/tree/aio
>
> Actually this interface can be improved further for the nbdublk case,
> and the request allocation isn't really needed for this direct
> offloading. It was added to cover some IOs that don't come from the
> ublk driver, such as metadata, so 'struct ublksrv_aio' is allocated.
> I will try my best to finalize them and merge them into the master
> branch.
>
> BTW, IOPS on nbdublk (backend: nbdkit file) still has a big gap compared
> with ublk-loop, so I guess in future io_uring should be tried to see if
> a big improvement can be observed.
>

The patch sent in last email may cause io hang on MQ, and follows the fixed
version:

diff --git a/generator/API.ml b/generator/API.ml
index 3e948aa..bdd0fb8 100644
--- a/generator/API.ml
+++ b/generator/API.ml
@@ -2289,6 +2289,26 @@ that eventual action is actually expected - for example, if
 the connection is established but there are no commands in
 flight, using an infinite timeout will permanently block).
 
+This function is mainly useful as an example of how you might
+integrate libnbd with your own main loop, rather than being
+intended as something you would use.";
+    example = Some "examples/aio-connect-read.c";
+  };
+
+  "poll2", {
+    default_call with
+    args = [Int "evt"; Int "timeout" ]; ret = RInt;
+    shortdesc = "poll the handle once with eventfd";
+    longdesc = "\
+This is a simple implementation of L<poll(2)> which is used
+internally by synchronous API calls.  On success, it returns
+C<0> if the C<timeout> (in milliseconds) occurs, or C<1> if
+the poll completed and the state machine progressed. Set
+C<timeout> to C<-1> to block indefinitely (but be careful
+that eventual action is actually expected - for example, if
+the connection is established but there are no commands in
+flight, using an infinite timeout will permanently block).
+
 This function is mainly useful as an example of how you might
 integrate libnbd with your own main loop, rather than being
 intended as something you would use.";
@@ -3153,6 +3173,7 @@ let first_version = [
   "zero", (1, 0);
   "block_status", (1, 0);
   "poll", (1, 0);
+  "poll2", (1, 0);
   "aio_connect", (1, 0);
   "aio_connect_uri", (1, 0);
   "aio_connect_unix", (1, 0);
diff --git a/lib/poll.c b/lib/poll.c
index df01d94..e9d7924 100644
--- a/lib/poll.c
+++ b/lib/poll.c
@@ -27,14 +27,21 @@
 #include "internal.h"
 
 /* A simple main loop implementation using poll(2). */
-int
-nbd_unlocked_poll (struct nbd_handle *h, int timeout)
+static int
+__nbd_unlocked_poll (struct nbd_handle *h, int evt, int timeout)
 {
-  struct pollfd fds[1];
-  int r;
+  struct pollfd fds[2];
+  int r, nr_fds = 1;
 
   /* fd might be negative, and poll will ignore it. */
   fds[0].fd = nbd_unlocked_aio_get_fd (h);
+  if (evt > 0) {
+    fds[1].fd = evt;
+    fds[1].events = POLLIN;
+    fds[1].revents = 0;
+    nr_fds = 2;
+  }
+
   switch (nbd_internal_aio_get_direction (get_next_state (h))) {
   case LIBNBD_AIO_DIRECTION_READ:
     fds[0].events = POLLIN;
@@ -58,7 +65,7 @@ nbd_unlocked_poll (struct nbd_handle *h, int timeout)
    * passed to poll.
    */
   do {
-    r = poll (fds, 1, timeout);
+    r = poll (fds, nr_fds, timeout);
     debug (h, "poll end: r=%d revents=%x", r, fds[0].revents);
   } while (r == -1 && errno == EINTR);
 
@@ -91,3 +98,15 @@ nbd_unlocked_poll (struct nbd_handle *h, int timeout)
 
   return 1;
 }
+
+int
+nbd_unlocked_poll (struct nbd_handle *h, int timeout)
+{
+  return __nbd_unlocked_poll (h, -1, timeout);
+}
+
+int
+nbd_unlocked_poll2 (struct nbd_handle *h, int evt, int timeout)
+{
+  return __nbd_unlocked_poll (h, evt, timeout);
+}
diff --git a/ublk/tgt.c b/ublk/tgt.c
index 4cdd42a..5b478ae 100644
--- a/ublk/tgt.c
+++ b/ublk/tgt.c
@@ -35,6 +35,7 @@
 #endif
 
 #include <ublksrv.h>
+#include <ublksrv_aio.h>
 
 #include <libnbd.h>
 
@@ -46,14 +47,6 @@
 /* Number of seconds to wait for commands to complete when closing the dev. */
 #define RELEASE_TIMEOUT 5
 
-/* List of completed commands. */
-struct completion {
-  struct ublksrv_queue *q;
-  int tag;
-  int res; /* The normal return value, if the command completes OK. */
-};
-DEFINE_VECTOR_TYPE(completions, struct completion)
-
 /* Thread model:
  *
  * There are two threads per NBD connection.  One thread
@@ -69,23 +62,8 @@ struct thread_info {
   pthread_t io_uring_thread;
   pthread_t nbd_work_thread;
 
-  /* This counts the number of commands in flight.  The condition is
-   * used to allow the operations thread to process commands when
-   * in_flight goes from 0 -> 1.  This is roughly equivalent to
-   * nbd_aio_in_flight, but we need to count it ourselves in order to
-   * use the condition.
-   */
-  _Atomic size_t in_flight;
-  pthread_mutex_t in_flight_mutex;
-  pthread_cond_t in_flight_cond;
-
-  /* Commands have to be completed on the io_uring thread, but they
-   * run on the NBD thread.  So when the NBD command completes we put
-   * the command on this queue and they are passed to the io_uring
-   * thread to call ublksrv_complete_io.
-   */
-  pthread_mutex_t completed_commands_lock;
-  completions completed_commands;
+  struct ublksrv_aio_ctx *aio_ctx;
+  struct ublksrv_aio_list compl;
 };
 DEFINE_VECTOR_TYPE(thread_infos, struct thread_info)
 static thread_infos thread_info;
@@ -95,6 +73,161 @@ static pthread_barrier_t barrier;
 static char jbuf[4096];
 static pthread_mutex_t jbuf_lock = PTHREAD_MUTEX_INITIALIZER;
 
+/* Command completion callback (called on the NBD thread). */
+static int
+command_completed (void *vpdata, int *error)
+{
+  struct ublksrv_aio *req = vpdata;
+  int q_id = ublksrv_aio_qid(req->id);
+  struct ublksrv_aio_ctx *aio_ctx = thread_info.ptr[q_id].aio_ctx;
+  struct ublksrv_queue *q = ublksrv_get_queue(aio_ctx->dev, q_id);
+  struct ublksrv_aio_list *compl = &thread_info.ptr[q_id].compl;
+
+  if (verbose)
+    fprintf (stderr,
+             "%s: command_completed: tag=%d q_id=%zu error=%d\n",
+             "nbdublk", ublksrv_aio_tag(req->id),
+             ublksrv_aio_qid(req->id), *error);
+
+  /* If the command failed, override the normal result. */
+  if (*error != 0)
+    req->res = *error;
+
+  pthread_spin_lock(&compl->lock);
+  aio_list_add(&compl->list, req);
+  pthread_spin_unlock(&compl->lock);
+
+  return 1;
+}
+
+
+int aio_submitter(struct ublksrv_aio_ctx *ctx,
+                  struct ublksrv_aio *req)
+{
+  const struct ublksrv_io_desc *iod = &req->io;
+  const unsigned op = ublksrv_get_op (iod);
+  const unsigned flags = ublksrv_get_flags (iod);
+  const bool fua = flags & UBLK_IO_F_FUA;
+  const bool alloc_zero = flags & UBLK_IO_F_NOUNMAP; /* else punch hole */
+  const size_t q_id = ublksrv_aio_qid(req->id); /* also the NBD handle number */
+  struct nbd_handle *h = nbd.ptr[q_id];
+  uint32_t nbd_flags = 0;
+  int64_t r;
+  nbd_completion_callback cb;
+  bool sync = false;
+
+  if (verbose)
+    fprintf (stderr, "%s: handle_io_async: tag = %d q_id = %zu\n",
+             "nbdublk", ublksrv_aio_tag(req->id), q_id);
+
+  req->res = iod->nr_sectors << 9;
+  cb.callback = command_completed;
+  cb.user_data = req;
+  cb.free = NULL;
+
+  switch (op) {
+  case UBLK_IO_OP_READ:
+    r = nbd_aio_pread (h, (void *) iod->addr, iod->nr_sectors << 9,
+                       iod->start_sector << 9, cb, 0);
+    if (r == -1) {
+      fprintf (stderr, "%s: %s\n", "nbdublk", nbd_get_error ());
+      return -EINVAL;
+    }
+    break;
+
+  case UBLK_IO_OP_WRITE:
+    if (fua && can_fua)
+      nbd_flags |= LIBNBD_CMD_FLAG_FUA;
+
+    r = nbd_aio_pwrite (h, (const void *) iod->addr, iod->nr_sectors << 9,
+                        iod->start_sector << 9, cb, nbd_flags);
+    if (r == -1) {
+      fprintf (stderr, "%s: %s\n", "nbdublk", nbd_get_error ());
+      return -EINVAL;
+    }
+    break;
+
+  case UBLK_IO_OP_FLUSH:
+    r = nbd_flush (h, 0);
+    if (r == -1) {
+      fprintf (stderr, "%s: %s\n", "nbdublk", nbd_get_error ());
+      return -EINVAL;
+    }
+    sync = true;
+    break;
+
+  case UBLK_IO_OP_DISCARD:
+    if (fua && can_fua)
+      nbd_flags |= LIBNBD_CMD_FLAG_FUA;
+
+    r = nbd_trim (h, iod->nr_sectors << 9, iod->start_sector << 9, nbd_flags);
+    if (r == -1) {
+      fprintf (stderr, "%s: %s\n", "nbdublk", nbd_get_error ());
+      return -EINVAL;
+    }
+    sync = true;
+    break;
+
+  case UBLK_IO_OP_WRITE_ZEROES:
+    if (fua && can_fua)
+      nbd_flags |= LIBNBD_CMD_FLAG_FUA;
+
+    if (alloc_zero)
+      nbd_flags |= LIBNBD_CMD_FLAG_NO_HOLE;
+
+    r = nbd_zero (h, iod->nr_sectors << 9, iod->start_sector << 9, nbd_flags);
+    if (r == -1) {
+      fprintf (stderr, "%s: %s\n", "nbdublk", nbd_get_error ());
+      return -EINVAL;
+    }
+    sync = true;
+    break;
+
+  default:
+    fprintf (stderr, "%s: unknown operation %u\n", "nbdublk", op);
+    return -ENOTSUP;
+  }
+
+  /* return if this request is completed */
+  if (sync)
+    return 1;
+  return 0;
+}
+
+static void *
+nbd_work_thread (void *vpinfo)
+{
+  struct thread_info *ti = vpinfo;
+  struct nbd_handle *h = nbd.ptr[ti->i];
+  struct ublksrv_aio_ctx *aio_ctx = thread_info.ptr[ti->i].aio_ctx;
+  struct ublksrv_queue *q = ublksrv_get_queue(aio_ctx->dev, ti->i);
+  struct ublksrv_aio_list *c = &thread_info.ptr[ti->i].compl;
+
+  /* Signal to the main thread that we have initialized. */
+  pthread_barrier_wait (&barrier);
+
+  while (!ublksrv_aio_ctx_dead(aio_ctx)) {
+    struct aio_list compl;
+
+    aio_list_init(&compl);
+    ublksrv_aio_submit_worker(aio_ctx, aio_submitter, &compl);
+
+    pthread_spin_lock(&c->lock);
+    aio_list_splice(&c->list, &compl);
+    pthread_spin_unlock(&c->lock);
+
+    ublksrv_aio_complete_worker(aio_ctx, &compl);
+
+    if (nbd_poll2 (h, aio_ctx->efd, -1) == -1) {
+      fprintf (stderr, "%s\n", nbd_get_error ());
+      exit (EXIT_FAILURE);
+    }
+  }
+
+  /*NOTREACHED*/
+  return NULL;
+}
+
 static void *
 io_uring_thread (void *vpinfo)
 {
@@ -139,37 +272,6 @@ io_uring_thread (void *vpinfo)
   return NULL;
 }
 
-static void *
-nbd_work_thread (void *vpinfo)
-{
-  struct thread_info *thread_info = vpinfo;
-  const size_t i = thread_info->i;
-  struct nbd_handle *h = nbd.ptr[i];
-
-  /* Signal to the main thread that we have initialized. */
-  pthread_barrier_wait (&barrier);
-
-  while (1) {
-    /* Sleep until at least one command is in flight. */
-    pthread_mutex_lock (&thread_info->in_flight_mutex);
-    while (thread_info->in_flight == 0)
-      pthread_cond_wait (&thread_info->in_flight_cond,
-                         &thread_info->in_flight_mutex);
-    pthread_mutex_unlock (&thread_info->in_flight_mutex);
-
-    /* Dispatch work while there are commands in flight. */
-    while (thread_info->in_flight > 0) {
-      if (nbd_poll (h, -1) == -1) {
-        fprintf (stderr, "%s\n", nbd_get_error ());
-        exit (EXIT_FAILURE);
-      }
-    }
-  }
-
-  /*NOTREACHED*/
-  return NULL;
-}
-
 static int
 set_parameters (struct ublksrv_ctrl_dev *ctrl_dev,
                 const struct ublksrv_dev *dev)
@@ -215,6 +317,7 @@ int
 start_daemon (struct ublksrv_ctrl_dev *ctrl_dev)
 {
   const struct ublksrv_ctrl_dev_info *dinfo = &ctrl_dev->dev_info;
+  int dev_id = ctrl_dev->dev_info.dev_id;
   struct ublksrv_dev *dev;
   size_t i;
   int r;
@@ -265,17 +368,16 @@ start_daemon (struct ublksrv_ctrl_dev *ctrl_dev)
     /* Note this cannot fail because of previous reserve. */
     thread_infos_append (&thread_info,
                          (struct thread_info)
-                         { .dev = dev, .i = i, .in_flight = 0 });
+                         { .dev = dev, .i = i,});
+
+    thread_info.ptr[i].aio_ctx = ublksrv_aio_ctx_init(dev, 0);
+    if (!thread_info.ptr[i].aio_ctx) {
+      fprintf(stderr, "dev %d queue %d call ublk_aio_ctx_init failed\n",
+              dev_id, i);
+      return -ENOMEM;
+    }
+    ublksrv_aio_init_list(&thread_info.ptr[i].compl);
 
-    r = pthread_mutex_init (&thread_info.ptr[i].in_flight_mutex, NULL);
-    if (r != 0)
-      goto bad_pthread;
-    r = pthread_cond_init (&thread_info.ptr[i].in_flight_cond, NULL);
-    if (r != 0)
-      goto bad_pthread;
-    r = pthread_mutex_init (&thread_info.ptr[i].completed_commands_lock, NULL);
-    if (r != 0)
-      goto bad_pthread;
     r = pthread_create (&thread_info.ptr[i].io_uring_thread, NULL,
                         io_uring_thread, &thread_info.ptr[i]);
     if (r != 0)
@@ -316,24 +418,10 @@ start_daemon (struct ublksrv_ctrl_dev *ctrl_dev)
   for (i = 0; i < nbd.len; ++i)
     pthread_join (thread_info.ptr[i].io_uring_thread, NULL);
 
-  /* Wait until a timeout while there are NBD commands in flight. */
-  time (&st);
-  while (time (NULL) - st <= RELEASE_TIMEOUT) {
-    for (i = 0; i < nbd.len; ++i) {
-      if (thread_info.ptr[i].in_flight > 0)
-        break;
-    }
-    if (i == nbd.len) /* no commands in flight */
-      break;
-
-    /* Signal to the operations threads to work. */
-    for (i = 0; i < nbd.len; ++i) {
-      pthread_mutex_lock (&thread_info.ptr[i].in_flight_mutex);
-      pthread_cond_signal (&thread_info.ptr[i].in_flight_cond);
-      pthread_mutex_unlock (&thread_info.ptr[i].in_flight_mutex);
-    }
-
-    sleep (1);
+  for (i = 0; i < nbd.len; ++i) {
+    ublksrv_aio_ctx_shutdown(thread_info.ptr[i].aio_ctx);
+    pthread_join (thread_info.ptr[i].nbd_work_thread, NULL);
+    ublksrv_aio_ctx_deinit(thread_info.ptr[i].aio_ctx);
   }
 
   ublksrv_dev_deinit (dev);
@@ -367,176 +455,37 @@ init_tgt (struct ublksrv_dev *dev, int type, int argc, char *argv[])
   return 0;
 }
 
-/* Command completion callback (called on the NBD thread). */
-static int
-command_completed (void *vpdata, int *error)
-{
-  struct completion *completion = vpdata;
-  struct ublksrv_queue *q = completion->q;
-  const size_t i = q->q_id;
-
-  if (verbose)
-    fprintf (stderr,
-             "%s: command_completed: tag=%d q_id=%zu res=%d error=%d\n",
-             "nbdublk", completion->tag, i, completion->res, *error);
-
-  /* If the command failed, override the normal result. */
-  if (*error != 0)
-    completion->res = *error;
-
-  assert (thread_info.ptr[i].in_flight >= 1);
-  thread_info.ptr[i].in_flight--;
-
-  /* Copy the command to the list of completed commands.
-   *
-   * Note *completion is freed by the .free handler that we added to
-   * this completion callback.
-   */
-  pthread_mutex_lock (&thread_info.ptr[i].completed_commands_lock);
-  completions_append (&thread_info.ptr[i].completed_commands, *completion);
-
-  /* Signal io_uring thread that the command has been completed.
-   * It will call us back in a different thread on ->handle_event
-   * and we can finally complete the command(s) there.
-   */
-  ublksrv_queue_send_event (q);
-  pthread_mutex_unlock (&thread_info.ptr[i].completed_commands_lock);
-
-  /* Retire the NBD command. */
-  return 1;
-}
-
 static void
-handle_event (struct ublksrv_queue *q)
+nbd_handle_event (struct ublksrv_queue *q)
 {
-  const size_t i = q->q_id;
-  size_t j;
+  struct ublksrv_aio_ctx *aio_ctx = thread_info.ptr[q->q_id].aio_ctx;
 
   if (verbose)
-    fprintf (stderr, "%s: handle_event: q_id = %d\n", "nbdublk", q->q_id);
+    fprintf (stderr, "%s: handle_event: q_id = %d\n", "nbdublk", q->q_id);
 
-  pthread_mutex_lock (&thread_info.ptr[i].completed_commands_lock);
-
-  for (j = 0; j < thread_info.ptr[i].completed_commands.len; ++j) {
-    struct completion *completion =
-      &thread_info.ptr[i].completed_commands.ptr[j];
-    ublksrv_complete_io (completion->q, completion->tag, completion->res);
-  }
-  completions_reset (&thread_info.ptr[i].completed_commands);
-  ublksrv_queue_handled_event (q);
-
-  pthread_mutex_unlock (&thread_info.ptr[i].completed_commands_lock);
+  ublksrv_aio_handle_event(aio_ctx, q);
 }
 
-/* Start a single command. */
-static int
-handle_io_async (struct ublksrv_queue *q, int tag)
+static int nbd_handle_io_async(struct ublksrv_queue *q, int tag)
 {
-  const struct ublksrv_io_desc *iod = ublksrv_get_iod (q, tag);
-  const unsigned op = ublksrv_get_op (iod);
-  const unsigned flags = ublksrv_get_flags (iod);
-  const bool fua = flags & UBLK_IO_F_FUA;
-  const bool alloc_zero = flags & UBLK_IO_F_NOUNMAP; /* else punch hole */
-  const size_t q_id = q->q_id; /* also the NBD handle number */
-  struct nbd_handle *h = nbd.ptr[q_id];
-  uint32_t nbd_flags = 0;
-  int64_t r;
-  nbd_completion_callback cb;
-  struct completion *completion;
-
-  if (verbose)
-    fprintf (stderr, "%s: handle_io_async: tag = %d q_id = %zu\n",
-             "nbdublk", tag, q_id);
-
-  /* Set up a completion callback and its user data. */
-  completion = malloc (sizeof *completion);
-  if (completion == NULL) abort ();
-  completion->q = q;
-  completion->tag = tag;
-  completion->res = iod->nr_sectors << 9;
-  cb.callback = command_completed;
-  cb.user_data = completion;
-  cb.free = free;
-
-  switch (op) {
-  case UBLK_IO_OP_READ:
-    r = nbd_aio_pread (h, (void *) iod->addr, iod->nr_sectors << 9,
-                       iod->start_sector << 9, cb, 0);
-    if (r == -1) {
-      fprintf (stderr, "%s: %s\n", "nbdublk", nbd_get_error ());
-      ublksrv_complete_io (q, tag,
-                           (nbd_get_errno () ? : EINVAL));
-      return 0;
-    }
-    break;
-
-  case UBLK_IO_OP_WRITE:
-    if (fua && can_fua)
-      nbd_flags |= LIBNBD_CMD_FLAG_FUA;
-
-    r = nbd_aio_pwrite (h, (const void *) iod->addr, iod->nr_sectors << 9,
-                        iod->start_sector << 9, cb, nbd_flags);
-    if (r == -1) {
-      fprintf (stderr, "%s: %s\n", "nbdublk", nbd_get_error ());
-      ublksrv_complete_io (q, tag,
-                           (nbd_get_errno () ? : EINVAL));
-      return 0;
-    }
-    break;
-
-  case UBLK_IO_OP_FLUSH:
-    r = nbd_flush (h, 0);
-    if (r == -1) {
-      fprintf (stderr, "%s: %s\n", "nbdublk", nbd_get_error ());
-      ublksrv_complete_io (q, tag,
-                           (nbd_get_errno () ? : EINVAL));
-      return 0;
-    }
-    break;
-
-  case UBLK_IO_OP_DISCARD:
-    if (fua && can_fua)
-      nbd_flags |= LIBNBD_CMD_FLAG_FUA;
-
-    r = nbd_trim (h, iod->nr_sectors << 9, iod->start_sector << 9, nbd_flags);
-    if (r == -1) {
-      fprintf (stderr, "%s: %s\n", "nbdublk", nbd_get_error ());
-      ublksrv_complete_io (q, tag,
-                           (nbd_get_errno () ? : EINVAL));
-      return 0;
-    }
-    break;
-
-  case UBLK_IO_OP_WRITE_ZEROES:
-    if (fua && can_fua)
-      nbd_flags |= LIBNBD_CMD_FLAG_FUA;
-
-    if (alloc_zero)
-      nbd_flags |= LIBNBD_CMD_FLAG_NO_HOLE;
-
-    r = nbd_zero (h, iod->nr_sectors << 9, iod->start_sector << 9, nbd_flags);
-    if (r == -1) {
-      fprintf (stderr, "%s: %s\n", "nbdublk", nbd_get_error ());
-      ublksrv_complete_io (q, tag,
-                           (nbd_get_errno () ? : EINVAL));
-      return 0;
-    }
-    break;
-
-  default:
-    fprintf (stderr, "%s: unknown operation %u\n", "nbdublk", op);
-    ublksrv_complete_io (q, tag, -ENOTSUP);
-    return 0;
-  }
-
-  /* Make sure the corresponding NBD worker sees the command. */
-  pthread_mutex_lock (&thread_info.ptr[q_id].in_flight_mutex);
-  thread_info.ptr[q_id].in_flight++;
-  pthread_cond_signal (&thread_info.ptr[q_id].in_flight_cond);
-  pthread_mutex_unlock (&thread_info.ptr[q_id].in_flight_mutex);
-
-  return 0;
+  struct ublksrv_aio_ctx *aio_ctx = thread_info.ptr[q->q_id].aio_ctx;
+  const struct ublksrv_io_desc *iod = ublksrv_get_iod(q, tag);
+  struct ublksrv_aio *req = ublksrv_aio_alloc_req(aio_ctx, 0);
+
+  req->io = *iod;
+  req->id = ublksrv_aio_pid_tag(q->q_id, tag);
+  if (verbose)
+    fprintf (stderr, "%s %d qid %d tag %d\n", __func__, __LINE__,
+             q->q_id, tag);
+  ublksrv_aio_submit_req(aio_ctx, req);
+
+  return 0;
 }
 
 struct ublksrv_tgt_type tgt_type = {
   .type = UBLKSRV_TGT_TYPE_NBD,
   .name = "nbd",
   .init_tgt = init_tgt,
-  .handle_io_async = handle_io_async,
-  .handle_event = handle_event,
+  .handle_io_async = nbd_handle_io_async,
+  .handle_event = nbd_handle_event,
 };

-- 
Ming
Richard W.M. Jones
2022-Aug-30 14:38 UTC
[Libguestfs] [PATCH libnbd] ublk: Add new nbdublk program
On Tue, Aug 30, 2022 at 03:12:23PM +0800, Ming Lei wrote:
> The patch sent in last email may cause io hang on MQ, and follows the fixed
> version:

I split this into two commits and cleaned them up and posted them here:

https://gitlab.com/rwmjones/libnbd/-/commits/nbdublk/

Unfortunately this doesn't work for me.  When I do various filesystem
operations like git clone and a compile I see some subtle disk errors
and eventually it deadlocks, so I guess there is some problem.

It might be related to the changes that I made, but I'm fairly sure
it's not, as I only made small fixes to remove unused variables and fix
types in printf statements and that sort of thing.

Rich.

-- 
Richard Jones, Virtualization Group, Red Hat http://people.redhat.com/~rjones
Read my programming and virtualization blog: http://rwmj.wordpress.com
virt-builder quickly builds VMs from scratch
http://libguestfs.org/virt-builder.1.html