Hi all: This series tries to implement rx batching for vhost-net. This is done by batching the dequeuing from skb_array which was exported by underlayer socket and pass the sbk back through msg_control to finish userspace copying. Tests shows at most 19% improvment on rx pps. Please review. Thanks Jason Wang (8): ptr_ring: introduce batch dequeuing skb_array: introduce batch dequeuing tun: export skb_array tap: export skb_array tun: support receiving skb through msg_control tap: support receiving skb from msg_control vhost_net: try batch dequing from skb array vhost_net: use lockless peeking for skb array during busy polling drivers/net/tap.c | 25 ++++++++++++++--- drivers/net/tun.c | 31 +++++++++++++++------ drivers/vhost/net.c | 71 +++++++++++++++++++++++++++++++++++++++++++---- include/linux/if_tap.h | 5 ++++ include/linux/if_tun.h | 5 ++++ include/linux/ptr_ring.h | 65 +++++++++++++++++++++++++++++++++++++++++++ include/linux/skb_array.h | 25 +++++++++++++++++ 7 files changed, 209 insertions(+), 18 deletions(-) -- 2.7.4
Signed-off-by: Jason Wang <jasowang at redhat.com>
---
 include/linux/ptr_ring.h | 65 ++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 65 insertions(+)
diff --git a/include/linux/ptr_ring.h b/include/linux/ptr_ring.h
index 6c70444..4771ded 100644
--- a/include/linux/ptr_ring.h
+++ b/include/linux/ptr_ring.h
@@ -247,6 +247,22 @@ static inline void *__ptr_ring_consume(struct ptr_ring *r)
 	return ptr;
 }
 
+static inline int __ptr_ring_consume_batched(struct ptr_ring *r,
+					     void **array, int n)
+{
+	void *ptr;
+	int i = 0;
+
+	while (i < n) {
+		ptr = __ptr_ring_consume(r);
+		if (!ptr)
+			break;
+		array[i++] = ptr;
+	}
+
+	return i;
+}
+
 /*
  * Note: resize (below) nests producer lock within consumer lock, so if you
  * call this in interrupt or BH context, you must disable interrupts/BH when
@@ -297,6 +313,55 @@ static inline void *ptr_ring_consume_bh(struct ptr_ring *r)
 	return ptr;
 }
 
+static inline int ptr_ring_consume_batched(struct ptr_ring *r,
+					   void **array, int n)
+{
+	int ret;
+
+	spin_lock(&r->consumer_lock);
+	ret = __ptr_ring_consume_batched(r, array, n);
+	spin_unlock(&r->consumer_lock);
+
+	return ret;
+}
+
+static inline int ptr_ring_consume_batched_irq(struct ptr_ring *r,
+					       void **array, int n)
+{
+	int ret;
+
+	spin_lock_irq(&r->consumer_lock);
+	ret = __ptr_ring_consume_batched(r, array, n);
+	spin_unlock_irq(&r->consumer_lock);
+
+	return ret;
+}
+
+static inline int ptr_ring_consume_batched_any(struct ptr_ring *r,
+					       void **array, int n)
+{
+	unsigned long flags;
+	int ret;
+
+	spin_lock_irqsave(&r->consumer_lock, flags);
+	ret = __ptr_ring_consume_batched(r, array, n);
+	spin_unlock_irqrestore(&r->consumer_lock, flags);
+
+	return ret;
+}
+
+static inline int ptr_ring_consume_batched_bh(struct ptr_ring *r,
+					      void **array, int n)
+{
+	int ret;
+
+	spin_lock_bh(&r->consumer_lock);
+	ret = __ptr_ring_consume_batched(r, array, n);
+	spin_unlock_bh(&r->consumer_lock);
+
+	return ret;
+}
+
 /* Cast to structure type and call a function without discarding from FIFO.
  * Function must return a value.
  * Callers must take consumer_lock.
-- 
2.7.4
Jason Wang
2017-Mar-21  04:04 UTC
[PATCH net-next 2/8] skb_array: introduce batch dequeuing
Signed-off-by: Jason Wang <jasowang at redhat.com>
---
 include/linux/skb_array.h | 25 +++++++++++++++++++++++++
 1 file changed, 25 insertions(+)
diff --git a/include/linux/skb_array.h b/include/linux/skb_array.h
index f4dfade..90e44b9 100644
--- a/include/linux/skb_array.h
+++ b/include/linux/skb_array.h
@@ -97,21 +97,46 @@ static inline struct sk_buff *skb_array_consume(struct
skb_array *a)
 	return ptr_ring_consume(&a->ring);
 }
 
+static inline int skb_array_consume_batched(struct skb_array *a,
+					    void **array, int n)
+{
+	return ptr_ring_consume_batched(&a->ring, array, n);
+}
+
 static inline struct sk_buff *skb_array_consume_irq(struct skb_array *a)
 {
 	return ptr_ring_consume_irq(&a->ring);
 }
 
+static inline int skb_array_consume_batched_irq(struct skb_array *a,
+						void **array, int n)
+{
+	return ptr_ring_consume_batched_irq(&a->ring, array, n);
+}
+
 static inline struct sk_buff *skb_array_consume_any(struct skb_array *a)
 {
 	return ptr_ring_consume_any(&a->ring);
 }
 
+static inline int skb_array_consume_batched_any(struct skb_array *a,
+						void **array, int n)
+{
+	return ptr_ring_consume_batched_any(&a->ring, array, n);
+}
+
+
 static inline struct sk_buff *skb_array_consume_bh(struct skb_array *a)
 {
 	return ptr_ring_consume_bh(&a->ring);
 }
 
+static inline int skb_array_consume_batched_bh(struct skb_array *a,
+					       void **array, int n)
+{
+	return ptr_ring_consume_batched_bh(&a->ring, array, n);
+}
+
 static inline int __skb_array_len_with_tag(struct sk_buff *skb)
 {
 	if (likely(skb)) {
-- 
2.7.4
This patch exports skb_array through tun_get_skb_array(). Caller can
then manipulate skb array directly.
Signed-off-by: Jason Wang <jasowang at redhat.com>
---
 drivers/net/tun.c      | 13 +++++++++++++
 include/linux/if_tun.h |  5 +++++
 2 files changed, 18 insertions(+)
diff --git a/drivers/net/tun.c b/drivers/net/tun.c
index c418f0a..70dd9ec 100644
--- a/drivers/net/tun.c
+++ b/drivers/net/tun.c
@@ -2613,6 +2613,19 @@ struct socket *tun_get_socket(struct file *file)
 }
 EXPORT_SYMBOL_GPL(tun_get_socket);
 
+struct skb_array *tun_get_skb_array(struct file *file)
+{
+	struct tun_file *tfile;
+
+	if (file->f_op != &tun_fops)
+		return ERR_PTR(-EINVAL);
+	tfile = file->private_data;
+	if (!tfile)
+		return ERR_PTR(-EBADFD);
+	return &tfile->tx_array;
+}
+EXPORT_SYMBOL_GPL(tun_get_skb_array);
+
 module_init(tun_init);
 module_exit(tun_cleanup);
 MODULE_DESCRIPTION(DRV_DESCRIPTION);
diff --git a/include/linux/if_tun.h b/include/linux/if_tun.h
index ed6da2e..bf9bdf4 100644
--- a/include/linux/if_tun.h
+++ b/include/linux/if_tun.h
@@ -19,6 +19,7 @@
 
 #if defined(CONFIG_TUN) || defined(CONFIG_TUN_MODULE)
 struct socket *tun_get_socket(struct file *);
+struct skb_array *tun_get_skb_array(struct file *file);
 #else
 #include <linux/err.h>
 #include <linux/errno.h>
@@ -28,5 +29,9 @@ static inline struct socket *tun_get_socket(struct file *f)
 {
 	return ERR_PTR(-EINVAL);
 }
+static inline struct skb_array *tun_get_skb_array(struct file *f)
+{
+	return ERR_PTR(-EINVAL);
+}
 #endif /* CONFIG_TUN */
 #endif /* __IF_TUN_H */
