Richard W.M. Jones
2019-Aug-22 14:39 UTC
[Libguestfs] [PATCH disk-sync 0/5] Misc cleanups and convert inner loop to asynch.
This is based on top of: https://github.com/nertpinx/v2v-conversion-host/commit/0bb2efdcacd975a2cae7380080991ac7fc238d2b The first 4 patches are fairly uncontroversial miscellaneous cleanups. Patch 5 is the interesting one. (Note it doesn't quite work yet, so it's for discussion only.) Patch 5 converts the inner loop to use asynchronous libnbd calls. performance improves quite a bit for me -- about 13 minutes down to 5 minutes for an "initial" run on a moderate sized Linux VM. We do this by changing the read call from nbd.pread to nbd.aio_pread and moving the writing code into a completion callback which runs when the NBD_CMD_READ has been executed by nbdkit. The problem with this patch, which is why I say it's for discussion only, is that we need to change it to throttle the number of commands in flight. Issuing large numbers of commands isn't in itself a problem. However with each command is an associated NBD.Buffer, and so the effect at the moment is that we need to allocate enough memory up front to store the whole disk image(!) By throttling the commands we can control exactly how much memory is used, and indeed control the trade-off between memory and parallelism. I checked the MD5 of the disk before and after and they were unchanged (my VM is turned off). Rich.
Richard W.M. Jones
2019-Aug-22 14:39 UTC
[Libguestfs] [PATCH disk-sync 1/5] Fix LD_LIBRARY_PATH.
The path must point to either the lib32 or lib64 subdirectory. As it stands this patch ignores 32 bit because we basically don't care about that. --- wrapper/disk_sync.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/wrapper/disk_sync.py b/wrapper/disk_sync.py index 109cbc3..42fbbaf 100644 --- a/wrapper/disk_sync.py +++ b/wrapper/disk_sync.py @@ -344,7 +344,7 @@ def get_nbdkit_cmd(disk, key): state = State().instance logging.debug('Generating nbdkit command') - env = 'LD_LIBRARY_PATH=%s' % VDDK_LIBDIR + env = 'LD_LIBRARY_PATH=%s/lib64' % VDDK_LIBDIR if 'LD_LIBRARY_PATH' in os.environ: env += ':' + os.environ['LD_LIBRARY_PATH'] -- 2.22.0
Richard W.M. Jones
2019-Aug-22 14:39 UTC
[Libguestfs] [PATCH disk-sync 2/5] Rearrange nbdkit parameters.
No real change here, but: * Put the -s and --exit-with-parent options first. Keep filters and plugin together. * Remove --newstyle option, it's not needed in any sufficiently recent version. --- wrapper/disk_sync.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/wrapper/disk_sync.py b/wrapper/disk_sync.py index 42fbbaf..55f2ba9 100644 --- a/wrapper/disk_sync.py +++ b/wrapper/disk_sync.py @@ -354,13 +354,12 @@ def get_nbdkit_cmd(disk, key): 'env', env, 'nbdkit', + '-s', + '--exit-with-parent', '--readonly', - '--newstyle', '--exportname=/', - '-s', '--filter=log', '--filter=cacheextents', - '--exit-with-parent', 'vddk', 'libdir=%s' % VDDK_LIBDIR, 'vm=moref=' + state['vm_moid'], -- 2.22.0
Richard W.M. Jones
2019-Aug-22 14:39 UTC
[Libguestfs] [PATCH disk-sync 3/5] Fix minor typo in debug message.
--- wrapper/disk_sync.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/wrapper/disk_sync.py b/wrapper/disk_sync.py index 55f2ba9..e830193 100644 --- a/wrapper/disk_sync.py +++ b/wrapper/disk_sync.py @@ -445,7 +445,7 @@ def sync_data(): for extent in state['internal']['disk_extents'][key]: # Skip over extents smaller than 1MB if extent.length < 1 << 20: - logging.debug('Skibbing block status for extent of size %d B at offset %d B' % + logging.debug('Skipping block status for extent of size %d B at offset %d B' % (extent.length, extent.start)) data_blocks.append({ 'offset': extent.start, -- 2.22.0
Richard W.M. Jones
2019-Aug-22 14:39 UTC
[Libguestfs] [PATCH disk-sync 4/5] Require libnbd >= 0.9.8 and fail hard if it's an earlier version.
This was the first version with the stable API and all the fixes required to make the Python bindings not crash when used asynchronously. --- wrapper/disk_sync.py | 11 +++++++++++ 1 file changed, 11 insertions(+) diff --git a/wrapper/disk_sync.py b/wrapper/disk_sync.py index e830193..e655ead 100644 --- a/wrapper/disk_sync.py +++ b/wrapper/disk_sync.py @@ -4,6 +4,8 @@ from pyVmomi import vim from pyVim.connect import SmartConnect, Disconnect from pyVim.task import WaitForTask +from packaging import version + import nbd from six.moves.urllib.parse import urlparse, unquote @@ -15,6 +17,8 @@ import ssl import sys import json +NBD_MIN_VERSION = version.parse("0.9.8") + LOG_FORMAT_TIME = '[%(asctime)s] ' LOG_FORMAT_MSG = ': %(message)s' @@ -582,6 +586,13 @@ def main(): '''TODO: Add some description here ''' args = parse_args() + + nbd_version = version.parse(nbd.NBD().get_version()) + if nbd_version < NBD_MIN_VERSION: + logging.error("version on libnbd is too old. Version found = %s. Min version required = %s" % + (nbd_version, NBD_MIN_VERSION)) + sys.exit(1) + state = State(args).instance validate_state(args.sync_type) parse_input(args) -- 2.22.0
Richard W.M. Jones
2019-Aug-22 14:39 UTC
[Libguestfs] [PATCH disk-sync 5/5] Convert disk_sync inner loop to asynchronous.
Previously the inner loop would issue nbd.pread() requests synchronously, meaning that we would issue a request for each data block from the nbdkit server (which would in turn synchronously request the data from VMware) and wait until nbdkit replies before continuing. This converts the inner loop so it issues as many pread requests asychronously to nbdkit as the server can handle (any extra are queued in nbd_handle). The server will answer these in parallel and possibly out of order. This results in somewhat better throughput (for me: 13 minutes down to 5 minutes for an "initial" run). Although unfortunately we are still limited by VDDK's braindead threading model. --- wrapper/disk_sync.py | 55 +++++++++++++++++++++++++++++++------------- 1 file changed, 39 insertions(+), 16 deletions(-) diff --git a/wrapper/disk_sync.py b/wrapper/disk_sync.py index e655ead..e854009 100644 --- a/wrapper/disk_sync.py +++ b/wrapper/disk_sync.py @@ -409,6 +409,26 @@ def get_block_status(nbd_handle, extent): return blocks +# This is called back when nbd_aio_pread completes. +def read_completed(fd, buf, offset, err): + logging.debug('Writing %d B to offset %d B' % (buf.size(), offset)) + os.pwrite(fd, buf.to_bytearray(), offset) + # By returning 1 here we auto-retire the aio_pread command. + return 1 + + +# Process any AIO requests without blocking. +def process_aio_requests(nbd_handle): + while nbd_handle.poll(0) == 1: + pass + + +# Block until all AIO commands on the handle have finished. +def wait_for_aio_commands_to_finish(nbd_handle): + while nbd_handle.aio_in_flight() > 0: + nbd_handle.poll(-1) + + def sync_data(): state = State().instance for key, disk in state['disks'].items(): @@ -491,25 +511,28 @@ def sync_data(): (block['length'], block['offset'])) # Optimize for memory usage, maybe? os.pwrite(fd, [0] * block['length'], block['offset']) - copied += block['length'] - disk['progress']['copied'] = copied - state.write() else: - wrote = 0 - while wrote < block['length']: - length = min(block['length'] - wrote, MAX_PREAD_LEN) - offset = block['offset'] + wrote + count = 0 + while count < block['length']: + length = min(block['length'] - count, MAX_PREAD_LEN) + offset = block['offset'] + count + logging.debug('Reading %d B from offset %d B' % (length, offset)) - # Ideally use mmap() without any temporary buffer - data = nbd_handle.pread(length, offset) - logging.debug('Writing %d B to offset %d B' % - (length, offset)) - os.pwrite(fd, data, offset) - copied += length - wrote += length - disk['progress']['copied'] = copied - state.write() + buf = nbd.Buffer(length) + nbd_handle.aio_pread( + buf, offset, + lambda err, fd=fd, buf=buf, offset=offset: + read_completed(fd, buf, offset, err)) + count += length + + process_aio_requests(nbd_handle) + + copied += block['length'] + disk['progress']['copied'] = copied + state.write() + + wait_for_aio_commands_to_finish(nbd_handle) if copied == 0: logging.debug('Nothing to copy for disk: %s(key=%s)' % -- 2.22.0
Martin Kletzander
2019-Aug-26 11:21 UTC
Re: [Libguestfs] [PATCH disk-sync 4/5] Require libnbd >= 0.9.8 and fail hard if it's an earlier version.
On Thu, Aug 22, 2019 at 03:39:34PM +0100, Richard W.M. Jones wrote:>This was the first version with the stable API and all the fixes >required to make the Python bindings not crash when used >asynchronously. >--- > wrapper/disk_sync.py | 11 +++++++++++ > 1 file changed, 11 insertions(+) > >diff --git a/wrapper/disk_sync.py b/wrapper/disk_sync.py >index e830193..e655ead 100644 >--- a/wrapper/disk_sync.py >+++ b/wrapper/disk_sync.py >@@ -4,6 +4,8 @@ from pyVmomi import vim > from pyVim.connect import SmartConnect, Disconnect > from pyVim.task import WaitForTask > >+from packaging import version >+This introduces a new dependency, but it should be fine. I added it to requirements.txt.> import nbd > > from six.moves.urllib.parse import urlparse, unquote >@@ -15,6 +17,8 @@ import ssl > import sys > import json > >+NBD_MIN_VERSION = version.parse("0.9.8") >+ > LOG_FORMAT_TIME = '[%(asctime)s] ' > LOG_FORMAT_MSG = ': %(message)s' > >@@ -582,6 +586,13 @@ def main(): > '''TODO: Add some description here ''' > > args = parse_args() >+ >+ nbd_version = version.parse(nbd.NBD().get_version()) >+ if nbd_version < NBD_MIN_VERSION: >+ logging.error("version on libnbd is too old. Version found = %s. Min version required = %s" % >+ (nbd_version, NBD_MIN_VERSION)) >+ sys.exit(1) >+I moved this after logging is initialized at which point it can just use error(). Thanks.> state = State(args).instance > validate_state(args.sync_type) > parse_input(args) >-- >2.22.0 >
Martin Kletzander
2019-Aug-26 11:36 UTC
Re: [Libguestfs] [PATCH disk-sync 5/5] Convert disk_sync inner loop to asynchronous.
On Thu, Aug 22, 2019 at 03:39:35PM +0100, Richard W.M. Jones wrote:>Previously the inner loop would issue nbd.pread() requests >synchronously, meaning that we would issue a request for each data >block from the nbdkit server (which would in turn synchronously >request the data from VMware) and wait until nbdkit replies before >continuing. > >This converts the inner loop so it issues as many pread requests >asychronously to nbdkit as the server can handle (any extra are queued >in nbd_handle). The server will answer these in parallel and possibly >out of order. > >This results in somewhat better throughput (for me: 13 minutes down to >5 minutes for an "initial" run). Although unfortunately we are still >limited by VDDK's braindead threading model. >--- > wrapper/disk_sync.py | 55 +++++++++++++++++++++++++++++++------------- > 1 file changed, 39 insertions(+), 16 deletions(-) > >diff --git a/wrapper/disk_sync.py b/wrapper/disk_sync.py >index e655ead..e854009 100644 >--- a/wrapper/disk_sync.py >+++ b/wrapper/disk_sync.py >@@ -409,6 +409,26 @@ def get_block_status(nbd_handle, extent): > return blocks > > >+# This is called back when nbd_aio_pread completes. >+def read_completed(fd, buf, offset, err): >+ logging.debug('Writing %d B to offset %d B' % (buf.size(), offset)) >+ os.pwrite(fd, buf.to_bytearray(), offset) >+ # By returning 1 here we auto-retire the aio_pread command. >+ return 1 >+ >+ >+# Process any AIO requests without blocking. >+def process_aio_requests(nbd_handle): >+ while nbd_handle.poll(0) == 1: >+ pass >+ >+ >+# Block until all AIO commands on the handle have finished. >+def wait_for_aio_commands_to_finish(nbd_handle): >+ while nbd_handle.aio_in_flight() > 0: >+ nbd_handle.poll(-1) >+ >+ > def sync_data(): > state = State().instance > for key, disk in state['disks'].items(): >@@ -491,25 +511,28 @@ def sync_data(): > (block['length'], block['offset'])) > # Optimize for memory usage, maybe? > os.pwrite(fd, [0] * block['length'], block['offset']) >- copied += block['length'] >- disk['progress']['copied'] = copied >- state.write() > else: >- wrote = 0 >- while wrote < block['length']: >- length = min(block['length'] - wrote, MAX_PREAD_LEN) >- offset = block['offset'] + wrote >+ count = 0 >+ while count < block['length']: >+ length = min(block['length'] - count, MAX_PREAD_LEN) >+ offset = block['offset'] + count >+ > logging.debug('Reading %d B from offset %d B' % > (length, offset)) >- # Ideally use mmap() without any temporary buffer >- data = nbd_handle.pread(length, offset) >- logging.debug('Writing %d B to offset %d B' % >- (length, offset)) >- os.pwrite(fd, data, offset) >- copied += length >- wrote += length >- disk['progress']['copied'] = copied >- state.write() >+ buf = nbd.Buffer(length) >+ nbd_handle.aio_pread( >+ buf, offset, >+ lambda err, fd=fd, buf=buf, offset=offset: >+ read_completed(fd, buf, offset, err))If the order of parameters is changed, there is no need for the anonymous function here, but that's just a small thing I noticed.>+ count += length >+ >+ process_aio_requests(nbd_handle)In order to allow less requests in flight, would it be enough to just do something like this here (similarly to wait_for_aio_commands_to_finish)? while nbd_handle.aio_in_flight() > NUM_IN_FLIGHT: nbd_handle.poll(-1) Also, I presume all of the locking is left to libnbd to be done (and as you can see I don't concern myself with any locking in the whole file), but if that was to be improved, is there some python part that would require it? For example when cleaning up the code? Thanks for the fix, I do not know why I was expecting something way more complicated.>+ >+ copied += block['length'] >+ disk['progress']['copied'] = copied >+ state.write() >+ >+ wait_for_aio_commands_to_finish(nbd_handle) > > if copied == 0: > logging.debug('Nothing to copy for disk: %s(key=%s)' % >-- >2.22.0 >
Apparently Analagous Threads
- Re: [PATCH disk-sync 5/5] Convert disk_sync inner loop to asynchronous.
- Re: [PATCH disk-sync 5/5] Convert disk_sync inner loop to asynchronous.
- [PATCH disk-sync 0/5] Misc cleanups and convert inner loop to asynch.
- [PATCH disk-sync 4/5] Require libnbd >= 0.9.8 and fail hard if it's an earlier version.
- Re: [PATCH libnbd] api: Rename nbd_aio_*_callback to nbd_aio_*.