Michael S. Tsirkin
2018-Sep-06 16:46 UTC
[PATCH net-next 11/11] vhost_net: batch submitting XDP buffers to underlayer sockets
On Thu, Sep 06, 2018 at 12:05:26PM +0800, Jason Wang wrote:
> This patch implements XDP batching for vhost_net. The idea is first to
> try to do userspace copy and build XDP buff directly in vhost. Instead
> of submitting the packet immediately, vhost_net will batch them in an
> array and submit every 64 (VHOST_NET_BATCH) packets to the under layer
> sockets through msg_control of sendmsg().
>
> When XDP is enabled on the TUN/TAP, TUN/TAP can process XDP inside a
> loop without caring GUP thus it can do batch map flushing. When XDP is
> not enabled or not supported, the underlayer socket need to build skb
> and pass it to network core. The batched packet submission allows us
> to do batching like netif_receive_skb_list() in the future.
>
> This saves lots of indirect calls for better cache utilization. For
> the case that we can't so batching e.g when sndbuf is limited or
> packet size is too large, we will go for usual one packet per
> sendmsg() way.
>
> Doing testpmd on various setups gives us:
>
> Test                /+pps%
> XDP_DROP on TAP     /+44.8%
> XDP_REDIRECT on TAP /+29%
> macvtap (skb)       /+26%
>
> Netperf tests shows obvious improvements for small packet transmission:
>
> size/session/+thu%/+normalize%
> 64/ 1/ +2%/ 0%
> 64/ 2/ +3%/ +1%
> 64/ 4/ +7%/ +5%
> 64/ 8/ +8%/ +6%
> 256/ 1/ +3%/ 0%
> 256/ 2/ +10%/ +7%
> 256/ 4/ +26%/ +22%
> 256/ 8/ +27%/ +23%
> 512/ 1/ +3%/ +2%
> 512/ 2/ +19%/ +14%
> 512/ 4/ +43%/ +40%
> 512/ 8/ +45%/ +41%
> 1024/ 1/ +4%/ 0%
> 1024/ 2/ +27%/ +21%
> 1024/ 4/ +38%/ +73%
> 1024/ 8/ +15%/ +24%
> 2048/ 1/ +10%/ +7%
> 2048/ 2/ +16%/ +12%
> 2048/ 4/ 0%/ +2%
> 2048/ 8/ 0%/ +2%
> 4096/ 1/ +36%/ +60%
> 4096/ 2/ -11%/ -26%
> 4096/ 4/ 0%/ +14%
> 4096/ 8/ 0%/ +4%
> 16384/ 1/ -1%/ +5%
> 16384/ 2/ 0%/ +2%
> 16384/ 4/ 0%/ -3%
> 16384/ 8/ 0%/ +4%
> 65535/ 1/ 0%/ +10%
> 65535/ 2/ 0%/ +8%
> 65535/ 4/ 0%/ +1%
> 65535/ 8/ 0%/ +3%
>
> Signed-off-by: Jason Wang <jasowang at redhat.com>
> ---
> drivers/vhost/net.c | 164 ++++++++++++++++++++++++++++++++++++++++----
> 1 file changed, 151 insertions(+), 13 deletions(-)
>
> diff --git a/drivers/vhost/net.c b/drivers/vhost/net.c
> index fb01ce6d981c..1dd4239cbff8 100644
> --- a/drivers/vhost/net.c
> +++ b/drivers/vhost/net.c
> @@ -116,6 +116,7 @@ struct vhost_net_virtqueue {
> * For RX, number of batched heads
> */
> int done_idx;
> + int batched_xdp;

Pls add a comment documenting what does this new field do.

> /* an array of userspace buffers info */
> struct ubuf_info *ubuf_info;
> /* Reference counting for outstanding ubufs.
> @@ -123,6 +124,7 @@ struct vhost_net_virtqueue {
> struct vhost_net_ubuf_ref *ubufs;
> struct ptr_ring *rx_ring;
> struct vhost_net_buf rxq;
> + struct xdp_buff xdp[VHOST_NET_BATCH];

This is a bit too much to have inline. 64 entries 48 bytes each, and we
have 2 of these structures, that's already >4K.

> };
>
> struct vhost_net {
> @@ -338,6 +340,11 @@ static bool vhost_sock_zcopy(struct socket *sock)
> sock_flag(sock->sk, SOCK_ZEROCOPY);
> }
>
> +static bool vhost_sock_xdp(struct socket *sock)
> +{
> + return sock_flag(sock->sk, SOCK_XDP);

what if an xdp program is attached while this processing
is going on? Flag value will be wrong - is this safe
and why? Pls add a comment.

> +}
> +
> /* In case of DMA done not in order in lower device driver for some reason.
> * upend_idx is used to track end of used idx, done_idx is used to track head
> * of used idx. Once lower device DMA done contiguously, we will signal KVM
> @@ -444,10 +451,36 @@ static void vhost_net_signal_used(struct vhost_net_virtqueue *nvq)
> nvq->done_idx = 0;
> }
>
> +static void vhost_tx_batch(struct vhost_net *net,
> + struct vhost_net_virtqueue *nvq,
> + struct socket *sock,
> + struct msghdr *msghdr)
> +{
> + struct tun_msg_ctl ctl = {
> + .type = nvq->batched_xdp << 16 | TUN_MSG_PTR,
> + .ptr = nvq->xdp,
> + };
> + int err;
> +
> + if (nvq->batched_xdp == 0)
> + goto signal_used;
> +
> + msghdr->msg_control = &ctl;
> + err = sock->ops->sendmsg(sock, msghdr, 0);
> + if (unlikely(err < 0)) {
> + vq_err(&nvq->vq, "Fail to batch sending packets\n");
> + return;
> + }
> +
> +signal_used:
> + vhost_net_signal_used(nvq);
> + nvq->batched_xdp = 0;
> +}
> +
> static int vhost_net_tx_get_vq_desc(struct vhost_net *net,
> struct vhost_net_virtqueue *nvq,
> unsigned int *out_num, unsigned int *in_num,
> - bool *busyloop_intr)
> + struct msghdr *msghdr, bool *busyloop_intr)
> {
> struct vhost_virtqueue *vq = &nvq->vq;
> unsigned long uninitialized_var(endtime);
> @@ -455,8 +488,9 @@ static int vhost_net_tx_get_vq_desc(struct vhost_net *net,
> out_num, in_num, NULL, NULL);
>
> if (r == vq->num && vq->busyloop_timeout) {
> + /* Flush batched packets first */
> if (!vhost_sock_zcopy(vq->private_data))
> - vhost_net_signal_used(nvq);
> + vhost_tx_batch(net, nvq, vq->private_data, msghdr);
> preempt_disable();
> endtime = busy_clock() + vq->busyloop_timeout;
> while (vhost_can_busy_poll(endtime)) {
> @@ -512,7 +546,7 @@ static int get_tx_bufs(struct vhost_net *net,
> struct vhost_virtqueue *vq = &nvq->vq;
> int ret;
>
> - ret = vhost_net_tx_get_vq_desc(net, nvq, out, in, busyloop_intr);
> + ret = vhost_net_tx_get_vq_desc(net, nvq, out, in, msg, busyloop_intr);
>
> if (ret < 0 || ret == vq->num)
> return ret;
> @@ -540,6 +574,83 @@ static bool tx_can_batch(struct vhost_virtqueue *vq, size_t total_len)
> !vhost_vq_avail_empty(vq->dev, vq);
> }
>
> +#define VHOST_NET_RX_PAD (NET_IP_ALIGN + NET_SKB_PAD)

I wonder whether NET_IP_ALIGN make sense for XDP.

> +
> +static int vhost_net_build_xdp(struct vhost_net_virtqueue *nvq,
> + struct iov_iter *from)
> +{
> + struct vhost_virtqueue *vq = &nvq->vq;
> + struct socket *sock = vq->private_data;
> + struct page_frag *alloc_frag = &current->task_frag;
> + struct virtio_net_hdr *gso;
> + struct xdp_buff *xdp = &nvq->xdp[nvq->batched_xdp];
> + size_t len = iov_iter_count(from);
> + int headroom = vhost_sock_xdp(sock) ? XDP_PACKET_HEADROOM : 0;
> + int buflen = SKB_DATA_ALIGN(sizeof(struct skb_shared_info));
> + int pad = SKB_DATA_ALIGN(VHOST_NET_RX_PAD + headroom + nvq->sock_hlen);
> + int sock_hlen = nvq->sock_hlen;
> + void *buf;
> + int copied;
> +
> + if (unlikely(len < nvq->sock_hlen))
> + return -EFAULT;
> +
> + if (SKB_DATA_ALIGN(len + pad) +
> + SKB_DATA_ALIGN(sizeof(struct skb_shared_info)) > PAGE_SIZE)
> + return -ENOSPC;
> +
> + buflen += SKB_DATA_ALIGN(len + pad);
> + alloc_frag->offset = ALIGN((u64)alloc_frag->offset, SMP_CACHE_BYTES);
> + if (unlikely(!skb_page_frag_refill(buflen, alloc_frag, GFP_KERNEL)))
> + return -ENOMEM;
> +
> + buf = (char *)page_address(alloc_frag->page) + alloc_frag->offset;
> +
> + /* We store two kinds of metadata in the header which will be
> + * used for XDP_PASS to do build_skb():
> + * offset 0: buflen
> + * offset sizeof(int): vnet header

Define a struct for the metadata then?

> + */
> + copied = copy_page_from_iter(alloc_frag->page,
> + alloc_frag->offset + sizeof(int),
> + sock_hlen, from);
> + if (copied != sock_hlen)
> + return -EFAULT;
> +
> + gso = (struct virtio_net_hdr *)(buf + sizeof(int));
> +
> + if ((gso->flags & VIRTIO_NET_HDR_F_NEEDS_CSUM) &&
> + vhost16_to_cpu(vq, gso->csum_start) +
> + vhost16_to_cpu(vq, gso->csum_offset) + 2 >
> + vhost16_to_cpu(vq, gso->hdr_len)) {
> + gso->hdr_len = cpu_to_vhost16(vq,
> + vhost16_to_cpu(vq, gso->csum_start) +
> + vhost16_to_cpu(vq, gso->csum_offset) + 2);
> +
> + if (vhost16_to_cpu(vq, gso->hdr_len) > len)
> + return -EINVAL;
> + }
> +
> + len -= sock_hlen;
> + copied = copy_page_from_iter(alloc_frag->page,
> + alloc_frag->offset + pad,
> + len, from);
> + if (copied != len)
> + return -EFAULT;
> +
> + xdp->data_hard_start = buf;
> + xdp->data = buf + pad;
> + xdp->data_end = xdp->data + len;
> + *(int *)(xdp->data_hard_start) = buflen;
> +
> + get_page(alloc_frag->page);
> + alloc_frag->offset += buflen;
> +
> + ++nvq->batched_xdp;
> +
> + return 0;
> +}
> +
> static void handle_tx_copy(struct vhost_net *net, struct socket *sock)
> {
> struct vhost_net_virtqueue *nvq = &net->vqs[VHOST_NET_VQ_TX];
> @@ -556,10 +667,14 @@ static void handle_tx_copy(struct vhost_net *net, struct socket *sock)
> size_t len, total_len = 0;
> int err;
> int sent_pkts = 0;
> + bool bulking = (sock->sk->sk_sndbuf == INT_MAX);

What does bulking mean?

>
> for (;;) {
> bool busyloop_intr = false;
>
> + if (nvq->done_idx == VHOST_NET_BATCH)
> + vhost_tx_batch(net, nvq, sock, &msg);
> +
> head = get_tx_bufs(net, nvq, &msg, &out, &in, &len,
> &busyloop_intr);
> /* On error, stop handling until the next kick. */
> @@ -577,14 +692,34 @@ static void handle_tx_copy(struct vhost_net *net, struct socket *sock)
> break;
> }
>
> - vq->heads[nvq->done_idx].id = cpu_to_vhost32(vq, head);
> - vq->heads[nvq->done_idx].len = 0;
> -
> total_len += len;
> - if (tx_can_batch(vq, total_len))
> - msg.msg_flags |= MSG_MORE;
> - else
> - msg.msg_flags &= ~MSG_MORE;
> +
> + /* For simplicity, TX batching is only enabled if
> + * sndbuf is unlimited.

What if sndbuf changes while this processing is going on?

> + */
> + if (bulking) {
> + err = vhost_net_build_xdp(nvq, &msg.msg_iter);
> + if (!err) {
> + goto done;
> + } else if (unlikely(err != -ENOSPC)) {
> + vhost_tx_batch(net, nvq, sock, &msg);
> + vhost_discard_vq_desc(vq, 1);
> + vhost_net_enable_vq(net, vq);
> + break;
> + }
> +
> + /* We can't build XDP buff, go for single
> + * packet path but let's flush batched
> + * packets.
> + */
> + vhost_tx_batch(net, nvq, sock, &msg);
> + msg.msg_control = NULL;
> + } else {
> + if (tx_can_batch(vq, total_len))
> + msg.msg_flags |= MSG_MORE;
> + else
> + msg.msg_flags &= ~MSG_MORE;
> + }
>
> /* TODO: Check specific error and bomb out unless ENOBUFS? */
> err = sock->ops->sendmsg(sock, &msg, len);
> @@ -596,15 +731,17 @@ static void handle_tx_copy(struct vhost_net *net, struct socket *sock)
> if (err != len)
> pr_debug("Truncated TX packet: len %d != %zd\n",
> err, len);
> - if (++nvq->done_idx >= VHOST_NET_BATCH)
> - vhost_net_signal_used(nvq);
> +done:
> + vq->heads[nvq->done_idx].id = cpu_to_vhost32(vq, head);
> + vq->heads[nvq->done_idx].len = 0;
> + ++nvq->done_idx;
> if (vhost_exceeds_weight(++sent_pkts, total_len)) {
> vhost_poll_queue(&vq->poll);
> break;
> }
> }
>
> - vhost_net_signal_used(nvq);
> + vhost_tx_batch(net, nvq, sock, &msg);
> }
>
> static void handle_tx_zerocopy(struct vhost_net *net, struct socket *sock)
> @@ -1111,6 +1248,7 @@ static int vhost_net_open(struct inode *inode, struct file *f)
> n->vqs[i].ubuf_info = NULL;
> n->vqs[i].upend_idx = 0;
> n->vqs[i].done_idx = 0;
> + n->vqs[i].batched_xdp = 0;
> n->vqs[i].vhost_hlen = 0;
> n->vqs[i].sock_hlen = 0;
> n->vqs[i].rx_ring = NULL;
> --
> 2.17.1
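A side note on the buffer layout questioned above ("Define a struct for the metadata then?"): the patch stores an integer buffer length at offset 0 and the vnet header at offset sizeof(int), ahead of the padded packet data. One way such a struct could look is sketched below; the struct and field names are hypothetical and do not appear in the posted patch.

#include <linux/virtio_net.h>

/* Hypothetical struct describing the metadata vhost_net_build_xdp()
 * places at the start of each buffer: the total fragment length
 * (read back on XDP_PASS so the socket can size build_skb()) followed
 * by the vnet header copied from the guest. Names are illustrative,
 * not part of the patch.
 */
struct vhost_xdp_metadata {
	int buflen;			/* offset 0: total buffer length */
	struct virtio_net_hdr gso;	/* offset sizeof(int): vnet header */
};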
Jason Wang
2018-Sep-07 07:41 UTC
[PATCH net-next 11/11] vhost_net: batch submitting XDP buffers to underlayer sockets
On 2018年09月07日 00:46, Michael S. Tsirkin wrote:
> On Thu, Sep 06, 2018 at 12:05:26PM +0800, Jason Wang wrote:
>> This patch implements XDP batching for vhost_net. The idea is first to
>> try to do userspace copy and build XDP buff directly in vhost. Instead
>> of submitting the packet immediately, vhost_net will batch them in an
>> array and submit every 64 (VHOST_NET_BATCH) packets to the under layer
>> sockets through msg_control of sendmsg().
>>
>> When XDP is enabled on the TUN/TAP, TUN/TAP can process XDP inside a
>> loop without caring GUP thus it can do batch map flushing. When XDP is
>> not enabled or not supported, the underlayer socket need to build skb
>> and pass it to network core. The batched packet submission allows us
>> to do batching like netif_receive_skb_list() in the future.
>>
>> This saves lots of indirect calls for better cache utilization. For
>> the case that we can't so batching e.g when sndbuf is limited or
>> packet size is too large, we will go for usual one packet per
>> sendmsg() way.
>>
>> Doing testpmd on various setups gives us:
>>
>> Test                /+pps%
>> XDP_DROP on TAP     /+44.8%
>> XDP_REDIRECT on TAP /+29%
>> macvtap (skb)       /+26%
>>
>> Netperf tests shows obvious improvements for small packet transmission:
>>
>> size/session/+thu%/+normalize%
>> 64/ 1/ +2%/ 0%
>> 64/ 2/ +3%/ +1%
>> 64/ 4/ +7%/ +5%
>> 64/ 8/ +8%/ +6%
>> 256/ 1/ +3%/ 0%
>> 256/ 2/ +10%/ +7%
>> 256/ 4/ +26%/ +22%
>> 256/ 8/ +27%/ +23%
>> 512/ 1/ +3%/ +2%
>> 512/ 2/ +19%/ +14%
>> 512/ 4/ +43%/ +40%
>> 512/ 8/ +45%/ +41%
>> 1024/ 1/ +4%/ 0%
>> 1024/ 2/ +27%/ +21%
>> 1024/ 4/ +38%/ +73%
>> 1024/ 8/ +15%/ +24%
>> 2048/ 1/ +10%/ +7%
>> 2048/ 2/ +16%/ +12%
>> 2048/ 4/ 0%/ +2%
>> 2048/ 8/ 0%/ +2%
>> 4096/ 1/ +36%/ +60%
>> 4096/ 2/ -11%/ -26%
>> 4096/ 4/ 0%/ +14%
>> 4096/ 8/ 0%/ +4%
>> 16384/ 1/ -1%/ +5%
>> 16384/ 2/ 0%/ +2%
>> 16384/ 4/ 0%/ -3%
>> 16384/ 8/ 0%/ +4%
>> 65535/ 1/ 0%/ +10%
>> 65535/ 2/ 0%/ +8%
>> 65535/ 4/ 0%/ +1%
>> 65535/ 8/ 0%/ +3%
>>
>> Signed-off-by: Jason Wang <jasowang at redhat.com>
>> ---
>> drivers/vhost/net.c | 164 ++++++++++++++++++++++++++++++++++++++++----
>> 1 file changed, 151 insertions(+), 13 deletions(-)
>>
>> diff --git a/drivers/vhost/net.c b/drivers/vhost/net.c
>> index fb01ce6d981c..1dd4239cbff8 100644
>> --- a/drivers/vhost/net.c
>> +++ b/drivers/vhost/net.c
>> @@ -116,6 +116,7 @@ struct vhost_net_virtqueue {
>> * For RX, number of batched heads
>> */
>> int done_idx;
>> + int batched_xdp;
> Pls add a comment documenting what does this new field do.

Ok.

>
>> /* an array of userspace buffers info */
>> struct ubuf_info *ubuf_info;
>> /* Reference counting for outstanding ubufs.
>> @@ -123,6 +124,7 @@ struct vhost_net_virtqueue {
>> struct vhost_net_ubuf_ref *ubufs;
>> struct ptr_ring *rx_ring;
>> struct vhost_net_buf rxq;
>> + struct xdp_buff xdp[VHOST_NET_BATCH];
> This is a bit too much to have inline. 64 entries 48 bytes each, and we
> have 2 of these structures, that's already >4K.

Let me allocate it elsewhere.

>
>> };
>>
>> struct vhost_net {
>> @@ -338,6 +340,11 @@ static bool vhost_sock_zcopy(struct socket *sock)
>> sock_flag(sock->sk, SOCK_ZEROCOPY);
>> }
>>
>> +static bool vhost_sock_xdp(struct socket *sock)
>> +{
>> + return sock_flag(sock->sk, SOCK_XDP);
> what if an xdp program is attached while this processing
> is going on? Flag value will be wrong - is this safe
> and why? Pls add a comment.

Ok, let me add a comment to explain. It's safe and may just lead to a few
packets being dropped if the XDP program wants to extend the packet's header.

>
>> +}
>> +
>> /* In case of DMA done not in order in lower device driver for some reason.
>> * upend_idx is used to track end of used idx, done_idx is used to track head
>> * of used idx. Once lower device DMA done contiguously, we will signal KVM
>> @@ -444,10 +451,36 @@ static void vhost_net_signal_used(struct vhost_net_virtqueue *nvq)
>> nvq->done_idx = 0;
>> }
>>
>> +static void vhost_tx_batch(struct vhost_net *net,
>> + struct vhost_net_virtqueue *nvq,
>> + struct socket *sock,
>> + struct msghdr *msghdr)
>> +{
>> + struct tun_msg_ctl ctl = {
>> + .type = nvq->batched_xdp << 16 | TUN_MSG_PTR,
>> + .ptr = nvq->xdp,
>> + };
>> + int err;
>> +
>> + if (nvq->batched_xdp == 0)
>> + goto signal_used;
>> +
>> + msghdr->msg_control = &ctl;
>> + err = sock->ops->sendmsg(sock, msghdr, 0);
>> + if (unlikely(err < 0)) {
>> + vq_err(&nvq->vq, "Fail to batch sending packets\n");
>> + return;
>> + }
>> +
>> +signal_used:
>> + vhost_net_signal_used(nvq);
>> + nvq->batched_xdp = 0;
>> +}
>> +
>> static int vhost_net_tx_get_vq_desc(struct vhost_net *net,
>> struct vhost_net_virtqueue *nvq,
>> unsigned int *out_num, unsigned int *in_num,
>> - bool *busyloop_intr)
>> + struct msghdr *msghdr, bool *busyloop_intr)
>> {
>> struct vhost_virtqueue *vq = &nvq->vq;
>> unsigned long uninitialized_var(endtime);
>> @@ -455,8 +488,9 @@ static int vhost_net_tx_get_vq_desc(struct vhost_net *net,
>> out_num, in_num, NULL, NULL);
>>
>> if (r == vq->num && vq->busyloop_timeout) {
>> + /* Flush batched packets first */
>> if (!vhost_sock_zcopy(vq->private_data))
>> - vhost_net_signal_used(nvq);
>> + vhost_tx_batch(net, nvq, vq->private_data, msghdr);
>> preempt_disable();
>> endtime = busy_clock() + vq->busyloop_timeout;
>> while (vhost_can_busy_poll(endtime)) {
>> @@ -512,7 +546,7 @@ static int get_tx_bufs(struct vhost_net *net,
>> struct vhost_virtqueue *vq = &nvq->vq;
>> int ret;
>>
>> - ret = vhost_net_tx_get_vq_desc(net, nvq, out, in, busyloop_intr);
>> + ret = vhost_net_tx_get_vq_desc(net, nvq, out, in, msg, busyloop_intr);
>>
>> if (ret < 0 || ret == vq->num)
>> return ret;
>> @@ -540,6 +574,83 @@ static bool tx_can_batch(struct vhost_virtqueue *vq, size_t total_len)
>> !vhost_vq_avail_empty(vq->dev, vq);
>> }
>>
>> +#define VHOST_NET_RX_PAD (NET_IP_ALIGN + NET_SKB_PAD)
> I wonder whether NET_IP_ALIGN make sense for XDP.

XDP is not the only consumer; the socket may build an skb based on this.

>
>> +
>> +static int vhost_net_build_xdp(struct vhost_net_virtqueue *nvq,
>> + struct iov_iter *from)
>> +{
>> + struct vhost_virtqueue *vq = &nvq->vq;
>> + struct socket *sock = vq->private_data;
>> + struct page_frag *alloc_frag = &current->task_frag;
>> + struct virtio_net_hdr *gso;
>> + struct xdp_buff *xdp = &nvq->xdp[nvq->batched_xdp];
>> + size_t len = iov_iter_count(from);
>> + int headroom = vhost_sock_xdp(sock) ? XDP_PACKET_HEADROOM : 0;
>> + int buflen = SKB_DATA_ALIGN(sizeof(struct skb_shared_info));
>> + int pad = SKB_DATA_ALIGN(VHOST_NET_RX_PAD + headroom + nvq->sock_hlen);
>> + int sock_hlen = nvq->sock_hlen;
>> + void *buf;
>> + int copied;
>> +
>> + if (unlikely(len < nvq->sock_hlen))
>> + return -EFAULT;
>> +
>> + if (SKB_DATA_ALIGN(len + pad) +
>> + SKB_DATA_ALIGN(sizeof(struct skb_shared_info)) > PAGE_SIZE)
>> + return -ENOSPC;
>> +
>> + buflen += SKB_DATA_ALIGN(len + pad);
>> + alloc_frag->offset = ALIGN((u64)alloc_frag->offset, SMP_CACHE_BYTES);
>> + if (unlikely(!skb_page_frag_refill(buflen, alloc_frag, GFP_KERNEL)))
>> + return -ENOMEM;
>> +
>> + buf = (char *)page_address(alloc_frag->page) + alloc_frag->offset;
>> +
>> + /* We store two kinds of metadata in the header which will be
>> + * used for XDP_PASS to do build_skb():
>> + * offset 0: buflen
>> + * offset sizeof(int): vnet header
> Define a struct for the metadata then?

Ok.

>
>
>> + */
>> + copied = copy_page_from_iter(alloc_frag->page,
>> + alloc_frag->offset + sizeof(int),
>> + sock_hlen, from);
>> + if (copied != sock_hlen)
>> + return -EFAULT;
>> +
>> + gso = (struct virtio_net_hdr *)(buf + sizeof(int));
>> +
>> + if ((gso->flags & VIRTIO_NET_HDR_F_NEEDS_CSUM) &&
>> + vhost16_to_cpu(vq, gso->csum_start) +
>> + vhost16_to_cpu(vq, gso->csum_offset) + 2 >
>> + vhost16_to_cpu(vq, gso->hdr_len)) {
>> + gso->hdr_len = cpu_to_vhost16(vq,
>> + vhost16_to_cpu(vq, gso->csum_start) +
>> + vhost16_to_cpu(vq, gso->csum_offset) + 2);
>> +
>> + if (vhost16_to_cpu(vq, gso->hdr_len) > len)
>> + return -EINVAL;
>> + }
>> +
>> + len -= sock_hlen;
>> + copied = copy_page_from_iter(alloc_frag->page,
>> + alloc_frag->offset + pad,
>> + len, from);
>> + if (copied != len)
>> + return -EFAULT;
>> +
>> + xdp->data_hard_start = buf;
>> + xdp->data = buf + pad;
>> + xdp->data_end = xdp->data + len;
>> + *(int *)(xdp->data_hard_start) = buflen;
>> +
>> + get_page(alloc_frag->page);
>> + alloc_frag->offset += buflen;
>> +
>> + ++nvq->batched_xdp;
>> +
>> + return 0;
>> +}
>> +
>> static void handle_tx_copy(struct vhost_net *net, struct socket *sock)
>> {
>> struct vhost_net_virtqueue *nvq = &net->vqs[VHOST_NET_VQ_TX];
>> @@ -556,10 +667,14 @@ static void handle_tx_copy(struct vhost_net *net, struct socket *sock)
>> size_t len, total_len = 0;
>> int err;
>> int sent_pkts = 0;
>> + bool bulking = (sock->sk->sk_sndbuf == INT_MAX);
> What does bulking mean?

The name is misleading; it means whether we can do batching. For simplicity,
I disable batching if sndbuf is not INT_MAX.

>>
>> for (;;) {
>> bool busyloop_intr = false;
>>
>> + if (nvq->done_idx == VHOST_NET_BATCH)
>> + vhost_tx_batch(net, nvq, sock, &msg);
>> +
>> head = get_tx_bufs(net, nvq, &msg, &out, &in, &len,
>> &busyloop_intr);
>> /* On error, stop handling until the next kick. */
>> @@ -577,14 +692,34 @@ static void handle_tx_copy(struct vhost_net *net, struct socket *sock)
>> break;
>> }
>>
>> - vq->heads[nvq->done_idx].id = cpu_to_vhost32(vq, head);
>> - vq->heads[nvq->done_idx].len = 0;
>> -
>> total_len += len;
>> - if (tx_can_batch(vq, total_len))
>> - msg.msg_flags |= MSG_MORE;
>> - else
>> - msg.msg_flags &= ~MSG_MORE;
>> +
>> + /* For simplicity, TX batching is only enabled if
>> + * sndbuf is unlimited.
> What if sndbuf changes while this processing is going on?

We will get the correct sndbuf in the next run of handle_tx(). I think this
is safe.

Thanks

>
>> + */
>> + if (bulking) {
>> + err = vhost_net_build_xdp(nvq, &msg.msg_iter);
>> + if (!err) {
>> + goto done;
>> + } else if (unlikely(err != -ENOSPC)) {
>> + vhost_tx_batch(net, nvq, sock, &msg);
>> + vhost_discard_vq_desc(vq, 1);
>> + vhost_net_enable_vq(net, vq);
>> + break;
>> + }
>> +
>> + /* We can't build XDP buff, go for single
>> + * packet path but let's flush batched
>> + * packets.
>> + */
>> + vhost_tx_batch(net, nvq, sock, &msg);
>> + msg.msg_control = NULL;
>> + } else {
>> + if (tx_can_batch(vq, total_len))
>> + msg.msg_flags |= MSG_MORE;
>> + else
>> + msg.msg_flags &= ~MSG_MORE;
>> + }
>>
>> /* TODO: Check specific error and bomb out unless ENOBUFS? */
>> err = sock->ops->sendmsg(sock, &msg, len);
>> @@ -596,15 +731,17 @@ static void handle_tx_copy(struct vhost_net *net, struct socket *sock)
>> if (err != len)
>> pr_debug("Truncated TX packet: len %d != %zd\n",
>> err, len);
>> - if (++nvq->done_idx >= VHOST_NET_BATCH)
>> - vhost_net_signal_used(nvq);
>> +done:
>> + vq->heads[nvq->done_idx].id = cpu_to_vhost32(vq, head);
>> + vq->heads[nvq->done_idx].len = 0;
>> + ++nvq->done_idx;
>> if (vhost_exceeds_weight(++sent_pkts, total_len)) {
>> vhost_poll_queue(&vq->poll);
>> break;
>> }
>> }
>>
>> - vhost_net_signal_used(nvq);
>> + vhost_tx_batch(net, nvq, sock, &msg);
>> }
>>
>> static void handle_tx_zerocopy(struct vhost_net *net, struct socket *sock)
>> @@ -1111,6 +1248,7 @@ static int vhost_net_open(struct inode *inode, struct file *f)
>> n->vqs[i].ubuf_info = NULL;
>> n->vqs[i].upend_idx = 0;
>> n->vqs[i].done_idx = 0;
>> + n->vqs[i].batched_xdp = 0;
>> n->vqs[i].vhost_hlen = 0;
>> n->vqs[i].sock_hlen = 0;
>> n->vqs[i].rx_ring = NULL;
>> --
>> 2.17.1
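For reference, "Let me allocate it elsewhere" presumably means taking the VHOST_NET_BATCH xdp_buff entries out of struct vhost_net_virtqueue and allocating them separately when the device is opened. A minimal sketch of that idea is below; the helper name and error handling are assumptions, not the posted code.

/* Sketch only: nvq->xdp becomes a pointer and the batch is allocated
 * at open time instead of being embedded in the virtqueue structure.
 * The helper name and unwind path are hypothetical.
 */
static int vhost_net_alloc_xdp(struct vhost_net *n)
{
	struct xdp_buff *xdp;

	xdp = kmalloc_array(VHOST_NET_BATCH, sizeof(*xdp), GFP_KERNEL);
	if (!xdp)
		return -ENOMEM;

	n->vqs[VHOST_NET_VQ_TX].xdp = xdp;	/* freed again on release */
	return 0;
}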
Michael S. Tsirkin
2018-Sep-07 16:13 UTC
[PATCH net-next 11/11] vhost_net: batch submitting XDP buffers to underlayer sockets
On Fri, Sep 07, 2018 at 03:41:52PM +0800, Jason Wang wrote:
> > > @@ -556,10 +667,14 @@ static void handle_tx_copy(struct vhost_net *net, struct socket *sock)
> > > size_t len, total_len = 0;
> > > int err;
> > > int sent_pkts = 0;
> > > + bool bulking = (sock->sk->sk_sndbuf == INT_MAX);
> > What does bulking mean?
>
> The name is misleading; it means whether we can do batching. For simplicity,
> I disable batching if sndbuf is not INT_MAX.

But what does batching have to do with sndbuf?

> > > for (;;) {
> > > bool busyloop_intr = false;
> > > + if (nvq->done_idx == VHOST_NET_BATCH)
> > > + vhost_tx_batch(net, nvq, sock, &msg);
> > > +
> > > head = get_tx_bufs(net, nvq, &msg, &out, &in, &len,
> > > &busyloop_intr);
> > > /* On error, stop handling until the next kick. */
> > > @@ -577,14 +692,34 @@ static void handle_tx_copy(struct vhost_net *net, struct socket *sock)
> > > break;
> > > }
> > > - vq->heads[nvq->done_idx].id = cpu_to_vhost32(vq, head);
> > > - vq->heads[nvq->done_idx].len = 0;
> > > -
> > > total_len += len;
> > > - if (tx_can_batch(vq, total_len))
> > > - msg.msg_flags |= MSG_MORE;
> > > - else
> > > - msg.msg_flags &= ~MSG_MORE;
> > > +
> > > + /* For simplicity, TX batching is only enabled if
> > > + * sndbuf is unlimited.
> > What if sndbuf changes while this processing is going on?
>
> We will get the correct sndbuf in the next run of handle_tx(). I think this
> is safe.

If it's safe why bother with special-casing INT_MAX?

--
MST
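For readers following the msg_control plumbing quoted in this exchange: vhost_tx_batch() packs the batch size into the upper 16 bits of tun_msg_ctl.type, with TUN_MSG_PTR in the low bits and the xdp_buff array in .ptr. A receiving sendmsg() implementation could unpack it roughly as below; the handler itself is illustrative, only the bit layout mirrors the patch.

/* Illustrative consumer-side decode of the tun_msg_ctl built by
 * vhost_tx_batch(). Only the .type/.ptr encoding follows the patch;
 * the function and its surrounding logic are hypothetical.
 */
static int example_consume_xdp_batch(struct msghdr *m)
{
	struct tun_msg_ctl *ctl = m->msg_control;
	struct xdp_buff *frames;
	int i, n;

	if (!ctl || (ctl->type & 0xffff) != TUN_MSG_PTR)
		return -EINVAL;

	n = ctl->type >> 16;	/* nvq->batched_xdp at submission time */
	frames = ctl->ptr;

	for (i = 0; i < n; i++) {
		/* frames[i].data .. frames[i].data_end is one packet;
		 * run the XDP program or build an skb from it here.
		 */
	}

	return 0;
}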