Nir Soffer
2021-Jan-22 22:45 UTC
[Libguestfs] [PATCH 4/6] v2v: rhv-upload-plugin: Support multiple connections
Use multiple connections to imageio server to speed up the transfer. Connections are managed via a thread-safe queue. Threads remove a connection from the queue for every request, and put it back at the end of the request. Only one thread can access a given connection at a time. Threads are accessing existing values in the handle dict, like h["path"]. They may also modify h["failed"] on errors. These operations are thread safe and do not require additional locking. Sending a flush request is trickier; on the imageio side, we have one qemu-nbd server, with multiple connections. I'm not sure if sending one flush command on one of the connections is good enough to flush all commands, so we send a flush command on all connections in the flush callback. Since flush may be called when there are in-flight requests, we wait until all in-flight requests are done before sending a flush. While flushing, the connection pool is empty so new requests will block on the queue until the flush is completed. Closing is done in a similar way, waiting until all in-flight requests are done and closing all connections. Testing shows that requests are spread over 4 connections, but performance is worse. Connection time increased from 5.36 seconds to 7.08 seconds (32% slower). 
[connection 1 ops, 7.053520 s] [dispatch 548 ops, 1.188910 s] [write 469 ops, 1.014646 s, 332.75 MiB, 327.95 MiB/s] [zero 77 ops, 0.058495 s, 1.13 GiB, 19.29 GiB/s] [flush 2 ops, 0.000291 s] [connection 1 ops, 7.085039 s] [dispatch 548 ops, 1.097437 s] [write 478 ops, 0.924214 s, 323.25 MiB, 349.76 MiB/s] [zero 68 ops, 0.052265 s, 1.22 GiB, 23.43 GiB/s] [flush 2 ops, 0.000258 s] [connection 1 ops, 7.037253 s] [dispatch 547 ops, 1.111386 s] [write 477 ops, 0.959592 s, 343.25 MiB, 357.70 MiB/s] [zero 68 ops, 0.047005 s, 1.20 GiB, 25.44 GiB/s] [flush 2 ops, 0.000266 s] [connection 1 ops, 7.045538 s] [dispatch 548 ops, 1.191029 s] [write 482 ops, 1.041125 s, 347.12 MiB, 333.41 MiB/s] [zero 64 ops, 0.045171 s, 1.14 GiB, 25.16 GiB/s] [flush 2 ops, 0.000210 s] Signed-off-by: Nir Soffer <nsoffer at redhat.com> --- v2v/rhv-upload-plugin.py | 293 +++++++++++++++++++++++++-------------- 1 file changed, 187 insertions(+), 106 deletions(-) diff --git a/v2v/rhv-upload-plugin.py b/v2v/rhv-upload-plugin.py index e639be0c..f3242f87 100644 --- a/v2v/rhv-upload-plugin.py +++ b/v2v/rhv-upload-plugin.py @@ -21,20 +21,28 @@ import functools import inspect import json import logging +import queue import socket import ssl import sys import time +from contextlib import contextmanager from http.client import HTTPSConnection, HTTPConnection from urllib.parse import urlparse +import nbdkit + import ovirtsdk4 as sdk import ovirtsdk4.types as types # Using version 2 supporting the buffer protocol for better performance. API_VERSION = 2 +# Maximum number of connection to imageio server. Based on testing with imageio +# client, this give best performance. +MAX_CONNECTIONS = 4 + # Timeout to wait for oVirt disks to change status, or the transfer # object to finish initializing [seconds]. 
timeout = 5 * 60 @@ -61,6 +69,14 @@ def config_complete(): raise RuntimeError("missing configuration parameters") +def thread_model(): + """ + Using parallel model to speed up transfer with multiple connections to + imageio server. + """ + return nbdkit.THREAD_MODEL_PARALLEL + + def debug(s): if params['verbose']: print(s, file=sys.stderr) @@ -129,7 +145,7 @@ def open(readonly): # See https://bugzilla.redhat.com/1916176. http.close() - http = optimize_http(http, host, options) + pool = create_http_pool(destination_url, host, options) except: cancel_transfer(connection, transfer) raise @@ -147,7 +163,8 @@ def open(readonly): 'disk_id': disk.id, 'transfer': transfer, 'failed': False, - 'http': http, + 'pool': pool, + 'connections': pool.qsize(), 'path': destination_url.path, } @@ -196,67 +213,65 @@ def request_failed(r, msg): @failing def pread(h, buf, offset, flags): count = len(buf) - http = h['http'] - headers = {"Range": "bytes=%d-%d" % (offset, offset + count - 1)} - http.request("GET", h['path'], headers=headers) - r = http.getresponse() - # 206 = HTTP Partial Content. - if r.status != 206: - request_failed(r, - "could not read sector offset %d size %d" % - (offset, count)) - - content_length = int(r.getheader("content-length")) - if content_length != count: - # Should never happen. - request_failed(r, - "unexpected Content-Length offset %d size %d got %d" % - (offset, count, content_length)) - - with memoryview(buf) as view: - got = 0 - while got < count: - n = r.readinto(view[got:]) - if n == 0: - request_failed(r, - "short read offset %d size %d got %d" % - (offset, count, got)) - got += n + with http_context(h) as http: + http.request("GET", h['path'], headers=headers) + + r = http.getresponse() + # 206 = HTTP Partial Content. + if r.status != 206: + request_failed(r, + "could not read sector offset %d size %d" % + (offset, count)) + + content_length = int(r.getheader("content-length")) + if content_length != count: + # Should never happen. 
+ request_failed(r, + "unexpected Content-Length offset %d size %d got %d" % + (offset, count, content_length)) + + with memoryview(buf) as view: + got = 0 + while got < count: + n = r.readinto(view[got:]) + if n == 0: + request_failed(r, + "short read offset %d size %d got %d" % + (offset, count, got)) + got += n @failing def pwrite(h, buf, offset, flags): - http = h['http'] - count = len(buf) - http.putrequest("PUT", h['path'] + "?flush=n") - # The oVirt server only uses the first part of the range, and the - # content-length. - http.putheader("Content-Range", "bytes %d-%d/*" % (offset, offset + count - 1)) - http.putheader("Content-Length", str(count)) - http.endheaders() + with http_context(h) as http: + http.putrequest("PUT", h['path'] + "?flush=n") + # The oVirt server only uses the first part of the range, and the + # content-length. + http.putheader("Content-Range", "bytes %d-%d/*" % + (offset, offset + count - 1)) + http.putheader("Content-Length", str(count)) + http.endheaders() - try: - http.send(buf) - except BrokenPipeError: - pass + try: + http.send(buf) + except BrokenPipeError: + pass - r = http.getresponse() - if r.status != 200: - request_failed(r, - "could not write sector offset %d size %d" % - (offset, count)) + r = http.getresponse() + if r.status != 200: + request_failed(r, + "could not write sector offset %d size %d" % + (offset, count)) - r.read() + r.read() @failing def zero(h, count, offset, flags): - http = h['http'] - # Unlike the trim and flush calls, there is no 'can_zero' method # so nbdkit could call this even if the server doesn't support # zeroing. If this is the case we must emulate. 
@@ -273,65 +288,66 @@ def zero(h, count, offset, flags): headers = {"Content-Type": "application/json", "Content-Length": str(len(buf))} - http.request("PATCH", h['path'], body=buf, headers=headers) + with http_context(h) as http: + http.request("PATCH", h['path'], body=buf, headers=headers) - r = http.getresponse() - if r.status != 200: - request_failed(r, - "could not zero sector offset %d size %d" % - (offset, count)) + r = http.getresponse() + if r.status != 200: + request_failed(r, + "could not zero sector offset %d size %d" % + (offset, count)) - r.read() + r.read() def emulate_zero(h, count, offset): - http = h['http'] + with http_context(h) as http: + http.putrequest("PUT", h['path']) + http.putheader("Content-Range", + "bytes %d-%d/*" % (offset, offset + count - 1)) + http.putheader("Content-Length", str(count)) + http.endheaders() - http.putrequest("PUT", h['path']) - http.putheader("Content-Range", - "bytes %d-%d/*" % (offset, offset + count - 1)) - http.putheader("Content-Length", str(count)) - http.endheaders() - - try: - buf = bytearray(128 * 1024) - while count > len(buf): - http.send(buf) - count -= len(buf) - http.send(memoryview(buf)[:count]) - except BrokenPipeError: - pass + try: + buf = bytearray(128 * 1024) + while count > len(buf): + http.send(buf) + count -= len(buf) + http.send(memoryview(buf)[:count]) + except BrokenPipeError: + pass - r = http.getresponse() - if r.status != 200: - request_failed(r, - "could not write zeroes offset %d size %d" % - (offset, count)) + r = http.getresponse() + if r.status != 200: + request_failed(r, + "could not write zeroes offset %d size %d" % + (offset, count)) - r.read() + r.read() @failing def flush(h, flags): - http = h['http'] - # Construct the JSON request for flushing. 
buf = json.dumps({'op': "flush"}).encode() headers = {"Content-Type": "application/json", "Content-Length": str(len(buf))} - http.request("PATCH", h['path'], body=buf, headers=headers) + # Wait until all inflight requests are completed, and send a flush request + # for all imageio connections. - r = http.getresponse() - if r.status != 200: - request_failed(r, "could not flush") + for http in iter_http_pool(h): + http.request("PATCH", h['path'], body=buf, headers=headers) + + r = http.getresponse() + if r.status != 200: + request_failed(r, "could not flush") - r.read() + r.read() def close(h): - http = h['http'] connection = h['connection'] transfer = h['transfer'] disk_id = h['disk_id'] @@ -342,7 +358,7 @@ def close(h): # plugin exits. sys.stderr.flush() - http.close() + close_http_pool(h) # If the connection failed earlier ensure we cancel the transfer. Canceling # the transfer will delete the disk. @@ -647,6 +663,81 @@ def transfer_supports_format(): return "format" in sig.parameters +# Connection pool managment + + +def create_http_pool(url, host, options): + pool = queue.Queue() + + count = min(options["max_readers"], + options["max_writers"], + MAX_CONNECTIONS) + debug("using %d connections" % count) + + unix_socket = options["unix_socket"] if host is not None else None + + for i in range(count): + http = create_http(url, unix_socket=unix_socket) + pool.put(http) + + return pool + + + at contextmanager +def http_context(h): + """ + Context manager yielding an imageio http connection from the pool. Blocks + until a connection is available. + """ + pool = h["pool"] + http = pool.get() + try: + yield http + finally: + pool.put(http) + + +def iter_http_pool(h): + """ + Wait until all inflight requests are done, and iterate on imageio + connections. + + The pool is empty during iteration. New requests issued during iteration + will block until iteration is done. + """ + pool = h["pool"] + locked = [] + + # Lock the pool by taking the connection out. 
+ while len(locked) < h["connections"]: + locked.append(pool.get()) + + try: + for http in locked: + yield http + finally: + # Unlock the pool by puting the connection back. + for http in locked: + pool.put(http) + + +def close_http_pool(h): + """ + Wait until all inflight requests are done, close all connections and remove + them from the pool. + + No request can be served by the pool after this call. + """ + pool = h["pool"] + locked = [] + + while len(locked) < h["connections"]: + locked.append(pool.get()) + + for http in locked: + http.close() + + # oVirt imageio operations @@ -665,12 +756,20 @@ def parse_transfer_url(transfer): return urlparse(transfer.proxy_url) -def create_http(url): +def create_http(url, unix_socket=None): """ Create http connection for transfer url. Returns HTTPConnection. """ + if unix_socket: + debug("using unix socket %r" % unix_socket) + try: + return UnixHTTPConnection(unix_socket) + except Exception as e: + # Very unlikely, but we can recover by using https. + debug("cannot create unix socket connection: %s" % e) + if url.scheme == "https": context = \ ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH, @@ -679,8 +778,10 @@ def create_http(url): context.check_hostname = False context.verify_mode = ssl.CERT_NONE + debug("using https connection") return HTTPSConnection(url.hostname, url.port, context=context) elif url.scheme == "http": + debug("using http connection") return HTTPConnection(url.hostname, url.port) else: raise RuntimeError("unknown URL scheme (%s)" % url.scheme) @@ -718,23 +819,3 @@ def get_options(http, url): else: raise RuntimeError("could not use OPTIONS request: %d: %s" % (r.status, r.reason)) - - -def optimize_http(http, host, options): - """ - Return an optimized http connection using unix socket if we are connected - to imageio server on the local host and it features a unix socket. 
- """ - unix_socket = options['unix_socket'] - - if host is not None and unix_socket is not None: - try: - http = UnixHTTPConnection(unix_socket) - except Exception as e: - # Very unlikely failure, but we can recover by using the https - # connection. - debug("cannot create unix socket connection, using https: %s" % e) - else: - debug("optimizing connection using unix socket %r" % unix_socket) - - return http -- 2.26.2
Richard W.M. Jones
2021-Jan-23 06:38 UTC
[Libguestfs] [PATCH 4/6] v2v: rhv-upload-plugin: Support multiple connections
On Sat, Jan 23, 2021 at 12:45:22AM +0200, Nir Soffer wrote: > Use multiple connections to imageio server to speed up the transfer. > > Connections are managed via a thread safe queue. Threads remove > a connection from the queue for every request, and put it back when at > the end of the request. Only one thread can access the connection at the > same time. > > Threads are accessing existing values in the handle dict, like > h["path"]. They may also modify h["failed"] on errors. These operations > are thread safe and do not require additional locking. > > Sending flush request is more tricky; on imageio side, we have one > qemu-nbd server, with multiple connections. I'm not sure if sending one > flush command on one of the connections is good enough to flush all > commands, so we send flush command on all connections in the flush > callback. I know the answer to this! It depends on whether the NBD server advertises multi-conn or not. With libnbd you can find out by querying nbd_can_multi_conn on any of the connections (if the server is behaving, the answer should be identical for any connection with the same exportname). See: https://github.com/NetworkBlockDevice/nbd/blob/master/doc/proto.md "bit 8, NBD_FLAG_CAN_MULTI_CONN: Indicates that the server operates entirely without cache, or that the cache it uses is shared among all connections to the given device. In particular, if this flag is present, then the effects of NBD_CMD_FLUSH and NBD_CMD_FLAG_FUA MUST be visible across all connections when the server sends its reply to that command to the client. In the absence of this flag, clients SHOULD NOT multiplex their commands over more than one connection to the export." For unclear reasons qemu-nbd only advertises multi-conn for r/o connections, assuming my reading of the code is correct. 
For nbdkit we went through the plugins a long time ago and made them advertise (or not) multi-conn as appropriate.> Since flush may be called when there are in-flight requests, we wait > until all in-flight request are done before sending a flush. While > flushing, the connection pool is empty so new requests will block on the > queue until the flush is completed. > > Closing is done in a similar way, waiting until all in-flight requests > are done and closing all connections. > > Testing shows that we requests are spread over 4 connections, but > performance is worse. Connection time increased from 5.36 seconds to > 7.08 seconds (32% slower). > > [connection 1 ops, 7.053520 s] > [dispatch 548 ops, 1.188910 s] > [write 469 ops, 1.014646 s, 332.75 MiB, 327.95 MiB/s] > [zero 77 ops, 0.058495 s, 1.13 GiB, 19.29 GiB/s] > [flush 2 ops, 0.000291 s] > > [connection 1 ops, 7.085039 s] > [dispatch 548 ops, 1.097437 s] > [write 478 ops, 0.924214 s, 323.25 MiB, 349.76 MiB/s] > [zero 68 ops, 0.052265 s, 1.22 GiB, 23.43 GiB/s] > [flush 2 ops, 0.000258 s] > > [connection 1 ops, 7.037253 s] > [dispatch 547 ops, 1.111386 s] > [write 477 ops, 0.959592 s, 343.25 MiB, 357.70 MiB/s] > [zero 68 ops, 0.047005 s, 1.20 GiB, 25.44 GiB/s] > [flush 2 ops, 0.000266 s] > > [connection 1 ops, 7.045538 s] > [dispatch 548 ops, 1.191029 s] > [write 482 ops, 1.041125 s, 347.12 MiB, 333.41 MiB/s] > [zero 64 ops, 0.045171 s, 1.14 GiB, 25.16 GiB/s] > [flush 2 ops, 0.000210 s]Unexpected ...> Signed-off-by: Nir Soffer <nsoffer at redhat.com> > --- > v2v/rhv-upload-plugin.py | 293 +++++++++++++++++++++++++-------------- > 1 file changed, 187 insertions(+), 106 deletions(-) > > diff --git a/v2v/rhv-upload-plugin.py b/v2v/rhv-upload-plugin.py > index e639be0c..f3242f87 100644 > --- a/v2v/rhv-upload-plugin.py > +++ b/v2v/rhv-upload-plugin.py > @@ -21,20 +21,28 @@ import functools > import inspect > import json > import logging > +import queue > import socket > import ssl > import sys > import time > > +from 
contextlib import contextmanager > from http.client import HTTPSConnection, HTTPConnection > from urllib.parse import urlparse > > +import nbdkit > + > import ovirtsdk4 as sdk > import ovirtsdk4.types as types > > # Using version 2 supporting the buffer protocol for better performance. > API_VERSION = 2 > > +# Maximum number of connection to imageio server. Based on testing with imageio > +# client, this give best performance. > +MAX_CONNECTIONS = 4 > + > # Timeout to wait for oVirt disks to change status, or the transfer > # object to finish initializing [seconds]. > timeout = 5 * 60 > @@ -61,6 +69,14 @@ def config_complete(): > raise RuntimeError("missing configuration parameters") > > > +def thread_model(): > + """ > + Using parallel model to speed up transfer with multiple connections to > + imageio server. > + """ > + return nbdkit.THREAD_MODEL_PARALLEL > + > + > def debug(s): > if params['verbose']: > print(s, file=sys.stderr) > @@ -129,7 +145,7 @@ def open(readonly): > # See https://bugzilla.redhat.com/1916176. > http.close() > > - http = optimize_http(http, host, options) > + pool = create_http_pool(destination_url, host, options) > except: > cancel_transfer(connection, transfer) > raise > @@ -147,7 +163,8 @@ def open(readonly): > 'disk_id': disk.id, > 'transfer': transfer, > 'failed': False, > - 'http': http, > + 'pool': pool, > + 'connections': pool.qsize(), > 'path': destination_url.path, > } > > @@ -196,67 +213,65 @@ def request_failed(r, msg): > @failing > def pread(h, buf, offset, flags): > count = len(buf) > - http = h['http'] > - > headers = {"Range": "bytes=%d-%d" % (offset, offset + count - 1)} > - http.request("GET", h['path'], headers=headers) > > - r = http.getresponse() > - # 206 = HTTP Partial Content. > - if r.status != 206: > - request_failed(r, > - "could not read sector offset %d size %d" % > - (offset, count)) > - > - content_length = int(r.getheader("content-length")) > - if content_length != count: > - # Should never happen. 
> - request_failed(r, > - "unexpected Content-Length offset %d size %d got %d" % > - (offset, count, content_length)) > - > - with memoryview(buf) as view: > - got = 0 > - while got < count: > - n = r.readinto(view[got:]) > - if n == 0: > - request_failed(r, > - "short read offset %d size %d got %d" % > - (offset, count, got)) > - got += n > + with http_context(h) as http: > + http.request("GET", h['path'], headers=headers) > + > + r = http.getresponse() > + # 206 = HTTP Partial Content. > + if r.status != 206: > + request_failed(r, > + "could not read sector offset %d size %d" % > + (offset, count)) > + > + content_length = int(r.getheader("content-length")) > + if content_length != count: > + # Should never happen. > + request_failed(r, > + "unexpected Content-Length offset %d size %d got %d" % > + (offset, count, content_length)) > + > + with memoryview(buf) as view: > + got = 0 > + while got < count: > + n = r.readinto(view[got:]) > + if n == 0: > + request_failed(r, > + "short read offset %d size %d got %d" % > + (offset, count, got)) > + got += n > > > @failing > def pwrite(h, buf, offset, flags): > - http = h['http'] > - > count = len(buf) > > - http.putrequest("PUT", h['path'] + "?flush=n") > - # The oVirt server only uses the first part of the range, and the > - # content-length. > - http.putheader("Content-Range", "bytes %d-%d/*" % (offset, offset + count - 1)) > - http.putheader("Content-Length", str(count)) > - http.endheaders() > + with http_context(h) as http: > + http.putrequest("PUT", h['path'] + "?flush=n") > + # The oVirt server only uses the first part of the range, and the > + # content-length. 
> + http.putheader("Content-Range", "bytes %d-%d/*" % > + (offset, offset + count - 1)) > + http.putheader("Content-Length", str(count)) > + http.endheaders() > > - try: > - http.send(buf) > - except BrokenPipeError: > - pass > + try: > + http.send(buf) > + except BrokenPipeError: > + pass > > - r = http.getresponse() > - if r.status != 200: > - request_failed(r, > - "could not write sector offset %d size %d" % > - (offset, count)) > + r = http.getresponse() > + if r.status != 200: > + request_failed(r, > + "could not write sector offset %d size %d" % > + (offset, count)) > > - r.read() > + r.read() > > > @failing > def zero(h, count, offset, flags): > - http = h['http'] > - > # Unlike the trim and flush calls, there is no 'can_zero' method > # so nbdkit could call this even if the server doesn't support > # zeroing. If this is the case we must emulate. > @@ -273,65 +288,66 @@ def zero(h, count, offset, flags): > headers = {"Content-Type": "application/json", > "Content-Length": str(len(buf))} > > - http.request("PATCH", h['path'], body=buf, headers=headers) > + with http_context(h) as http: > + http.request("PATCH", h['path'], body=buf, headers=headers) > > - r = http.getresponse() > - if r.status != 200: > - request_failed(r, > - "could not zero sector offset %d size %d" % > - (offset, count)) > + r = http.getresponse() > + if r.status != 200: > + request_failed(r, > + "could not zero sector offset %d size %d" % > + (offset, count)) > > - r.read() > + r.read() > > > def emulate_zero(h, count, offset): > - http = h['http'] > + with http_context(h) as http: > + http.putrequest("PUT", h['path']) > + http.putheader("Content-Range", > + "bytes %d-%d/*" % (offset, offset + count - 1)) > + http.putheader("Content-Length", str(count)) > + http.endheaders() > > - http.putrequest("PUT", h['path']) > - http.putheader("Content-Range", > - "bytes %d-%d/*" % (offset, offset + count - 1)) > - http.putheader("Content-Length", str(count)) > - http.endheaders() > - > - try: > - 
buf = bytearray(128 * 1024) > - while count > len(buf): > - http.send(buf) > - count -= len(buf) > - http.send(memoryview(buf)[:count]) > - except BrokenPipeError: > - pass > + try: > + buf = bytearray(128 * 1024) > + while count > len(buf): > + http.send(buf) > + count -= len(buf) > + http.send(memoryview(buf)[:count]) > + except BrokenPipeError: > + pass > > - r = http.getresponse() > - if r.status != 200: > - request_failed(r, > - "could not write zeroes offset %d size %d" % > - (offset, count)) > + r = http.getresponse() > + if r.status != 200: > + request_failed(r, > + "could not write zeroes offset %d size %d" % > + (offset, count)) > > - r.read() > + r.read() > > > @failing > def flush(h, flags): > - http = h['http'] > - > # Construct the JSON request for flushing. > buf = json.dumps({'op': "flush"}).encode() > > headers = {"Content-Type": "application/json", > "Content-Length": str(len(buf))} > > - http.request("PATCH", h['path'], body=buf, headers=headers) > + # Wait until all inflight requests are completed, and send a flush request > + # for all imageio connections. > > - r = http.getresponse() > - if r.status != 200: > - request_failed(r, "could not flush") > + for http in iter_http_pool(h): > + http.request("PATCH", h['path'], body=buf, headers=headers) > + > + r = http.getresponse() > + if r.status != 200: > + request_failed(r, "could not flush") > > - r.read() > + r.read() > > > def close(h): > - http = h['http'] > connection = h['connection'] > transfer = h['transfer'] > disk_id = h['disk_id'] > @@ -342,7 +358,7 @@ def close(h): > # plugin exits. > sys.stderr.flush() > > - http.close() > + close_http_pool(h) > > # If the connection failed earlier ensure we cancel the transfer. Canceling > # the transfer will delete the disk. 
> @@ -647,6 +663,81 @@ def transfer_supports_format(): > return "format" in sig.parameters > > > +# Connection pool managment > + > + > +def create_http_pool(url, host, options): > + pool = queue.Queue() > + > + count = min(options["max_readers"], > + options["max_writers"], > + MAX_CONNECTIONS) > + debug("using %d connections" % count) > + > + unix_socket = options["unix_socket"] if host is not None else None > + > + for i in range(count): > + http = create_http(url, unix_socket=unix_socket) > + pool.put(http) > + > + return pool > + > + > + at contextmanager > +def http_context(h): > + """ > + Context manager yielding an imageio http connection from the pool. Blocks > + until a connection is available. > + """ > + pool = h["pool"] > + http = pool.get() > + try: > + yield http > + finally: > + pool.put(http) > + > + > +def iter_http_pool(h): > + """ > + Wait until all inflight requests are done, and iterate on imageio > + connections. > + > + The pool is empty during iteration. New requests issued during iteration > + will block until iteration is done. > + """ > + pool = h["pool"] > + locked = [] > + > + # Lock the pool by taking the connection out. > + while len(locked) < h["connections"]: > + locked.append(pool.get()) > + > + try: > + for http in locked: > + yield http > + finally: > + # Unlock the pool by puting the connection back. > + for http in locked: > + pool.put(http) > + > + > +def close_http_pool(h): > + """ > + Wait until all inflight requests are done, close all connections and remove > + them from the pool. > + > + No request can be served by the pool after this call. 
> + """ > + pool = h["pool"] > + locked = [] > + > + while len(locked) < h["connections"]: > + locked.append(pool.get()) > + > + for http in locked: > + http.close() > + > + > # oVirt imageio operations > > > @@ -665,12 +756,20 @@ def parse_transfer_url(transfer): > return urlparse(transfer.proxy_url) > > > -def create_http(url): > +def create_http(url, unix_socket=None): > """ > Create http connection for transfer url. > > Returns HTTPConnection. > """ > + if unix_socket: > + debug("using unix socket %r" % unix_socket) > + try: > + return UnixHTTPConnection(unix_socket) > + except Exception as e: > + # Very unlikely, but we can recover by using https. > + debug("cannot create unix socket connection: %s" % e) > + > if url.scheme == "https": > context = \ > ssl.create_default_context(purpose=ssl.Purpose.SERVER_AUTH, > @@ -679,8 +778,10 @@ def create_http(url): > context.check_hostname = False > context.verify_mode = ssl.CERT_NONE > > + debug("using https connection") > return HTTPSConnection(url.hostname, url.port, context=context) > elif url.scheme == "http": > + debug("using http connection") > return HTTPConnection(url.hostname, url.port) > else: > raise RuntimeError("unknown URL scheme (%s)" % url.scheme) > @@ -718,23 +819,3 @@ def get_options(http, url): > else: > raise RuntimeError("could not use OPTIONS request: %d: %s" % > (r.status, r.reason)) > - > - > -def optimize_http(http, host, options): > - """ > - Return an optimized http connection using unix socket if we are connected > - to imageio server on the local host and it features a unix socket. > - """ > - unix_socket = options['unix_socket'] > - > - if host is not None and unix_socket is not None: > - try: > - http = UnixHTTPConnection(unix_socket) > - except Exception as e: > - # Very unlikely failure, but we can recover by using the https > - # connection. 
> - debug("cannot create unix socket connection, using https: %s" % e) > - else: > - debug("optimizing connection using unix socket %r" % unix_socket) > - > - return http > -- > 2.26.2 Rich. -- Richard Jones, Virtualization Group, Red Hat http://people.redhat.com/~rjones Read my programming and virtualization blog: http://rwmj.wordpress.com Fedora Windows cross-compiler. Compile Windows programs, test, and build Windows installers. Over 100 libraries supported. http://fedoraproject.org/wiki/MinGW