Jason Wang
2021-Jan-18 09:10 UTC
[PATCH net-next v2 5/7] virtio-net, xsk: realize the function of xsk packet sending
On 2021/1/16 ??10:59, Xuan Zhuo wrote:> virtnet_xsk_run will be called in the tx interrupt handling function > virtnet_poll_tx. > > The sending process gets desc from the xsk tx queue, and assembles it to > send the data. > > Compared with other drivers, a special place is that the page of the > data in xsk is used here instead of the dma address. Because the virtio > interface does not use the dma address. > > Signed-off-by: Xuan Zhuo <xuanzhuo at linux.alibaba.com> > --- > drivers/net/virtio_net.c | 200 ++++++++++++++++++++++++++++++++++++++++++++++- > 1 file changed, 197 insertions(+), 3 deletions(-) > > diff --git a/drivers/net/virtio_net.c b/drivers/net/virtio_net.c > index a62d456..42aa9ad 100644 > --- a/drivers/net/virtio_net.c > +++ b/drivers/net/virtio_net.c > @@ -119,6 +119,8 @@ struct virtnet_xsk_hdr { > u32 len; > }; > > +#define VIRTNET_STATE_XSK_WAKEUP 1 > + > #define VIRTNET_SQ_STAT(m) offsetof(struct virtnet_sq_stats, m) > #define VIRTNET_RQ_STAT(m) offsetof(struct virtnet_rq_stats, m) > > @@ -163,9 +165,12 @@ struct send_queue { > struct xsk_buff_pool __rcu *pool; > struct virtnet_xsk_hdr __rcu *hdr; > > + unsigned long state; > u64 hdr_con; > u64 hdr_pro; > u64 hdr_n; > + struct xdp_desc last_desc; > + bool wait_slot; > } xsk; > }; > > @@ -284,6 +289,8 @@ static void __free_old_xmit_ptr(struct send_queue *sq, bool in_napi, > bool xsk_wakeup, > unsigned int *_packets, unsigned int *_bytes); > static void free_old_xmit_skbs(struct send_queue *sq, bool in_napi); > +static int virtnet_xsk_run(struct send_queue *sq, > + struct xsk_buff_pool *pool, int budget); > > static bool is_xdp_frame(void *ptr) > { > @@ -1590,6 +1597,8 @@ static int virtnet_poll_tx(struct napi_struct *napi, int budget) > struct virtnet_info *vi = sq->vq->vdev->priv; > unsigned int index = vq2txq(sq->vq); > struct netdev_queue *txq; > + struct xsk_buff_pool *pool; > + int work = 0; > > if (unlikely(is_xdp_raw_buffer_queue(vi, index))) { > /* We don't need to enable cb for XDP */ > 
@@ -1599,15 +1608,26 @@ static int virtnet_poll_tx(struct napi_struct *napi, int budget) > > txq = netdev_get_tx_queue(vi->dev, index); > __netif_tx_lock(txq, raw_smp_processor_id()); > - free_old_xmit_skbs(sq, true); > + > + rcu_read_lock(); > + pool = rcu_dereference(sq->xsk.pool); > + if (pool) { > + work = virtnet_xsk_run(sq, pool, budget); > + rcu_read_unlock(); > + } else { > + rcu_read_unlock(); > + free_old_xmit_skbs(sq, true); > + } > + > __netif_tx_unlock(txq); > > - virtqueue_napi_complete(napi, sq->vq, 0); > + if (work < budget) > + virtqueue_napi_complete(napi, sq->vq, 0); > > if (sq->vq->num_free >= 2 + MAX_SKB_FRAGS) > netif_tx_wake_queue(txq); > > - return 0; > + return work; > } > > static int xmit_skb(struct send_queue *sq, struct sk_buff *skb) > @@ -2647,6 +2667,180 @@ static int virtnet_xdp(struct net_device *dev, struct netdev_bpf *xdp) > } > } > > +static int virtnet_xsk_xmit(struct send_queue *sq, struct xsk_buff_pool *pool, > + struct xdp_desc *desc) > +{ > + struct virtnet_info *vi = sq->vq->vdev->priv; > + void *data, *ptr; > + struct page *page; > + struct virtnet_xsk_hdr *xskhdr; > + u32 idx, offset, n, i, copy, copied; > + u64 addr; > + int err, m; > + > + addr = desc->addr; > + > + data = xsk_buff_raw_get_data(pool, addr); > + offset = offset_in_page(data); > + > + /* one for hdr, one for the first page */ > + n = 2; > + m = desc->len - (PAGE_SIZE - offset); > + if (m > 0) { > + n += m >> PAGE_SHIFT; > + if (m & PAGE_MASK) > + ++n; > + > + n = min_t(u32, n, ARRAY_SIZE(sq->sg)); > + } > + > + idx = sq->xsk.hdr_con % sq->xsk.hdr_n; I don't understand the reason for the hdr array. It looks to me like all of them are zero and read-only from the device. Any reason for not reusing a single hdr for all xdp descriptors? 
Or maybe it's time to introduce VIRTIO_NET_F_NO_HDR. > + xskhdr = &sq->xsk.hdr[idx]; > + > + /* xskhdr->hdr has been memset to zero, so not need to clear again */ > + > + sg_init_table(sq->sg, n); > + sg_set_buf(sq->sg, &xskhdr->hdr, vi->hdr_len); > + > + copied = 0; > + for (i = 1; i < n; ++i) { > + copy = min_t(int, desc->len - copied, PAGE_SIZE - offset); > + > + page = xsk_buff_raw_get_page(pool, addr + copied); > + > + sg_set_page(sq->sg + i, page, copy, offset); > + copied += copy; > + if (offset) > + offset = 0; > + } It looks to me that we need to terminate the sg: /** * virtqueue_add_outbuf - expose output buffers to other end * @vq: the struct virtqueue we're talking about. * @sg: scatterlist (must be well-formed and terminated!) > + > + xskhdr->len = desc->len; > + ptr = xdp_to_ptr(&xskhdr->type); > + > + err = virtqueue_add_outbuf(sq->vq, sq->sg, n, ptr, GFP_ATOMIC); > + if (unlikely(err)) > + sq->xsk.last_desc = *desc; > + else > + sq->xsk.hdr_con++; > + > + return err; > +} > + > +static bool virtnet_xsk_dev_is_full(struct send_queue *sq) > +{ > + if (sq->vq->num_free < 2 + MAX_SKB_FRAGS) > + return true; > + > + if (sq->xsk.hdr_con == sq->xsk.hdr_pro) > + return true; Can we really reach here? > + > + return false; > +} > + > +static int virtnet_xsk_xmit_zc(struct send_queue *sq, > + struct xsk_buff_pool *pool, unsigned int budget) > +{ > + struct xdp_desc desc; > + int err, packet = 0; > + int ret = -EAGAIN; > + > + if (sq->xsk.last_desc.addr) { > + err = virtnet_xsk_xmit(sq, pool, &sq->xsk.last_desc); > + if (unlikely(err)) > + return -EBUSY; > + > + ++packet; > + sq->xsk.last_desc.addr = 0; > + } > + > + while (budget-- > 0) { > + if (virtnet_xsk_dev_is_full(sq)) { > + ret = -EBUSY; > + break; > + } It looks to me that we will always hit this if userspace is fast. 
E.g. we don't kick until the virtqueue is full ... > + > + if (!xsk_tx_peek_desc(pool, &desc)) { > + /* done */ > + ret = 0; > + break; > + } > + > + err = virtnet_xsk_xmit(sq, pool, &desc); > + if (unlikely(err)) { > + ret = -EBUSY; > + break; > + } > + > + ++packet; > + } > + > + if (packet) { > + xsk_tx_release(pool); > + > + if (virtqueue_kick_prepare(sq->vq) && virtqueue_notify(sq->vq)) { > + u64_stats_update_begin(&sq->stats.syncp); > + sq->stats.kicks++; > + u64_stats_update_end(&sq->stats.syncp); > + } > + } > + > + return ret; > +} > + > +static int virtnet_xsk_run(struct send_queue *sq, > + struct xsk_buff_pool *pool, int budget) > +{ > + int err, ret = 0; > + unsigned int _packets = 0; > + unsigned int _bytes = 0; > + > + sq->xsk.wait_slot = false; > + > + __free_old_xmit_ptr(sq, true, false, &_packets, &_bytes); > + > + err = virtnet_xsk_xmit_zc(sq, pool, xsk_budget); > + if (!err) { > + struct xdp_desc desc; > + > + clear_bit(VIRTNET_STATE_XSK_WAKEUP, &sq->xsk.state); > + xsk_set_tx_need_wakeup(pool); > + > + /* Race breaker. If new is coming after last xmit > + * but before flag change > + */ > + > + if (!xsk_tx_peek_desc(pool, &desc)) > + goto end; > + > + set_bit(VIRTNET_STATE_XSK_WAKEUP, &sq->xsk.state); > + xsk_clear_tx_need_wakeup(pool); How is memory ordering going to work here? Or do we not need to care about that? > + > + sq->xsk.last_desc = desc; > + ret = budget; > + goto end; > + } > + > + xsk_clear_tx_need_wakeup(pool); > + > + if (err == -EAGAIN) { > + ret = budget; > + goto end; > + } > + > + __free_old_xmit_ptr(sq, true, false, &_packets, &_bytes); > + > + if (!virtnet_xsk_dev_is_full(sq)) { > + ret = budget; > + goto end; > + } > + > + sq->xsk.wait_slot = true; > + > + virtnet_sq_stop_check(sq, true); > +end: > + return ret; > +} > + > static int virtnet_get_phys_port_name(struct net_device *dev, char *buf, > size_t len) > {
Michael S. Tsirkin
2021-Jan-18 12:27 UTC
[PATCH net-next v2 5/7] virtio-net, xsk: realize the function of xsk packet sending
On Mon, Jan 18, 2021 at 05:10:24PM +0800, Jason Wang wrote: > > On 2021/1/16 ??10:59, Xuan Zhuo wrote: > > virtnet_xsk_run will be called in the tx interrupt handling function > > virtnet_poll_tx. > > > > The sending process gets desc from the xsk tx queue, and assembles it to > > send the data. > > > > Compared with other drivers, a special place is that the page of the > > data in xsk is used here instead of the dma address. Because the virtio > > interface does not use the dma address. > > > > Signed-off-by: Xuan Zhuo <xuanzhuo at linux.alibaba.com> > > --- > > drivers/net/virtio_net.c | 200 ++++++++++++++++++++++++++++++++++++++++++++++- > > 1 file changed, 197 insertions(+), 3 deletions(-) > > > > diff --git a/drivers/net/virtio_net.c b/drivers/net/virtio_net.c > > index a62d456..42aa9ad 100644 > > --- a/drivers/net/virtio_net.c > > +++ b/drivers/net/virtio_net.c > > @@ -119,6 +119,8 @@ struct virtnet_xsk_hdr { > > u32 len; > > }; > > +#define VIRTNET_STATE_XSK_WAKEUP 1 > > + > > #define VIRTNET_SQ_STAT(m) offsetof(struct virtnet_sq_stats, m) > > #define VIRTNET_RQ_STAT(m) offsetof(struct virtnet_rq_stats, m) > > @@ -163,9 +165,12 @@ struct send_queue { > > struct xsk_buff_pool __rcu *pool; > > struct virtnet_xsk_hdr __rcu *hdr; > > + unsigned long state; > > u64 hdr_con; > > u64 hdr_pro; > > u64 hdr_n; > > + struct xdp_desc last_desc; > > + bool wait_slot; > > } xsk; > > }; Please add documentation about the new fields/defines: how they are accessed and what locking/ordering is in place. > > @@ -284,6 +289,8 @@ static void __free_old_xmit_ptr(struct send_queue *sq, bool in_napi, > > bool xsk_wakeup, > > unsigned int *_packets, unsigned int *_bytes); > > static void free_old_xmit_skbs(struct send_queue *sq, bool in_napi); > > +static int virtnet_xsk_run(struct send_queue *sq, > > + struct xsk_buff_pool *pool, int budget); > > static bool is_xdp_frame(void *ptr) > > { > > @@ -1590,6 +1597,8 @@ static int virtnet_poll_tx(struct napi_struct *napi, int budget) > > 
struct virtnet_info *vi = sq->vq->vdev->priv; > > unsigned int index = vq2txq(sq->vq); > > struct netdev_queue *txq; > > + struct xsk_buff_pool *pool; > > + int work = 0; > > if (unlikely(is_xdp_raw_buffer_queue(vi, index))) { > > /* We don't need to enable cb for XDP */ > > @@ -1599,15 +1608,26 @@ static int virtnet_poll_tx(struct napi_struct *napi, int budget) > > txq = netdev_get_tx_queue(vi->dev, index); > > __netif_tx_lock(txq, raw_smp_processor_id()); > > - free_old_xmit_skbs(sq, true); > > + > > + rcu_read_lock(); > > + pool = rcu_dereference(sq->xsk.pool); > > + if (pool) { > > + work = virtnet_xsk_run(sq, pool, budget); > > + rcu_read_unlock(); > > + } else { > > + rcu_read_unlock(); > > + free_old_xmit_skbs(sq, true); > > + } > > + > > __netif_tx_unlock(txq); > > - virtqueue_napi_complete(napi, sq->vq, 0); > > + if (work < budget) > > + virtqueue_napi_complete(napi, sq->vq, 0); > > if (sq->vq->num_free >= 2 + MAX_SKB_FRAGS) > > netif_tx_wake_queue(txq); > > - return 0; > > + return work; > > } > > static int xmit_skb(struct send_queue *sq, struct sk_buff *skb) > > @@ -2647,6 +2667,180 @@ static int virtnet_xdp(struct net_device *dev, struct netdev_bpf *xdp) > > } > > } > > +static int virtnet_xsk_xmit(struct send_queue *sq, struct xsk_buff_pool *pool, > > + struct xdp_desc *desc) > > +{ > > + struct virtnet_info *vi = sq->vq->vdev->priv; > > + void *data, *ptr; > > + struct page *page; > > + struct virtnet_xsk_hdr *xskhdr; > > + u32 idx, offset, n, i, copy, copied; > > + u64 addr; > > + int err, m; > > + > > + addr = desc->addr; > > + > > + data = xsk_buff_raw_get_data(pool, addr); > > + offset = offset_in_page(data); > > + > > + /* one for hdr, one for the first page */ > > + n = 2; > > + m = desc->len - (PAGE_SIZE - offset); > > + if (m > 0) { > > + n += m >> PAGE_SHIFT; > > + if (m & PAGE_MASK) > > + ++n; > > + > > + n = min_t(u32, n, ARRAY_SIZE(sq->sg)); > > + } > > + > > + idx = sq->xsk.hdr_con % sq->xsk.hdr_n; > > > I don't understand the reason of 
the hdr array. It looks to me all of them > are zero and read only from device. > > Any reason for not reusing a single hdr for all xdp descriptors? Or maybe > it's time to introduce VIRTIO_NET_F_NO_HDR. I'm not sure it's worth it, since: - xdp can be enabled/disabled dynamically - there's intent to add offload support to xdp > > > + xskhdr = &sq->xsk.hdr[idx]; > > + > > + /* xskhdr->hdr has been memset to zero, so not need to clear again */ > > + > > + sg_init_table(sq->sg, n); > > + sg_set_buf(sq->sg, &xskhdr->hdr, vi->hdr_len); > > + > > + copied = 0; > > + for (i = 1; i < n; ++i) { > > + copy = min_t(int, desc->len - copied, PAGE_SIZE - offset); > > + > > + page = xsk_buff_raw_get_page(pool, addr + copied); > > + > > + sg_set_page(sq->sg + i, page, copy, offset); > > + copied += copy; > > + if (offset) > > + offset = 0; > > + } > > > It looks to me we need to terminate the sg: > > /** > * virtqueue_add_outbuf - expose output buffers to other end > * @vq: the struct virtqueue we're talking about. > * @sg: scatterlist (must be well-formed and terminated!) > > > > + > > + xskhdr->len = desc->len; > > + ptr = xdp_to_ptr(&xskhdr->type); > > + > > + err = virtqueue_add_outbuf(sq->vq, sq->sg, n, ptr, GFP_ATOMIC); > > + if (unlikely(err)) > > + sq->xsk.last_desc = *desc; > > + else > > + sq->xsk.hdr_con++; > > + > > + return err; > > +} > > + > > +static bool virtnet_xsk_dev_is_full(struct send_queue *sq) > > +{ > > + if (sq->vq->num_free < 2 + MAX_SKB_FRAGS) > > + return true; > > + > > + if (sq->xsk.hdr_con == sq->xsk.hdr_pro) > > + return true; > > > Can we really reach here? 
> > > > + > > + return false; > > +} > > + > > +static int virtnet_xsk_xmit_zc(struct send_queue *sq, > > + struct xsk_buff_pool *pool, unsigned int budget) > > +{ > > + struct xdp_desc desc; > > + int err, packet = 0; > > + int ret = -EAGAIN; > > + > > + if (sq->xsk.last_desc.addr) { > > + err = virtnet_xsk_xmit(sq, pool, &sq->xsk.last_desc); > > + if (unlikely(err)) > > + return -EBUSY; > > + > > + ++packet; > > + sq->xsk.last_desc.addr = 0; > > + } > > + > > + while (budget-- > 0) { > > + if (virtnet_xsk_dev_is_full(sq)) { > > + ret = -EBUSY; > > + break; > > + } > > > It looks to me we will always hit this if userspace is fast. E.g we don't > kick until the virtqueue is full ... > > > > + > > + if (!xsk_tx_peek_desc(pool, &desc)) { > > + /* done */ > > + ret = 0; > > + break; > > + } > > + > > + err = virtnet_xsk_xmit(sq, pool, &desc); > > + if (unlikely(err)) { > > + ret = -EBUSY; > > + break; > > + } > > + > > + ++packet; > > + } > > + > > + if (packet) { > > + xsk_tx_release(pool); > > + > > + if (virtqueue_kick_prepare(sq->vq) && virtqueue_notify(sq->vq)) { > > + u64_stats_update_begin(&sq->stats.syncp); > > + sq->stats.kicks++; > > + u64_stats_update_end(&sq->stats.syncp); > > + } > > + } > > + > > + return ret; > > +} > > + > > +static int virtnet_xsk_run(struct send_queue *sq, > > + struct xsk_buff_pool *pool, int budget) > > +{ > > + int err, ret = 0; > > + unsigned int _packets = 0; > > + unsigned int _bytes = 0; > > + > > + sq->xsk.wait_slot = false; > > + > > + __free_old_xmit_ptr(sq, true, false, &_packets, &_bytes); > > + > > + err = virtnet_xsk_xmit_zc(sq, pool, xsk_budget); > > + if (!err) { > > + struct xdp_desc desc; > > + > > + clear_bit(VIRTNET_STATE_XSK_WAKEUP, &sq->xsk.state); > > + xsk_set_tx_need_wakeup(pool); > > + > > + /* Race breaker. 
If new is coming after last xmit > > + * but before flag change > > + */ > > + > > + if (!xsk_tx_peek_desc(pool, &desc)) > > + goto end; > > + > > + set_bit(VIRTNET_STATE_XSK_WAKEUP, &sq->xsk.state); > > + xsk_clear_tx_need_wakeup(pool); > > > How memory ordering is going to work here? Or we don't need to care about > that? > > > > + > > + sq->xsk.last_desc = desc; > > + ret = budget; > > + goto end; > > + } > > + > > + xsk_clear_tx_need_wakeup(pool); > > + > > + if (err == -EAGAIN) { > > + ret = budget; > > + goto end; > > + } > > + > > + __free_old_xmit_ptr(sq, true, false, &_packets, &_bytes); > > + > > + if (!virtnet_xsk_dev_is_full(sq)) { > > + ret = budget; > > + goto end; > > + } > > + > > + sq->xsk.wait_slot = true; > > + > > + virtnet_sq_stop_check(sq, true); > > +end: > > + return ret; > > +} > > + > > static int virtnet_get_phys_port_name(struct net_device *dev, char *buf, > > size_t len) > > {