Richard W.M. Jones
2023-Jul-28 11:09 UTC
[Libguestfs] [PATCH nbdkit 0/3] curl: Use curl multi interface
Sorry, patch 2 is a mess (and so am I after spending days researching and implementing the change). There's not an easy way to simplify the change that I can see, but I'll look at it again with fresh eyes later. However, I can't argue with the performance improvement, which is spectacular. Rich.
Richard W.M. Jones
2023-Jul-28 11:09 UTC
[Libguestfs] [PATCH nbdkit 1/3] curl: Make times seconds field slightly wider
Updates: commit 68dddbeb584fb9385915846d259563f74338ffe8 --- plugins/curl/nbdkit-curl-plugin.pod | 14 +++++++------- plugins/curl/times.c | 2 +- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/plugins/curl/nbdkit-curl-plugin.pod b/plugins/curl/nbdkit-curl-plugin.pod index 0fd688edd..0774adadc 100644 --- a/plugins/curl/nbdkit-curl-plugin.pod +++ b/plugins/curl/nbdkit-curl-plugin.pod @@ -551,13 +551,13 @@ information is printed in the debug output before nbdkit exits. The output will look like: nbdkit: debug: times (-D curl.times=1): - nbdkit: debug: name resolution : 0.128442 s - nbdkit: debug: connection : 4.945213 s - nbdkit: debug: SSL negotiation : 4.291362 s - nbdkit: debug: pretransfer : 0.104137 s - nbdkit: debug: first byte received : 56.115269 s - nbdkit: debug: data transfer : 222.633831 s - nbdkit: debug: redirection time : 0.000000 s + nbdkit: debug: name resolution : 0.128442 s + nbdkit: debug: connection : 4.945213 s + nbdkit: debug: SSL negotiation : 4.291362 s + nbdkit: debug: pretransfer : 0.104137 s + nbdkit: debug: first byte received : 56.115269 s + nbdkit: debug: data transfer : 222.633831 s + nbdkit: debug: redirection time : 0.000000 s The cumulative time taken to perform each step is shown (summed across all HTTP connections). The redirection time is the total time taken diff --git a/plugins/curl/times.c b/plugins/curl/times.c index 8cc4cf278..e752a0a9e 100644 --- a/plugins/curl/times.c +++ b/plugins/curl/times.c @@ -123,7 +123,7 @@ display_times (void) v = t; prev_t = t; - nbdkit_debug ("%-30s: %3" PRIi64 ".%06" PRIi64 " s", + nbdkit_debug ("%-30s: %4" PRIi64 ".%06" PRIi64 " s", times[i].name, v / 1000000, v % 1000000); } -- 2.41.0
Richard W.M. Jones
2023-Jul-28 11:09 UTC
[Libguestfs] [PATCH nbdkit 2/3] curl: Use curl multi interface
This makes a very large difference to performance over the previous implementation. Note for the tests below I also applied the next commit changing the behaviour of the connections parameter. Using this test case: $ nbdkit -r -U - curl https://cloud-images.ubuntu.com/lunar/current/lunar-server-cloudimg-amd64.img \ ipresolve=v4 --run 'nbdcopy -p $uri null' The times are as follows: multi, connections=0 19s (*) multi, connections=32 26s multi, connections=16 50s before this commit 2m46s (*) Not documented, but setting CURLMOPT_MAX_TOTAL_CONNECTIONS = 0 means allow unlimited connections. In practice the number of connections reaches 64. --- plugins/curl/curldefs.h | 35 +++- plugins/curl/config.c | 66 +----- plugins/curl/curl.c | 146 +++++++++----- plugins/curl/pool.c | 431 ++++++++++++++++++++++++++++++++-------- 4 files changed, 482 insertions(+), 196 deletions(-) diff --git a/plugins/curl/curldefs.h b/plugins/curl/curldefs.h index 9169b256d..db384f055 100644 --- a/plugins/curl/curldefs.h +++ b/plugins/curl/curldefs.h @@ -76,12 +76,6 @@ struct curl_handle { /* The underlying curl handle. */ CURL *c; - /* Index of this handle in the pool (for debugging). */ - size_t i; - - /* True if the handle is in use by a thread. */ - bool in_use; - /* These fields are used/initialized when we create the handle. */ bool accept_range; int64_t exportsize; @@ -100,6 +94,26 @@ struct curl_handle { /* Used by scripts.c */ struct curl_slist *headers_copy; + + /* Used by pool.c */ + struct command *cmd; +}; + +/* Asynchronous commands that can be sent to the pool thread. */ +enum command_type { GET_SIZE, PREAD, PWRITE, STOP }; +struct command { + /* These fields are set by the caller. */ + enum command_type type; /* command */ + void *ptr; /* for get_size, returned size */ + struct curl_handle *ch; /* for read/write, the easy handle */ + + /* This field is set to a unique value by send_command_and_wait. 
*/ + uint64_t id; /* serial number */ + + /* These fields are used to signal back that the command finished. */ + pthread_mutex_t mutex; /* completion mutex */ + pthread_cond_t cond; /* completion condition */ + enum { SUBMITTED, SUCCEEDED, FAILED } status; }; /* config.c */ @@ -109,12 +123,13 @@ extern const char *curl_config_help; extern void unload_config (void); extern struct curl_handle *allocate_handle (void); extern void free_handle (struct curl_handle *); +extern int get_content_length_accept_range (struct curl_handle *ch); /* pool.c */ -extern void load_pool (void); -extern void unload_pool (void); -extern struct curl_handle *get_handle (void); -extern void put_handle (struct curl_handle *ch); +extern int curl_get_ready (void); +extern int curl_after_fork (void); +extern void curl_cleanup (void); +extern int send_command_and_wait (struct command *cmd); /* scripts.c */ extern int do_scripts (struct curl_handle *ch); diff --git a/plugins/curl/config.c b/plugins/curl/config.c index 742d60809..5cda46031 100644 --- a/plugins/curl/config.c +++ b/plugins/curl/config.c @@ -89,9 +89,6 @@ static const char *user_agent = NULL; static int debug_cb (CURL *handle, curl_infotype type, const char *data, size_t size, void *); -static size_t write_cb (char *ptr, size_t size, size_t nmemb, void *opaque); -static size_t read_cb (void *ptr, size_t size, size_t nmemb, void *opaque); -static int get_content_length_accept_range (struct curl_handle *ch); static bool try_fallback_GET_method (struct curl_handle *ch); static size_t header_cb (void *ptr, size_t size, size_t nmemb, void *opaque); static size_t error_cb (char *ptr, size_t size, size_t nmemb, void *opaque); @@ -668,17 +665,9 @@ allocate_handle (void) if (user_agent) curl_easy_setopt (ch->c, CURLOPT_USERAGENT, user_agent); - if (get_content_length_accept_range (ch) == -1) - goto err; - /* Get set up for reading and writing. 
*/ curl_easy_setopt (ch->c, CURLOPT_HEADERFUNCTION, NULL); curl_easy_setopt (ch->c, CURLOPT_HEADERDATA, NULL); - curl_easy_setopt (ch->c, CURLOPT_WRITEFUNCTION, write_cb); - curl_easy_setopt (ch->c, CURLOPT_WRITEDATA, ch); - /* These are only used if !readonly but we always register them. */ - curl_easy_setopt (ch->c, CURLOPT_READFUNCTION, read_cb); - curl_easy_setopt (ch->c, CURLOPT_READDATA, ch); return ch; @@ -743,59 +732,10 @@ debug_cb (CURL *handle, curl_infotype type, return 0; } -/* NB: The terminology used by libcurl is confusing! - * - * WRITEFUNCTION / write_cb is used when reading from the remote server - * READFUNCTION / read_cb is used when writing to the remote server. - * - * We use the same terminology as libcurl here. - */ - -static size_t -write_cb (char *ptr, size_t size, size_t nmemb, void *opaque) -{ - struct curl_handle *ch = opaque; - size_t orig_realsize = size * nmemb; - size_t realsize = orig_realsize; - - assert (ch->write_buf); - - /* Don't read more than the requested amount of data, even if the - * server or libcurl sends more. - */ - if (realsize > ch->write_count) - realsize = ch->write_count; - - memcpy (ch->write_buf, ptr, realsize); - - ch->write_count -= realsize; - ch->write_buf += realsize; - - return orig_realsize; -} - -static size_t -read_cb (void *ptr, size_t size, size_t nmemb, void *opaque) -{ - struct curl_handle *ch = opaque; - size_t realsize = size * nmemb; - - assert (ch->read_buf); - if (realsize > ch->read_count) - realsize = ch->read_count; - - memcpy (ptr, ch->read_buf, realsize); - - ch->read_count -= realsize; - ch->read_buf += realsize; - - return realsize; -} - /* Get the file size and also whether the remote HTTP server * supports byte ranges. 
*/ -static int +int get_content_length_accept_range (struct curl_handle *ch) { CURLcode r; @@ -821,6 +761,10 @@ get_content_length_accept_range (struct curl_handle *ch) curl_easy_setopt (ch->c, CURLOPT_NOBODY, 1L); curl_easy_setopt (ch->c, CURLOPT_HEADERFUNCTION, header_cb); curl_easy_setopt (ch->c, CURLOPT_HEADERDATA, ch); + curl_easy_setopt (ch->c, CURLOPT_WRITEFUNCTION, NULL); + curl_easy_setopt (ch->c, CURLOPT_WRITEDATA, NULL); + curl_easy_setopt (ch->c, CURLOPT_READFUNCTION, NULL); + curl_easy_setopt (ch->c, CURLOPT_READDATA, NULL); r = curl_easy_perform (ch->c); update_times (ch->c); if (r != CURLE_OK) { diff --git a/plugins/curl/curl.c b/plugins/curl/curl.c index 99a7e00b5..c29b5d0cb 100644 --- a/plugins/curl/curl.c +++ b/plugins/curl/curl.c @@ -48,8 +48,6 @@ #include <nbdkit-plugin.h> -#include "cleanup.h" - #include "curldefs.h" const char *cookie_script = NULL; @@ -67,8 +65,6 @@ curl_load (void) nbdkit_error ("libcurl initialization failed: %d", (int) r); exit (EXIT_FAILURE); } - - load_pool (); } static void @@ -76,7 +72,6 @@ curl_unload (void) { unload_config (); scripts_unload (); - unload_pool (); display_times (); curl_global_cleanup (); } @@ -108,30 +103,17 @@ curl_close (void *handle) #define THREAD_MODEL NBDKIT_THREAD_MODEL_PARALLEL -/* Calls get_handle() ... put_handle() to get a handle for the length - * of the current scope. - */ -#define GET_HANDLE_FOR_CURRENT_SCOPE(ch) \ - CLEANUP_PUT_HANDLE struct curl_handle *ch = get_handle (); -#define CLEANUP_PUT_HANDLE __attribute__ ((cleanup (cleanup_put_handle))) -static void -cleanup_put_handle (void *chp) -{ - struct curl_handle *ch = * (struct curl_handle **) chp; - - if (ch != NULL) - put_handle (ch); -} - /* Get the file size. 
*/ static int64_t curl_get_size (void *handle) { - GET_HANDLE_FOR_CURRENT_SCOPE (ch); - if (ch == NULL) + int64_t size; + struct command cmd = { .type = GET_SIZE, .ptr = &size }; + + if (send_command_and_wait (&cmd) == -1) return -1; - return ch->exportsize; + return size; } /* Multi-conn is safe for read-only connections, but HTTP does not @@ -146,23 +128,56 @@ curl_can_multi_conn (void *handle) return !! h->readonly; } +/* NB: The terminology used by libcurl is confusing! + * + * WRITEFUNCTION / write_cb is used when reading from the remote server + * READFUNCTION / read_cb is used when writing to the remote server. + * + * We use the same terminology as libcurl here. + */ + +static size_t +write_cb (char *ptr, size_t size, size_t nmemb, void *opaque) +{ + struct curl_handle *ch = opaque; + size_t orig_realsize = size * nmemb; + size_t realsize = orig_realsize; + + assert (ch->write_buf); + + /* Don't read more than the requested amount of data, even if the + * server or libcurl sends more. + */ + if (realsize > ch->write_count) + realsize = ch->write_count; + + memcpy (ch->write_buf, ptr, realsize); + + ch->write_count -= realsize; + ch->write_buf += realsize; + + return orig_realsize; +} + /* Read data from the remote server. */ static int curl_pread (void *handle, void *buf, uint32_t count, uint64_t offset) { - CURLcode r; + struct curl_handle *ch; char range[128]; - GET_HANDLE_FOR_CURRENT_SCOPE (ch); - if (ch == NULL) - return -1; + /* Get a curl easy handle. */ + ch = allocate_handle (); + if (ch == NULL) goto err; /* Run the scripts if necessary and set headers in the handle. */ - if (do_scripts (ch) == -1) return -1; + if (do_scripts (ch) == -1) goto err; /* Tell the write_cb where we want the data to be written. write_cb * will update this if the data comes in multiple sections. 
*/ + curl_easy_setopt (ch->c, CURLOPT_WRITEFUNCTION, write_cb); + curl_easy_setopt (ch->c, CURLOPT_WRITEDATA, ch); ch->write_buf = buf; ch->write_count = count; @@ -173,12 +188,14 @@ curl_pread (void *handle, void *buf, uint32_t count, uint64_t offset) offset, offset + count); curl_easy_setopt (ch->c, CURLOPT_RANGE, range); - /* The assumption here is that curl will look after timeouts. */ - r = curl_easy_perform (ch->c); - if (r != CURLE_OK) { - display_curl_error (ch, r, "pread: curl_easy_perform"); - return -1; - } + /* Send the command to the worker thread and wait. */ + struct command cmd = { + .type = PREAD, + .ch = ch, + }; + + if (send_command_and_wait (&cmd) == -1) + goto err; update_times (ch->c); /* Could use curl_easy_getinfo here to obtain further information @@ -188,42 +205,70 @@ curl_pread (void *handle, void *buf, uint32_t count, uint64_t offset) /* As far as I understand the cURL API, this should never happen. */ assert (ch->write_count == 0); + free_handle (ch); return 0; + + err: + if (ch) + free_handle (ch); + return -1; +} + +static size_t +read_cb (void *ptr, size_t size, size_t nmemb, void *opaque) +{ + struct curl_handle *ch = opaque; + size_t realsize = size * nmemb; + + assert (ch->read_buf); + if (realsize > ch->read_count) + realsize = ch->read_count; + + memcpy (ptr, ch->read_buf, realsize); + + ch->read_count -= realsize; + ch->read_buf += realsize; + + return realsize; } /* Write data to the remote server. */ static int curl_pwrite (void *handle, const void *buf, uint32_t count, uint64_t offset) { - CURLcode r; + struct curl_handle *ch; char range[128]; - GET_HANDLE_FOR_CURRENT_SCOPE (ch); - if (ch == NULL) - return -1; + /* Get a curl easy handle. */ + ch = allocate_handle (); + if (ch == NULL) goto err; /* Run the scripts if necessary and set headers in the handle. */ - if (do_scripts (ch) == -1) return -1; + if (do_scripts (ch) == -1) goto err; /* Tell the read_cb where we want the data to be read from. 
read_cb * will update this if the data comes in multiple sections. */ + curl_easy_setopt (ch->c, CURLOPT_READFUNCTION, read_cb); + curl_easy_setopt (ch->c, CURLOPT_READDATA, ch); ch->read_buf = buf; ch->read_count = count; curl_easy_setopt (ch->c, CURLOPT_UPLOAD, 1L); /* Make an HTTP range request. */ snprintf (range, sizeof range, "%" PRIu64 "-%" PRIu64, offset, offset + count); curl_easy_setopt (ch->c, CURLOPT_RANGE, range); - /* The assumption here is that curl will look after timeouts. */ - r = curl_easy_perform (ch->c); - if (r != CURLE_OK) { - display_curl_error (ch, r, "pwrite: curl_easy_perform"); - return -1; - } + /* Send the command to the worker thread and wait. */ + struct command cmd = { + .type = PWRITE, + .ch = ch, + }; + + if (send_command_and_wait (&cmd) == -1) + goto err; update_times (ch->c); /* Could use curl_easy_getinfo here to obtain further information @@ -233,7 +278,13 @@ curl_pwrite (void *handle, const void *buf, uint32_t count, uint64_t offset) /* As far as I understand the cURL API, this should never happen. */ assert (ch->read_count == 0); + free_handle (ch); return 0; + + err: + if (ch) + free_handle (ch); + return -1; } static struct nbdkit_plugin plugin = { @@ -249,6 +300,9 @@ static struct nbdkit_plugin plugin = { */ //.config_help = curl_config_help, .magic_config_key = "url", + .get_ready = curl_get_ready, + .after_fork = curl_after_fork, + .cleanup = curl_cleanup, .open = curl_open, .close = curl_close, .get_size = curl_get_size, diff --git a/plugins/curl/pool.c b/plugins/curl/pool.c index 91e56f070..2835f1664 100644 --- a/plugins/curl/pool.c +++ b/plugins/curl/pool.c @@ -30,11 +30,29 @@ * SUCH DAMAGE. */ -/* Curl handle pool. +/* Worker thread which processes the curl multi interface. * - * To get a libcurl handle, call get_handle(). When you hold the - * handle, it is yours exclusively to use. 
After you have finished - * with the handle, put it back into the pool by calling put_handle(). + * The main nbdkit threads (see curl.c) create curl easy handles + * initialized with the work they want to carry out. Note there is + * one easy handle per task (eg. per pread/pwrite request). The easy + * handles are not reused. + * + * The commands + optional easy handle are submitted to the worker + * thread over a self-pipe (it's easy to use a pipe here because the + * way curl multi works is it can listen on an extra fd, but not on + * anything else like a pthread condition). The curl multi performs + * the work of the outstanding easy handles. + * + * When an easy handle finishes work or errors, we retire the command + * by signalling back to the waiting nbdkit thread using a pthread + * condition. + * + * In my experiments, we're almost always I/O bound so I haven't seen + * any strong need to use more than one curl multi / worker thread, + * although it would be possible to add more in future. + * + * See also this extremely useful thread: + * https://curl.se/mail/lib-2019-03/0100.html */ #include <config.h> @@ -45,9 +63,19 @@ #include <stdint.h> #include <inttypes.h> #include <string.h> +#include <unistd.h> #include <assert.h> #include <pthread.h> +#ifdef HAVE_STDATOMIC_H +#include <stdatomic.h> +#else +/* Some old platforms lack atomic types, but 32 bit ints are usually + * "atomic enough". + */ +#define _Atomic /**/ +#endif + #include <curl/curl.h> #include <nbdkit-plugin.h> @@ -62,109 +90,354 @@ NBDKIT_DLL_PUBLIC int curl_debug_pool = 0; unsigned connections = 4; -/* This lock protects access to the curl_handles vector below. */ -static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER; +/* Pipe used to notify background thread that a command is pending in + * the queue. A pointer to the 'struct command' is sent over the + * pipe. + */ +static int self_pipe[2] = { -1, -1 }; -/* List of curl handles. 
This is allocated dynamically as more - * handles are requested. Currently it does not shrink. It may grow - * up to 'connections' in length. +/* The curl multi handle. */ +static CURLM *multi; + +/* List of running easy handles. We only need to maintain this so we + * can remove them from the multi handle when cleaning up. */ DEFINE_VECTOR_TYPE (curl_handle_list, struct curl_handle *); static curl_handle_list curl_handles = empty_vector; -/* The condition is used when the curl handles vector is full and - * we're waiting for a thread to put_handle. - */ -static pthread_cond_t cond = PTHREAD_COND_INITIALIZER; -static size_t in_use = 0, waiting = 0; +static const char * +command_type_to_string (enum command_type type) +{ + switch (type) { + case GET_SIZE: return "GET_SIZE"; + case PREAD: return "PREAD"; + case PWRITE: return "PWRITE"; + case STOP: return "STOP"; + default: abort (); + } +} -/* Initialize pool structures. */ -void -load_pool (void) +int +curl_get_ready (void) +{ + multi = curl_multi_init (); + if (multi == NULL) { + nbdkit_error ("curl_multi_init failed: %m"); + return -1; + } + + return 0; +} + +/* Start and stop the background thread. */ +static pthread_t thread; +static bool thread_running; +static void *pool_worker (void *); + +int +curl_after_fork (void) { + int err; + + if (pipe (self_pipe) == -1) { + nbdkit_error ("pipe: %m"); + return -1; + } + + /* Start the pool background thread where all the curl work is done. */ + err = pthread_create (&thread, NULL, pool_worker, NULL); + if (err != 0) { + errno = err; + nbdkit_error ("pthread_create: %m"); + return -1; + } + thread_running = true; + + return 0; } -/* Close and free all handles in the pool. */ +/* Unload the background thread. */ void -unload_pool (void) +curl_cleanup (void) { - size_t i; + if (thread_running) { + /* Stop the background thread. 
*/ + struct command cmd = { .type = STOP }; + send_command_and_wait (&cmd); + pthread_join (thread, NULL); + thread_running = false; + } - if (curl_debug_pool) - nbdkit_debug ("unload_pool: number of curl handles allocated: %zu", - curl_handles.len); + if (self_pipe[0] >= 0) { + close (self_pipe[0]); + self_pipe[0] = -1; + } + if (self_pipe[1] >= 0) { + close (self_pipe[1]); + self_pipe[1] = -1; + } - for (i = 0; i < curl_handles.len; ++i) - free_handle (curl_handles.ptr[i]); - curl_handle_list_reset (&curl_handles); + if (multi) { + size_t i; + + /* Remove and free any easy handles in the multi. */ + for (i = 0; i < curl_handles.len; ++i) { + curl_multi_remove_handle (multi, curl_handles.ptr[i]->c); + free_handle (curl_handles.ptr[i]); + } + + curl_multi_cleanup (multi); + multi = NULL; + } } -/* Get a handle from the pool. +/* Command queue. */ +static _Atomic uint64_t id; /* next command ID */ + +/* Send command to the background thread and wait for completion. + * This is only called by one of the nbdkit threads. * - * It is owned exclusively by the caller until they call put_handle. + * Returns 0 for OK + * On error, calls nbdkit_error and returns -1. */ -struct curl_handle * -get_handle (void) +int +send_command_and_wait (struct command *cmd) { - ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&lock); - size_t i; - struct curl_handle *ch; - - again: - /* Look for a handle which is not in_use. */ - for (i = 0; i < curl_handles.len; ++i) { - ch = curl_handles.ptr[i]; - if (!ch->in_use) { - ch->in_use = true; - in_use++; + cmd->id = id++; + + /* This will be used to signal command completion back to us. */ + pthread_mutex_init (&cmd->mutex, NULL); + pthread_cond_init (&cmd->cond, NULL); + + /* Send the command to the background thread. */ + if (write (self_pipe[1], &cmd, sizeof cmd) != sizeof cmd) + abort (); + + /* Wait for the command to be completed by the background thread. 
*/ + { + ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&cmd->mutex); + while (cmd->status == SUBMITTED) + pthread_cond_wait (&cmd->cond, &cmd->mutex); + } + + pthread_mutex_destroy (&cmd->mutex); + pthread_cond_destroy (&cmd->cond); + + /* On error the background thread will call nbdkit_error. */ + switch (cmd->status) { + case SUCCEEDED: return 0; + case FAILED: return -1; + default: abort (); + } +} + +/* The background thread. */ +static void check_for_finished_handles (void); +static void retire_command (struct command *cmd, int status); +static void do_get_size (struct command *cmd); +static void do_pread (struct command *cmd); +static void do_pwrite (struct command *cmd); + +static void * +pool_worker (void *vp) +{ + bool stop = false; + + if (curl_debug_pool) + nbdkit_debug ("curl: background thread started"); + + while (!stop) { + struct command *cmd = NULL; + struct curl_waitfd extra_fds[1] = + { { .fd = self_pipe[0], .events = CURL_WAIT_POLLIN } }; + CURLMcode mc; + int numfds, running_handles, repeats = 0; + + do { + /* Process the multi handle. */ + mc = curl_multi_perform (multi, &running_handles); + if (mc != CURLM_OK) { + nbdkit_error ("curl_multi_perform: %s", curl_multi_strerror (mc)); + abort (); /* XXX We don't expect this to happen */ + } + + check_for_finished_handles (); + + mc = curl_multi_wait (multi, extra_fds, 1, 1000000, &numfds); + if (mc != CURLM_OK) { + nbdkit_error ("curl_multi_wait: %s", curl_multi_strerror (mc)); + abort (); /* XXX We don't expect this to happen */ + } + if (curl_debug_pool) - nbdkit_debug ("get_handle: %zu", ch->i); - return ch; - } - } + nbdkit_debug ("curl_multi_wait returned: running_handles=%d numfds=%d", + running_handles, numfds); + + if (numfds == 0) { + repeats++; + if (repeats > 1) + nbdkit_nanosleep (1, 0); + } + else { + repeats = 0; + if (extra_fds[0].revents == CURL_WAIT_POLLIN) { + /* There's a command waiting. 
*/ + if (read (self_pipe[0], &cmd, sizeof cmd) != sizeof cmd) + abort (); + } + } + } while (!cmd); - /* If more connections are allowed, then allocate a new handle. */ - if (curl_handles.len < connections) { - ch = allocate_handle (); - if (ch == NULL) - return NULL; - if (curl_handle_list_append (&curl_handles, ch) == -1) { - free_handle (ch); - return NULL; - } - ch->i = curl_handles.len - 1; - ch->in_use = true; - in_use++; if (curl_debug_pool) - nbdkit_debug ("get_handle: %zu", ch->i); - return ch; - } + nbdkit_debug ("curl: dispatching %s command %" PRIu64, + command_type_to_string (cmd->type), cmd->id); + + switch (cmd->type) { + case STOP: + stop = true; + retire_command (cmd, SUCCEEDED); + break; + + case GET_SIZE: + do_get_size (cmd); + break; + + case PREAD: + do_pread (cmd); + break; + + case PWRITE: + do_pwrite (cmd); + break; + } + } /* while (!stop) */ - /* Otherwise we have run out of connections so we must wait until - * another thread calls put_handle. - */ - assert (in_use == connections); - waiting++; - while (in_use == connections) - pthread_cond_wait (&cond, &lock); - waiting--; + if (curl_debug_pool) + nbdkit_debug ("curl: background thread stopped"); - goto again; + return NULL; } -/* Return the handle to the pool. */ -void -put_handle (struct curl_handle *ch) +/* This checks if any easy handles in the multi have + * finished and retires the associated commands. + */ +static void +check_for_finished_handles (void) { - ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&lock); + CURLMsg *msg; + int msgs_in_queue; + + while ((msg = curl_multi_info_read (multi, &msgs_in_queue)) != NULL) { + size_t i; + struct curl_handle *ch = NULL; + + if (msg->msg == CURLMSG_DONE) { + /* Find this curl_handle. 
*/ + for (i = 0; i < curl_handles.len; ++i) { + if (curl_handles.ptr[i]->c == msg->easy_handle) { + ch = curl_handles.ptr[i]; + curl_handle_list_remove (&curl_handles, i); + } + } + if (ch == NULL) abort (); + + if (msg->data.result != CURLE_OK) + display_curl_error (ch, msg->data.result, "curl"); + + curl_multi_remove_handle (multi, ch->c); + retire_command (ch->cmd, + msg->data.result == CURLE_OK ? SUCCEEDED : FAILED); + } + } +} + +/* Retire a command. status is SUCCEEDED | FAILED */ +static void +retire_command (struct command *cmd, int status) +{ if (curl_debug_pool) - nbdkit_debug ("put_handle: %zu", ch->i); + nbdkit_debug ("curl: retiring %s command %" PRIu64, + command_type_to_string (cmd->type), cmd->id); + + ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&cmd->mutex); + cmd->status = status; + pthread_cond_signal (&cmd->cond); +} + +/* We handle get_size synchronously, the first time we are called. + * We assume the size never changes (you must restart nbdkit). + */ +static struct curl_handle *get_size_ch; + +static void +do_get_size (struct command *cmd) +{ + int64_t size; + + if (get_size_ch == NULL) { + get_size_ch = allocate_handle (); + if (get_size_ch == NULL) + goto err; + + if (get_content_length_accept_range (get_size_ch) == -1) { + free_handle (get_size_ch); + get_size_ch = NULL; + goto err; + } + } + + size = get_size_ch->exportsize; + if (size == -1) + goto err; + * (int64_t *) cmd->ptr = size; + retire_command (cmd, SUCCEEDED); + return; + +err: + retire_command (cmd, FAILED); +} + +static void +do_pread (struct command *cmd) +{ + CURLMcode mc; + + cmd->ch->cmd = cmd; + + /* Add the handle to the multi. 
*/ + mc = curl_multi_add_handle (multi, cmd->ch->c); + if (mc != CURLM_OK) { + nbdkit_error ("curl_multi_add_handle: %s", curl_multi_strerror (mc)); + goto err; + } + + if (curl_handle_list_append (&curl_handles, cmd->ch) == -1) + goto err; + return; + + err: + retire_command (cmd, FAILED); +} + +static void +do_pwrite (struct command *cmd) +{ + CURLMcode mc; + + cmd->ch->cmd = cmd; + + /* Add the handle to the multi. */ + mc = curl_multi_add_handle (multi, cmd->ch->c); + if (mc != CURLM_OK) { + nbdkit_error ("curl_multi_add_handle: %s", curl_multi_strerror (mc)); + goto err; + } - ch->in_use = false; - in_use--; + if (curl_handle_list_append (&curl_handles, cmd->ch) == -1) + goto err; + return; - /* Signal the next thread which is waiting. */ - if (waiting > 0) - pthread_cond_signal (&cond); + err: + retire_command (cmd, FAILED); } -- 2.41.0
Richard W.M. Jones
2023-Jul-28 11:09 UTC
[Libguestfs] [PATCH nbdkit 3/3] curl: Redefine connections=<N> parameter as number of HTTP connections
Previously (nbdkit 1.34) this was the number of easy handles. However it turns out that easy handles can open multiple HTTP connections, and in fact there's no good way to tell how many (and they are not shared). Now that we are using a curl multi, curl >= 7.30 provides a way to limit the total number of actual HTTP connections, so we should just use it. This is closer to what I intended this parameter to mean. Link: https://curl.se/mail/lib-2019-03/0102.html Link: https://curl.se/mail/lib-2019-12/0044.html --- plugins/curl/nbdkit-curl-plugin.pod | 13 +++++++++---- plugins/curl/curldefs.h | 3 +++ plugins/curl/config.c | 2 +- plugins/curl/pool.c | 6 +++++- 4 files changed, 18 insertions(+), 6 deletions(-) diff --git a/plugins/curl/nbdkit-curl-plugin.pod b/plugins/curl/nbdkit-curl-plugin.pod index 0774adadc..8223d6da6 100644 --- a/plugins/curl/nbdkit-curl-plugin.pod +++ b/plugins/curl/nbdkit-curl-plugin.pod @@ -58,10 +58,10 @@ L<CURLOPT_CAPATH(3)> for more information. (nbdkit E<ge> 1.34) -Open up to C<N> curl connections to the web server. The default is 4. -Curl connections are shared between all NBD clients, so you may wish -to increase this if you expect many simultaneous NBD clients (or a -single client using many multi-conn connections). +Open up to C<N> connections to the web server. The default is 16. +Connections are shared between all NBD clients, so you may wish to +increase this if you expect many simultaneous NBD clients (or a single +client using many multi-conn connections). See L</NBD CONNECTIONS AND CURL HANDLES> below. @@ -407,6 +407,11 @@ of curl handles in the pool with the C<connections> parameter (default 4). Note that if there are more than 4 NBD connections, they will share the 4 web server connections, unless you adjust C<connections>. +nbdkit E<ge> 1.36 changed this again to use a curl multi handle, which +is more efficient especially with HTTP/2 and HTTP/3. 
Now the +C<connections> parameter controls the maximum number of HTTP +connections made to the remote server, and the default is 16. + =head1 HEADER AND COOKIE SCRIPTS While the C<header> and C<cookie> parameters can be used to specify diff --git a/plugins/curl/curldefs.h b/plugins/curl/curldefs.h index db384f055..6a97f374e 100644 --- a/plugins/curl/curldefs.h +++ b/plugins/curl/curldefs.h @@ -41,6 +41,9 @@ * macro isn't present then Curl is very old. */ #ifdef CURL_AT_LEAST_VERSION +#if CURL_AT_LEAST_VERSION (7, 30, 0) +#define HAVE_CURLMOPT_MAX_TOTAL_CONNECTIONS +#endif #if CURL_AT_LEAST_VERSION (7, 55, 0) #define HAVE_CURLINFO_CONTENT_LENGTH_DOWNLOAD_T #endif diff --git a/plugins/curl/config.c b/plugins/curl/config.c index 5cda46031..b5bc0532a 100644 --- a/plugins/curl/config.c +++ b/plugins/curl/config.c @@ -496,7 +496,7 @@ curl_config_complete (void) const char *curl_config_help = "cainfo=<CAINFO> Path to Certificate Authority file.\n" "capath=<CAPATH> Path to directory with CA certificates.\n" - "connections=<N> Number of libcurl connections to use.\n" + "connections=<N> Number of HTTP connections to use.\n" "cookie=<COOKIE> Set HTTP/HTTPS cookies.\n" "cookiefile= Enable cookie processing.\n" "cookiefile=<FILENAME> Read cookies from file.\n" diff --git a/plugins/curl/pool.c b/plugins/curl/pool.c index 2835f1664..a0aaf749b 100644 --- a/plugins/curl/pool.c +++ b/plugins/curl/pool.c @@ -88,7 +88,7 @@ /* Use '-D curl.pool=1' to debug handle pool. */ NBDKIT_DLL_PUBLIC int curl_debug_pool = 0; -unsigned connections = 4; +unsigned connections = 16; /* Pipe used to notify background thread that a command is pending in * the queue. A pointer to the 'struct command' is sent over the @@ -126,6 +126,10 @@ curl_get_ready (void) return -1; } +#ifdef HAVE_CURLMOPT_MAX_TOTAL_CONNECTIONS + curl_multi_setopt(multi, CURLMOPT_MAX_TOTAL_CONNECTIONS, (long) connections); +#endif + return 0; } -- 2.41.0