Richard W.M. Jones
2022-Jan-04 16:02 UTC
[Libguestfs] [PATCH 5/5] output/rhv-upload-plugin: Keep connections alive
On Sat, Dec 18, 2021 at 10:36:33PM +0200, Nir Soffer wrote:> When importing from vddk, nbdcopy may be blocked for few minutes(!) > trying to get extents. While nbdcopy is blocked, imageio server closes > the idle connections. When we finally get a request from nbdcopy, we > fail to detect that the connection was closed. > > Detecting a closed connection is hard and racy. In the good case, we get > a BrokenPipe error. In the bad case, imageio closed the socket right > after we sent a request, and we get an invalid status line. When using > imageio proxy, we may get http error (e.g. 500) if the proxy connection > to imageio server on the host was closed. > > Even worse, when we find that the connection was closed, it is not safe > to reopen the connection, since qemu-nbd does not ensure yet that data > written to the previous connection will be flushed when we flush the new > connection. > > Fix the issue by keeping the connections alive. A pool keeper thread > sends a flush request on idle connection every ~30 seconds. This also > improves data integrity and efficiency, using idle time to flush written > data. > > Fixes https://bugzilla.redhat.com/2032324
Ideally imageio would not just time out after such a short time when a client has connections open. (Do we actually hold the TCP-level connection open during this time?) Is there a no-op ping-like request that we can send? If TCP-level connection is open, can we enable TCP keepalives? Rich.
> output/rhv-upload-plugin.py | 71 +++++++++++++++++++++++++++++++++++++ > 1 file changed, 71 insertions(+) > > diff --git a/output/rhv-upload-plugin.py b/output/rhv-upload-plugin.py > index 8d088c4e..172da358 100644 > --- a/output/rhv-upload-plugin.py > +++ b/output/rhv-upload-plugin.py > @@ -13,50 +13,60 @@ > # GNU General Public License for more details. 
> # > # You should have received a copy of the GNU General Public License along > # with this program; if not, write to the Free Software Foundation, Inc., > # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. > > import json > import queue > import socket > import ssl > +import threading > import time > > from contextlib import contextmanager > from http.client import HTTPSConnection, HTTPConnection > from urllib.parse import urlparse > > import nbdkit > > # Using version 2 supporting the buffer protocol for better performance. > API_VERSION = 2 > > # Maximum number of connection to imageio server. Based on testing with imageio > # client, this give best performance. > MAX_CONNECTIONS = 4 > > +# Maximum idle time allowed for imageio connections. > +IDLE_TIMEOUT = 30 > + > # Required parameters. > size = None > url = None > > # Optional parameters. > cafile = None > insecure = False > is_ovirt_host = False > > # List of options read from imageio server. > options = None > > # Pool of HTTP connections. > pool = None > > +# Set when plugin is cleaning up. > +done = threading.Event() > + > +# Set when periodic flush request fails. > +pool_error = None > + > > # Parse parameters. 
> def config(key, value): > global cafile, url, is_ovirt_host, insecure, size > > if key == "cafile": > cafile = value > elif key == "insecure": > insecure = value.lower() in ['true', '1'] > elif key == "is_ovirt_host": > @@ -84,25 +94,31 @@ def after_fork(): > options = get_options(http, url) > http.close() > > nbdkit.debug("imageio features: flush=%(can_flush)r " > "zero=%(can_zero)r unix_socket=%(unix_socket)r " > "max_readers=%(max_readers)r max_writers=%(max_writers)r" > % options) > > pool = create_http_pool(url, options) > > + t = threading.Thread(target=pool_keeper, name="poolkeeper") > + t.daemon = True > + t.start() > + > > # This function is not actually defined before nbdkit 1.28, but it > # doesn't particularly matter if we don't close the pool because > # clients should call flush(). > def cleanup(): > + nbdkit.debug("cleaning up") > + done.set() > close_http_pool(pool) > > > def thread_model(): > """ > Using parallel model to speed up transfer with multiple connections to > imageio server. > """ > return nbdkit.THREAD_MODEL_PARALLEL > > @@ -272,20 +288,23 @@ def emulate_zero(h, count, offset, flags): > r = http.getresponse() > if r.status != 200: > request_failed(r, > "could not write zeroes offset %d size %d" % > (offset, count)) > > r.read() > > > def flush(h, flags): > + if pool_error: > + raise pool_error > + > # Wait until all inflight requests are completed, and send a flush > # request for all imageio connections. > locked = [] > > # Lock the pool by taking all connections out. > while len(locked) < pool.maxsize: > locked.append(pool.get()) > > try: > for item in locked: > @@ -348,26 +367,78 @@ def create_http_pool(url, options): > > pool = queue.Queue(count) > > for i in range(count): > http = create_http(url, unix_socket=unix_socket) > pool.put(PoolItem(http)) > > return pool > > > +def pool_keeper(): > + """ > + Thread flushing idle connections, keeping them alive. 
> + > + If a connection does not send any request for 60 seconds, imageio > + server closes the connection. Recovering from closed connection is > + hard and unsafe, so this thread ensure that connections never > + becomes idle by sending a flush request if the connection is idle > + for too much time. > + > + In normal conditions, all connections are busy most of the time, so > + the keeper will find no idle connections. If there short delays in > + nbdcopy, the keeper will find some idle connections, but will > + quickly return them back to the pool. In the pathological case when > + nbdcopy is blocked for 3 minutes on vddk input, the keeper will send > + a flush request on all connections every ~30 seconds, until nbdcopy > + starts communicating again. > + """ > + global pool_error > + > + nbdkit.debug("pool keeper started") > + > + while not done.wait(IDLE_TIMEOUT / 2): > + idle = [] > + > + while True: > + try: > + idle.append(pool.get_nowait()) > + except queue.Empty: > + break > + > + if idle: > + now = time.monotonic() > + for item in idle: > + if item.last_used and now - item.last_used > IDLE_TIMEOUT: > + nbdkit.debug("Flushing idle connection") > + try: > + send_flush(item.http) > + item.last_used = now > + except Exception as e: > + # We will report this error on the next request. > + pool_error = e > + item.last_used = None > + > + pool.put(item) > + > + nbdkit.debug("pool keeper stopped") > + > + > @contextmanager > def http_context(pool): > """ > Context manager yielding an imageio http connection from the pool. Blocks > until a connection is available. 
> """ > + if pool_error: > + raise pool_error > + > item = pool.get() > try: > yield item.http > finally: > item.last_used = time.monotonic() > pool.put(item) > > > def close_http_pool(pool): > """ > -- > 2.33.1-- Richard Jones, Virtualization Group, Red Hat http://people.redhat.com/~rjones Read my programming and virtualization blog: http://rwmj.wordpress.com virt-df lists disk usage of guests without needing to install any software inside the virtual machine. Supports Linux and Windows. http://people.redhat.com/~rjones/virt-df/
Nir Soffer
2022-Jan-04 16:29 UTC
[Libguestfs] [PATCH 5/5] output/rhv-upload-plugin: Keep connections alive
On Tue, Jan 4, 2022 at 6:02 PM Richard W.M. Jones <rjones at redhat.com> wrote:> > On Sat, Dec 18, 2021 at 10:36:33PM +0200, Nir Soffer wrote: > > When importing from vddk, nbdcopy may be blocked for few minutes(!) > > trying to get extents. While nbdcopy is blocked, imageio server closes > > the idle connections. When we finally get a request from nbdcopy, we > > fail to detect that the connection was closed. > > > > Detecting a closed connection is hard and racy. In the good case, we get > > a BrokenPipe error. In the bad case, imageio closed the socket right > > after we sent a request, and we get an invalid status line. When using > > imageio proxy, we may get http error (e.g. 500) if the proxy connection > > to imageio server on the host was closed. > > > > Even worse, when we find that the connection was closed, it is not safe > > to reopen the connection, since qemu-nbd does not ensure yet that data > > written to the previous connection will be flushed when we flush the new > > connection. > > > > Fix the issue by keeping the connections alive. A pool keeper thread > > sends a flush request on idle connection every ~30 seconds. This also > > improves data integrity and efficiency, using idle time to flush written > > data. > > > > Fixes https://bugzilla.redhat.com/2032324 > > Ideally imageio would not just time out after such a short time when a > client has connections open. (Do we actually hold the TCP-level > connection open during this time?) > > Is there a no-op ping-like request that we can send?
We don't have a no-op request, but OPTIONS can be used for that. It has a tiny JSON response that can be dropped when you use it as a "ping". Using OPTIONS can simplify the change, since we don't have to report errors in OPTIONS; they are not critical.
> If TCP-level > connection is open, can we enable TCP keepalives?
We can, but I think the timeouts are too long, and it is not the right way to keep a connection open. 
You want to do this in the application level, this way you verify the entire stack on each ping.> > Rich. > > > output/rhv-upload-plugin.py | 71 +++++++++++++++++++++++++++++++++++++ > > 1 file changed, 71 insertions(+) > > > > diff --git a/output/rhv-upload-plugin.py b/output/rhv-upload-plugin.py > > index 8d088c4e..172da358 100644 > > --- a/output/rhv-upload-plugin.py > > +++ b/output/rhv-upload-plugin.py > > @@ -13,50 +13,60 @@ > > # GNU General Public License for more details. > > # > > # You should have received a copy of the GNU General Public License along > > # with this program; if not, write to the Free Software Foundation, Inc., > > # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA. > > > > import json > > import queue > > import socket > > import ssl > > +import threading > > import time > > > > from contextlib import contextmanager > > from http.client import HTTPSConnection, HTTPConnection > > from urllib.parse import urlparse > > > > import nbdkit > > > > # Using version 2 supporting the buffer protocol for better performance. > > API_VERSION = 2 > > > > # Maximum number of connection to imageio server. Based on testing with imageio > > # client, this give best performance. > > MAX_CONNECTIONS = 4 > > > > +# Maximum idle time allowed for imageio connections. > > +IDLE_TIMEOUT = 30 > > + > > # Required parameters. > > size = None > > url = None > > > > # Optional parameters. > > cafile = None > > insecure = False > > is_ovirt_host = False > > > > # List of options read from imageio server. > > options = None > > > > # Pool of HTTP connections. > > pool = None > > > > +# Set when plugin is cleaning up. > > +done = threading.Event() > > + > > +# Set when periodic flush request fails. > > +pool_error = None > > + > > > > # Parse parameters. 
> > def config(key, value): > > global cafile, url, is_ovirt_host, insecure, size > > > > if key == "cafile": > > cafile = value > > elif key == "insecure": > > insecure = value.lower() in ['true', '1'] > > elif key == "is_ovirt_host": > > @@ -84,25 +94,31 @@ def after_fork(): > > options = get_options(http, url) > > http.close() > > > > nbdkit.debug("imageio features: flush=%(can_flush)r " > > "zero=%(can_zero)r unix_socket=%(unix_socket)r " > > "max_readers=%(max_readers)r max_writers=%(max_writers)r" > > % options) > > > > pool = create_http_pool(url, options) > > > > + t = threading.Thread(target=pool_keeper, name="poolkeeper") > > + t.daemon = True > > + t.start() > > + > > > > # This function is not actually defined before nbdkit 1.28, but it > > # doesn't particularly matter if we don't close the pool because > > # clients should call flush(). > > def cleanup(): > > + nbdkit.debug("cleaning up") > > + done.set() > > close_http_pool(pool) > > > > > > def thread_model(): > > """ > > Using parallel model to speed up transfer with multiple connections to > > imageio server. > > """ > > return nbdkit.THREAD_MODEL_PARALLEL > > > > @@ -272,20 +288,23 @@ def emulate_zero(h, count, offset, flags): > > r = http.getresponse() > > if r.status != 200: > > request_failed(r, > > "could not write zeroes offset %d size %d" % > > (offset, count)) > > > > r.read() > > > > > > def flush(h, flags): > > + if pool_error: > > + raise pool_error > > + > > # Wait until all inflight requests are completed, and send a flush > > # request for all imageio connections. > > locked = [] > > > > # Lock the pool by taking all connections out. 
> > while len(locked) < pool.maxsize: > > locked.append(pool.get()) > > > > try: > > for item in locked: > > @@ -348,26 +367,78 @@ def create_http_pool(url, options): > > > > pool = queue.Queue(count) > > > > for i in range(count): > > http = create_http(url, unix_socket=unix_socket) > > pool.put(PoolItem(http)) > > > > return pool > > > > > > +def pool_keeper(): > > + """ > > + Thread flushing idle connections, keeping them alive. > > + > > + If a connection does not send any request for 60 seconds, imageio > > + server closes the connection. Recovering from closed connection is > > + hard and unsafe, so this thread ensure that connections never > > + becomes idle by sending a flush request if the connection is idle > > + for too much time. > > + > > + In normal conditions, all connections are busy most of the time, so > > + the keeper will find no idle connections. If there short delays in > > + nbdcopy, the keeper will find some idle connections, but will > > + quickly return them back to the pool. In the pathological case when > > + nbdcopy is blocked for 3 minutes on vddk input, the keeper will send > > + a flush request on all connections every ~30 seconds, until nbdcopy > > + starts communicating again. > > + """ > > + global pool_error > > + > > + nbdkit.debug("pool keeper started") > > + > > + while not done.wait(IDLE_TIMEOUT / 2): > > + idle = [] > > + > > + while True: > > + try: > > + idle.append(pool.get_nowait()) > > + except queue.Empty: > > + break > > + > > + if idle: > > + now = time.monotonic() > > + for item in idle: > > + if item.last_used and now - item.last_used > IDLE_TIMEOUT: > > + nbdkit.debug("Flushing idle connection") > > + try: > > + send_flush(item.http) > > + item.last_used = now > > + except Exception as e: > > + # We will report this error on the next request. 
> > + pool_error = e > > + item.last_used = None > > + > > + pool.put(item) > > + > > + nbdkit.debug("pool keeper stopped") > > + > > + > > @contextmanager > > def http_context(pool): > > """ > > Context manager yielding an imageio http connection from the pool. Blocks > > until a connection is available. > > """ > > + if pool_error: > > + raise pool_error > > + > > item = pool.get() > > try: > > yield item.http > > finally: > > item.last_used = time.monotonic() > > pool.put(item) > > > > > > def close_http_pool(pool): > > """ > > -- > > 2.33.1 > > -- > Richard Jones, Virtualization Group, Red Hat http://people.redhat.com/~rjones > Read my programming and virtualization blog: http://rwmj.wordpress.com > virt-df lists disk usage of guests without needing to install any > software inside the virtual machine. Supports Linux and Windows. > http://people.redhat.com/~rjones/virt-df/ >