Martin Kletzander
2019-Aug-26 11:36 UTC
Re: [Libguestfs] [PATCH disk-sync 5/5] Convert disk_sync inner loop to asynchronous.
On Thu, Aug 22, 2019 at 03:39:35PM +0100, Richard W.M. Jones wrote:
>Previously the inner loop would issue nbd.pread() requests
>synchronously, meaning that we would issue a request for each data
>block from the nbdkit server (which would in turn synchronously
>request the data from VMware) and wait until nbdkit replies before
>continuing.
>
>This converts the inner loop so it issues as many pread requests
>asynchronously to nbdkit as the server can handle (any extra are
>queued in nbd_handle).  The server will answer these in parallel and
>possibly out of order.
>
>This results in somewhat better throughput (for me: 13 minutes down to
>5 minutes for an "initial" run).  Although unfortunately we are still
>limited by VDDK's braindead threading model.
>---
> wrapper/disk_sync.py | 55 +++++++++++++++++++++++++++++++-------------
> 1 file changed, 39 insertions(+), 16 deletions(-)
>
>diff --git a/wrapper/disk_sync.py b/wrapper/disk_sync.py
>index e655ead..e854009 100644
>--- a/wrapper/disk_sync.py
>+++ b/wrapper/disk_sync.py
>@@ -409,6 +409,26 @@ def get_block_status(nbd_handle, extent):
>     return blocks
>
>
>+# This is called back when nbd_aio_pread completes.
>+def read_completed(fd, buf, offset, err):
>+    logging.debug('Writing %d B to offset %d B' % (buf.size(), offset))
>+    os.pwrite(fd, buf.to_bytearray(), offset)
>+    # By returning 1 here we auto-retire the aio_pread command.
>+    return 1
>+
>+
>+# Process any AIO requests without blocking.
>+def process_aio_requests(nbd_handle):
>+    while nbd_handle.poll(0) == 1:
>+        pass
>+
>+
>+# Block until all AIO commands on the handle have finished.
>+def wait_for_aio_commands_to_finish(nbd_handle):
>+    while nbd_handle.aio_in_flight() > 0:
>+        nbd_handle.poll(-1)
>+
>+
> def sync_data():
>     state = State().instance
>     for key, disk in state['disks'].items():
>@@ -491,25 +511,28 @@ def sync_data():
>                           (block['length'], block['offset']))
>             # Optimize for memory usage, maybe?
>             os.pwrite(fd, [0] * block['length'], block['offset'])
>-            copied += block['length']
>-            disk['progress']['copied'] = copied
>-            state.write()
>         else:
>-            wrote = 0
>-            while wrote < block['length']:
>-                length = min(block['length'] - wrote, MAX_PREAD_LEN)
>-                offset = block['offset'] + wrote
>+            count = 0
>+            while count < block['length']:
>+                length = min(block['length'] - count, MAX_PREAD_LEN)
>+                offset = block['offset'] + count
>+
>                 logging.debug('Reading %d B from offset %d B' %
>                               (length, offset))
>-                # Ideally use mmap() without any temporary buffer
>-                data = nbd_handle.pread(length, offset)
>-                logging.debug('Writing %d B to offset %d B' %
>-                              (length, offset))
>-                os.pwrite(fd, data, offset)
>-                copied += length
>-                wrote += length
>-                disk['progress']['copied'] = copied
>-                state.write()
>+                buf = nbd.Buffer(length)
>+                nbd_handle.aio_pread(
>+                    buf, offset,
>+                    lambda err, fd=fd, buf=buf, offset=offset:
>+                    read_completed(fd, buf, offset, err))

If the order of parameters is changed, there is no need for the
anonymous function here, but that's just a small thing I noticed.

>+                count += length
>+
>+                process_aio_requests(nbd_handle)

In order to allow fewer requests in flight, would it be enough to just
do something like this here (similarly to
wait_for_aio_commands_to_finish)?

    while nbd_handle.aio_in_flight() > NUM_IN_FLIGHT:
        nbd_handle.poll(-1)

Also, I presume all of the locking is left to libnbd to be done (and
as you can see I don't concern myself with any locking in the whole
file), but if that was to be improved, is there some python part that
would require it?  For example when cleaning up the code?

Thanks for the fix, I do not know why I was expecting something way
more complicated.

>+
>+            copied += block['length']
>+            disk['progress']['copied'] = copied
>+            state.write()
>+
>+        wait_for_aio_commands_to_finish(nbd_handle)
>
>         if copied == 0:
>             logging.debug('Nothing to copy for disk: %s(key=%s)' %
>--
>2.22.0
>
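For illustration, the back-pressure Martin suggests could slot into the
patch's inner loop roughly as follows.  This is only a sketch reusing the
aio_pread call and read_completed callback from the patch above;
NUM_IN_FLIGHT is a hypothetical tunable, the MAX_PREAD_LEN value shown is
made up, and the copy_block helper does not exist in disk_sync.py:

    import logging
    import os
    import nbd

    MAX_PREAD_LEN = 8 << 20   # illustrative; disk_sync.py defines its own
    NUM_IN_FLIGHT = 16        # hypothetical cap on queued aio_pread commands

    # Completion callback, as in the patch: write the block to the local fd.
    def read_completed(fd, buf, offset, err):
        logging.debug('Writing %d B to offset %d B' % (buf.size(), offset))
        os.pwrite(fd, buf.to_bytearray(), offset)
        # Returning 1 auto-retires the aio_pread command.
        return 1

    def copy_block(nbd_handle, fd, block):
        count = 0
        while count < block['length']:
            length = min(block['length'] - count, MAX_PREAD_LEN)
            offset = block['offset'] + count
            buf = nbd.Buffer(length)
            nbd_handle.aio_pread(
                buf, offset,
                lambda err, fd=fd, buf=buf, offset=offset:
                read_completed(fd, buf, offset, err))
            count += length
            # Martin's suggestion: rather than only reaping replies that
            # have already completed, block until the queue drains below
            # the cap (wait_for_aio_commands_to_finish with a threshold).
            while nbd_handle.aio_in_flight() > NUM_IN_FLIGHT:
                nbd_handle.poll(-1)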
Richard W.M. Jones
2019-Aug-27 09:00 UTC
Re: [Libguestfs] [PATCH disk-sync 5/5] Convert disk_sync inner loop to asynchronous.
On Mon, Aug 26, 2019 at 01:36:41PM +0200, Martin Kletzander wrote:
> On Thu, Aug 22, 2019 at 03:39:35PM +0100, Richard W.M. Jones wrote:
> >Previously the inner loop would issue nbd.pread() requests
> >synchronously, meaning that we would issue a request for each data
> >block from the nbdkit server (which would in turn synchronously
> >request the data from VMware) and wait until nbdkit replies before
> >continuing.
[...]
> >+                buf = nbd.Buffer(length)
> >+                nbd_handle.aio_pread(
> >+                    buf, offset,
> >+                    lambda err, fd=fd, buf=buf, offset=offset:
> >+                    read_completed(fd, buf, offset, err))
>
> If the order of parameters is changed, there is no need for the
> anonymous function here, but that's just a small thing I noticed.

The initialized parameters seem to be needed to work around a bug
(more properly "serious implementation flaw") in Python:

https://stackoverflow.com/questions/2295290

> >+                count += length
> >+
> >+                process_aio_requests(nbd_handle)
>
> In order to allow fewer requests in flight, would it be enough to just
> do something like this here (similarly to
> wait_for_aio_commands_to_finish)?
>
>     while nbd_handle.aio_in_flight() > NUM_IN_FLIGHT:
>         nbd_handle.poll(-1)

Yes we could do this.

The eventual solution may be to replace the whole loop with a polling
loop, but this would be a more invasive change.  For comparison of
this approach see:

https://github.com/libguestfs/libnbd/blob/7ef893735937cd7ae62d0b41171ec14195ef2710/examples/threaded-reads-and-writes.c#L234-L307

> Also, I presume all of the locking is left to libnbd to be done (and
> as you can see I don't concern myself with any locking in the whole
> file), but if that was to be improved, is there some python part that
> would require it?  For example when cleaning up the code?

There's a lock automatically held whenever any method on nbd_handle is
called.  I don't believe any other locking is needed.  In any case
it's all single-threaded, libnbd does not create any threads itself.

Rich.

-- 
Richard Jones, Virtualization Group, Red Hat http://people.redhat.com/~rjones
Read my programming and virtualization blog: http://rwmj.wordpress.com
virt-df lists disk usage of guests without needing to install any
software inside the virtual machine.  Supports Linux and Windows.
http://people.redhat.com/~rjones/virt-df/
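The StackOverflow link describes Python's late-binding closures: a
lambda captures the variable itself, not its current value, so every
callback created in the loop would otherwise see the final fd, buf and
offset.  A standalone demonstration of the pitfall and of the
default-argument workaround used in the patch (illustration only, not
code from the thread):

    # All three lambdas share the loop variable i, which is 2 by the
    # time they are called.
    callbacks = [lambda: i for i in range(3)]
    print([f() for f in callbacks])        # prints [2, 2, 2]

    # The default-argument idiom evaluates i once, at definition time,
    # freezing the value each lambda sees.
    callbacks = [lambda i=i: i for i in range(3)]
    print([f() for f in callbacks])        # prints [0, 1, 2]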
Martin Kletzander
2019-Aug-27 14:49 UTC
Re: [Libguestfs] [PATCH disk-sync 5/5] Convert disk_sync inner loop to asynchronous.
On Tue, Aug 27, 2019 at 10:00:59AM +0100, Richard W.M. Jones wrote:
>On Mon, Aug 26, 2019 at 01:36:41PM +0200, Martin Kletzander wrote:
[...]
>> If the order of parameters is changed, there is no need for the
>> anonymous function here, but that's just a small thing I noticed.
>
>The initialized parameters seem to be needed to work around a bug
>(more properly "serious implementation flaw") in Python:
>
>https://stackoverflow.com/questions/2295290

Oh, wow, that's really good to know.

>> >+                count += length
>> >+
>> >+                process_aio_requests(nbd_handle)
>>
>> In order to allow fewer requests in flight, would it be enough to just
>> do something like this here (similarly to
>> wait_for_aio_commands_to_finish)?
>>
>>     while nbd_handle.aio_in_flight() > NUM_IN_FLIGHT:
>>         nbd_handle.poll(-1)
>
>Yes we could do this.
>
>The eventual solution may be to replace the whole loop with a polling
>loop, but this would be a more invasive change.  For comparison of
>this approach see:
>
>https://github.com/libguestfs/libnbd/blob/7ef893735937cd7ae62d0b41171ec14195ef2710/examples/threaded-reads-and-writes.c#L234-L307
>
>> Also, I presume all of the locking is left to libnbd to be done (and
>> as you can see I don't concern myself with any locking in the whole
>> file), but if that was to be improved, is there some python part that
>> would require it?  For example when cleaning up the code?
>
>There's a lock automatically held whenever any method on nbd_handle is
>called.  I don't believe any other locking is needed.  In any case
>it's all single-threaded, libnbd does not create any threads itself.
>
>Rich.
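For comparison, the polling-loop structure of the
threaded-reads-and-writes.c example Rich links to might look roughly
like this in Python.  This is a sketch under the assumption that the
Python bindings of this libnbd version expose aio_get_fd(),
aio_get_direction() and the aio_notify_*() calls the way the C API
does; a full conversion would also issue new aio_pread commands from
inside the loop whenever the in-flight count drops below the desired
window:

    import select
    import nbd

    def drain_with_poll(nbd_handle):
        # Drive the handle with poll(2) until every in-flight command
        # has completed.  Equivalent to looping over nbd_handle.poll(-1),
        # but an explicit poll could also multiplex other fds.
        while nbd_handle.aio_in_flight() > 0:
            events = 0
            direction = nbd_handle.aio_get_direction()
            if direction & nbd.AIO_DIRECTION_READ:
                events |= select.POLLIN
            if direction & nbd.AIO_DIRECTION_WRITE:
                events |= select.POLLOUT
            poller = select.poll()
            poller.register(nbd_handle.aio_get_fd(), events)
            for _, revents in poller.poll():
                if revents & select.POLLIN:
                    nbd_handle.aio_notify_read()
                elif revents & select.POLLOUT:
                    nbd_handle.aio_notify_write()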