Richard W.M. Jones
2019-Aug-14  21:05 UTC
[Libguestfs] [PATCH libnbd 0/3] Use free callback to hold ref to AIO buffer.
Basically the same as this patch series, but for Python: https://www.redhat.com/archives/libguestfs/2019-August/msg00235.html plus adding the 590 asynch test at the end. Rich.
Richard W.M. Jones
2019-Aug-14  21:05 UTC
[Libguestfs] [PATCH libnbd 1/3] python: Refactor user_data into a struct.
Simple refactoring to use a struct to store the function pointer
passed to the callback wrapper.
---
 generator/generator | 69 ++++++++++++++++++++++++++++++++-------------
 1 file changed, 50 insertions(+), 19 deletions(-)
diff --git a/generator/generator b/generator/generator
index 16729cd..f6a4856 100755
--- a/generator/generator
+++ b/generator/generator
@@ -3979,6 +3979,7 @@ let print_python_closure_wrapper { cbname; cbargs }   
C.print_cbarg_list cbargs;
   pr "\n";
   pr "{\n";
+  pr "  const struct user_data *data = user_data;\n";
   pr "  int ret = 0;\n";
   pr "\n";
   pr "  PyGILState_STATE py_save = PyGILState_UNLOCKED;\n";
@@ -4037,7 +4038,7 @@ let print_python_closure_wrapper { cbname; cbargs }    pr
"  if (PyEval_ThreadsInitialized ())\n";
   pr "    py_save = PyGILState_Ensure ();\n";
   pr "\n";
-  pr "  py_ret = PyObject_CallObject ((PyObject *)user_data,
py_args);\n";
+  pr "  py_ret = PyObject_CallObject (data->fn, py_args);\n";
   pr "\n";
   pr "  if (PyEval_ThreadsInitialized ())\n";
   pr "    PyGILState_Release (py_save);\n";
@@ -4074,13 +4075,6 @@ let print_python_closure_wrapper { cbname; cbargs }    )
cbargs;
   pr "  return ret;\n";
   pr "}\n";
-  pr "\n";
-  pr "/* Free for %s callback. */\n" cbname;
-  pr "static void\n";
-  pr "%s_free (void *user_data)\n" cbname;
-  pr "{\n";
-  pr "  Py_DECREF (user_data);\n";
-  pr "}\n";
   pr "\n"
 
 (* Generate the Python binding. *)
@@ -4106,8 +4100,12 @@ let print_python_binding name { args; optargs; ret;
may_set_error }            n;
        pr "  struct py_aio_buffer *%s_buf;\n" n
     | Closure { cbname } ->
-       pr "  nbd_%s_callback %s = { .callback = %s_wrapper, .free =
%s_free };\n"
-         cbname cbname cbname cbname
+       pr "  struct user_data *%s_user_data = alloc_user_data ();\n"
cbname;
+       pr "  if (%s_user_data == NULL) return NULL;\n" cbname;
+       pr "  nbd_%s_callback %s = { .callback = %s_wrapper,\n"
+         cbname cbname cbname;
+       pr "                         .user_data = %s_user_data,\n"
cbname;
+       pr "                         .free = free_user_data };\n"
     | Enum (n, _) -> pr "  int %s;\n" n
     | Flags (n, _) ->
        pr "  uint32_t %s_u32;\n" n;
@@ -4139,8 +4137,12 @@ let print_python_binding name { args; optargs; ret;
may_set_error }    List.iter (
     function
     | OClosure { cbname } ->
-       pr "  nbd_%s_callback %s = { .callback = %s_wrapper, .free =
%s_free };\n"
-         cbname cbname cbname cbname
+       pr "  struct user_data *%s_user_data = alloc_user_data ();\n"
cbname;
+       pr "  if (%s_user_data == NULL) return NULL;\n" cbname;
+       pr "  nbd_%s_callback %s = { .callback = %s_wrapper,\n"
+         cbname cbname cbname;
+       pr "                         .user_data = %s_user_data,\n"
cbname;
+       pr "                         .free = free_user_data };\n"
     | OFlags (n, _) ->
        pr "  uint32_t %s_u32;\n" n;
        pr "  unsigned int %s; /* really uint32_t */\n" n
