Fix problems in new rhv-upload implementation:

- The plugin does not flush to all connections in flush()
- The plugin does not close all connections in cleanup()
- Idle connections are closed in the imageio server, and we don't have a
  safe way to recover.
- virt-v2v tries to get disk allocation using the imageio output, but the
  imageio output does not support extents. Even if it supported extents,
  the call is made after the transfer was finalized, so it no longer has
  access to storage.

Problems not fixed yet:

- Image transfer is finalized *before* closing the connection to imageio -
  this will always time out with RHV < 4.4.9, and succeeds by mistake with
  RHV 4.4.9 due to a regression that will be fixed in 4.4.10. This will be
  a non-issue in the next RHV version[1]. To support older RHV versions,
  virt-v2v must finalize the image transfer *after* closing the output.

Tested on RHEL 8.6 with upstream nbdkit and libnbd.

[1] https://github.com/oVirt/ovirt-imageio/pull/15

Fixes https://bugzilla.redhat.com/2032324

Nir Soffer (5):
  output/rhv-upload-plugin: Fix flush and close
  v2v/lib/util.ml: Get disk allocation from input
  output/rhv-upload-plugin: Extract send_flush() helper
  output/rhv-upload-plugin: Track http last request time
  output/rhv-upload-plugin: Keep connections alive

 lib/utils.ml                |   2 +-
 output/rhv-upload-plugin.py | 151 ++++++++++++++++++++++++++----------
 2 files changed, 113 insertions(+), 40 deletions(-)

--
2.33.1
Nir Soffer
2021-Dec-18 20:36 UTC
[Libguestfs] [PATCH 1/5] output/rhv-upload-plugin: Fix flush and close
When locking the http pool, we wait until all connections are idle, and
take them from the pool. But since we used pool.qsize(), which is the
number of items currently in the queue, we did not wait for all
connections.

This leads to the following issues:

- We send a flush request only for some connections, which does not
  ensure that all uploaded data is flushed to storage.

- We close only some of the connections in cleanup(). This should not
  matter since the connections are closed when the plugin process
  terminates.

An example import showing sending only one FLUSH request instead of 4:
https://bugzilla.redhat.com/2032324#c8

Fixed by creating a bounded queue and using pool.maxsize to get all the
connections from the pool.
---
 output/rhv-upload-plugin.py | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/output/rhv-upload-plugin.py b/output/rhv-upload-plugin.py
index 1cb837dd..bad0e8a3 100644
--- a/output/rhv-upload-plugin.py
+++ b/output/rhv-upload-plugin.py
@@ -307,30 +307,30 @@ class UnixHTTPConnection(HTTPConnection):
 
     def connect(self):
         self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
         if self.timeout is not socket._GLOBAL_DEFAULT_TIMEOUT:
             self.sock.settimeout(timeout)
         self.sock.connect(self.path)
 
 
 # Connection pool.
 def create_http_pool(url, options):
-    pool = queue.Queue()
-
     count = min(options["max_readers"],
                 options["max_writers"],
                 MAX_CONNECTIONS)
 
     nbdkit.debug("creating http pool connections=%d" % count)
 
     unix_socket = options["unix_socket"] if is_ovirt_host else None
 
+    pool = queue.Queue(count)
+
     for i in range(count):
         http = create_http(url, unix_socket=unix_socket)
         pool.put(http)
 
     return pool
 
 
 @contextmanager
 def http_context(pool):
     """
@@ -347,22 +347,22 @@ def http_context(pool):
 def iter_http_pool(pool):
     """
     Wait until all inflight requests are done, and iterate on imageio
     connections.
 
     The pool is empty during iteration. New requests issued during iteration
     will block until iteration is done.
     """
     locked = []
 
-    # Lock the pool by taking the connection out.
-    while len(locked) < pool.qsize():
+    # Lock the pool by taking all connections out.
+    while len(locked) < pool.maxsize:
         locked.append(pool.get())
 
     try:
         for http in locked:
             yield http
     finally:
         # Unlock the pool by putting the connections back.
         for http in locked:
             pool.put(http)
 
@@ -371,21 +371,21 @@ def close_http_pool(pool):
     """
     Wait until all inflight requests are done, close all connections and remove
     them from the pool.
 
     No request can be served by the pool after this call.
     """
     nbdkit.debug("closing http pool")
 
     locked = []
 
-    while len(locked) < pool.qsize():
+    while len(locked) < pool.maxsize:
         locked.append(pool.get())
 
     for http in locked:
         http.close()
 
 
 def create_http(url, unix_socket=None):
     """
     Create http connection for transfer url.
-- 
2.33.1
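Side note on the pattern: with an unbounded queue, pool.qsize() only counts
connections currently sitting idle in the queue, so connections still in
flight are never waited for; a bounded queue makes the fixed total available
as pool.maxsize. A minimal standalone sketch of the locking (illustrative
names, not code from the plugin):

    import queue

    pool = queue.Queue(4)          # bounded: maxsize == total connections
    for i in range(4):
        pool.put("conn-%d" % i)

    def lock_pool(pool):
        # Waiting for maxsize items blocks until every connection,
        # including any still in flight, has been returned.
        locked = []
        while len(locked) < pool.maxsize:  # pool.qsize() would stop early
            locked.append(pool.get())
        return locked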
Nir Soffer
2021-Dec-18 20:36 UTC
[Libguestfs] [PATCH 2/5] v2v/lib/util.ml: Get disk allocation from input
After finalizing the transfer, virt-v2v tries to connect to the output
socket and query disk allocation. This may work for some outputs
supporting block status, but for the rhv_upload output this cannot work
for two reasons:

- The rhv-upload-plugin does not support extents
- The transfer was finalized before this call, so the plugin lost access
  to the image.

Here is an example failure log:

[  74.2] Creating output metadata
python3 '/tmp/v2v.WMq8Tk/rhv-upload-finalize.py' '/tmp/v2v.WMq8Tk/params6.json'
finalizing transfer b03fe3ba-a4ff-4634-a0a0-10b3daba3cc2
...
transfer b03fe3ba-a4ff-4634-a0a0-10b3daba3cc2 finalized in 2.118 seconds
...
nbdkit: debug: accepted connection
...
nbdkit: python[4]: debug: python: close
virt-v2v: error: exception: NBD.Error("nbd_block_status: request out of
bounds: Invalid argument", 22)

Fix by using the input socket.
---
 lib/utils.ml | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/lib/utils.ml b/lib/utils.ml
index d6861d08..f6c85543 100644
--- a/lib/utils.ml
+++ b/lib/utils.ml
@@ -171,21 +171,21 @@ let with_nbd_connect_unix ~socket ~meta_contexts ~f
     ~f:(fun () ->
       List.iter (NBD.add_meta_context nbd) meta_contexts;
       NBD.connect_unix nbd socket;
       protect ~f:(fun () -> f nbd) ~finally:(fun () -> NBD.shutdown nbd)
     )
     ~finally:(fun () -> NBD.close nbd)
 
 let get_disk_allocated ~dir ~disknr =
-  let socket = sprintf "%s/out%d" dir disknr
+  let socket = sprintf "%s/in%d" dir disknr
   and alloc_ctx = "base:allocation" in
   with_nbd_connect_unix ~socket ~meta_contexts:[alloc_ctx]
     ~f:(fun nbd ->
       if NBD.can_meta_context nbd alloc_ctx then (
         (* Get the list of extents, using a 2GiB chunk size as hint. *)
         let size = NBD.get_size nbd
         and allocated = ref 0_L
         and fetch_offset = ref 0_L in
         while !fetch_offset < size do
           let remaining = size -^ !fetch_offset in
-- 
2.33.1
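For comparison, the extent walk that the OCaml code performs can be sketched
with the libnbd Python bindings. This is only an illustration of the
algorithm, under the assumption that the input socket serves the
"base:allocation" context; it is not code from the series:

    import nbd  # libnbd Python bindings

    def get_disk_allocated(socket_path):
        h = nbd.NBD()
        h.add_meta_context(nbd.CONTEXT_BASE_ALLOCATION)
        h.connect_unix(socket_path)
        if not h.can_meta_context(nbd.CONTEXT_BASE_ALLOCATION):
            return None

        size = h.get_size()
        allocated = 0
        offset = 0

        def extent(metacontext, start, entries, err):
            nonlocal allocated, offset
            if metacontext != nbd.CONTEXT_BASE_ALLOCATION:
                return
            # entries is a flat list: length0, flags0, length1, flags1, ...
            for i in range(0, len(entries), 2):
                length, flags = entries[i], entries[i + 1]
                if not flags & nbd.STATE_HOLE:
                    allocated += length
                offset += length

        while offset < size:
            # Ask for extents in chunks of up to 2 GiB, as the OCaml does.
            h.block_status(min(size - offset, 2 * 1024 ** 3), offset, extent)

        h.shutdown()
        return allocated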
Nir Soffer
2021-Dec-18 20:36 UTC
[Libguestfs] [PATCH 3/5] output/rhv-upload-plugin: Extract send_flush() helper
Extract a helper for sending a flush request for a single connection,
and inline the iter_http_pool() helper into flush(), its only user.
---
 output/rhv-upload-plugin.py | 54 ++++++++++++++++---------------------
 1 file changed, 23 insertions(+), 31 deletions(-)

diff --git a/output/rhv-upload-plugin.py b/output/rhv-upload-plugin.py
index bad0e8a3..f7e5950f 100644
--- a/output/rhv-upload-plugin.py
+++ b/output/rhv-upload-plugin.py
@@ -271,36 +271,51 @@ def emulate_zero(h, count, offset, flags):
     r = http.getresponse()
     if r.status != 200:
         request_failed(r,
                        "could not write zeroes offset %d size %d" %
                        (offset, count))
 
     r.read()
 
 
 def flush(h, flags):
+    # Wait until all inflight requests are completed, and send a flush
+    # request for all imageio connections.
+    locked = []
+
+    # Lock the pool by taking all connections out.
+    while len(locked) < pool.maxsize:
+        locked.append(pool.get())
+
+    try:
+        for http in locked:
+            send_flush(http)
+    finally:
+        # Unlock the pool by putting the connections back.
+        for http in locked:
+            pool.put(http)
+
+
+def send_flush(http):
     # Construct the JSON request for flushing.
     buf = json.dumps({'op': "flush"}).encode()
 
     headers = {"Content-Type": "application/json",
                "Content-Length": str(len(buf))}
 
-    # Wait until all inflight requests are completed, and send a flush
-    # request for all imageio connections.
-    for http in iter_http_pool(pool):
-        http.request("PATCH", url.path, body=buf, headers=headers)
+    http.request("PATCH", url.path, body=buf, headers=headers)
 
-        r = http.getresponse()
-        if r.status != 200:
-            request_failed(r, "could not flush")
+    r = http.getresponse()
+    if r.status != 200:
+        request_failed(r, "could not flush")
 
-        r.read()
+    r.read()
 
 
 # Modify http.client.HTTPConnection to work over a Unix domain socket.
 # Derived from uhttplib written by Erik van Zijst under an MIT license.
 # (https://pypi.org/project/uhttplib/)
 # Ported to Python 3 by Irit Goihman.
 class UnixHTTPConnection(HTTPConnection):
     def __init__(self, path, timeout=socket._GLOBAL_DEFAULT_TIMEOUT):
         self.path = path
         HTTPConnection.__init__(self, "localhost", timeout=timeout)
@@ -337,43 +352,20 @@ def http_context(pool):
     Context manager yielding an imageio http connection from the pool. Blocks
     until a connection is available.
     """
     http = pool.get()
     try:
         yield http
     finally:
         pool.put(http)
 
 
-def iter_http_pool(pool):
-    """
-    Wait until all inflight requests are done, and iterate on imageio
-    connections.
-
-    The pool is empty during iteration. New requests issued during iteration
-    will block until iteration is done.
-    """
-    locked = []
-
-    # Lock the pool by taking all connections out.
-    while len(locked) < pool.maxsize:
-        locked.append(pool.get())
-
-    try:
-        for http in locked:
-            yield http
-    finally:
-        # Unlock the pool by putting the connections back.
-        for http in locked:
-            pool.put(http)
-
-
 def close_http_pool(pool):
     """
     Wait until all inflight requests are done, close all connections and remove
     them from the pool.
 
     No request can be served by the pool after this call.
     """
     nbdkit.debug("closing http pool")
 
     locked = []
-- 
2.33.1
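With flush() and close_http_pool() both open-coding the same
take-everything-out loop after this patch, one possible follow-up (a sketch
only, not part of the series) is to turn the locking itself into a context
manager:

    from contextlib import contextmanager

    @contextmanager
    def locked_pool(pool):
        """Take all connections out of the pool; new requests block
        until the pool is unlocked."""
        locked = []
        while len(locked) < pool.maxsize:
            locked.append(pool.get())
        try:
            yield locked
        finally:
            # Unlock the pool by putting the connections back.
            for http in locked:
                pool.put(http)

    # flush() would then reduce to:
    #
    #     def flush(h, flags):
    #         with locked_pool(pool) as conns:
    #             for http in conns:
    #                 send_flush(http)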
Nir Soffer
2021-Dec-18 20:36 UTC
[Libguestfs] [PATCH 4/5] output/rhv-upload-plugin: Track http last request time
Track the last time a connection was used. This will be used to detect
idle connections.
---
 output/rhv-upload-plugin.py | 30 ++++++++++++++++++++----------
 1 file changed, 20 insertions(+), 10 deletions(-)

diff --git a/output/rhv-upload-plugin.py b/output/rhv-upload-plugin.py
index f7e5950f..8d088c4e 100644
--- a/output/rhv-upload-plugin.py
+++ b/output/rhv-upload-plugin.py
@@ -13,20 +13,21 @@
 # GNU General Public License for more details.
 #
 # You should have received a copy of the GNU General Public License along
 # with this program; if not, write to the Free Software Foundation, Inc.,
 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
 
 import json
 import queue
 import socket
 import ssl
+import time
 
 from contextlib import contextmanager
 from http.client import HTTPSConnection, HTTPConnection
 from urllib.parse import urlparse
 
 import nbdkit
 
 # Using version 2 supporting the buffer protocol for better performance.
 API_VERSION = 2
 
@@ -280,26 +281,27 @@ def emulate_zero(h, count, offset, flags):
 def flush(h, flags):
     # Wait until all inflight requests are completed, and send a flush
     # request for all imageio connections.
     locked = []
 
     # Lock the pool by taking all connections out.
     while len(locked) < pool.maxsize:
         locked.append(pool.get())
 
     try:
-        for http in locked:
-            send_flush(http)
+        for item in locked:
+            send_flush(item.http)
+            item.last_used = time.monotonic()
     finally:
         # Unlock the pool by putting the connections back.
-        for http in locked:
-            pool.put(http)
+        for item in locked:
+            pool.put(item)
 
 
 def send_flush(http):
     # Construct the JSON request for flushing.
     buf = json.dumps({'op': "flush"}).encode()
 
     headers = {"Content-Type": "application/json",
                "Content-Length": str(len(buf))}
 
     http.request("PATCH", url.path, body=buf, headers=headers)
 
@@ -320,68 +322,76 @@ class UnixHTTPConnection(HTTPConnection):
         self.path = path
         HTTPConnection.__init__(self, "localhost", timeout=timeout)
 
     def connect(self):
         self.sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
         if self.timeout is not socket._GLOBAL_DEFAULT_TIMEOUT:
             self.sock.settimeout(timeout)
         self.sock.connect(self.path)
 
 
+class PoolItem:
+
+    def __init__(self, http):
+        self.http = http
+        self.last_used = None
+
+
 # Connection pool.
 def create_http_pool(url, options):
     count = min(options["max_readers"],
                 options["max_writers"],
                 MAX_CONNECTIONS)
 
     nbdkit.debug("creating http pool connections=%d" % count)
 
     unix_socket = options["unix_socket"] if is_ovirt_host else None
 
     pool = queue.Queue(count)
 
     for i in range(count):
         http = create_http(url, unix_socket=unix_socket)
-        pool.put(http)
+        pool.put(PoolItem(http))
 
     return pool
 
 
 @contextmanager
 def http_context(pool):
     """
     Context manager yielding an imageio http connection from the pool. Blocks
     until a connection is available.
     """
-    http = pool.get()
+    item = pool.get()
     try:
-        yield http
+        yield item.http
     finally:
-        pool.put(http)
+        item.last_used = time.monotonic()
+        pool.put(item)
 
 
 def close_http_pool(pool):
     """
     Wait until all inflight requests are done, close all connections and remove
     them from the pool.
 
     No request can be served by the pool after this call.
     """
     nbdkit.debug("closing http pool")
 
     locked = []
 
     while len(locked) < pool.maxsize:
         locked.append(pool.get())
 
-    for http in locked:
-        http.close()
+    for item in locked:
+        item.http.close()
 
 
 def create_http(url, unix_socket=None):
     """
     Create http connection for transfer url.
 
     Returns HTTPConnection.
     """
     if unix_socket:
         nbdkit.debug("creating unix http connection socket=%r" % unix_socket)
-- 
2.33.1
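The idle test that last_used enables (used by the next patch) looks roughly
like this; IDLE_TIMEOUT is assumed from patch 5, and time.monotonic() is used
because, unlike time.time(), it cannot jump with wall-clock adjustments:

    import time

    IDLE_TIMEOUT = 30  # seconds; introduced in the next patch

    def is_idle(item, now=None):
        if now is None:
            now = time.monotonic()
        # A never-used connection (last_used is None) does not count as idle.
        return item.last_used is not None and now - item.last_used > IDLE_TIMEOUT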
Nir Soffer
2021-Dec-18 20:36 UTC
[Libguestfs] [PATCH 5/5] output/rhv-upload-plugin: Keep connections alive
When importing from vddk, nbdcopy may be blocked for a few minutes(!)
trying to get extents. While nbdcopy is blocked, imageio server closes
the idle connections. When we finally get a request from nbdcopy, we
fail to detect that the connection was closed.

Detecting a closed connection is hard and racy. In the good case, we get
a BrokenPipe error. In the bad case, imageio closed the socket right
after we sent a request, and we get an invalid status line. When using
the imageio proxy, we may get an http error (e.g. 500) if the proxy
connection to the imageio server on the host was closed.

Even worse, when we find that the connection was closed, it is not safe
to reopen the connection, since qemu-nbd does not ensure yet that data
written to the previous connection will be flushed when we flush the new
connection.

Fix the issue by keeping the connections alive. A pool keeper thread
sends a flush request on idle connections every ~30 seconds. This also
improves data integrity and efficiency, using idle time to flush written
data.

Fixes https://bugzilla.redhat.com/2032324
---
 output/rhv-upload-plugin.py | 71 +++++++++++++++++++++++++++++++++++++
 1 file changed, 71 insertions(+)

diff --git a/output/rhv-upload-plugin.py b/output/rhv-upload-plugin.py
index 8d088c4e..172da358 100644
--- a/output/rhv-upload-plugin.py
+++ b/output/rhv-upload-plugin.py
@@ -13,50 +13,60 @@
 # GNU General Public License for more details.
 #
 # You should have received a copy of the GNU General Public License along
 # with this program; if not, write to the Free Software Foundation, Inc.,
 # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
 
 import json
 import queue
 import socket
 import ssl
+import threading
 import time
 
 from contextlib import contextmanager
 from http.client import HTTPSConnection, HTTPConnection
 from urllib.parse import urlparse
 
 import nbdkit
 
 # Using version 2 supporting the buffer protocol for better performance.
 API_VERSION = 2
 
 # Maximum number of connections to imageio server. Based on testing with
 # imageio client, this gives the best performance.
 MAX_CONNECTIONS = 4
 
+# Maximum idle time allowed for imageio connections.
+IDLE_TIMEOUT = 30
+
 # Required parameters.
 size = None
 url = None
 
 # Optional parameters.
 cafile = None
 insecure = False
 is_ovirt_host = False
 
 # List of options read from imageio server.
 options = None
 
 # Pool of HTTP connections.
 pool = None
 
+# Set when plugin is cleaning up.
+done = threading.Event()
+
+# Set when periodic flush request fails.
+pool_error = None
+
 
 # Parse parameters.
 def config(key, value):
     global cafile, url, is_ovirt_host, insecure, size
 
     if key == "cafile":
         cafile = value
     elif key == "insecure":
         insecure = value.lower() in ['true', '1']
     elif key == "is_ovirt_host":
@@ -84,25 +94,31 @@ def after_fork():
     options = get_options(http, url)
     http.close()
 
     nbdkit.debug("imageio features: flush=%(can_flush)r "
                  "zero=%(can_zero)r unix_socket=%(unix_socket)r "
                  "max_readers=%(max_readers)r max_writers=%(max_writers)r"
                  % options)
 
     pool = create_http_pool(url, options)
 
+    t = threading.Thread(target=pool_keeper, name="poolkeeper")
+    t.daemon = True
+    t.start()
+
 
 # This function is not actually defined before nbdkit 1.28, but it
 # doesn't particularly matter if we don't close the pool because
 # clients should call flush().
 def cleanup():
+    nbdkit.debug("cleaning up")
+    done.set()
     close_http_pool(pool)
 
 
 def thread_model():
     """
     Using parallel model to speed up transfer with multiple connections to
     imageio server.
     """
     return nbdkit.THREAD_MODEL_PARALLEL
 
@@ -272,20 +288,23 @@ def emulate_zero(h, count, offset, flags):
     r = http.getresponse()
     if r.status != 200:
         request_failed(r,
                        "could not write zeroes offset %d size %d" %
                        (offset, count))
 
     r.read()
 
 
 def flush(h, flags):
+    if pool_error:
+        raise pool_error
+
     # Wait until all inflight requests are completed, and send a flush
     # request for all imageio connections.
     locked = []
 
     # Lock the pool by taking all connections out.
     while len(locked) < pool.maxsize:
         locked.append(pool.get())
 
     try:
         for item in locked:
@@ -348,26 +367,78 @@ def create_http_pool(url, options):
 
     pool = queue.Queue(count)
 
     for i in range(count):
         http = create_http(url, unix_socket=unix_socket)
         pool.put(PoolItem(http))
 
     return pool
 
 
+def pool_keeper():
+    """
+    Thread flushing idle connections, keeping them alive.
+
+    If a connection does not send any request for 60 seconds, imageio
+    server closes the connection. Recovering from a closed connection is
+    hard and unsafe, so this thread ensures that connections never
+    become idle by sending a flush request if the connection is idle
+    for too long.
+
+    In normal conditions, all connections are busy most of the time, so
+    the keeper will find no idle connections. If there are short delays
+    in nbdcopy, the keeper will find some idle connections, but will
+    quickly return them to the pool. In the pathological case when
+    nbdcopy is blocked for 3 minutes on vddk input, the keeper will send
+    a flush request on all connections every ~30 seconds, until nbdcopy
+    starts communicating again.
+    """
+    global pool_error
+
+    nbdkit.debug("pool keeper started")
+
+    while not done.wait(IDLE_TIMEOUT / 2):
+        idle = []
+
+        while True:
+            try:
+                idle.append(pool.get_nowait())
+            except queue.Empty:
+                break
+
+        if idle:
+            now = time.monotonic()
+            for item in idle:
+                if item.last_used and now - item.last_used > IDLE_TIMEOUT:
+                    nbdkit.debug("Flushing idle connection")
+                    try:
+                        send_flush(item.http)
+                        item.last_used = now
+                    except Exception as e:
+                        # We will report this error on the next request.
+                        pool_error = e
+                        item.last_used = None
+
+                pool.put(item)
+
+    nbdkit.debug("pool keeper stopped")
+
+
 @contextmanager
 def http_context(pool):
     """
     Context manager yielding an imageio http connection from the pool. Blocks
     until a connection is available.
     """
+    if pool_error:
+        raise pool_error
+
     item = pool.get()
     try:
         yield item.http
     finally:
         item.last_used = time.monotonic()
         pool.put(item)
 
 
 def close_http_pool(pool):
     """
-- 
2.33.1
On Sat, Dec 18, 2021 at 10:36 PM Nir Soffer <nsoffer at redhat.com> wrote:
>
> Fix problems in new rhv-upload implementation:
>
> - The plugin does not flush to all connections in flush()
> - The plugin does not close all connections in cleanup()
> - Idle connections are closed in the imageio server, and we don't have a
>   safe way to recover.
> - virt-v2v tries to get disk allocation using the imageio output, but the
>   imageio output does not support extents. Even if it supported extents,
>   the call is made after the transfer was finalized, so it no longer has
>   access to storage.
>
> Problems not fixed yet:
>
> - Image transfer is finalized *before* closing the connection to imageio -
>   this will always time out with RHV < 4.4.9, and succeeds by mistake with
>   RHV 4.4.9 due to a regression that will be fixed in 4.4.10. This will be
>   a non-issue in the next RHV version[1]. To support older RHV versions,
>   virt-v2v must finalize the image transfer *after* closing the output.
>
> Tested on RHEL 8.6 with upstream nbdkit and libnbd.
>
> [1] https://github.com/oVirt/ovirt-imageio/pull/15
>
> Fixes https://bugzilla.redhat.com/2032324
>
> Nir Soffer (5):
>   output/rhv-upload-plugin: Fix flush and close
>   v2v/lib/util.ml: Get disk allocation from input
>   output/rhv-upload-plugin: Extract send_flush() helper
>   output/rhv-upload-plugin: Track http last request time
>   output/rhv-upload-plugin: Keep connections alive

Richard, can you take a look at the patches?

>
>  lib/utils.ml                |   2 +-
>  output/rhv-upload-plugin.py | 151 ++++++++++++++++++++++++++----------
>  2 files changed, 113 insertions(+), 40 deletions(-)
>
> --
> 2.33.1
Richard W.M. Jones
2022-Jan-04 16:02 UTC
[Libguestfs] [PATCH 5/5] output/rhv-upload-plugin: Keep connections alive
On Sat, Dec 18, 2021 at 10:36:33PM +0200, Nir Soffer wrote:
> When importing from vddk, nbdcopy may be blocked for a few minutes(!)
> trying to get extents. While nbdcopy is blocked, imageio server closes
> the idle connections. When we finally get a request from nbdcopy, we
> fail to detect that the connection was closed.
>
> Detecting a closed connection is hard and racy. In the good case, we get
> a BrokenPipe error. In the bad case, imageio closed the socket right
> after we sent a request, and we get an invalid status line. When using
> the imageio proxy, we may get an http error (e.g. 500) if the proxy
> connection to the imageio server on the host was closed.
>
> Even worse, when we find that the connection was closed, it is not safe
> to reopen the connection, since qemu-nbd does not ensure yet that data
> written to the previous connection will be flushed when we flush the new
> connection.
>
> Fix the issue by keeping the connections alive. A pool keeper thread
> sends a flush request on idle connections every ~30 seconds. This also
> improves data integrity and efficiency, using idle time to flush written
> data.
>
> Fixes https://bugzilla.redhat.com/2032324

Ideally imageio would not just time out after such a short time when a
client has connections open. (Do we actually hold the TCP-level
connection open during this time?)

Is there a no-op ping-like request that we can send?

If the TCP-level connection is open, can we enable TCP keepalives?

Rich.

> [...]

-- 
Richard Jones, Virtualization Group, Red Hat
http://people.redhat.com/~rjones
Read my programming and virtualization blog: http://rwmj.wordpress.com
virt-df lists disk usage of guests without needing to install any
software inside the virtual machine.  Supports Linux and Windows.
http://people.redhat.com/~rjones/virt-df/
Richard W.M. Jones
2022-Feb-08 15:22 UTC
[Libguestfs] [PATCH 3/5] output/rhv-upload-plugin: Extract send_flush() helper
On Sat, Dec 18, 2021 at 10:36:31PM +0200, Nir Soffer wrote:
> Extract a helper for sending a flush request for a single connection,
> and inline the iter_http_pool() helper into flush(), its only user.
> ---
>  output/rhv-upload-plugin.py | 54 ++++++++++++++++---------------------
>  1 file changed, 23 insertions(+), 31 deletions(-)
>
> [...]

This one looks like a neutral refactoring, so ACK

Rich.

-- 
Richard Jones, Virtualization Group, Red Hat
http://people.redhat.com/~rjones
Read my programming and virtualization blog: http://rwmj.wordpress.com
Fedora Windows cross-compiler. Compile Windows programs, test, and
build Windows installers. Over 100 libraries supported.
http://fedoraproject.org/wiki/MinGW
Richard W.M. Jones
2022-Feb-08 15:24 UTC
[Libguestfs] [PATCH 5/5] output/rhv-upload-plugin: Keep connections alive
On Sat, Dec 18, 2021 at 10:36:33PM +0200, Nir Soffer wrote:
> When importing from vddk, nbdcopy may be blocked for a few minutes(!)
> trying to get extents. While nbdcopy is blocked, imageio server closes
> the idle connections. When we finally get a request from nbdcopy, we
> fail to detect that the connection was closed.
>
> [...]
>
> Fix the issue by keeping the connections alive. A pool keeper thread
> sends a flush request on idle connections every ~30 seconds. This also
> improves data integrity and efficiency, using idle time to flush written
> data.
>
> Fixes https://bugzilla.redhat.com/2032324
>
> [...]

ACK 4 & 5.

Rich.

-- 
Richard Jones, Virtualization Group, Red Hat
http://people.redhat.com/~rjones
Read my programming and virtualization blog: http://rwmj.wordpress.com
virt-top is 'top' for virtual machines.  Tiny program with many
powerful monitoring features, net stats, disk stats, logging, etc.
http://people.redhat.com/~rjones/virt-top
Richard W.M. Jones
2022-Feb-08 16:45 UTC
[Libguestfs] [PATCH 1/5] output/rhv-upload-plugin: Fix flush and close
On Sat, Dec 18, 2021 at 10:36:29PM +0200, Nir Soffer wrote:
> When locking the http pool, we wait until all connections are idle, and
> take them from the pool. But since we used pool.qsize(), which is the
> number of items currently in the queue, we did not wait for all
> connections.
>
> This leads to the following issues:
>
> - We send a flush request only for some connections, which does not
>   ensure that all uploaded data is flushed to storage.
>
> - We close only some of the connections in cleanup(). This should not
>   matter since the connections are closed when the plugin process
>   terminates.
>
> An example import showing sending only one FLUSH request instead of 4:
> https://bugzilla.redhat.com/2032324#c8
>
> Fixed by creating a bounded queue and using pool.maxsize to get all the
> connections from the pool.
>
> [...]

Obvious bug fix, ACK

Rich.

-- 
Richard Jones, Virtualization Group, Red Hat
http://people.redhat.com/~rjones
Read my programming and virtualization blog: http://rwmj.wordpress.com
virt-builder quickly builds VMs from scratch
http://libguestfs.org/virt-builder.1.html