Nir Soffer
2021-Jan-22 22:45 UTC
[Libguestfs] [PATCH 0/6] v2v: rhv-upload-plugin: Support multiple connections
Update the plugin to API_VERSION 2 and enable the parallel threading model.
With unordered writes in qemu-img, and 8 threads in nbdkit (matching the
number of parallel coroutines in qemu-img), I see about 50% speedup compared
with current master.

I tested this on a vm, using NFS storage provided by another vm, and
accessing ovirt-engine on a third vm. Results are not stable and can vary by
100% between runs; I'm comparing the best result from 3-5 runs. We need to
test these changes with real images, real servers, and real storage to get
more reliable results, but I think we see a clear trend. I will try to test
this in the RHV scale lab next week.

Nir Soffer (6):
  v2v: output_rhv_upload: Require nbdkit >= 1.22
  v2v: rhv-upload-plugin: Use API_VERSION 2
  v2v: rhv-upload-plugin: Get imageio connections limits
  v2v: rhv-upload-plugin: Support multiple connections
  v2v: Use unordered writes in qemu-img convert
  v2v: nbdkit: Match qemu-img number of parallel coroutines

 v2v/nbdkit.ml            |   3 +
 v2v/output_rhv_upload.ml |   8 +-
 v2v/rhv-upload-plugin.py | 295 ++++++++++++++++++++++++++-------------
 v2v/v2v.ml               |   1 +
 4 files changed, 208 insertions(+), 99 deletions(-)

-- 
2.26.2
Nir Soffer
2021-Jan-22 22:45 UTC
[Libguestfs] [PATCH 1/6] v2v: output_rhv_upload: Require nbdkit >= 1.22
nbdkit 1.22 provides API_VERSION 2 and the parallel threading model for the
python plugin. This version is available in RHEL AV 8.3.0.

Signed-off-by: Nir Soffer <nsoffer at redhat.com>
---
 v2v/output_rhv_upload.ml | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/v2v/output_rhv_upload.ml b/v2v/output_rhv_upload.ml
index b5cc95b9..08d6dd25 100644
--- a/v2v/output_rhv_upload.ml
+++ b/v2v/output_rhv_upload.ml
@@ -84,11 +84,11 @@ let parse_output_options options
 
   { rhv_cafile; rhv_cluster; rhv_direct; rhv_verifypeer; rhv_disk_uuids }
 
-(* In theory even very old versions of nbdkit might work, but as
- * with [Nbdkit_sources] check for at least 1.12.
+(* We need nbdkit >= 1.22 for API_VERSION 2 and parallel threading model in the
+ * python plugin.
  *)
-let nbdkit_min_version = (1, 12, 0)
-let nbdkit_min_version_string = "1.12.0"
+let nbdkit_min_version = (1, 22, 0)
+let nbdkit_min_version_string = "1.22.0"
 let nbdkit_python_plugin = Config.nbdkit_python_plugin
 let pidfile_timeout = 30
-- 
2.26.2
Nir Soffer
2021-Jan-22 22:45 UTC
[Libguestfs] [PATCH 2/6] v2v: rhv-upload-plugin: Use API_VERSION 2
Update the callbacks to use the new API, avoiding copies in pread(). Since
pread() is not used during import this is not very useful, but it allows
using the parallel threading model.

Signed-off-by: Nir Soffer <nsoffer at redhat.com>
---
 v2v/rhv-upload-plugin.py | 29 ++++++++++++++++++++++++-----
 1 file changed, 24 insertions(+), 5 deletions(-)

diff --git a/v2v/rhv-upload-plugin.py b/v2v/rhv-upload-plugin.py
index e261dfdb..0f8101dd 100644
--- a/v2v/rhv-upload-plugin.py
+++ b/v2v/rhv-upload-plugin.py
@@ -32,6 +32,9 @@ from urllib.parse import urlparse
 import ovirtsdk4 as sdk
 import ovirtsdk4.types as types
 
+# Using version 2 supporting the buffer protocol for better performance.
+API_VERSION = 2
+
 # Timeout to wait for oVirt disks to change status, or the transfer
 # object to finish initializing [seconds].
 timeout = 5 * 60
@@ -190,7 +193,8 @@ def request_failed(r, msg):
 
 
 @failing
-def pread(h, count, offset):
+def pread(h, buf, offset, flags):
+    count = len(buf)
     http = h['http']
 
     headers = {"Range": "bytes=%d-%d" % (offset, offset + count - 1)}
@@ -203,11 +207,26 @@
         request_failed(r,
                        "could not read sector offset %d size %d" %
                        (offset, count))
 
-    return r.read()
+    content_length = int(r.getheader("content-length"))
+    if content_length != count:
+        # Should never happen.
+        request_failed(r,
+                       "unexpected Content-Length offset %d size %d got %d" %
+                       (offset, count, content_length))
+
+    with memoryview(buf) as view:
+        got = 0
+        while got < count:
+            n = r.readinto(view[got:])
+            if n == 0:
+                request_failed(r,
+                               "short read offset %d size %d got %d" %
+                               (offset, count, got))
+            got += n
 
 
 @failing
-def pwrite(h, buf, offset):
+def pwrite(h, buf, offset, flags):
     http = h['http']
 
     count = len(buf)
@@ -234,7 +253,7 @@ def pwrite(h, buf, offset):
 
 
 @failing
-def zero(h, count, offset, may_trim):
+def zero(h, count, offset, flags):
     http = h['http']
 
     # Unlike the trim and flush calls, there is no 'can_zero' method
@@ -292,7 +311,7 @@ def emulate_zero(h, count, offset):
 
 
 @failing
-def flush(h):
+def flush(h, flags):
     http = h['http']
 
     # Construct the JSON request for flushing.
-- 
2.26.2
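The readinto() loop in pread() above is the heart of the API_VERSION 2 change:
the buffer is filled in place instead of returning a new bytes object. Here is
a minimal standalone sketch of the same pattern, using io.BytesIO as a
stand-in for the HTTP response object; the read_exactly name and the
RuntimeError are illustrative, not part of the plugin:

```python
import io

def read_exactly(resp, buf):
    """Fill buf completely from resp, failing on a short read.

    Mirrors the pread() loop: readinto() may return fewer bytes than
    requested, so we advance a memoryview over buf until it is full.
    """
    count = len(buf)
    with memoryview(buf) as view:
        got = 0
        while got < count:
            n = resp.readinto(view[got:])
            if n == 0:
                raise RuntimeError(
                    "short read: expected %d bytes, got %d" % (count, got))
            got += n

# Simulate a response carrying exactly the requested range.
resp = io.BytesIO(b"x" * 512)
buf = bytearray(512)
read_exactly(resp, buf)
assert bytes(buf) == b"x" * 512
```

The memoryview slice is what avoids the copy: each readinto() call writes
directly into the caller's buffer at the right offset.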
Nir Soffer
2021-Jan-22 22:45 UTC
[Libguestfs] [PATCH 3/6] v2v: rhv-upload-plugin: Get imageio connections limits
In imageio >= 2.0 the server reports the maximum number of readers and
writers. We will use these limits when creating a connection pool to the
server.

Signed-off-by: Nir Soffer <nsoffer at redhat.com>
---
 v2v/rhv-upload-plugin.py | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/v2v/rhv-upload-plugin.py b/v2v/rhv-upload-plugin.py
index 0f8101dd..e639be0c 100644
--- a/v2v/rhv-upload-plugin.py
+++ b/v2v/rhv-upload-plugin.py
@@ -136,6 +136,7 @@ def open(readonly):
 
     debug("imageio features: flush=%(can_flush)r "
           "zero=%(can_zero)r unix_socket=%(unix_socket)r"
+          "max_readers=%(max_readers)r max_writers=%(max_writers)r"
          % options)
 
     # Save everything we need to make requests in the handle.
@@ -700,6 +701,8 @@ def get_options(http, url):
             "can_flush": "flush" in features,
             "can_zero": "zero" in features,
             "unix_socket": j.get('unix_socket'),
+            "max_readers": j.get("max_readers", 1),
+            "max_writers": j.get("max_writers", 1),
         }
 
     elif r.status == 405 or r.status == 204:
@@ -709,6 +712,8 @@ def get_options(http, url):
             "can_flush": False,
             "can_zero": False,
             "unix_socket": None,
+            "max_readers": 1,
+            "max_writers": 1,
         }
 
     else:
         raise RuntimeError("could not use OPTIONS request: %d: %s" %
-- 
2.26.2
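The defaulting logic matters for old servers that do not report limits. The
sketch below mirrors the shape of get_options() from this patch; parse_options
is a hypothetical helper written for illustration (the status codes and JSON
fields follow the patch, the function itself is not in the plugin):

```python
import json

def parse_options(status, body):
    """Parse an imageio OPTIONS response into a capabilities dict.

    imageio >= 2.0 reports max_readers/max_writers; older servers omit
    them, so both default to 1 (effectively a single connection).
    """
    if status == 200:
        j = json.loads(body)
        features = j.get("features", [])
        return {
            "can_flush": "flush" in features,
            "can_zero": "zero" in features,
            "unix_socket": j.get("unix_socket"),
            "max_readers": j.get("max_readers", 1),
            "max_writers": j.get("max_writers", 1),
        }
    # 405/204: very old server without OPTIONS support.
    return {"can_flush": False, "can_zero": False, "unix_socket": None,
            "max_readers": 1, "max_writers": 1}

new = parse_options(
    200, '{"features": ["flush", "zero"], "max_readers": 8, "max_writers": 8}')
assert new["max_readers"] == 8
old = parse_options(405, "")
assert old["max_writers"] == 1
```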
Nir Soffer
2021-Jan-22 22:45 UTC
[Libguestfs] [PATCH 4/6] v2v: rhv-upload-plugin: Support multiple connections
Use multiple connections to the imageio server to speed up the transfer.

Connections are managed via a thread safe queue. Threads remove a connection
from the queue for every request, and put it back at the end of the request.
Only one thread can access a connection at the same time.

Threads access existing values in the handle dict, like h["path"], and may
also modify h["failed"] on errors. These operations are thread safe and do
not require additional locking.

Sending a flush request is more tricky; on the imageio side, we have one
qemu-nbd server with multiple connections. I'm not sure if sending one flush
command on one of the connections is good enough to flush all commands, so we
send a flush command on all connections in the flush callback. Since flush
may be called when there are in-flight requests, we wait until all in-flight
requests are done before sending a flush. While flushing, the connection pool
is empty, so new requests will block on the queue until the flush is
completed.

Closing is done in a similar way, waiting until all in-flight requests are
done and closing all connections.

Testing shows that requests are spread over 4 connections, but performance is
worse. Connection time increased from 5.36 seconds to 7.08 seconds (32%
slower).
[connection 1 ops, 7.053520 s] [dispatch 548 ops, 1.188910 s] [write 469 ops, 1.014646 s, 332.75 MiB, 327.95 MiB/s] [zero 77 ops, 0.058495 s, 1.13 GiB, 19.29 GiB/s] [flush 2 ops, 0.000291 s]
[connection 1 ops, 7.085039 s] [dispatch 548 ops, 1.097437 s] [write 478 ops, 0.924214 s, 323.25 MiB, 349.76 MiB/s] [zero 68 ops, 0.052265 s, 1.22 GiB, 23.43 GiB/s] [flush 2 ops, 0.000258 s]
[connection 1 ops, 7.037253 s] [dispatch 547 ops, 1.111386 s] [write 477 ops, 0.959592 s, 343.25 MiB, 357.70 MiB/s] [zero 68 ops, 0.047005 s, 1.20 GiB, 25.44 GiB/s] [flush 2 ops, 0.000266 s]
[connection 1 ops, 7.045538 s] [dispatch 548 ops, 1.191029 s] [write 482 ops, 1.041125 s, 347.12 MiB, 333.41 MiB/s] [zero 64 ops, 0.045171 s, 1.14 GiB, 25.16 GiB/s] [flush 2 ops, 0.000210 s]

Signed-off-by: Nir Soffer <nsoffer at redhat.com>
---
 v2v/rhv-upload-plugin.py | 293 +++++++++++++++++++++++++--------------
 1 file changed, 187 insertions(+), 106 deletions(-)

diff --git a/v2v/rhv-upload-plugin.py b/v2v/rhv-upload-plugin.py
index e639be0c..f3242f87 100644
--- a/v2v/rhv-upload-plugin.py
+++ b/v2v/rhv-upload-plugin.py
@@ -21,20 +21,28 @@ import functools
 import inspect
 import json
 import logging
+import queue
 import socket
 import ssl
 import sys
 import time
+from contextlib import contextmanager
 from http.client import HTTPSConnection, HTTPConnection
 from urllib.parse import urlparse
 
+import nbdkit
+
 import ovirtsdk4 as sdk
 import ovirtsdk4.types as types
 
 # Using version 2 supporting the buffer protocol for better performance.
 API_VERSION = 2
 
+# Maximum number of connections to imageio server. Based on testing with imageio
+# client, this gives best performance.
+MAX_CONNECTIONS = 4
+
 # Timeout to wait for oVirt disks to change status, or the transfer
 # object to finish initializing [seconds].
 timeout = 5 * 60
@@ -61,6 +69,14 @@ def config_complete():
         raise RuntimeError("missing configuration parameters")
 
 
+def thread_model():
+    """
+    Using parallel model to speed up transfer with multiple connections to
+    imageio server.
+    """
+    return nbdkit.THREAD_MODEL_PARALLEL
+
+
 def debug(s):
     if params['verbose']:
         print(s, file=sys.stderr)
@@ -129,7 +145,7 @@ def open(readonly):
             # See https://bugzilla.redhat.com/1916176.
             http.close()
 
-        http = optimize_http(http, host, options)
+        pool = create_http_pool(destination_url, host, options)
     except:
         cancel_transfer(connection, transfer)
         raise
@@ -147,7 +163,8 @@ def open(readonly):
         'disk_id': disk.id,
         'transfer': transfer,
         'failed': False,
-        'http': http,
+        'pool': pool,
+        'connections': pool.qsize(),
         'path': destination_url.path,
     }
@@ -196,67 +213,65 @@ def request_failed(r, msg):
 
 
 @failing
 def pread(h, buf, offset, flags):
     count = len(buf)
-    http = h['http']
-
     headers = {"Range": "bytes=%d-%d" % (offset, offset + count - 1)}
-    http.request("GET", h['path'], headers=headers)
-
-    r = http.getresponse()
-    # 206 = HTTP Partial Content.
-    if r.status != 206:
-        request_failed(r,
-                       "could not read sector offset %d size %d" %
-                       (offset, count))
-
-    content_length = int(r.getheader("content-length"))
-    if content_length != count:
-        # Should never happen.
-        request_failed(r,
-                       "unexpected Content-Length offset %d size %d got %d" %
-                       (offset, count, content_length))
-
-    with memoryview(buf) as view:
-        got = 0
-        while got < count:
-            n = r.readinto(view[got:])
-            if n == 0:
-                request_failed(r,
-                               "short read offset %d size %d got %d" %
-                               (offset, count, got))
-            got += n
+    with http_context(h) as http:
+        http.request("GET", h['path'], headers=headers)
+
+        r = http.getresponse()
+        # 206 = HTTP Partial Content.
+        if r.status != 206:
+            request_failed(r,
+                           "could not read sector offset %d size %d" %
+                           (offset, count))
+
+        content_length = int(r.getheader("content-length"))
+        if content_length != count:
+            # Should never happen.
+            request_failed(r,
+                           "unexpected Content-Length offset %d size %d got %d" %
+                           (offset, count, content_length))
+
+        with memoryview(buf) as view:
+            got = 0
+            while got < count:
+                n = r.readinto(view[got:])
+                if n == 0:
+                    request_failed(r,
+                                   "short read offset %d size %d got %d" %
+                                   (offset, count, got))
+                got += n
 
 
 @failing
 def pwrite(h, buf, offset, flags):
-    http = h['http']
-
     count = len(buf)
 
-    http.putrequest("PUT", h['path'] + "?flush=n")
-    # The oVirt server only uses the first part of the range, and the
-    # content-length.
-    http.putheader("Content-Range", "bytes %d-%d/*" % (offset, offset + count - 1))
-    http.putheader("Content-Length", str(count))
-    http.endheaders()
+    with http_context(h) as http:
+        http.putrequest("PUT", h['path'] + "?flush=n")
+        # The oVirt server only uses the first part of the range, and the
+        # content-length.
+        http.putheader("Content-Range", "bytes %d-%d/*" %
+                       (offset, offset + count - 1))
+        http.putheader("Content-Length", str(count))
+        http.endheaders()
 
-    try:
-        http.send(buf)
-    except BrokenPipeError:
-        pass
+        try:
+            http.send(buf)
+        except BrokenPipeError:
+            pass
 
-    r = http.getresponse()
-    if r.status != 200:
-        request_failed(r,
-                       "could not write sector offset %d size %d" %
-                       (offset, count))
+        r = http.getresponse()
+        if r.status != 200:
+            request_failed(r,
+                           "could not write sector offset %d size %d" %
+                           (offset, count))
 
-    r.read()
+        r.read()
 
 
 @failing
 def zero(h, count, offset, flags):
-    http = h['http']
-
     # Unlike the trim and flush calls, there is no 'can_zero' method
     # so nbdkit could call this even if the server doesn't support
     # zeroing. If this is the case we must emulate.
@@ -273,65 +288,66 @@ def zero(h, count, offset, flags):
     headers = {"Content-Type": "application/json",
                "Content-Length": str(len(buf))}
 
-    http.request("PATCH", h['path'], body=buf, headers=headers)
+    with http_context(h) as http:
+        http.request("PATCH", h['path'], body=buf, headers=headers)
 
-    r = http.getresponse()
-    if r.status != 200:
-        request_failed(r,
-                       "could not zero sector offset %d size %d" %
-                       (offset, count))
+        r = http.getresponse()
+        if r.status != 200:
+            request_failed(r,
+                           "could not zero sector offset %d size %d" %
+                           (offset, count))
 
-    r.read()
+        r.read()
 
 
 def emulate_zero(h, count, offset):
-    http = h['http']
+    with http_context(h) as http:
+        http.putrequest("PUT", h['path'])
+        http.putheader("Content-Range",
+                       "bytes %d-%d/*" % (offset, offset + count - 1))
+        http.putheader("Content-Length", str(count))
+        http.endheaders()
 
-    http.putrequest("PUT", h['path'])
-    http.putheader("Content-Range",
-                   "bytes %d-%d/*" % (offset, offset + count - 1))
-    http.putheader("Content-Length", str(count))
-    http.endheaders()
-
-    try:
-        buf = bytearray(128 * 1024)
-        while count > len(buf):
-            http.send(buf)
-            count -= len(buf)
-        http.send(memoryview(buf)[:count])
-    except BrokenPipeError:
-        pass
+        try:
+            buf = bytearray(128 * 1024)
+            while count > len(buf):
+                http.send(buf)
+                count -= len(buf)
+            http.send(memoryview(buf)[:count])
+        except BrokenPipeError:
+            pass
 
-    r = http.getresponse()
-    if r.status != 200:
-        request_failed(r,
-                       "could not write zeroes offset %d size %d" %
-                       (offset, count))
+        r = http.getresponse()
+        if r.status != 200:
+            request_failed(r,
+                           "could not write zeroes offset %d size %d" %
+                           (offset, count))
 
-    r.read()
+        r.read()
 
 
 @failing
 def flush(h, flags):
-    http = h['http']
-
     # Construct the JSON request for flushing.
     buf = json.dumps({'op': "flush"}).encode()
 
     headers = {"Content-Type": "application/json",
                "Content-Length": str(len(buf))}
 
-    http.request("PATCH", h['path'], body=buf, headers=headers)
+    # Wait until all inflight requests are completed, and send a flush request
+    # for all imageio connections.
 
-    r = http.getresponse()
-    if r.status != 200:
-        request_failed(r, "could not flush")
+    for http in iter_http_pool(h):
+        http.request("PATCH", h['path'], body=buf, headers=headers)
+
+        r = http.getresponse()
+        if r.status != 200:
+            request_failed(r, "could not flush")
 
-    r.read()
+        r.read()
 
 
 def close(h):
-    http = h['http']
     connection = h['connection']
     transfer = h['transfer']
     disk_id = h['disk_id']
@@ -342,7 +358,7 @@ def close(h):
     # plugin exits.
     sys.stderr.flush()
 
-    http.close()
+    close_http_pool(h)
 
     # If the connection failed earlier ensure we cancel the transfer. Canceling
     # the transfer will delete the disk.
@@ -647,6 +663,81 @@ def transfer_supports_format():
     return "format" in sig.parameters
 
 
+# Connection pool management
+
+
+def create_http_pool(url, host, options):
+    pool = queue.Queue()
+
+    count = min(options["max_readers"],
+                options["max_writers"],
+                MAX_CONNECTIONS)
+    debug("using %d connections" % count)
+
+    unix_socket = options["unix_socket"] if host is not None else None
+
+    for i in range(count):
+        http = create_http(url, unix_socket=unix_socket)
+        pool.put(http)
+
+    return pool
+
+
+@contextmanager
+def http_context(h):
+    """
+    Context manager yielding an imageio http connection from the pool. Blocks
+    until a connection is available.
+    """
+    pool = h["pool"]
+    http = pool.get()
+    try:
+        yield http
+    finally:
+        pool.put(http)
+
+
+def iter_http_pool(h):
+    """
+    Wait until all inflight requests are done, and iterate on imageio
+    connections.
+
+    The pool is empty during iteration. New requests issued during iteration
+    will block until iteration is done.
+    """
+    pool = h["pool"]
+    locked = []
+
+    # Lock the pool by taking the connections out.
+    while len(locked) < h["connections"]:
+        locked.append(pool.get())
+
+    try:
+        for http in locked:
+            yield http
+    finally:
+        # Unlock the pool by putting the connections back.
+        for http in locked:
+            pool.put(http)
+
+
+def close_http_pool(h):
+    """
+    Wait until all inflight requests are done, close all connections and remove
+    them from the pool.
+
+    No request can be served by the pool after this call.
+    """
+    pool = h["pool"]
+    locked = []
+
+    while len(locked) < h["connections"]:
+        locked.append(pool.get())
+
+    for http in locked:
+        http.close()
+
+
 # oVirt imageio operations
@@ -665,12 +756,20 @@ def parse_transfer_url(transfer):
         return urlparse(transfer.proxy_url)
 
 
-def create_http(url):
+def create_http(url, unix_socket=None):
     """
     Create http connection for transfer url.
 
     Returns HTTPConnection.
     """
+    if unix_socket:
+        debug("using unix socket %r" % unix_socket)
+        try:
+            return UnixHTTPConnection(unix_socket)
+        except Exception as e:
+            # Very unlikely, but we can recover by using https.
+            debug("cannot create unix socket connection: %s" % e)
+
     if url.scheme == "https":
         context = \
             ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH,
@@ -679,8 +778,10 @@ def create_http(url):
             context.check_hostname = False
             context.verify_mode = ssl.CERT_NONE
 
+        debug("using https connection")
         return HTTPSConnection(url.hostname, url.port, context=context)
     elif url.scheme == "http":
+        debug("using http connection")
        return HTTPConnection(url.hostname, url.port)
     else:
         raise RuntimeError("unknown URL scheme (%s)" % url.scheme)
@@ -718,23 +819,3 @@ def get_options(http, url):
     else:
         raise RuntimeError("could not use OPTIONS request: %d: %s" %
                            (r.status, r.reason))
-
-
-def optimize_http(http, host, options):
-    """
-    Return an optimized http connection using unix socket if we are connected
-    to imageio server on the local host and it features a unix socket.
-    """
-    unix_socket = options['unix_socket']
-
-    if host is not None and unix_socket is not None:
-        try:
-            http = UnixHTTPConnection(unix_socket)
-        except Exception as e:
-            # Very unlikely failure, but we can recover by using the https
-            # connection.
-            debug("cannot create unix socket connection, using https: %s" % e)
-        else:
-            debug("optimizing connection using unix socket %r" % unix_socket)
-
-    return http
-- 
2.26.2
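The pool mechanics this patch describes — borrow one connection per request,
and drain the whole pool to quiesce in-flight requests before flush/close —
can be sketched independently of imageio. The names below (create_pool,
borrowed, drain) are illustrative, not the plugin's:

```python
import queue
from contextlib import contextmanager

MAX_CONNECTIONS = 4  # cap from the patch

def create_pool(factory, max_readers, max_writers):
    """Build a pool sized by the server limits, capped at MAX_CONNECTIONS."""
    count = min(max_readers, max_writers, MAX_CONNECTIONS)
    pool = queue.Queue()
    for _ in range(count):
        pool.put(factory())
    return pool, count

@contextmanager
def borrowed(pool):
    # Blocks until a connection is free; only one thread uses it at a time.
    conn = pool.get()
    try:
        yield conn
    finally:
        pool.put(conn)

def drain(pool, count):
    """Take every connection out, waiting for in-flight requests to finish.

    While drained, the pool is empty, so new requests block on get() -
    this is how flush() and close() quiesce all connections.
    """
    return [pool.get() for _ in range(count)]

pool, count = create_pool(lambda: object(), max_readers=8, max_writers=8)
assert count == 4
with borrowed(pool):
    assert pool.qsize() == 3   # one connection in use
assert pool.qsize() == 4       # returned to the pool
```

Because queue.Queue is thread safe, no extra locking is needed: ownership of a
connection is transferred by get()/put(), so no two threads ever hold the same
connection.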
Nir Soffer
2021-Jan-22 22:45 UTC
[Libguestfs] [PATCH 5/6] v2v: Use unordered writes in qemu-img convert
Without unordered writes, qemu-img reads data in parallel, but issues write
requests in order, keeping only one in-flight request. With unordered writes,
qemu-img keeps up to 8 in-flight requests by default.

With this change we see a significant improvement in the rhv-upload-plugin.
Connection time decreased from 7.08 seconds to 3.77 seconds, 88% faster.
Compared with the last version using a single connection, we are now 42%
faster.

[connection 1 ops, 3.778596 s] [dispatch 547 ops, 2.977509 s] [write 464 ops, 2.703108 s, 355.62 MiB, 131.56 MiB/s] [zero 81 ops, 0.118312 s, 1.22 GiB, 10.31 GiB/s] [flush 2 ops, 0.000222 s]
[connection 1 ops, 3.774985 s] [dispatch 555 ops, 2.976595 s] [write 490 ops, 2.685608 s, 352.19 MiB, 131.14 MiB/s] [zero 63 ops, 0.122802 s, 1.20 GiB, 9.80 GiB/s] [flush 2 ops, 0.000203 s]
[connection 1 ops, 3.777071 s] [dispatch 564 ops, 2.930730 s] [write 499 ops, 2.631180 s, 323.38 MiB, 122.90 MiB/s] [zero 63 ops, 0.119163 s, 1.17 GiB, 9.86 GiB/s] [flush 2 ops, 0.000255 s]
[connection 1 ops, 3.778360 s] [dispatch 528 ops, 2.979062 s] [write 456 ops, 2.676340 s, 318.00 MiB, 118.82 MiB/s] [zero 70 ops, 0.118221 s, 1.08 GiB, 9.18 GiB/s] [flush 2 ops, 0.000202 s]

Signed-off-by: Nir Soffer <nsoffer at redhat.com>
---
 v2v/v2v.ml | 1 +
 1 file changed, 1 insertion(+)

diff --git a/v2v/v2v.ml b/v2v/v2v.ml
index 54f26297..aa9d9713 100644
--- a/v2v/v2v.ml
+++ b/v2v/v2v.ml
@@ -763,6 +763,7 @@ and copy_targets cmdline targets input output
           [ "-n"; "-f"; "qcow2"; "-O"; output#transfer_format t ] @
           (if cmdline.compressed then [ "-c" ] else []) @
           [ "-S"; "64k" ] @
+          [ "-W"; ] @
           [ overlay_file; filename ] in
         let start_time = gettimeofday () in
         if run_command cmd <> 0 then
-- 
2.26.2
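The qemu-img command that v2v builds (in OCaml above) can be sketched in
Python to show where "-W" fits; the helper name and paths are hypothetical,
and only the "-W" flag is what this patch adds:

```python
def qemu_img_convert_cmd(src, dst, out_format="qcow2", compressed=False):
    """Build a qemu-img convert command mirroring v2v's copy_targets.

    "-W" allows out-of-order (unordered) writes, so qemu-img keeps
    multiple in-flight write requests (8 coroutines by default)
    instead of serializing them behind a single request.
    """
    cmd = ["qemu-img", "convert", "-n", "-f", "qcow2", "-O", out_format]
    if compressed:
        cmd.append("-c")  # only when the user asked for compression
    cmd += ["-S", "64k", "-W", src, dst]
    return cmd

cmd = qemu_img_convert_cmd("overlay.qcow2", "out.raw", out_format="raw")
assert "-W" in cmd
```

Note that "-W" is safe here because every write in a convert targets a
distinct region, so write order does not affect the result.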
Nir Soffer
2021-Jan-22 22:45 UTC
[Libguestfs] [PATCH 6/6] v2v: nbdkit: Match qemu-img number of parallel coroutines
qemu-img uses 8 parallel coroutines by default. I tested up to 16 parallel
coroutines, and it seems that 8 gives good results.

nbdkit uses 16 threads by default. Testing nbdkit with qemu-img shows that 8
threads give good results.

I think that for the rhv-upload-plugin, matching the number of threads to the
number of connections would be optimal. We need to improve this later to use
the optimal number for the configured input and output plugins.

Testing rhv-upload-plugin shows a small improvement (~6%) in total connection
time. Compared with the last version using a single connection, we are now
50% faster. Results are not stable; we need to test this with bigger images
and a real environment.

[connection 1 ops, 3.561693 s] [dispatch 550 ops, 2.808350 s] [write 470 ops, 2.482875 s, 316.06 MiB, 127.30 MiB/s] [zero 78 ops, 0.178174 s, 1.26 GiB, 7.05 GiB/s] [flush 2 ops, 0.000211 s]
[connection 1 ops, 3.561724 s] [dispatch 543 ops, 2.836738 s] [write 472 ops, 2.503561 s, 341.62 MiB, 136.46 MiB/s] [zero 69 ops, 0.162465 s, 1.12 GiB, 6.89 GiB/s] [flush 2 ops, 0.000181 s]
[connection 1 ops, 3.566931 s] [dispatch 536 ops, 2.807226 s] [write 462 ops, 2.508345 s, 326.12 MiB, 130.02 MiB/s] [zero 72 ops, 0.141442 s, 1.30 GiB, 9.20 GiB/s] [flush 2 ops, 0.000158 s]
[connection 1 ops, 3.564396 s] [dispatch 563 ops, 2.853623 s] [write 503 ops, 2.592482 s, 361.44 MiB, 139.42 MiB/s] [zero 58 ops, 0.113708 s, 1.01 GiB, 8.88 GiB/s] [flush 2 ops, 0.000149 s]

Signed-off-by: Nir Soffer <nsoffer at redhat.com>
---
 v2v/nbdkit.ml | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/v2v/nbdkit.ml b/v2v/nbdkit.ml
index 46b20c9d..caa76342 100644
--- a/v2v/nbdkit.ml
+++ b/v2v/nbdkit.ml
@@ -137,6 +137,9 @@ let run_unix cmd
   add_arg "--pidfile"; add_arg pidfile;
   add_arg "--unix"; add_arg sock;
 
+  (* Match qemu-img default number of parallel coroutines *)
+  add_arg "--threads"; add_arg "8";
+
   (* Reduce verbosity in nbdkit >= 1.17.4. *)
   let version = version (config ()) in
   if version >= (1, 17, 4) then (
-- 
2.26.2