Eric Blake
2023-Aug-04 16:38 UTC
[Libguestfs] [PATCH nbdkit v2 8/9] curl: Use curl multi interface
On Fri, Jul 28, 2023 at 06:17:52PM +0100, Richard W.M. Jones wrote:> See the comment at the top of plugins/curl/pool.c for general > information about how this works. > > This makes a very large difference to performance over the previous > implementation. Note for the tests below I also applied the next > commit changing the behaviour of the connections parameter. > > Using this test case: > > $ uri=https://cloud-images.ubuntu.com/lunar/current/lunar-server-cloudimg-amd64.img > $ nbdkit -r -U - curl $uri ipresolve=v4 --run 'nbdcopy -p $uri null' > > The times are as follows: > > multi, connections=64 21.5s > multi, connections=32 30.2s > multi, connections=16 56.0s > before this commit 166sAwesome performance improvements! As painful as this series has been for you to write and debug, it is showing its worth.> --- > plugins/curl/curldefs.h | 35 ++-- > plugins/curl/config.c | 246 --------------------------- > plugins/curl/curl.c | 366 +++++++++++++++++++++++++++++++++++----- > plugins/curl/pool.c | 346 ++++++++++++++++++++++++++++--------- > 4 files changed, 616 insertions(+), 377 deletions(-)Finally taking time to review this, even though it is already in-tree.> @@ -98,8 +88,30 @@ struct curl_handle { > const char *read_buf; > uint32_t read_count; > > + /* This field is used by curl_get_size. */ > + bool accept_range; > + > /* Used by scripts.c */ > struct curl_slist *headers_copy; > + > + /* Used by pool.c */ > + struct command *cmd; > +}; > + > +/* Asynchronous commands that can be sent to the pool thread. */ > +enum command_type { EASY_HANDLE, STOP }; > +struct command { > + /* These fields are set by the caller. */ > + enum command_type type; /* command */ > + struct curl_handle *ch; /* for EASY_HANDLE, the easy handle */ > + > + /* This field is set to a unique value by send_command_and_wait. */ > + uint64_t id; /* serial number */ > + > + /* These fields are used to signal back that the command finished. 
*/ > + pthread_mutex_t mutex; /* completion mutex */ > + pthread_cond_t cond; /* completion condition */ > + CURLcode status; /* status code (CURLE_OK = succeeded) */ > };Makes sense. The two types are mutually recursive (curl_handle includes a struct command *; command includes a struct curl_handle *); hopefully you have proper locking when altering multiple objects to adjust how they point to one another.> +++ b/plugins/curl/config.c> +++ b/plugins/curl/curl.c > > +/* Get the file size. */ > +static int get_content_length_accept_range (struct curl_handle *ch); > +static bool try_fallback_GET_method (struct curl_handle *ch); > +static size_t header_cb (void *ptr, size_t size, size_t nmemb, void *opaque); > +static size_t error_cb (char *ptr, size_t size, size_t nmemb, void *opaque); > + > +static int64_t > +curl_get_size (void *handle) > +{ > + struct curl_handle *ch; > + CURLcode r; > + long code; > +#ifdef HAVE_CURLINFO_CONTENT_LENGTH_DOWNLOAD_T > + curl_off_t o; > +#else > + double d; > +#endif > + int64_t exportsize; > + > + /* Get a curl easy handle. */ > + ch = allocate_handle (); > + if (ch == NULL) goto err; > + > + /* Prepare to read the headers. */ > + if (get_content_length_accept_range (ch) == -1) > + goto err; > + > + /* Send the command to the worker thread and wait. */ > + struct command cmd = { > + .type = EASY_HANDLE, > + .ch = ch, > + }; > + > + r = send_command_and_wait (&cmd); > + update_times (ch->c); > + if (r != CURLE_OK) { > + display_curl_error (ch, r, > + "problem doing HEAD request to fetch size of URL [%s]", > + url); > + > + /* Get the HTTP status code, if available. */ > + r = curl_easy_getinfo (ch->c, CURLINFO_RESPONSE_CODE, &code); > + if (r == CURLE_OK) > + nbdkit_debug ("HTTP status code: %ld", code); > + else > + code = -1; > + > + /* See comment on try_fallback_GET_method below. */ > + if (code != 403 || !try_fallback_GET_method (ch)) > + goto err; > + } > + > + /* Get the content length. 
> + * > + * Note there is some subtlety here: For web servers using chunked > + * encoding, either the Content-Length header will not be present, > + * or if present it should be ignored. (For such servers the only > + * way to find out the true length would be to read all of the > + * content, which we don't want to do). > + * > + * Curl itself resolves this for us. It will ignore the > + * Content-Length header if chunked encoding is used, returning the > + * length as -1 which we check below (see also > + * curl:lib/http.c:Curl_http_size). > + */ > +#ifdef HAVE_CURLINFO_CONTENT_LENGTH_DOWNLOAD_T > + r = curl_easy_getinfo (ch->c, CURLINFO_CONTENT_LENGTH_DOWNLOAD_T, &o); > + if (r != CURLE_OK) { > + display_curl_error (ch, r, > + "could not get length of remote file [%s]", url); > + goto err; > + } > + > + if (o == -1) { > + nbdkit_error ("could not get length of remote file [%s], " > + "is the URL correct?", url); > + goto err; > + } > + > + exportsize = o; > +#else > + r = curl_easy_getinfo (ch->c, CURLINFO_CONTENT_LENGTH_DOWNLOAD, &d); > + if (r != CURLE_OK) { > + display_curl_error (ch, r, > + "could not get length of remote file [%s]", url); > + goto err; > + } > + > + if (d == -1) { > + nbdkit_error ("could not get length of remote file [%s], " > + "is the URL correct?", url); > + goto err; > + } > + > + exportsize = d;Does curl guarantee that the double d will contain a value assignable to int64_t without overflow/truncation? For particularly large sizes, double has insufficient precision for all possible file sizes, but I doubt someone is exposing such large files over HTTP.> +#endif > + nbdkit_debug ("content length: %" PRIi64, exportsize); > + > + /* If this is HTTP, check that byte ranges are supported. 
*/ > + if (ascii_strncasecmp (url, "http://", strlen ("http://")) == 0 || > + ascii_strncasecmp (url, "https://", strlen ("https://")) == 0) { > + if (!ch->accept_range) { > + nbdkit_error ("server does not support 'range' (byte range) requests"); > + goto err; > + } > + > + nbdkit_debug ("accept range supported (for HTTP/HTTPS)"); > + } > + > + free_handle (ch); > + return exportsize; > + > + err: > + if (ch) > + free_handle (ch); > + return -1; > +} > + > +/* Get the file size and also whether the remote HTTP server > + * supports byte ranges. > + */ > +static int > +get_content_length_accept_range (struct curl_handle *ch) > +{ > + /* We must run the scripts if necessary and set headers in the > + * handle. > + */ > + if (do_scripts (ch) == -1) > + return -1; > + > + /* Set this flag in the handle to false. The callback should set it > + * to true if byte ranges are supported, which we check below. > + */ > + ch->accept_range = false; > + > + /* No Body, not nobody! This forces a HEAD request. */ > + curl_easy_setopt (ch->c, CURLOPT_NOBODY, 1L); > + curl_easy_setopt (ch->c, CURLOPT_HEADERFUNCTION, header_cb); > + curl_easy_setopt (ch->c, CURLOPT_HEADERDATA, ch); > + curl_easy_setopt (ch->c, CURLOPT_WRITEFUNCTION, NULL); > + curl_easy_setopt (ch->c, CURLOPT_WRITEDATA, NULL); > + curl_easy_setopt (ch->c, CURLOPT_READFUNCTION, NULL); > + curl_easy_setopt (ch->c, CURLOPT_READDATA, NULL); > + return 0; > +} > + > +/* S3 servers can return 403 Forbidden for HEAD but still respond > + * to GET, so we give it a second chance in that case. > + * https://github.com/kubevirt/containerized-data-importer/issues/2737 > + * > + * This function issues a GET request with a writefunction that always > + * returns an error, thus effectively getting the headers but > + * abandoning the transfer as soon as possible after. 
> + */ > +static bool > +try_fallback_GET_method (struct curl_handle *ch) > +{ > + CURLcode r; > + > + nbdkit_debug ("attempting to fetch headers using GET method"); > + > + curl_easy_setopt (ch->c, CURLOPT_HTTPGET, 1L); > + curl_easy_setopt (ch->c, CURLOPT_HEADERFUNCTION, header_cb); > + curl_easy_setopt (ch->c, CURLOPT_HEADERDATA, ch); > + curl_easy_setopt (ch->c, CURLOPT_WRITEFUNCTION, error_cb); > + curl_easy_setopt (ch->c, CURLOPT_WRITEDATA, ch); > + > + struct command cmd = { > + .type = EASY_HANDLE, > + .ch = ch, > + }; > + > + r = send_command_and_wait (&cmd); > + update_times (ch->c); > + > + /* We expect CURLE_WRITE_ERROR here, but CURLE_OK is possible too > + * (eg if the remote has zero length). Other errors might happen > + * but we ignore them since it is a fallback path. > + */ > + return r == CURLE_OK || r == CURLE_WRITE_ERROR; > +} > + > +static size_t > +header_cb (void *ptr, size_t size, size_t nmemb, void *opaque) > +{ > + struct curl_handle *ch = opaque; > + size_t realsize = size * nmemb; > + const char *header = ptr; > + const char *end = header + realsize; > + const char *accept_ranges = "accept-ranges:"; > + const char *bytes = "bytes"; > + > + if (realsize >= strlen (accept_ranges) && > + ascii_strncasecmp (header, accept_ranges, strlen (accept_ranges)) == 0) { > + const char *p = strchr (header, ':') + 1; > + > + /* Skip whitespace between the header name and value. */ > + while (p < end && *p && ascii_isspace (*p))Technically, '*p && ascii_isspace (*p)' can be shortened to 'ascii_isspace (*p)', since the NUL byte is not ascii space. I don't know if the compiler is smart enough to make that optimization on your behalf.> + p++; > + > + if (end - p >= strlen (bytes) > + && strncmp (p, bytes, strlen (bytes)) == 0) { > + /* Check that there is nothing but whitespace after the value. 
*/ > + p += strlen (bytes); > + while (p < end && *p && ascii_isspace (*p))Another spot of the same.> + p++; > + > + if (p == end || !*p) > + ch->accept_range = true; > + } > + } > + > + return realsize; > +} > + > +static size_t > +error_cb (char *ptr, size_t size, size_t nmemb, void *opaque) > +{ > +#ifdef CURL_WRITEFUNC_ERROR > + return CURL_WRITEFUNC_ERROR; > +#else > + return 0; /* in older curl, any size < requested will also be an error */ > +#endif > +} > + > /* Read data from the remote server. */ > +static size_t write_cb (char *ptr, size_t size, size_t nmemb, void *opaque); > + > static int > curl_pread (void *handle, void *buf, uint32_t count, uint64_t offset) > { > CURLcode r; > + struct curl_handle *ch; > char range[128]; > > - GET_HANDLE_FOR_CURRENT_SCOPE (ch); > - if (ch == NULL) > - return -1; > + /* Get a curl easy handle. */ > + ch = allocate_handle (); > + if (ch == NULL) goto err; > > /* Run the scripts if necessary and set headers in the handle. */ > - if (do_scripts (ch) == -1) return -1; > + if (do_scripts (ch) == -1) goto err; > > /* Tell the write_cb where we want the data to be written. write_cb > * will update this if the data comes in multiple sections. > */ > + curl_easy_setopt (ch->c, CURLOPT_WRITEFUNCTION, write_cb); > + curl_easy_setopt (ch->c, CURLOPT_WRITEDATA, ch); > ch->write_buf = buf; > ch->write_count = count; > > @@ -183,11 +388,16 @@ curl_pread (void *handle, void *buf, uint32_t count, uint64_t offset) > offset, offset + count); > curl_easy_setopt (ch->c, CURLOPT_RANGE, range); > > - /* The assumption here is that curl will look after timeouts. */ > - r = curl_easy_perform (ch->c); > + /* Send the command to the worker thread and wait. 
*/ > + struct command cmd = { > + .type = EASY_HANDLE, > + .ch = ch, > + }; > + > + r = send_command_and_wait (&cmd); > if (r != CURLE_OK) { > - display_curl_error (ch, r, "pread: curl_easy_perform"); > - return -1; > + display_curl_error (ch, r, "pread"); > + goto err; > } > update_times (ch->c); > > @@ -198,26 +408,67 @@ curl_pread (void *handle, void *buf, uint32_t count, uint64_t offset) > /* As far as I understand the cURL API, this should never happen. */ > assert (ch->write_count == 0); > > + free_handle (ch); > return 0; > + > + err: > + if (ch) > + free_handle (ch); > + return -1; > +} > + > +/* NB: The terminology used by libcurl is confusing! > + * > + * WRITEFUNCTION / write_cb is used when reading from the remote server > + * READFUNCTION / read_cb is used when writing to the remote server. > + * > + * We use the same terminology as libcurl here. > + */ > +static size_t > +write_cb (char *ptr, size_t size, size_t nmemb, void *opaque) > +{ > + struct curl_handle *ch = opaque; > + size_t orig_realsize = size * nmemb; > + size_t realsize = orig_realsize;Do we have to worry about overflow when compiling on 32-bit machines? Asked differently, should we be using off_t instead of size_t in any of this code? Thankfully, for now, we know NBD .pread and .pwrite requests are capped at 64M, so I think you're okay (we aren't ever going to ask curl for gigabytes in one request), but maybe a comment or assert() is worth it?> + > + assert (ch->write_buf); > + > + /* Don't read more than the requested amount of data, even if the > + * server or libcurl sends more. > + */ > + if (realsize > ch->write_count) > + realsize = ch->write_count; > + > + memcpy (ch->write_buf, ptr, realsize); > + > + ch->write_count -= realsize; > + ch->write_buf += realsize; > + > + return orig_realsize;[1]> } > > /* Write data to the remote server. 
*/ > +static size_t read_cb (void *ptr, size_t size, size_t nmemb, void *opaque); > + > static int > curl_pwrite (void *handle, const void *buf, uint32_t count, uint64_t offset) > { > CURLcode r; > + struct curl_handle *ch; > char range[128]; > > - GET_HANDLE_FOR_CURRENT_SCOPE (ch); > - if (ch == NULL) > - return -1; > + /* Get a curl easy handle. */ > + ch = allocate_handle (); > + if (ch == NULL) goto err; > > /* Run the scripts if necessary and set headers in the handle. */ > - if (do_scripts (ch) == -1) return -1; > + if (do_scripts (ch) == -1) goto err; > > /* Tell the read_cb where we want the data to be read from. read_cb > * will update this if the data comes in multiple sections. > */ > + curl_easy_setopt (ch->c, CURLOPT_READFUNCTION, read_cb); > + curl_easy_setopt (ch->c, CURLOPT_READDATA, ch); > ch->read_buf = buf; > ch->read_count = count; > > @@ -228,11 +479,16 @@ curl_pwrite (void *handle, const void *buf, uint32_t count, uint64_t offset) > offset, offset + count); > curl_easy_setopt (ch->c, CURLOPT_RANGE, range); > > - /* The assumption here is that curl will look after timeouts. */ > - r = curl_easy_perform (ch->c); > + /* Send the command to the worker thread and wait. */ > + struct command cmd = { > + .type = EASY_HANDLE, > + .ch = ch, > + }; > + > + r = send_command_and_wait (&cmd); > if (r != CURLE_OK) { > - display_curl_error (ch, r, "pwrite: curl_easy_perform"); > - return -1; > + display_curl_error (ch, r, "pwrite"); > + goto err; > } > update_times (ch->c); > > @@ -243,7 +499,31 @@ curl_pwrite (void *handle, const void *buf, uint32_t count, uint64_t offset) > /* As far as I understand the cURL API, this should never happen. 
*/ > assert (ch->read_count == 0); > > + free_handle (ch); > return 0; > + > + err: > + if (ch) > + free_handle (ch); > + return -1; > +} > + > +static size_t > +read_cb (void *ptr, size_t size, size_t nmemb, void *opaque) > +{ > + struct curl_handle *ch = opaque; > + size_t realsize = size * nmemb; > + > + assert (ch->read_buf); > + if (realsize > ch->read_count) > + realsize = ch->read_count; > + > + memcpy (ptr, ch->read_buf, realsize); > + > + ch->read_count -= realsize; > + ch->read_buf += realsize; > + > + return realsize;Why does write_cb in [1] above return orig_realsize, but read_cb returns the potentially modified realsize?> } > > static struct nbdkit_plugin plugin = { > diff --git a/plugins/curl/pool.c b/plugins/curl/pool.c > index eb2d330e1..2974cda3f 100644 > --- a/plugins/curl/pool.c > +++ b/plugins/curl/pool.c > @@ -30,11 +30,29 @@ > * SUCH DAMAGE. > */ > > -/* Curl handle pool. > +/* Worker thread which processes the curl multi interface. > * > - * To get a libcurl handle, call get_handle(). When you hold the > - * handle, it is yours exclusively to use. After you have finished > - * with the handle, put it back into the pool by calling put_handle(). > + * The main nbdkit threads (see curl.c) create curl easy handles > + * initialized with the work they want to carry out. Note there is > + * one easy handle per task (eg. per pread/pwrite request). The easy > + * handles are not reused. > + * > + * The commands + optional easy handle are submitted to the worker > + * thread over a self-pipe (it's easy to use a pipe here because the > + * way curl multi works is it can listen on an extra fd, but not on > + * anything else like a pthread condition). The curl multi performs > + * the work of the outstanding easy handles. > + * > + * When an easy handle finishes work or errors, we retire the command > + * by signalling back to the waiting nbdkit thread using a pthread > + * condition. 
> + * > + * In my experiments, we're almost always I/O bound so I haven't seen > + * any strong need to use more than one curl multi / worker thread, > + * although it would be possible to add more in future. > + * > + * See also this extremely useful thread: > + * https://curl.se/mail/lib-2019-03/0100.htmlVery useful comment (and link).> */ > > #include <config.h> > @@ -45,9 +63,19 @@ > #include <stdint.h> > #include <inttypes.h> > #include <string.h> > +#include <unistd.h> > #include <assert.h> > #include <pthread.h> > > +#ifdef HAVE_STDATOMIC_H > +#include <stdatomic.h> > +#else > +/* Some old platforms lack atomic types, but 32 bit ints are usually > + * "atomic enough". > + */ > +#define _Atomic /**/ > +#endif > + > #include <curl/curl.h> > > #include <nbdkit-plugin.h> > @@ -62,115 +90,281 @@ NBDKIT_DLL_PUBLIC int curl_debug_pool = 0; > > unsigned connections = 4; > > -/* This lock protects access to the curl_handles vector below. */ > -static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER; > +/* Pipe used to notify background thread that a command is pending in > + * the queue. A pointer to the 'struct command' is sent over the > + * pipe. > + */ > +static int self_pipe[2] = { -1, -1 }; > > -/* List of curl handles. This is allocated dynamically as more > - * handles are requested. Currently it does not shrink. It may grow > - * up to 'connections' in length. > +/* The curl multi handle. */ > +static CURLM *multi; > + > +/* List of running easy handles. We only need to maintain this so we > + * can remove them from the multi handle when cleaning up. > */ > DEFINE_VECTOR_TYPE (curl_handle_list, struct curl_handle *); > static curl_handle_list curl_handles = empty_vector; > > -/* The condition is used when the curl handles vector is full and > - * we're waiting for a thread to put_handle. 
> - */ > -static pthread_cond_t cond = PTHREAD_COND_INITIALIZER; > -static size_t in_use = 0, waiting = 0; > +static const char * > +command_type_to_string (enum command_type type) > +{ > + switch (type) { > + case EASY_HANDLE: return "EASY_HANDLE"; > + case STOP: return "STOP"; > + default: abort (); > + } > +} > > int > pool_get_ready (void) > { > + multi = curl_multi_init (); > + if (multi == NULL) { > + nbdkit_error ("curl_multi_init failed: %m"); > + return -1; > + } > + > return 0; > } > > +/* Start and stop the background thread. */ > +static pthread_t thread; > +static bool thread_running; > +static void *pool_worker (void *); > + > int > pool_after_fork (void) > { > + int err; > + > + if (pipe (self_pipe) == -1) { > + nbdkit_error ("pipe: %m"); > + return -1; > + } > + > + /* Start the pool background thread where all the curl work is done. */ > + err = pthread_create (&thread, NULL, pool_worker, NULL); > + if (err != 0) { > + errno = err; > + nbdkit_error ("pthread_create: %m"); > + return -1; > + } > + thread_running = true; > + > return 0; > } > > -/* Close and free all handles in the pool. */ > +/* Unload the background thread. */ > void > pool_unload (void) > { > - size_t i; > + if (thread_running) { > + /* Stop the background thread. */ > + struct command cmd = { .type = STOP }; > + send_command_and_wait (&cmd); > + pthread_join (thread, NULL); > + thread_running = false; > + } > > - if (curl_debug_pool) > - nbdkit_debug ("unload_pool: number of curl handles allocated: %zu", > - curl_handles.len); > + if (self_pipe[0] >= 0) { > + close (self_pipe[0]); > + self_pipe[0] = -1; > + } > + if (self_pipe[1] >= 0) { > + close (self_pipe[1]); > + self_pipe[1] = -1; > + } > > - for (i = 0; i < curl_handles.len; ++i) > - free_handle (curl_handles.ptr[i]); > - curl_handle_list_reset (&curl_handles); > + if (multi) { > + size_t i; > + > + /* Remove and free any easy handles in the multi. 
*/ > + for (i = 0; i < curl_handles.len; ++i) { > + curl_multi_remove_handle (multi, curl_handles.ptr[i]->c); > + free_handle (curl_handles.ptr[i]); > + } > + > + curl_multi_cleanup (multi); > + multi = NULL; > + } > } > > -/* Get a handle from the pool. > - * > - * It is owned exclusively by the caller until they call put_handle. > +/* Command queue. */ > +static _Atomic uint64_t id; /* next command ID */ > + > +/* Send command to the background thread and wait for completion. > + * This is only called by one of the nbdkit threads. > */ > -struct curl_handle * > -get_handle (void) > +CURLcode > +send_command_and_wait (struct command *cmd) > { > - ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&lock); > - size_t i; > - struct curl_handle *ch; > - > - again: > - /* Look for a handle which is not in_use. */ > - for (i = 0; i < curl_handles.len; ++i) { > - ch = curl_handles.ptr[i]; > - if (!ch->in_use) { > - ch->in_use = true; > - in_use++; > + cmd->id = id++; > + > + /* CURLcode is 0 (CURLE_OK) or > 0, so use -1 as a sentinel to > + * indicate that the command has not yet been completed and status > + * set. > + */ > + cmd->status = -1; > + > + /* This will be used to signal command completion back to us. */ > + pthread_mutex_init (&cmd->mutex, NULL); > + pthread_cond_init (&cmd->cond, NULL); > + > + /* Send the command to the background thread. */ > + if (write (self_pipe[1], &cmd, sizeof cmd) != sizeof cmd) > + abort (); > + > + /* Wait for the command to be completed by the background thread. */ > + { > + ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&cmd->mutex); > + while (cmd->status == -1) /* for -1, see above */ > + pthread_cond_wait (&cmd->cond, &cmd->mutex); > + } > + > + pthread_mutex_destroy (&cmd->mutex); > + pthread_cond_destroy (&cmd->cond); > + > + /* Note the main thread must call nbdkit_error on error! */ > + return cmd->status; > +} > + > +/* The background thread. 
*/ > +static void check_for_finished_handles (void); > +static void retire_command (struct command *cmd, CURLcode code); > +static void do_easy_handle (struct command *cmd); > + > +static void * > +pool_worker (void *vp) > +{ > + bool stop = false; > + > + if (curl_debug_pool) > + nbdkit_debug ("curl: background thread started"); > + > + while (!stop) { > + struct command *cmd = NULL; > + struct curl_waitfd extra_fds[1] > + { { .fd = self_pipe[0], .events = CURL_WAIT_POLLIN } }; > + CURLMcode mc; > + int numfds, running_handles, repeats = 0; > + > + do { > + /* Process the multi handle. */ > + mc = curl_multi_perform (multi, &running_handles); > + if (mc != CURLM_OK) { > + nbdkit_error ("curl_multi_perform: %s", curl_multi_strerror (mc));Since nbdkit_error() stores its string in thread-local storage, is there anything that ever extracts this error over to the nbdkit thread that issued the original request to the worker thread?...> + abort (); /* XXX We don't expect this to happen */...Then again, if we abort, it doesn't matter.> + }> + > + check_for_finished_handles (); > + > + mc = curl_multi_wait (multi, extra_fds, 1, 1000000, &numfds); > + if (mc != CURLM_OK) { > + nbdkit_error ("curl_multi_wait: %s", curl_multi_strerror (mc)); > + abort (); /* XXX We don't expect this to happen */ > + } > + > if (curl_debug_pool) > - nbdkit_debug ("get_handle: %zu", ch->i); > - return ch; > - } > - } > + nbdkit_debug ("curl_multi_wait returned: running_handles=%d numfds=%d", > + running_handles, numfds); > + > + if (numfds == 0) { > + repeats++; > + if (repeats > 1) > + nbdkit_nanosleep (1, 0); > + } > + else { > + repeats = 0; > + if (extra_fds[0].revents == CURL_WAIT_POLLIN) { > + /* There's a command waiting. */ > + if (read (self_pipe[0], &cmd, sizeof cmd) != sizeof cmd) > + abort (); > + } > + } > + } while (!cmd); > > - /* If more connections are allowed, then allocate a new handle. 
*/ > - if (curl_handles.len < connections) { > - ch = allocate_handle (); > - if (ch == NULL) > - return NULL; > - if (curl_handle_list_append (&curl_handles, ch) == -1) { > - free_handle (ch); > - return NULL; > - } > - ch->i = curl_handles.len - 1; > - ch->in_use = true; > - in_use++; > if (curl_debug_pool) > - nbdkit_debug ("get_handle: %zu", ch->i); > - return ch; > - } > + nbdkit_debug ("curl: dispatching %s command %" PRIu64, > + command_type_to_string (cmd->type), cmd->id); > + > + switch (cmd->type) { > + case STOP: > + stop = true; > + retire_command (cmd, CURLE_OK); > + break; > > - /* Otherwise we have run out of connections so we must wait until > - * another thread calls put_handle. > - */ > - assert (in_use == connections); > - waiting++; > - while (in_use == connections) > - pthread_cond_wait (&cond, &lock); > - waiting--; > + case EASY_HANDLE: > + do_easy_handle (cmd); > + break; > + } > + } /* while (!stop) */ > > - goto again; > + if (curl_debug_pool) > + nbdkit_debug ("curl: background thread stopped"); > + > + return NULL; > } > > -/* Return the handle to the pool. */ > -void > -put_handle (struct curl_handle *ch) > +/* This checks if any easy handles in the multi have > + * finished and retires the associated commands. > + */ > +static void > +check_for_finished_handles (void) > { > - ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&lock); > + CURLMsg *msg; > + int msgs_in_queue; > + > + while ((msg = curl_multi_info_read (multi, &msgs_in_queue)) != NULL) { > + size_t i; > + struct curl_handle *ch = NULL; > + > + if (msg->msg == CURLMSG_DONE) { > + /* Find this curl_handle. */ > + for (i = 0; i < curl_handles.len; ++i) { > + if (curl_handles.ptr[i]->c == msg->easy_handle) { > + ch = curl_handles.ptr[i]; > + curl_handle_list_remove (&curl_handles, i); > + } > + } > + if (ch == NULL) abort (); > + curl_multi_remove_handle (multi, ch->c); > + > + retire_command (ch->cmd, msg->data.result); > + } > + } > +} > > +/* Retire a command. status is a CURLcode. 
*/ > +static void > +retire_command (struct command *cmd, CURLcode status) > +{ > if (curl_debug_pool) > - nbdkit_debug ("put_handle: %zu", ch->i); > + nbdkit_debug ("curl: retiring %s command %" PRIu64, > + command_type_to_string (cmd->type), cmd->id); > + > + ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&cmd->mutex); > + cmd->status = status; > + pthread_cond_signal (&cmd->cond); > +} > + > +static void > +do_easy_handle (struct command *cmd) > +{ > + CURLMcode mc; > + > + cmd->ch->cmd = cmd; > + > + /* Add the handle to the multi. */ > + mc = curl_multi_add_handle (multi, cmd->ch->c); > + if (mc != CURLM_OK) { > + nbdkit_error ("curl_multi_add_handle: %s", curl_multi_strerror (mc)); > + goto err; > + } > > - ch->in_use = false; > - in_use--; > + if (curl_handle_list_append (&curl_handles, cmd->ch) == -1) > + goto err; > + return; > > - /* Signal the next thread which is waiting. */ > - if (waiting > 0) > - pthread_cond_signal (&cond); > + err: > + retire_command (cmd, CURLE_OUT_OF_MEMORY); > } > -- > 2.41.0Overall looks nice, and I learned more about curl in the process. -- Eric Blake, Principal Software Engineer Red Hat, Inc. Virtualization: qemu.org | libguestfs.org
Richard W.M. Jones
2023-Aug-04 18:04 UTC
[Libguestfs] [PATCH nbdkit v2 8/9] curl: Use curl multi interface
On Fri, Aug 04, 2023 at 11:38:03AM -0500, Eric Blake wrote:> On Fri, Jul 28, 2023 at 06:17:52PM +0100, Richard W.M. Jones wrote: > > See the comment at the top of plugins/curl/pool.c for general > > information about how this works. > > > > This makes a very large difference to performance over the previous > > implementation. Note for the tests below I also applied the next > > commit changing the behaviour of the connections parameter. > > > > Using this test case: > > > > $ uri=https://cloud-images.ubuntu.com/lunar/current/lunar-server-cloudimg-amd64.img > > $ nbdkit -r -U - curl $uri ipresolve=v4 --run 'nbdcopy -p $uri null' > > > > The times are as follows: > > > > multi, connections=64 21.5s > > multi, connections=32 30.2s > > multi, connections=16 56.0s > > before this commit 166s > > Awesome performance improvements! As painful as this series has been > for you to write and debug, it is showing its worth. > > > --- > > plugins/curl/curldefs.h | 35 ++-- > > plugins/curl/config.c | 246 --------------------------- > > plugins/curl/curl.c | 366 +++++++++++++++++++++++++++++++++++----- > > plugins/curl/pool.c | 346 ++++++++++++++++++++++++++++--------- > > 4 files changed, 616 insertions(+), 377 deletions(-) > > Finally taking time to review this, even though it is already in-tree. > > > @@ -98,8 +88,30 @@ struct curl_handle { > > const char *read_buf; > > uint32_t read_count; > > > > + /* This field is used by curl_get_size. */ > > + bool accept_range; > > + > > /* Used by scripts.c */ > > struct curl_slist *headers_copy; > > + > > + /* Used by pool.c */ > > + struct command *cmd; > > +}; > > + > > +/* Asynchronous commands that can be sent to the pool thread. */ > > +enum command_type { EASY_HANDLE, STOP }; > > +struct command { > > + /* These fields are set by the caller. 
*/ > > + enum command_type type; /* command */ > > + struct curl_handle *ch; /* for EASY_HANDLE, the easy handle */ > > + > > + /* This field is set to a unique value by send_command_and_wait. */ > > + uint64_t id; /* serial number */ > > + > > + /* These fields are used to signal back that the command finished. */ > > + pthread_mutex_t mutex; /* completion mutex */ > > + pthread_cond_t cond; /* completion condition */ > > + CURLcode status; /* status code (CURLE_OK = succeeded) */ > > }; > > Makes sense. The two types are mutually recursive (curl_handle > includes a struct command *; command includes a struct curl_handle *); > hopefully you have proper locking when altering multiple objects to > adjust how they point to one another. Actually locking is not needed. Let me document it, though ... We create both the curl easy handle and the associated EASY_HANDLE command in the nbdkit thread that gets the request, eg. in the curl .pread method. That of course requires no locking. There is a single background worker thread. A "self pipe" passes pointers to 'struct command *' to this worker thread simply by writing the 8 byte pointer onto the pipe (which should be atomic: POSIX guarantees that pipe writes of at most PIPE_BUF bytes are not interleaved). The nbdkit request thread then blocks on the mutex/cond in the command handle. The background worker thread picks the 'struct command *' up from the pipe in the same event loop that it uses to process ongoing requests on the multi handle. It takes the easy handle and adds it to the multi handle. When the easy handle work has finished, the worker thread removes it from the multi handle and signals the nbdkit request thread to wake up (using cmd->mutex + cmd->cond). At which point possession passes back to the request thread which will usually free up both the command and easy handle.> > +++ b/plugins/curl/config.c > > > +++ b/plugins/curl/curl.c > > > > +/* Get the file size.
*/ > > +static int get_content_length_accept_range (struct curl_handle *ch); > > +static bool try_fallback_GET_method (struct curl_handle *ch); > > +static size_t header_cb (void *ptr, size_t size, size_t nmemb, void *opaque); > > +static size_t error_cb (char *ptr, size_t size, size_t nmemb, void *opaque); > > + > > +static int64_t > > +curl_get_size (void *handle) > > +{ > > + struct curl_handle *ch; > > + CURLcode r; > > + long code; > > +#ifdef HAVE_CURLINFO_CONTENT_LENGTH_DOWNLOAD_T > > + curl_off_t o; > > +#else > > + double d; > > +#endif > > + int64_t exportsize; > > + > > + /* Get a curl easy handle. */ > > + ch = allocate_handle (); > > + if (ch == NULL) goto err; > > + > > + /* Prepare to read the headers. */ > > + if (get_content_length_accept_range (ch) == -1) > > + goto err; > > + > > + /* Send the command to the worker thread and wait. */ > > + struct command cmd = { > > + .type = EASY_HANDLE, > > + .ch = ch, > > + }; > > + > > + r = send_command_and_wait (&cmd); > > + update_times (ch->c); > > + if (r != CURLE_OK) { > > + display_curl_error (ch, r, > > + "problem doing HEAD request to fetch size of URL [%s]", > > + url); > > + > > + /* Get the HTTP status code, if available. */ > > + r = curl_easy_getinfo (ch->c, CURLINFO_RESPONSE_CODE, &code); > > + if (r == CURLE_OK) > > + nbdkit_debug ("HTTP status code: %ld", code); > > + else > > + code = -1; > > + > > + /* See comment on try_fallback_GET_method below. */ > > + if (code != 403 || !try_fallback_GET_method (ch)) > > + goto err; > > + } > > + > > + /* Get the content length. > > + * > > + * Note there is some subtlety here: For web servers using chunked > > + * encoding, either the Content-Length header will not be present, > > + * or if present it should be ignored. (For such servers the only > > + * way to find out the true length would be to read all of the > > + * content, which we don't want to do). > > + * > > + * Curl itself resolves this for us. 
It will ignore the > > + * Content-Length header if chunked encoding is used, returning the > > + * length as -1 which we check below (see also > > + * curl:lib/http.c:Curl_http_size). > > + */ > > +#ifdef HAVE_CURLINFO_CONTENT_LENGTH_DOWNLOAD_T > > + r = curl_easy_getinfo (ch->c, CURLINFO_CONTENT_LENGTH_DOWNLOAD_T, &o); > > + if (r != CURLE_OK) { > > + display_curl_error (ch, r, > > + "could not get length of remote file [%s]", url); > > + goto err; > > + } > > + > > + if (o == -1) { > > + nbdkit_error ("could not get length of remote file [%s], " > > + "is the URL correct?", url); > > + goto err; > > + } > > + > > + exportsize = o; > > +#else > > + r = curl_easy_getinfo (ch->c, CURLINFO_CONTENT_LENGTH_DOWNLOAD, &d); > > + if (r != CURLE_OK) { > > + display_curl_error (ch, r, > > + "could not get length of remote file [%s]", url); > > + goto err; > > + } > > + > > + if (d == -1) { > > + nbdkit_error ("could not get length of remote file [%s], " > > + "is the URL correct?", url); > > + goto err; > > + } > > + > > + exportsize = d; > > Does curl guarantee that the double d will contain a value assignable > to int64_t without overflow/truncation? For particularly large sizes, > double has insufficient precision for all possible file sizes, but I > doubt someone is exposing such large files over HTTP. No, I don't believe a 'double' is sufficient in general: it represents integers exactly only up to 2^53 bytes (8 PiB). This is why newer versions of curl provide CURLINFO_CONTENT_LENGTH_DOWNLOAD_T, which returns an integer curl_off_t (we test for it with HAVE_CURLINFO_CONTENT_LENGTH_DOWNLOAD_T). Note this code is just copied from the old curl plugin. > > +#endif > > + nbdkit_debug ("content length: %" PRIi64, exportsize); > > + > > + /* If this is HTTP, check that byte ranges are supported.
*/ > > + if (ascii_strncasecmp (url, "http://", strlen ("http://")) == 0 || > > + ascii_strncasecmp (url, "https://", strlen ("https://")) == 0) { > > + if (!ch->accept_range) { > > + nbdkit_error ("server does not support 'range' (byte range) requests"); > > + goto err; > > + } > > + > > + nbdkit_debug ("accept range supported (for HTTP/HTTPS)"); > > + } > > + > > + free_handle (ch); > > + return exportsize; > > + > > + err: > > + if (ch) > > + free_handle (ch); > > + return -1; > > +} > > + > > +/* Get the file size and also whether the remote HTTP server > > + * supports byte ranges. > > + */ > > +static int > > +get_content_length_accept_range (struct curl_handle *ch) > > +{ > > + /* We must run the scripts if necessary and set headers in the > > + * handle. > > + */ > > + if (do_scripts (ch) == -1) > > + return -1; > > + > > + /* Set this flag in the handle to false. The callback should set it > > + * to true if byte ranges are supported, which we check below. > > + */ > > + ch->accept_range = false; > > + > > + /* No Body, not nobody! This forces a HEAD request. */ > > + curl_easy_setopt (ch->c, CURLOPT_NOBODY, 1L); > > + curl_easy_setopt (ch->c, CURLOPT_HEADERFUNCTION, header_cb); > > + curl_easy_setopt (ch->c, CURLOPT_HEADERDATA, ch); > > + curl_easy_setopt (ch->c, CURLOPT_WRITEFUNCTION, NULL); > > + curl_easy_setopt (ch->c, CURLOPT_WRITEDATA, NULL); > > + curl_easy_setopt (ch->c, CURLOPT_READFUNCTION, NULL); > > + curl_easy_setopt (ch->c, CURLOPT_READDATA, NULL); > > + return 0; > > +} > > + > > +/* S3 servers can return 403 Forbidden for HEAD but still respond > > + * to GET, so we give it a second chance in that case. > > + * https://github.com/kubevirt/containerized-data-importer/issues/2737 > > + * > > + * This function issues a GET request with a writefunction that always > > + * returns an error, thus effectively getting the headers but > > + * abandoning the transfer as soon as possible after. 
> > + */ > > +static bool > > +try_fallback_GET_method (struct curl_handle *ch) > > +{ > > + CURLcode r; > > + > > + nbdkit_debug ("attempting to fetch headers using GET method"); > > + > > + curl_easy_setopt (ch->c, CURLOPT_HTTPGET, 1L); > > + curl_easy_setopt (ch->c, CURLOPT_HEADERFUNCTION, header_cb); > > + curl_easy_setopt (ch->c, CURLOPT_HEADERDATA, ch); > > + curl_easy_setopt (ch->c, CURLOPT_WRITEFUNCTION, error_cb); > > + curl_easy_setopt (ch->c, CURLOPT_WRITEDATA, ch); > > + > > + struct command cmd = { > > + .type = EASY_HANDLE, > > + .ch = ch, > > + }; > > + > > + r = send_command_and_wait (&cmd); > > + update_times (ch->c); > > + > > + /* We expect CURLE_WRITE_ERROR here, but CURLE_OK is possible too > > + * (eg if the remote has zero length). Other errors might happen > > + * but we ignore them since it is a fallback path. > > + */ > > + return r == CURLE_OK || r == CURLE_WRITE_ERROR; > > +} > > + > > +static size_t > > +header_cb (void *ptr, size_t size, size_t nmemb, void *opaque) > > +{ > > + struct curl_handle *ch = opaque; > > + size_t realsize = size * nmemb; > > + const char *header = ptr; > > + const char *end = header + realsize; > > + const char *accept_ranges = "accept-ranges:"; > > + const char *bytes = "bytes"; > > + > > + if (realsize >= strlen (accept_ranges) && > > + ascii_strncasecmp (header, accept_ranges, strlen (accept_ranges)) == 0) { > > + const char *p = strchr (header, ':') + 1; > > + > > + /* Skip whitespace between the header name and value. */ > > + while (p < end && *p && ascii_isspace (*p)) > > Technically, '*p && ascii_isspace (*p)' can be shortened to > 'ascii_isspace (*p)', since the NUL byte is not ascii space. I don't > know if the compiler is smart enough to make that optimization on your > behalf.Ah indeed.> > + p++; > > + > > + if (end - p >= strlen (bytes) > > + && strncmp (p, bytes, strlen (bytes)) == 0) { > > + /* Check that there is nothing but whitespace after the value. 
*/ > > + p += strlen (bytes); > > + while (p < end && *p && ascii_isspace (*p)) > > Another spot of the same. > > > + p++; > > + > > + if (p == end || !*p) > > + ch->accept_range = true; > > + } > > + } > > + > > + return realsize; > > +} > > + > > +static size_t > > +error_cb (char *ptr, size_t size, size_t nmemb, void *opaque) > > +{ > > +#ifdef CURL_WRITEFUNC_ERROR > > + return CURL_WRITEFUNC_ERROR; > > +#else > > + return 0; /* in older curl, any size < requested will also be an error */ > > +#endif > > +} > > + > > /* Read data from the remote server. */ > > +static size_t write_cb (char *ptr, size_t size, size_t nmemb, void *opaque); > > + > > static int > > curl_pread (void *handle, void *buf, uint32_t count, uint64_t offset) > > { > > CURLcode r; > > + struct curl_handle *ch; > > char range[128]; > > > > - GET_HANDLE_FOR_CURRENT_SCOPE (ch); > > - if (ch == NULL) > > - return -1; > > + /* Get a curl easy handle. */ > > + ch = allocate_handle (); > > + if (ch == NULL) goto err; > > > > /* Run the scripts if necessary and set headers in the handle. */ > > - if (do_scripts (ch) == -1) return -1; > > + if (do_scripts (ch) == -1) goto err; > > > > /* Tell the write_cb where we want the data to be written. write_cb > > * will update this if the data comes in multiple sections. > > */ > > + curl_easy_setopt (ch->c, CURLOPT_WRITEFUNCTION, write_cb); > > + curl_easy_setopt (ch->c, CURLOPT_WRITEDATA, ch); > > ch->write_buf = buf; > > ch->write_count = count; > > > > @@ -183,11 +388,16 @@ curl_pread (void *handle, void *buf, uint32_t count, uint64_t offset) > > offset, offset + count); > > curl_easy_setopt (ch->c, CURLOPT_RANGE, range); > > > > - /* The assumption here is that curl will look after timeouts. */ > > - r = curl_easy_perform (ch->c); > > + /* Send the command to the worker thread and wait. 
*/ > > + struct command cmd = { > > + .type = EASY_HANDLE, > > + .ch = ch, > > + }; > > + > > + r = send_command_and_wait (&cmd); > > if (r != CURLE_OK) { > > - display_curl_error (ch, r, "pread: curl_easy_perform"); > > - return -1; > > + display_curl_error (ch, r, "pread"); > > + goto err; > > } > > update_times (ch->c); > > > > @@ -198,26 +408,67 @@ curl_pread (void *handle, void *buf, uint32_t count, uint64_t offset) > > /* As far as I understand the cURL API, this should never happen. */ > > assert (ch->write_count == 0); > > > > + free_handle (ch); > > return 0; > > + > > + err: > > + if (ch) > > + free_handle (ch); > > + return -1; > > +} > > + > > +/* NB: The terminology used by libcurl is confusing! > > + * > > + * WRITEFUNCTION / write_cb is used when reading from the remote server > > + * READFUNCTION / read_cb is used when writing to the remote server. > > + * > > + * We use the same terminology as libcurl here. > > + */ > > +static size_t > > +write_cb (char *ptr, size_t size, size_t nmemb, void *opaque) > > +{ > > + struct curl_handle *ch = opaque; > > + size_t orig_realsize = size * nmemb; > > + size_t realsize = orig_realsize; > > Do we have to worry about overflow when compiling on 32-bit machines? > Asked differently, should we be using off_t instead of size_t in any > of this code? Thankfully, for now, we know NBD .pread and .pwrite > requests are capped at 64M, so I think you're okay (we aren't ever > going to ask curl for gigabytes in one request),It's a good question ... I suspect that even if we requested it, web servers probably wouldn't want to serve gigabytes of data in a range request, but as you point out we shouldn't ever request it right now.> but maybe a comment > or assert() is worth it?I'll add a comment, but could we do this with one of the overflow macros? I'm not sure ...> > + > > + assert (ch->write_buf); > > + > > + /* Don't read more than the requested amount of data, even if the > > + * server or libcurl sends more. 
> > + */ > > + if (realsize > ch->write_count) > > + realsize = ch->write_count; > > + > > + memcpy (ch->write_buf, ptr, realsize); > > + > > + ch->write_count -= realsize; > > + ch->write_buf += realsize; > > + > > + return orig_realsize; > > [1] > > > } > > > > /* Write data to the remote server. */ > > +static size_t read_cb (void *ptr, size_t size, size_t nmemb, void *opaque); > > + > > static int > > curl_pwrite (void *handle, const void *buf, uint32_t count, uint64_t offset) > > { > > CURLcode r; > > + struct curl_handle *ch; > > char range[128]; > > > > - GET_HANDLE_FOR_CURRENT_SCOPE (ch); > > - if (ch == NULL) > > - return -1; > > + /* Get a curl easy handle. */ > > + ch = allocate_handle (); > > + if (ch == NULL) goto err; > > > > /* Run the scripts if necessary and set headers in the handle. */ > > - if (do_scripts (ch) == -1) return -1; > > + if (do_scripts (ch) == -1) goto err; > > > > /* Tell the read_cb where we want the data to be read from. read_cb > > * will update this if the data comes in multiple sections. > > */ > > + curl_easy_setopt (ch->c, CURLOPT_READFUNCTION, read_cb); > > + curl_easy_setopt (ch->c, CURLOPT_READDATA, ch); > > ch->read_buf = buf; > > ch->read_count = count; > > > > @@ -228,11 +479,16 @@ curl_pwrite (void *handle, const void *buf, uint32_t count, uint64_t offset) > > offset, offset + count); > > curl_easy_setopt (ch->c, CURLOPT_RANGE, range); > > > > - /* The assumption here is that curl will look after timeouts. */ > > - r = curl_easy_perform (ch->c); > > + /* Send the command to the worker thread and wait. 
*/ > > + struct command cmd = { > > + .type = EASY_HANDLE, > > + .ch = ch, > > + }; > > + > > + r = send_command_and_wait (&cmd); > > if (r != CURLE_OK) { > > - display_curl_error (ch, r, "pwrite: curl_easy_perform"); > > - return -1; > > + display_curl_error (ch, r, "pwrite"); > > + goto err; > > } > > update_times (ch->c); > > > > @@ -243,7 +499,31 @@ curl_pwrite (void *handle, const void *buf, uint32_t count, uint64_t offset) > > /* As far as I understand the cURL API, this should never happen. */ > > assert (ch->read_count == 0); > > > > + free_handle (ch); > > return 0; > > + > > + err: > > + if (ch) > > + free_handle (ch); > > + return -1; > > +} > > + > > +static size_t > > +read_cb (void *ptr, size_t size, size_t nmemb, void *opaque) > > +{ > > + struct curl_handle *ch = opaque; > > + size_t realsize = size * nmemb; > > + > > + assert (ch->read_buf); > > + if (realsize > ch->read_count) > > + realsize = ch->read_count; > > + > > + memcpy (ptr, ch->read_buf, realsize); > > + > > + ch->read_count -= realsize; > > + ch->read_buf += realsize; > > + > > + return realsize; > > Why does write_cb in [1] above return orig_realsize, but read_cb > returns the potentially modified realsize? It's a good question (this code was copied from the old plugin which was working for years). It definitely works now. The asymmetry follows libcurl's callback contracts. A write callback must return exactly the number of bytes it was handed, otherwise libcurl aborts the transfer with CURLE_WRITE_ERROR. So write_cb claims to have consumed everything even when it discards surplus data. A read callback instead returns the number of bytes it actually copied into curl's buffer (0 means end of upload), so read_cb must return the possibly reduced realsize. Note that writes in this plugin are probably never used. They require a web server that supports "Range PUTs" which is, probably, not any server in existence. > > } > > > > static struct nbdkit_plugin plugin = { > > diff --git a/plugins/curl/pool.c b/plugins/curl/pool.c > > index eb2d330e1..2974cda3f 100644 > > --- a/plugins/curl/pool.c > > +++ b/plugins/curl/pool.c > > @@ -30,11 +30,29 @@ > > * SUCH DAMAGE. > > */ > > > > -/* Curl handle pool. > > +/* Worker thread which processes the curl multi interface. > > * > > - * To get a libcurl handle, call get_handle(). When you hold the > > - * handle, it is yours exclusively to use.
After you have finished > > - * with the handle, put it back into the pool by calling put_handle(). > > + * The main nbdkit threads (see curl.c) create curl easy handles > > + * initialized with the work they want to carry out. Note there is > > + * one easy handle per task (eg. per pread/pwrite request). The easy > > + * handles are not reused. > > + * > > + * The commands + optional easy handle are submitted to the worker > > + * thread over a self-pipe (it's easy to use a pipe here because the > > + * way curl multi works is it can listen on an extra fd, but not on > > + * anything else like a pthread condition). The curl multi performs > > + * the work of the outstanding easy handles. > > + * > > + * When an easy handle finishes work or errors, we retire the command > > + * by signalling back to the waiting nbdkit thread using a pthread > > + * condition. > > + * > > + * In my experiments, we're almost always I/O bound so I haven't seen > > + * any strong need to use more than one curl multi / worker thread, > > + * although it would be possible to add more in future. > > + * > > + * See also this extremely useful thread: > > + * https://curl.se/mail/lib-2019-03/0100.html > > Very useful comment (and link). > > > */ > > > > #include <config.h> > > @@ -45,9 +63,19 @@ > > #include <stdint.h> > > #include <inttypes.h> > > #include <string.h> > > +#include <unistd.h> > > #include <assert.h> > > #include <pthread.h> > > > > +#ifdef HAVE_STDATOMIC_H > > +#include <stdatomic.h> > > +#else > > +/* Some old platforms lack atomic types, but 32 bit ints are usually > > + * "atomic enough". > > + */ > > +#define _Atomic /**/ > > +#endif > > + > > #include <curl/curl.h> > > > > #include <nbdkit-plugin.h> > > @@ -62,115 +90,281 @@ NBDKIT_DLL_PUBLIC int curl_debug_pool = 0; > > > > unsigned connections = 4; > > > > -/* This lock protects access to the curl_handles vector below. 
*/ > > -static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER; > > +/* Pipe used to notify background thread that a command is pending in > > + * the queue. A pointer to the 'struct command' is sent over the > > + * pipe. > > + */ > > +static int self_pipe[2] = { -1, -1 }; > > > > -/* List of curl handles. This is allocated dynamically as more > > - * handles are requested. Currently it does not shrink. It may grow > > - * up to 'connections' in length. > > +/* The curl multi handle. */ > > +static CURLM *multi; > > + > > +/* List of running easy handles. We only need to maintain this so we > > + * can remove them from the multi handle when cleaning up. > > */ > > DEFINE_VECTOR_TYPE (curl_handle_list, struct curl_handle *); > > static curl_handle_list curl_handles = empty_vector; > > > > -/* The condition is used when the curl handles vector is full and > > - * we're waiting for a thread to put_handle. > > - */ > > -static pthread_cond_t cond = PTHREAD_COND_INITIALIZER; > > -static size_t in_use = 0, waiting = 0; > > +static const char * > > +command_type_to_string (enum command_type type) > > +{ > > + switch (type) { > > + case EASY_HANDLE: return "EASY_HANDLE"; > > + case STOP: return "STOP"; > > + default: abort (); > > + } > > +} > > > > int > > pool_get_ready (void) > > { > > + multi = curl_multi_init (); > > + if (multi == NULL) { > > + nbdkit_error ("curl_multi_init failed: %m"); > > + return -1; > > + } > > + > > return 0; > > } > > > > +/* Start and stop the background thread. */ > > +static pthread_t thread; > > +static bool thread_running; > > +static void *pool_worker (void *); > > + > > int > > pool_after_fork (void) > > { > > + int err; > > + > > + if (pipe (self_pipe) == -1) { > > + nbdkit_error ("pipe: %m"); > > + return -1; > > + } > > + > > + /* Start the pool background thread where all the curl work is done. 
*/ > > + err = pthread_create (&thread, NULL, pool_worker, NULL); > > + if (err != 0) { > > + errno = err; > > + nbdkit_error ("pthread_create: %m"); > > + return -1; > > + } > > + thread_running = true; > > + > > return 0; > > } > > > > -/* Close and free all handles in the pool. */ > > +/* Unload the background thread. */ > > void > > pool_unload (void) > > { > > - size_t i; > > + if (thread_running) { > > + /* Stop the background thread. */ > > + struct command cmd = { .type = STOP }; > > + send_command_and_wait (&cmd); > > + pthread_join (thread, NULL); > > + thread_running = false; > > + } > > > > - if (curl_debug_pool) > > - nbdkit_debug ("unload_pool: number of curl handles allocated: %zu", > > - curl_handles.len); > > + if (self_pipe[0] >= 0) { > > + close (self_pipe[0]); > > + self_pipe[0] = -1; > > + } > > + if (self_pipe[1] >= 0) { > > + close (self_pipe[1]); > > + self_pipe[1] = -1; > > + } > > > > - for (i = 0; i < curl_handles.len; ++i) > > - free_handle (curl_handles.ptr[i]); > > - curl_handle_list_reset (&curl_handles); > > + if (multi) { > > + size_t i; > > + > > + /* Remove and free any easy handles in the multi. */ > > + for (i = 0; i < curl_handles.len; ++i) { > > + curl_multi_remove_handle (multi, curl_handles.ptr[i]->c); > > + free_handle (curl_handles.ptr[i]); > > + } > > + > > + curl_multi_cleanup (multi); > > + multi = NULL; > > + } > > } > > > > -/* Get a handle from the pool. > > - * > > - * It is owned exclusively by the caller until they call put_handle. > > +/* Command queue. */ > > +static _Atomic uint64_t id; /* next command ID */ > > + > > +/* Send command to the background thread and wait for completion. > > + * This is only called by one of the nbdkit threads. 
> > */ > > -struct curl_handle * > > -get_handle (void) > > +CURLcode > > +send_command_and_wait (struct command *cmd) > > { > > - ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&lock); > > - size_t i; > > - struct curl_handle *ch; > > - > > - again: > > - /* Look for a handle which is not in_use. */ > > - for (i = 0; i < curl_handles.len; ++i) { > > - ch = curl_handles.ptr[i]; > > - if (!ch->in_use) { > > - ch->in_use = true; > > - in_use++; > > + cmd->id = id++; > > + > > + /* CURLcode is 0 (CURLE_OK) or > 0, so use -1 as a sentinel to > > + * indicate that the command has not yet been completed and status > > + * set. > > + */ > > + cmd->status = -1; > > + > > + /* This will be used to signal command completion back to us. */ > > + pthread_mutex_init (&cmd->mutex, NULL); > > + pthread_cond_init (&cmd->cond, NULL); > > + > > + /* Send the command to the background thread. */ > > + if (write (self_pipe[1], &cmd, sizeof cmd) != sizeof cmd) > > + abort (); > > + > > + /* Wait for the command to be completed by the background thread. */ > > + { > > + ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&cmd->mutex); > > + while (cmd->status == -1) /* for -1, see above */ > > + pthread_cond_wait (&cmd->cond, &cmd->mutex); > > + } > > + > > + pthread_mutex_destroy (&cmd->mutex); > > + pthread_cond_destroy (&cmd->cond); > > + > > + /* Note the main thread must call nbdkit_error on error! */ > > + return cmd->status; > > +} > > + > > +/* The background thread. 
*/ > > +static void check_for_finished_handles (void); > > +static void retire_command (struct command *cmd, CURLcode code); > > +static void do_easy_handle (struct command *cmd); > > + > > +static void * > > +pool_worker (void *vp) > > +{ > > + bool stop = false; > > + > > + if (curl_debug_pool) > > + nbdkit_debug ("curl: background thread started"); > > + > > + while (!stop) { > > + struct command *cmd = NULL; > > + struct curl_waitfd extra_fds[1] = > > + { { .fd = self_pipe[0], .events = CURL_WAIT_POLLIN } }; > > + CURLMcode mc; > > + int numfds, running_handles, repeats = 0; > > + > > + do { > > + /* Process the multi handle. */ > > + mc = curl_multi_perform (multi, &running_handles); > > + if (mc != CURLM_OK) { > > + nbdkit_error ("curl_multi_perform: %s", curl_multi_strerror (mc)); > > Since nbdkit_error() stores its string in thread-local storage, is > there anything that ever extracts this error over to the nbdkit thread > that issued the original request to the worker thread?... > > > + abort (); /* XXX We don't expect this to happen */ > > ...Then again, if we abort, it doesn't matter. I was unclear what to do here. The final code does: while (!stop) { ... cmd = process_multi_handle (); if (cmd == NULL) continue; /* or die?? */ with process_multi_handle still calling nbdkit_error. I felt it might be better to keep trying than just abort, as presumably some (most?) errors are transient.
nbdkit_error will ensure the error message is written out.> > + } > > > + > > + check_for_finished_handles (); > > + > > + mc = curl_multi_wait (multi, extra_fds, 1, 1000000, &numfds); > > + if (mc != CURLM_OK) { > > + nbdkit_error ("curl_multi_wait: %s", curl_multi_strerror (mc)); > > + abort (); /* XXX We don't expect this to happen */ > > + } > > + > > if (curl_debug_pool) > > - nbdkit_debug ("get_handle: %zu", ch->i); > > - return ch; > > - } > > - } > > + nbdkit_debug ("curl_multi_wait returned: running_handles=%d numfds=%d", > > + running_handles, numfds); > > + > > + if (numfds == 0) { > > + repeats++; > > + if (repeats > 1) > > + nbdkit_nanosleep (1, 0); > > + } > > + else { > > + repeats = 0; > > + if (extra_fds[0].revents == CURL_WAIT_POLLIN) { > > + /* There's a command waiting. */ > > + if (read (self_pipe[0], &cmd, sizeof cmd) != sizeof cmd) > > + abort (); > > + } > > + } > > + } while (!cmd); > > > > - /* If more connections are allowed, then allocate a new handle. */ > > - if (curl_handles.len < connections) { > > - ch = allocate_handle (); > > - if (ch == NULL) > > - return NULL; > > - if (curl_handle_list_append (&curl_handles, ch) == -1) { > > - free_handle (ch); > > - return NULL; > > - } > > - ch->i = curl_handles.len - 1; > > - ch->in_use = true; > > - in_use++; > > if (curl_debug_pool) > > - nbdkit_debug ("get_handle: %zu", ch->i); > > - return ch; > > - } > > + nbdkit_debug ("curl: dispatching %s command %" PRIu64, > > + command_type_to_string (cmd->type), cmd->id); > > + > > + switch (cmd->type) { > > + case STOP: > > + stop = true; > > + retire_command (cmd, CURLE_OK); > > + break; > > > > - /* Otherwise we have run out of connections so we must wait until > > - * another thread calls put_handle. 
> > - */ > > - assert (in_use == connections); > > - waiting++; > > - while (in_use == connections) > > - pthread_cond_wait (&cond, &lock); > > - waiting--; > > + case EASY_HANDLE: > > + do_easy_handle (cmd); > > + break; > > + } > > + } /* while (!stop) */ > > > > - goto again; > > + if (curl_debug_pool) > > + nbdkit_debug ("curl: background thread stopped"); > > + > > + return NULL; > > } > > > > -/* Return the handle to the pool. */ > > -void > > -put_handle (struct curl_handle *ch) > > +/* This checks if any easy handles in the multi have > > + * finished and retires the associated commands. > > + */ > > +static void > > +check_for_finished_handles (void) > > { > > - ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&lock); > > + CURLMsg *msg; > > + int msgs_in_queue; > > + > > + while ((msg = curl_multi_info_read (multi, &msgs_in_queue)) != NULL) { > > + size_t i; > > + struct curl_handle *ch = NULL; > > + > > + if (msg->msg == CURLMSG_DONE) { > > + /* Find this curl_handle. */ > > + for (i = 0; i < curl_handles.len; ++i) { > > + if (curl_handles.ptr[i]->c == msg->easy_handle) { > > + ch = curl_handles.ptr[i]; > > + curl_handle_list_remove (&curl_handles, i); > > + } > > + } > > + if (ch == NULL) abort (); > > + curl_multi_remove_handle (multi, ch->c); > > + > > + retire_command (ch->cmd, msg->data.result); > > + } > > + } > > +} > > > > +/* Retire a command. status is a CURLcode. */ > > +static void > > +retire_command (struct command *cmd, CURLcode status) > > +{ > > if (curl_debug_pool) > > - nbdkit_debug ("put_handle: %zu", ch->i); > > + nbdkit_debug ("curl: retiring %s command %" PRIu64, > > + command_type_to_string (cmd->type), cmd->id); > > + > > + ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&cmd->mutex); > > + cmd->status = status; > > + pthread_cond_signal (&cmd->cond); > > +} > > + > > +static void > > +do_easy_handle (struct command *cmd) > > +{ > > + CURLMcode mc; > > + > > + cmd->ch->cmd = cmd; > > + > > + /* Add the handle to the multi. 
*/ > > + mc = curl_multi_add_handle (multi, cmd->ch->c); > > + if (mc != CURLM_OK) { > > + nbdkit_error ("curl_multi_add_handle: %s", curl_multi_strerror (mc)); > > + goto err; > > + } > > > > - ch->in_use = false; > > - in_use--; > > + if (curl_handle_list_append (&curl_handles, cmd->ch) == -1) > > + goto err; > > + return; > > > > - /* Signal the next thread which is waiting. */ > > - if (waiting > 0) > > - pthread_cond_signal (&cond); > > + err: > > + retire_command (cmd, CURLE_OUT_OF_MEMORY); > > } > > -- > > 2.41.0 > > Overall looks nice, and I learned more about curl in the process. For me, too much :-( Thanks, Rich. -- Richard Jones, Virtualization Group, Red Hat http://people.redhat.com/~rjones Read my programming and virtualization blog: http://rwmj.wordpress.com virt-p2v converts physical machines to virtual machines. Boot with a live CD or over the network (PXE) and turn machines into KVM guests. http://libguestfs.org/virt-v2v