Nir Soffer
2021-Feb-03 21:13 UTC
[Libguestfs] [PATCH v3 0/7] v2v: rhv-upload-plugin: Performance improvments
Update the plugin to API_VERSION 2, and enable parallel threading model. With unordered writes in qemu-img, and 8 threads in nbdkit, matching number of parallel coroutines in qemu-img, I see about 50% speedup compared with current master. I tested this on a vm, using NFS storage provided by another vm, and accessing ovirt-engine on a third vm. Results are not stable, and can vary by 100% between runs. I'm comparing the best result from 3-5 runs. We need to test these changes with real images, real servers, and real storage to get more reliable results, but I could not find avilable server yet. Changes since v2: - Add plugin#write_out_order method and use it to enable out of order writes (-W) in the rhv upload plugin. - Update the commit message about fua and flush support, based on the discussion with Eric. - Fix more typos in comments v2 was here: https://www.redhat.com/archives/libguestfs/2021-January/msg00067.html Nir Soffer (7): v2v: output_rhv_upload: Require nbdkit >= 1.22 v2v: rhv-upload-plugin: Use API_VERSION 2 v2v: rhv-upload-plugin: Support FUA v2v: rhv-upload-plugin: Get imageio connections limits v2v: rhv-upload-plugin: Support multiple connections v2v: rhv-upload: Use out of order writes v2v: rvh-upload: Match nbdkit threads to qemu-img coroutines v2v/nbdkit.ml | 4 + v2v/nbdkit.mli | 1 + v2v/output_rhv_upload.ml | 13 +- v2v/rhv-upload-plugin.py | 318 +++++++++++++++++++++++++++------------ v2v/types.ml | 1 + v2v/types.mli | 2 + v2v/v2v.ml | 1 + 7 files changed, 238 insertions(+), 102 deletions(-) -- 2.26.2
Nir Soffer
2021-Feb-03 21:13 UTC
[Libguestfs] [PATCH v3 1/7] v2v: output_rhv_upload: Require nbdkit >= 1.22
nbdkit 1.22 provides API_VERSION 2 and parallel threading model for the python plugin. This version is available in RHEL AV 8.3.0. Signed-off-by: Nir Soffer <nsoffer at redhat.com> --- v2v/output_rhv_upload.ml | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/v2v/output_rhv_upload.ml b/v2v/output_rhv_upload.ml index b5cc95b9..40768519 100644 --- a/v2v/output_rhv_upload.ml +++ b/v2v/output_rhv_upload.ml @@ -84,11 +84,11 @@ let parse_output_options options { rhv_cafile; rhv_cluster; rhv_direct; rhv_verifypeer; rhv_disk_uuids } -(* In theory even very old versions of nbdkit might work, but as - * with [Nbdkit_sources] check for at least 1.12. +(* We need nbdkit >= 1.22 for API_VERSION 2 and parallel threading model + * in the python plugin. *) -let nbdkit_min_version = (1, 12, 0) -let nbdkit_min_version_string = "1.12.0" +let nbdkit_min_version = (1, 22, 0) +let nbdkit_min_version_string = "1.22.0" let nbdkit_python_plugin = Config.nbdkit_python_plugin let pidfile_timeout = 30 -- 2.26.2
Nir Soffer
2021-Feb-03 21:13 UTC
[Libguestfs] [PATCH v3 2/7] v2v: rhv-upload-plugin: Use API_VERSION 2
Update the callbacks to use new API, avoiding copies in pread(). Since pread() is not used during import, this is not very useful, but it allow using parallel threading model. Signed-off-by: Nir Soffer <nsoffer at redhat.com> --- v2v/rhv-upload-plugin.py | 29 ++++++++++++++++++++++++----- 1 file changed, 24 insertions(+), 5 deletions(-) diff --git a/v2v/rhv-upload-plugin.py b/v2v/rhv-upload-plugin.py index e261dfdb..0f8101dd 100644 --- a/v2v/rhv-upload-plugin.py +++ b/v2v/rhv-upload-plugin.py @@ -32,6 +32,9 @@ from urllib.parse import urlparse import ovirtsdk4 as sdk import ovirtsdk4.types as types +# Using version 2 supporting the buffer protocol for better performance. +API_VERSION = 2 + # Timeout to wait for oVirt disks to change status, or the transfer # object to finish initializing [seconds]. timeout = 5 * 60 @@ -190,7 +193,8 @@ def request_failed(r, msg): @failing -def pread(h, count, offset): +def pread(h, buf, offset, flags): + count = len(buf) http = h['http'] headers = {"Range": "bytes=%d-%d" % (offset, offset + count - 1)} @@ -203,11 +207,26 @@ def pread(h, count, offset): "could not read sector offset %d size %d" % (offset, count)) - return r.read() + content_length = int(r.getheader("content-length")) + if content_length != count: + # Should never happen. + request_failed(r, + "unexpected Content-Length offset %d size %d got %d" % + (offset, count, content_length)) + + with memoryview(buf) as view: + got = 0 + while got < count: + n = r.readinto(view[got:]) + if n == 0: + request_failed(r, + "short read offset %d size %d got %d" % + (offset, count, got)) + got += n @failing -def pwrite(h, buf, offset): +def pwrite(h, buf, offset, flags): http = h['http'] count = len(buf) @@ -234,7 +253,7 @@ def pwrite(h, buf, offset): @failing -def zero(h, count, offset, may_trim): +def zero(h, count, offset, flags): http = h['http'] # Unlike the trim and flush calls, there is no 'can_zero' method @@ -292,7 +311,7 @@ def emulate_zero(h, count, offset): @failing -def flush(h): +def flush(h, flags): http = h['http'] # Construct the JSON request for flushing. -- 2.26.2
Nir Soffer
2021-Feb-03 21:13 UTC
[Libguestfs] [PATCH v3 3/7] v2v: rhv-upload-plugin: Support FUA
imageio does not have a FUA flag, but we can emulate it using the flush feature. With flush=y, write or zero will issue a NBD_CMD_FLUSH at the end, so the call returns only after the data is flushed to storage. The plugin reports now that it supports FUA (based on can_flush), and when FUA flag is set, it emulates fua by setting flush query (PUT) or flag (PATCH). For example, this nbd command: NBD_CMD_WRITE flags=NBD_CMD_FUA_FLAG Is translated to this http request: PUT /path?flush=y On imageio server, this translates back to: NBD_CMD_WRITE flags=0 NBD_CMD_FLUSH Imageio server uses preallocated buffer per connection. If the write request is bigger than the buffer, it sends multiple write commands. The request returns only when the flush is completed. Signed-off-by: Nir Soffer <nsoffer at redhat.com> --- v2v/rhv-upload-plugin.py | 22 +++++++++++++++++----- 1 file changed, 17 insertions(+), 5 deletions(-) diff --git a/v2v/rhv-upload-plugin.py b/v2v/rhv-upload-plugin.py index 0f8101dd..1c6f0603 100644 --- a/v2v/rhv-upload-plugin.py +++ b/v2v/rhv-upload-plugin.py @@ -161,6 +161,12 @@ def can_flush(h): return h['can_flush'] + at failing +def can_fua(h): + # imageio flush feature is is compatible with NBD_CMD_FLAG_FUA. + return h['can_flush'] + + @failing def get_size(h): return params['disk_size'] @@ -231,7 +237,9 @@ def pwrite(h, buf, offset, flags): count = len(buf) - http.putrequest("PUT", h['path'] + "?flush=n") + flush = "y" if (h['can_flush'] and (flags & nbdkit.FLAG_FUA)) else "n" + + http.putrequest("PUT", h['path'] + "?flush=" + flush) # The oVirt server only uses the first part of the range, and the # content-length. http.putheader("Content-Range", "bytes %d-%d/*" % (offset, offset + count - 1)) @@ -260,14 +268,16 @@ def zero(h, count, offset, flags): # so nbdkit could call this even if the server doesn't support # zeroing. If this is the case we must emulate. if not h['can_zero']: - emulate_zero(h, count, offset) + emulate_zero(h, count, offset, flags) return + flush = bool(h['can_flush'] and (flags & nbdkit.FLAG_FUA)) + # Construct the JSON request for zeroing. buf = json.dumps({'op': "zero", 'offset': offset, 'size': count, - 'flush': False}).encode() + 'flush': flush}).encode() headers = {"Content-Type": "application/json", "Content-Length": str(len(buf))} @@ -283,10 +293,12 @@ def zero(h, count, offset, flags): r.read() -def emulate_zero(h, count, offset): +def emulate_zero(h, count, offset, flags): http = h['http'] - http.putrequest("PUT", h['path']) + flush = "y" if (h['can_flush'] and (flags & nbdkit.FLAG_FUA)) else "n" + + http.putrequest("PUT", h['path'] + "?flush=" + flush) http.putheader("Content-Range", "bytes %d-%d/*" % (offset, offset + count - 1)) http.putheader("Content-Length", str(count)) -- 2.26.2
Nir Soffer
2021-Feb-03 21:13 UTC
[Libguestfs] [PATCH v3 4/7] v2v: rhv-upload-plugin: Get imageio connections limits
In imageio >= 2.0 the server reports the maximum number of readers and writers. We will use these limits when creating a connection pool to the server. Signed-off-by: Nir Soffer <nsoffer at redhat.com> --- v2v/rhv-upload-plugin.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/v2v/rhv-upload-plugin.py b/v2v/rhv-upload-plugin.py index 1c6f0603..20477c91 100644 --- a/v2v/rhv-upload-plugin.py +++ b/v2v/rhv-upload-plugin.py @@ -135,7 +135,8 @@ def open(readonly): raise debug("imageio features: flush=%(can_flush)r " - "zero=%(can_zero)r unix_socket=%(unix_socket)r" + "zero=%(can_zero)r unix_socket=%(unix_socket)r " + "max_readers=%(max_readers)r max_writers=%(max_writers)r" % options) # Save everything we need to make requests in the handle. @@ -712,6 +713,8 @@ def get_options(http, url): "can_flush": "flush" in features, "can_zero": "zero" in features, "unix_socket": j.get('unix_socket'), + "max_readers": j.get("max_readers", 1), + "max_writers": j.get("max_writers", 1), } elif r.status == 405 or r.status == 204: @@ -721,6 +724,8 @@ def get_options(http, url): "can_flush": False, "can_zero": False, "unix_socket": None, + "max_readers": 1, + "max_writers": 1, } else: raise RuntimeError("could not use OPTIONS request: %d: %s" % -- 2.26.2
Nir Soffer
2021-Feb-03 21:13 UTC
[Libguestfs] [PATCH v3 5/7] v2v: rhv-upload-plugin: Support multiple connections
Use multiple connections to imageio server to speed up the transfer. Connections are managed via a thread safe queue. Threads remove a connection from the queue for every request, and put it back when at the end of the request. Only one thread can access the connection at the same time. Threads are accessing existing values in the handle dict, like h["path"]. They may also modify h["failed"] on errors. These operations are thread safe and do not require additional locking. Sending flush request is more tricky; imageio server does not guarantee consistency for multiple for connections, so we need to send flush command for every connection. Since flush may be called when there are in-flight requests, we wait until all in-flight request are done before sending a flush. While flushing, the connection pool is empty so new requests will block on the queue until the flush is completed. Closing is done in a similar way, waiting until all in-flight requests are done and closing all connections. Testing shows that we requests are spread over 4 connections, but performance is worse. Connection time increased from 5.36 seconds to 7.08 seconds (32% slower). [connection 1 ops, 7.053520 s] [dispatch 548 ops, 1.188910 s] [write 469 ops, 1.014646 s, 332.75 MiB, 327.95 MiB/s] [zero 77 ops, 0.058495 s, 1.13 GiB, 19.29 GiB/s] [flush 2 ops, 0.000291 s] [connection 1 ops, 7.085039 s] [dispatch 548 ops, 1.097437 s] [write 478 ops, 0.924214 s, 323.25 MiB, 349.76 MiB/s] [zero 68 ops, 0.052265 s, 1.22 GiB, 23.43 GiB/s] [flush 2 ops, 0.000258 s] [connection 1 ops, 7.037253 s] [dispatch 547 ops, 1.111386 s] [write 477 ops, 0.959592 s, 343.25 MiB, 357.70 MiB/s] [zero 68 ops, 0.047005 s, 1.20 GiB, 25.44 GiB/s] [flush 2 ops, 0.000266 s] [connection 1 ops, 7.045538 s] [dispatch 548 ops, 1.191029 s] [write 482 ops, 1.041125 s, 347.12 MiB, 333.41 MiB/s] [zero 64 ops, 0.045171 s, 1.14 GiB, 25.16 GiB/s] [flush 2 ops, 0.000210 s] Signed-off-by: Nir Soffer <nsoffer at redhat.com> --- v2v/rhv-upload-plugin.py | 298 +++++++++++++++++++++++++-------------- 1 file changed, 192 insertions(+), 106 deletions(-) diff --git a/v2v/rhv-upload-plugin.py b/v2v/rhv-upload-plugin.py index 20477c91..2b0178a9 100644 --- a/v2v/rhv-upload-plugin.py +++ b/v2v/rhv-upload-plugin.py @@ -21,20 +21,28 @@ import functools import inspect import json import logging +import queue import socket import ssl import sys import time +from contextlib import contextmanager from http.client import HTTPSConnection, HTTPConnection from urllib.parse import urlparse +import nbdkit + import ovirtsdk4 as sdk import ovirtsdk4.types as types # Using version 2 supporting the buffer protocol for better performance. API_VERSION = 2 +# Maximum number of connection to imageio server. Based on testing with imageio +# client, this give best performance. +MAX_CONNECTIONS = 4 + # Timeout to wait for oVirt disks to change status, or the transfer # object to finish initializing [seconds]. timeout = 5 * 60 @@ -61,6 +69,14 @@ def config_complete(): raise RuntimeError("missing configuration parameters") +def thread_model(): + """ + Using parallel model to speed up transfer with multiple connections to + imageio server. + """ + return nbdkit.THREAD_MODEL_PARALLEL + + def debug(s): if params['verbose']: print(s, file=sys.stderr) @@ -129,7 +145,7 @@ def open(readonly): # See https://bugzilla.redhat.com/1916176. http.close() - http = optimize_http(http, host, options) + pool = create_http_pool(destination_url, host, options) except: cancel_transfer(connection, transfer) raise @@ -147,7 +163,8 @@ def open(readonly): 'disk_id': disk.id, 'transfer': transfer, 'failed': False, - 'http': http, + 'pool': pool, + 'connections': pool.qsize(), 'path': destination_url.path, } @@ -202,69 +219,67 @@ def request_failed(r, msg): @failing def pread(h, buf, offset, flags): count = len(buf) - http = h['http'] - headers = {"Range": "bytes=%d-%d" % (offset, offset + count - 1)} - http.request("GET", h['path'], headers=headers) - r = http.getresponse() - # 206 = HTTP Partial Content. - if r.status != 206: - request_failed(r, - "could not read sector offset %d size %d" % - (offset, count)) - - content_length = int(r.getheader("content-length")) - if content_length != count: - # Should never happen. - request_failed(r, - "unexpected Content-Length offset %d size %d got %d" % - (offset, count, content_length)) - - with memoryview(buf) as view: - got = 0 - while got < count: - n = r.readinto(view[got:]) - if n == 0: - request_failed(r, - "short read offset %d size %d got %d" % - (offset, count, got)) - got += n + with http_context(h) as http: + http.request("GET", h['path'], headers=headers) + + r = http.getresponse() + # 206 = HTTP Partial Content. + if r.status != 206: + request_failed(r, + "could not read sector offset %d size %d" % + (offset, count)) + + content_length = int(r.getheader("content-length")) + if content_length != count: + # Should never happen. + request_failed(r, + "unexpected Content-Length offset %d size %d got %d" % + (offset, count, content_length)) + + with memoryview(buf) as view: + got = 0 + while got < count: + n = r.readinto(view[got:]) + if n == 0: + request_failed(r, + "short read offset %d size %d got %d" % + (offset, count, got)) + got += n @failing def pwrite(h, buf, offset, flags): - http = h['http'] - count = len(buf) flush = "y" if (h['can_flush'] and (flags & nbdkit.FLAG_FUA)) else "n" - http.putrequest("PUT", h['path'] + "?flush=" + flush) - # The oVirt server only uses the first part of the range, and the - # content-length. - http.putheader("Content-Range", "bytes %d-%d/*" % (offset, offset + count - 1)) - http.putheader("Content-Length", str(count)) - http.endheaders() + with http_context(h) as http: + http.putrequest("PUT", h['path'] + "?flush=" + flush) + # The oVirt server only uses the first part of the range, and the + # content-length. + http.putheader("Content-Range", "bytes %d-%d/*" % + (offset, offset + count - 1)) + http.putheader("Content-Length", str(count)) + http.endheaders() - try: - http.send(buf) - except BrokenPipeError: - pass + try: + http.send(buf) + except BrokenPipeError: + pass - r = http.getresponse() - if r.status != 200: - request_failed(r, - "could not write sector offset %d size %d" % - (offset, count)) + r = http.getresponse() + if r.status != 200: + request_failed(r, + "could not write sector offset %d size %d" % + (offset, count)) - r.read() + r.read() @failing def zero(h, count, offset, flags): - http = h['http'] - # Unlike the trim and flush calls, there is no 'can_zero' method # so nbdkit could call this even if the server doesn't support # zeroing. If this is the case we must emulate. @@ -283,67 +298,68 @@ def zero(h, count, offset, flags): headers = {"Content-Type": "application/json", "Content-Length": str(len(buf))} - http.request("PATCH", h['path'], body=buf, headers=headers) + with http_context(h) as http: + http.request("PATCH", h['path'], body=buf, headers=headers) - r = http.getresponse() - if r.status != 200: - request_failed(r, - "could not zero sector offset %d size %d" % - (offset, count)) + r = http.getresponse() + if r.status != 200: + request_failed(r, + "could not zero sector offset %d size %d" % + (offset, count)) - r.read() + r.read() def emulate_zero(h, count, offset, flags): - http = h['http'] - flush = "y" if (h['can_flush'] and (flags & nbdkit.FLAG_FUA)) else "n" - http.putrequest("PUT", h['path'] + "?flush=" + flush) - http.putheader("Content-Range", - "bytes %d-%d/*" % (offset, offset + count - 1)) - http.putheader("Content-Length", str(count)) - http.endheaders() + with http_context(h) as http: + http.putrequest("PUT", h['path'] + "?flush=" + flush) + http.putheader("Content-Range", + "bytes %d-%d/*" % (offset, offset + count - 1)) + http.putheader("Content-Length", str(count)) + http.endheaders() - try: - buf = bytearray(128 * 1024) - while count > len(buf): - http.send(buf) - count -= len(buf) - http.send(memoryview(buf)[:count]) - except BrokenPipeError: - pass + try: + buf = bytearray(128 * 1024) + while count > len(buf): + http.send(buf) + count -= len(buf) + http.send(memoryview(buf)[:count]) + except BrokenPipeError: + pass - r = http.getresponse() - if r.status != 200: - request_failed(r, - "could not write zeroes offset %d size %d" % - (offset, count)) + r = http.getresponse() + if r.status != 200: + request_failed(r, + "could not write zeroes offset %d size %d" % + (offset, count)) - r.read() + r.read() @failing def flush(h, flags): - http = h['http'] - # Construct the JSON request for flushing. buf = json.dumps({'op': "flush"}).encode() headers = {"Content-Type": "application/json", "Content-Length": str(len(buf))} - http.request("PATCH", h['path'], body=buf, headers=headers) + # Wait until all inflight requests are completed, and send a flush request + # for all imageio connections. - r = http.getresponse() - if r.status != 200: - request_failed(r, "could not flush") + for http in iter_http_pool(h): + http.request("PATCH", h['path'], body=buf, headers=headers) - r.read() + r = http.getresponse() + if r.status != 200: + request_failed(r, "could not flush") + + r.read() def close(h): - http = h['http'] connection = h['connection'] transfer = h['transfer'] disk_id = h['disk_id'] @@ -354,7 +370,7 @@ def close(h): # plugin exits. sys.stderr.flush() - http.close() + close_http_pool(h) # If the connection failed earlier ensure we cancel the transfer. Canceling # the transfer will delete the disk. @@ -659,6 +675,84 @@ def transfer_supports_format(): return "format" in sig.parameters +# Connection pool managment + + +def create_http_pool(url, host, options): + pool = queue.Queue() + + count = min(options["max_readers"], + options["max_writers"], + MAX_CONNECTIONS) + + debug("creating http pool connections=%d" % count) + + unix_socket = options["unix_socket"] if host is not None else None + + for i in range(count): + http = create_http(url, unix_socket=unix_socket) + pool.put(http) + + return pool + + + at contextmanager +def http_context(h): + """ + Context manager yielding an imageio http connection from the pool. Blocks + until a connection is available. + """ + pool = h["pool"] + http = pool.get() + try: + yield http + finally: + pool.put(http) + + +def iter_http_pool(h): + """ + Wait until all inflight requests are done, and iterate on imageio + connections. + + The pool is empty during iteration. New requests issued during iteration + will block until iteration is done. + """ + pool = h["pool"] + locked = [] + + # Lock the pool by taking the connection out. + while len(locked) < h["connections"]: + locked.append(pool.get()) + + try: + for http in locked: + yield http + finally: + # Unlock the pool by puting the connection back. + for http in locked: + pool.put(http) + + +def close_http_pool(h): + """ + Wait until all inflight requests are done, close all connections and remove + them from the pool. + + No request can be served by the pool after this call. + """ + debug("closing http pool") + + pool = h["pool"] + locked = [] + + while len(locked) < h["connections"]: + locked.append(pool.get()) + + for http in locked: + http.close() + + # oVirt imageio operations @@ -677,12 +771,20 @@ def parse_transfer_url(transfer): return urlparse(transfer.proxy_url) -def create_http(url): +def create_http(url, unix_socket=None): """ Create http connection for transfer url. Returns HTTPConnection. """ + if unix_socket: + debug("creating unix http connection socket=%r" % unix_socket) + try: + return UnixHTTPConnection(unix_socket) + except Exception as e: + # Very unlikely, but we can recover by using https. + debug("cannot create unix socket connection: %s" % e) + if url.scheme == "https": context = \ ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH, @@ -691,8 +793,12 @@ def create_http(url): context.check_hostname = False context.verify_mode = ssl.CERT_NONE + debug("creating https connection host=%s port=%s" % + (url.hostname, url.port)) return HTTPSConnection(url.hostname, url.port, context=context) elif url.scheme == "http": + debug("creating http connection host=%s port=%s" % + (url.hostname, url.port)) return HTTPConnection(url.hostname, url.port) else: raise RuntimeError("unknown URL scheme (%s)" % url.scheme) @@ -730,23 +836,3 @@ def get_options(http, url): else: raise RuntimeError("could not use OPTIONS request: %d: %s" % (r.status, r.reason)) - - -def optimize_http(http, host, options): - """ - Return an optimized http connection using unix socket if we are connected - to imageio server on the local host and it features a unix socket. - """ - unix_socket = options['unix_socket'] - - if host is not None and unix_socket is not None: - try: - http = UnixHTTPConnection(unix_socket) - except Exception as e: - # Very unlikely failure, but we can recover by using the https - # connection. - debug("cannot create unix socket connection, using https: %s" % e) - else: - debug("optimizing connection using unix socket %r" % unix_socket) - - return http -- 2.26.2
Nir Soffer
2021-Feb-03 21:13 UTC
[Libguestfs] [PATCH v3 6/7] v2v: rhv-upload: Use out of order writes
Without out of order writes, qemu-img read data in parallel, but issue requests in order, keeping only one in-flight request. With out of order writes, qemu keeps up to 8 in-flight requests by default. Add output#write_out_of_order method. Output plugins that can handle out of order writes should override this method to return true. This enables the -W flag in qemu-img convert. With this change we see significant improvement in the rhv-upload-plugin. Connection time decreased from 7.08 seconds to 3.77 seconds, 88% faster. Compared with last version using single connection, we are now 42% faster. [connection 1 ops, 3.778596 s] [dispatch 547 ops, 2.977509 s] [write 464 ops, 2.703108 s, 355.62 MiB, 131.56 MiB/s] [zero 81 ops, 0.118312 s, 1.22 GiB, 10.31 GiB/s] [flush 2 ops, 0.000222 s] [connection 1 ops, 3.774985 s] [dispatch 555 ops, 2.976595 s] [write 490 ops, 2.685608 s, 352.19 MiB, 131.14 MiB/s] [zero 63 ops, 0.122802 s, 1.20 GiB, 9.80 GiB/s] [flush 2 ops, 0.000203 s] [connection 1 ops, 3.777071 s] [dispatch 564 ops, 2.930730 s] [write 499 ops, 2.631180 s, 323.38 MiB, 122.90 MiB/s] [zero 63 ops, 0.119163 s, 1.17 GiB, 9.86 GiB/s] [flush 2 ops, 0.000255 s] [connection 1 ops, 3.778360 s] [dispatch 528 ops, 2.979062 s] [write 456 ops, 2.676340 s, 318.00 MiB, 118.82 MiB/s] [zero 70 ops, 0.118221 s, 1.08 GiB, 9.18 GiB/s] [flush 2 ops, 0.000202 s] Signed-off-by: Nir Soffer <nsoffer at redhat.com> --- v2v/output_rhv_upload.ml | 2 ++ v2v/types.ml | 1 + v2v/types.mli | 2 ++ v2v/v2v.ml | 1 + 4 files changed, 6 insertions(+) diff --git a/v2v/output_rhv_upload.ml b/v2v/output_rhv_upload.ml index 40768519..a9dfe39b 100644 --- a/v2v/output_rhv_upload.ml +++ b/v2v/output_rhv_upload.ml @@ -283,6 +283,8 @@ object (* rhev-apt.exe will be installed (if available). *) method install_rhev_apt = true + method write_out_of_order = true + method prepare_targets source_name overlays guestcaps let rhv_cluster_name match List.assoc "rhv_cluster" json_params with diff --git a/v2v/types.ml b/v2v/types.ml index 53daefed..a8949e4b 100644 --- a/v2v/types.ml +++ b/v2v/types.ml @@ -536,6 +536,7 @@ class virtual output = object method virtual create_metadata : source -> target list -> target_buses -> guestcaps -> inspect -> target_firmware -> unit method keep_serial_console = true method install_rhev_apt = false + method write_out_of_order = false end type output_settings = < keep_serial_console : bool; diff --git a/v2v/types.mli b/v2v/types.mli index a9b0a70e..f474dcaa 100644 --- a/v2v/types.mli +++ b/v2v/types.mli @@ -512,6 +512,8 @@ class virtual output : object (** Whether this output supports serial consoles (RHV does not). *) method install_rhev_apt : bool (** If [rhev-apt.exe] should be installed (only for RHV). *) + method write_out_of_order : bool + (** Whether this output supports out of order writes. *) end (** Encapsulates all [-o], etc output arguments as an object. *) diff --git a/v2v/v2v.ml b/v2v/v2v.ml index 54f26297..5019290a 100644 --- a/v2v/v2v.ml +++ b/v2v/v2v.ml @@ -763,6 +763,7 @@ and copy_targets cmdline targets input output [ "-n"; "-f"; "qcow2"; "-O"; output#transfer_format t ] @ (if cmdline.compressed then [ "-c" ] else []) @ [ "-S"; "64k" ] @ + (if output#write_out_of_order then [ "-W" ] else []) @ [ overlay_file; filename ] in let start_time = gettimeofday () in if run_command cmd <> 0 then -- 2.26.2
Nir Soffer
2021-Feb-03 21:13 UTC
[Libguestfs] [PATCH v3 7/7] v2v: rvh-upload: Match nbdkit threads to qemu-img coroutines
qemu-img is using 8 parallel coroutines by default. I tested up to 16 parallel coroutines and it seems that 8 gives good results. nbdkit uses 16 threads by default. Testing nbdkit with qemu-img show that 8 threads give good results. Make nbdkit thread count configurable, and configure it to 8 threads in the rhv upload output. Testing rhv-upload-plugin show small improvement (~6%) in total connection time. Compared with last version using single connection, we are now 50% faster. Results are not stable, we need to test this with bigger images and real environment. [connection 1 ops, 3.561693 s] [dispatch 550 ops, 2.808350 s] [write 470 ops, 2.482875 s, 316.06 MiB, 127.30 MiB/s] [zero 78 ops, 0.178174 s, 1.26 GiB, 7.05 GiB/s] [flush 2 ops, 0.000211 s] [connection 1 ops, 3.561724 s] [dispatch 543 ops, 2.836738 s] [write 472 ops, 2.503561 s, 341.62 MiB, 136.46 MiB/s] [zero 69 ops, 0.162465 s, 1.12 GiB, 6.89 GiB/s] [flush 2 ops, 0.000181 s] [connection 1 ops, 3.566931 s] [dispatch 536 ops, 2.807226 s] [write 462 ops, 2.508345 s, 326.12 MiB, 130.02 MiB/s] [zero 72 ops, 0.141442 s, 1.30 GiB, 9.20 GiB/s] [flush 2 ops, 0.000158 s] [connection 1 ops, 3.564396 s] [dispatch 563 ops, 2.853623 s] [write 503 ops, 2.592482 s, 361.44 MiB, 139.42 MiB/s] [zero 58 ops, 0.113708 s, 1.01 GiB, 8.88 GiB/s] [flush 2 ops, 0.000149 s] Signed-off-by: Nir Soffer <nsoffer at redhat.com> --- v2v/nbdkit.ml | 4 ++++ v2v/nbdkit.mli | 1 + v2v/output_rhv_upload.ml | 3 +++ 3 files changed, 8 insertions(+) diff --git a/v2v/nbdkit.ml b/v2v/nbdkit.ml index 46b20c9d..c7a92bf0 100644 --- a/v2v/nbdkit.ml +++ b/v2v/nbdkit.ml @@ -72,6 +72,7 @@ type cmd = { exportname : string option; readonly : bool; selinux_label : string option; + threads : int; verbose : bool; } @@ -84,6 +85,7 @@ let new_cmd = { exportname = None; readonly = false; selinux_label = None; + threads = 16; verbose = false; } @@ -93,6 +95,7 @@ let add_debug_flag cmd name value let set_exportname cmd v = { cmd with exportname = Some v } let set_readonly cmd v = { cmd with readonly = v } let set_selinux_label cmd v = { cmd with selinux_label = v } +let set_threads cmd v = { cmd with threads = v } let set_verbose cmd v = { cmd with verbose = v } let set_plugin cmd v = { cmd with plugin = Some v } let add_filter cmd v = { cmd with filters = v :: cmd.filters } @@ -136,6 +139,7 @@ let run_unix cmd add_arg "--newstyle"; add_arg "--pidfile"; add_arg pidfile; add_arg "--unix"; add_arg sock; + add_arg "--threads"; add_arg (string_of_int cmd.threads); (* Reduce verbosity in nbdkit >= 1.17.4. *) let version = version (config ()) in diff --git a/v2v/nbdkit.mli b/v2v/nbdkit.mli index d927fe40..85eb75b6 100644 --- a/v2v/nbdkit.mli +++ b/v2v/nbdkit.mli @@ -56,6 +56,7 @@ val add_debug_flag : cmd -> string -> string -> cmd val set_exportname : cmd -> string -> cmd val set_readonly : cmd -> bool -> cmd val set_selinux_label : cmd -> string option -> cmd +val set_threads : cmd -> int -> cmd val set_verbose : cmd -> bool -> cmd (** Set various command line flags. *) diff --git a/v2v/output_rhv_upload.ml b/v2v/output_rhv_upload.ml index a9dfe39b..dbef7011 100644 --- a/v2v/output_rhv_upload.ml +++ b/v2v/output_rhv_upload.ml @@ -208,6 +208,9 @@ class output_rhv_upload output_alloc output_conn let nbdkit_cmd = Nbdkit.set_plugin nbdkit_cmd nbdkit_python_plugin in let nbdkit_cmd = Nbdkit.add_arg nbdkit_cmd "script" (Python_script.path plugin_script) in + (* Match number of parallel coroutines in qemu-img *) + let nbdkit_cmd = Nbdkit.set_threads nbdkit_cmd 8 in + let nbdkit_cmd if have_selinux then (* Label the socket so qemu can open it. *) -- 2.26.2