@@ -4183,7 +4185,7 @@ let print_python_binding name { args; optargs; ret;
may_set_error }      | BytesIn (n, _) | BytesPersistIn (n, _)
     | BytesPersistOut (n, _) -> pr ", &%s" n
     | BytesOut (_, count) -> pr ", &%s" count
-    | Closure { cbname } -> pr ", &%s.user_data" cbname
+    | Closure { cbname } -> pr ", &%s_user_data->fn" cbname
     | Enum (n, _) -> pr ", &%s" n
     | Flags (n, _) -> pr ", &%s" n
     | Int n -> pr ", &%s" n
@@ -4198,7 +4200,7 @@ let print_python_binding name { args; optargs; ret;
may_set_error }    ) args;
   List.iter (
     function
-    | OClosure { cbname } -> pr ", &%s.user_data" cbname
+    | OClosure { cbname } -> pr ", &%s_user_data->fn"
cbname
     | OFlags (n, _) -> pr ", &%s" n
   ) optargs;
   pr "))\n";
@@ -4215,8 +4217,8 @@ let print_python_binding name { args; optargs; ret;
may_set_error }         pr "  %s_buf = nbd_internal_py_get_aio_buffer
(%s);\n" n n
     | Closure { cbname } ->
        pr "  /* Increment refcount since pointer may be saved by libnbd.
*/\n";
-       pr "  Py_INCREF (%s.user_data);\n" cbname;
-       pr "  if (!PyCallable_Check (%s.user_data)) {\n" cbname;
+       pr "  Py_INCREF (%s_user_data->fn);\n" cbname;
+       pr "  if (!PyCallable_Check (%s_user_data->fn)) {\n"
cbname;
        pr "    PyErr_SetString (PyExc_TypeError,\n";
        pr "                     \"callback parameter %s is not
callable\");\n" cbname;
        pr "    return NULL;\n";
