Richard W.M. Jones
2023-Jul-28 17:17 UTC
[Libguestfs] [PATCH nbdkit v2 8/9] curl: Use curl multi interface
See the comment at the top of plugins/curl/pool.c for general
information about how this works.
This makes a very large difference to performance compared with the
previous implementation. Note that for the tests below I also applied
the next commit, which changes the behaviour of the connections parameter.
Using this test case:
$ uri=https://cloud-images.ubuntu.com/lunar/current/lunar-server-cloudimg-amd64.img
$ nbdkit -r -U - curl $uri ipresolve=v4 --run 'nbdcopy -p $uri null'
The times are as follows:
multi, connections=64 21.5s
multi, connections=32 30.2s
multi, connections=16 56.0s
before this commit 166s
---
plugins/curl/curldefs.h | 35 ++--
plugins/curl/config.c | 246 ---------------------------
plugins/curl/curl.c | 366 +++++++++++++++++++++++++++++++++++-----
plugins/curl/pool.c | 346 ++++++++++++++++++++++++++++---------
4 files changed, 616 insertions(+), 377 deletions(-)
diff --git a/plugins/curl/curldefs.h b/plugins/curl/curldefs.h
index 3c54b7c3c..02b28a133 100644
--- a/plugins/curl/curldefs.h
+++ b/plugins/curl/curldefs.h
@@ -76,16 +76,6 @@ struct curl_handle {
/* The underlying curl handle. */
CURL *c;
- /* Index of this handle in the pool (for debugging). */
- size_t i;
-
- /* True if the handle is in use by a thread. */
- bool in_use;
-
- /* These fields are used/initialized when we create the handle. */
- bool accept_range;
- int64_t exportsize;
-
char errbuf[CURL_ERROR_SIZE];
/* Before doing a read or write operation, set these to point to the
@@ -98,8 +88,30 @@ struct curl_handle {
const char *read_buf;
uint32_t read_count;
+ /* This field is used by curl_get_size. */
+ bool accept_range;
+
/* Used by scripts.c */
struct curl_slist *headers_copy;
+
+ /* Used by pool.c */
+ struct command *cmd;
+};
+
+/* Asynchronous commands that can be sent to the pool thread. */
+enum command_type { EASY_HANDLE, STOP };
+struct command {
+ /* These fields are set by the caller. */
+ enum command_type type; /* command */
+ struct curl_handle *ch; /* for EASY_HANDLE, the easy handle */
+
+ /* This field is set to a unique value by send_command_and_wait. */
+ uint64_t id; /* serial number */
+
+ /* These fields are used to signal back that the command finished. */
+ pthread_mutex_t mutex; /* completion mutex */
+ pthread_cond_t cond; /* completion condition */
+ CURLcode status; /* status code (CURLE_OK = succeeded) */
};
/* config.c */
@@ -114,8 +126,7 @@ extern void free_handle (struct curl_handle *);
extern int pool_get_ready (void);
extern int pool_after_fork (void);
extern void pool_unload (void);
-extern struct curl_handle *get_handle (void);
-extern void put_handle (struct curl_handle *ch);
+extern CURLcode send_command_and_wait (struct command *cmd);
/* scripts.c */
extern int do_scripts (struct curl_handle *ch);
diff --git a/plugins/curl/config.c b/plugins/curl/config.c
index b6e02b85a..46bec2bbb 100644
--- a/plugins/curl/config.c
+++ b/plugins/curl/config.c
@@ -48,8 +48,6 @@
#include <nbdkit-plugin.h>
-#include "ascii-ctype.h"
-#include "ascii-string.h"
#include "cleanup.h"
#include "curldefs.h"
@@ -89,12 +87,6 @@ static const char *user_agent = NULL;
static int debug_cb (CURL *handle, curl_infotype type,
const char *data, size_t size, void *);
-static size_t write_cb (char *ptr, size_t size, size_t nmemb, void *opaque);
-static size_t read_cb (void *ptr, size_t size, size_t nmemb, void *opaque);
-static int get_content_length_accept_range (struct curl_handle *ch);
-static bool try_fallback_GET_method (struct curl_handle *ch);
-static size_t header_cb (void *ptr, size_t size, size_t nmemb, void *opaque);
-static size_t error_cb (char *ptr, size_t size, size_t nmemb, void *opaque);
/* Use '-D curl.verbose=1' to set. */
NBDKIT_DLL_PUBLIC int curl_debug_verbose = 0;
@@ -668,17 +660,9 @@ allocate_handle (void)
if (user_agent)
curl_easy_setopt (ch->c, CURLOPT_USERAGENT, user_agent);
- if (get_content_length_accept_range (ch) == -1)
- goto err;
-
/* Get set up for reading and writing. */
curl_easy_setopt (ch->c, CURLOPT_HEADERFUNCTION, NULL);
curl_easy_setopt (ch->c, CURLOPT_HEADERDATA, NULL);
- curl_easy_setopt (ch->c, CURLOPT_WRITEFUNCTION, write_cb);
- curl_easy_setopt (ch->c, CURLOPT_WRITEDATA, ch);
- /* These are only used if !readonly but we always register them. */
- curl_easy_setopt (ch->c, CURLOPT_READFUNCTION, read_cb);
- curl_easy_setopt (ch->c, CURLOPT_READDATA, ch);
return ch;
@@ -742,233 +726,3 @@ debug_cb (CURL *handle, curl_infotype type,
out:
return 0;
}
-
-/* NB: The terminology used by libcurl is confusing!
- *
- * WRITEFUNCTION / write_cb is used when reading from the remote server
- * READFUNCTION / read_cb is used when writing to the remote server.
- *
- * We use the same terminology as libcurl here.
- */
-
-static size_t
-write_cb (char *ptr, size_t size, size_t nmemb, void *opaque)
-{
- struct curl_handle *ch = opaque;
- size_t orig_realsize = size * nmemb;
- size_t realsize = orig_realsize;
-
- assert (ch->write_buf);
-
- /* Don't read more than the requested amount of data, even if the
- * server or libcurl sends more.
- */
- if (realsize > ch->write_count)
- realsize = ch->write_count;
-
- memcpy (ch->write_buf, ptr, realsize);
-
- ch->write_count -= realsize;
- ch->write_buf += realsize;
-
- return orig_realsize;
-}
-
-static size_t
-read_cb (void *ptr, size_t size, size_t nmemb, void *opaque)
-{
- struct curl_handle *ch = opaque;
- size_t realsize = size * nmemb;
-
- assert (ch->read_buf);
- if (realsize > ch->read_count)
- realsize = ch->read_count;
-
- memcpy (ptr, ch->read_buf, realsize);
-
- ch->read_count -= realsize;
- ch->read_buf += realsize;
-
- return realsize;
-}
-
-/* Get the file size and also whether the remote HTTP server
- * supports byte ranges.
- */
-static int
-get_content_length_accept_range (struct curl_handle *ch)
-{
- CURLcode r;
- long code;
-#ifdef HAVE_CURLINFO_CONTENT_LENGTH_DOWNLOAD_T
- curl_off_t o;
-#else
- double d;
-#endif
-
- /* We must run the scripts if necessary and set headers in the
- * handle.
- */
- if (do_scripts (ch) == -1)
- return -1;
-
- /* Set this flag in the handle to false. The callback should set it
- * to true if byte ranges are supported, which we check below.
- */
- ch->accept_range = false;
-
- /* No Body, not nobody! This forces a HEAD request. */
- curl_easy_setopt (ch->c, CURLOPT_NOBODY, 1L);
- curl_easy_setopt (ch->c, CURLOPT_HEADERFUNCTION, header_cb);
- curl_easy_setopt (ch->c, CURLOPT_HEADERDATA, ch);
- r = curl_easy_perform (ch->c);
- update_times (ch->c);
- if (r != CURLE_OK) {
- display_curl_error (ch, r,
- "problem doing HEAD request to fetch size of URL [%s]",
- url);
-
- /* Get the HTTP status code, if available. */
- r = curl_easy_getinfo (ch->c, CURLINFO_RESPONSE_CODE, &code);
- if (r == CURLE_OK)
- nbdkit_debug ("HTTP status code: %ld", code);
- else
- code = -1;
-
- /* See comment on try_fallback_GET_method below. */
- if (code != 403 || !try_fallback_GET_method (ch))
- return -1;
- }
-
- /* Get the content length.
- *
- * Note there is some subtlety here: For web servers using chunked
- * encoding, either the Content-Length header will not be present,
- * or if present it should be ignored. (For such servers the only
- * way to find out the true length would be to read all of the
- * content, which we don't want to do).
- *
- * Curl itself resolves this for us. It will ignore the
- * Content-Length header if chunked encoding is used, returning the
- * length as -1 which we check below (see also
- * curl:lib/http.c:Curl_http_size).
- */
-#ifdef HAVE_CURLINFO_CONTENT_LENGTH_DOWNLOAD_T
- r = curl_easy_getinfo (ch->c, CURLINFO_CONTENT_LENGTH_DOWNLOAD_T, &o);
- if (r != CURLE_OK) {
- display_curl_error (ch, r,
- "could not get length of remote file [%s]", url);
- return -1;
- }
-
- if (o == -1) {
- nbdkit_error ("could not get length of remote file [%s], "
- "is the URL correct?", url);
- return -1;
- }
-
- ch->exportsize = o;
-#else
- r = curl_easy_getinfo (ch->c, CURLINFO_CONTENT_LENGTH_DOWNLOAD, &d);
- if (r != CURLE_OK) {
- display_curl_error (ch, r,
- "could not get length of remote file [%s]", url);
- return -1;
- }
-
- if (d == -1) {
- nbdkit_error ("could not get length of remote file [%s], "
- "is the URL correct?", url);
- return -1;
- }
-
- ch->exportsize = d;
-#endif
- nbdkit_debug ("content length: %" PRIi64, ch->exportsize);
-
- /* If this is HTTP, check that byte ranges are supported. */
- if (ascii_strncasecmp (url, "http://", strlen ("http://")) == 0 ||
- ascii_strncasecmp (url, "https://", strlen ("https://")) == 0) {
- if (!ch->accept_range) {
- nbdkit_error ("server does not support 'range' (byte range) requests");
- return -1;
- }
-
- nbdkit_debug ("accept range supported (for HTTP/HTTPS)");
- }
-
- return 0;
-}
-
-/* S3 servers can return 403 Forbidden for HEAD but still respond
- * to GET, so we give it a second chance in that case.
- * https://github.com/kubevirt/containerized-data-importer/issues/2737
- *
- * This function issues a GET request with a writefunction that always
- * returns an error, thus effectively getting the headers but
- * abandoning the transfer as soon as possible after.
- */
-static bool
-try_fallback_GET_method (struct curl_handle *ch)
-{
- CURLcode r;
-
- nbdkit_debug ("attempting to fetch headers using GET method");
-
- curl_easy_setopt (ch->c, CURLOPT_HTTPGET, 1L);
- curl_easy_setopt (ch->c, CURLOPT_HEADERFUNCTION, header_cb);
- curl_easy_setopt (ch->c, CURLOPT_HEADERDATA, ch);
- curl_easy_setopt (ch->c, CURLOPT_WRITEFUNCTION, error_cb);
- curl_easy_setopt (ch->c, CURLOPT_WRITEDATA, ch);
- r = curl_easy_perform (ch->c);
- update_times (ch->c);
-
- /* We expect CURLE_WRITE_ERROR here, but CURLE_OK is possible too
- * (eg if the remote has zero length). Other errors might happen
- * but we ignore them since it is a fallback path.
- */
- return r == CURLE_OK || r == CURLE_WRITE_ERROR;
-}
-
-static size_t
-header_cb (void *ptr, size_t size, size_t nmemb, void *opaque)
-{
- struct curl_handle *ch = opaque;
- size_t realsize = size * nmemb;
- const char *header = ptr;
- const char *end = header + realsize;
- const char *accept_ranges = "accept-ranges:";
- const char *bytes = "bytes";
-
- if (realsize >= strlen (accept_ranges) &&
- ascii_strncasecmp (header, accept_ranges, strlen (accept_ranges)) == 0) {
- const char *p = strchr (header, ':') + 1;
-
- /* Skip whitespace between the header name and value. */
- while (p < end && *p && ascii_isspace (*p))
- p++;
-
- if (end - p >= strlen (bytes)
- && strncmp (p, bytes, strlen (bytes)) == 0) {
- /* Check that there is nothing but whitespace after the value. */
- p += strlen (bytes);
- while (p < end && *p && ascii_isspace (*p))
- p++;
-
- if (p == end || !*p)
- ch->accept_range = true;
- }
- }
-
- return realsize;
-}
-
-static size_t
-error_cb (char *ptr, size_t size, size_t nmemb, void *opaque)
-{
-#ifdef CURL_WRITEFUNC_ERROR
- return CURL_WRITEFUNC_ERROR;
-#else
- return 0; /* in older curl, any size < requested will also be an error */
-#endif
-}
diff --git a/plugins/curl/curl.c b/plugins/curl/curl.c
index be42de36f..28cc7bbe6 100644
--- a/plugins/curl/curl.c
+++ b/plugins/curl/curl.c
@@ -48,7 +48,8 @@
#include <nbdkit-plugin.h>
-#include "cleanup.h"
+#include "ascii-ctype.h"
+#include "ascii-string.h"
#include "curldefs.h"
@@ -118,32 +119,6 @@ curl_close (void *handle)
#define THREAD_MODEL NBDKIT_THREAD_MODEL_PARALLEL
-/* Calls get_handle() ... put_handle() to get a handle for the length
- * of the current scope.
- */
-#define GET_HANDLE_FOR_CURRENT_SCOPE(ch) \
- CLEANUP_PUT_HANDLE struct curl_handle *ch = get_handle ();
-#define CLEANUP_PUT_HANDLE __attribute__ ((cleanup (cleanup_put_handle)))
-static void
-cleanup_put_handle (void *chp)
-{
- struct curl_handle *ch = * (struct curl_handle **) chp;
-
- if (ch != NULL)
- put_handle (ch);
-}
-
-/* Get the file size. */
-static int64_t
-curl_get_size (void *handle)
-{
- GET_HANDLE_FOR_CURRENT_SCOPE (ch);
- if (ch == NULL)
- return -1;
-
- return ch->exportsize;
-}
-
/* Multi-conn is safe for read-only connections, but HTTP does not
* have any concept of flushing so we cannot use it for read-write
* connections.
@@ -156,23 +131,253 @@ curl_can_multi_conn (void *handle)
return !! h->readonly;
}
+/* Get the file size. */
+static int get_content_length_accept_range (struct curl_handle *ch);
+static bool try_fallback_GET_method (struct curl_handle *ch);
+static size_t header_cb (void *ptr, size_t size, size_t nmemb, void *opaque);
+static size_t error_cb (char *ptr, size_t size, size_t nmemb, void *opaque);
+
+static int64_t
+curl_get_size (void *handle)
+{
+ struct curl_handle *ch;
+ CURLcode r;
+ long code;
+#ifdef HAVE_CURLINFO_CONTENT_LENGTH_DOWNLOAD_T
+ curl_off_t o;
+#else
+ double d;
+#endif
+ int64_t exportsize;
+
+ /* Get a curl easy handle. */
+ ch = allocate_handle ();
+ if (ch == NULL) goto err;
+
+ /* Prepare to read the headers. */
+ if (get_content_length_accept_range (ch) == -1)
+ goto err;
+
+ /* Send the command to the worker thread and wait. */
+ struct command cmd = {
+ .type = EASY_HANDLE,
+ .ch = ch,
+ };
+
+ r = send_command_and_wait (&cmd);
+ update_times (ch->c);
+ if (r != CURLE_OK) {
+ display_curl_error (ch, r,
+ "problem doing HEAD request to fetch size of URL [%s]",
+ url);
+
+ /* Get the HTTP status code, if available. */
+ r = curl_easy_getinfo (ch->c, CURLINFO_RESPONSE_CODE, &code);
+ if (r == CURLE_OK)
+ nbdkit_debug ("HTTP status code: %ld", code);
+ else
+ code = -1;
+
+ /* See comment on try_fallback_GET_method below. */
+ if (code != 403 || !try_fallback_GET_method (ch))
+ goto err;
+ }
+
+ /* Get the content length.
+ *
+ * Note there is some subtlety here: For web servers using chunked
+ * encoding, either the Content-Length header will not be present,
+ * or if present it should be ignored. (For such servers the only
+ * way to find out the true length would be to read all of the
+ * content, which we don't want to do).
+ *
+ * Curl itself resolves this for us. It will ignore the
+ * Content-Length header if chunked encoding is used, returning the
+ * length as -1 which we check below (see also
+ * curl:lib/http.c:Curl_http_size).
+ */
+#ifdef HAVE_CURLINFO_CONTENT_LENGTH_DOWNLOAD_T
+ r = curl_easy_getinfo (ch->c, CURLINFO_CONTENT_LENGTH_DOWNLOAD_T, &o);
+ if (r != CURLE_OK) {
+ display_curl_error (ch, r,
+ "could not get length of remote file [%s]", url);
+ goto err;
+ }
+
+ if (o == -1) {
+ nbdkit_error ("could not get length of remote file [%s], "
+ "is the URL correct?", url);
+ goto err;
+ }
+
+ exportsize = o;
+#else
+ r = curl_easy_getinfo (ch->c, CURLINFO_CONTENT_LENGTH_DOWNLOAD, &d);
+ if (r != CURLE_OK) {
+ display_curl_error (ch, r,
+ "could not get length of remote file [%s]", url);
+ goto err;
+ }
+
+ if (d == -1) {
+ nbdkit_error ("could not get length of remote file [%s], "
+ "is the URL correct?", url);
+ goto err;
+ }
+
+ exportsize = d;
+#endif
+ nbdkit_debug ("content length: %" PRIi64, exportsize);
+
+ /* If this is HTTP, check that byte ranges are supported. */
+ if (ascii_strncasecmp (url, "http://", strlen ("http://")) == 0 ||
+ ascii_strncasecmp (url, "https://", strlen ("https://")) == 0) {
+ if (!ch->accept_range) {
+ nbdkit_error ("server does not support 'range' (byte range) requests");
+ goto err;
+ }
+
+ nbdkit_debug ("accept range supported (for HTTP/HTTPS)");
+ }
+
+ free_handle (ch);
+ return exportsize;
+
+ err:
+ if (ch)
+ free_handle (ch);
+ return -1;
+}
+
+/* Get the file size and also whether the remote HTTP server
+ * supports byte ranges.
+ */
+static int
+get_content_length_accept_range (struct curl_handle *ch)
+{
+ /* We must run the scripts if necessary and set headers in the
+ * handle.
+ */
+ if (do_scripts (ch) == -1)
+ return -1;
+
+ /* Set this flag in the handle to false. The callback should set it
+ * to true if byte ranges are supported, which we check below.
+ */
+ ch->accept_range = false;
+
+ /* No Body, not nobody! This forces a HEAD request. */
+ curl_easy_setopt (ch->c, CURLOPT_NOBODY, 1L);
+ curl_easy_setopt (ch->c, CURLOPT_HEADERFUNCTION, header_cb);
+ curl_easy_setopt (ch->c, CURLOPT_HEADERDATA, ch);
+ curl_easy_setopt (ch->c, CURLOPT_WRITEFUNCTION, NULL);
+ curl_easy_setopt (ch->c, CURLOPT_WRITEDATA, NULL);
+ curl_easy_setopt (ch->c, CURLOPT_READFUNCTION, NULL);
+ curl_easy_setopt (ch->c, CURLOPT_READDATA, NULL);
+ return 0;
+}
+
+/* S3 servers can return 403 Forbidden for HEAD but still respond
+ * to GET, so we give it a second chance in that case.
+ * https://github.com/kubevirt/containerized-data-importer/issues/2737
+ *
+ * This function issues a GET request with a writefunction that always
+ * returns an error, thus effectively getting the headers but
+ * abandoning the transfer as soon as possible after.
+ */
+static bool
+try_fallback_GET_method (struct curl_handle *ch)
+{
+ CURLcode r;
+
+ nbdkit_debug ("attempting to fetch headers using GET method");
+
+ curl_easy_setopt (ch->c, CURLOPT_HTTPGET, 1L);
+ curl_easy_setopt (ch->c, CURLOPT_HEADERFUNCTION, header_cb);
+ curl_easy_setopt (ch->c, CURLOPT_HEADERDATA, ch);
+ curl_easy_setopt (ch->c, CURLOPT_WRITEFUNCTION, error_cb);
+ curl_easy_setopt (ch->c, CURLOPT_WRITEDATA, ch);
+
+ struct command cmd = {
+ .type = EASY_HANDLE,
+ .ch = ch,
+ };
+
+ r = send_command_and_wait (&cmd);
+ update_times (ch->c);
+
+ /* We expect CURLE_WRITE_ERROR here, but CURLE_OK is possible too
+ * (eg if the remote has zero length). Other errors might happen
+ * but we ignore them since it is a fallback path.
+ */
+ return r == CURLE_OK || r == CURLE_WRITE_ERROR;
+}
+
+static size_t
+header_cb (void *ptr, size_t size, size_t nmemb, void *opaque)
+{
+ struct curl_handle *ch = opaque;
+ size_t realsize = size * nmemb;
+ const char *header = ptr;
+ const char *end = header + realsize;
+ const char *accept_ranges = "accept-ranges:";
+ const char *bytes = "bytes";
+
+ if (realsize >= strlen (accept_ranges) &&
+ ascii_strncasecmp (header, accept_ranges, strlen (accept_ranges)) == 0) {
+ const char *p = strchr (header, ':') + 1;
+
+ /* Skip whitespace between the header name and value. */
+ while (p < end && *p && ascii_isspace (*p))
+ p++;
+
+ if (end - p >= strlen (bytes)
+ && strncmp (p, bytes, strlen (bytes)) == 0) {
+ /* Check that there is nothing but whitespace after the value. */
+ p += strlen (bytes);
+ while (p < end && *p && ascii_isspace (*p))
+ p++;
+
+ if (p == end || !*p)
+ ch->accept_range = true;
+ }
+ }
+
+ return realsize;
+}
+
+static size_t
+error_cb (char *ptr, size_t size, size_t nmemb, void *opaque)
+{
+#ifdef CURL_WRITEFUNC_ERROR
+ return CURL_WRITEFUNC_ERROR;
+#else
+ return 0; /* in older curl, any size < requested will also be an error */
+#endif
+}
+
/* Read data from the remote server. */
+static size_t write_cb (char *ptr, size_t size, size_t nmemb, void *opaque);
+
static int
curl_pread (void *handle, void *buf, uint32_t count, uint64_t offset)
{
CURLcode r;
+ struct curl_handle *ch;
char range[128];
- GET_HANDLE_FOR_CURRENT_SCOPE (ch);
- if (ch == NULL)
- return -1;
+ /* Get a curl easy handle. */
+ ch = allocate_handle ();
+ if (ch == NULL) goto err;
/* Run the scripts if necessary and set headers in the handle. */
- if (do_scripts (ch) == -1) return -1;
+ if (do_scripts (ch) == -1) goto err;
/* Tell the write_cb where we want the data to be written. write_cb
* will update this if the data comes in multiple sections.
*/
+ curl_easy_setopt (ch->c, CURLOPT_WRITEFUNCTION, write_cb);
+ curl_easy_setopt (ch->c, CURLOPT_WRITEDATA, ch);
ch->write_buf = buf;
ch->write_count = count;
@@ -183,11 +388,16 @@ curl_pread (void *handle, void *buf, uint32_t count, uint64_t offset)
offset, offset + count);
curl_easy_setopt (ch->c, CURLOPT_RANGE, range);
- /* The assumption here is that curl will look after timeouts. */
- r = curl_easy_perform (ch->c);
+ /* Send the command to the worker thread and wait. */
+ struct command cmd = {
+ .type = EASY_HANDLE,
+ .ch = ch,
+ };
+
+ r = send_command_and_wait (&cmd);
if (r != CURLE_OK) {
- display_curl_error (ch, r, "pread: curl_easy_perform");
- return -1;
+ display_curl_error (ch, r, "pread");
+ goto err;
}
update_times (ch->c);
@@ -198,26 +408,67 @@ curl_pread (void *handle, void *buf, uint32_t count, uint64_t offset)
/* As far as I understand the cURL API, this should never happen. */
assert (ch->write_count == 0);
+ free_handle (ch);
return 0;
+
+ err:
+ if (ch)
+ free_handle (ch);
+ return -1;
+}
+
+/* NB: The terminology used by libcurl is confusing!
+ *
+ * WRITEFUNCTION / write_cb is used when reading from the remote server
+ * READFUNCTION / read_cb is used when writing to the remote server.
+ *
+ * We use the same terminology as libcurl here.
+ */
+static size_t
+write_cb (char *ptr, size_t size, size_t nmemb, void *opaque)
+{
+ struct curl_handle *ch = opaque;
+ size_t orig_realsize = size * nmemb;
+ size_t realsize = orig_realsize;
+
+ assert (ch->write_buf);
+
+ /* Don't read more than the requested amount of data, even if the
+ * server or libcurl sends more.
+ */
+ if (realsize > ch->write_count)
+ realsize = ch->write_count;
+
+ memcpy (ch->write_buf, ptr, realsize);
+
+ ch->write_count -= realsize;
+ ch->write_buf += realsize;
+
+ return orig_realsize;
}
/* Write data to the remote server. */
+static size_t read_cb (void *ptr, size_t size, size_t nmemb, void *opaque);
+
static int
curl_pwrite (void *handle, const void *buf, uint32_t count, uint64_t offset)
{
CURLcode r;
+ struct curl_handle *ch;
char range[128];
- GET_HANDLE_FOR_CURRENT_SCOPE (ch);
- if (ch == NULL)
- return -1;
+ /* Get a curl easy handle. */
+ ch = allocate_handle ();
+ if (ch == NULL) goto err;
/* Run the scripts if necessary and set headers in the handle. */
- if (do_scripts (ch) == -1) return -1;
+ if (do_scripts (ch) == -1) goto err;
/* Tell the read_cb where we want the data to be read from. read_cb
* will update this if the data comes in multiple sections.
*/
+ curl_easy_setopt (ch->c, CURLOPT_READFUNCTION, read_cb);
+ curl_easy_setopt (ch->c, CURLOPT_READDATA, ch);
ch->read_buf = buf;
ch->read_count = count;
@@ -228,11 +479,16 @@ curl_pwrite (void *handle, const void *buf, uint32_t count, uint64_t offset)
offset, offset + count);
curl_easy_setopt (ch->c, CURLOPT_RANGE, range);
- /* The assumption here is that curl will look after timeouts. */
- r = curl_easy_perform (ch->c);
+ /* Send the command to the worker thread and wait. */
+ struct command cmd = {
+ .type = EASY_HANDLE,
+ .ch = ch,
+ };
+
+ r = send_command_and_wait (&cmd);
if (r != CURLE_OK) {
- display_curl_error (ch, r, "pwrite: curl_easy_perform");
- return -1;
+ display_curl_error (ch, r, "pwrite");
+ goto err;
}
update_times (ch->c);
@@ -243,7 +499,31 @@ curl_pwrite (void *handle, const void *buf, uint32_t count, uint64_t offset)
/* As far as I understand the cURL API, this should never happen. */
assert (ch->read_count == 0);
+ free_handle (ch);
return 0;
+
+ err:
+ if (ch)
+ free_handle (ch);
+ return -1;
+}
+
+static size_t
+read_cb (void *ptr, size_t size, size_t nmemb, void *opaque)
+{
+ struct curl_handle *ch = opaque;
+ size_t realsize = size * nmemb;
+
+ assert (ch->read_buf);
+ if (realsize > ch->read_count)
+ realsize = ch->read_count;
+
+ memcpy (ptr, ch->read_buf, realsize);
+
+ ch->read_count -= realsize;
+ ch->read_buf += realsize;
+
+ return realsize;
}
static struct nbdkit_plugin plugin = {
diff --git a/plugins/curl/pool.c b/plugins/curl/pool.c
index eb2d330e1..2974cda3f 100644
--- a/plugins/curl/pool.c
+++ b/plugins/curl/pool.c
@@ -30,11 +30,29 @@
* SUCH DAMAGE.
*/
-/* Curl handle pool.
+/* Worker thread which processes the curl multi interface.
*
- * To get a libcurl handle, call get_handle(). When you hold the
- * handle, it is yours exclusively to use. After you have finished
- * with the handle, put it back into the pool by calling put_handle().
+ * The main nbdkit threads (see curl.c) create curl easy handles
+ * initialized with the work they want to carry out. Note there is
+ * one easy handle per task (eg. per pread/pwrite request). The easy
+ * handles are not reused.
+ *
+ * The commands + optional easy handle are submitted to the worker
+ * thread over a self-pipe (it's easy to use a pipe here because the
+ * way curl multi works is it can listen on an extra fd, but not on
+ * anything else like a pthread condition). The curl multi performs
+ * the work of the outstanding easy handles.
+ *
+ * When an easy handle finishes work or errors, we retire the command
+ * by signalling back to the waiting nbdkit thread using a pthread
+ * condition.
+ *
+ * In my experiments, we're almost always I/O bound so I haven't seen
+ * any strong need to use more than one curl multi / worker thread,
+ * although it would be possible to add more in future.
+ *
+ * See also this extremely useful thread:
+ * https://curl.se/mail/lib-2019-03/0100.html
*/
#include <config.h>
@@ -45,9 +63,19 @@
#include <stdint.h>
#include <inttypes.h>
#include <string.h>
+#include <unistd.h>
#include <assert.h>
#include <pthread.h>
+#ifdef HAVE_STDATOMIC_H
+#include <stdatomic.h>
+#else
+/* Some old platforms lack atomic types, but 32 bit ints are usually
+ * "atomic enough".
+ */
+#define _Atomic /**/
+#endif
+
#include <curl/curl.h>
#include <nbdkit-plugin.h>
@@ -62,115 +90,281 @@ NBDKIT_DLL_PUBLIC int curl_debug_pool = 0;
unsigned connections = 4;
-/* This lock protects access to the curl_handles vector below. */
-static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
+/* Pipe used to notify background thread that a command is pending in
+ * the queue. A pointer to the 'struct command' is sent over the
+ * pipe.
+ */
+static int self_pipe[2] = { -1, -1 };
-/* List of curl handles. This is allocated dynamically as more
- * handles are requested. Currently it does not shrink. It may grow
- * up to 'connections' in length.
+/* The curl multi handle. */
+static CURLM *multi;
+
+/* List of running easy handles. We only need to maintain this so we
+ * can remove them from the multi handle when cleaning up.
*/
DEFINE_VECTOR_TYPE (curl_handle_list, struct curl_handle *);
static curl_handle_list curl_handles = empty_vector;
-/* The condition is used when the curl handles vector is full and
- * we're waiting for a thread to put_handle.
- */
-static pthread_cond_t cond = PTHREAD_COND_INITIALIZER;
-static size_t in_use = 0, waiting = 0;
+static const char *
+command_type_to_string (enum command_type type)
+{
+ switch (type) {
+ case EASY_HANDLE: return "EASY_HANDLE";
+ case STOP: return "STOP";
+ default: abort ();
+ }
+}
int
pool_get_ready (void)
{
+ multi = curl_multi_init ();
+ if (multi == NULL) {
+ nbdkit_error ("curl_multi_init failed: %m");
+ return -1;
+ }
+
return 0;
}
+/* Start and stop the background thread. */
+static pthread_t thread;
+static bool thread_running;
+static void *pool_worker (void *);
+
int
pool_after_fork (void)
{
+ int err;
+
+ if (pipe (self_pipe) == -1) {
+ nbdkit_error ("pipe: %m");
+ return -1;
+ }
+
+ /* Start the pool background thread where all the curl work is done. */
+ err = pthread_create (&thread, NULL, pool_worker, NULL);
+ if (err != 0) {
+ errno = err;
+ nbdkit_error ("pthread_create: %m");
+ return -1;
+ }
+ thread_running = true;
+
return 0;
}
-/* Close and free all handles in the pool. */
+/* Unload the background thread. */
void
pool_unload (void)
{
- size_t i;
+ if (thread_running) {
+ /* Stop the background thread. */
+ struct command cmd = { .type = STOP };
+ send_command_and_wait (&cmd);
+ pthread_join (thread, NULL);
+ thread_running = false;
+ }
- if (curl_debug_pool)
- nbdkit_debug ("unload_pool: number of curl handles allocated: %zu",
- curl_handles.len);
+ if (self_pipe[0] >= 0) {
+ close (self_pipe[0]);
+ self_pipe[0] = -1;
+ }
+ if (self_pipe[1] >= 0) {
+ close (self_pipe[1]);
+ self_pipe[1] = -1;
+ }
- for (i = 0; i < curl_handles.len; ++i)
- free_handle (curl_handles.ptr[i]);
- curl_handle_list_reset (&curl_handles);
+ if (multi) {
+ size_t i;
+
+ /* Remove and free any easy handles in the multi. */
+ for (i = 0; i < curl_handles.len; ++i) {
+ curl_multi_remove_handle (multi, curl_handles.ptr[i]->c);
+ free_handle (curl_handles.ptr[i]);
+ }
+
+ curl_multi_cleanup (multi);
+ multi = NULL;
+ }
}
-/* Get a handle from the pool.
- *
- * It is owned exclusively by the caller until they call put_handle.
+/* Command queue. */
+static _Atomic uint64_t id; /* next command ID */
+
+/* Send command to the background thread and wait for completion.
+ * This is only called by one of the nbdkit threads.
*/
-struct curl_handle *
-get_handle (void)
+CURLcode
+send_command_and_wait (struct command *cmd)
{
- ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&lock);
- size_t i;
- struct curl_handle *ch;
-
- again:
- /* Look for a handle which is not in_use. */
- for (i = 0; i < curl_handles.len; ++i) {
- ch = curl_handles.ptr[i];
- if (!ch->in_use) {
- ch->in_use = true;
- in_use++;
+ cmd->id = id++;
+
+ /* CURLcode is 0 (CURLE_OK) or > 0, so use -1 as a sentinel to
+ * indicate that the command has not yet been completed and status
+ * set.
+ */
+ cmd->status = -1;
+
+ /* This will be used to signal command completion back to us. */
+ pthread_mutex_init (&cmd->mutex, NULL);
+ pthread_cond_init (&cmd->cond, NULL);
+
+ /* Send the command to the background thread. */
+ if (write (self_pipe[1], &cmd, sizeof cmd) != sizeof cmd)
+ abort ();
+
+ /* Wait for the command to be completed by the background thread. */
+ {
+ ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&cmd->mutex);
+ while (cmd->status == -1) /* for -1, see above */
+ pthread_cond_wait (&cmd->cond, &cmd->mutex);
+ }
+
+ pthread_mutex_destroy (&cmd->mutex);
+ pthread_cond_destroy (&cmd->cond);
+
+ /* Note the main thread must call nbdkit_error on error! */
+ return cmd->status;
+}
+
+/* The background thread. */
+static void check_for_finished_handles (void);
+static void retire_command (struct command *cmd, CURLcode code);
+static void do_easy_handle (struct command *cmd);
+
+static void *
+pool_worker (void *vp)
+{
+ bool stop = false;
+
+ if (curl_debug_pool)
+ nbdkit_debug ("curl: background thread started");
+
+ while (!stop) {
+ struct command *cmd = NULL;
+ struct curl_waitfd extra_fds[1] =
+ { { .fd = self_pipe[0], .events = CURL_WAIT_POLLIN } };
+ CURLMcode mc;
+ int numfds, running_handles, repeats = 0;
+
+ do {
+ /* Process the multi handle. */
+ mc = curl_multi_perform (multi, &running_handles);
+ if (mc != CURLM_OK) {
+ nbdkit_error ("curl_multi_perform: %s", curl_multi_strerror (mc));
+ abort (); /* XXX We don't expect this to happen */
+ }
+
+ check_for_finished_handles ();
+
+ mc = curl_multi_wait (multi, extra_fds, 1, 1000000, &numfds);
+ if (mc != CURLM_OK) {
+ nbdkit_error ("curl_multi_wait: %s", curl_multi_strerror (mc));
+ abort (); /* XXX We don't expect this to happen */
+ }
+
if (curl_debug_pool)
- nbdkit_debug ("get_handle: %zu", ch->i);
- return ch;
- }
- }
+ nbdkit_debug ("curl_multi_wait returned: running_handles=%d numfds=%d",
+ running_handles, numfds);
+
+ if (numfds == 0) {
+ repeats++;
+ if (repeats > 1)
+ nbdkit_nanosleep (1, 0);
+ }
+ else {
+ repeats = 0;
+ if (extra_fds[0].revents == CURL_WAIT_POLLIN) {
+ /* There's a command waiting. */
+ if (read (self_pipe[0], &cmd, sizeof cmd) != sizeof cmd)
+ abort ();
+ }
+ }
+ } while (!cmd);
- /* If more connections are allowed, then allocate a new handle. */
- if (curl_handles.len < connections) {
- ch = allocate_handle ();
- if (ch == NULL)
- return NULL;
- if (curl_handle_list_append (&curl_handles, ch) == -1) {
- free_handle (ch);
- return NULL;
- }
- ch->i = curl_handles.len - 1;
- ch->in_use = true;
- in_use++;
if (curl_debug_pool)
- nbdkit_debug ("get_handle: %zu", ch->i);
- return ch;
- }
+ nbdkit_debug ("curl: dispatching %s command %" PRIu64,
+ command_type_to_string (cmd->type), cmd->id);
+
+ switch (cmd->type) {
+ case STOP:
+ stop = true;
+ retire_command (cmd, CURLE_OK);
+ break;
- /* Otherwise we have run out of connections so we must wait until
- * another thread calls put_handle.
- */
- assert (in_use == connections);
- waiting++;
- while (in_use == connections)
- pthread_cond_wait (&cond, &lock);
- waiting--;
+ case EASY_HANDLE:
+ do_easy_handle (cmd);
+ break;
+ }
+ } /* while (!stop) */
- goto again;
+ if (curl_debug_pool)
+ nbdkit_debug ("curl: background thread stopped");
+
+ return NULL;
}
-/* Return the handle to the pool. */
-void
-put_handle (struct curl_handle *ch)
+/* This checks if any easy handles in the multi have
+ * finished and retires the associated commands.
+ */
+static void
+check_for_finished_handles (void)
{
- ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&lock);
+ CURLMsg *msg;
+ int msgs_in_queue;
+
+ while ((msg = curl_multi_info_read (multi, &msgs_in_queue)) != NULL) {
+ size_t i;
+ struct curl_handle *ch = NULL;
+
+ if (msg->msg == CURLMSG_DONE) {
+ /* Find this curl_handle. */
+ for (i = 0; i < curl_handles.len; ++i) {
+ if (curl_handles.ptr[i]->c == msg->easy_handle) {
+ ch = curl_handles.ptr[i];
+ curl_handle_list_remove (&curl_handles, i);
+ }
+ }
+ if (ch == NULL) abort ();
+ curl_multi_remove_handle (multi, ch->c);
+
+ retire_command (ch->cmd, msg->data.result);
+ }
+ }
+}
+/* Retire a command. status is a CURLcode. */
+static void
+retire_command (struct command *cmd, CURLcode status)
+{
if (curl_debug_pool)
- nbdkit_debug ("put_handle: %zu", ch->i);
+ nbdkit_debug ("curl: retiring %s command %" PRIu64,
+ command_type_to_string (cmd->type), cmd->id);
+
+ ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&cmd->mutex);
+ cmd->status = status;
+ pthread_cond_signal (&cmd->cond);
+}
+
+static void
+do_easy_handle (struct command *cmd)
+{
+ CURLMcode mc;
+
+ cmd->ch->cmd = cmd;
+
+ /* Add the handle to the multi. */
+ mc = curl_multi_add_handle (multi, cmd->ch->c);
+ if (mc != CURLM_OK) {
+ nbdkit_error ("curl_multi_add_handle: %s", curl_multi_strerror (mc));
+ goto err;
+ }
- ch->in_use = false;
- in_use--;
+ if (curl_handle_list_append (&curl_handles, cmd->ch) == -1)
+ goto err;
+ return;
- /* Signal the next thread which is waiting. */
- if (waiting > 0)
- pthread_cond_signal (&cond);
+ err:
+ retire_command (cmd, CURLE_OUT_OF_MEMORY);
}
--
2.41.0
Eric Blake
2023-Aug-04 16:38 UTC
[Libguestfs] [PATCH nbdkit v2 8/9] curl: Use curl multi interface
On Fri, Jul 28, 2023 at 06:17:52PM +0100, Richard W.M. Jones wrote:> See the comment at the top of plugins/curl/pool.c for general > information about how this works. > > This makes a very large difference to performance over the previous > implementation. Note for the tests below I also applied the next > commit changing the behaviour of the connections parameter. > > Using this test case: > > $ uri=https://cloud-images.ubuntu.com/lunar/current/lunar-server-cloudimg-amd64.img > $ nbdkit -r -U - curl $uri ipresolve=v4 --run 'nbdcopy -p $uri null' > > The times are as follows: > > multi, connections=64 21.5s > multi, connections=32 30.2s > multi, connections=16 56.0s > before this commit 166sAwesome performance improvements! As painful as this series has been for you to write and debug, it is showing its worth.> --- > plugins/curl/curldefs.h | 35 ++-- > plugins/curl/config.c | 246 --------------------------- > plugins/curl/curl.c | 366 +++++++++++++++++++++++++++++++++++----- > plugins/curl/pool.c | 346 ++++++++++++++++++++++++++++--------- > 4 files changed, 616 insertions(+), 377 deletions(-)Finally taking time to review this, even though it is already in-tree.> @@ -98,8 +88,30 @@ struct curl_handle { > const char *read_buf; > uint32_t read_count; > > + /* This field is used by curl_get_size. */ > + bool accept_range; > + > /* Used by scripts.c */ > struct curl_slist *headers_copy; > + > + /* Used by pool.c */ > + struct command *cmd; > +}; > + > +/* Asynchronous commands that can be sent to the pool thread. */ > +enum command_type { EASY_HANDLE, STOP }; > +struct command { > + /* These fields are set by the caller. */ > + enum command_type type; /* command */ > + struct curl_handle *ch; /* for EASY_HANDLE, the easy handle */ > + > + /* This field is set to a unique value by send_command_and_wait. */ > + uint64_t id; /* serial number */ > + > + /* These fields are used to signal back that the command finished. 
*/ > + pthread_mutex_t mutex; /* completion mutex */ > + pthread_cond_t cond; /* completion condition */ > + CURLcode status; /* status code (CURLE_OK = succeeded) */ > };Makes sense. The two types are mutually recursive (curl_handle includes a struct command *; command includes a struct curl_handle *); hopefully you have proper locking when altering multiple objects to adjust how they point to one another.> +++ b/plugins/curl/config.c> +++ b/plugins/curl/curl.c > > +/* Get the file size. */ > +static int get_content_length_accept_range (struct curl_handle *ch); > +static bool try_fallback_GET_method (struct curl_handle *ch); > +static size_t header_cb (void *ptr, size_t size, size_t nmemb, void *opaque); > +static size_t error_cb (char *ptr, size_t size, size_t nmemb, void *opaque); > + > +static int64_t > +curl_get_size (void *handle) > +{ > + struct curl_handle *ch; > + CURLcode r; > + long code; > +#ifdef HAVE_CURLINFO_CONTENT_LENGTH_DOWNLOAD_T > + curl_off_t o; > +#else > + double d; > +#endif > + int64_t exportsize; > + > + /* Get a curl easy handle. */ > + ch = allocate_handle (); > + if (ch == NULL) goto err; > + > + /* Prepare to read the headers. */ > + if (get_content_length_accept_range (ch) == -1) > + goto err; > + > + /* Send the command to the worker thread and wait. */ > + struct command cmd = { > + .type = EASY_HANDLE, > + .ch = ch, > + }; > + > + r = send_command_and_wait (&cmd); > + update_times (ch->c); > + if (r != CURLE_OK) { > + display_curl_error (ch, r, > + "problem doing HEAD request to fetch size of URL [%s]", > + url); > + > + /* Get the HTTP status code, if available. */ > + r = curl_easy_getinfo (ch->c, CURLINFO_RESPONSE_CODE, &code); > + if (r == CURLE_OK) > + nbdkit_debug ("HTTP status code: %ld", code); > + else > + code = -1; > + > + /* See comment on try_fallback_GET_method below. */ > + if (code != 403 || !try_fallback_GET_method (ch)) > + goto err; > + } > + > + /* Get the content length. 
> + * > + * Note there is some subtlety here: For web servers using chunked > + * encoding, either the Content-Length header will not be present, > + * or if present it should be ignored. (For such servers the only > + * way to find out the true length would be to read all of the > + * content, which we don't want to do). > + * > + * Curl itself resolves this for us. It will ignore the > + * Content-Length header if chunked encoding is used, returning the > + * length as -1 which we check below (see also > + * curl:lib/http.c:Curl_http_size). > + */ > +#ifdef HAVE_CURLINFO_CONTENT_LENGTH_DOWNLOAD_T > + r = curl_easy_getinfo (ch->c, CURLINFO_CONTENT_LENGTH_DOWNLOAD_T, &o); > + if (r != CURLE_OK) { > + display_curl_error (ch, r, > + "could not get length of remote file [%s]", url); > + goto err; > + } > + > + if (o == -1) { > + nbdkit_error ("could not get length of remote file [%s], " > + "is the URL correct?", url); > + goto err; > + } > + > + exportsize = o; > +#else > + r = curl_easy_getinfo (ch->c, CURLINFO_CONTENT_LENGTH_DOWNLOAD, &d); > + if (r != CURLE_OK) { > + display_curl_error (ch, r, > + "could not get length of remote file [%s]", url); > + goto err; > + } > + > + if (d == -1) { > + nbdkit_error ("could not get length of remote file [%s], " > + "is the URL correct?", url); > + goto err; > + } > + > + exportsize = d;Does curl guarantee that the double d will contain a value assignable to int64_t without overflow/truncation? For particularly large sizes, double has insufficient precision for all possible file sizes, but I doubt someone is exposing such large files over HTTP.> +#endif > + nbdkit_debug ("content length: %" PRIi64, exportsize); > + > + /* If this is HTTP, check that byte ranges are supported. 
*/ > + if (ascii_strncasecmp (url, "http://", strlen ("http://")) == 0 || > + ascii_strncasecmp (url, "https://", strlen ("https://")) == 0) { > + if (!ch->accept_range) { > + nbdkit_error ("server does not support 'range' (byte range) requests"); > + goto err; > + } > + > + nbdkit_debug ("accept range supported (for HTTP/HTTPS)"); > + } > + > + free_handle (ch); > + return exportsize; > + > + err: > + if (ch) > + free_handle (ch); > + return -1; > +} > + > +/* Get the file size and also whether the remote HTTP server > + * supports byte ranges. > + */ > +static int > +get_content_length_accept_range (struct curl_handle *ch) > +{ > + /* We must run the scripts if necessary and set headers in the > + * handle. > + */ > + if (do_scripts (ch) == -1) > + return -1; > + > + /* Set this flag in the handle to false. The callback should set it > + * to true if byte ranges are supported, which we check below. > + */ > + ch->accept_range = false; > + > + /* No Body, not nobody! This forces a HEAD request. */ > + curl_easy_setopt (ch->c, CURLOPT_NOBODY, 1L); > + curl_easy_setopt (ch->c, CURLOPT_HEADERFUNCTION, header_cb); > + curl_easy_setopt (ch->c, CURLOPT_HEADERDATA, ch); > + curl_easy_setopt (ch->c, CURLOPT_WRITEFUNCTION, NULL); > + curl_easy_setopt (ch->c, CURLOPT_WRITEDATA, NULL); > + curl_easy_setopt (ch->c, CURLOPT_READFUNCTION, NULL); > + curl_easy_setopt (ch->c, CURLOPT_READDATA, NULL); > + return 0; > +} > + > +/* S3 servers can return 403 Forbidden for HEAD but still respond > + * to GET, so we give it a second chance in that case. > + * https://github.com/kubevirt/containerized-data-importer/issues/2737 > + * > + * This function issues a GET request with a writefunction that always > + * returns an error, thus effectively getting the headers but > + * abandoning the transfer as soon as possible after. 
> + */ > +static bool > +try_fallback_GET_method (struct curl_handle *ch) > +{ > + CURLcode r; > + > + nbdkit_debug ("attempting to fetch headers using GET method"); > + > + curl_easy_setopt (ch->c, CURLOPT_HTTPGET, 1L); > + curl_easy_setopt (ch->c, CURLOPT_HEADERFUNCTION, header_cb); > + curl_easy_setopt (ch->c, CURLOPT_HEADERDATA, ch); > + curl_easy_setopt (ch->c, CURLOPT_WRITEFUNCTION, error_cb); > + curl_easy_setopt (ch->c, CURLOPT_WRITEDATA, ch); > + > + struct command cmd = { > + .type = EASY_HANDLE, > + .ch = ch, > + }; > + > + r = send_command_and_wait (&cmd); > + update_times (ch->c); > + > + /* We expect CURLE_WRITE_ERROR here, but CURLE_OK is possible too > + * (eg if the remote has zero length). Other errors might happen > + * but we ignore them since it is a fallback path. > + */ > + return r == CURLE_OK || r == CURLE_WRITE_ERROR; > +} > + > +static size_t > +header_cb (void *ptr, size_t size, size_t nmemb, void *opaque) > +{ > + struct curl_handle *ch = opaque; > + size_t realsize = size * nmemb; > + const char *header = ptr; > + const char *end = header + realsize; > + const char *accept_ranges = "accept-ranges:"; > + const char *bytes = "bytes"; > + > + if (realsize >= strlen (accept_ranges) && > + ascii_strncasecmp (header, accept_ranges, strlen (accept_ranges)) == 0) { > + const char *p = strchr (header, ':') + 1; > + > + /* Skip whitespace between the header name and value. */ > + while (p < end && *p && ascii_isspace (*p))Technically, '*p && ascii_isspace (*p)' can be shortened to 'ascii_isspace (*p)', since the NUL byte is not ascii space. I don't know if the compiler is smart enough to make that optimization on your behalf.> + p++; > + > + if (end - p >= strlen (bytes) > + && strncmp (p, bytes, strlen (bytes)) == 0) { > + /* Check that there is nothing but whitespace after the value. 
*/ > + p += strlen (bytes); > + while (p < end && *p && ascii_isspace (*p))Another spot of the same.> + p++; > + > + if (p == end || !*p) > + ch->accept_range = true; > + } > + } > + > + return realsize; > +} > + > +static size_t > +error_cb (char *ptr, size_t size, size_t nmemb, void *opaque) > +{ > +#ifdef CURL_WRITEFUNC_ERROR > + return CURL_WRITEFUNC_ERROR; > +#else > + return 0; /* in older curl, any size < requested will also be an error */ > +#endif > +} > + > /* Read data from the remote server. */ > +static size_t write_cb (char *ptr, size_t size, size_t nmemb, void *opaque); > + > static int > curl_pread (void *handle, void *buf, uint32_t count, uint64_t offset) > { > CURLcode r; > + struct curl_handle *ch; > char range[128]; > > - GET_HANDLE_FOR_CURRENT_SCOPE (ch); > - if (ch == NULL) > - return -1; > + /* Get a curl easy handle. */ > + ch = allocate_handle (); > + if (ch == NULL) goto err; > > /* Run the scripts if necessary and set headers in the handle. */ > - if (do_scripts (ch) == -1) return -1; > + if (do_scripts (ch) == -1) goto err; > > /* Tell the write_cb where we want the data to be written. write_cb > * will update this if the data comes in multiple sections. > */ > + curl_easy_setopt (ch->c, CURLOPT_WRITEFUNCTION, write_cb); > + curl_easy_setopt (ch->c, CURLOPT_WRITEDATA, ch); > ch->write_buf = buf; > ch->write_count = count; > > @@ -183,11 +388,16 @@ curl_pread (void *handle, void *buf, uint32_t count, uint64_t offset) > offset, offset + count); > curl_easy_setopt (ch->c, CURLOPT_RANGE, range); > > - /* The assumption here is that curl will look after timeouts. */ > - r = curl_easy_perform (ch->c); > + /* Send the command to the worker thread and wait. 
*/ > + struct command cmd = { > + .type = EASY_HANDLE, > + .ch = ch, > + }; > + > + r = send_command_and_wait (&cmd); > if (r != CURLE_OK) { > - display_curl_error (ch, r, "pread: curl_easy_perform"); > - return -1; > + display_curl_error (ch, r, "pread"); > + goto err; > } > update_times (ch->c); > > @@ -198,26 +408,67 @@ curl_pread (void *handle, void *buf, uint32_t count, uint64_t offset) > /* As far as I understand the cURL API, this should never happen. */ > assert (ch->write_count == 0); > > + free_handle (ch); > return 0; > + > + err: > + if (ch) > + free_handle (ch); > + return -1; > +} > + > +/* NB: The terminology used by libcurl is confusing! > + * > + * WRITEFUNCTION / write_cb is used when reading from the remote server > + * READFUNCTION / read_cb is used when writing to the remote server. > + * > + * We use the same terminology as libcurl here. > + */ > +static size_t > +write_cb (char *ptr, size_t size, size_t nmemb, void *opaque) > +{ > + struct curl_handle *ch = opaque; > + size_t orig_realsize = size * nmemb; > + size_t realsize = orig_realsize;Do we have to worry about overflow when compiling on 32-bit machines? Asked differently, should we be using off_t instead of size_t in any of this code? Thankfully, for now, we know NBD .pread and .pwrite requests are capped at 64M, so I think you're okay (we aren't ever going to ask curl for gigabytes in one request), but maybe a comment or assert() is worth it?> + > + assert (ch->write_buf); > + > + /* Don't read more than the requested amount of data, even if the > + * server or libcurl sends more. > + */ > + if (realsize > ch->write_count) > + realsize = ch->write_count; > + > + memcpy (ch->write_buf, ptr, realsize); > + > + ch->write_count -= realsize; > + ch->write_buf += realsize; > + > + return orig_realsize;[1]> } > > /* Write data to the remote server. 
*/ > +static size_t read_cb (void *ptr, size_t size, size_t nmemb, void *opaque); > + > static int > curl_pwrite (void *handle, const void *buf, uint32_t count, uint64_t offset) > { > CURLcode r; > + struct curl_handle *ch; > char range[128]; > > - GET_HANDLE_FOR_CURRENT_SCOPE (ch); > - if (ch == NULL) > - return -1; > + /* Get a curl easy handle. */ > + ch = allocate_handle (); > + if (ch == NULL) goto err; > > /* Run the scripts if necessary and set headers in the handle. */ > - if (do_scripts (ch) == -1) return -1; > + if (do_scripts (ch) == -1) goto err; > > /* Tell the read_cb where we want the data to be read from. read_cb > * will update this if the data comes in multiple sections. > */ > + curl_easy_setopt (ch->c, CURLOPT_READFUNCTION, read_cb); > + curl_easy_setopt (ch->c, CURLOPT_READDATA, ch); > ch->read_buf = buf; > ch->read_count = count; > > @@ -228,11 +479,16 @@ curl_pwrite (void *handle, const void *buf, uint32_t count, uint64_t offset) > offset, offset + count); > curl_easy_setopt (ch->c, CURLOPT_RANGE, range); > > - /* The assumption here is that curl will look after timeouts. */ > - r = curl_easy_perform (ch->c); > + /* Send the command to the worker thread and wait. */ > + struct command cmd = { > + .type = EASY_HANDLE, > + .ch = ch, > + }; > + > + r = send_command_and_wait (&cmd); > if (r != CURLE_OK) { > - display_curl_error (ch, r, "pwrite: curl_easy_perform"); > - return -1; > + display_curl_error (ch, r, "pwrite"); > + goto err; > } > update_times (ch->c); > > @@ -243,7 +499,31 @@ curl_pwrite (void *handle, const void *buf, uint32_t count, uint64_t offset) > /* As far as I understand the cURL API, this should never happen. 
*/ > assert (ch->read_count == 0); > > + free_handle (ch); > return 0; > + > + err: > + if (ch) > + free_handle (ch); > + return -1; > +} > + > +static size_t > +read_cb (void *ptr, size_t size, size_t nmemb, void *opaque) > +{ > + struct curl_handle *ch = opaque; > + size_t realsize = size * nmemb; > + > + assert (ch->read_buf); > + if (realsize > ch->read_count) > + realsize = ch->read_count; > + > + memcpy (ptr, ch->read_buf, realsize); > + > + ch->read_count -= realsize; > + ch->read_buf += realsize; > + > + return realsize;Why does write_cb in [1] above return orig_realsize, but read_cb returns the potentially modified realsize?> } > > static struct nbdkit_plugin plugin = { > diff --git a/plugins/curl/pool.c b/plugins/curl/pool.c > index eb2d330e1..2974cda3f 100644 > --- a/plugins/curl/pool.c > +++ b/plugins/curl/pool.c > @@ -30,11 +30,29 @@ > * SUCH DAMAGE. > */ > > -/* Curl handle pool. > +/* Worker thread which processes the curl multi interface. > * > - * To get a libcurl handle, call get_handle(). When you hold the > - * handle, it is yours exclusively to use. After you have finished > - * with the handle, put it back into the pool by calling put_handle(). > + * The main nbdkit threads (see curl.c) create curl easy handles > + * initialized with the work they want to carry out. Note there is > + * one easy handle per task (eg. per pread/pwrite request). The easy > + * handles are not reused. > + * > + * The commands + optional easy handle are submitted to the worker > + * thread over a self-pipe (it's easy to use a pipe here because the > + * way curl multi works is it can listen on an extra fd, but not on > + * anything else like a pthread condition). The curl multi performs > + * the work of the outstanding easy handles. > + * > + * When an easy handle finishes work or errors, we retire the command > + * by signalling back to the waiting nbdkit thread using a pthread > + * condition. 
> + * > + * In my experiments, we're almost always I/O bound so I haven't seen > + * any strong need to use more than one curl multi / worker thread, > + * although it would be possible to add more in future. > + * > + * See also this extremely useful thread: > + * https://curl.se/mail/lib-2019-03/0100.htmlVery useful comment (and link).> */ > > #include <config.h> > @@ -45,9 +63,19 @@ > #include <stdint.h> > #include <inttypes.h> > #include <string.h> > +#include <unistd.h> > #include <assert.h> > #include <pthread.h> > > +#ifdef HAVE_STDATOMIC_H > +#include <stdatomic.h> > +#else > +/* Some old platforms lack atomic types, but 32 bit ints are usually > + * "atomic enough". > + */ > +#define _Atomic /**/ > +#endif > + > #include <curl/curl.h> > > #include <nbdkit-plugin.h> > @@ -62,115 +90,281 @@ NBDKIT_DLL_PUBLIC int curl_debug_pool = 0; > > unsigned connections = 4; > > -/* This lock protects access to the curl_handles vector below. */ > -static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER; > +/* Pipe used to notify background thread that a command is pending in > + * the queue. A pointer to the 'struct command' is sent over the > + * pipe. > + */ > +static int self_pipe[2] = { -1, -1 }; > > -/* List of curl handles. This is allocated dynamically as more > - * handles are requested. Currently it does not shrink. It may grow > - * up to 'connections' in length. > +/* The curl multi handle. */ > +static CURLM *multi; > + > +/* List of running easy handles. We only need to maintain this so we > + * can remove them from the multi handle when cleaning up. > */ > DEFINE_VECTOR_TYPE (curl_handle_list, struct curl_handle *); > static curl_handle_list curl_handles = empty_vector; > > -/* The condition is used when the curl handles vector is full and > - * we're waiting for a thread to put_handle. 
> - */ > -static pthread_cond_t cond = PTHREAD_COND_INITIALIZER; > -static size_t in_use = 0, waiting = 0; > +static const char * > +command_type_to_string (enum command_type type) > +{ > + switch (type) { > + case EASY_HANDLE: return "EASY_HANDLE"; > + case STOP: return "STOP"; > + default: abort (); > + } > +} > > int > pool_get_ready (void) > { > + multi = curl_multi_init (); > + if (multi == NULL) { > + nbdkit_error ("curl_multi_init failed: %m"); > + return -1; > + } > + > return 0; > } > > +/* Start and stop the background thread. */ > +static pthread_t thread; > +static bool thread_running; > +static void *pool_worker (void *); > + > int > pool_after_fork (void) > { > + int err; > + > + if (pipe (self_pipe) == -1) { > + nbdkit_error ("pipe: %m"); > + return -1; > + } > + > + /* Start the pool background thread where all the curl work is done. */ > + err = pthread_create (&thread, NULL, pool_worker, NULL); > + if (err != 0) { > + errno = err; > + nbdkit_error ("pthread_create: %m"); > + return -1; > + } > + thread_running = true; > + > return 0; > } > > -/* Close and free all handles in the pool. */ > +/* Unload the background thread. */ > void > pool_unload (void) > { > - size_t i; > + if (thread_running) { > + /* Stop the background thread. */ > + struct command cmd = { .type = STOP }; > + send_command_and_wait (&cmd); > + pthread_join (thread, NULL); > + thread_running = false; > + } > > - if (curl_debug_pool) > - nbdkit_debug ("unload_pool: number of curl handles allocated: %zu", > - curl_handles.len); > + if (self_pipe[0] >= 0) { > + close (self_pipe[0]); > + self_pipe[0] = -1; > + } > + if (self_pipe[1] >= 0) { > + close (self_pipe[1]); > + self_pipe[1] = -1; > + } > > - for (i = 0; i < curl_handles.len; ++i) > - free_handle (curl_handles.ptr[i]); > - curl_handle_list_reset (&curl_handles); > + if (multi) { > + size_t i; > + > + /* Remove and free any easy handles in the multi. 
*/ > + for (i = 0; i < curl_handles.len; ++i) { > + curl_multi_remove_handle (multi, curl_handles.ptr[i]->c); > + free_handle (curl_handles.ptr[i]); > + } > + > + curl_multi_cleanup (multi); > + multi = NULL; > + } > } > > -/* Get a handle from the pool. > - * > - * It is owned exclusively by the caller until they call put_handle. > +/* Command queue. */ > +static _Atomic uint64_t id; /* next command ID */ > + > +/* Send command to the background thread and wait for completion. > + * This is only called by one of the nbdkit threads. > */ > -struct curl_handle * > -get_handle (void) > +CURLcode > +send_command_and_wait (struct command *cmd) > { > - ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&lock); > - size_t i; > - struct curl_handle *ch; > - > - again: > - /* Look for a handle which is not in_use. */ > - for (i = 0; i < curl_handles.len; ++i) { > - ch = curl_handles.ptr[i]; > - if (!ch->in_use) { > - ch->in_use = true; > - in_use++; > + cmd->id = id++; > + > + /* CURLcode is 0 (CURLE_OK) or > 0, so use -1 as a sentinel to > + * indicate that the command has not yet been completed and status > + * set. > + */ > + cmd->status = -1; > + > + /* This will be used to signal command completion back to us. */ > + pthread_mutex_init (&cmd->mutex, NULL); > + pthread_cond_init (&cmd->cond, NULL); > + > + /* Send the command to the background thread. */ > + if (write (self_pipe[1], &cmd, sizeof cmd) != sizeof cmd) > + abort (); > + > + /* Wait for the command to be completed by the background thread. */ > + { > + ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&cmd->mutex); > + while (cmd->status == -1) /* for -1, see above */ > + pthread_cond_wait (&cmd->cond, &cmd->mutex); > + } > + > + pthread_mutex_destroy (&cmd->mutex); > + pthread_cond_destroy (&cmd->cond); > + > + /* Note the main thread must call nbdkit_error on error! */ > + return cmd->status; > +} > + > +/* The background thread. 
*/ > +static void check_for_finished_handles (void); > +static void retire_command (struct command *cmd, CURLcode code); > +static void do_easy_handle (struct command *cmd); > + > +static void * > +pool_worker (void *vp) > +{ > + bool stop = false; > + > + if (curl_debug_pool) > + nbdkit_debug ("curl: background thread started"); > + > + while (!stop) { > + struct command *cmd = NULL; > + struct curl_waitfd extra_fds[1] > + { { .fd = self_pipe[0], .events = CURL_WAIT_POLLIN } }; > + CURLMcode mc; > + int numfds, running_handles, repeats = 0; > + > + do { > + /* Process the multi handle. */ > + mc = curl_multi_perform (multi, &running_handles); > + if (mc != CURLM_OK) { > + nbdkit_error ("curl_multi_perform: %s", curl_multi_strerror (mc));Since nbdkit_error() stores its string in thread-local storage, is there anything that ever extracts this error over to the nbdkit thread that issued the original request to the worker thread?...> + abort (); /* XXX We don't expect this to happen */...Then again, if we abort, it doesn't matter.> + }> + > + check_for_finished_handles (); > + > + mc = curl_multi_wait (multi, extra_fds, 1, 1000000, &numfds); > + if (mc != CURLM_OK) { > + nbdkit_error ("curl_multi_wait: %s", curl_multi_strerror (mc)); > + abort (); /* XXX We don't expect this to happen */ > + } > + > if (curl_debug_pool) > - nbdkit_debug ("get_handle: %zu", ch->i); > - return ch; > - } > - } > + nbdkit_debug ("curl_multi_wait returned: running_handles=%d numfds=%d", > + running_handles, numfds); > + > + if (numfds == 0) { > + repeats++; > + if (repeats > 1) > + nbdkit_nanosleep (1, 0); > + } > + else { > + repeats = 0; > + if (extra_fds[0].revents == CURL_WAIT_POLLIN) { > + /* There's a command waiting. */ > + if (read (self_pipe[0], &cmd, sizeof cmd) != sizeof cmd) > + abort (); > + } > + } > + } while (!cmd); > > - /* If more connections are allowed, then allocate a new handle. 
*/ > - if (curl_handles.len < connections) { > - ch = allocate_handle (); > - if (ch == NULL) > - return NULL; > - if (curl_handle_list_append (&curl_handles, ch) == -1) { > - free_handle (ch); > - return NULL; > - } > - ch->i = curl_handles.len - 1; > - ch->in_use = true; > - in_use++; > if (curl_debug_pool) > - nbdkit_debug ("get_handle: %zu", ch->i); > - return ch; > - } > + nbdkit_debug ("curl: dispatching %s command %" PRIu64, > + command_type_to_string (cmd->type), cmd->id); > + > + switch (cmd->type) { > + case STOP: > + stop = true; > + retire_command (cmd, CURLE_OK); > + break; > > - /* Otherwise we have run out of connections so we must wait until > - * another thread calls put_handle. > - */ > - assert (in_use == connections); > - waiting++; > - while (in_use == connections) > - pthread_cond_wait (&cond, &lock); > - waiting--; > + case EASY_HANDLE: > + do_easy_handle (cmd); > + break; > + } > + } /* while (!stop) */ > > - goto again; > + if (curl_debug_pool) > + nbdkit_debug ("curl: background thread stopped"); > + > + return NULL; > } > > -/* Return the handle to the pool. */ > -void > -put_handle (struct curl_handle *ch) > +/* This checks if any easy handles in the multi have > + * finished and retires the associated commands. > + */ > +static void > +check_for_finished_handles (void) > { > - ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&lock); > + CURLMsg *msg; > + int msgs_in_queue; > + > + while ((msg = curl_multi_info_read (multi, &msgs_in_queue)) != NULL) { > + size_t i; > + struct curl_handle *ch = NULL; > + > + if (msg->msg == CURLMSG_DONE) { > + /* Find this curl_handle. */ > + for (i = 0; i < curl_handles.len; ++i) { > + if (curl_handles.ptr[i]->c == msg->easy_handle) { > + ch = curl_handles.ptr[i]; > + curl_handle_list_remove (&curl_handles, i); > + } > + } > + if (ch == NULL) abort (); > + curl_multi_remove_handle (multi, ch->c); > + > + retire_command (ch->cmd, msg->data.result); > + } > + } > +} > > +/* Retire a command. status is a CURLcode. 
*/ > +static void > +retire_command (struct command *cmd, CURLcode status) > +{ > if (curl_debug_pool) > - nbdkit_debug ("put_handle: %zu", ch->i); > + nbdkit_debug ("curl: retiring %s command %" PRIu64, > + command_type_to_string (cmd->type), cmd->id); > + > + ACQUIRE_LOCK_FOR_CURRENT_SCOPE (&cmd->mutex); > + cmd->status = status; > + pthread_cond_signal (&cmd->cond); > +} > + > +static void > +do_easy_handle (struct command *cmd) > +{ > + CURLMcode mc; > + > + cmd->ch->cmd = cmd; > + > + /* Add the handle to the multi. */ > + mc = curl_multi_add_handle (multi, cmd->ch->c); > + if (mc != CURLM_OK) { > + nbdkit_error ("curl_multi_add_handle: %s", curl_multi_strerror (mc)); > + goto err; > + } > > - ch->in_use = false; > - in_use--; > + if (curl_handle_list_append (&curl_handles, cmd->ch) == -1) > + goto err; > + return; > > - /* Signal the next thread which is waiting. */ > - if (waiting > 0) > - pthread_cond_signal (&cond); > + err: > + retire_command (cmd, CURLE_OUT_OF_MEMORY); > } > -- > 2.41.0Overall looks nice, and I learned more about curl in the process. -- Eric Blake, Principal Software Engineer Red Hat, Inc. Virtualization: qemu.org | libguestfs.org