Nir Soffer
2019-Nov-17 23:04 UTC
[Libguestfs] [PATCH 00/18] rvh-upload: Various fixes and cleanups
This series extract oVirt SDK and imageio code to make it eaiser to follow the code and improve error handing in open() and close(). The first small patches can be consider as fixes for downstream. Tested based on libguestfs v1.41.5, since I had trouble building virt-v2v and libguestfs from master. Nir Soffer (18): rhv-upload: Remove unused exception class rhv-upload: Check status more frequently rhv-upload: Don't flush() in close() rhv-upload: Ensure connection is closed in close() rhv-upload: Ensure http is closed in close() rhv-upload: Fix cleanup after errors rhv-upload: Group oVirt SDK functions rhv-upload: Don't keep disk_service in handle rhv-upload: Extract create_disk() function rhv-upload: Try to remove disk on timeout rhv-upload: Keep disk_id in handle rhv-upload: Don't keep transfer_service in handle rhv-upload: Get host before creating disk rhv-upload: Extract create_transfer() function rhv-upload: Extract imageio helpers rhv-upload: Extract get_options() helper rhv-upload: Extract optimize_http() helper rhv-upload: Clean up username and password v2v/rhv-upload-plugin.py | 550 +++++++++++++++++++++------------------ 1 file changed, 295 insertions(+), 255 deletions(-) -- 2.21.0
Nir Soffer
2019-Nov-17 23:04 UTC
[Libguestfs] [PATCH 01/18] rhv-upload: Remove unused exception class
--- v2v/rhv-upload-plugin.py | 3 --- 1 file changed, 3 deletions(-) diff --git a/v2v/rhv-upload-plugin.py b/v2v/rhv-upload-plugin.py index 896c17942..fdd2012f5 100644 --- a/v2v/rhv-upload-plugin.py +++ b/v2v/rhv-upload-plugin.py @@ -565,9 +565,6 @@ def close(h): # (https://pypi.org/project/uhttplib/) # Ported to Python 3 by Irit Goihman. -class UnsupportedError(Exception): - pass - class UnixHTTPConnection(HTTPConnection): def __init__(self, path, timeout=socket._GLOBAL_DEFAULT_TIMEOUT): self.path = path -- 2.21.0
Nir Soffer
2019-Nov-17 23:04 UTC
[Libguestfs] [PATCH 02/18] rhv-upload: Check status more frequently
Checking status more frequently save a couple of seconds. Here is an example flow tested with oVirt upload_disk.py example: With 5 seconds wait: Created disk in 11.085111 seconds Created transfer in 1.857502 seconds With 1 second wait: Created disk in 4.991227 seconds Created transfer in 1.961243 seconds --- v2v/rhv-upload-plugin.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/v2v/rhv-upload-plugin.py b/v2v/rhv-upload-plugin.py index fdd2012f5..472d483f2 100644 --- a/v2v/rhv-upload-plugin.py +++ b/v2v/rhv-upload-plugin.py @@ -161,7 +161,7 @@ def open(readonly): endt = time.time() + timeout while True: - time.sleep(5) + time.sleep(1) disk = disk_service.get() if disk.status == types.DiskStatus.OK: break @@ -197,7 +197,7 @@ def open(readonly): transfer_service.cancel() raise RuntimeError("timed out waiting for transfer status " "!= INITIALIZING") - time.sleep(5) + time.sleep(1) # Now we have permission to start the transfer. if params['rhv_direct']: -- 2.21.0
Nir Soffer
2019-Nov-17 23:04 UTC
[Libguestfs] [PATCH 03/18] rhv-upload: Don't flush() in close()
Closing a file does not issue fsync() for the caller, and is also unsafe if previous fsync() failed. Clients should issue flush() and check the result. In imageio logs we see that every upload ends with 3 flushes: 2019-11-09 20:07:48,223 INFO (Thread-200) [images] [local] WRITE size=65536 offset=6442385408 flush=False ticket=a3981648-41b4-4b98-9937-6d42e7df1bfa 2019-11-09 20:07:48,228 INFO (Thread-200) [images] [local] FLUSH ticket=a3981648-41b4-4b98-9937-6d42e7df1bfa 2019-11-09 20:07:48,229 INFO (Thread-200) [images] [local] FLUSH ticket=a3981648-41b4-4b98-9937-6d42e7df1bfa 2019-11-09 20:07:48,231 INFO (Thread-200) [images] [local] FLUSH ticket=a3981648-41b4-4b98-9937-6d42e7df1bfa 2019-11-09 20:07:48,233 INFO (Thread-200) [http] CLOSE client=local [connection=17.320226/1, dispatch=13.651897/2151, operation=12.640790/2151, read=0.517585/1876, write=9.985399/1876, zero=0.652223/272, flush=0.000034/3] Removing flush() in close() should remove one of the 3 flush calls, so now we flush() only twice. --- v2v/rhv-upload-plugin.py | 5 ----- 1 file changed, 5 deletions(-) diff --git a/v2v/rhv-upload-plugin.py b/v2v/rhv-upload-plugin.py index 472d483f2..fdb3f1021 100644 --- a/v2v/rhv-upload-plugin.py +++ b/v2v/rhv-upload-plugin.py @@ -513,11 +513,6 @@ def close(h): return try: - # Issue a flush request on close so that the data is written to - # persistent store before we create the VM. - if h['can_flush']: - flush(h) - http.close() disk = h['disk'] -- 2.21.0
Nir Soffer
2019-Nov-17 23:04 UTC
[Libguestfs] [PATCH 04/18] rhv-upload: Ensure connection is closed in close()
Previously the connection could be left open after close() if finalizing the transfer failed. --- v2v/rhv-upload-plugin.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/v2v/rhv-upload-plugin.py b/v2v/rhv-upload-plugin.py index fdb3f1021..3fb3091a9 100644 --- a/v2v/rhv-upload-plugin.py +++ b/v2v/rhv-upload-plugin.py @@ -553,7 +553,8 @@ def close(h): delete_disk_on_failure(h) raise - connection.close() + finally: + connection.close() # Modify http.client.HTTPConnection to work over a Unix domain socket. # Derived from uhttplib written by Erik van Zijst under an MIT license. -- 2.21.0
Nir Soffer
2019-Nov-17 23:04 UTC
[Libguestfs] [PATCH 05/18] rhv-upload: Ensure http is closed in close()
After failure we forgot to close the http connection. --- v2v/rhv-upload-plugin.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/v2v/rhv-upload-plugin.py b/v2v/rhv-upload-plugin.py index 3fb3091a9..5bdfdf49f 100644 --- a/v2v/rhv-upload-plugin.py +++ b/v2v/rhv-upload-plugin.py @@ -506,6 +506,8 @@ def close(h): # plugin exits. sys.stderr.flush() + http.close() + # If the connection failed earlier ensure we clean up the disk. if h['failed']: delete_disk_on_failure(h) @@ -513,8 +515,6 @@ def close(h): return try: - http.close() - disk = h['disk'] transfer_service = h['transfer_service'] -- 2.21.0
Nir Soffer
2019-Nov-17 23:04 UTC
[Libguestfs] [PATCH 06/18] rhv-upload: Fix cleanup after errors
When request failed, we paused the transfer. This is not needed since our intent it to cancel the transfer. When closing after failure, we canceled the transfer and removed the disk. This is not needed since the transfer owns the disk and will remove it when canceled. When finalizing times out, we canceled the transfer and removed the disk. This is not needed since the transfer will clean it self, and likely to fail because cancelling is not possible after finalizing. --- v2v/rhv-upload-plugin.py | 37 +++++++++++++++---------------------- 1 file changed, 15 insertions(+), 22 deletions(-) diff --git a/v2v/rhv-upload-plugin.py b/v2v/rhv-upload-plugin.py index 5bdfdf49f..1f42c4a55 100644 --- a/v2v/rhv-upload-plugin.py +++ b/v2v/rhv-upload-plugin.py @@ -180,6 +180,10 @@ def open(readonly): inactivity_timeout = 3600, ) ) + + # At this point the transfer owns the disk and will delete the disk if the + # transfer is canceled, or if finalizing the transfer fails. + debug("transfer.id = %r" % transfer.id) # Get a reference to the created transfer service. @@ -309,15 +313,12 @@ def can_flush(h): def get_size(h): return params['disk_size'] -# Any unexpected HTTP response status from the server will end up -# calling this function which logs the full error, pauses the -# transfer, sets the failed state, and raises a RuntimeError -# exception. +# Any unexpected HTTP response status from the server will end up calling this +# function which logs the full error, sets the failed state, and raises a +# RuntimeError exception. def request_failed(h, r, msg): - # Setting the failed flag in the handle causes the disk to be - # cleaned up on close. + # Setting the failed flag in the handle will cancel the transfer on close. h['failed'] = True - h['transfer_service'].pause() status = r.status reason = r.reason @@ -490,15 +491,10 @@ def flush(h): r.read() -def delete_disk_on_failure(h): - transfer_service = h['transfer_service'] - transfer_service.cancel() - disk_service = h['disk_service'] - disk_service.remove() - def close(h): http = h['http'] connection = h['connection'] + transfer_service = h['transfer_service'] # This is sometimes necessary because python doesn't set up # sys.stderr to be line buffered and so debug, errors or @@ -508,15 +504,17 @@ def close(h): http.close() - # If the connection failed earlier ensure we clean up the disk. + # If the connection failed earlier ensure we cancel the trasfer. Canceling + # the transfer will delete the disk. if h['failed']: - delete_disk_on_failure(h) - connection.close() + try: + transfer_service.cancel() + finally: + connection.close() return try: disk = h['disk'] - transfer_service = h['transfer_service'] transfer_service.finalize() @@ -548,11 +546,6 @@ def close(h): with builtins.open(params['diskid_file'], 'w') as fp: fp.write(disk.id) - except: - # Otherwise on any failure we must clean up the disk. - delete_disk_on_failure(h) - raise - finally: connection.close() -- 2.21.0
Nir Soffer
2019-Nov-17 23:04 UTC
[Libguestfs] [PATCH 07/18] rhv-upload: Group oVirt SDK functions
Group together functions for working with oVirt SDK. Currently we have only one (find_host), but I plan to extract other functions to make the flow more clear and fix error handling. Maybe these functions can be in a separate module, shared with other oVirt plugins. Starting with minimal change of grouping them. --- v2v/rhv-upload-plugin.py | 98 ++++++++++++++++++++-------------------- 1 file changed, 50 insertions(+), 48 deletions(-) diff --git a/v2v/rhv-upload-plugin.py b/v2v/rhv-upload-plugin.py index 1f42c4a55..7075ce3ba 100644 --- a/v2v/rhv-upload-plugin.py +++ b/v2v/rhv-upload-plugin.py @@ -57,54 +57,6 @@ def debug(s): print(s, file=sys.stderr) sys.stderr.flush() -def find_host(connection): - """Return the current host object or None.""" - try: - with builtins.open("/etc/vdsm/vdsm.id") as f: - vdsm_id = f.readline().strip() - except Exception as e: - # This is most likely not an oVirt host. - debug("cannot read /etc/vdsm/vdsm.id, using any host: %s" % e) - return None - - debug("hw_id = %r" % vdsm_id) - - system_service = connection.system_service() - storage_name = params['output_storage'] - data_centers = system_service.data_centers_service().list( - search='storage.name=%s' % storage_name, - case_sensitive=True, - ) - if len(data_centers) == 0: - # The storage domain is not attached to a datacenter - # (shouldn't happen, would fail on disk creation). - debug("storange domain (%s) is not attached to a DC" % storage_name) - return None - - datacenter = data_centers[0] - debug("datacenter = %s" % datacenter.name) - - hosts_service = system_service.hosts_service() - hosts = hosts_service.list( - search="hw_id=%s and datacenter=%s and status=Up" - % (vdsm_id, datacenter.name), - case_sensitive=True, - ) - if len(hosts) == 0: - # Couldn't find a host that's fulfilling the following criteria: - # - 'hw_id' equals to 'vdsm_id' - # - Its status is 'Up' - # - Belongs to the storage domain's datacenter - debug("cannot find a running host with hw_id=%r, " - "that belongs to datacenter '%s', " - "using any host" % (vdsm_id, datacenter.name)) - return None - - host = hosts[0] - debug("host.id = %r" % host.id) - - return types.Host(id = host.id) - def open(readonly): # Parse out the username from the output_conn URL. parsed = urlparse(params['output_conn']) @@ -564,3 +516,53 @@ class UnixHTTPConnection(HTTPConnection): if self.timeout is not socket._GLOBAL_DEFAULT_TIMEOUT: self.sock.settimeout(timeout) self.sock.connect(self.path) + +# oVirt SDK operations + +def find_host(connection): + """Return the current host object or None.""" + try: + with builtins.open("/etc/vdsm/vdsm.id") as f: + vdsm_id = f.readline().strip() + except Exception as e: + # This is most likely not an oVirt host. + debug("cannot read /etc/vdsm/vdsm.id, using any host: %s" % e) + return None + + debug("hw_id = %r" % vdsm_id) + + system_service = connection.system_service() + storage_name = params['output_storage'] + data_centers = system_service.data_centers_service().list( + search='storage.name=%s' % storage_name, + case_sensitive=True, + ) + if len(data_centers) == 0: + # The storage domain is not attached to a datacenter + # (shouldn't happen, would fail on disk creation). + debug("storange domain (%s) is not attached to a DC" % storage_name) + return None + + datacenter = data_centers[0] + debug("datacenter = %s" % datacenter.name) + + hosts_service = system_service.hosts_service() + hosts = hosts_service.list( + search="hw_id=%s and datacenter=%s and status=Up" + % (vdsm_id, datacenter.name), + case_sensitive=True, + ) + if len(hosts) == 0: + # Couldn't find a host that's fulfilling the following criteria: + # - 'hw_id' equals to 'vdsm_id' + # - Its status is 'Up' + # - Belongs to the storage domain's datacenter + debug("cannot find a running host with hw_id=%r, " + "that belongs to datacenter '%s', " + "using any host" % (vdsm_id, datacenter.name)) + return None + + host = hosts[0] + debug("host.id = %r" % host.id) + + return types.Host(id = host.id) -- 2.21.0
Nir Soffer
2019-Nov-17 23:04 UTC
[Libguestfs] [PATCH 08/18] rhv-upload: Don't keep disk_service in handle
We don't need to keep the disk_service object in the handle since it can be extracted from the connection and the disk id. This will make it easier to extract the code for creating a disk from open(). --- v2v/rhv-upload-plugin.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/v2v/rhv-upload-plugin.py b/v2v/rhv-upload-plugin.py index 7075ce3ba..809917562 100644 --- a/v2v/rhv-upload-plugin.py +++ b/v2v/rhv-upload-plugin.py @@ -246,7 +246,6 @@ def open(readonly): 'can_zero': can_zero, 'connection': connection, 'disk': disk, - 'disk_service': disk_service, 'failed': False, 'highestwrite': 0, 'http': http, @@ -477,11 +476,12 @@ def close(h): # falls through to the exception case and then we can # continue. disk_id = disk.id + disk_service = ( + connection.system_service().disks_service().disk_service(disk.id)) start = time.time() try: while True: time.sleep(1) - disk_service = h['disk_service'] disk = disk_service.get() if disk.status == types.DiskStatus.LOCKED: if time.time() > start + timeout: -- 2.21.0
Nir Soffer
2019-Nov-17 23:04 UTC
[Libguestfs] [PATCH 09/18] rhv-upload: Extract create_disk() function
Creating a disk is complex and clumsy. Moving the code to a helper function cleans open() and will make it easier to fix error handling. --- v2v/rhv-upload-plugin.py | 96 ++++++++++++++++++++++------------------ 1 file changed, 54 insertions(+), 42 deletions(-) diff --git a/v2v/rhv-upload-plugin.py b/v2v/rhv-upload-plugin.py index 809917562..9b433bd7e 100644 --- a/v2v/rhv-upload-plugin.py +++ b/v2v/rhv-upload-plugin.py @@ -77,50 +77,10 @@ def open(readonly): insecure = params['insecure'], ) - system_service = connection.system_service() - - # Create the disk. - disks_service = system_service.disks_service() - if params['disk_format'] == "raw": - disk_format = types.DiskFormat.RAW - else: - disk_format = types.DiskFormat.COW - disk = disks_service.add( - disk = types.Disk( - # The ID is optional. - id = params.get('rhv_disk_uuid'), - name = params['disk_name'], - description = "Uploaded by virt-v2v", - format = disk_format, - initial_size = params['disk_size'], - provisioned_size = params['disk_size'], - # XXX Ignores params['output_sparse']. - # Handling this properly will be complex, see: - # https://www.redhat.com/archives/libguestfs/2018-March/msg00177.html - sparse = True, - storage_domains = [ - types.StorageDomain( - name = params['output_storage'], - ) - ], - ) - ) - - # Wait till the disk is up, as the transfer can't start if the - # disk is locked: - disk_service = disks_service.disk_service(disk.id) - debug("disk.id = %r" % disk.id) - - endt = time.time() + timeout - while True: - time.sleep(1) - disk = disk_service.get() - if disk.status == types.DiskStatus.OK: - break - if time.time() > endt: - raise RuntimeError("timed out waiting for disk to become unlocked") + disk = create_disk(connection) # Get a reference to the transfer service. + system_service = connection.system_service() transfers_service = system_service.image_transfers_service() # Create a new image transfer, using the local host is possible. @@ -566,3 +526,55 @@ def find_host(connection): debug("host.id = %r" % host.id) return types.Host(id = host.id) + +def create_disk(connection): + """ + Create a new disk for the transfer and wait until the disk is ready. + + Returns disk object. + """ + system_service = connection.system_service() + disks_service = system_service.disks_service() + + if params['disk_format'] == "raw": + disk_format = types.DiskFormat.RAW + else: + disk_format = types.DiskFormat.COW + + disk = disks_service.add( + disk = types.Disk( + # The ID is optional. + id = params.get('rhv_disk_uuid'), + name = params['disk_name'], + description = "Uploaded by virt-v2v", + format = disk_format, + initial_size = params['disk_size'], + provisioned_size = params['disk_size'], + # XXX Ignores params['output_sparse']. + # Handling this properly will be complex, see: + # https://www.redhat.com/archives/libguestfs/2018-March/msg00177.html + sparse = True, + storage_domains = [ + types.StorageDomain( + name = params['output_storage'], + ) + ], + ) + ) + + debug("disk.id = %r" % disk.id) + + # Wait till the disk moved from LOCKED state to OK state, as the transfer + # can't start if the disk is locked. + + disk_service = disks_service.disk_service(disk.id) + endt = time.time() + timeout + while True: + time.sleep(1) + disk = disk_service.get() + if disk.status == types.DiskStatus.OK: + break + if time.time() > endt: + raise RuntimeError("timed out waiting for disk to become unlocked") + + return disk -- 2.21.0
Nir Soffer
2019-Nov-17 23:04 UTC
[Libguestfs] [PATCH 10/18] rhv-upload: Try to remove disk on timeout
If waiting for the disk to become OK times out, try to remove it. This is likely to fail. --- v2v/rhv-upload-plugin.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/v2v/rhv-upload-plugin.py b/v2v/rhv-upload-plugin.py index 9b433bd7e..e79e7ddf7 100644 --- a/v2v/rhv-upload-plugin.py +++ b/v2v/rhv-upload-plugin.py @@ -575,6 +575,11 @@ def create_disk(connection): if disk.status == types.DiskStatus.OK: break if time.time() > endt: - raise RuntimeError("timed out waiting for disk to become unlocked") + try: + disk_service.remove() + except Exception as e: + debug("error removing disk %s: %s" % (disk.id, e)) + raise RuntimeError( + "timed out waiting for disk %s to become unlocked" % disk.id) return disk -- 2.21.0
Nir Soffer
2019-Nov-17 23:04 UTC
[Libguestfs] [PATCH 11/18] rhv-upload: Keep disk_id in handle
We kept the disk object for its id. Replace it with the disk id. This can make debugging easier when we log the handle. --- v2v/rhv-upload-plugin.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/v2v/rhv-upload-plugin.py b/v2v/rhv-upload-plugin.py index e79e7ddf7..4e48033e4 100644 --- a/v2v/rhv-upload-plugin.py +++ b/v2v/rhv-upload-plugin.py @@ -205,7 +205,7 @@ def open(readonly): 'can_trim': can_trim, 'can_zero': can_zero, 'connection': connection, - 'disk': disk, + 'disk_id': disk.id, 'failed': False, 'highestwrite': 0, 'http': http, @@ -425,7 +425,7 @@ def close(h): return try: - disk = h['disk'] + disk_id = h['disk_id'] transfer_service.finalize() @@ -435,9 +435,8 @@ def close(h): # waiting for the transfer object to cease to exist, which # falls through to the exception case and then we can # continue. - disk_id = disk.id disk_service = ( - connection.system_service().disks_service().disk_service(disk.id)) + connection.system_service().disks_service().disk_service(disk_id)) start = time.time() try: while True: -- 2.21.0
Nir Soffer
2019-Nov-17 23:04 UTC
[Libguestfs] [PATCH 12/18] rhv-upload: Don't keep transfer_service in handle
Instead of keeping the transfer_service in the handle, extract the transfer service object in close() using the transfer id. This will make it easier to extract the code for creating a transfer out of open(). --- v2v/rhv-upload-plugin.py | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/v2v/rhv-upload-plugin.py b/v2v/rhv-upload-plugin.py index 4e48033e4..3f0ec7b2e 100644 --- a/v2v/rhv-upload-plugin.py +++ b/v2v/rhv-upload-plugin.py @@ -206,13 +206,12 @@ def open(readonly): 'can_zero': can_zero, 'connection': connection, 'disk_id': disk.id, + 'transfer': transfer, 'failed': False, 'highestwrite': 0, 'http': http, 'needs_auth': needs_auth, 'path': destination_url.path, - 'transfer': transfer, - 'transfer_service': transfer_service, } def can_trim(h): @@ -405,7 +404,11 @@ def flush(h): def close(h): http = h['http'] connection = h['connection'] - transfer_service = h['transfer_service'] + transfer = h['transfer'] + + transfer_service = (connection.system_service() + .image_transfers_service() + .image_transfer_service(transfer.id)) # This is sometimes necessary because python doesn't set up # sys.stderr to be line buffered and so debug, errors or -- 2.21.0
Nir Soffer
2019-Nov-17 23:04 UTC
[Libguestfs] [PATCH 13/18] rhv-upload: Get host before creating disk
If getting the host object raised, we forgot to remove the disk. Getting the host object before creating the disk avoids the error handling, and will will fail faster on errors, without waiting until the disk is ready. This will help to extract the code for creating transfer out of open(). --- v2v/rhv-upload-plugin.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/v2v/rhv-upload-plugin.py b/v2v/rhv-upload-plugin.py index 3f0ec7b2e..6e0b609e7 100644 --- a/v2v/rhv-upload-plugin.py +++ b/v2v/rhv-upload-plugin.py @@ -77,14 +77,15 @@ def open(readonly): insecure = params['insecure'], ) + # Use the local host is possible. + host = find_host(connection) if params['rhv_direct'] else None disk = create_disk(connection) # Get a reference to the transfer service. system_service = connection.system_service() transfers_service = system_service.image_transfers_service() - # Create a new image transfer, using the local host is possible. - host = find_host(connection) if params['rhv_direct'] else None + # Create a new image transfer. transfer = transfers_service.add( types.ImageTransfer( disk = types.Disk(id = disk.id), -- 2.21.0
Nir Soffer
2019-Nov-17 23:04 UTC
[Libguestfs] [PATCH 14/18] rhv-upload: Extract create_transfer() function
Creating a transfer is complex and clumsy. Extracting the code to a helper function will clean up open() and make it easier to improve error handling. --- v2v/rhv-upload-plugin.py | 77 ++++++++++++++++++++++------------------ 1 file changed, 42 insertions(+), 35 deletions(-) diff --git a/v2v/rhv-upload-plugin.py b/v2v/rhv-upload-plugin.py index 6e0b609e7..0186e3566 100644 --- a/v2v/rhv-upload-plugin.py +++ b/v2v/rhv-upload-plugin.py @@ -80,41 +80,7 @@ def open(readonly): # Use the local host is possible. host = find_host(connection) if params['rhv_direct'] else None disk = create_disk(connection) - - # Get a reference to the transfer service. - system_service = connection.system_service() - transfers_service = system_service.image_transfers_service() - - # Create a new image transfer. - transfer = transfers_service.add( - types.ImageTransfer( - disk = types.Disk(id = disk.id), - host = host, - inactivity_timeout = 3600, - ) - ) - - # At this point the transfer owns the disk and will delete the disk if the - # transfer is canceled, or if finalizing the transfer fails. - - debug("transfer.id = %r" % transfer.id) - - # Get a reference to the created transfer service. - transfer_service = transfers_service.image_transfer_service(transfer.id) - - # After adding a new transfer for the disk, the transfer's status - # will be INITIALIZING. Wait until the init phase is over. The - # actual transfer can start when its status is "Transferring". - endt = time.time() + timeout - while True: - transfer = transfer_service.get() - if transfer.phase != types.ImageTransferPhase.INITIALIZING: - break - if time.time() > endt: - transfer_service.cancel() - raise RuntimeError("timed out waiting for transfer status " - "!= INITIALIZING") - time.sleep(1) + transfer = create_transfer(connection, disk, host) # Now we have permission to start the transfer. if params['rhv_direct']: @@ -586,3 +552,44 @@ def create_disk(connection): "timed out waiting for disk %s to become unlocked" % disk.id) return disk + +def create_transfer(connection, disk, host): + """ + Create image transfer and wait until the transfer is ready. + + Returns a transfer object. + """ + system_service = connection.system_service() + transfers_service = system_service.image_transfers_service() + + transfer = transfers_service.add( + types.ImageTransfer( + disk = types.Disk(id = disk.id), + host = host, + inactivity_timeout = 3600, + ) + ) + + # At this point the transfer owns the disk and will delete the disk if the + # transfer is canceled, or if finalizing the transfer fails. + + debug("transfer.id = %r" % transfer.id) + + # Get a reference to the created transfer service. + transfer_service = transfers_service.image_transfer_service(transfer.id) + + # After adding a new transfer for the disk, the transfer's status + # will be INITIALIZING. Wait until the init phase is over. The + # actual transfer can start when its status is "Transferring". + endt = time.time() + timeout + while True: + transfer = transfer_service.get() + if transfer.phase != types.ImageTransferPhase.INITIALIZING: + break + if time.time() > endt: + transfer_service.cancel() + raise RuntimeError("timed out waiting for transfer status " + "!= INITIALIZING") + time.sleep(1) + + return transfer -- 2.21.0
Nir Soffer
2019-Nov-17 23:04 UTC
[Libguestfs] [PATCH 15/18] rhv-upload: Extract imageio helpers
Starting an upload to imageio is complex. Starting extracting helpers to make the flow more clear: - parse_transfer_url(): Select and parse the transfer url. - create_http(): Create http connection for the selected url. Cleanup error handling by adding a try block after creating the transfer. Every error in this try block will cancel the transfer before raising the error. When the refactoring is done, all the code in open() will be protected by this try block. --- v2v/rhv-upload-plugin.py | 74 +++++++++++++++++++++++----------------- 1 file changed, 42 insertions(+), 32 deletions(-) diff --git a/v2v/rhv-upload-plugin.py b/v2v/rhv-upload-plugin.py index 0186e3566..f04495d1e 100644 --- a/v2v/rhv-upload-plugin.py +++ b/v2v/rhv-upload-plugin.py @@ -80,40 +80,14 @@ def open(readonly): # Use the local host is possible. host = find_host(connection) if params['rhv_direct'] else None disk = create_disk(connection) - transfer = create_transfer(connection, disk, host) - # Now we have permission to start the transfer. - if params['rhv_direct']: - if transfer.transfer_url is None: - transfer_service.cancel() - raise RuntimeError("direct upload to host not supported, " - "requires ovirt-engine >= 4.2 and only works " - "when virt-v2v is run within the oVirt/RHV " - "environment, eg. on an oVirt node.") - destination_url = urlparse(transfer.transfer_url) - else: - destination_url = urlparse(transfer.proxy_url) - - if destination_url.scheme == "https": - context = \ - ssl.create_default_context(purpose = ssl.Purpose.SERVER_AUTH, - cafile = params['rhv_cafile']) - if params['insecure']: - context.check_hostname = False - context.verify_mode = ssl.CERT_NONE - http = HTTPSConnection( - destination_url.hostname, - destination_url.port, - context = context - ) - elif destination_url.scheme == "http": - http = HTTPConnection( - destination_url.hostname, - destination_url.port, - ) - else: + transfer = create_transfer(connection, disk, host) + try: + destination_url = parse_transfer_url(transfer) + http = create_http(destination_url) + except: transfer_service.cancel() - raise RuntimeError("unknown URL scheme (%s)" % destination_url.scheme) + raise # The first request is to fetch the features of the server. @@ -593,3 +567,39 @@ def create_transfer(connection, disk, host): time.sleep(1) return transfer + +# oVirt imageio operations + +def parse_transfer_url(transfer): + """ + Returns a parsed transfer url, preferring direct transfer if possible. + """ + if params['rhv_direct']: + if transfer.transfer_url is None: + raise RuntimeError("direct upload to host not supported, " + "requires ovirt-engine >= 4.2 and only works " + "when virt-v2v is run within the oVirt/RHV " + "environment, eg. on an oVirt node.") + return urlparse(transfer.transfer_url) + else: + return urlparse(transfer.proxy_url) + +def create_http(url): + """ + Create http connection for transfer url. + + Returns HTTPConnction. + """ + if url.scheme == "https": + context = \ + ssl.create_default_context(purpose = ssl.Purpose.SERVER_AUTH, + cafile = params['rhv_cafile']) + if params['insecure']: + context.check_hostname = False + context.verify_mode = ssl.CERT_NONE + + return HTTPSConnection(url.hostname, url.port, context = context) + elif url.scheme == "http": + return HTTPConnection(url.hostname, url.port) + else: + raise RuntimeError("unknown URL scheme (%s)" % url.scheme) -- 2.21.0
Nir Soffer
2019-Nov-17 23:04 UTC
[Libguestfs] [PATCH 16/18] rhv-upload: Extract get_options() helper
Extract the code for sending OPTIONS request and handling response for new and old imageio daemon and proxy to a helper function. The response from imageio is normalized as a dict to make it easier to handle the various combinations. --- v2v/rhv-upload-plugin.py | 93 +++++++++++++++++++++------------------- 1 file changed, 48 insertions(+), 45 deletions(-) diff --git a/v2v/rhv-upload-plugin.py b/v2v/rhv-upload-plugin.py index f04495d1e..2abd304bb 100644 --- a/v2v/rhv-upload-plugin.py +++ b/v2v/rhv-upload-plugin.py @@ -85,73 +85,40 @@ def open(readonly): try: destination_url = parse_transfer_url(transfer) http = create_http(destination_url) + options = get_options(http, destination_url) except: transfer_service.cancel() raise - # The first request is to fetch the features of the server. - - # Authentication was needed only for GET and PUT requests when - # communicating with old imageio-proxy. - needs_auth = not params['rhv_direct'] - - can_flush = False - can_trim = False - can_zero = False - unix_socket = None - - http.request("OPTIONS", destination_url.path) - r = http.getresponse() - data = r.read() - - if r.status == 200: - # New imageio never needs authentication. - needs_auth = False - - j = json.loads(data) - can_flush = "flush" in j['features'] - can_trim = "trim" in j['features'] - can_zero = "zero" in j['features'] - unix_socket = j.get('unix_socket') - - # Old imageio servers returned either 405 Method Not Allowed or - # 204 No Content (with an empty body). If we see that we leave - # all the features as False and they will be emulated. - elif r.status == 405 or r.status == 204: - pass - - else: - transfer_service.cancel() - raise RuntimeError("could not use OPTIONS request: %d: %s" % - (r.status, r.reason)) - - debug("imageio features: flush=%r trim=%r zero=%r unix_socket=%r" % - (can_flush, can_trim, can_zero, unix_socket)) + debug("imageio features: flush=%(can_flush)r trim=%(can_trim)r " + "zero=%(can_zero)r unix_socket=%(unix_socket)r" + % options) # If we are connected to imageio on the local host and the # transfer features a unix_socket then we can reconnect to that. - if host is not None and unix_socket is not None: + if host is not None and options['unix_socket'] is not None: try: - http = UnixHTTPConnection(unix_socket) + http = UnixHTTPConnection(options['unix_socket']) except Exception as e: # Very unlikely failure, but we can recover by using the https # connection. debug("cannot create unix socket connection, using https: %s" % e) else: - debug("optimizing connection using unix socket %r" % unix_socket) + debug("optimizing connection using unix socket %r" + % options['unix_socket']) # Save everything we need to make requests in the handle. return { - 'can_flush': can_flush, - 'can_trim': can_trim, - 'can_zero': can_zero, + 'can_flush': options['can_flush'], + 'can_trim': options['can_trim'], + 'can_zero': options['can_zero'], + 'needs_auth': options['needs_auth'], 'connection': connection, 'disk_id': disk.id, 'transfer': transfer, 'failed': False, 'highestwrite': 0, 'http': http, - 'needs_auth': needs_auth, 'path': destination_url.path, } @@ -603,3 +570,39 @@ def create_http(url): return HTTPConnection(url.hostname, url.port) else: raise RuntimeError("unknown URL scheme (%s)" % url.scheme) + +def get_options(http, url): + """ + Send OPTIONS request to imageio server and return options dict. + """ + http.request("OPTIONS", url.path) + r = http.getresponse() + data = r.read() + + if r.status == 200: + j = json.loads(data) + features = j["features"] + return { + # New imageio never used authentication. + "needs_auth": False, + "can_flush": "flush" in features, + "can_trim": "trim" in features, + "can_zero": "zero" in features, + "unix_socket": j.get('unix_socket'), + } + + elif r.status == 405 or r.status == 204: + # Old imageio servers returned either 405 Method Not Allowed or + # 204 No Content (with an empty body). + return { + # Authentication was required only when using old imageio proxy. + # Can be removed when dropping support for oVirt < 4.2. + "needs_auth": not params['rhv_direct'], + "can_flush": False, + "can_trim": False, + "can_zero": False, + "unix_socket": None, + } + else: + raise RuntimeError("could not use OPTIONS request: %d: %s" % + (r.status, r.reason)) -- 2.21.0
Nir Soffer
2019-Nov-17 23:04 UTC
[Libguestfs] [PATCH 17/18] rhv-upload: Extract optimize_http() helper
Extract the code for optimizing the http connection using unix socket to a helper function. Calling the new function inside the try block ensure that errors creating the connection will cancel the transfer. --- v2v/rhv-upload-plugin.py | 33 ++++++++++++++++++++------------- 1 file changed, 20 insertions(+), 13 deletions(-) diff --git a/v2v/rhv-upload-plugin.py b/v2v/rhv-upload-plugin.py index 2abd304bb..3755c34b7 100644 --- a/v2v/rhv-upload-plugin.py +++ b/v2v/rhv-upload-plugin.py @@ -86,6 +86,7 @@ def open(readonly): destination_url = parse_transfer_url(transfer) http = create_http(destination_url) options = get_options(http, destination_url) + http = optimize_http(http, host, options) except: transfer_service.cancel() raise @@ -94,19 +95,6 @@ def open(readonly): "zero=%(can_zero)r unix_socket=%(unix_socket)r" % options) - # If we are connected to imageio on the local host and the - # transfer features a unix_socket then we can reconnect to that. - if host is not None and options['unix_socket'] is not None: - try: - http = UnixHTTPConnection(options['unix_socket']) - except Exception as e: - # Very unlikely failure, but we can recover by using the https - # connection. - debug("cannot create unix socket connection, using https: %s" % e) - else: - debug("optimizing connection using unix socket %r" - % options['unix_socket']) - # Save everything we need to make requests in the handle. return { 'can_flush': options['can_flush'], @@ -606,3 +594,22 @@ def get_options(http, url): else: raise RuntimeError("could not use OPTIONS request: %d: %s" % (r.status, r.reason)) + +def optimize_http(http, host, options): + """ + Return an optimized http connection using unix socket if we are connected + to imageio server on the local host and it features a unix socket. + """ + unix_socket = options['unix_socket'] + + if host is not None and unix_socket is not None: + try: + http = UnixHTTPConnection(unix_socket) + except Exception as e: + # Very unlikely failure, but we can recover by using the https + # connection. + debug("cannot create unix socket connection, using https: %s" % e) + else: + debug("optimizing connection using unix socket %r" % unix_socket) + + return http -- 2.21.0
Nir Soffer
2019-Nov-17 23:04 UTC
[Libguestfs] [PATCH 18/18] rhv-upload: Clean up username and password
Extract functions for getting the username and password, cleaning up open(). --- v2v/rhv-upload-plugin.py | 27 ++++++++++++++++----------- 1 file changed, 16 insertions(+), 11 deletions(-) diff --git a/v2v/rhv-upload-plugin.py b/v2v/rhv-upload-plugin.py index 3755c34b7..08b40cfcf 100644 --- a/v2v/rhv-upload-plugin.py +++ b/v2v/rhv-upload-plugin.py @@ -57,21 +57,26 @@ def debug(s): print(s, file=sys.stderr) sys.stderr.flush() -def open(readonly): - # Parse out the username from the output_conn URL. - parsed = urlparse(params['output_conn']) - username = parsed.username or "admin@internal" - - # Read the password from file. +def read_password(): + """ + Read the password from file. + """ with builtins.open(params['output_password'], 'r') as fp: - password = fp.read() - password = password.rstrip() + data = fp.read() + return data.rstrip() - # Connect to the server. +def parse_username(): + """ + Parse out the username from the output_conn URL. + """ + parsed = urlparse(params['output_conn']) + return parsed.username or "admin@internal" + +def open(readonly): connection = sdk.Connection( url = params['output_conn'], - username = username, - password = password, + username = parse_username(), + password = read_password(), ca_file = params['rhv_cafile'], log = logging.getLogger(), insecure = params['insecure'], -- 2.21.0
Nir Soffer
2019-Nov-17 23:10 UTC
Re: [Libguestfs] [PATCH 00/18] rvh-upload: Various fixes and cleanups
On Mon, Nov 18, 2019 at 1:04 AM Nir Soffer <nirsof@gmail.com> wrote:> > This series extract oVirt SDK and imageio code to make it eaiser to follow the > code and improve error handing in open() and close(). > > The first small patches can be consider as fixes for downstream.I forgot to add Pino, who also did some fixes in this area.> > Tested based on libguestfs v1.41.5, since I had trouble building virt-v2v and > libguestfs from master. > > Nir Soffer (18): > rhv-upload: Remove unused exception class > rhv-upload: Check status more frequently > rhv-upload: Don't flush() in close() > rhv-upload: Ensure connection is closed in close() > rhv-upload: Ensure http is closed in close() > rhv-upload: Fix cleanup after errorsPino, this completes changes you did recently for canceling transfer.> rhv-upload: Group oVirt SDK functions > rhv-upload: Don't keep disk_service in handle > rhv-upload: Extract create_disk() function > rhv-upload: Try to remove disk on timeout > rhv-upload: Keep disk_id in handle > rhv-upload: Don't keep transfer_service in handle > rhv-upload: Get host before creating disk > rhv-upload: Extract create_transfer() function > rhv-upload: Extract imageio helpers > rhv-upload: Extract get_options() helper > rhv-upload: Extract optimize_http() helper > rhv-upload: Clean up username and password > > v2v/rhv-upload-plugin.py | 550 +++++++++++++++++++++------------------ > 1 file changed, 295 insertions(+), 255 deletions(-) > > -- > 2.21.0 >
Nir Soffer
2019-Nov-17 23:50 UTC
Re: [Libguestfs] [PATCH 02/18] rhv-upload: Check status more frequently
Daniel, please review this one. On Mon, Nov 18, 2019 at 1:04 AM Nir Soffer <nirsof@gmail.com> wrote:> > Checking status more frequently save a couple of seconds. Here is > an example flow tested with oVirt upload_disk.py example: > > With 5 seconds wait: > > Created disk in 11.085111 seconds > Created transfer in 1.857502 seconds > > With 1 second wait: > > Created disk in 4.991227 seconds > Created transfer in 1.961243 seconds > --- > v2v/rhv-upload-plugin.py | 4 ++-- > 1 file changed, 2 insertions(+), 2 deletions(-) > > diff --git a/v2v/rhv-upload-plugin.py b/v2v/rhv-upload-plugin.py > index fdd2012f5..472d483f2 100644 > --- a/v2v/rhv-upload-plugin.py > +++ b/v2v/rhv-upload-plugin.py > @@ -161,7 +161,7 @@ def open(readonly): > > endt = time.time() + timeout > while True: > - time.sleep(5) > + time.sleep(1) > disk = disk_service.get() > if disk.status == types.DiskStatus.OK: > break > @@ -197,7 +197,7 @@ def open(readonly): > transfer_service.cancel() > raise RuntimeError("timed out waiting for transfer status " > "!= INITIALIZING") > - time.sleep(5) > + time.sleep(1) > > # Now we have permission to start the transfer. > if params['rhv_direct']: > -- > 2.21.0 >
Nir Soffer
2019-Nov-17 23:52 UTC
Re: [Libguestfs] [PATCH 06/18] rhv-upload: Fix cleanup after errors
On Mon, Nov 18, 2019 at 1:04 AM Nir Soffer <nirsof@gmail.com> wrote:> > When request failed, we paused the transfer. This is not needed since > our intent it to cancel the transfer. > > When closing after failure, we canceled the transfer and removed the > disk. This is not needed since the transfer owns the disk and will > remove it when canceled. > > When finalizing times out, we canceled the transfer and removed the > disk. This is not needed since the transfer will clean it self, and > likely to fail because cancelling is not possible after finalizing.Daniel, can you confirm my assumptions about engine behaviour?> --- > v2v/rhv-upload-plugin.py | 37 +++++++++++++++---------------------- > 1 file changed, 15 insertions(+), 22 deletions(-) > > diff --git a/v2v/rhv-upload-plugin.py b/v2v/rhv-upload-plugin.py > index 5bdfdf49f..1f42c4a55 100644 > --- a/v2v/rhv-upload-plugin.py > +++ b/v2v/rhv-upload-plugin.py > @@ -180,6 +180,10 @@ def open(readonly): > inactivity_timeout = 3600, > ) > ) > + > + # At this point the transfer owns the disk and will delete the disk if the > + # transfer is canceled, or if finalizing the transfer fails. > + > debug("transfer.id = %r" % transfer.id) > > # Get a reference to the created transfer service. > @@ -309,15 +313,12 @@ def can_flush(h): > def get_size(h): > return params['disk_size'] > > -# Any unexpected HTTP response status from the server will end up > -# calling this function which logs the full error, pauses the > -# transfer, sets the failed state, and raises a RuntimeError > -# exception. > +# Any unexpected HTTP response status from the server will end up calling this > +# function which logs the full error, sets the failed state, and raises a > +# RuntimeError exception. > def request_failed(h, r, msg): > - # Setting the failed flag in the handle causes the disk to be > - # cleaned up on close. > + # Setting the failed flag in the handle will cancel the transfer on close. > h['failed'] = True > - h['transfer_service'].pause() > > status = r.status > reason = r.reason > @@ -490,15 +491,10 @@ def flush(h): > > r.read() > > -def delete_disk_on_failure(h): > - transfer_service = h['transfer_service'] > - transfer_service.cancel() > - disk_service = h['disk_service'] > - disk_service.remove() > - > def close(h): > http = h['http'] > connection = h['connection'] > + transfer_service = h['transfer_service'] > > # This is sometimes necessary because python doesn't set up > # sys.stderr to be line buffered and so debug, errors or > @@ -508,15 +504,17 @@ def close(h): > > http.close() > > - # If the connection failed earlier ensure we clean up the disk. > + # If the connection failed earlier ensure we cancel the trasfer. Canceling > + # the transfer will delete the disk. > if h['failed']: > - delete_disk_on_failure(h) > - connection.close() > + try: > + transfer_service.cancel() > + finally: > + connection.close() > return > > try: > disk = h['disk'] > - transfer_service = h['transfer_service'] > > transfer_service.finalize() > > @@ -548,11 +546,6 @@ def close(h): > with builtins.open(params['diskid_file'], 'w') as fp: > fp.write(disk.id) > > - except: > - # Otherwise on any failure we must clean up the disk. > - delete_disk_on_failure(h) > - raise > - > finally: > connection.close() > > -- > 2.21.0 >
Nir Soffer
2019-Nov-17 23:53 UTC
Re: [Libguestfs] [PATCH 10/18] rhv-upload: Try to remove disk on timeout
On Mon, Nov 18, 2019 at 1:05 AM Nir Soffer <nirsof@gmail.com> wrote:> > If waiting for the disk to become OK times out, try to remove it. This > is likely to fail.Daniel, do you think this can succeed?> --- > v2v/rhv-upload-plugin.py | 7 ++++++- > 1 file changed, 6 insertions(+), 1 deletion(-) > > diff --git a/v2v/rhv-upload-plugin.py b/v2v/rhv-upload-plugin.py > index 9b433bd7e..e79e7ddf7 100644 > --- a/v2v/rhv-upload-plugin.py > +++ b/v2v/rhv-upload-plugin.py > @@ -575,6 +575,11 @@ def create_disk(connection): > if disk.status == types.DiskStatus.OK: > break > if time.time() > endt: > - raise RuntimeError("timed out waiting for disk to become unlocked") > + try: > + disk_service.remove() > + except Exception as e: > + debug("error removing disk %s: %s" % (disk.id, e)) > + raise RuntimeError( > + "timed out waiting for disk %s to become unlocked" % disk.id) > > return disk > -- > 2.21.0 >
Richard W.M. Jones
2019-Nov-18 10:28 UTC
Re: [Libguestfs] [PATCH 00/18] rvh-upload: Various fixes and cleanups
The series looks fine to me, ACK. I pushed a few of the obvious fixes. I didn't push all of them yet because it sounds like you want Daniel to take a look. Rich. -- Richard Jones, Virtualization Group, Red Hat http://people.redhat.com/~rjones Read my programming and virtualization blog: http://rwmj.wordpress.com Fedora Windows cross-compiler. Compile Windows programs, test, and build Windows installers. Over 100 libraries supported. http://fedoraproject.org/wiki/MinGW
Reasonably Related Threads
- [PATCH 00/18] rvh-upload: Various fixes and cleanups
- [PATCH] v2v: rhv-upload-plugin - improve wait logic after finalize (RHBZ#1680361)
- [PATCH] v2v: rhv-upload-plugin - improve wait logic after finalize
- Re: [PATCH 02/18] rhv-upload: Check status more frequently
- Re: [PATCH v2] v2v: rhv-upload-plugin - improve wait logic after finalize (RHBZ#1680361)