Laszlo Ersek
2023-Aug-07 10:57 UTC
[Libguestfs] [PATCH nbdkit v2 8/9] curl: Use curl multi interface
On 8/4/23 20:04, Richard W.M. Jones wrote:> On Fri, Aug 04, 2023 at 11:38:03AM -0500, Eric Blake wrote: >> On Fri, Jul 28, 2023 at 06:17:52PM +0100, Richard W.M. Jones wrote: >>> See the comment at the top of plugins/curl/pool.c for general >>> information about how this works. >>> >>> This makes a very large difference to performance over the previous >>> implementation. Note for the tests below I also applied the next >>> commit changing the behaviour of the connections parameter. >>> >>> Using this test case: >>> >>> $ uri=https://cloud-images.ubuntu.com/lunar/current/lunar-server-cloudimg-amd64.img >>> $ nbdkit -r -U - curl $uri ipresolve=v4 --run 'nbdcopy -p $uri null' >>> >>> The times are as follows: >>> >>> multi, connections=64 21.5s >>> multi, connections=32 30.2s >>> multi, connections=16 56.0s >>> before this commit 166s >> >> Awesome performance improvements! As painful as this series has been >> for you to write and debug, it is showing its worth. >> >>> --- >>> plugins/curl/curldefs.h | 35 ++-- >>> plugins/curl/config.c | 246 --------------------------- >>> plugins/curl/curl.c | 366 +++++++++++++++++++++++++++++++++++----- >>> plugins/curl/pool.c | 346 ++++++++++++++++++++++++++++--------- >>> 4 files changed, 616 insertions(+), 377 deletions(-) >> >> Finally taking time to review this, even though it is already in-tree. >> >>> @@ -98,8 +88,30 @@ struct curl_handle { >>> const char *read_buf; >>> uint32_t read_count; >>> >>> + /* This field is used by curl_get_size. */ >>> + bool accept_range; >>> + >>> /* Used by scripts.c */ >>> struct curl_slist *headers_copy; >>> + >>> + /* Used by pool.c */ >>> + struct command *cmd; >>> +}; >>> + >>> +/* Asynchronous commands that can be sent to the pool thread. */ >>> +enum command_type { EASY_HANDLE, STOP }; >>> +struct command { >>> + /* These fields are set by the caller. 
*/ >>> + enum command_type type; /* command */ >>> + struct curl_handle *ch; /* for EASY_HANDLE, the easy handle */ >>> + >>> + /* This field is set to a unique value by send_command_and_wait. */ >>> + uint64_t id; /* serial number */ >>> + >>> + /* These fields are used to signal back that the command finished. */ >>> + pthread_mutex_t mutex; /* completion mutex */ >>> + pthread_cond_t cond; /* completion condition */ >>> + CURLcode status; /* status code (CURLE_OK = succeeded) */ >>> }; >> >> Makes sense. The two types are mutually recursive (curl_handle >> includes a struct command *; command includes a struct curl_handle *); >> hopefully you have proper locking when altering multiple objects to >> adjust how they point to one another. > > Actually locking is not needed. Let me document it through ... > > We create both the curl easy handle and the associated EASY_HANDLE > command in the nbdkit thread that gets the request, eg. in the curl > .pread method. That of course requires no locking. > > There is a single background worker thread. > > A "self pipe" passes pointers to 'struct command *' to this worker > thread simply by writing the 8 byte pointer onto the pipe (hopefully > atomic ...) The nbdkit request thread then blocks on the mutex/cond > in the command handle.

Right, that's exactly what I got curious about. In my opinion, writing to the self-pipe is mostly safe. Reading from the self-pipe could be slightly polished. Here are the POSIX pages on write() and read(): https://pubs.opengroup.org/onlinepubs/9699919799/functions/write.html https://pubs.opengroup.org/onlinepubs/9699919799/functions/read.html The first one (write) says that if you attempt to write at most PIPE_BUF bytes, then writes will not be interleaved with other concurrent writes, so in a sense the write will be atomic.
(And in this case, O_NONBLOCK only changes the behavior for the case when the buffer cannot be written in its entirety: with O_NONBLOCK clear, the writer thread will block, with O_NONBLOCK set, the writer thread will see -1/EAGAIN.) Now, PIPE_BUF is "variable" in a sense: https://pubs.opengroup.org/onlinepubs/9699919799/basedefs/limits.h.html but it is greater than or equal to _POSIX_PIPE_BUF, which is 512. Our pointers are 8 bytes in size, so the <=PIPE_BUF condition is surely satisfied. ... The only case I see possible for a short write is the delivery of a signal after transfer has started. *If* nbdkit catches some signal such that the signal handler actually returns, then this could be a (theoretical) problem, calling for xwrite() or similar.

> > The background worker thread picks the 'struct command *' up from the > pipe in the same event loop that it uses to process ongoing requests > on the multi handle. It takes the easy handle and adds it to the > multi handle.

The spec on read() does not seem to have the same kind of non-interleaving / "atomicity" language that write() does. The rationale section says: "The standard developers considered adding atomicity requirements to a pipe or FIFO, but recognized that due to the nature of pipes and FIFOs there could be no guarantee of atomicity of reads of {PIPE_BUF} or any other size that would be an aid to applications portability." Now given that the writer side is free of interleaving (because, in the first place: there is only a single writer!), I think we need not worry about data corruption. However, it does feel like read() may return fewer than 8 bytes in one go, "just because" (not only because of a signal being delivered midway). And that may be a problem with readiness reporting via curl_multi_wait(); even if you get CURL_WAIT_POLLIN, the whole command pointer may not yet be available. Now I do think a split read is extremely unlikely, maybe even impossible on Linux.
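For completeness, an xwrite()-style helper of the kind mentioned above might look like the following; the shape is an assumption (the name xwrite is only alluded to, no implementation is given in the thread). It restarts on EINTR and continues after a short transfer:

```c
#include <errno.h>
#include <unistd.h>

/* Hypothetical xwrite(): keep writing until all bytes are transferred.
 * A signal delivered mid-transfer yields a short count (or -1/EINTR
 * with nothing transferred); both cases are handled by looping.
 */
static int
xwrite (int fd, const void *buf, size_t count)
{
  const char *p = buf;

  while (count > 0) {
    ssize_t r = write (fd, p, count);
    if (r == -1) {
      if (errno == EINTR)
        continue;               /* interrupted before any byte moved */
      return -1;                /* genuine error, e.g. EPIPE */
    }
    p += r;                     /* short write: advance and retry */
    count -= r;
  }
  return 0;
}
```

The self-pipe write in send_command_and_wait would then become something like xwrite (self_pipe[1], &cmd, sizeof cmd), and a symmetric xread() could guard the worker's (even more theoretical) short read of the pointer.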
If we choose to be pedantic, then the curl_multi_wait() loop might want to expect "cmd" to be populated only over multiple iterations -- like use "cmd_ptr_bytes" or something similar for tracking the size already available, and only consider "cmd" usable when cmd_ptr_bytes reaches sizeof cmd. Yet another topic that comes up is visibility / ordering. Transferring a pointer via write()/read() between threads does not seem to guarantee ordering / visibility regarding the *pointed-to* area per spec. Pthread mutex APIs (and C11 thread and atomics APIs) include the CPU instructions for ensuring proper memory visibility. *BUT* I think it would be insane for any POSIX implementation to have a write()+read() combination that's *weaker* regarding data consistency (i.e., that's more racy) than mutexes. Write() and read() are heavy-weight syscalls, so I can't imagine a publish-subscribe pattern not working with them (i.e., the pointed-to area not "settling" until the pointer is consumed).

Laszlo

> > When the easy handle work has finished, the worker thread removes it > from the multi handle and signals the nbdkit request thread to wake up > (using cmd->mutex + cmd->cond). At which point possession passes back > to the request thread which will usually free up both the command and > easy handle. > >>> +++ b/plugins/curl/config.c >> >>> +++ b/plugins/curl/curl.c >>> >>> +/* Get the file size. 
*/ >>> +static int get_content_length_accept_range (struct curl_handle *ch); >>> +static bool try_fallback_GET_method (struct curl_handle *ch); >>> +static size_t header_cb (void *ptr, size_t size, size_t nmemb, void *opaque); >>> +static size_t error_cb (char *ptr, size_t size, size_t nmemb, void *opaque); >>> + >>> +static int64_t >>> +curl_get_size (void *handle) >>> +{ >>> + struct curl_handle *ch; >>> + CURLcode r; >>> + long code; >>> +#ifdef HAVE_CURLINFO_CONTENT_LENGTH_DOWNLOAD_T >>> + curl_off_t o; >>> +#else >>> + double d; >>> +#endif >>> + int64_t exportsize; >>> + >>> + /* Get a curl easy handle. */ >>> + ch = allocate_handle (); >>> + if (ch == NULL) goto err; >>> + >>> + /* Prepare to read the headers. */ >>> + if (get_content_length_accept_range (ch) == -1) >>> + goto err; >>> + >>> + /* Send the command to the worker thread and wait. */ >>> + struct command cmd = { >>> + .type = EASY_HANDLE, >>> + .ch = ch, >>> + }; >>> + >>> + r = send_command_and_wait (&cmd); >>> + update_times (ch->c); >>> + if (r != CURLE_OK) { >>> + display_curl_error (ch, r, >>> + "problem doing HEAD request to fetch size of URL [%s]", >>> + url); >>> + >>> + /* Get the HTTP status code, if available. */ >>> + r = curl_easy_getinfo (ch->c, CURLINFO_RESPONSE_CODE, &code); >>> + if (r == CURLE_OK) >>> + nbdkit_debug ("HTTP status code: %ld", code); >>> + else >>> + code = -1; >>> + >>> + /* See comment on try_fallback_GET_method below. */ >>> + if (code != 403 || !try_fallback_GET_method (ch)) >>> + goto err; >>> + } >>> + >>> + /* Get the content length. >>> + * >>> + * Note there is some subtlety here: For web servers using chunked >>> + * encoding, either the Content-Length header will not be present, >>> + * or if present it should be ignored. (For such servers the only >>> + * way to find out the true length would be to read all of the >>> + * content, which we don't want to do). >>> + * >>> + * Curl itself resolves this for us. 
It will ignore the >>> + * Content-Length header if chunked encoding is used, returning the >>> + * length as -1 which we check below (see also >>> + * curl:lib/http.c:Curl_http_size). >>> + */ >>> +#ifdef HAVE_CURLINFO_CONTENT_LENGTH_DOWNLOAD_T >>> + r = curl_easy_getinfo (ch->c, CURLINFO_CONTENT_LENGTH_DOWNLOAD_T, &o); >>> + if (r != CURLE_OK) { >>> + display_curl_error (ch, r, >>> + "could not get length of remote file [%s]", url); >>> + goto err; >>> + } >>> + >>> + if (o == -1) { >>> + nbdkit_error ("could not get length of remote file [%s], " >>> + "is the URL correct?", url); >>> + goto err; >>> + } >>> + >>> + exportsize = o; >>> +#else >>> + r = curl_easy_getinfo (ch->c, CURLINFO_CONTENT_LENGTH_DOWNLOAD, &d); >>> + if (r != CURLE_OK) { >>> + display_curl_error (ch, r, >>> + "could not get length of remote file [%s]", url); >>> + goto err; >>> + } >>> + >>> + if (d == -1) { >>> + nbdkit_error ("could not get length of remote file [%s], " >>> + "is the URL correct?", url); >>> + goto err; >>> + } >>> + >>> + exportsize = d; >> >> Does curl guarantee that the double d will contain a value assignable >> to int64_t without overflow/truncation? For particularly large sizes, >> double has insufficient precision for all possible file sizes, but I >> doubt someone is exposing such large files over HTTP. > > No, I don't believe a 'double' is sufficient. This is why newer > versions of curl have HAVE_CURLINFO_CONTENT_LENGTH_DOWNLOAD_T. Note > this code is just copied from the old curl plugin. > >>> +#endif >>> + nbdkit_debug ("content length: %" PRIi64, exportsize); >>> + >>> + /* If this is HTTP, check that byte ranges are supported. 
*/ >>> + if (ascii_strncasecmp (url, "http://", strlen ("http://")) == 0 || >>> + ascii_strncasecmp (url, "https://", strlen ("https://")) == 0) { >>> + if (!ch->accept_range) { >>> + nbdkit_error ("server does not support 'range' (byte range) requests"); >>> + goto err; >>> + } >>> + >>> + nbdkit_debug ("accept range supported (for HTTP/HTTPS)"); >>> + } >>> + >>> + free_handle (ch); >>> + return exportsize; >>> + >>> + err: >>> + if (ch) >>> + free_handle (ch); >>> + return -1; >>> +} >>> + >>> +/* Get the file size and also whether the remote HTTP server >>> + * supports byte ranges. >>> + */ >>> +static int >>> +get_content_length_accept_range (struct curl_handle *ch) >>> +{ >>> + /* We must run the scripts if necessary and set headers in the >>> + * handle. >>> + */ >>> + if (do_scripts (ch) == -1) >>> + return -1; >>> + >>> + /* Set this flag in the handle to false. The callback should set it >>> + * to true if byte ranges are supported, which we check below. >>> + */ >>> + ch->accept_range = false; >>> + >>> + /* No Body, not nobody! This forces a HEAD request. */ >>> + curl_easy_setopt (ch->c, CURLOPT_NOBODY, 1L); >>> + curl_easy_setopt (ch->c, CURLOPT_HEADERFUNCTION, header_cb); >>> + curl_easy_setopt (ch->c, CURLOPT_HEADERDATA, ch); >>> + curl_easy_setopt (ch->c, CURLOPT_WRITEFUNCTION, NULL); >>> + curl_easy_setopt (ch->c, CURLOPT_WRITEDATA, NULL); >>> + curl_easy_setopt (ch->c, CURLOPT_READFUNCTION, NULL); >>> + curl_easy_setopt (ch->c, CURLOPT_READDATA, NULL); >>> + return 0; >>> +} >>> + >>> +/* S3 servers can return 403 Forbidden for HEAD but still respond >>> + * to GET, so we give it a second chance in that case. >>> + * https://github.com/kubevirt/containerized-data-importer/issues/2737 >>> + * >>> + * This function issues a GET request with a writefunction that always >>> + * returns an error, thus effectively getting the headers but >>> + * abandoning the transfer as soon as possible after. 
>>> + */ >>> +static bool >>> +try_fallback_GET_method (struct curl_handle *ch) >>> +{ >>> + CURLcode r; >>> + >>> + nbdkit_debug ("attempting to fetch headers using GET method"); >>> + >>> + curl_easy_setopt (ch->c, CURLOPT_HTTPGET, 1L); >>> + curl_easy_setopt (ch->c, CURLOPT_HEADERFUNCTION, header_cb); >>> + curl_easy_setopt (ch->c, CURLOPT_HEADERDATA, ch); >>> + curl_easy_setopt (ch->c, CURLOPT_WRITEFUNCTION, error_cb); >>> + curl_easy_setopt (ch->c, CURLOPT_WRITEDATA, ch); >>> + >>> + struct command cmd = { >>> + .type = EASY_HANDLE, >>> + .ch = ch, >>> + }; >>> + >>> + r = send_command_and_wait (&cmd); >>> + update_times (ch->c); >>> + >>> + /* We expect CURLE_WRITE_ERROR here, but CURLE_OK is possible too >>> + * (eg if the remote has zero length). Other errors might happen >>> + * but we ignore them since it is a fallback path. >>> + */ >>> + return r == CURLE_OK || r == CURLE_WRITE_ERROR; >>> +} >>> + >>> +static size_t >>> +header_cb (void *ptr, size_t size, size_t nmemb, void *opaque) >>> +{ >>> + struct curl_handle *ch = opaque; >>> + size_t realsize = size * nmemb; >>> + const char *header = ptr; >>> + const char *end = header + realsize; >>> + const char *accept_ranges = "accept-ranges:"; >>> + const char *bytes = "bytes"; >>> + >>> + if (realsize >= strlen (accept_ranges) && >>> + ascii_strncasecmp (header, accept_ranges, strlen (accept_ranges)) == 0) { >>> + const char *p = strchr (header, ':') + 1; >>> + >>> + /* Skip whitespace between the header name and value. */ >>> + while (p < end && *p && ascii_isspace (*p)) >> >> Technically, '*p && ascii_isspace (*p)' can be shortened to >> 'ascii_isspace (*p)', since the NUL byte is not ascii space. I don't >> know if the compiler is smart enough to make that optimization on your >> behalf. > > Ah indeed. > >>> + p++; >>> + >>> + if (end - p >= strlen (bytes) >>> + && strncmp (p, bytes, strlen (bytes)) == 0) { >>> + /* Check that there is nothing but whitespace after the value. 
*/ >>> + p += strlen (bytes); >>> + while (p < end && *p && ascii_isspace (*p)) >> >> Another spot of the same. >> >>> + p++; >>> + >>> + if (p == end || !*p) >>> + ch->accept_range = true; >>> + } >>> + } >>> + >>> + return realsize; >>> +} >>> + >>> +static size_t >>> +error_cb (char *ptr, size_t size, size_t nmemb, void *opaque) >>> +{ >>> +#ifdef CURL_WRITEFUNC_ERROR >>> + return CURL_WRITEFUNC_ERROR; >>> +#else >>> + return 0; /* in older curl, any size < requested will also be an error */ >>> +#endif >>> +} >>> + >>> /* Read data from the remote server. */ >>> +static size_t write_cb (char *ptr, size_t size, size_t nmemb, void *opaque); >>> + >>> static int >>> curl_pread (void *handle, void *buf, uint32_t count, uint64_t offset) >>> { >>> CURLcode r; >>> + struct curl_handle *ch; >>> char range[128]; >>> >>> - GET_HANDLE_FOR_CURRENT_SCOPE (ch); >>> - if (ch == NULL) >>> - return -1; >>> + /* Get a curl easy handle. */ >>> + ch = allocate_handle (); >>> + if (ch == NULL) goto err; >>> >>> /* Run the scripts if necessary and set headers in the handle. */ >>> - if (do_scripts (ch) == -1) return -1; >>> + if (do_scripts (ch) == -1) goto err; >>> >>> /* Tell the write_cb where we want the data to be written. write_cb >>> * will update this if the data comes in multiple sections. >>> */ >>> + curl_easy_setopt (ch->c, CURLOPT_WRITEFUNCTION, write_cb); >>> + curl_easy_setopt (ch->c, CURLOPT_WRITEDATA, ch); >>> ch->write_buf = buf; >>> ch->write_count = count; >>> >>> @@ -183,11 +388,16 @@ curl_pread (void *handle, void *buf, uint32_t count, uint64_t offset) >>> offset, offset + count); >>> curl_easy_setopt (ch->c, CURLOPT_RANGE, range); >>> >>> - /* The assumption here is that curl will look after timeouts. */ >>> - r = curl_easy_perform (ch->c); >>> + /* Send the command to the worker thread and wait. 
*/ >>> + struct command cmd = { >>> + .type = EASY_HANDLE, >>> + .ch = ch, >>> + }; >>> + >>> + r = send_command_and_wait (&cmd); >>> if (r != CURLE_OK) { >>> - display_curl_error (ch, r, "pread: curl_easy_perform"); >>> - return -1; >>> + display_curl_error (ch, r, "pread"); >>> + goto err; >>> } >>> update_times (ch->c); >>> >>> @@ -198,26 +408,67 @@ curl_pread (void *handle, void *buf, uint32_t count, uint64_t offset) >>> /* As far as I understand the cURL API, this should never happen. */ >>> assert (ch->write_count == 0); >>> >>> + free_handle (ch); >>> return 0; >>> + >>> + err: >>> + if (ch) >>> + free_handle (ch); >>> + return -1; >>> +} >>> + >>> +/* NB: The terminology used by libcurl is confusing! >>> + * >>> + * WRITEFUNCTION / write_cb is used when reading from the remote server >>> + * READFUNCTION / read_cb is used when writing to the remote server. >>> + * >>> + * We use the same terminology as libcurl here. >>> + */ >>> +static size_t >>> +write_cb (char *ptr, size_t size, size_t nmemb, void *opaque) >>> +{ >>> + struct curl_handle *ch = opaque; >>> + size_t orig_realsize = size * nmemb; >>> + size_t realsize = orig_realsize; >> >> Do we have to worry about overflow when compiling on 32-bit machines? >> Asked differently, should we be using off_t instead of size_t in any >> of this code? Thankfully, for now, we know NBD .pread and .pwrite >> requests are capped at 64M, so I think you're okay (we aren't ever >> going to ask curl for gigabytes in one request), > > It's a good question ... I suspect that even if we requested it, web > servers probably wouldn't want to serve gigabytes of data in a range > request, but as you point out we shouldn't ever request it right now. > >> but maybe a comment >> or assert() is worth it? > > I'll add a comment, but could we do this with one of the overflow > macros? I'm not sure ... 
> >>> + >>> + assert (ch->write_buf); >>> + >>> + /* Don't read more than the requested amount of data, even if the >>> + * server or libcurl sends more. >>> + */ >>> + if (realsize > ch->write_count) >>> + realsize = ch->write_count; >>> + >>> + memcpy (ch->write_buf, ptr, realsize); >>> + >>> + ch->write_count -= realsize; >>> + ch->write_buf += realsize; >>> + >>> + return orig_realsize; >> >> [1] >> >>> } >>> >>> /* Write data to the remote server. */ >>> +static size_t read_cb (void *ptr, size_t size, size_t nmemb, void *opaque); >>> + >>> static int >>> curl_pwrite (void *handle, const void *buf, uint32_t count, uint64_t offset) >>> { >>> CURLcode r; >>> + struct curl_handle *ch; >>> char range[128]; >>> >>> - GET_HANDLE_FOR_CURRENT_SCOPE (ch); >>> - if (ch == NULL) >>> - return -1; >>> + /* Get a curl easy handle. */ >>> + ch = allocate_handle (); >>> + if (ch == NULL) goto err; >>> >>> /* Run the scripts if necessary and set headers in the handle. */ >>> - if (do_scripts (ch) == -1) return -1; >>> + if (do_scripts (ch) == -1) goto err; >>> >>> /* Tell the read_cb where we want the data to be read from. read_cb >>> * will update this if the data comes in multiple sections. >>> */ >>> + curl_easy_setopt (ch->c, CURLOPT_READFUNCTION, read_cb); >>> + curl_easy_setopt (ch->c, CURLOPT_READDATA, ch); >>> ch->read_buf = buf; >>> ch->read_count = count; >>> >>> @@ -228,11 +479,16 @@ curl_pwrite (void *handle, const void *buf, uint32_t count, uint64_t offset) >>> offset, offset + count); >>> curl_easy_setopt (ch->c, CURLOPT_RANGE, range); >>> >>> - /* The assumption here is that curl will look after timeouts. */ >>> - r = curl_easy_perform (ch->c); >>> + /* Send the command to the worker thread and wait. 
*/ >>> + struct command cmd = { >>> + .type = EASY_HANDLE, >>> + .ch = ch, >>> + }; >>> + >>> + r = send_command_and_wait (&cmd); >>> if (r != CURLE_OK) { >>> - display_curl_error (ch, r, "pwrite: curl_easy_perform"); >>> - return -1; >>> + display_curl_error (ch, r, "pwrite"); >>> + goto err; >>> } >>> update_times (ch->c); >>> >>> @@ -243,7 +499,31 @@ curl_pwrite (void *handle, const void *buf, uint32_t count, uint64_t offset) >>> /* As far as I understand the cURL API, this should never happen. */ >>> assert (ch->read_count == 0); >>> >>> + free_handle (ch); >>> return 0; >>> + >>> + err: >>> + if (ch) >>> + free_handle (ch); >>> + return -1; >>> +} >>> + >>> +static size_t >>> +read_cb (void *ptr, size_t size, size_t nmemb, void *opaque) >>> +{ >>> + struct curl_handle *ch = opaque; >>> + size_t realsize = size * nmemb; >>> + >>> + assert (ch->read_buf); >>> + if (realsize > ch->read_count) >>> + realsize = ch->read_count; >>> + >>> + memcpy (ptr, ch->read_buf, realsize); >>> + >>> + ch->read_count -= realsize; >>> + ch->read_buf += realsize; >>> + >>> + return realsize; >> >> Why does write_cb in [1] above return orig_realsize, but read_cb >> returns the potentially modified realsize? > > It's a good question (this code was copied from the old plugin which > was working for years). It definitely works now. Note that writes in > this plugin are probably never used. They require a web server that > supports "Range PUTs" which is, probably, not any server in existence. > >>> } >>> >>> static struct nbdkit_plugin plugin = { >>> diff --git a/plugins/curl/pool.c b/plugins/curl/pool.c >>> index eb2d330e1..2974cda3f 100644 >>> --- a/plugins/curl/pool.c >>> +++ b/plugins/curl/pool.c >>> @@ -30,11 +30,29 @@ >>> * SUCH DAMAGE. >>> */ >>> >>> -/* Curl handle pool. >>> +/* Worker thread which processes the curl multi interface. >>> * >>> - * To get a libcurl handle, call get_handle(). When you hold the >>> - * handle, it is yours exclusively to use. 
After you have finished >>> - * with the handle, put it back into the pool by calling put_handle(). >>> + * The main nbdkit threads (see curl.c) create curl easy handles >>> + * initialized with the work they want to carry out. Note there is >>> + * one easy handle per task (eg. per pread/pwrite request). The easy >>> + * handles are not reused. >>> + * >>> + * The commands + optional easy handle are submitted to the worker >>> + * thread over a self-pipe (it's easy to use a pipe here because the >>> + * way curl multi works is it can listen on an extra fd, but not on >>> + * anything else like a pthread condition). The curl multi performs >>> + * the work of the outstanding easy handles. >>> + * >>> + * When an easy handle finishes work or errors, we retire the command >>> + * by signalling back to the waiting nbdkit thread using a pthread >>> + * condition. >>> + * >>> + * In my experiments, we're almost always I/O bound so I haven't seen >>> + * any strong need to use more than one curl multi / worker thread, >>> + * although it would be possible to add more in future. >>> + * >>> + * See also this extremely useful thread: >>> + * https://curl.se/mail/lib-2019-03/0100.html >> >> Very useful comment (and link). >> >>> */ >>> >>> #include <config.h> >>> @@ -45,9 +63,19 @@ >>> #include <stdint.h> >>> #include <inttypes.h> >>> #include <string.h> >>> +#include <unistd.h> >>> #include <assert.h> >>> #include <pthread.h> >>> >>> +#ifdef HAVE_STDATOMIC_H >>> +#include <stdatomic.h> >>> +#else >>> +/* Some old platforms lack atomic types, but 32 bit ints are usually >>> + * "atomic enough". >>> + */ >>> +#define _Atomic /**/ >>> +#endif >>> + >>> #include <curl/curl.h> >>> >>> #include <nbdkit-plugin.h> >>> @@ -62,115 +90,281 @@ NBDKIT_DLL_PUBLIC int curl_debug_pool = 0; >>> >>> unsigned connections = 4; >>> >>> -/* This lock protects access to the curl_handles vector below. 
*/ >>> -static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER; >>> +/* Pipe used to notify background thread that a command is pending in >>> + * the queue. A pointer to the 'struct command' is sent over the >>> + * pipe. >>> + */ >>> +static int self_pipe[2] = { -1, -1 }; >>> >>> -/* List of curl handles. This is allocated dynamically as more >>> - * handles are requested. Currently it does not shrink. It may grow >>> - * up to 'connections' in length. >>> +/* The curl multi handle. */ >>> +static CURLM *multi; >>> + >>> +/* List of running easy handles. We only need to maintain this so we >>> + * can remove them from the multi handle when cleaning up. >>> */ >>> DEFINE_VECTOR_TYPE (curl_handle_list, struct curl_handle *); >>> static curl_handle_list curl_handles = empty_vector; >>> >>> -/* The condition is used when the curl handles vector is full and >>> - * we're waiting for a thread to put_handle. >>> - */ >>> -static pthread_cond_t cond = PTHREAD_COND_INITIALIZER; >>> -static size_t in_use = 0, waiting = 0; >>> +static const char * >>> +command_type_to_string (enum command_type type) >>> +{ >>> + switch (type) { >>> + case EASY_HANDLE: return "EASY_HANDLE"; >>> + case STOP: return "STOP"; >>> + default: abort (); >>> + } >>> +} >>> >>> int >>> pool_get_ready (void) >>> { >>> + multi = curl_multi_init (); >>> + if (multi == NULL) { >>> + nbdkit_error ("curl_multi_init failed: %m"); >>> + return -1; >>> + } >>> + >>> return 0; >>> } >>> >>> +/* Start and stop the background thread. */ >>> +static pthread_t thread; >>> +static bool thread_running; >>> +static void *pool_worker (void *); >>> + >>> int >>> pool_after_fork (void) >>> { >>> + int err; >>> + >>> + if (pipe (self_pipe) == -1) { >>> + nbdkit_error ("pipe: %m"); >>> + return -1; >>> + } >>> + >>> + /* Start the pool background thread where all the curl work is done. 
*/ >>> + err = pthread_create (&thread, NULL, pool_worker, NULL); >>> + if (err != 0) { >>> + errno = err; >>> + nbdkit_error ("pthread_create: %m"); >>> + return -1; >>> + } >>> + thread_running = true; >>> + >>> return 0; >>> } >>> >>> -/* Close and free all handles in the pool. */ >>> +/* Unload the background thread. */ >>> void >>> pool_unload (void) >>> { >>> - size_t i; >>> + if (thread_running) { >>> + /* Stop the background thread. */ >>> + struct command cmd = { .type = STOP }; >>> + send_command_and_wait (&cmd); >>> + pthread_join (thread, NULL); >>> + thread_running = false; >>> + } >>> >>> - if (curl_debug_pool) >>> - nbdkit_debug ("unload_pool: number of curl handles allocated: %zu", >>> - curl_handles.len); >>> + if (self_pipe[0] >= 0) { >>> + close (self_pipe[0]); >>> + self_pipe[0] = -1; >>> + } >>> + if (self_pipe[1] >= 0) { >>> + close (self_pipe[1]); >>> + self_pipe[1] = -1; >>> + } >>> >>> - for (i = 0; i < curl_handles.len; ++i) >>> - free_handle (curl_handles.ptr[i]); >>> - curl_handle_list_reset (&curl_handles); >>> + if (multi) { >>> + size_t i; >>> + >>> + /* Remove and free any easy handles in the multi. */ >>> + for (i = 0; i < curl_handles.len; ++i) { >>> + curl_multi_remove_handle (multi, curl_handles.ptr[i]->c); >>> + free_handle (curl_handles.ptr[i]); >>> + } >>> + >>> + curl_multi_cleanup (multi); >>> + multi = NULL; >>> + } >>> } >>> >>> -/* Get a handle from the pool. >>> - * >>> - * It is owned exclusively by the caller until they call put_handle. >>> +/* Command queue. */ >>> +static _Atomic uint64_t id; /* next command ID */ >>> + >>> +/* Send command to the background thread and wait for completion. >>> + * This is only called by one of the nbdkit threads. 
>>> */ >>> -struct curl_handle * >>> -get_handle (void) >>> +CURLcode >>> +send_command_and_wait (struct command *cmd) >>> { >>> - ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&lock); >>> - size_t i; >>> - struct curl_handle *ch; >>> - >>> - again: >>> - /* Look for a handle which is not in_use. */ >>> - for (i = 0; i < curl_handles.len; ++i) { >>> - ch = curl_handles.ptr[i]; >>> - if (!ch->in_use) { >>> - ch->in_use = true; >>> - in_use++; >>> + cmd->id = id++; >>> + >>> + /* CURLcode is 0 (CURLE_OK) or > 0, so use -1 as a sentinel to >>> + * indicate that the command has not yet been completed and status >>> + * set. >>> + */ >>> + cmd->status = -1; >>> + >>> + /* This will be used to signal command completion back to us. */ >>> + pthread_mutex_init (&cmd->mutex, NULL); >>> + pthread_cond_init (&cmd->cond, NULL); >>> + >>> + /* Send the command to the background thread. */ >>> + if (write (self_pipe[1], &cmd, sizeof cmd) != sizeof cmd) >>> + abort (); >>> + >>> + /* Wait for the command to be completed by the background thread. */ >>> + { >>> + ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&cmd->mutex); >>> + while (cmd->status == -1) /* for -1, see above */ >>> + pthread_cond_wait (&cmd->cond, &cmd->mutex); >>> + } >>> + >>> + pthread_mutex_destroy (&cmd->mutex); >>> + pthread_cond_destroy (&cmd->cond); >>> + >>> + /* Note the main thread must call nbdkit_error on error! */ >>> + return cmd->status; >>> +} >>> + >>> +/* The background thread. 
*/ >>> +static void check_for_finished_handles (void); >>> +static void retire_command (struct command *cmd, CURLcode code); >>> +static void do_easy_handle (struct command *cmd); >>> + >>> +static void * >>> +pool_worker (void *vp) >>> +{ >>> + bool stop = false; >>> + >>> + if (curl_debug_pool) >>> + nbdkit_debug ("curl: background thread started"); >>> + >>> + while (!stop) { >>> + struct command *cmd = NULL; >>> + struct curl_waitfd extra_fds[1] >>> + { { .fd = self_pipe[0], .events = CURL_WAIT_POLLIN } }; >>> + CURLMcode mc; >>> + int numfds, running_handles, repeats = 0; >>> + >>> + do { >>> + /* Process the multi handle. */ >>> + mc = curl_multi_perform (multi, &running_handles); >>> + if (mc != CURLM_OK) { >>> + nbdkit_error ("curl_multi_perform: %s", curl_multi_strerror (mc)); >> >> Since nbdkit_error() stores its string in thread-local storage, is >> there anything that ever extracts this error over to the nbdkit thread >> that issued the original request to the worker thread?... >> >>> + abort (); /* XXX We don't expect this to happen */ >> >> ...Then again, if we abort, it doesn't matter. > > I was unclear what to do here. The final code does: > > while (!stop) { > ... > cmd = process_multi_handle (); > if (cmd == NULL) > continue; /* or die?? */ > > with process_multi_handle still calling nbdkit_error. I felt it might > be better to keep trying than just abort, as presumably some (most?) > errors are transient. > > nbdkit_error will ensure the error message is written out. 
> >>> + } >> >>> + >>> + check_for_finished_handles (); >>> + >>> + mc = curl_multi_wait (multi, extra_fds, 1, 1000000, &numfds); >>> + if (mc != CURLM_OK) { >>> + nbdkit_error ("curl_multi_wait: %s", curl_multi_strerror (mc)); >>> + abort (); /* XXX We don't expect this to happen */ >>> + } >>> + >>> if (curl_debug_pool) >>> - nbdkit_debug ("get_handle: %zu", ch->i); >>> - return ch; >>> - } >>> - } >>> + nbdkit_debug ("curl_multi_wait returned: running_handles=%d numfds=%d", >>> + running_handles, numfds); >>> + >>> + if (numfds == 0) { >>> + repeats++; >>> + if (repeats > 1) >>> + nbdkit_nanosleep (1, 0); >>> + } >>> + else { >>> + repeats = 0; >>> + if (extra_fds[0].revents == CURL_WAIT_POLLIN) { >>> + /* There's a command waiting. */ >>> + if (read (self_pipe[0], &cmd, sizeof cmd) != sizeof cmd) >>> + abort (); >>> + } >>> + } >>> + } while (!cmd); >>> >>> - /* If more connections are allowed, then allocate a new handle. */ >>> - if (curl_handles.len < connections) { >>> - ch = allocate_handle (); >>> - if (ch == NULL) >>> - return NULL; >>> - if (curl_handle_list_append (&curl_handles, ch) == -1) { >>> - free_handle (ch); >>> - return NULL; >>> - } >>> - ch->i = curl_handles.len - 1; >>> - ch->in_use = true; >>> - in_use++; >>> if (curl_debug_pool) >>> - nbdkit_debug ("get_handle: %zu", ch->i); >>> - return ch; >>> - } >>> + nbdkit_debug ("curl: dispatching %s command %" PRIu64, >>> + command_type_to_string (cmd->type), cmd->id); >>> + >>> + switch (cmd->type) { >>> + case STOP: >>> + stop = true; >>> + retire_command (cmd, CURLE_OK); >>> + break; >>> >>> - /* Otherwise we have run out of connections so we must wait until >>> - * another thread calls put_handle. 
>>> - */ >>> - assert (in_use == connections); >>> - waiting++; >>> - while (in_use == connections) >>> - pthread_cond_wait (&cond, &lock); >>> - waiting--; >>> + case EASY_HANDLE: >>> + do_easy_handle (cmd); >>> + break; >>> + } >>> + } /* while (!stop) */ >>> >>> - goto again; >>> + if (curl_debug_pool) >>> + nbdkit_debug ("curl: background thread stopped"); >>> + >>> + return NULL; >>> } >>> >>> -/* Return the handle to the pool. */ >>> -void >>> -put_handle (struct curl_handle *ch) >>> +/* This checks if any easy handles in the multi have >>> + * finished and retires the associated commands. >>> + */ >>> +static void >>> +check_for_finished_handles (void) >>> { >>> - ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&lock); >>> + CURLMsg *msg; >>> + int msgs_in_queue; >>> + >>> + while ((msg = curl_multi_info_read (multi, &msgs_in_queue)) != NULL) { >>> + size_t i; >>> + struct curl_handle *ch = NULL; >>> + >>> + if (msg->msg == CURLMSG_DONE) { >>> + /* Find this curl_handle. */ >>> + for (i = 0; i < curl_handles.len; ++i) { >>> + if (curl_handles.ptr[i]->c == msg->easy_handle) { >>> + ch = curl_handles.ptr[i]; >>> + curl_handle_list_remove (&curl_handles, i); >>> + } >>> + } >>> + if (ch == NULL) abort (); >>> + curl_multi_remove_handle (multi, ch->c); >>> + >>> + retire_command (ch->cmd, msg->data.result); >>> + } >>> + } >>> +} >>> >>> +/* Retire a command. status is a CURLcode. */ >>> +static void >>> +retire_command (struct command *cmd, CURLcode status) >>> +{ >>> if (curl_debug_pool) >>> - nbdkit_debug ("put_handle: %zu", ch->i); >>> + nbdkit_debug ("curl: retiring %s command %" PRIu64, >>> + command_type_to_string (cmd->type), cmd->id); >>> + >>> + ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&cmd->mutex); >>> + cmd->status = status; >>> + pthread_cond_signal (&cmd->cond); >>> +} >>> + >>> +static void >>> +do_easy_handle (struct command *cmd) >>> +{ >>> + CURLMcode mc; >>> + >>> + cmd->ch->cmd = cmd; >>> + >>> + /* Add the handle to the multi. 
*/ >>> + mc = curl_multi_add_handle (multi, cmd->ch->c); >>> + if (mc != CURLM_OK) { >>> + nbdkit_error ("curl_multi_add_handle: %s", curl_multi_strerror (mc)); >>> + goto err; >>> + } >>> >>> - ch->in_use = false; >>> - in_use--; >>> + if (curl_handle_list_append (&curl_handles, cmd->ch) == -1) >>> + goto err; >>> + return; >>> >>> - /* Signal the next thread which is waiting. */ >>> - if (waiting > 0) >>> - pthread_cond_signal (&cond); >>> + err: >>> + retire_command (cmd, CURLE_OUT_OF_MEMORY); >>> } >>> -- >>> 2.41.0 >> >> Overall looks nice, and I learned more about curl in the process. > > For me, too much :-( > > Thanks, > > Rich. >
Richard W.M. Jones
2023-Aug-07 11:08 UTC
[Libguestfs] [PATCH nbdkit v2 8/9] curl: Use curl multi interface
On Mon, Aug 07, 2023 at 12:57:02PM +0200, Laszlo Ersek wrote:> On 8/4/23 20:04, Richard W.M. Jones wrote: > > On Fri, Aug 04, 2023 at 11:38:03AM -0500, Eric Blake wrote: > >> On Fri, Jul 28, 2023 at 06:17:52PM +0100, Richard W.M. Jones wrote: > >>> See the comment at the top of plugins/curl/pool.c for general > >>> information about how this works. > >>> > >>> This makes a very large difference to performance over the previous > >>> implementation. Note for the tests below I also applied the next > >>> commit changing the behaviour of the connections parameter. > >>> > >>> Using this test case: > >>> > >>> $ uri=https://cloud-images.ubuntu.com/lunar/current/lunar-server-cloudimg-amd64.img > >>> $ nbdkit -r -U - curl $uri ipresolve=v4 --run 'nbdcopy -p $uri null' > >>> > >>> The times are as follows: > >>> > >>> multi, connections=64 21.5s > >>> multi, connections=32 30.2s > >>> multi, connections=16 56.0s > >>> before this commit 166s > >> > >> Awesome performance improvements! As painful as this series has been > >> for you to write and debug, it is showing its worth. > >> > >>> --- > >>> plugins/curl/curldefs.h | 35 ++-- > >>> plugins/curl/config.c | 246 --------------------------- > >>> plugins/curl/curl.c | 366 +++++++++++++++++++++++++++++++++++----- > >>> plugins/curl/pool.c | 346 ++++++++++++++++++++++++++++--------- > >>> 4 files changed, 616 insertions(+), 377 deletions(-) > >> > >> Finally taking time to review this, even though it is already in-tree. > >> > >>> @@ -98,8 +88,30 @@ struct curl_handle { > >>> const char *read_buf; > >>> uint32_t read_count; > >>> > >>> + /* This field is used by curl_get_size. */ > >>> + bool accept_range; > >>> + > >>> /* Used by scripts.c */ > >>> struct curl_slist *headers_copy; > >>> + > >>> + /* Used by pool.c */ > >>> + struct command *cmd; > >>> +}; > >>> + > >>> +/* Asynchronous commands that can be sent to the pool thread. 
*/ > >>> +enum command_type { EASY_HANDLE, STOP }; > >>> +struct command { > >>> + /* These fields are set by the caller. */ > >>> + enum command_type type; /* command */ > >>> + struct curl_handle *ch; /* for EASY_HANDLE, the easy handle */ > >>> + > >>> + /* This field is set to a unique value by send_command_and_wait. */ > >>> + uint64_t id; /* serial number */ > >>> + > >>> + /* These fields are used to signal back that the command finished. */ > >>> + pthread_mutex_t mutex; /* completion mutex */ > >>> + pthread_cond_t cond; /* completion condition */ > >>> + CURLcode status; /* status code (CURLE_OK = succeeded) */ > >>> }; > >> > >> Makes sense. The two types are mutually recursive (curl_handle > >> includes a struct command *; command includes a struct curl_handle *); > >> hopefully you have proper locking when altering multiple objects to > >> adjust how they point to one another. > > > > Actually locking is not needed. Let me document it through ... > > > > We create both the curl easy handle and the associated EASY_HANDLE > > command in the nbdkit thread that gets the request, eg. in the curl > > .pread method. That of course requires no locking. > > > > There is a single background worker thread. > > > > A "self pipe" passes pointers to 'struct command *' to this worker > > thread simple by writing the 8 byte pointer onto the pipe (hopefully > > atomic ...) The nbdkit request thread then blocks on the mutex/cond > > in the command handle. > > Right, that's exactly that I got curious about. > > In my opinion, writing to the self-pipe is mostly safe. Reading from the > self-pipe could be slightly polished. 
> > Here are the POSIX pages on write() and read(): > > https://pubs.opengroup.org/onlinepubs/9699919799/functions/write.html > https://pubs.opengroup.org/onlinepubs/9699919799/functions/read.html > > The first one (write) says that if you attempt to write at most PIPE_BUF > bytes, then writes will not be interleaved with other concurrent writes, > so in a sense the write will be atomic. (And in this case, O_NONBLOCK > only changes the behavior for the case when the buffer cannot be written > in entirety: with O_NONBLOCK clear, the writer thread will block, with > O_NONBLOCK set, the writer thread will see -1/EAGAIN.) > > Now, PIPE_BUF is "variable" in a sense: > > https://pubs.opengroup.org/onlinepubs/9699919799/basedefs/limits.h.html > > but it is greater than or equal to _POSIX_PIPE_BUF, which is 512. Our > pointers are 8 bytes in size, so the <=PIPE_BUF condition is surely > satisfied. > > ... The only case I see possible for a short write is the delivery of a > signal after transfer has started. *If* nbdkit catches some signal such > that the signal handler actually returns, then this could be a > (theoretical) problem, calling for xwrite() or similar. > > > > > The background worker thread picks the 'struct command *' up from the > > pipe in the same event loop that it uses to process ongoing requests > > on the multi handle. It takes the easy handle and adds it to the > > multi handle. > > The spec on read() does not seem to have the same kind of > non-interleaving / "atomicity" language that write() does. The rationale > section says: > > "The standard developers considered adding atomicity requirements to a > pipe or FIFO, but recognized that due to the nature of pipes and FIFOs > there could be no guarantee of atomicity of reads of {PIPE_BUF} or any > other size that would be an aid to applications portability." 
> > Now given that the writer side is free of interleaving (because, in the > first place: there is only a single writer!), I think we need not worry > about data corruption. However, it does feel like read() may return > fewer than 8 bytes in one go, "just because" (not only because of a > signal being delivered midway).

So there are actually multiple writer threads, but only a single reader thread.

Also I should say that I didn't set O_NONBLOCK (actually I completely forgot), but that might be a benefit in this case since it makes it less likely for the 8 byte read to be broken up.

> And that may be a problem with readiness reporting via > curl_multi_wait(); even if you get CURL_WAIT_POLLIN, the whole command > pointer may not yet be available. > > Now I do think a split read is extremely unlikely, maybe even impossible > on Linux. If we choose to be pedantic, then the curl_multi_wait() loop > might want to expect "cmd" to be populated only over multiple iterations > -- like use "cmd_ptr_bytes" or something similar for tracking the size > already available, and only consider "cmd" usable when cmd_ptr_bytes > reaches sizeof cmd. > > > Yet another topic that comes up is visibility / ordering. Transferring a > pointer via write()/read() between threads does not seem to guarantee > ordering / visibility regarding the *pointed-to* area per spec. Pthread > mutex APIs (and C11 thread and atomics APIs) include the CPU > instructions for ensuring proper memory visibility. > > *BUT* I think it would be insane for any POSIX implementation to have a > write()+read() combination that's *weaker* regarding data consistency, > (i.e., that's more racy) than mutexes. Write() and read() are > heavy-weight syscalls, so I can't imagine a publish-subscribe pattern > not working with them (i.e., the pointed-to area not "settling" until > the pointer is consumed).

Yup, it seems unlikely, but maybe it could happen on architectures with weaker store ordering like Arm? 
I wonder if adding a memory barrier before the write would be a good idea? Thanks, Rich.> Laszlo > > > > > When the easy handle work has finished, the worker thread removes it > > from the multi handle and signals the nbdkit request thread to wake up > > (using cmd->mutex + cmd->lock). At which point possession passes back > > to the request thread which will usually free up both the command and > > easy handle. > > > >>> +++ b/plugins/curl/config.c > >> > >>> +++ b/plugins/curl/curl.c > >>> > >>> +/* Get the file size. */ > >>> +static int get_content_length_accept_range (struct curl_handle *ch); > >>> +static bool try_fallback_GET_method (struct curl_handle *ch); > >>> +static size_t header_cb (void *ptr, size_t size, size_t nmemb, void *opaque); > >>> +static size_t error_cb (char *ptr, size_t size, size_t nmemb, void *opaque); > >>> + > >>> +static int64_t > >>> +curl_get_size (void *handle) > >>> +{ > >>> + struct curl_handle *ch; > >>> + CURLcode r; > >>> + long code; > >>> +#ifdef HAVE_CURLINFO_CONTENT_LENGTH_DOWNLOAD_T > >>> + curl_off_t o; > >>> +#else > >>> + double d; > >>> +#endif > >>> + int64_t exportsize; > >>> + > >>> + /* Get a curl easy handle. */ > >>> + ch = allocate_handle (); > >>> + if (ch == NULL) goto err; > >>> + > >>> + /* Prepare to read the headers. */ > >>> + if (get_content_length_accept_range (ch) == -1) > >>> + goto err; > >>> + > >>> + /* Send the command to the worker thread and wait. */ > >>> + struct command cmd = { > >>> + .type = EASY_HANDLE, > >>> + .ch = ch, > >>> + }; > >>> + > >>> + r = send_command_and_wait (&cmd); > >>> + update_times (ch->c); > >>> + if (r != CURLE_OK) { > >>> + display_curl_error (ch, r, > >>> + "problem doing HEAD request to fetch size of URL [%s]", > >>> + url); > >>> + > >>> + /* Get the HTTP status code, if available. 
*/ > >>> + r = curl_easy_getinfo (ch->c, CURLINFO_RESPONSE_CODE, &code); > >>> + if (r == CURLE_OK) > >>> + nbdkit_debug ("HTTP status code: %ld", code); > >>> + else > >>> + code = -1; > >>> + > >>> + /* See comment on try_fallback_GET_method below. */ > >>> + if (code != 403 || !try_fallback_GET_method (ch)) > >>> + goto err; > >>> + } > >>> + > >>> + /* Get the content length. > >>> + * > >>> + * Note there is some subtlety here: For web servers using chunked > >>> + * encoding, either the Content-Length header will not be present, > >>> + * or if present it should be ignored. (For such servers the only > >>> + * way to find out the true length would be to read all of the > >>> + * content, which we don't want to do). > >>> + * > >>> + * Curl itself resolves this for us. It will ignore the > >>> + * Content-Length header if chunked encoding is used, returning the > >>> + * length as -1 which we check below (see also > >>> + * curl:lib/http.c:Curl_http_size). > >>> + */ > >>> +#ifdef HAVE_CURLINFO_CONTENT_LENGTH_DOWNLOAD_T > >>> + r = curl_easy_getinfo (ch->c, CURLINFO_CONTENT_LENGTH_DOWNLOAD_T, &o); > >>> + if (r != CURLE_OK) { > >>> + display_curl_error (ch, r, > >>> + "could not get length of remote file [%s]", url); > >>> + goto err; > >>> + } > >>> + > >>> + if (o == -1) { > >>> + nbdkit_error ("could not get length of remote file [%s], " > >>> + "is the URL correct?", url); > >>> + goto err; > >>> + } > >>> + > >>> + exportsize = o; > >>> +#else > >>> + r = curl_easy_getinfo (ch->c, CURLINFO_CONTENT_LENGTH_DOWNLOAD, &d); > >>> + if (r != CURLE_OK) { > >>> + display_curl_error (ch, r, > >>> + "could not get length of remote file [%s]", url); > >>> + goto err; > >>> + } > >>> + > >>> + if (d == -1) { > >>> + nbdkit_error ("could not get length of remote file [%s], " > >>> + "is the URL correct?", url); > >>> + goto err; > >>> + } > >>> + > >>> + exportsize = d; > >> > >> Does curl guarantee that the double d will contain a value assignable > >> to int64_t 
without overflow/truncation? For particularly large sizes, > >> double has insufficient precision for all possible file sizes, but I > >> doubt someone is exposing such large files over HTTP. > > > > No, I don't believe a 'double' is sufficient. This is why newer > > versions of curl have HAVE_CURLINFO_CONTENT_LENGTH_DOWNLOAD_T. Note > > this code is just copied from the old curl plugin. > > > >>> +#endif > >>> + nbdkit_debug ("content length: %" PRIi64, exportsize); > >>> + > >>> + /* If this is HTTP, check that byte ranges are supported. */ > >>> + if (ascii_strncasecmp (url, "http://", strlen ("http://")) == 0 || > >>> + ascii_strncasecmp (url, "https://", strlen ("https://")) == 0) { > >>> + if (!ch->accept_range) { > >>> + nbdkit_error ("server does not support 'range' (byte range) requests"); > >>> + goto err; > >>> + } > >>> + > >>> + nbdkit_debug ("accept range supported (for HTTP/HTTPS)"); > >>> + } > >>> + > >>> + free_handle (ch); > >>> + return exportsize; > >>> + > >>> + err: > >>> + if (ch) > >>> + free_handle (ch); > >>> + return -1; > >>> +} > >>> + > >>> +/* Get the file size and also whether the remote HTTP server > >>> + * supports byte ranges. > >>> + */ > >>> +static int > >>> +get_content_length_accept_range (struct curl_handle *ch) > >>> +{ > >>> + /* We must run the scripts if necessary and set headers in the > >>> + * handle. > >>> + */ > >>> + if (do_scripts (ch) == -1) > >>> + return -1; > >>> + > >>> + /* Set this flag in the handle to false. The callback should set it > >>> + * to true if byte ranges are supported, which we check below. > >>> + */ > >>> + ch->accept_range = false; > >>> + > >>> + /* No Body, not nobody! This forces a HEAD request. 
*/ > >>> + curl_easy_setopt (ch->c, CURLOPT_NOBODY, 1L); > >>> + curl_easy_setopt (ch->c, CURLOPT_HEADERFUNCTION, header_cb); > >>> + curl_easy_setopt (ch->c, CURLOPT_HEADERDATA, ch); > >>> + curl_easy_setopt (ch->c, CURLOPT_WRITEFUNCTION, NULL); > >>> + curl_easy_setopt (ch->c, CURLOPT_WRITEDATA, NULL); > >>> + curl_easy_setopt (ch->c, CURLOPT_READFUNCTION, NULL); > >>> + curl_easy_setopt (ch->c, CURLOPT_READDATA, NULL); > >>> + return 0; > >>> +} > >>> + > >>> +/* S3 servers can return 403 Forbidden for HEAD but still respond > >>> + * to GET, so we give it a second chance in that case. > >>> + * https://github.com/kubevirt/containerized-data-importer/issues/2737 > >>> + * > >>> + * This function issues a GET request with a writefunction that always > >>> + * returns an error, thus effectively getting the headers but > >>> + * abandoning the transfer as soon as possible after. > >>> + */ > >>> +static bool > >>> +try_fallback_GET_method (struct curl_handle *ch) > >>> +{ > >>> + CURLcode r; > >>> + > >>> + nbdkit_debug ("attempting to fetch headers using GET method"); > >>> + > >>> + curl_easy_setopt (ch->c, CURLOPT_HTTPGET, 1L); > >>> + curl_easy_setopt (ch->c, CURLOPT_HEADERFUNCTION, header_cb); > >>> + curl_easy_setopt (ch->c, CURLOPT_HEADERDATA, ch); > >>> + curl_easy_setopt (ch->c, CURLOPT_WRITEFUNCTION, error_cb); > >>> + curl_easy_setopt (ch->c, CURLOPT_WRITEDATA, ch); > >>> + > >>> + struct command cmd = { > >>> + .type = EASY_HANDLE, > >>> + .ch = ch, > >>> + }; > >>> + > >>> + r = send_command_and_wait (&cmd); > >>> + update_times (ch->c); > >>> + > >>> + /* We expect CURLE_WRITE_ERROR here, but CURLE_OK is possible too > >>> + * (eg if the remote has zero length). Other errors might happen > >>> + * but we ignore them since it is a fallback path. 
> >>> + */ > >>> + return r == CURLE_OK || r == CURLE_WRITE_ERROR; > >>> +} > >>> + > >>> +static size_t > >>> +header_cb (void *ptr, size_t size, size_t nmemb, void *opaque) > >>> +{ > >>> + struct curl_handle *ch = opaque; > >>> + size_t realsize = size * nmemb; > >>> + const char *header = ptr; > >>> + const char *end = header + realsize; > >>> + const char *accept_ranges = "accept-ranges:"; > >>> + const char *bytes = "bytes"; > >>> + > >>> + if (realsize >= strlen (accept_ranges) && > >>> + ascii_strncasecmp (header, accept_ranges, strlen (accept_ranges)) == 0) { > >>> + const char *p = strchr (header, ':') + 1; > >>> + > >>> + /* Skip whitespace between the header name and value. */ > >>> + while (p < end && *p && ascii_isspace (*p)) > >> > >> Technically, '*p && ascii_isspace (*p)' can be shortened to > >> 'ascii_isspace (*p)', since the NUL byte is not ascii space. I don't > >> know if the compiler is smart enough to make that optimization on your > >> behalf. > > > > Ah indeed. > > > >>> + p++; > >>> + > >>> + if (end - p >= strlen (bytes) > >>> + && strncmp (p, bytes, strlen (bytes)) == 0) { > >>> + /* Check that there is nothing but whitespace after the value. */ > >>> + p += strlen (bytes); > >>> + while (p < end && *p && ascii_isspace (*p)) > >> > >> Another spot of the same. > >> > >>> + p++; > >>> + > >>> + if (p == end || !*p) > >>> + ch->accept_range = true; > >>> + } > >>> + } > >>> + > >>> + return realsize; > >>> +} > >>> + > >>> +static size_t > >>> +error_cb (char *ptr, size_t size, size_t nmemb, void *opaque) > >>> +{ > >>> +#ifdef CURL_WRITEFUNC_ERROR > >>> + return CURL_WRITEFUNC_ERROR; > >>> +#else > >>> + return 0; /* in older curl, any size < requested will also be an error */ > >>> +#endif > >>> +} > >>> + > >>> /* Read data from the remote server. 
*/ > >>> +static size_t write_cb (char *ptr, size_t size, size_t nmemb, void *opaque); > >>> + > >>> static int > >>> curl_pread (void *handle, void *buf, uint32_t count, uint64_t offset) > >>> { > >>> CURLcode r; > >>> + struct curl_handle *ch; > >>> char range[128]; > >>> > >>> - GET_HANDLE_FOR_CURRENT_SCOPE (ch); > >>> - if (ch == NULL) > >>> - return -1; > >>> + /* Get a curl easy handle. */ > >>> + ch = allocate_handle (); > >>> + if (ch == NULL) goto err; > >>> > >>> /* Run the scripts if necessary and set headers in the handle. */ > >>> - if (do_scripts (ch) == -1) return -1; > >>> + if (do_scripts (ch) == -1) goto err; > >>> > >>> /* Tell the write_cb where we want the data to be written. write_cb > >>> * will update this if the data comes in multiple sections. > >>> */ > >>> + curl_easy_setopt (ch->c, CURLOPT_WRITEFUNCTION, write_cb); > >>> + curl_easy_setopt (ch->c, CURLOPT_WRITEDATA, ch); > >>> ch->write_buf = buf; > >>> ch->write_count = count; > >>> > >>> @@ -183,11 +388,16 @@ curl_pread (void *handle, void *buf, uint32_t count, uint64_t offset) > >>> offset, offset + count); > >>> curl_easy_setopt (ch->c, CURLOPT_RANGE, range); > >>> > >>> - /* The assumption here is that curl will look after timeouts. */ > >>> - r = curl_easy_perform (ch->c); > >>> + /* Send the command to the worker thread and wait. */ > >>> + struct command cmd = { > >>> + .type = EASY_HANDLE, > >>> + .ch = ch, > >>> + }; > >>> + > >>> + r = send_command_and_wait (&cmd); > >>> if (r != CURLE_OK) { > >>> - display_curl_error (ch, r, "pread: curl_easy_perform"); > >>> - return -1; > >>> + display_curl_error (ch, r, "pread"); > >>> + goto err; > >>> } > >>> update_times (ch->c); > >>> > >>> @@ -198,26 +408,67 @@ curl_pread (void *handle, void *buf, uint32_t count, uint64_t offset) > >>> /* As far as I understand the cURL API, this should never happen. 
*/ > >>> assert (ch->write_count == 0); > >>> > >>> + free_handle (ch); > >>> return 0; > >>> + > >>> + err: > >>> + if (ch) > >>> + free_handle (ch); > >>> + return -1; > >>> +} > >>> + > >>> +/* NB: The terminology used by libcurl is confusing! > >>> + * > >>> + * WRITEFUNCTION / write_cb is used when reading from the remote server > >>> + * READFUNCTION / read_cb is used when writing to the remote server. > >>> + * > >>> + * We use the same terminology as libcurl here. > >>> + */ > >>> +static size_t > >>> +write_cb (char *ptr, size_t size, size_t nmemb, void *opaque) > >>> +{ > >>> + struct curl_handle *ch = opaque; > >>> + size_t orig_realsize = size * nmemb; > >>> + size_t realsize = orig_realsize; > >> > >> Do we have to worry about overflow when compiling on 32-bit machines? > >> Asked differently, should we be using off_t instead of size_t in any > >> of this code? Thankfully, for now, we know NBD .pread and .pwrite > >> requests are capped at 64M, so I think you're okay (we aren't ever > >> going to ask curl for gigabytes in one request), > > > > It's a good question ... I suspect that even if we requested it, web > > servers probably wouldn't want to serve gigabytes of data in a range > > request, but as you point out we shouldn't ever request it right now. > > > >> but maybe a comment > >> or assert() is worth it? > > > > I'll add a comment, but could we do this with one of the overflow > > macros? I'm not sure ... > > > >>> + > >>> + assert (ch->write_buf); > >>> + > >>> + /* Don't read more than the requested amount of data, even if the > >>> + * server or libcurl sends more. > >>> + */ > >>> + if (realsize > ch->write_count) > >>> + realsize = ch->write_count; > >>> + > >>> + memcpy (ch->write_buf, ptr, realsize); > >>> + > >>> + ch->write_count -= realsize; > >>> + ch->write_buf += realsize; > >>> + > >>> + return orig_realsize; > >> > >> [1] > >> > >>> } > >>> > >>> /* Write data to the remote server. 
*/ > >>> +static size_t read_cb (void *ptr, size_t size, size_t nmemb, void *opaque); > >>> + > >>> static int > >>> curl_pwrite (void *handle, const void *buf, uint32_t count, uint64_t offset) > >>> { > >>> CURLcode r; > >>> + struct curl_handle *ch; > >>> char range[128]; > >>> > >>> - GET_HANDLE_FOR_CURRENT_SCOPE (ch); > >>> - if (ch == NULL) > >>> - return -1; > >>> + /* Get a curl easy handle. */ > >>> + ch = allocate_handle (); > >>> + if (ch == NULL) goto err; > >>> > >>> /* Run the scripts if necessary and set headers in the handle. */ > >>> - if (do_scripts (ch) == -1) return -1; > >>> + if (do_scripts (ch) == -1) goto err; > >>> > >>> /* Tell the read_cb where we want the data to be read from. read_cb > >>> * will update this if the data comes in multiple sections. > >>> */ > >>> + curl_easy_setopt (ch->c, CURLOPT_READFUNCTION, read_cb); > >>> + curl_easy_setopt (ch->c, CURLOPT_READDATA, ch); > >>> ch->read_buf = buf; > >>> ch->read_count = count; > >>> > >>> @@ -228,11 +479,16 @@ curl_pwrite (void *handle, const void *buf, uint32_t count, uint64_t offset) > >>> offset, offset + count); > >>> curl_easy_setopt (ch->c, CURLOPT_RANGE, range); > >>> > >>> - /* The assumption here is that curl will look after timeouts. */ > >>> - r = curl_easy_perform (ch->c); > >>> + /* Send the command to the worker thread and wait. */ > >>> + struct command cmd = { > >>> + .type = EASY_HANDLE, > >>> + .ch = ch, > >>> + }; > >>> + > >>> + r = send_command_and_wait (&cmd); > >>> if (r != CURLE_OK) { > >>> - display_curl_error (ch, r, "pwrite: curl_easy_perform"); > >>> - return -1; > >>> + display_curl_error (ch, r, "pwrite"); > >>> + goto err; > >>> } > >>> update_times (ch->c); > >>> > >>> @@ -243,7 +499,31 @@ curl_pwrite (void *handle, const void *buf, uint32_t count, uint64_t offset) > >>> /* As far as I understand the cURL API, this should never happen. 
*/ > >>> assert (ch->read_count == 0); > >>> > >>> + free_handle (ch); > >>> return 0; > >>> + > >>> + err: > >>> + if (ch) > >>> + free_handle (ch); > >>> + return -1; > >>> +} > >>> + > >>> +static size_t > >>> +read_cb (void *ptr, size_t size, size_t nmemb, void *opaque) > >>> +{ > >>> + struct curl_handle *ch = opaque; > >>> + size_t realsize = size * nmemb; > >>> + > >>> + assert (ch->read_buf); > >>> + if (realsize > ch->read_count) > >>> + realsize = ch->read_count; > >>> + > >>> + memcpy (ptr, ch->read_buf, realsize); > >>> + > >>> + ch->read_count -= realsize; > >>> + ch->read_buf += realsize; > >>> + > >>> + return realsize; > >> > >> Why does write_cb in [1] above return orig_realsize, but read_cb > >> returns the potentially modified realsize? > > > > It's a good question (this code was copied from the old plugin which > > was working for years). It definitely works now. Note that writes in > > this plugin are probably never used. They require a web server that > > supports "Range PUTs" which is, probably, not any server in existence. > > > >>> } > >>> > >>> static struct nbdkit_plugin plugin = { > >>> diff --git a/plugins/curl/pool.c b/plugins/curl/pool.c > >>> index eb2d330e1..2974cda3f 100644 > >>> --- a/plugins/curl/pool.c > >>> +++ b/plugins/curl/pool.c > >>> @@ -30,11 +30,29 @@ > >>> * SUCH DAMAGE. > >>> */ > >>> > >>> -/* Curl handle pool. > >>> +/* Worker thread which processes the curl multi interface. > >>> * > >>> - * To get a libcurl handle, call get_handle(). When you hold the > >>> - * handle, it is yours exclusively to use. After you have finished > >>> - * with the handle, put it back into the pool by calling put_handle(). > >>> + * The main nbdkit threads (see curl.c) create curl easy handles > >>> + * initialized with the work they want to carry out. Note there is > >>> + * one easy handle per task (eg. per pread/pwrite request). The easy > >>> + * handles are not reused. 
> >>> + * > >>> + * The commands + optional easy handle are submitted to the worker > >>> + * thread over a self-pipe (it's easy to use a pipe here because the > >>> + * way curl multi works is it can listen on an extra fd, but not on > >>> + * anything else like a pthread condition). The curl multi performs > >>> + * the work of the outstanding easy handles. > >>> + * > >>> + * When an easy handle finishes work or errors, we retire the command > >>> + * by signalling back to the waiting nbdkit thread using a pthread > >>> + * condition. > >>> + * > >>> + * In my experiments, we're almost always I/O bound so I haven't seen > >>> + * any strong need to use more than one curl multi / worker thread, > >>> + * although it would be possible to add more in future. > >>> + * > >>> + * See also this extremely useful thread: > >>> + * https://curl.se/mail/lib-2019-03/0100.html > >> > >> Very useful comment (and link). > >> > >>> */ > >>> > >>> #include <config.h> > >>> @@ -45,9 +63,19 @@ > >>> #include <stdint.h> > >>> #include <inttypes.h> > >>> #include <string.h> > >>> +#include <unistd.h> > >>> #include <assert.h> > >>> #include <pthread.h> > >>> > >>> +#ifdef HAVE_STDATOMIC_H > >>> +#include <stdatomic.h> > >>> +#else > >>> +/* Some old platforms lack atomic types, but 32 bit ints are usually > >>> + * "atomic enough". > >>> + */ > >>> +#define _Atomic /**/ > >>> +#endif > >>> + > >>> #include <curl/curl.h> > >>> > >>> #include <nbdkit-plugin.h> > >>> @@ -62,115 +90,281 @@ NBDKIT_DLL_PUBLIC int curl_debug_pool = 0; > >>> > >>> unsigned connections = 4; > >>> > >>> -/* This lock protects access to the curl_handles vector below. */ > >>> -static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER; > >>> +/* Pipe used to notify background thread that a command is pending in > >>> + * the queue. A pointer to the 'struct command' is sent over the > >>> + * pipe. > >>> + */ > >>> +static int self_pipe[2] = { -1, -1 }; > >>> > >>> -/* List of curl handles. 
This is allocated dynamically as more > >>> - * handles are requested. Currently it does not shrink. It may grow > >>> - * up to 'connections' in length. > >>> +/* The curl multi handle. */ > >>> +static CURLM *multi; > >>> + > >>> +/* List of running easy handles. We only need to maintain this so we > >>> + * can remove them from the multi handle when cleaning up. > >>> */ > >>> DEFINE_VECTOR_TYPE (curl_handle_list, struct curl_handle *); > >>> static curl_handle_list curl_handles = empty_vector; > >>> > >>> -/* The condition is used when the curl handles vector is full and > >>> - * we're waiting for a thread to put_handle. > >>> - */ > >>> -static pthread_cond_t cond = PTHREAD_COND_INITIALIZER; > >>> -static size_t in_use = 0, waiting = 0; > >>> +static const char * > >>> +command_type_to_string (enum command_type type) > >>> +{ > >>> + switch (type) { > >>> + case EASY_HANDLE: return "EASY_HANDLE"; > >>> + case STOP: return "STOP"; > >>> + default: abort (); > >>> + } > >>> +} > >>> > >>> int > >>> pool_get_ready (void) > >>> { > >>> + multi = curl_multi_init (); > >>> + if (multi == NULL) { > >>> + nbdkit_error ("curl_multi_init failed: %m"); > >>> + return -1; > >>> + } > >>> + > >>> return 0; > >>> } > >>> > >>> +/* Start and stop the background thread. */ > >>> +static pthread_t thread; > >>> +static bool thread_running; > >>> +static void *pool_worker (void *); > >>> + > >>> int > >>> pool_after_fork (void) > >>> { > >>> + int err; > >>> + > >>> + if (pipe (self_pipe) == -1) { > >>> + nbdkit_error ("pipe: %m"); > >>> + return -1; > >>> + } > >>> + > >>> + /* Start the pool background thread where all the curl work is done. */ > >>> + err = pthread_create (&thread, NULL, pool_worker, NULL); > >>> + if (err != 0) { > >>> + errno = err; > >>> + nbdkit_error ("pthread_create: %m"); > >>> + return -1; > >>> + } > >>> + thread_running = true; > >>> + > >>> return 0; > >>> } > >>> > >>> -/* Close and free all handles in the pool. 
*/ > >>> +/* Unload the background thread. */ > >>> void > >>> pool_unload (void) > >>> { > >>> - size_t i; > >>> + if (thread_running) { > >>> + /* Stop the background thread. */ > >>> + struct command cmd = { .type = STOP }; > >>> + send_command_and_wait (&cmd); > >>> + pthread_join (thread, NULL); > >>> + thread_running = false; > >>> + } > >>> > >>> - if (curl_debug_pool) > >>> - nbdkit_debug ("unload_pool: number of curl handles allocated: %zu", > >>> - curl_handles.len); > >>> + if (self_pipe[0] >= 0) { > >>> + close (self_pipe[0]); > >>> + self_pipe[0] = -1; > >>> + } > >>> + if (self_pipe[1] >= 0) { > >>> + close (self_pipe[1]); > >>> + self_pipe[1] = -1; > >>> + } > >>> > >>> - for (i = 0; i < curl_handles.len; ++i) > >>> - free_handle (curl_handles.ptr[i]); > >>> - curl_handle_list_reset (&curl_handles); > >>> + if (multi) { > >>> + size_t i; > >>> + > >>> + /* Remove and free any easy handles in the multi. */ > >>> + for (i = 0; i < curl_handles.len; ++i) { > >>> + curl_multi_remove_handle (multi, curl_handles.ptr[i]->c); > >>> + free_handle (curl_handles.ptr[i]); > >>> + } > >>> + > >>> + curl_multi_cleanup (multi); > >>> + multi = NULL; > >>> + } > >>> } > >>> > >>> -/* Get a handle from the pool. > >>> - * > >>> - * It is owned exclusively by the caller until they call put_handle. > >>> +/* Command queue. */ > >>> +static _Atomic uint64_t id; /* next command ID */ > >>> + > >>> +/* Send command to the background thread and wait for completion. > >>> + * This is only called by one of the nbdkit threads. > >>> */ > >>> -struct curl_handle * > >>> -get_handle (void) > >>> +CURLcode > >>> +send_command_and_wait (struct command *cmd) > >>> { > >>> - ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&lock); > >>> - size_t i; > >>> - struct curl_handle *ch; > >>> - > >>> - again: > >>> - /* Look for a handle which is not in_use. 
> >>> -   */
> >>> -  for (i = 0; i < curl_handles.len; ++i) {
> >>> -    ch = curl_handles.ptr[i];
> >>> -    if (!ch->in_use) {
> >>> -      ch->in_use = true;
> >>> -      in_use++;
> >>> +  cmd->id = id++;
> >>> +
> >>> +  /* CURLcode is 0 (CURLE_OK) or > 0, so use -1 as a sentinel to
> >>> +   * indicate that the command has not yet been completed and status
> >>> +   * set.
> >>> +   */
> >>> +  cmd->status = -1;
> >>> +
> >>> +  /* This will be used to signal command completion back to us. */
> >>> +  pthread_mutex_init (&cmd->mutex, NULL);
> >>> +  pthread_cond_init (&cmd->cond, NULL);
> >>> +
> >>> +  /* Send the command to the background thread. */
> >>> +  if (write (self_pipe[1], &cmd, sizeof cmd) != sizeof cmd)
> >>> +    abort ();
> >>> +
> >>> +  /* Wait for the command to be completed by the background thread. */
> >>> +  {
> >>> +    ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&cmd->mutex);
> >>> +    while (cmd->status == -1)   /* for -1, see above */
> >>> +      pthread_cond_wait (&cmd->cond, &cmd->mutex);
> >>> +  }
> >>> +
> >>> +  pthread_mutex_destroy (&cmd->mutex);
> >>> +  pthread_cond_destroy (&cmd->cond);
> >>> +
> >>> +  /* Note the main thread must call nbdkit_error on error! */
> >>> +  return cmd->status;
> >>> +}
> >>> +
> >>> +/* The background thread. */
> >>> +static void check_for_finished_handles (void);
> >>> +static void retire_command (struct command *cmd, CURLcode code);
> >>> +static void do_easy_handle (struct command *cmd);
> >>> +
> >>> +static void *
> >>> +pool_worker (void *vp)
> >>> +{
> >>> +  bool stop = false;
> >>> +
> >>> +  if (curl_debug_pool)
> >>> +    nbdkit_debug ("curl: background thread started");
> >>> +
> >>> +  while (!stop) {
> >>> +    struct command *cmd = NULL;
> >>> +    struct curl_waitfd extra_fds[1] =
> >>> +      { { .fd = self_pipe[0], .events = CURL_WAIT_POLLIN } };
> >>> +    CURLMcode mc;
> >>> +    int numfds, running_handles, repeats = 0;
> >>> +
> >>> +    do {
> >>> +      /* Process the multi handle.
> >>> +       */
> >>> +      mc = curl_multi_perform (multi, &running_handles);
> >>> +      if (mc != CURLM_OK) {
> >>> +        nbdkit_error ("curl_multi_perform: %s", curl_multi_strerror (mc));
> >>
> >> Since nbdkit_error() stores its string in thread-local storage, is
> >> there anything that ever extracts this error over to the nbdkit thread
> >> that issued the original request to the worker thread?...
> >>
> >>> +        abort (); /* XXX We don't expect this to happen */
> >>
> >> ...Then again, if we abort, it doesn't matter.
> >
> > I was unclear what to do here.  The final code does:
> >
> >   while (!stop) {
> >     ...
> >     cmd = process_multi_handle ();
> >     if (cmd == NULL)
> >       continue; /* or die?? */
> >
> > with process_multi_handle still calling nbdkit_error.  I felt it might
> > be better to keep trying than just abort, as presumably some (most?)
> > errors are transient.
> >
> > nbdkit_error will ensure the error message is written out.
> >
> >>> +      }
> >>
> >>> +
> >>> +      check_for_finished_handles ();
> >>> +
> >>> +      mc = curl_multi_wait (multi, extra_fds, 1, 1000000, &numfds);
> >>> +      if (mc != CURLM_OK) {
> >>> +        nbdkit_error ("curl_multi_wait: %s", curl_multi_strerror (mc));
> >>> +        abort (); /* XXX We don't expect this to happen */
> >>> +      }
> >>> +
> >>>        if (curl_debug_pool)
> >>> -      nbdkit_debug ("get_handle: %zu", ch->i);
> >>> -      return ch;
> >>> -    }
> >>> -  }
> >>> +        nbdkit_debug ("curl_multi_wait returned: running_handles=%d numfds=%d",
> >>> +                      running_handles, numfds);
> >>> +
> >>> +      if (numfds == 0) {
> >>> +        repeats++;
> >>> +        if (repeats > 1)
> >>> +          nbdkit_nanosleep (1, 0);
> >>> +      }
> >>> +      else {
> >>> +        repeats = 0;
> >>> +        if (extra_fds[0].revents == CURL_WAIT_POLLIN) {
> >>> +          /* There's a command waiting. */
> >>> +          if (read (self_pipe[0], &cmd, sizeof cmd) != sizeof cmd)
> >>> +            abort ();
> >>> +        }
> >>> +      }
> >>> +    } while (!cmd);
> >>>
> >>> -  /* If more connections are allowed, then allocate a new handle.
> >>> -   */
> >>> -  if (curl_handles.len < connections) {
> >>> -    ch = allocate_handle ();
> >>> -    if (ch == NULL)
> >>> -      return NULL;
> >>> -    if (curl_handle_list_append (&curl_handles, ch) == -1) {
> >>> -      free_handle (ch);
> >>> -      return NULL;
> >>> -    }
> >>> -    ch->i = curl_handles.len - 1;
> >>> -    ch->in_use = true;
> >>> -    in_use++;
> >>>      if (curl_debug_pool)
> >>> -      nbdkit_debug ("get_handle: %zu", ch->i);
> >>> -    return ch;
> >>> -  }
> >>> +      nbdkit_debug ("curl: dispatching %s command %" PRIu64,
> >>> +                    command_type_to_string (cmd->type), cmd->id);
> >>> +
> >>> +    switch (cmd->type) {
> >>> +    case STOP:
> >>> +      stop = true;
> >>> +      retire_command (cmd, CURLE_OK);
> >>> +      break;
> >>>
> >>> -  /* Otherwise we have run out of connections so we must wait until
> >>> -   * another thread calls put_handle.
> >>> -   */
> >>> -  assert (in_use == connections);
> >>> -  waiting++;
> >>> -  while (in_use == connections)
> >>> -    pthread_cond_wait (&cond, &lock);
> >>> -  waiting--;
> >>> +    case EASY_HANDLE:
> >>> +      do_easy_handle (cmd);
> >>> +      break;
> >>> +    }
> >>> +  } /* while (!stop) */
> >>>
> >>> -  goto again;
> >>> +  if (curl_debug_pool)
> >>> +    nbdkit_debug ("curl: background thread stopped");
> >>> +
> >>> +  return NULL;
> >>>  }
> >>>
> >>> -/* Return the handle to the pool. */
> >>> -void
> >>> -put_handle (struct curl_handle *ch)
> >>> +/* This checks if any easy handles in the multi have
> >>> + * finished and retires the associated commands.
> >>> + */
> >>> +static void
> >>> +check_for_finished_handles (void)
> >>>  {
> >>> -  ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&lock);
> >>> +  CURLMsg *msg;
> >>> +  int msgs_in_queue;
> >>> +
> >>> +  while ((msg = curl_multi_info_read (multi, &msgs_in_queue)) != NULL) {
> >>> +    size_t i;
> >>> +    struct curl_handle *ch = NULL;
> >>> +
> >>> +    if (msg->msg == CURLMSG_DONE) {
> >>> +      /* Find this curl_handle.
> >>> +       */
> >>> +      for (i = 0; i < curl_handles.len; ++i) {
> >>> +        if (curl_handles.ptr[i]->c == msg->easy_handle) {
> >>> +          ch = curl_handles.ptr[i];
> >>> +          curl_handle_list_remove (&curl_handles, i);
> >>> +        }
> >>> +      }
> >>> +      if (ch == NULL) abort ();
> >>> +      curl_multi_remove_handle (multi, ch->c);
> >>> +
> >>> +      retire_command (ch->cmd, msg->data.result);
> >>> +    }
> >>> +  }
> >>> +}
> >>>
> >>> +/* Retire a command.  status is a CURLcode. */
> >>> +static void
> >>> +retire_command (struct command *cmd, CURLcode status)
> >>> +{
> >>>    if (curl_debug_pool)
> >>> -    nbdkit_debug ("put_handle: %zu", ch->i);
> >>> +    nbdkit_debug ("curl: retiring %s command %" PRIu64,
> >>> +                  command_type_to_string (cmd->type), cmd->id);
> >>> +
> >>> +  ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&cmd->mutex);
> >>> +  cmd->status = status;
> >>> +  pthread_cond_signal (&cmd->cond);
> >>> +}
> >>> +
> >>> +static void
> >>> +do_easy_handle (struct command *cmd)
> >>> +{
> >>> +  CURLMcode mc;
> >>> +
> >>> +  cmd->ch->cmd = cmd;
> >>> +
> >>> +  /* Add the handle to the multi. */
> >>> +  mc = curl_multi_add_handle (multi, cmd->ch->c);
> >>> +  if (mc != CURLM_OK) {
> >>> +    nbdkit_error ("curl_multi_add_handle: %s", curl_multi_strerror (mc));
> >>> +    goto err;
> >>> +  }
> >>>
> >>> -  ch->in_use = false;
> >>> -  in_use--;
> >>> +  if (curl_handle_list_append (&curl_handles, cmd->ch) == -1)
> >>> +    goto err;
> >>> +  return;
> >>>
> >>> -  /* Signal the next thread which is waiting. */
> >>> -  if (waiting > 0)
> >>> -    pthread_cond_signal (&cond);
> >>>  }
> >>> --
> >>> 2.41.0
> >>
> >> Overall looks nice, and I learned more about curl in the process.
> >
> > For me, too much :-(
> >
> > Thanks,
> >
> > Rich.
> >
> -- 
> Richard Jones, Virtualization Group, Red Hat http://people.redhat.com/~rjones
> Read my programming and virtualization blog: http://rwmj.wordpress.com
> Fedora Windows cross-compiler.
> Compile Windows programs, test, and build Windows installers.
> Over 100 libraries supported.  http://fedoraproject.org/wiki/MinGW