@@ -4241,10 +4243,10 @@ let print_python_binding name { args; optargs; ret;
may_set_error }    List.iter (
     function
     | OClosure { cbname } ->
-       pr "  if (%s.user_data != Py_None) {\n" cbname;
+       pr "  if (%s_user_data->fn != Py_None) {\n" cbname;
        pr "    /* Increment refcount since pointer may be saved by libnbd.
*/\n";
-       pr "    Py_INCREF (%s.user_data);\n" cbname;
-       pr "    if (!PyCallable_Check (%s.user_data)) {\n" cbname;
+       pr "    Py_INCREF (%s_user_data->fn);\n" cbname;
+       pr "    if (!PyCallable_Check (%s_user_data->fn)) {\n"
cbname;
        pr "      PyErr_SetString (PyExc_TypeError,\n";
        pr "                       \"callback parameter %s is not
callable\");\n" cbname;
        pr "      return NULL;\n";
@@ -4377,6 +4379,35 @@ let generate_python_methods_c ()    pr "\n";
   pr "#include <methods.h>\n";
   pr "\n";
+
+  pr "/* This is passed to *_wrapper as the user_data pointer";
+  pr " * and freed in the free_user_data function below.\n";
+  pr " */\n";
+  pr "struct user_data {\n";
+  pr "  PyObject *fn;    /* Pointer to Python function. */\n";
+  pr "};\n";
+  pr "\n";
+  pr "static struct user_data *\n";
+  pr "alloc_user_data (void)\n";
+  pr "{\n";
+  pr "  struct user_data *data = calloc (1, sizeof *data);\n";
+  pr "  if (data == NULL) {\n";
+  pr "    PyErr_NoMemory ();\n";
+  pr "    return NULL;\n";
+  pr "  }\n";
+  pr "  return data;\n";
+  pr "}\n";
+  pr "\n";
+  pr "static void\n";
+  pr "free_user_data (void *user_data)\n";
+  pr "{\n";
+  pr "  struct user_data *data = user_data;\n";
+  pr "\n";
+  pr "  Py_DECREF (data->fn);\n";
+  pr "  free (data);\n";
+  pr "}\n";
+  pr "\n";
+
   List.iter print_python_closure_wrapper all_closures;
   List.iter (
     fun (name, fn) ->
-- 
2.22.0
Richard W.M. Jones
2019-Aug-14  21:05 UTC
[Libguestfs] [PATCH libnbd 2/3] python: Hold a refcount to persistent AIO buffer until command completion.
---
 generator/generator | 31 ++++++++++++++++++++++++++++---
 1 file changed, 28 insertions(+), 3 deletions(-)
diff --git a/generator/generator b/generator/generator
index f6a4856..9bcb08d 100755
--- a/generator/generator
+++ b/generator/generator
@@ -3982,6 +3982,13 @@ let print_python_closure_wrapper { cbname; cbargs }    pr
"  const struct user_data *data = user_data;\n";
   pr "  int ret = 0;\n";
   pr "\n";
+  pr "  /* The C callback is always registered, even if there's no
Python\n";
+  pr "   * callback.  This is because we may need to unregister
an\n";
+  pr "   * associated persistent buffer.\n";
+  pr "   */\n";
+  pr "  if (data->fn == NULL)\n";
+  pr "    return 0;\n";
+  pr "\n";
   pr "  PyGILState_STATE py_save = PyGILState_UNLOCKED;\n";
   pr "  PyObject *py_args, *py_ret;\n";
   List.iter (
@@ -4253,10 +4260,24 @@ let print_python_binding name { args; optargs; ret;
may_set_error }         pr "    }\n";
        pr "  }\n";
        pr "  else\n";
-       pr "    %s.callback = NULL; /* we're not going to call it
*/\n" cbname
+       pr "    %s_user_data->fn = NULL;\n" cbname
     | OFlags (n, _) -> pr "  %s_u32 = %s;\n" n n
   ) optargs;
 
+  (* If there is a BytesPersistIn/Out parameter then we need to
+   * increment the refcount and save the pointer into
+   * completion_callback.user_data so we can decrement the
+   * refcount on command completion.
+   *)
+  List.iter (
+    function
+    | BytesPersistIn (n, _) | BytesPersistOut (n, _) ->
+       pr "  /* Increment refcount since buffer may be saved by libnbd.
*/\n";
+       pr "  Py_INCREF (%s);\n" n;
+       pr "  completion_user_data->buf = %s;\n" n;
+    | _ -> ()
+  ) args;
+
   (* Call the underlying C function. *)
   pr "  ret = nbd_%s (h" name;
   List.iter (
@@ -4384,7 +4405,8 @@ let generate_python_methods_c ()    pr " * and freed
in the free_user_data function below.\n";
   pr " */\n";
   pr "struct user_data {\n";
-  pr "  PyObject *fn;    /* Pointer to Python function. */\n";
+  pr "  PyObject *fn;    /* Optional pointer to Python function.
*/\n";
+  pr "  PyObject *buf;   /* Optional pointer to persistent buffer.
*/\n";
   pr "};\n";
   pr "\n";
   pr "static struct user_data *\n";
@@ -4403,7 +4425,10 @@ let generate_python_methods_c ()    pr "{\n";
   pr "  struct user_data *data = user_data;\n";
   pr "\n";
-  pr "  Py_DECREF (data->fn);\n";
+  pr "  if (data->fn != NULL)\n";
+  pr "    Py_DECREF (data->fn);\n";
+  pr "  if (data->buf != NULL)\n";
+  pr "    Py_DECREF (data->buf);\n";
   pr "  free (data);\n";
   pr "}\n";
   pr "\n";
-- 
2.22.0
Richard W.M. Jones
2019-Aug-14  21:05 UTC
[Libguestfs] [PATCH libnbd 3/3] python: Add test for doing asynch copy from one handle to another.
---
 python/t/590-aio-copy.py | 122 +++++++++++++++++++++++++++++++++++++++
 1 file changed, 122 insertions(+)
diff --git a/python/t/590-aio-copy.py b/python/t/590-aio-copy.py
new file mode 100644
index 0000000..129dde1
--- /dev/null
+++ b/python/t/590-aio-copy.py
@@ -0,0 +1,122 @@
+# libnbd Python bindings
+# Copyright (C) 2010-2019 Red Hat Inc.
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+
+import select
+import nbd
+
+disk_size = 512 * 1024 * 1024
+bs = 65536
+max_reads_in_flight = 16
+bytes_read = 0
+bytes_written = 0
+
+def asynch_copy (src, dst):
+    size = src.get_size ()
+
+    # This is our reading position in the source.
+    soff = 0
+
+    # This callback is called when any pread from the source
+    # has completed.
+    writes = []
+    def read_completed (buf, offset, error):
+        global bytes_read
+        bytes_read += buf.size ()
+        wr = (buf, offset)
+        writes.append (wr)
+        # By returning 1 here we auto-retire the pread command.
+        return 1
+
+    # This callback is called when any pwrite to the destination
+    # has completed.
+    def write_completed (buf, error):
+        global bytes_written
+        bytes_written += buf.size ()
+        # By returning 1 here we auto-retire the pwrite command.
+        return 1
+
+    # The main loop which runs until we have finished reading and
+    # there are no more commands in flight.
+    while soff < size or dst.aio_in_flight () > 0:
+        # If we're able to submit more reads from the source
+        # then do so now.
+        if soff < size and src.aio_in_flight () < max_reads_in_flight:
+            bufsize = min (bs, size - soff)
+            buf = nbd.Buffer (bufsize)
+            # NB: Python lambdas are BROKEN.
+            # https://stackoverflow.com/questions/2295290
+            src.aio_pread (buf, soff,
+                           lambda err, buf=buf, soff=soff:
+                           read_completed (buf, soff, err))
+            soff += bufsize
+
+        # If there are any write commands waiting to be issued
+        # to the destination, send them now.
+        for buf, offset in writes:
+            # See above link about broken Python lambdas.
+            dst.aio_pwrite (buf, offset,
+                            lambda err, buf=buf:
+                            write_completed (buf, err))
+        writes = []
+
+        poll = select.poll ()
+
+        sfd = src.aio_get_fd ()
+        dfd = dst.aio_get_fd ()
+
+        sevents = 0
+        devents = 0
+        if src.aio_get_direction () & nbd.AIO_DIRECTION_READ:
+            sevents += select.POLLIN
+        if src.aio_get_direction () & nbd.AIO_DIRECTION_WRITE:
+            sevents += select.POLLOUT
+        if dst.aio_get_direction () & nbd.AIO_DIRECTION_READ:
+            devents += select.POLLIN
+        if dst.aio_get_direction () & nbd.AIO_DIRECTION_WRITE:
+            devents += select.POLLOUT
+        poll.register (sfd, sevents)
+        poll.register (dfd, devents)
+        for (fd, revents) in poll.poll ():
+            # The direction of each handle can change since we
+            # slept in the select.
+            if fd == sfd and revents & select.POLLIN and \
+               src.aio_get_direction () & nbd.AIO_DIRECTION_READ:
+                src.aio_notify_read ()
+            elif fd == sfd and revents & select.POLLOUT and \
+                 src.aio_get_direction () & nbd.AIO_DIRECTION_WRITE:
+                src.aio_notify_write ()
+            elif fd == dfd and revents & select.POLLIN and \
+                 dst.aio_get_direction () & nbd.AIO_DIRECTION_READ:
+                dst.aio_notify_read ()
+            elif fd == dfd and revents & select.POLLOUT and \
+                 dst.aio_get_direction () & nbd.AIO_DIRECTION_WRITE:
+                dst.aio_notify_write ()
+
+src = nbd.NBD ()
+src.set_handle_name ("src")
+dst = nbd.NBD ()
+dst.set_handle_name ("dst")
+src.connect_command (["nbdkit", "-s",
"--exit-with-parent", "-r",
+                      "pattern", "size=%d" % disk_size])
+dst.connect_command (["nbdkit", "-s",
"--exit-with-parent",
+                      "memory", "size=%d" % disk_size])
+asynch_copy (src, dst)
+
+print ("bytes read: %d written: %d size: %d" %
+       (bytes_read, bytes_written, disk_size))
+assert bytes_read == disk_size
+assert bytes_written == disk_size
-- 
2.22.0
Eric Blake
2019-Aug-14  21:36 UTC
Re: [Libguestfs] [PATCH libnbd 2/3] python: Hold a refcount to persistent AIO buffer until command completion.
On 8/14/19 4:05 PM, Richard W.M. Jones wrote:> --- > generator/generator | 31 ++++++++++++++++++++++++++++--- > 1 file changed, 28 insertions(+), 3 deletions(-) > > diff --git a/generator/generator b/generator/generator> > + (* If there is a BytesPersistIn/Out parameter then we need to > + * increment the refcount and save the pointer into > + * completion_callback.user_data so we can decrement the > + * refcount on command completion. > + *) > + List.iter ( > + function > + | BytesPersistIn (n, _) | BytesPersistOut (n, _) -> > + pr " /* Increment refcount since buffer may be saved by libnbd. */\n"; > + pr " Py_INCREF (%s);\n" n; > + pr " completion_user_data->buf = %s;\n" n; > + | _ -> () > + ) args; > +Is it worth a sanity check that we never have more than one BytesPersist* parameter (as otherwise, we'd have to store more than one pointer in the user_data struct)? Doesn't change current code generation (since we satisfy that constraint), but would make future additions less likely to mess up (and as it affects both python and ocaml, doing it in a separate patch is fine).> @@ -4403,7 +4425,10 @@ let generate_python_methods_c () > pr "{\n"; > pr " struct user_data *data = user_data;\n"; > pr "\n"; > - pr " Py_DECREF (data->fn);\n"; > + pr " if (data->fn != NULL)\n"; > + pr " Py_DECREF (data->fn);\n"; > + pr " if (data->buf != NULL)\n"; > + pr " Py_DECREF (data->buf);\n";Spell these Py_XDECREF() and you can avoid the 'if's. -- Eric Blake, Principal Software Engineer Red Hat, Inc. +1-919-301-3226 Virtualization: qemu.org | libvirt.org
Eric Blake
2019-Aug-14  21:41 UTC
Re: [Libguestfs] [PATCH libnbd 3/3] python: Add test for doing asynch copy from one handle to another.
On 8/14/19 4:05 PM, Richard W.M. Jones wrote:> --- > python/t/590-aio-copy.py | 122 +++++++++++++++++++++++++++++++++++++++ > 1 file changed, 122 insertions(+) > > diff --git a/python/t/590-aio-copy.py b/python/t/590-aio-copy.py > new file mode 100644 > index 0000000..129dde1 > --- /dev/null > +++ b/python/t/590-aio-copy.py > @@ -0,0 +1,122 @@ > +# libnbd Python bindings > +# Copyright (C) 2010-2019 Red Hat Inc.Obviously some copy-paste history for a new file to be able to trace origins of some of its code back to 2010 :) I can't say it's wrong, but it does look funny. Looks good; and glad we finally got here after several detours. -- Eric Blake, Principal Software Engineer Red Hat, Inc. +1-919-301-3226 Virtualization: qemu.org | libvirt.org
Seemingly Similar Threads
- [PATCH libnbd 0/3] Use free callback to hold ref to AIO buffer.
- [libnbd PATCH] python: Plug some memory leaks on error paths
- [PATCH libnbd 2/6] generator: Create only one Python wrapper per closure.
- [PATCH libnbd 6/7] python: Use free callback to free closure root.
- [libnbd PATCH] python: Fix more memory leaks