-- 
2.7.4
This patch exports skb_array through tap_get_skb_array(). Caller can
then manipulate skb array directly.
Signed-off-by: Jason Wang <jasowang at redhat.com>
---
 drivers/net/tap.c      | 13 +++++++++++++
 include/linux/if_tap.h |  5 +++++
 2 files changed, 18 insertions(+)
diff --git a/drivers/net/tap.c b/drivers/net/tap.c
index 4d4173d..abdaf86 100644
--- a/drivers/net/tap.c
+++ b/drivers/net/tap.c
@@ -1193,6 +1193,19 @@ struct socket *tap_get_socket(struct file *file)
 }
 EXPORT_SYMBOL_GPL(tap_get_socket);
 
+struct skb_array *tap_get_skb_array(struct file *file)
+{
+	struct tap_queue *q;
+
+	if (file->f_op != &tap_fops)
+		return ERR_PTR(-EINVAL);
+	q = file->private_data;
+	if (!q)
+		return ERR_PTR(-EBADFD);
+	return &q->skb_array;
+}
+EXPORT_SYMBOL_GPL(tap_get_skb_array);
+
 int tap_queue_resize(struct tap_dev *tap)
 {
 	struct net_device *dev = tap->dev;
diff --git a/include/linux/if_tap.h b/include/linux/if_tap.h
index 3482c3c..4837157 100644
--- a/include/linux/if_tap.h
+++ b/include/linux/if_tap.h
@@ -3,6 +3,7 @@
 
 #if IS_ENABLED(CONFIG_TAP)
 struct socket *tap_get_socket(struct file *);
+struct skb_array *tap_get_skb_array(struct file *file);
 #else
 #include <linux/err.h>
 #include <linux/errno.h>
@@ -12,6 +13,10 @@ static inline struct socket *tap_get_socket(struct file *f)
 {
 	return ERR_PTR(-EINVAL);
 }
+static inline struct skb_array *tap_get_skb_array(struct file *f)
+{
+	return ERR_PTR(-EINVAL);
+}
 #endif /* CONFIG_TAP */
 
 #include <net/sock.h>
-- 
2.7.4
Jason Wang
2017-Mar-21  04:04 UTC
[PATCH net-next 5/8] tun: support receiving skb through msg_control
This patch makes tun_recvmsg() can receive from skb from its caller
through msg_control. Vhost_net will be the first user.
Signed-off-by: Jason Wang <jasowang at redhat.com>
---
 drivers/net/tun.c | 18 ++++++++++--------
 1 file changed, 10 insertions(+), 8 deletions(-)
diff --git a/drivers/net/tun.c b/drivers/net/tun.c
index 70dd9ec..a82bced 100644
--- a/drivers/net/tun.c
+++ b/drivers/net/tun.c
@@ -1498,9 +1498,8 @@ static struct sk_buff *tun_ring_recv(struct tun_file
*tfile, int noblock,
 
 static ssize_t tun_do_read(struct tun_struct *tun, struct tun_file *tfile,
 			   struct iov_iter *to,
-			   int noblock)
+			   int noblock, struct sk_buff *skb)
 {
-	struct sk_buff *skb;
 	ssize_t ret;
 	int err;
 
@@ -1509,10 +1508,12 @@ static ssize_t tun_do_read(struct tun_struct *tun,
struct tun_file *tfile,
 	if (!iov_iter_count(to))
 		return 0;
 
-	/* Read frames from ring */
-	skb = tun_ring_recv(tfile, noblock, &err);
-	if (!skb)
-		return err;
+	if (!skb) {
+		/* Read frames from ring */
+		skb = tun_ring_recv(tfile, noblock, &err);
+		if (!skb)
+			return err;
+	}
 
 	ret = tun_put_user(tun, tfile, skb, to);
 	if (unlikely(ret < 0))
@@ -1532,7 +1533,7 @@ static ssize_t tun_chr_read_iter(struct kiocb *iocb,
struct iov_iter *to)
 
 	if (!tun)
 		return -EBADFD;
-	ret = tun_do_read(tun, tfile, to, file->f_flags & O_NONBLOCK);
+	ret = tun_do_read(tun, tfile, to, file->f_flags & O_NONBLOCK, NULL);
 	ret = min_t(ssize_t, ret, len);
 	if (ret > 0)
 		iocb->ki_pos = ret;
@@ -1634,7 +1635,8 @@ static int tun_recvmsg(struct socket *sock, struct msghdr
*m, size_t total_len,
 					 SOL_PACKET, TUN_TX_TIMESTAMP);
 		goto out;
 	}
-	ret = tun_do_read(tun, tfile, &m->msg_iter, flags & MSG_DONTWAIT);
+	ret = tun_do_read(tun, tfile, &m->msg_iter, flags & MSG_DONTWAIT,
+			  m->msg_control);
 	if (ret > (ssize_t)total_len) {
 		m->msg_flags |= MSG_TRUNC;
 		ret = flags & MSG_TRUNC ? ret : total_len;
-- 
2.7.4
Jason Wang
2017-Mar-21  04:04 UTC
[PATCH net-next 6/8] tap: support receiving skb from msg_control
This patch makes tap_recvmsg() can receive from skb from its caller
through msg_control. Vhost_net will be the first user.
Signed-off-by: Jason Wang <jasowang at redhat.com>
---
 drivers/net/tap.c | 12 ++++++++----
 1 file changed, 8 insertions(+), 4 deletions(-)
diff --git a/drivers/net/tap.c b/drivers/net/tap.c
index abdaf86..07d9174 100644
--- a/drivers/net/tap.c
+++ b/drivers/net/tap.c
@@ -824,15 +824,17 @@ static ssize_t tap_put_user(struct tap_queue *q,
 
 static ssize_t tap_do_read(struct tap_queue *q,
 			   struct iov_iter *to,
-			   int noblock)
+			   int noblock, struct sk_buff *skb)
 {
 	DEFINE_WAIT(wait);
-	struct sk_buff *skb;
 	ssize_t ret = 0;
 
 	if (!iov_iter_count(to))
 		return 0;
 
+	if (skb)
+		goto done;
+
 	while (1) {
 		if (!noblock)
 			prepare_to_wait(sk_sleep(&q->sk), &wait,
@@ -856,6 +858,7 @@ static ssize_t tap_do_read(struct tap_queue *q,
 	if (!noblock)
 		finish_wait(sk_sleep(&q->sk), &wait);
 
+done:
 	if (skb) {
 		ret = tap_put_user(q, skb, to);
 		if (unlikely(ret < 0))
@@ -872,7 +875,7 @@ static ssize_t tap_read_iter(struct kiocb *iocb, struct
iov_iter *to)
 	struct tap_queue *q = file->private_data;
 	ssize_t len = iov_iter_count(to), ret;
 
-	ret = tap_do_read(q, to, file->f_flags & O_NONBLOCK);
+	ret = tap_do_read(q, to, file->f_flags & O_NONBLOCK, NULL);
 	ret = min_t(ssize_t, ret, len);
 	if (ret > 0)
 		iocb->ki_pos = ret;
@@ -1155,7 +1158,8 @@ static int tap_recvmsg(struct socket *sock, struct msghdr
*m,
 	int ret;
 	if (flags & ~(MSG_DONTWAIT|MSG_TRUNC))
 		return -EINVAL;
-	ret = tap_do_read(q, &m->msg_iter, flags & MSG_DONTWAIT);
+	ret = tap_do_read(q, &m->msg_iter, flags & MSG_DONTWAIT,
+			  m->msg_control);
 	if (ret > total_len) {
 		m->msg_flags |= MSG_TRUNC;
 		ret = flags & MSG_TRUNC ? ret : total_len;
-- 
2.7.4
Jason Wang
2017-Mar-21  04:04 UTC
[PATCH net-next 7/8] vhost_net: try batch dequing from skb array
We used to dequeue one skb during recvmsg() from skb_array, this could
be inefficient because of the bad cache utilization and spinlock
touching for each packet. This patch tries to batch them by calling
batch dequeuing helpers explicitly on the exported skb array and pass
the skb back through msg_control for underlayer socket to finish the
userspace copying.
Tests were done by XDP1:
- small buffer:
  Before: 1.88Mpps
  After : 2.25Mpps (+19.6%)
- mergeable buffer:
  Before: 1.83Mpps
  After : 2.10Mpps (+14.7%)
Signed-off-by: Jason Wang <jasowang at redhat.com>
---
 drivers/vhost/net.c | 64 +++++++++++++++++++++++++++++++++++++++++++++++++----
 1 file changed, 60 insertions(+), 4 deletions(-)
diff --git a/drivers/vhost/net.c b/drivers/vhost/net.c
index 9b51989..53f09f2 100644
--- a/drivers/vhost/net.c
+++ b/drivers/vhost/net.c
@@ -28,6 +28,8 @@
 #include <linux/if_macvlan.h>
 #include <linux/if_tap.h>
 #include <linux/if_vlan.h>
+#include <linux/skb_array.h>
+#include <linux/skbuff.h>
 
 #include <net/sock.h>
 
@@ -85,6 +87,7 @@ struct vhost_net_ubuf_ref {
 	struct vhost_virtqueue *vq;
 };
 
+#define VHOST_RX_BATCH 64
 struct vhost_net_virtqueue {
 	struct vhost_virtqueue vq;
 	size_t vhost_hlen;
@@ -99,6 +102,10 @@ struct vhost_net_virtqueue {
 	/* Reference counting for outstanding ubufs.
 	 * Protected by vq mutex. Writers must also take device mutex. */
 	struct vhost_net_ubuf_ref *ubufs;
+	struct skb_array *rx_array;
+	void *rxq[VHOST_RX_BATCH];
+	int rt;
+	int rh;
 };
 
 struct vhost_net {
@@ -201,6 +208,8 @@ static void vhost_net_vq_reset(struct vhost_net *n)
 		n->vqs[i].ubufs = NULL;
 		n->vqs[i].vhost_hlen = 0;
 		n->vqs[i].sock_hlen = 0;
+		n->vqs[i].rt = 0;
+		n->vqs[i].rh = 0;
 	}
 
 }
@@ -503,13 +512,30 @@ static void handle_tx(struct vhost_net *net)
 	mutex_unlock(&vq->mutex);
 }
 
-static int peek_head_len(struct sock *sk)
+static int peek_head_len_batched(struct vhost_net_virtqueue *rvq)
+{
+	if (rvq->rh != rvq->rt)
+		goto out;
+
+	rvq->rh = rvq->rt = 0;
+	rvq->rt = skb_array_consume_batched_bh(rvq->rx_array, rvq->rxq,
+						VHOST_RX_BATCH);
+	if (!rvq->rt)
+		return 0;
+out:
+	return __skb_array_len_with_tag(rvq->rxq[rvq->rh]);
+}
+
+static int peek_head_len(struct vhost_net_virtqueue *rvq, struct sock *sk)
 {
 	struct socket *sock = sk->sk_socket;
 	struct sk_buff *head;
 	int len = 0;
 	unsigned long flags;
 
+	if (rvq->rx_array)
+		return peek_head_len_batched(rvq);
+
 	if (sock->ops->peek_len)
 		return sock->ops->peek_len(sock);
 
@@ -535,12 +561,14 @@ static int sk_has_rx_data(struct sock *sk)
 	return skb_queue_empty(&sk->sk_receive_queue);
 }
 
-static int vhost_net_rx_peek_head_len(struct vhost_net *net, struct sock *sk)
+static int vhost_net_rx_peek_head_len(struct vhost_net *net,
+				      struct sock *sk)
 {
+	struct vhost_net_virtqueue *rvq = &net->vqs[VHOST_NET_VQ_RX];
 	struct vhost_net_virtqueue *nvq = &net->vqs[VHOST_NET_VQ_TX];
 	struct vhost_virtqueue *vq = &nvq->vq;
 	unsigned long uninitialized_var(endtime);
-	int len = peek_head_len(sk);
+	int len = peek_head_len(rvq, sk);
 
 	if (!len && vq->busyloop_timeout) {
 		/* Both tx vq and rx socket were polled here */
@@ -561,7 +589,7 @@ static int vhost_net_rx_peek_head_len(struct vhost_net *net,
struct sock *sk)
 			vhost_poll_queue(&vq->poll);
 		mutex_unlock(&vq->mutex);
 
-		len = peek_head_len(sk);
+		len = peek_head_len(rvq, sk);
 	}
 
 	return len;
@@ -699,6 +727,8 @@ static void handle_rx(struct vhost_net *net)
 		/* On error, stop handling until the next kick. */
 		if (unlikely(headcount < 0))
 			goto out;
+		if (nvq->rx_array)
+			msg.msg_control = nvq->rxq[nvq->rh++];
 		/* On overrun, truncate and discard */
 		if (unlikely(headcount > UIO_MAXIOV)) {
 			iov_iter_init(&msg.msg_iter, READ, vq->iov, 1, 1);
@@ -841,6 +871,8 @@ static int vhost_net_open(struct inode *inode, struct file
*f)
 		n->vqs[i].done_idx = 0;
 		n->vqs[i].vhost_hlen = 0;
 		n->vqs[i].sock_hlen = 0;
+		n->vqs[i].rt = 0;
+		n->vqs[i].rh = 0;
 	}
 	vhost_dev_init(dev, vqs, VHOST_NET_VQ_MAX);
 
@@ -856,11 +888,15 @@ static struct socket *vhost_net_stop_vq(struct vhost_net
*n,
 					struct vhost_virtqueue *vq)
 {
 	struct socket *sock;
+	struct vhost_net_virtqueue *nvq +		container_of(vq, struct
vhost_net_virtqueue, vq);
 
 	mutex_lock(&vq->mutex);
 	sock = vq->private_data;
 	vhost_net_disable_vq(n, vq);
 	vq->private_data = NULL;
+	while (nvq->rh != nvq->rt)
+		kfree_skb(nvq->rxq[nvq->rh++]);
 	mutex_unlock(&vq->mutex);
 	return sock;
 }
@@ -953,6 +989,25 @@ static struct socket *get_raw_socket(int fd)
 	return ERR_PTR(r);
 }
 
+static struct skb_array *get_tap_skb_array(int fd)
+{
+	struct skb_array *array;
+	struct file *file = fget(fd);
+
+	if (!file)
+		return NULL;
+	array = tun_get_skb_array(file);
+	if (!IS_ERR(array))
+		goto out;
+	array = tap_get_skb_array(file);
+	if (!IS_ERR(array))
+		goto out;
+	array = NULL;
+out:
+	fput(file);
+	return array;
+}
+
 static struct socket *get_tap_socket(int fd)
 {
 	struct file *file = fget(fd);
@@ -1029,6 +1084,7 @@ static long vhost_net_set_backend(struct vhost_net *n,
unsigned index, int fd)
 
 		vhost_net_disable_vq(n, vq);
 		vq->private_data = sock;
+		nvq->rx_array = get_tap_skb_array(fd);
 		r = vhost_vq_init_access(vq);
 		if (r)
 			goto err_used;
-- 
2.7.4
Jason Wang
2017-Mar-21  04:04 UTC
[PATCH net-next 8/8] vhost_net: use lockless peeking for skb array during busy polling
For the socket that exports its skb array, we can use lockless polling
to avoid touching spinlock during busy polling.
Signed-off-by: Jason Wang <jasowang at redhat.com>
---
 drivers/vhost/net.c | 7 +++++--
 1 file changed, 5 insertions(+), 2 deletions(-)
diff --git a/drivers/vhost/net.c b/drivers/vhost/net.c
index 53f09f2..41153a3 100644
--- a/drivers/vhost/net.c
+++ b/drivers/vhost/net.c
@@ -551,10 +551,13 @@ static int peek_head_len(struct vhost_net_virtqueue *rvq,
struct sock *sk)
 	return len;
 }
 
-static int sk_has_rx_data(struct sock *sk)
+static int sk_has_rx_data(struct vhost_net_virtqueue *rvq, struct sock *sk)
 {
 	struct socket *sock = sk->sk_socket;
 
+	if (rvq->rx_array)
+		return !__skb_array_empty(rvq->rx_array);
+
 	if (sock->ops->peek_len)
 		return sock->ops->peek_len(sock);
 
@@ -579,7 +582,7 @@ static int vhost_net_rx_peek_head_len(struct vhost_net *net,
 		endtime = busy_clock() + vq->busyloop_timeout;
 
 		while (vhost_can_busy_poll(&net->dev, endtime) &&
-		       !sk_has_rx_data(sk) &&
+		       !sk_has_rx_data(rvq, sk) &&
 		       vhost_vq_avail_empty(&net->dev, vq))
 			cpu_relax();
 
-- 
2.7.4
Sergei Shtylyov
2017-Mar-21  10:25 UTC
[PATCH net-next 1/8] ptr_ring: introduce batch dequeuing
Hello! On 3/21/2017 7:04 AM, Jason Wang wrote:> Signed-off-by: Jason Wang <jasowang at redhat.com> > --- > include/linux/ptr_ring.h | 65 ++++++++++++++++++++++++++++++++++++++++++++++++ > 1 file changed, 65 insertions(+) > > diff --git a/include/linux/ptr_ring.h b/include/linux/ptr_ring.h > index 6c70444..4771ded 100644 > --- a/include/linux/ptr_ring.h > +++ b/include/linux/ptr_ring.h > @@ -247,6 +247,22 @@ static inline void *__ptr_ring_consume(struct ptr_ring *r) > return ptr; > } > > +static inline int __ptr_ring_consume_batched(struct ptr_ring *r, > + void **array, int n) > +{ > + void *ptr; > + int i = 0; > + > + while (i < n) {Hm, why not *for*?> + ptr = __ptr_ring_consume(r); > + if (!ptr) > + break; > + array[i++] = ptr; > + } > + > + return i; > +} > + > /* > * Note: resize (below) nests producer lock within consumer lock, so if you > * call this in interrupt or BH context, you must disable interrupts/BH when > @@ -297,6 +313,55 @@ static inline void *ptr_ring_consume_bh(struct ptr_ring *r)[...] MBR, Sergei
Michael S. Tsirkin
2017-Mar-22  13:43 UTC
[PATCH net-next 1/8] ptr_ring: introduce batch dequeuing
On Tue, Mar 21, 2017 at 12:04:40PM +0800, Jason Wang wrote:> Signed-off-by: Jason Wang <jasowang at redhat.com> > --- > include/linux/ptr_ring.h | 65 ++++++++++++++++++++++++++++++++++++++++++++++++ > 1 file changed, 65 insertions(+) > > diff --git a/include/linux/ptr_ring.h b/include/linux/ptr_ring.h > index 6c70444..4771ded 100644 > --- a/include/linux/ptr_ring.h > +++ b/include/linux/ptr_ring.h > @@ -247,6 +247,22 @@ static inline void *__ptr_ring_consume(struct ptr_ring *r) > return ptr; > } > > +static inline int __ptr_ring_consume_batched(struct ptr_ring *r, > + void **array, int n) > +{ > + void *ptr; > + int i = 0; > + > + while (i < n) { > + ptr = __ptr_ring_consume(r); > + if (!ptr) > + break; > + array[i++] = ptr; > + } > + > + return i; > +} > + > /* > * Note: resize (below) nests producer lock within consumer lock, so if you > * call this in interrupt or BH context, you must disable interrupts/BH whenThis ignores the comment above that function: /* Note: callers invoking this in a loop must use a compiler barrier, * for example cpu_relax(). */ Also - it looks like it shouldn't matter if reads are reordered but I wonder. Thoughts? Including some reasoning about it in commit log would be nice.> @@ -297,6 +313,55 @@ static inline void *ptr_ring_consume_bh(struct ptr_ring *r) > return ptr; > } > > +static inline int ptr_ring_consume_batched(struct ptr_ring *r, > + void **array, int n) > +{ > + int ret; > + > + spin_lock(&r->consumer_lock); > + ret = __ptr_ring_consume_batched(r, array, n); > + spin_unlock(&r->consumer_lock); > + > + return ret; > +} > + > +static inline int ptr_ring_consume_batched_irq(struct ptr_ring *r, > + void **array, int n) > +{ > + int ret; > + > + spin_lock_irq(&r->consumer_lock); > + ret = __ptr_ring_consume_batched(r, array, n); > + spin_unlock_irq(&r->consumer_lock); > + > + return ret; > +} > + > +static inline int ptr_ring_consume_batched_any(struct ptr_ring *r, > + void **array, int n) > +{ > + unsigned long flags; > + int ret; > + > + spin_lock_irqsave(&r->consumer_lock, flags); > + ret = __ptr_ring_consume_batched(r, array, n); > + spin_unlock_irqrestore(&r->consumer_lock, flags); > + > + return ret; > +} > + > +static inline int ptr_ring_consume_batched_bh(struct ptr_ring *r, > + void **array, int n) > +{ > + int ret; > + > + spin_lock_bh(&r->consumer_lock); > + ret = __ptr_ring_consume_batched(r, array, n); > + spin_unlock_bh(&r->consumer_lock); > + > + return ret; > +} > + > /* Cast to structure type and call a function without discarding from FIFO. > * Function must return a value. > * Callers must take consumer_lock. > -- > 2.7.4
Michael S. Tsirkin
2017-Mar-22  14:16 UTC
[PATCH net-next 7/8] vhost_net: try batch dequing from skb array
On Tue, Mar 21, 2017 at 12:04:46PM +0800, Jason Wang wrote:> We used to dequeue one skb during recvmsg() from skb_array, this could > be inefficient because of the bad cache utilization and spinlock > touching for each packet. This patch tries to batch them by calling > batch dequeuing helpers explicitly on the exported skb array and pass > the skb back through msg_control for underlayer socket to finish the > userspace copying. > > Tests were done by XDP1: > - small buffer: > Before: 1.88Mpps > After : 2.25Mpps (+19.6%) > - mergeable buffer: > Before: 1.83Mpps > After : 2.10Mpps (+14.7%) > > Signed-off-by: Jason Wang <jasowang at redhat.com> > --- > drivers/vhost/net.c | 64 +++++++++++++++++++++++++++++++++++++++++++++++++---- > 1 file changed, 60 insertions(+), 4 deletions(-) > > diff --git a/drivers/vhost/net.c b/drivers/vhost/net.c > index 9b51989..53f09f2 100644 > --- a/drivers/vhost/net.c > +++ b/drivers/vhost/net.c > @@ -28,6 +28,8 @@ > #include <linux/if_macvlan.h> > #include <linux/if_tap.h> > #include <linux/if_vlan.h> > +#include <linux/skb_array.h> > +#include <linux/skbuff.h> > > #include <net/sock.h> > > @@ -85,6 +87,7 @@ struct vhost_net_ubuf_ref { > struct vhost_virtqueue *vq; > }; > > +#define VHOST_RX_BATCH 64 > struct vhost_net_virtqueue { > struct vhost_virtqueue vq; > size_t vhost_hlen; > @@ -99,6 +102,10 @@ struct vhost_net_virtqueue { > /* Reference counting for outstanding ubufs. > * Protected by vq mutex. Writers must also take device mutex. */ > struct vhost_net_ubuf_ref *ubufs; > + struct skb_array *rx_array; > + void *rxq[VHOST_RX_BATCH]; > + int rt; > + int rh; > }; > > struct vhost_net { > @@ -201,6 +208,8 @@ static void vhost_net_vq_reset(struct vhost_net *n) > n->vqs[i].ubufs = NULL; > n->vqs[i].vhost_hlen = 0; > n->vqs[i].sock_hlen = 0; > + n->vqs[i].rt = 0; > + n->vqs[i].rh = 0; > } > > } > @@ -503,13 +512,30 @@ static void handle_tx(struct vhost_net *net) > mutex_unlock(&vq->mutex); > } > > -static int peek_head_len(struct sock *sk) > +static int peek_head_len_batched(struct vhost_net_virtqueue *rvq)Pls rename to say what it actually does: fetch skbs> +{ > + if (rvq->rh != rvq->rt) > + goto out; > + > + rvq->rh = rvq->rt = 0; > + rvq->rt = skb_array_consume_batched_bh(rvq->rx_array, rvq->rxq, > + VHOST_RX_BATCH);A comment explaining why is is -bh would be helpful.> + if (!rvq->rt) > + return 0; > +out: > + return __skb_array_len_with_tag(rvq->rxq[rvq->rh]); > +} > + > +static int peek_head_len(struct vhost_net_virtqueue *rvq, struct sock *sk) > { > struct socket *sock = sk->sk_socket; > struct sk_buff *head; > int len = 0; > unsigned long flags; > > + if (rvq->rx_array) > + return peek_head_len_batched(rvq); > + > if (sock->ops->peek_len) > return sock->ops->peek_len(sock); > > @@ -535,12 +561,14 @@ static int sk_has_rx_data(struct sock *sk) > return skb_queue_empty(&sk->sk_receive_queue); > } > > -static int vhost_net_rx_peek_head_len(struct vhost_net *net, struct sock *sk) > +static int vhost_net_rx_peek_head_len(struct vhost_net *net, > + struct sock *sk) > { > + struct vhost_net_virtqueue *rvq = &net->vqs[VHOST_NET_VQ_RX]; > struct vhost_net_virtqueue *nvq = &net->vqs[VHOST_NET_VQ_TX]; > struct vhost_virtqueue *vq = &nvq->vq; > unsigned long uninitialized_var(endtime); > - int len = peek_head_len(sk); > + int len = peek_head_len(rvq, sk); > > if (!len && vq->busyloop_timeout) { > /* Both tx vq and rx socket were polled here */ > @@ -561,7 +589,7 @@ static int vhost_net_rx_peek_head_len(struct vhost_net *net, struct sock *sk) > vhost_poll_queue(&vq->poll); > mutex_unlock(&vq->mutex); > > - len = peek_head_len(sk); > + len = peek_head_len(rvq, sk); > } > > return len; > @@ -699,6 +727,8 @@ static void handle_rx(struct vhost_net *net) > /* On error, stop handling until the next kick. */ > if (unlikely(headcount < 0)) > goto out; > + if (nvq->rx_array) > + msg.msg_control = nvq->rxq[nvq->rh++]; > /* On overrun, truncate and discard */ > if (unlikely(headcount > UIO_MAXIOV)) { > iov_iter_init(&msg.msg_iter, READ, vq->iov, 1, 1); > @@ -841,6 +871,8 @@ static int vhost_net_open(struct inode *inode, struct file *f) > n->vqs[i].done_idx = 0; > n->vqs[i].vhost_hlen = 0; > n->vqs[i].sock_hlen = 0; > + n->vqs[i].rt = 0; > + n->vqs[i].rh = 0; > } > vhost_dev_init(dev, vqs, VHOST_NET_VQ_MAX); > > @@ -856,11 +888,15 @@ static struct socket *vhost_net_stop_vq(struct vhost_net *n, > struct vhost_virtqueue *vq) > { > struct socket *sock; > + struct vhost_net_virtqueue *nvq > + container_of(vq, struct vhost_net_virtqueue, vq); > > mutex_lock(&vq->mutex); > sock = vq->private_data; > vhost_net_disable_vq(n, vq); > vq->private_data = NULL; > + while (nvq->rh != nvq->rt) > + kfree_skb(nvq->rxq[nvq->rh++]); > mutex_unlock(&vq->mutex); > return sock; > } > @@ -953,6 +989,25 @@ static struct socket *get_raw_socket(int fd) > return ERR_PTR(r); > } > > +static struct skb_array *get_tap_skb_array(int fd) > +{ > + struct skb_array *array; > + struct file *file = fget(fd); > + > + if (!file) > + return NULL; > + array = tun_get_skb_array(file); > + if (!IS_ERR(array)) > + goto out; > + array = tap_get_skb_array(file); > + if (!IS_ERR(array)) > + goto out; > + array = NULL; > +out: > + fput(file); > + return array; > +} > + > static struct socket *get_tap_socket(int fd) > { > struct file *file = fget(fd); > @@ -1029,6 +1084,7 @@ static long vhost_net_set_backend(struct vhost_net *n, unsigned index, int fd) > > vhost_net_disable_vq(n, vq); > vq->private_data = sock; > + nvq->rx_array = get_tap_skb_array(fd); > r = vhost_vq_init_access(vq); > if (r) > goto err_used; > -- > 2.7.4
Michael S. Tsirkin
2017-Mar-29  12:07 UTC
[PATCH net-next 8/8] vhost_net: use lockless peeking for skb array during busy polling
On Tue, Mar 21, 2017 at 12:04:47PM +0800, Jason Wang wrote:> For the socket that exports its skb array, we can use lockless polling > to avoid touching spinlock during busy polling. > > Signed-off-by: Jason Wang <jasowang at redhat.com> > --- > drivers/vhost/net.c | 7 +++++-- > 1 file changed, 5 insertions(+), 2 deletions(-) > > diff --git a/drivers/vhost/net.c b/drivers/vhost/net.c > index 53f09f2..41153a3 100644 > --- a/drivers/vhost/net.c > +++ b/drivers/vhost/net.c > @@ -551,10 +551,13 @@ static int peek_head_len(struct vhost_net_virtqueue *rvq, struct sock *sk) > return len; > } > > -static int sk_has_rx_data(struct sock *sk) > +static int sk_has_rx_data(struct vhost_net_virtqueue *rvq, struct sock *sk) > { > struct socket *sock = sk->sk_socket; > > + if (rvq->rx_array) > + return !__skb_array_empty(rvq->rx_array); > + > if (sock->ops->peek_len) > return sock->ops->peek_len(sock); >I don't see which patch adds __skb_array_empty.> @@ -579,7 +582,7 @@ static int vhost_net_rx_peek_head_len(struct vhost_net *net, > endtime = busy_clock() + vq->busyloop_timeout; > > while (vhost_can_busy_poll(&net->dev, endtime) && > - !sk_has_rx_data(sk) && > + !sk_has_rx_data(rvq, sk) && > vhost_vq_avail_empty(&net->dev, vq)) > cpu_relax(); > > -- > 2.7.4
Apparently Analagous Threads
- [PATCH net-next 7/8] vhost_net: try batch dequing from skb array
- [PATCH net-next 7/8] vhost_net: try batch dequing from skb array
- [PATCH net-next 7/8] vhost_net: try batch dequing from skb array
- [PATCH net-next 0/8] vhost-net rx batching
- [PATCH net-next 0/8] vhost-net rx batching