Hi Jones, On Thu, Aug 25, 2022 at 01:10:55PM +0100, Richard W.M. Jones wrote: > This patch adds simple support for a ublk-based NBD client. > It is also available here: > https://gitlab.com/rwmjones/libnbd/-/tree/nbdublk/ublk > > ublk is a way to write Linux block device drivers in userspace: Just looked at your nbdublk implementation a bit; basically it is good, and really nice work. Two suggestions follow: 1) the io_uring context is multiplexed with ublk io command handling, so we should avoid blocking in both ->handle_io_async() and ->handle_event(), otherwise performance may be bad 2) in the implementation of the nbd worker thread, there are two sleep points (wait for incoming io command, and network FD); I'd suggest using poll to wait on any of them. Recently I have been working to add ublksrv io offloading or aio interfaces for this sort of case in which io_uring can't be used, which may simplify this area; please see the attached patch which applies the above two points against your patch. An obvious improvement can be observed on my simple fio test (randread, io, 4k bs, libaio) against a backend of 'nbdkit file'. But these interfaces aren't merged to the ublksrv github tree yet; you can find them in the aio branch, and demo_event.c is one example of how to use them: https://github.com/ming1/ubdsrv/tree/aio Actually this interface can be improved further for the nbdublk case, and the request allocation isn't actually needed for this direct offloading. But they are added for covering some IOs not from the ublk driver, such as metadata, so 'struct ublksrv_aio' is allocated. I will try my best to finalize them and merge them to the master branch. BTW, IOPS on nbdublk (backend: nbdkit file) still has a big gap compared with ublk-loop, so I guess in future maybe io_uring should be tried to see if a big improvement can be observed. 
diff --git a/generator/API.ml b/generator/API.ml index 3e948aa..bdd0fb8 100644 --- a/generator/API.ml +++ b/generator/API.ml @@ -2289,6 +2289,26 @@ that eventual action is actually expected - for example, if the connection is established but there are no commands in flight, using an infinite timeout will permanently block). +This function is mainly useful as an example of how you might +integrate libnbd with your own main loop, rather than being +intended as something you would use."; + example = Some "examples/aio-connect-read.c"; + }; + + "poll2", { + default_call with + args = [Int "evt"; Int "timeout" ]; ret = RInt; + shortdesc = "poll the handle once with eventfd"; + longdesc = "\ +This is a simple implementation of L<poll(2)> which is used +internally by synchronous API calls. On success, it returns +C<0> if the C<timeout> (in milliseconds) occurs, or C<1> if +the poll completed and the state machine progressed. Set +C<timeout> to C<-1> to block indefinitely (but be careful +that eventual action is actually expected - for example, if +the connection is established but there are no commands in +flight, using an infinite timeout will permanently block). + This function is mainly useful as an example of how you might integrate libnbd with your own main loop, rather than being intended as something you would use."; @@ -3153,6 +3173,7 @@ let first_version = [ "zero", (1, 0); "block_status", (1, 0); "poll", (1, 0); + "poll2", (1, 0); "aio_connect", (1, 0); "aio_connect_uri", (1, 0); "aio_connect_unix", (1, 0); diff --git a/lib/poll.c b/lib/poll.c index df01d94..e9d7924 100644 --- a/lib/poll.c +++ b/lib/poll.c @@ -27,14 +27,21 @@ #include "internal.h" /* A simple main loop implementation using poll(2). 
*/ -int -nbd_unlocked_poll (struct nbd_handle *h, int timeout) +static int +__nbd_unlocked_poll (struct nbd_handle *h, int evt, int timeout) { - struct pollfd fds[1]; - int r; + struct pollfd fds[2]; + int r, nr_fds = 1; /* fd might be negative, and poll will ignore it. */ fds[0].fd = nbd_unlocked_aio_get_fd (h); + if (evt > 0) { + fds[1].fd = evt; + fds[1].events = POLLIN; + fds[1].revents = 0; + nr_fds = 2; + } + switch (nbd_internal_aio_get_direction (get_next_state (h))) { case LIBNBD_AIO_DIRECTION_READ: fds[0].events = POLLIN; @@ -58,7 +65,7 @@ nbd_unlocked_poll (struct nbd_handle *h, int timeout) * passed to poll. */ do { - r = poll (fds, 1, timeout); + r = poll (fds, nr_fds, timeout); debug (h, "poll end: r=%d revents=%x", r, fds[0].revents); } while (r == -1 && errno == EINTR); @@ -91,3 +98,15 @@ nbd_unlocked_poll (struct nbd_handle *h, int timeout) return 1; } + +int +nbd_unlocked_poll (struct nbd_handle *h, int timeout) +{ + return __nbd_unlocked_poll (h, -1, timeout); +} + +int +nbd_unlocked_poll2 (struct nbd_handle *h, int evt, int timeout) +{ + return __nbd_unlocked_poll (h, evt, timeout); +} diff --git a/ublk/tgt.c b/ublk/tgt.c index 4cdd42a..2ab995a 100644 --- a/ublk/tgt.c +++ b/ublk/tgt.c @@ -35,6 +35,7 @@ #endif #include <ublksrv.h> +#include <ublksrv_aio.h> #include <libnbd.h> @@ -46,14 +47,6 @@ /* Number of seconds to wait for commands to complete when closing the dev. */ #define RELEASE_TIMEOUT 5 -/* List of completed commands. */ -struct completion { - struct ublksrv_queue *q; - int tag; - int res; /* The normal return value, if the command completes OK. */ -}; -DEFINE_VECTOR_TYPE(completions, struct completion) - /* Thread model: * * There are two threads per NBD connection. One thread @@ -69,32 +62,170 @@ struct thread_info { pthread_t io_uring_thread; pthread_t nbd_work_thread; - /* This counts the number of commands in flight. The condition is - * used to allow the operations thread to process commands when - * in_flight goes from 0 -> 1. 
This is roughly equivalent to - * nbd_aio_in_flight, but we need to count it ourselves in order to - * use the condition. - */ - _Atomic size_t in_flight; - pthread_mutex_t in_flight_mutex; - pthread_cond_t in_flight_cond; - - /* Commands have to be completed on the io_uring thread, but they - * run on the NBD thread. So when the NBD command completes we put - * the command on this queue and they are passed to the io_uring - * thread to call ublksrv_complete_io. - */ - pthread_mutex_t completed_commands_lock; - completions completed_commands; + struct ublksrv_aio_list compl; }; DEFINE_VECTOR_TYPE(thread_infos, struct thread_info) static thread_infos thread_info; static pthread_barrier_t barrier; +static struct ublksrv_aio_ctx *aio_ctx = NULL; static char jbuf[4096]; static pthread_mutex_t jbuf_lock = PTHREAD_MUTEX_INITIALIZER; +/* Command completion callback (called on the NBD thread). */ +static int +command_completed (void *vpdata, int *error) +{ + struct ublksrv_aio *req = vpdata; + int q_id = ublksrv_aio_qid(req->id); + struct ublksrv_queue *q = ublksrv_get_queue(aio_ctx->dev, q_id); + struct ublksrv_aio_list *compl = &thread_info.ptr[q_id].compl; + + if (verbose) + fprintf (stderr, + "%s: command_completed: tag=%d q_id=%zu error=%d\n", + "nbdublk", ublksrv_aio_tag(req->id), + ublksrv_aio_qid(req->id), *error); + + /* If the command failed, override the normal result. 
*/ + if (*error != 0) + req->res = *error; + + pthread_spin_lock(&compl->lock); + aio_list_add(&compl->list, req); + pthread_spin_unlock(&compl->lock); + + return 1; +} + + +int aio_submitter(struct ublksrv_aio_ctx *ctx, + struct ublksrv_aio *req) +{ + const struct ublksrv_io_desc *iod = &req->io; + const unsigned op = ublksrv_get_op (iod); + const unsigned flags = ublksrv_get_flags (iod); + const bool fua = flags & UBLK_IO_F_FUA; + const bool alloc_zero = flags & UBLK_IO_F_NOUNMAP; /* else punch hole */ + const size_t q_id = ublksrv_aio_qid(req->id); /* also the NBD handle number */ + struct nbd_handle *h = nbd.ptr[q_id]; + uint32_t nbd_flags = 0; + int64_t r; + nbd_completion_callback cb; + bool sync = false; + + if (verbose) + fprintf (stderr, "%s: handle_io_async: tag = %d q_id = %zu\n", + "nbdublk", ublksrv_aio_tag(req->id), q_id); + + req->res = iod->nr_sectors << 9; + cb.callback = command_completed; + cb.user_data = req; + cb.free = NULL; + + switch (op) { + case UBLK_IO_OP_READ: + r = nbd_aio_pread (h, (void *) iod->addr, iod->nr_sectors << 9, + iod->start_sector << 9, cb, 0); + if (r == -1) { + fprintf (stderr, "%s: %s\n", "nbdublk", nbd_get_error ()); + return -EINVAL; + } + break; + + case UBLK_IO_OP_WRITE: + if (fua && can_fua) + nbd_flags |= LIBNBD_CMD_FLAG_FUA; + + r = nbd_aio_pwrite (h, (const void *) iod->addr, iod->nr_sectors << 9, + iod->start_sector << 9, cb, nbd_flags); + if (r == -1) { + fprintf (stderr, "%s: %s\n", "nbdublk", nbd_get_error ()); + return -EINVAL; + } + break; + + case UBLK_IO_OP_FLUSH: + r = nbd_flush (h, 0); + if (r == -1) { + fprintf (stderr, "%s: %s\n", "nbdublk", nbd_get_error ()); + return -EINVAL; + } + sync = true; + break; + + case UBLK_IO_OP_DISCARD: + if (fua && can_fua) + nbd_flags |= LIBNBD_CMD_FLAG_FUA; + + r = nbd_trim (h, iod->nr_sectors << 9, iod->start_sector << 9, nbd_flags); + if (r == -1) { + fprintf (stderr, "%s: %s\n", "nbdublk", nbd_get_error ()); + return -EINVAL; + } + sync = true; + break; + + case 
UBLK_IO_OP_WRITE_ZEROES: + if (fua && can_fua) + nbd_flags |= LIBNBD_CMD_FLAG_FUA; + + if (alloc_zero) + nbd_flags |= LIBNBD_CMD_FLAG_NO_HOLE; + + r = nbd_zero (h, iod->nr_sectors << 9, iod->start_sector << 9, nbd_flags); + if (r == -1) { + fprintf (stderr, "%s: %s\n", "nbdublk", nbd_get_error ()); + return -EINVAL; + } + sync = true; + break; + + default: + fprintf (stderr, "%s: unknown operation %u\n", "nbdublk", op); + return -ENOTSUP; + } + + /* return if this request is completed */ + if (sync) + return 1; + return 0; +} + +static void * +nbd_work_thread (void *vpinfo) +{ + struct thread_info *ti = vpinfo; + struct nbd_handle *h = nbd.ptr[ti->i]; + struct ublksrv_queue *q = ublksrv_get_queue(aio_ctx->dev, ti->i); + struct ublksrv_aio_list *c = &thread_info.ptr[ti->i].compl; + + /* Signal to the main thread that we have initialized. */ + pthread_barrier_wait (&barrier); + + while (!ublksrv_aio_ctx_dead(aio_ctx)) { + struct aio_list compl; + + aio_list_init(&compl); + ublksrv_aio_submit_worker(aio_ctx, aio_submitter, &compl); + + pthread_spin_lock(&c->lock); + aio_list_splice(&c->list, &compl); + pthread_spin_unlock(&c->lock); + + ublksrv_aio_complete_worker(aio_ctx, &compl); + + if (nbd_poll2 (h, aio_ctx->efd, -1) == -1) { + fprintf (stderr, "%s\n", nbd_get_error ()); + exit (EXIT_FAILURE); + } + } + + /*NOTREACHED*/ + return NULL; +} + static void * io_uring_thread (void *vpinfo) { @@ -139,37 +270,6 @@ io_uring_thread (void *vpinfo) return NULL; } -static void * -nbd_work_thread (void *vpinfo) -{ - struct thread_info *thread_info = vpinfo; - const size_t i = thread_info->i; - struct nbd_handle *h = nbd.ptr[i]; - - /* Signal to the main thread that we have initialized. */ - pthread_barrier_wait (&barrier); - - while (1) { - /* Sleep until at least one command is in flight. 
*/ - pthread_mutex_lock (&thread_info->in_flight_mutex); - while (thread_info->in_flight == 0) - pthread_cond_wait (&thread_info->in_flight_cond, - &thread_info->in_flight_mutex); - pthread_mutex_unlock (&thread_info->in_flight_mutex); - - /* Dispatch work while there are commands in flight. */ - while (thread_info->in_flight > 0) { - if (nbd_poll (h, -1) == -1) { - fprintf (stderr, "%s\n", nbd_get_error ()); - exit (EXIT_FAILURE); - } - } - } - - /*NOTREACHED*/ - return NULL; -} - static int set_parameters (struct ublksrv_ctrl_dev *ctrl_dev, const struct ublksrv_dev *dev) @@ -215,6 +315,7 @@ int start_daemon (struct ublksrv_ctrl_dev *ctrl_dev) { const struct ublksrv_ctrl_dev_info *dinfo = &ctrl_dev->dev_info; + int dev_id = ctrl_dev->dev_info.dev_id; struct ublksrv_dev *dev; size_t i; int r; @@ -260,22 +361,21 @@ start_daemon (struct ublksrv_ctrl_dev *ctrl_dev) return -1; } + aio_ctx = ublksrv_aio_ctx_init(dev, 0); + if (!aio_ctx) { + fprintf(stderr, "dev %d call ublk_aio_ctx_init failed\n", dev_id); + return -ENOMEM; + } + /* Create the threads. */ for (i = 0; i < nbd.len; ++i) { /* Note this cannot fail because of previous reserve. */ thread_infos_append (&thread_info, (struct thread_info) - { .dev = dev, .i = i, .in_flight = 0 }); + { .dev = dev, .i = i,}); + + ublksrv_aio_init_list(&thread_info.ptr[i].compl); - r = pthread_mutex_init (&thread_info.ptr[i].in_flight_mutex, NULL); - if (r != 0) - goto bad_pthread; - r = pthread_cond_init (&thread_info.ptr[i].in_flight_cond, NULL); - if (r != 0) - goto bad_pthread; - r = pthread_mutex_init (&thread_info.ptr[i].completed_commands_lock, NULL); - if (r != 0) - goto bad_pthread; r = pthread_create (&thread_info.ptr[i].io_uring_thread, NULL, io_uring_thread, &thread_info.ptr[i]); if (r != 0) @@ -316,25 +416,11 @@ start_daemon (struct ublksrv_ctrl_dev *ctrl_dev) for (i = 0; i < nbd.len; ++i) pthread_join (thread_info.ptr[i].io_uring_thread, NULL); - /* Wait until a timeout while there are NBD commands in flight. 
*/ - time (&st); - while (time (NULL) - st <= RELEASE_TIMEOUT) { - for (i = 0; i < nbd.len; ++i) { - if (thread_info.ptr[i].in_flight > 0) - break; - } - if (i == nbd.len) /* no commands in flight */ - break; - - /* Signal to the operations threads to work. */ - for (i = 0; i < nbd.len; ++i) { - pthread_mutex_lock (&thread_info.ptr[i].in_flight_mutex); - pthread_cond_signal (&thread_info.ptr[i].in_flight_cond); - pthread_mutex_unlock (&thread_info.ptr[i].in_flight_mutex); - } - - sleep (1); + for (i = 0; i < nbd.len; ++i) { + ublksrv_aio_ctx_shutdown(aio_ctx); + pthread_join (thread_info.ptr[i].nbd_work_thread, NULL); } + ublksrv_aio_ctx_deinit(aio_ctx); ublksrv_dev_deinit (dev); //thread_infos_reset (&thread_info); @@ -367,176 +453,31 @@ init_tgt (struct ublksrv_dev *dev, int type, int argc, char *argv[]) return 0; } -/* Command completion callback (called on the NBD thread). */ -static int -command_completed (void *vpdata, int *error) -{ - struct completion *completion = vpdata; - struct ublksrv_queue *q = completion->q; - const size_t i = q->q_id; - - if (verbose) - fprintf (stderr, - "%s: command_completed: tag=%d q_id=%zu res=%d error=%d\n", - "nbdublk", completion->tag, i, completion->res, *error); - - /* If the command failed, override the normal result. */ - if (*error != 0) - completion->res = *error; - - assert (thread_info.ptr[i].in_flight >= 1); - thread_info.ptr[i].in_flight--; - - /* Copy the command to the list of completed commands. - * - * Note *completion is freed by the .free handler that we added to - * this completion callback. - */ - pthread_mutex_lock (&thread_info.ptr[i].completed_commands_lock); - completions_append (&thread_info.ptr[i].completed_commands, *completion); - - /* Signal io_uring thread that the command has been completed. - * It will call us back in a different thread on ->handle_event - * and we can finally complete the command(s) there. 
- */ - ublksrv_queue_send_event (q); - pthread_mutex_unlock (&thread_info.ptr[i].completed_commands_lock); - - /* Retire the NBD command. */ - return 1; -} - static void -handle_event (struct ublksrv_queue *q) +nbd_handle_event (struct ublksrv_queue *q) { - const size_t i = q->q_id; - size_t j; - if (verbose) - fprintf (stderr, "%s: handle_event: q_id = %d\n", "nbdublk", q->q_id); + fprintf (stderr, "%s: handle_event: q_id = %d\n", "nbdublk", q->q_id); - pthread_mutex_lock (&thread_info.ptr[i].completed_commands_lock); - - for (j = 0; j < thread_info.ptr[i].completed_commands.len; ++j) { - struct completion *completion - &thread_info.ptr[i].completed_commands.ptr[j]; - ublksrv_complete_io (completion->q, completion->tag, completion->res); - } - completions_reset (&thread_info.ptr[i].completed_commands); - ublksrv_queue_handled_event (q); - - pthread_mutex_unlock (&thread_info.ptr[i].completed_commands_lock); + ublksrv_aio_handle_event(aio_ctx, q); } -/* Start a single command. */ -static int -handle_io_async (struct ublksrv_queue *q, int tag) +static int nbd_handle_io_async(struct ublksrv_queue *q, int tag) { - const struct ublksrv_io_desc *iod = ublksrv_get_iod (q, tag); - const unsigned op = ublksrv_get_op (iod); - const unsigned flags = ublksrv_get_flags (iod); - const bool fua = flags & UBLK_IO_F_FUA; - const bool alloc_zero = flags & UBLK_IO_F_NOUNMAP; /* else punch hole */ - const size_t q_id = q->q_id; /* also the NBD handle number */ - struct nbd_handle *h = nbd.ptr[q_id]; - uint32_t nbd_flags = 0; - int64_t r; - nbd_completion_callback cb; - struct completion *completion; + const struct ublksrv_io_desc *iod = ublksrv_get_iod(q, tag); + struct ublksrv_aio *req = ublksrv_aio_alloc_req(aio_ctx, 0); - if (verbose) - fprintf (stderr, "%s: handle_io_async: tag = %d q_id = %zu\n", - "nbdublk", tag, q_id); - - /* Set up a completion callback and its user data. 
*/ - completion = malloc (sizeof *completion); - if (completion == NULL) abort (); - completion->q = q; - completion->tag = tag; - completion->res = iod->nr_sectors << 9; - cb.callback = command_completed; - cb.user_data = completion; - cb.free = free; - - switch (op) { - case UBLK_IO_OP_READ: - r = nbd_aio_pread (h, (void *) iod->addr, iod->nr_sectors << 9, - iod->start_sector << 9, cb, 0); - if (r == -1) { - fprintf (stderr, "%s: %s\n", "nbdublk", nbd_get_error ()); - ublksrv_complete_io (q, tag, - (nbd_get_errno () ? : EINVAL)); - return 0; - } - break; - - case UBLK_IO_OP_WRITE: - if (fua && can_fua) - nbd_flags |= LIBNBD_CMD_FLAG_FUA; - - r = nbd_aio_pwrite (h, (const void *) iod->addr, iod->nr_sectors << 9, - iod->start_sector << 9, cb, nbd_flags); - if (r == -1) { - fprintf (stderr, "%s: %s\n", "nbdublk", nbd_get_error ()); - ublksrv_complete_io (q, tag, - (nbd_get_errno () ? : EINVAL)); - return 0; - } - break; - - case UBLK_IO_OP_FLUSH: - r = nbd_flush (h, 0); - if (r == -1) { - fprintf (stderr, "%s: %s\n", "nbdublk", nbd_get_error ()); - ublksrv_complete_io (q, tag, - (nbd_get_errno () ? : EINVAL)); - return 0; - } - break; + req->io = *iod; + req->id = ublksrv_aio_pid_tag(q->q_id, tag); + ublksrv_aio_submit_req(aio_ctx, req); - case UBLK_IO_OP_DISCARD: - if (fua && can_fua) - nbd_flags |= LIBNBD_CMD_FLAG_FUA; - - r = nbd_trim (h, iod->nr_sectors << 9, iod->start_sector << 9, nbd_flags); - if (r == -1) { - fprintf (stderr, "%s: %s\n", "nbdublk", nbd_get_error ()); - ublksrv_complete_io (q, tag, - (nbd_get_errno () ? : EINVAL)); - return 0; - } - break; - - case UBLK_IO_OP_WRITE_ZEROES: - if (fua && can_fua) - nbd_flags |= LIBNBD_CMD_FLAG_FUA; - - if (alloc_zero) - nbd_flags |= LIBNBD_CMD_FLAG_NO_HOLE; - - r = nbd_zero (h, iod->nr_sectors << 9, iod->start_sector << 9, nbd_flags); - if (r == -1) { - fprintf (stderr, "%s: %s\n", "nbdublk", nbd_get_error ()); - ublksrv_complete_io (q, tag, - (nbd_get_errno () ? 
: EINVAL)); - return 0; - } - break; - - default: - fprintf (stderr, "%s: unknown operation %u\n", "nbdublk", op); - ublksrv_complete_io (q, tag, -ENOTSUP); - return 0; - } - - /* Make sure the corresponding NBD worker sees the command. */ - pthread_mutex_lock (&thread_info.ptr[q_id].in_flight_mutex); - thread_info.ptr[q_id].in_flight++; - pthread_cond_signal (&thread_info.ptr[q_id].in_flight_cond); - pthread_mutex_unlock (&thread_info.ptr[q_id].in_flight_mutex); - - return 0; + return 0; } struct ublksrv_tgt_type tgt_type = { .type = UBLKSRV_TGT_TYPE_NBD, .name = "nbd", .init_tgt = init_tgt, - .handle_io_async = handle_io_async, - .handle_event = handle_event, + .handle_io_async = nbd_handle_io_async, + .handle_event = nbd_handle_event, }; Thanks, Ming
Richard W.M. Jones
2022-Aug-30 08:04 UTC
[Libguestfs] [PATCH libnbd] ublk: Add new nbdublk program
On Tue, Aug 30, 2022 at 10:32:02AM +0800, Ming Lei wrote: > Hi Jones, > > On Thu, Aug 25, 2022 at 01:10:55PM +0100, Richard W.M. Jones wrote: > > This patch adds simple support for a ublk-based NBD client. > > It is also available here: > > https://gitlab.com/rwmjones/libnbd/-/tree/nbdublk/ublk > > > > ublk is a way to write Linux block device drivers in userspace: > > Just looked at your nbdublk implementation a bit, basically it is good, > and one really nice work. > > Also follows two suggestions: > > 1) the io_uring context is multilexed with ublk io command handling, so > we should avoid to block in both ->handle_io_async() and > ->handle_event(), otherwise performance may be bad The nbd_aio_* calls don't block. However, I noticed that I made a mistake with the trim and zero paths because I am using synchronous (blocking) nbd_flush / nbd_trim / nbd_zero instead of nbd_aio_flush / nbd_aio_trim / nbd_aio_zero. I will fix this soon. Nothing in handle_event should block except for the call to pthread_mutex_lock. This lock is necessary because new commands can be retired on the nbd_work_thread while handle_event is being called from the io_uring thread. > 2) in the implementation of nbd worker thread, there are two sleep > points(wait for incoming io command, and network FD), I'd suggest to use > poll to wait on any of them > > Recently I are working to add ublksrv io offloading or aio > interfaces on this sort of case in which io_uring can't be used, > which may simplified this area, please see the attached patch which > applies the above two points against your patch. And obvious > improvement can be observed on my simple fio test( randread, io, 4k > bs, libaio) against backend of 'nbdkit file'. > > But these interfaces aren't merged to ublksrv github tree yet, you can find > them in the aio branch, and demo_event.c is one example wrt. 
how to use > them: > > https://github.com/ming1/ubdsrv/tree/aio > > Actually this interface can be improved further for nbdublk case, > and the request allocation isn't needed actually for this direct > offloading. But they are added for covering some IOs not from ublk > driver, such as meta data, so 'struct ublksrv_aio' is allocated. > I will try best to finalize them and merge to master branch. I didn't really understand what these patches to ubdsrv do when I looked at them before. Maybe add some diagrams? > BTW, IOPS on nbdublk(backend: nbdkit file) still has big gap compared > with ublk-loop, so I guess in future maybe io_uring should be tried and > see if big improvement can be observed. It's always going to be a bit slower because we're converting the requests into a network protocol and passing them to another process. Rich. > > diff --git a/generator/API.ml b/generator/API.ml > index 3e948aa..bdd0fb8 100644 > --- a/generator/API.ml > +++ b/generator/API.ml > @@ -2289,6 +2289,26 @@ that eventual action is actually expected - for example, if > the connection is established but there are no commands in > flight, using an infinite timeout will permanently block). > > +This function is mainly useful as an example of how you might > +integrate libnbd with your own main loop, rather than being > +intended as something you would use."; > + example = Some "examples/aio-connect-read.c"; > + }; > + > + "poll2", { > + default_call with > + args = [Int "evt"; Int "timeout" ]; ret = RInt; > + shortdesc = "poll the handle once with eventfd"; > + longdesc = "\ > +This is a simple implementation of L<poll(2)> which is used > +internally by synchronous API calls. On success, it returns > +C<0> if the C<timeout> (in milliseconds) occurs, or C<1> if > +the poll completed and the state machine progressed. 
Set > +C<timeout> to C<-1> to block indefinitely (but be careful > +that eventual action is actually expected - for example, if > +the connection is established but there are no commands in > +flight, using an infinite timeout will permanently block). > + > This function is mainly useful as an example of how you might > integrate libnbd with your own main loop, rather than being > intended as something you would use."; > @@ -3153,6 +3173,7 @@ let first_version = [ > "zero", (1, 0); > "block_status", (1, 0); > "poll", (1, 0); > + "poll2", (1, 0); > "aio_connect", (1, 0); > "aio_connect_uri", (1, 0); > "aio_connect_unix", (1, 0); > diff --git a/lib/poll.c b/lib/poll.c > index df01d94..e9d7924 100644 > --- a/lib/poll.c > +++ b/lib/poll.c > @@ -27,14 +27,21 @@ > #include "internal.h" > > /* A simple main loop implementation using poll(2). */ > -int > -nbd_unlocked_poll (struct nbd_handle *h, int timeout) > +static int > +__nbd_unlocked_poll (struct nbd_handle *h, int evt, int timeout) > { > - struct pollfd fds[1]; > - int r; > + struct pollfd fds[2]; > + int r, nr_fds = 1; > > /* fd might be negative, and poll will ignore it. */ > fds[0].fd = nbd_unlocked_aio_get_fd (h); > + if (evt > 0) { > + fds[1].fd = evt; > + fds[1].events = POLLIN; > + fds[1].revents = 0; > + nr_fds = 2; > + } > + > switch (nbd_internal_aio_get_direction (get_next_state (h))) { > case LIBNBD_AIO_DIRECTION_READ: > fds[0].events = POLLIN; > @@ -58,7 +65,7 @@ nbd_unlocked_poll (struct nbd_handle *h, int timeout) > * passed to poll. 
> */ > do { > - r = poll (fds, 1, timeout); > + r = poll (fds, nr_fds, timeout); > debug (h, "poll end: r=%d revents=%x", r, fds[0].revents); > } while (r == -1 && errno == EINTR); > > @@ -91,3 +98,15 @@ nbd_unlocked_poll (struct nbd_handle *h, int timeout) > > return 1; > } > + > +int > +nbd_unlocked_poll (struct nbd_handle *h, int timeout) > +{ > + return __nbd_unlocked_poll (h, -1, timeout); > +} > + > +int > +nbd_unlocked_poll2 (struct nbd_handle *h, int evt, int timeout) > +{ > + return __nbd_unlocked_poll (h, evt, timeout); > +} > diff --git a/ublk/tgt.c b/ublk/tgt.c > index 4cdd42a..2ab995a 100644 > --- a/ublk/tgt.c > +++ b/ublk/tgt.c > @@ -35,6 +35,7 @@ > #endif > > #include <ublksrv.h> > +#include <ublksrv_aio.h> > > #include <libnbd.h> > > @@ -46,14 +47,6 @@ > /* Number of seconds to wait for commands to complete when closing the dev. */ > #define RELEASE_TIMEOUT 5 > > -/* List of completed commands. */ > -struct completion { > - struct ublksrv_queue *q; > - int tag; > - int res; /* The normal return value, if the command completes OK. */ > -}; > -DEFINE_VECTOR_TYPE(completions, struct completion) > - > /* Thread model: > * > * There are two threads per NBD connection. One thread > @@ -69,32 +62,170 @@ struct thread_info { > pthread_t io_uring_thread; > pthread_t nbd_work_thread; > > - /* This counts the number of commands in flight. The condition is > - * used to allow the operations thread to process commands when > - * in_flight goes from 0 -> 1. This is roughly equivalent to > - * nbd_aio_in_flight, but we need to count it ourselves in order to > - * use the condition. > - */ > - _Atomic size_t in_flight; > - pthread_mutex_t in_flight_mutex; > - pthread_cond_t in_flight_cond; > - > - /* Commands have to be completed on the io_uring thread, but they > - * run on the NBD thread. So when the NBD command completes we put > - * the command on this queue and they are passed to the io_uring > - * thread to call ublksrv_complete_io. 
> - */ > - pthread_mutex_t completed_commands_lock; > - completions completed_commands; > + struct ublksrv_aio_list compl; > }; > DEFINE_VECTOR_TYPE(thread_infos, struct thread_info) > static thread_infos thread_info; > > static pthread_barrier_t barrier; > +static struct ublksrv_aio_ctx *aio_ctx = NULL; > > static char jbuf[4096]; > static pthread_mutex_t jbuf_lock = PTHREAD_MUTEX_INITIALIZER; > > +/* Command completion callback (called on the NBD thread). */ > +static int > +command_completed (void *vpdata, int *error) > +{ > + struct ublksrv_aio *req = vpdata; > + int q_id = ublksrv_aio_qid(req->id); > + struct ublksrv_queue *q = ublksrv_get_queue(aio_ctx->dev, q_id); > + struct ublksrv_aio_list *compl = &thread_info.ptr[q_id].compl; > + > + if (verbose) > + fprintf (stderr, > + "%s: command_completed: tag=%d q_id=%zu error=%d\n", > + "nbdublk", ublksrv_aio_tag(req->id), > + ublksrv_aio_qid(req->id), *error); > + > + /* If the command failed, override the normal result. */ > + if (*error != 0) > + req->res = *error; > + > + pthread_spin_lock(&compl->lock); > + aio_list_add(&compl->list, req); > + pthread_spin_unlock(&compl->lock); > + > + return 1; > +} > + > + > +int aio_submitter(struct ublksrv_aio_ctx *ctx, > + struct ublksrv_aio *req) > +{ > + const struct ublksrv_io_desc *iod = &req->io; > + const unsigned op = ublksrv_get_op (iod); > + const unsigned flags = ublksrv_get_flags (iod); > + const bool fua = flags & UBLK_IO_F_FUA; > + const bool alloc_zero = flags & UBLK_IO_F_NOUNMAP; /* else punch hole */ > + const size_t q_id = ublksrv_aio_qid(req->id); /* also the NBD handle number */ > + struct nbd_handle *h = nbd.ptr[q_id]; > + uint32_t nbd_flags = 0; > + int64_t r; > + nbd_completion_callback cb; > + bool sync = false; > + > + if (verbose) > + fprintf (stderr, "%s: handle_io_async: tag = %d q_id = %zu\n", > + "nbdublk", ublksrv_aio_tag(req->id), q_id); > + > + req->res = iod->nr_sectors << 9; > + cb.callback = command_completed; > + cb.user_data = req; > 
+ cb.free = NULL; > + > + switch (op) { > + case UBLK_IO_OP_READ: > + r = nbd_aio_pread (h, (void *) iod->addr, iod->nr_sectors << 9, > + iod->start_sector << 9, cb, 0); > + if (r == -1) { > + fprintf (stderr, "%s: %s\n", "nbdublk", nbd_get_error ()); > + return -EINVAL; > + } > + break; > + > + case UBLK_IO_OP_WRITE: > + if (fua && can_fua) > + nbd_flags |= LIBNBD_CMD_FLAG_FUA; > + > + r = nbd_aio_pwrite (h, (const void *) iod->addr, iod->nr_sectors << 9, > + iod->start_sector << 9, cb, nbd_flags); > + if (r == -1) { > + fprintf (stderr, "%s: %s\n", "nbdublk", nbd_get_error ()); > + return -EINVAL; > + } > + break; > + > + case UBLK_IO_OP_FLUSH: > + r = nbd_flush (h, 0); > + if (r == -1) { > + fprintf (stderr, "%s: %s\n", "nbdublk", nbd_get_error ()); > + return -EINVAL; > + } > + sync = true; > + break; > + > + case UBLK_IO_OP_DISCARD: > + if (fua && can_fua) > + nbd_flags |= LIBNBD_CMD_FLAG_FUA; > + > + r = nbd_trim (h, iod->nr_sectors << 9, iod->start_sector << 9, nbd_flags); > + if (r == -1) { > + fprintf (stderr, "%s: %s\n", "nbdublk", nbd_get_error ()); > + return -EINVAL; > + } > + sync = true; > + break; > + > + case UBLK_IO_OP_WRITE_ZEROES: > + if (fua && can_fua) > + nbd_flags |= LIBNBD_CMD_FLAG_FUA; > + > + if (alloc_zero) > + nbd_flags |= LIBNBD_CMD_FLAG_NO_HOLE; > + > + r = nbd_zero (h, iod->nr_sectors << 9, iod->start_sector << 9, nbd_flags); > + if (r == -1) { > + fprintf (stderr, "%s: %s\n", "nbdublk", nbd_get_error ()); > + return -EINVAL; > + } > + sync = true; > + break; > + > + default: > + fprintf (stderr, "%s: unknown operation %u\n", "nbdublk", op); > + return -ENOTSUP; > + } > + > + /* return if this request is completed */ > + if (sync) > + return 1; > + return 0; > +} > + > +static void * > +nbd_work_thread (void *vpinfo) > +{ > + struct thread_info *ti = vpinfo; > + struct nbd_handle *h = nbd.ptr[ti->i]; > + struct ublksrv_queue *q = ublksrv_get_queue(aio_ctx->dev, ti->i); > + struct ublksrv_aio_list *c = &thread_info.ptr[ti->i].compl; > 
+ > + /* Signal to the main thread that we have initialized. */ > + pthread_barrier_wait (&barrier); > + > + while (!ublksrv_aio_ctx_dead(aio_ctx)) { > + struct aio_list compl; > + > + aio_list_init(&compl); > + ublksrv_aio_submit_worker(aio_ctx, aio_submitter, &compl); > + > + pthread_spin_lock(&c->lock); > + aio_list_splice(&c->list, &compl); > + pthread_spin_unlock(&c->lock); > + > + ublksrv_aio_complete_worker(aio_ctx, &compl); > + > + if (nbd_poll2 (h, aio_ctx->efd, -1) == -1) { > + fprintf (stderr, "%s\n", nbd_get_error ()); > + exit (EXIT_FAILURE); > + } > + } > + > + /*NOTREACHED*/ > + return NULL; > +} > + > static void * > io_uring_thread (void *vpinfo) > { > @@ -139,37 +270,6 @@ io_uring_thread (void *vpinfo) > return NULL; > } > > -static void * > -nbd_work_thread (void *vpinfo) > -{ > - struct thread_info *thread_info = vpinfo; > - const size_t i = thread_info->i; > - struct nbd_handle *h = nbd.ptr[i]; > - > - /* Signal to the main thread that we have initialized. */ > - pthread_barrier_wait (&barrier); > - > - while (1) { > - /* Sleep until at least one command is in flight. */ > - pthread_mutex_lock (&thread_info->in_flight_mutex); > - while (thread_info->in_flight == 0) > - pthread_cond_wait (&thread_info->in_flight_cond, > - &thread_info->in_flight_mutex); > - pthread_mutex_unlock (&thread_info->in_flight_mutex); > - > - /* Dispatch work while there are commands in flight. 
*/ > - while (thread_info->in_flight > 0) { > - if (nbd_poll (h, -1) == -1) { > - fprintf (stderr, "%s\n", nbd_get_error ()); > - exit (EXIT_FAILURE); > - } > - } > - } > - > - /*NOTREACHED*/ > - return NULL; > -} > - > static int > set_parameters (struct ublksrv_ctrl_dev *ctrl_dev, > const struct ublksrv_dev *dev) > @@ -215,6 +315,7 @@ int > start_daemon (struct ublksrv_ctrl_dev *ctrl_dev) > { > const struct ublksrv_ctrl_dev_info *dinfo = &ctrl_dev->dev_info; > + int dev_id = ctrl_dev->dev_info.dev_id; > struct ublksrv_dev *dev; > size_t i; > int r; > @@ -260,22 +361,21 @@ start_daemon (struct ublksrv_ctrl_dev *ctrl_dev) > return -1; > } > > + aio_ctx = ublksrv_aio_ctx_init(dev, 0); > + if (!aio_ctx) { > + fprintf(stderr, "dev %d call ublk_aio_ctx_init failed\n", dev_id); > + return -ENOMEM; > + } > + > /* Create the threads. */ > for (i = 0; i < nbd.len; ++i) { > /* Note this cannot fail because of previous reserve. */ > thread_infos_append (&thread_info, > (struct thread_info) > - { .dev = dev, .i = i, .in_flight = 0 }); > + { .dev = dev, .i = i,}); > + > + ublksrv_aio_init_list(&thread_info.ptr[i].compl); > > - r = pthread_mutex_init (&thread_info.ptr[i].in_flight_mutex, NULL); > - if (r != 0) > - goto bad_pthread; > - r = pthread_cond_init (&thread_info.ptr[i].in_flight_cond, NULL); > - if (r != 0) > - goto bad_pthread; > - r = pthread_mutex_init (&thread_info.ptr[i].completed_commands_lock, NULL); > - if (r != 0) > - goto bad_pthread; > r = pthread_create (&thread_info.ptr[i].io_uring_thread, NULL, > io_uring_thread, &thread_info.ptr[i]); > if (r != 0) > @@ -316,25 +416,11 @@ start_daemon (struct ublksrv_ctrl_dev *ctrl_dev) > for (i = 0; i < nbd.len; ++i) > pthread_join (thread_info.ptr[i].io_uring_thread, NULL); > > - /* Wait until a timeout while there are NBD commands in flight. 
*/ > - time (&st); > - while (time (NULL) - st <= RELEASE_TIMEOUT) { > - for (i = 0; i < nbd.len; ++i) { > - if (thread_info.ptr[i].in_flight > 0) > - break; > - } > - if (i == nbd.len) /* no commands in flight */ > - break; > - > - /* Signal to the operations threads to work. */ > - for (i = 0; i < nbd.len; ++i) { > - pthread_mutex_lock (&thread_info.ptr[i].in_flight_mutex); > - pthread_cond_signal (&thread_info.ptr[i].in_flight_cond); > - pthread_mutex_unlock (&thread_info.ptr[i].in_flight_mutex); > - } > - > - sleep (1); > + for (i = 0; i < nbd.len; ++i) { > + ublksrv_aio_ctx_shutdown(aio_ctx); > + pthread_join (thread_info.ptr[i].nbd_work_thread, NULL); > } > + ublksrv_aio_ctx_deinit(aio_ctx); > > ublksrv_dev_deinit (dev); > //thread_infos_reset (&thread_info); > @@ -367,176 +453,31 @@ init_tgt (struct ublksrv_dev *dev, int type, int argc, char *argv[]) > return 0; > } > > -/* Command completion callback (called on the NBD thread). */ > -static int > -command_completed (void *vpdata, int *error) > -{ > - struct completion *completion = vpdata; > - struct ublksrv_queue *q = completion->q; > - const size_t i = q->q_id; > - > - if (verbose) > - fprintf (stderr, > - "%s: command_completed: tag=%d q_id=%zu res=%d error=%d\n", > - "nbdublk", completion->tag, i, completion->res, *error); > - > - /* If the command failed, override the normal result. */ > - if (*error != 0) > - completion->res = *error; > - > - assert (thread_info.ptr[i].in_flight >= 1); > - thread_info.ptr[i].in_flight--; > - > - /* Copy the command to the list of completed commands. > - * > - * Note *completion is freed by the .free handler that we added to > - * this completion callback. > - */ > - pthread_mutex_lock (&thread_info.ptr[i].completed_commands_lock); > - completions_append (&thread_info.ptr[i].completed_commands, *completion); > - > - /* Signal io_uring thread that the command has been completed. 
> - * It will call us back in a different thread on ->handle_event > - * and we can finally complete the command(s) there. > - */ > - ublksrv_queue_send_event (q); > - pthread_mutex_unlock (&thread_info.ptr[i].completed_commands_lock); > - > - /* Retire the NBD command. */ > - return 1; > -} > - > static void > -handle_event (struct ublksrv_queue *q) > +nbd_handle_event (struct ublksrv_queue *q) > { > - const size_t i = q->q_id; > - size_t j; > - > if (verbose) > - fprintf (stderr, "%s: handle_event: q_id = %d\n", "nbdublk", q->q_id); > + fprintf (stderr, "%s: handle_event: q_id = %d\n", "nbdublk", q->q_id); > > - pthread_mutex_lock (&thread_info.ptr[i].completed_commands_lock); > - > - for (j = 0; j < thread_info.ptr[i].completed_commands.len; ++j) { > - struct completion *completion = > - &thread_info.ptr[i].completed_commands.ptr[j]; > - ublksrv_complete_io (completion->q, completion->tag, completion->res); > - } > - completions_reset (&thread_info.ptr[i].completed_commands); > - ublksrv_queue_handled_event (q); > - > - pthread_mutex_unlock (&thread_info.ptr[i].completed_commands_lock); > + ublksrv_aio_handle_event(aio_ctx, q); > } > > -/* Start a single command. 
*/ > -static int > -handle_io_async (struct ublksrv_queue *q, int tag) > +static int nbd_handle_io_async(struct ublksrv_queue *q, int tag) > { > - const struct ublksrv_io_desc *iod = ublksrv_get_iod (q, tag); > - const unsigned op = ublksrv_get_op (iod); > - const unsigned flags = ublksrv_get_flags (iod); > - const bool fua = flags & UBLK_IO_F_FUA; > - const bool alloc_zero = flags & UBLK_IO_F_NOUNMAP; /* else punch hole */ > - const size_t q_id = q->q_id; /* also the NBD handle number */ > - struct nbd_handle *h = nbd.ptr[q_id]; > - uint32_t nbd_flags = 0; > - int64_t r; > - nbd_completion_callback cb; > - struct completion *completion; > + const struct ublksrv_io_desc *iod = ublksrv_get_iod(q, tag); > + struct ublksrv_aio *req = ublksrv_aio_alloc_req(aio_ctx, 0); > > - if (verbose) > - fprintf (stderr, "%s: handle_io_async: tag = %d q_id = %zu\n", > - "nbdublk", tag, q_id); > - > - /* Set up a completion callback and its user data. */ > - completion = malloc (sizeof *completion); > - if (completion == NULL) abort (); > - completion->q = q; > - completion->tag = tag; > - completion->res = iod->nr_sectors << 9; > - cb.callback = command_completed; > - cb.user_data = completion; > - cb.free = free; > - > - switch (op) { > - case UBLK_IO_OP_READ: > - r = nbd_aio_pread (h, (void *) iod->addr, iod->nr_sectors << 9, > - iod->start_sector << 9, cb, 0); > - if (r == -1) { > - fprintf (stderr, "%s: %s\n", "nbdublk", nbd_get_error ()); > - ublksrv_complete_io (q, tag, - (nbd_get_errno () ? : EINVAL)); > - return 0; > - } > - break; > - > - case UBLK_IO_OP_WRITE: > - if (fua && can_fua) > - nbd_flags |= LIBNBD_CMD_FLAG_FUA; > - > - r = nbd_aio_pwrite (h, (const void *) iod->addr, iod->nr_sectors << 9, > - iod->start_sector << 9, cb, nbd_flags); > - if (r == -1) { > - fprintf (stderr, "%s: %s\n", "nbdublk", nbd_get_error ()); > - ublksrv_complete_io (q, tag, - (nbd_get_errno () ? 
: EINVAL)); > - return 0; > - } > - break; > - > - case UBLK_IO_OP_FLUSH: > - r = nbd_flush (h, 0); > - if (r == -1) { > - fprintf (stderr, "%s: %s\n", "nbdublk", nbd_get_error ()); > - ublksrv_complete_io (q, tag, - (nbd_get_errno () ? : EINVAL)); > - return 0; > - } > - break; > + req->io = *iod; > + req->id = ublksrv_aio_pid_tag(q->q_id, tag); > + ublksrv_aio_submit_req(aio_ctx, req); > > - case UBLK_IO_OP_DISCARD: > - if (fua && can_fua) > - nbd_flags |= LIBNBD_CMD_FLAG_FUA; > - > - r = nbd_trim (h, iod->nr_sectors << 9, iod->start_sector << 9, nbd_flags); > - if (r == -1) { > - fprintf (stderr, "%s: %s\n", "nbdublk", nbd_get_error ()); > - ublksrv_complete_io (q, tag, - (nbd_get_errno () ? : EINVAL)); > - return 0; > - } > - break; > - > - case UBLK_IO_OP_WRITE_ZEROES: > - if (fua && can_fua) > - nbd_flags |= LIBNBD_CMD_FLAG_FUA; > - > - if (alloc_zero) > - nbd_flags |= LIBNBD_CMD_FLAG_NO_HOLE; > - > - r = nbd_zero (h, iod->nr_sectors << 9, iod->start_sector << 9, nbd_flags); > - if (r == -1) { > - fprintf (stderr, "%s: %s\n", "nbdublk", nbd_get_error ()); > - ublksrv_complete_io (q, tag, - (nbd_get_errno () ? : EINVAL)); > - return 0; > - } > - break; > - > - default: > - fprintf (stderr, "%s: unknown operation %u\n", "nbdublk", op); > - ublksrv_complete_io (q, tag, -ENOTSUP); > - return 0; > - } > - > - /* Make sure the corresponding NBD worker sees the command. 
*/ > - pthread_mutex_lock (&thread_info.ptr[q_id].in_flight_mutex); > - thread_info.ptr[q_id].in_flight++; > - pthread_cond_signal (&thread_info.ptr[q_id].in_flight_cond); > - pthread_mutex_unlock (&thread_info.ptr[q_id].in_flight_mutex); > - > - return 0; > + return 0; > } > > struct ublksrv_tgt_type tgt_type = { > .type = UBLKSRV_TGT_TYPE_NBD, > .name = "nbd", > .init_tgt = init_tgt, > - .handle_io_async = handle_io_async, > - .handle_event = handle_event, > + .handle_io_async = nbd_handle_io_async, > + .handle_event = nbd_handle_event, > }; > > Thanks, > Ming -- Richard Jones, Virtualization Group, Red Hat http://people.redhat.com/~rjones Read my programming and virtualization blog: http://rwmj.wordpress.com nbdkit - Flexible, fast NBD server with plugins https://gitlab.com/nbdkit/nbdkit