Jason Wang
2021-Apr-19 02:46 UTC
[PATCH net-next v4 09/10] virtio-net: xsk zero copy xmit implement wakeup and xmit
? 2021/4/16 ??2:29, Xuan Zhuo ??:> On Fri, 16 Apr 2021 13:35:33 +0800, Jason Wang <jasowang at redhat.com> wrote: >> ? 2021/4/15 ??6:27, Xuan Zhuo ??: >>> On Wed, 14 Apr 2021 13:46:45 +0800, Jason Wang <jasowang at redhat.com> wrote: >>>> ? 2021/4/13 ??11:15, Xuan Zhuo ??: >>>>> This patch implements the core part of xsk zerocopy xmit. >>>>> >>>>> When the user calls sendto to consume the data in the xsk tx queue, >>>>> virtnet_xsk_wakeup() will be called. >>>>> >>>>> In wakeup, it will try to send a part of the data directly. There are >>>>> two purposes for this realization: >>>>> >>>>> 1. Send part of the data quickly to reduce the transmission delay of the >>>>> first packet. >>>>> 2. Trigger tx interrupt, start napi to consume xsk tx data. >>>>> >>>>> All sent xsk packets share the virtio-net header of xsk_hdr. If xsk >>>>> needs to support csum and other functions later, consider assigning xsk >>>>> hdr separately for each sent packet. >>>>> >>>>> There are now three situations in free_old_xmit(): skb, xdp frame, xsk >>>>> desc. Based on the last two bit of ptr returned by virtqueue_get_buf(): >>>>> 00 is skb by default. >>>>> 01 represents the packet sent by xdp >>>>> 10 is the packet sent by xsk >>>>> >>>>> If the xmit work of xsk has not been completed, but the ring is full, >>>>> napi must first exit and wait for the ring to be available, so >>>>> need_wakeup() is set. If free_old_xmit() is called first by start_xmit(), >>>>> we can quickly wake up napi to execute xsk xmit task. >>>>> >>>>> When recycling, we need to count the number of bytes sent, so put xsk >>>>> desc->len into the ptr pointer. Because ptr does not point to meaningful >>>>> objects in xsk. >>>>> >>>>> Signed-off-by: Xuan Zhuo <xuanzhuo at linux.alibaba.com> >>>>> Reviewed-by: Dust Li <dust.li at linux.alibaba.com> >>>>> --- >>>>> drivers/net/virtio_net.c | 296 ++++++++++++++++++++++++++++++++++++++- >>>>> 1 file changed, 292 insertions(+), 4 deletions(-) >>>>> >>>>> diff --git a/drivers/net/virtio_net.c b/drivers/net/virtio_net.c >>>>> index 8242a9e9f17d..c441d6bf1510 100644 >>>>> --- a/drivers/net/virtio_net.c >>>>> +++ b/drivers/net/virtio_net.c >>>>> @@ -46,6 +46,11 @@ module_param(napi_tx, bool, 0644); >>>>> #define VIRTIO_XDP_REDIR BIT(1) >>>>> >>>>> #define VIRTIO_XDP_FLAG BIT(0) >>>>> +#define VIRTIO_XSK_FLAG BIT(1) >>>>> + >>>>> +#define VIRTIO_XSK_PTR_SHIFT 4 >>>>> + >>>>> +static struct virtio_net_hdr_mrg_rxbuf xsk_hdr; >>>>> >>>>> /* RX packet size EWMA. The average packet size is used to determine the packet >>>>> * buffer size when refilling RX rings. As the entire RX ring may be refilled >>>>> @@ -138,6 +143,12 @@ struct send_queue { >>>>> struct { >>>>> /* xsk pool */ >>>>> struct xsk_buff_pool __rcu *pool; >>>>> + >>>>> + /* save the desc for next xmit, when xmit fail. */ >>>>> + struct xdp_desc last_desc; >>>> As replied in the pervious version this looks tricky. I think we need to >>>> make sure to reserve some slots as skb path did. >>>> >>>> This looks exactly like what stmmac did which alos shares XDP and skb >>>> for the same ring. >>>> >>>> >>>>> + >>>>> + /* xsk wait for tx inter or softirq */ >>>>> + bool need_wakeup; >>>>> } xsk; >>>>> }; >>>>> >>>>> @@ -255,6 +266,15 @@ struct padded_vnet_hdr { >>>>> char padding[4]; >>>>> }; >>>>> >>>>> +static int virtnet_xsk_run(struct send_queue *sq, struct xsk_buff_pool *pool, >>>>> + int budget, bool in_napi); >>>>> +static void virtnet_xsk_complete(struct send_queue *sq, u32 num); >>>>> + >>>>> +static bool is_skb_ptr(void *ptr) >>>>> +{ >>>>> + return !((unsigned long)ptr & (VIRTIO_XDP_FLAG | VIRTIO_XSK_FLAG)); >>>>> +} >>>>> + >>>>> static bool is_xdp_frame(void *ptr) >>>>> { >>>>> return (unsigned long)ptr & VIRTIO_XDP_FLAG; >>>>> @@ -265,6 +285,19 @@ static void *xdp_to_ptr(struct xdp_frame *ptr) >>>>> return (void *)((unsigned long)ptr | VIRTIO_XDP_FLAG); >>>>> } >>>>> >>>>> +static void *xsk_to_ptr(struct xdp_desc *desc) >>>>> +{ >>>>> + /* save the desc len to ptr */ >>>>> + u64 p = desc->len << VIRTIO_XSK_PTR_SHIFT; >>>>> + >>>>> + return (void *)((unsigned long)p | VIRTIO_XSK_FLAG); >>>>> +} >>>>> + >>>>> +static void ptr_to_xsk(void *ptr, struct xdp_desc *desc) >>>>> +{ >>>>> + desc->len = ((unsigned long)ptr) >> VIRTIO_XSK_PTR_SHIFT; >>>>> +} >>>>> + >>>>> static struct xdp_frame *ptr_to_xdp(void *ptr) >>>>> { >>>>> return (struct xdp_frame *)((unsigned long)ptr & ~VIRTIO_XDP_FLAG); >>>>> @@ -273,25 +306,35 @@ static struct xdp_frame *ptr_to_xdp(void *ptr) >>>>> static void __free_old_xmit(struct send_queue *sq, bool in_napi, >>>>> struct virtnet_sq_stats *stats) >>>>> { >>>>> + unsigned int xsknum = 0; >>>>> unsigned int len; >>>>> void *ptr; >>>>> >>>>> while ((ptr = virtqueue_get_buf(sq->vq, &len)) != NULL) { >>>>> - if (likely(!is_xdp_frame(ptr))) { >>>>> + if (is_skb_ptr(ptr)) { >>>>> struct sk_buff *skb = ptr; >>>>> >>>>> pr_debug("Sent skb %p\n", skb); >>>>> >>>>> stats->bytes += skb->len; >>>>> napi_consume_skb(skb, in_napi); >>>>> - } else { >>>>> + } else if (is_xdp_frame(ptr)) { >>>>> struct xdp_frame *frame = ptr_to_xdp(ptr); >>>>> >>>>> stats->bytes += frame->len; >>>>> xdp_return_frame(frame); >>>>> + } else { >>>>> + struct xdp_desc desc; >>>>> + >>>>> + ptr_to_xsk(ptr, &desc); >>>>> + stats->bytes += desc.len; >>>>> + ++xsknum; >>>>> } >>>>> stats->packets++; >>>>> } >>>>> + >>>>> + if (xsknum) >>>>> + virtnet_xsk_complete(sq, xsknum); >>>>> } >>>>> >>>>> /* Converting between virtqueue no. and kernel tx/rx queue no. >>>>> @@ -1529,6 +1572,19 @@ static int virtnet_open(struct net_device *dev) >>>>> return 0; >>>>> } >>>>> >>>>> +static int virtnet_poll_xsk(struct send_queue *sq, int budget) >>>>> +{ >>>>> + struct xsk_buff_pool *pool; >>>>> + int work_done = 0; >>>>> + >>>>> + rcu_read_lock(); >>>>> + pool = rcu_dereference(sq->xsk.pool); >>>>> + if (pool) >>>>> + work_done = virtnet_xsk_run(sq, pool, budget, true); >>>>> + rcu_read_unlock(); >>>>> + return work_done; >>>>> +} >>>>> + >>>>> static int virtnet_poll_tx(struct napi_struct *napi, int budget) >>>>> { >>>>> struct send_queue *sq = container_of(napi, struct send_queue, napi); >>>>> @@ -1545,6 +1601,7 @@ static int virtnet_poll_tx(struct napi_struct *napi, int budget) >>>>> >>>>> txq = netdev_get_tx_queue(vi->dev, index); >>>>> __netif_tx_lock(txq, raw_smp_processor_id()); >>>>> + work_done += virtnet_poll_xsk(sq, budget); >>>>> free_old_xmit(sq, true); >>>>> __netif_tx_unlock(txq); >>>>> >>>>> @@ -2535,6 +2592,234 @@ static int virtnet_xdp_set(struct net_device *dev, struct bpf_prog *prog, >>>>> return err; >>>>> } >>>>> >>>>> +static void virtnet_xsk_check_queue(struct send_queue *sq) >>>>> +{ >>>>> + struct virtnet_info *vi = sq->vq->vdev->priv; >>>>> + struct net_device *dev = vi->dev; >>>>> + int qnum = sq - vi->sq; >>>>> + >>>>> + /* If this sq is not the exclusive queue of the current cpu, >>>>> + * then it may be called by start_xmit, so check it running out >>>>> + * of space. >>>>> + * >>>> I think it's better to move this check after is_xdp_raw_buffer_queue(). >>> Sorry, do not understand. >> >> So what I meant is: >> >> >> /* If it is a raw buffer queue, ... */ >> ??? if (is_xdp_raw_buffer_queue()) >> >> /* if it is not the exclusive queue */ >> ??? if (sq->vq->num_free ...) > I understand. > >> >>>>> + * And if it is a raw buffer queue, it does not check whether the status >>>>> + * of the queue is stopped when sending. So there is no need to check >>>>> + * the situation of the raw buffer queue. >>>>> + */ >>>>> + if (is_xdp_raw_buffer_queue(vi, qnum)) >>>>> + return; >>>>> + >>>>> + /* Stop the queue to avoid getting packets that we are >>>>> + * then unable to transmit. Then wait the tx interrupt. >>>>> + */ >>>>> + if (sq->vq->num_free < 2 + MAX_SKB_FRAGS) >>>>> + netif_stop_subqueue(dev, qnum); >>>> Is there any way to stop xsk TX here? >>> xsk tx is driven by tx napi, so stop has no meaning. >> >> So NAPI can be stopped. >> >> >>> >>>>> +} >>>>> + >>>>> +static void virtnet_xsk_complete(struct send_queue *sq, u32 num) >>>>> +{ >>>>> + struct xsk_buff_pool *pool; >>>>> + >>>>> + rcu_read_lock(); >>>>> + >>>>> + pool = rcu_dereference(sq->xsk.pool); >>>>> + if (!pool) { >>>>> + rcu_read_unlock(); >>>>> + return; >>>>> + } >>>>> + xsk_tx_completed(pool, num); >>>>> + rcu_read_unlock(); >>>>> + >>>>> + if (sq->xsk.need_wakeup) { >>>>> + sq->xsk.need_wakeup = false; >>>>> + virtqueue_napi_schedule(&sq->napi, sq->vq); >>>>> + } >>>>> +} >>>>> + >>>>> +static int virtnet_xsk_xmit(struct send_queue *sq, struct xsk_buff_pool *pool, >>>>> + struct xdp_desc *desc) >>>>> +{ >>>>> + struct virtnet_info *vi; >>>>> + u32 offset, n, len; >>>>> + struct page *page; >>>>> + void *data; >>>>> + u64 addr; >>>>> + int err; >>>>> + >>>>> + vi = sq->vq->vdev->priv; >>>>> + addr = desc->addr; >>>>> + >>>>> + data = xsk_buff_raw_get_data(pool, addr); >>>>> + offset = offset_in_page(data); >>>>> + >>>>> + /* xsk unaligned mode, desc may use two pages */ >>>>> + if (desc->len > PAGE_SIZE - offset) >>>>> + n = 3; >>>>> + else >>>>> + n = 2; >>>>> + >>>>> + sg_init_table(sq->sg, n); >>>>> + sg_set_buf(sq->sg, &xsk_hdr, vi->hdr_len); >>>>> + >>>>> + /* handle for xsk first page */ >>>>> + len = min_t(int, desc->len, PAGE_SIZE - offset); >>>>> + page = xsk_buff_xdp_get_page(pool, addr); >>>>> + sg_set_page(sq->sg + 1, page, len, offset); >>>>> + >>>>> + /* xsk unaligned mode, handle for the second page */ >>>>> + if (len < desc->len) { >>>>> + page = xsk_buff_xdp_get_page(pool, addr + len); >>>>> + len = min_t(int, desc->len - len, PAGE_SIZE); >>>>> + sg_set_page(sq->sg + 2, page, len, 0); >>>>> + } >>>>> + >>>>> + err = virtqueue_add_outbuf(sq->vq, sq->sg, n, xsk_to_ptr(desc), >>>>> + GFP_ATOMIC); >>>>> + if (unlikely(err)) >>>>> + sq->xsk.last_desc = *desc; >>>>> + >>>>> + return err; >>>>> +} >>>>> + >>>>> +static int virtnet_xsk_xmit_batch(struct send_queue *sq, >>>>> + struct xsk_buff_pool *pool, >>>>> + unsigned int budget, >>>>> + bool in_napi, int *done, >>>>> + struct virtnet_sq_stats *stats) >>>>> +{ >>>>> + struct xdp_desc desc; >>>>> + int err, packet = 0; >>>>> + int ret = -EAGAIN; >>>>> + >>>>> + if (sq->xsk.last_desc.addr) { >>>>> + if (sq->vq->num_free < 2 + MAX_SKB_FRAGS) >>>>> + return -EBUSY; >>>>> + >>>>> + err = virtnet_xsk_xmit(sq, pool, &sq->xsk.last_desc); >>>>> + if (unlikely(err)) >>>>> + return -EBUSY; >>>>> + >>>>> + ++packet; >>>>> + --budget; >>>>> + sq->xsk.last_desc.addr = 0; >>>> So I think we don't need to do this since we try always to reserve 2 + >>>> MAX_SKB_FRAGS, then it means we get -EIO/-ENOMEM which is bascially a >>>> broken device or dma map. >>>> >>> Does it mean that when there are enough slots, virtqueue_add_outbuf() returns >>> failure -EIO/-ENOMEM, which means that the network card is no longer working. >> >> Or it could be a bug of the driver. And you don't need to check num_free >> before if you always try to reserve sufficient slots. >> >> >>> I originally thought that an error was returned, but it may work normally next >>> time. It should be my fault. >>> >>> Thank you very much, I learned it. I will delete the related code. >>> >>>>> + } >>>>> + >>>>> + while (budget-- > 0) { >>>>> + if (sq->vq->num_free < 2 + MAX_SKB_FRAGS) { >>>>> + ret = -EBUSY; >>>>> + break; >>>>> + } >>>>> + >>>>> + if (!xsk_tx_peek_desc(pool, &desc)) { >>>>> + /* done */ >>>>> + ret = 0; >>>>> + break; >>>>> + } >>>>> + >>>>> + err = virtnet_xsk_xmit(sq, pool, &desc); >>>>> + if (unlikely(err)) { >>>>> + ret = -EBUSY; >>>> Since the function will be called by NAPI I think we to report the >>>> number of packets that is transmitted as well. >>> A 'desc' points to a packet, so only one packet is sent by virtnet_xsk_xmit(). >>> I have this information in the following variable "packet" statistics. Finally, >>> use the parameter "done" to pass to the upper layer. >> >> I meant you did: >> >> virtnet_poll_tx() >> ??? work_done += virtnet_poll_xsk() >> ??? virtnet_xsk_run() >> ??????? virtnet_xsk_xmit_batch() >> >> If there's no suffcieint slot you still need to return the number of >> packet that has been sent. > I don?t understand what you mean. > > As long as there are packets sent, "++packet" will be counted, and finally > returned to virtnet_xsk_run() through "done", regardless of whether there is > insufficient slot.Right, I misread the code.> >> >> >>>>> + break; >>>>> + } >>>>> + >>>>> + ++packet; >>>>> + } >>>>> + >>>>> + if (packet) { >>>>> + if (virtqueue_kick_prepare(sq->vq) && virtqueue_notify(sq->vq)) >>>>> + ++stats->kicks; >>>>> + >>>>> + *done = packet; >>>>> + stats->xdp_tx += packet; >>>>> + >>>>> + xsk_tx_release(pool); >>>>> + } >>>>> + >>>>> + return ret; >>>>> +} >>>>> + >>>>> +static int virtnet_xsk_run(struct send_queue *sq, struct xsk_buff_pool *pool, >>>>> + int budget, bool in_napi) >>>>> +{ >>>>> + struct virtnet_sq_stats stats = {}; >>>>> + int done = 0; >>>>> + int err; >>>>> + >>>>> + sq->xsk.need_wakeup = false; >>>>> + __free_old_xmit(sq, in_napi, &stats); >>>>> + >>>>> + /* return err: >>>>> + * -EAGAIN: done == budget >>>>> + * -EBUSY: done < budget >>>>> + * 0 : done < budget >>>>> + */ >>>>> + err = virtnet_xsk_xmit_batch(sq, pool, budget, in_napi, &done, &stats); >>>>> + if (err == -EBUSY) { >>>>> + __free_old_xmit(sq, in_napi, &stats); >>>>> + >>>>> + /* If the space is enough, let napi run again. */ >>>>> + if (sq->vq->num_free >= 2 + MAX_SKB_FRAGS) >>>>> + done = budget; >>>> Why you need to run NAPI isntead of a netif_tx_wake()? >>> When virtnet_xsk_run() is called by virtnet_xsk_wakeup(), the return value is >>> not concerned. >>> >>> virtnet_xsk_run() is already in napi when it is called by poll_tx(), so if there >>> are more slots, I try to return "budget" and let napi call poll_tx() again >> >> Well, my question is why you don't need to wakeup the qdisc in this >> case. Note that the queue could be shared with ndo_start_xmit(). > virtnet_xsk_run() will been called by virtnet_poll_tx() or virtnet_xsk_wakeup(). > > 1. virtnet_poll_tx() will call netif_tx_wake_queue() > > static int virtnet_poll_tx(struct napi_struct *napi, int budget) > { > ..... > > if (sq->vq->num_free >= 2 + MAX_SKB_FRAGS) > netif_tx_wake_queue(txq); > > 2. virtnet_xsk_wakeup() > You are right. We can indeed call virtnet_xsk_wakeup() here. > >> >>>>> + else >>>>> + sq->xsk.need_wakeup = true; >>>> So done is 0, is this intended? >>> When err == 0, it indicates that the xsk queue has been exhausted. At this time, >>> done < budget, return done directly, and napi can be stopped. >>> >>> When err == -EAGAIN, it indicates that the budget amount of patch has been sent, >>> and done should be returned to the upper layer. Wait for napi to call poll_tx() >>> again >> >> So the code is only for -EBUSY if I read the code correctly. And in this >> case you return 0. > This case, I return 'done' and the done < budge. 'done' may not be 0. > > done < budget, so napi will stop.So to question is about this: ??????????????? /* If the space is enough, let napi run again. */ ??????????????? if (sq->vq->num_free >= 2 + MAX_SKB_FRAGS) ??????????????????????? done = budget; else ??????????????????????? sq->xsk.need_wakeup = true; So done is 0 when num_free < 2 + MAX_SKB_FRAGS, I think you should return done here. Note that done is still less than budget in this case. Another question, when num_free >= 2 + MAX_SKB_FRAGS why not simply re-try virtnet_xsk_xmit_batch here?> > >> >>> In both cases, done is directly returned to the upper layer without special >>> processing. >>> >>>>> + } >>>>> + >>>>> + virtnet_xsk_check_queue(sq); >>>>> + >>>>> + u64_stats_update_begin(&sq->stats.syncp); >>>>> + sq->stats.packets += stats.packets; >>>>> + sq->stats.bytes += stats.bytes; >>>>> + sq->stats.kicks += stats.kicks; >>>>> + sq->stats.xdp_tx += stats.xdp_tx; >>>>> + u64_stats_update_end(&sq->stats.syncp); >>>>> + >>>>> + return done; >>>>> +} >>>>> + >>>>> +static int virtnet_xsk_wakeup(struct net_device *dev, u32 qid, u32 flag) >>>>> +{ >>>>> + struct virtnet_info *vi = netdev_priv(dev); >>>>> + struct xsk_buff_pool *pool; >>>>> + struct netdev_queue *txq; >>>>> + struct send_queue *sq; >>>>> + >>>>> + if (!netif_running(dev)) >>>>> + return -ENETDOWN; >>>>> + >>>>> + if (qid >= vi->curr_queue_pairs) >>>>> + return -EINVAL; >>>>> + >>>>> + sq = &vi->sq[qid]; >>>>> + >>>>> + rcu_read_lock(); >>>>> + >>>>> + pool = rcu_dereference(sq->xsk.pool); >>>>> + if (!pool) >>>>> + goto end; >>>>> + >>>>> + if (napi_if_scheduled_mark_missed(&sq->napi)) >>>>> + goto end; >>>>> + >>>>> + txq = netdev_get_tx_queue(dev, qid); >>>>> + >>>>> + __netif_tx_lock_bh(txq); >>>>> + >>>>> + /* Send part of the packet directly to reduce the delay in sending the >>>>> + * packet, and this can actively trigger the tx interrupts. >>>>> + * >>>>> + * If no packet is sent out, the ring of the device is full. In this >>>>> + * case, we will still get a tx interrupt response. Then we will deal >>>>> + * with the subsequent packet sending work. >>>> So stmmac schedule NAPI here, do you have perf numbers for this improvement? >>> virtnet_xsk_wakeup() is called by sendto(). The purpose is to start consuming >>> the data in the xsk tx queue. The purpose here is to make napi run. >>> >>> When napi is not running, I try to send a certain amount of data: >>> 1. Reduce the delay of the first packet >>> 2. Trigger hardware to generate tx interrupt >> >> I dont' see the code the trigger hardware interrupt and I don't believe >> virtio can do this. >> >> So the point is we need >> >> 1) schedule NAPI when napi_if_scheduled_mark_missed() returns false >> 2) introduce this optimization on top with perf numbers > It is true that virtio net does not directly let the hardware generate a tx > method. So I try to send data here to wait for the tx interrupt notification > from the hardware to recycle the sent packets. > > Is this method not feasible?I think the answer is yes. But you need to benchmark this optimization to make sure it can give us the imporvement as you expected. So I suggest to do something like follow: 1) start with a simple napi schedule here 2) add your optimization on top and benchmark the difference> > Do you mean the performance improvement brought by xsk zerocopy? I should put it > in the patch set cover. Indeed I should put one in the commit log of this patch. > > There is a problem with that direct schedule NAPI. It will wait for NAPI to run > on the current cpu. This will cause an obvious delay. Especially the current > user process may occupy the cpu.Yes, but can you simply do: local_bh_disalbe(); netif_napi_schedule(); local_bh_enable(); ? Thanks> > Thanks. > >> Thanks >> >>> Thanks. >>> >>>> Thanks >>>> >>>> >>>>> + */ >>>>> + virtnet_xsk_run(sq, pool, napi_weight, false); >>>>> + >>>>> + __netif_tx_unlock_bh(txq); >>>>> + >>>>> +end: >>>>> + rcu_read_unlock(); >>>>> + return 0; >>>>> +} >>>>> + >>>>> static int virtnet_xsk_pool_enable(struct net_device *dev, >>>>> struct xsk_buff_pool *pool, >>>>> u16 qid) >>>>> @@ -2559,6 +2844,8 @@ static int virtnet_xsk_pool_enable(struct net_device *dev, >>>>> return -EPERM; >>>>> >>>>> rcu_read_lock(); >>>>> + memset(&sq->xsk, 0, sizeof(sq->xsk)); >>>>> + >>>>> /* Here is already protected by rtnl_lock, so rcu_assign_pointer is >>>>> * safe. >>>>> */ >>>>> @@ -2658,6 +2945,7 @@ static const struct net_device_ops virtnet_netdev = { >>>>> .ndo_vlan_rx_kill_vid = virtnet_vlan_rx_kill_vid, >>>>> .ndo_bpf = virtnet_xdp, >>>>> .ndo_xdp_xmit = virtnet_xdp_xmit, >>>>> + .ndo_xsk_wakeup = virtnet_xsk_wakeup, >>>>> .ndo_features_check = passthru_features_check, >>>>> .ndo_get_phys_port_name = virtnet_get_phys_port_name, >>>>> .ndo_set_features = virtnet_set_features, >>>>> @@ -2761,9 +3049,9 @@ static void free_unused_bufs(struct virtnet_info *vi) >>>>> for (i = 0; i < vi->max_queue_pairs; i++) { >>>>> struct virtqueue *vq = vi->sq[i].vq; >>>>> while ((buf = virtqueue_detach_unused_buf(vq)) != NULL) { >>>>> - if (!is_xdp_frame(buf)) >>>>> + if (is_skb_ptr(buf)) >>>>> dev_kfree_skb(buf); >>>>> - else >>>>> + else if (is_xdp_frame(buf)) >>>>> xdp_return_frame(ptr_to_xdp(buf)); >>>>> } >>>>> }