Krishna Kumar
2011-Nov-11 13:02 UTC
[RFC] [ver3 PATCH 0/6] Implement multiqueue virtio-net
This patch series resurrects the earlier multiple TX/RX queues functionality for virtio_net, and addresses the issues pointed out. It also includes an API to share IRQs, e.g. amongst the TX vqs. I plan to run TCP/UDP STREAM and RR tests for local->host and local->remote, and send the results in the next couple of days. patch #1: Introduce VIRTIO_NET_F_MULTIQUEUE patch #2: Move 'num_queues' to virtqueue patch #3: virtio_net driver changes patch #4: vhost_net changes patch #5: Implement find_vqs_irq() patch #6: Convert virtio_net driver to use find_vqs_irq() Changes from rev2: Michael: ------- 1. Added functions to handle setting RX/TX/CTRL vq's. 2. num_queue_pairs instead of numtxqs. 3. Experimental support for fewer irq's in find_vqs. Rusty: ------ 4. Cleaned up some existing "while (1)". 5. rvq/svq and rx_sg/tx_sg changed to vq and sg respectively. 6. Cleaned up some "#if 1" code. Issue when using patch5: ------------------------- The new API is designed to minimize code duplication. E.g. vp_find_vqs() is implemented as: static int vp_find_vqs(...) { return vp_find_vqs_irq(vdev, nvqs, vqs, callbacks, names, NULL); } In my testing, when multiple tx/rx queues are used with multiple netperf sessions, all the device tx queues stop a few thousand times and are subsequently woken up by skb_xmit_done. But after some 40K-50K iterations of stop/wake, some of the txq's stop and no wake interrupt comes. (modprobe -r followed by modprobe solves this, so it is not a system hang). 
At the time of the hang (#txqs=#rxqs=4): # egrep "CPU|virtio0" /proc/interrupts | grep -v config CPU0 CPU1 CPU2 CPU3 41: 49057 49262 48828 49421 PCI-MSI-edge virtio0-input.0 42: 5066 5213 5221 5109 PCI-MSI-edge virtio0-output.0 43: 43380 43770 43007 43148 PCI-MSI-edge virtio0-input.1 44: 41433 41727 42101 41175 PCI-MSI-edge virtio0-input.2 45: 38465 37629 38468 38768 PCI-MSI-edge virtio0-input.3 # tc -s qdisc show dev eth0 qdisc mq 0: root Sent 393196939897 bytes 271191624 pkt (dropped 59897, overlimits 0 requeues 67156) backlog 25375720b 1601p requeues 67156 I am not sure if patch #5 is responsible for the hang. Also, without patch #5/patch #6, I changed vp_find_vqs() to: static int vp_find_vqs(...) { return vp_try_to_find_vqs(vdev, nvqs, vqs, callbacks, names, false, false); } No packets were getting TX'd with this change when #txqs>1. This is with the MQ-only patch that doesn't touch the drivers/virtio/ directory. Also, the MQ patch works reasonably well with 2 vectors - with use_msix=1 and per_vq_vectors=0 in vp_find_vqs(). Patch against net-next - please review. Signed-off-by: krkumar2 at in.ibm.com ---
Krishna Kumar
2011-Nov-11 13:02 UTC
[RFC] [ver3 PATCH 1/6] virtio_net: Introduce VIRTIO_NET_F_MULTIQUEUE
Introduce VIRTIO_NET_F_MULTIQUEUE. Signed-off-by: krkumar2 at in.ibm.com --- include/linux/virtio_net.h | 1 + 1 file changed, 1 insertion(+) diff -ruNp org/include/linux/virtio_net.h new/include/linux/virtio_net.h --- org/include/linux/virtio_net.h 2011-10-12 10:16:46.000000000 +0530 +++ new/include/linux/virtio_net.h 2011-11-11 16:44:34.000000000 +0530 @@ -49,6 +49,7 @@ #define VIRTIO_NET_F_CTRL_RX 18 /* Control channel RX mode support */ #define VIRTIO_NET_F_CTRL_VLAN 19 /* Control channel VLAN filtering */ #define VIRTIO_NET_F_CTRL_RX_EXTRA 20 /* Extra RX mode control support */ +#define VIRTIO_NET_F_MULTIQUEUE 21 /* Device supports multiple TXQ/RXQ */ #define VIRTIO_NET_S_LINK_UP 1 /* Link is up */
Krishna Kumar
2011-Nov-11 13:03 UTC
[RFC] [ver3 PATCH 2/6] virtio: Move 'num_queues' to virtqueue
Move queue_index from "virtio_pci_vq_info" to "virtqueue". This is needed to figure out the queue number of the vq in the 'done' handler of the device. Signed-off-by: krkumar2 at in.ibm.com --- drivers/virtio/virtio_pci.c | 10 +++------- include/linux/virtio.h | 1 + 2 files changed, 4 insertions(+), 7 deletions(-) diff -ruNp org/drivers/virtio/virtio_pci.c new/drivers/virtio/virtio_pci.c --- org/drivers/virtio/virtio_pci.c 2011-11-11 16:44:30.000000000 +0530 +++ new/drivers/virtio/virtio_pci.c 2011-11-11 16:44:45.000000000 +0530 @@ -75,9 +75,6 @@ struct virtio_pci_vq_info /* the number of entries in the queue */ int num; - /* the index of the queue */ - int queue_index; - /* the virtual address of the ring queue */ void *queue; @@ -180,11 +177,10 @@ static void vp_reset(struct virtio_devic static void vp_notify(struct virtqueue *vq) { struct virtio_pci_device *vp_dev = to_vp_device(vq->vdev); - struct virtio_pci_vq_info *info = vq->priv; /* we write the queue's selector into the notification register to * signal the other end */ - iowrite16(info->queue_index, vp_dev->ioaddr + VIRTIO_PCI_QUEUE_NOTIFY); + iowrite16(vq->queue_index, vp_dev->ioaddr + VIRTIO_PCI_QUEUE_NOTIFY); } /* Handle a configuration change: Tell driver if it wants to know. 
*/ @@ -380,7 +376,6 @@ static struct virtqueue *setup_vq(struct if (!info) return ERR_PTR(-ENOMEM); - info->queue_index = index; info->num = num; info->msix_vector = msix_vec; @@ -403,6 +398,7 @@ static struct virtqueue *setup_vq(struct goto out_activate_queue; } + vq->queue_index = index; vq->priv = info; info->vq = vq; @@ -445,7 +441,7 @@ static void vp_del_vq(struct virtqueue * list_del(&info->node); spin_unlock_irqrestore(&vp_dev->lock, flags); - iowrite16(info->queue_index, vp_dev->ioaddr + VIRTIO_PCI_QUEUE_SEL); + iowrite16(vq->queue_index, vp_dev->ioaddr + VIRTIO_PCI_QUEUE_SEL); if (vp_dev->msix_enabled) { iowrite16(VIRTIO_MSI_NO_VECTOR, diff -ruNp org/include/linux/virtio.h new/include/linux/virtio.h --- org/include/linux/virtio.h 2011-11-11 16:44:30.000000000 +0530 +++ new/include/linux/virtio.h 2011-11-11 16:44:45.000000000 +0530 @@ -22,6 +22,7 @@ struct virtqueue { void (*callback)(struct virtqueue *vq); const char *name; struct virtio_device *vdev; + int queue_index; /* the index of the queue */ void *priv; };
Krishna Kumar
2011-Nov-11 13:04 UTC
[RFC] [ver3 PATCH 3/6] virtio_net: virtio_net driver changes
Changes for multiqueue virtio_net driver. Signed-off-by: krkumar2 at in.ibm.com --- drivers/net/virtio_net.c | 688 ++++++++++++++++++++++++----------- include/linux/virtio_net.h | 2 2 files changed, 481 insertions(+), 209 deletions(-) diff -ruNp org/drivers/net/virtio_net.c new/drivers/net/virtio_net.c --- org/drivers/net/virtio_net.c 2011-11-11 16:44:38.000000000 +0530 +++ new/drivers/net/virtio_net.c 2011-11-11 16:44:59.000000000 +0530 @@ -40,33 +40,42 @@ module_param(gso, bool, 0444); #define VIRTNET_SEND_COMMAND_SG_MAX 2 -struct virtnet_stats { +struct virtnet_send_stats { struct u64_stats_sync syncp; u64 tx_bytes; u64 tx_packets; +}; +struct virtnet_recv_stats { + struct u64_stats_sync syncp; u64 rx_bytes; u64 rx_packets; }; -struct virtnet_info { - struct virtio_device *vdev; - struct virtqueue *rvq, *svq, *cvq; - struct net_device *dev; - struct napi_struct napi; - unsigned int status; +/* Internal representation of a send virtqueue */ +struct send_queue { + /* Virtqueue associated with this send _queue */ + struct virtqueue *vq; - /* Number of input buffers, and max we've ever had. */ - unsigned int num, max; + /* TX: fragments + linear part + virtio header */ + struct scatterlist sg[MAX_SKB_FRAGS + 2]; - /* I like... big packets and I cannot lie! */ - bool big_packets; + /* Active tx statistics */ + struct virtnet_send_stats __percpu *stats; +}; - /* Host will merge rx buffers for big packets (shake it! shake it!) */ - bool mergeable_rx_bufs; +/* Internal representation of a receive virtqueue */ +struct receive_queue { + /* Virtqueue associated with this receive_queue */ + struct virtqueue *vq; + + /* Back pointer to the virtnet_info */ + struct virtnet_info *vi; - /* Active statistics */ - struct virtnet_stats __percpu *stats; + struct napi_struct napi; + + /* Number of input buffers, and max we've ever had. */ + unsigned int num, max; /* Work struct for refilling if we run low on memory. 
*/ struct delayed_work refill; @@ -74,9 +83,29 @@ struct virtnet_info { /* Chain pages by the private ptr. */ struct page *pages; - /* fragments + linear part + virtio header */ - struct scatterlist rx_sg[MAX_SKB_FRAGS + 2]; - struct scatterlist tx_sg[MAX_SKB_FRAGS + 2]; + /* RX: fragments + linear part + virtio header */ + struct scatterlist sg[MAX_SKB_FRAGS + 2]; + + /* Active rx statistics */ + struct virtnet_recv_stats __percpu *stats; +}; + +struct virtnet_info { + int num_queue_pairs; /* # of RX/TX vq pairs */ + + struct send_queue **sq; + struct receive_queue **rq; + struct virtqueue *cvq; + + struct virtio_device *vdev; + struct net_device *dev; + unsigned int status; + + /* I like... big packets and I cannot lie! */ + bool big_packets; + + /* Host will merge rx buffers for big packets (shake it! shake it!) */ + bool mergeable_rx_bufs; }; struct skb_vnet_hdr { @@ -106,22 +135,22 @@ static inline struct skb_vnet_hdr *skb_v * private is used to chain pages for big packets, put the whole * most recent used list in the beginning for reuse */ -static void give_pages(struct virtnet_info *vi, struct page *page) +static void give_pages(struct receive_queue *rq, struct page *page) { struct page *end; /* Find end of list, sew whole thing into vi->pages. 
*/ for (end = page; end->private; end = (struct page *)end->private); - end->private = (unsigned long)vi->pages; - vi->pages = page; + end->private = (unsigned long)rq->pages; + rq->pages = page; } -static struct page *get_a_page(struct virtnet_info *vi, gfp_t gfp_mask) +static struct page *get_a_page(struct receive_queue *rq, gfp_t gfp_mask) { - struct page *p = vi->pages; + struct page *p = rq->pages; if (p) { - vi->pages = (struct page *)p->private; + rq->pages = (struct page *)p->private; /* clear private here, it is used to chain pages */ p->private = 0; } else @@ -129,15 +158,16 @@ static struct page *get_a_page(struct vi return p; } -static void skb_xmit_done(struct virtqueue *svq) +static void skb_xmit_done(struct virtqueue *vq) { - struct virtnet_info *vi = svq->vdev->priv; + struct virtnet_info *vi = vq->vdev->priv; + int qnum = vq->queue_index / 2; /* RX/TX vqs are allocated in pairs */ /* Suppress further interrupts. */ - virtqueue_disable_cb(svq); + virtqueue_disable_cb(vq); /* We were probably waiting for more output buffers. 
*/ - netif_wake_queue(vi->dev); + netif_wake_subqueue(vi->dev, qnum); } static void set_skb_frag(struct sk_buff *skb, struct page *page, @@ -155,9 +185,10 @@ static void set_skb_frag(struct sk_buff *len -= size; } -static struct sk_buff *page_to_skb(struct virtnet_info *vi, +static struct sk_buff *page_to_skb(struct receive_queue *rq, struct page *page, unsigned int len) { + struct virtnet_info *vi = rq->vi; struct sk_buff *skb; struct skb_vnet_hdr *hdr; unsigned int copy, hdr_len, offset; @@ -213,12 +244,12 @@ static struct sk_buff *page_to_skb(struc } if (page) - give_pages(vi, page); + give_pages(rq, page); return skb; } -static int receive_mergeable(struct virtnet_info *vi, struct sk_buff *skb) +static int receive_mergeable(struct receive_queue *rq, struct sk_buff *skb) { struct skb_vnet_hdr *hdr = skb_vnet_hdr(skb); struct page *page; @@ -232,7 +263,7 @@ static int receive_mergeable(struct virt skb->dev->stats.rx_length_errors++; return -EINVAL; } - page = virtqueue_get_buf(vi->rvq, &len); + page = virtqueue_get_buf(rq->vq, &len); if (!page) { pr_debug("%s: rx error: %d buffers missing\n", skb->dev->name, hdr->mhdr.num_buffers); @@ -245,15 +276,16 @@ static int receive_mergeable(struct virt set_skb_frag(skb, page, 0, &len); - --vi->num; + --rq->num; } return 0; } -static void receive_buf(struct net_device *dev, void *buf, unsigned int len) +static void receive_buf(struct receive_queue *rq, void *buf, unsigned int len) { + struct net_device *dev = rq->vi->dev; struct virtnet_info *vi = netdev_priv(dev); - struct virtnet_stats __percpu *stats = this_cpu_ptr(vi->stats); + struct virtnet_recv_stats __percpu *stats = this_cpu_ptr(rq->stats); struct sk_buff *skb; struct page *page; struct skb_vnet_hdr *hdr; @@ -262,7 +294,7 @@ static void receive_buf(struct net_devic pr_debug("%s: short packet %i\n", dev->name, len); dev->stats.rx_length_errors++; if (vi->mergeable_rx_bufs || vi->big_packets) - give_pages(vi, buf); + give_pages(rq, buf); else dev_kfree_skb(buf); 
return; @@ -274,14 +306,14 @@ static void receive_buf(struct net_devic skb_trim(skb, len); } else { page = buf; - skb = page_to_skb(vi, page, len); + skb = page_to_skb(rq, page, len); if (unlikely(!skb)) { dev->stats.rx_dropped++; - give_pages(vi, page); + give_pages(rq, page); return; } if (vi->mergeable_rx_bufs) - if (receive_mergeable(vi, skb)) { + if (receive_mergeable(rq, skb)) { dev_kfree_skb(skb); return; } @@ -351,184 +383,200 @@ frame_err: dev_kfree_skb(skb); } -static int add_recvbuf_small(struct virtnet_info *vi, gfp_t gfp) +static int add_recvbuf_small(struct receive_queue *rq, gfp_t gfp) { struct sk_buff *skb; struct skb_vnet_hdr *hdr; int err; - skb = netdev_alloc_skb_ip_align(vi->dev, MAX_PACKET_LEN); + skb = netdev_alloc_skb_ip_align(rq->vi->dev, MAX_PACKET_LEN); if (unlikely(!skb)) return -ENOMEM; skb_put(skb, MAX_PACKET_LEN); hdr = skb_vnet_hdr(skb); - sg_set_buf(vi->rx_sg, &hdr->hdr, sizeof hdr->hdr); + sg_set_buf(rq->sg, &hdr->hdr, sizeof hdr->hdr); - skb_to_sgvec(skb, vi->rx_sg + 1, 0, skb->len); + skb_to_sgvec(skb, rq->sg + 1, 0, skb->len); - err = virtqueue_add_buf_gfp(vi->rvq, vi->rx_sg, 0, 2, skb, gfp); + err = virtqueue_add_buf_gfp(rq->vq, rq->sg, 0, 2, skb, gfp); if (err < 0) dev_kfree_skb(skb); return err; } -static int add_recvbuf_big(struct virtnet_info *vi, gfp_t gfp) +static int add_recvbuf_big(struct receive_queue *rq, gfp_t gfp) { struct page *first, *list = NULL; char *p; int i, err, offset; - /* page in vi->rx_sg[MAX_SKB_FRAGS + 1] is list tail */ + /* page in rq->sg[MAX_SKB_FRAGS + 1] is list tail */ for (i = MAX_SKB_FRAGS + 1; i > 1; --i) { - first = get_a_page(vi, gfp); + first = get_a_page(rq, gfp); if (!first) { if (list) - give_pages(vi, list); + give_pages(rq, list); return -ENOMEM; } - sg_set_buf(&vi->rx_sg[i], page_address(first), PAGE_SIZE); + sg_set_buf(&rq->sg[i], page_address(first), PAGE_SIZE); /* chain new page in list head to match sg */ first->private = (unsigned long)list; list = first; } - first = 
get_a_page(vi, gfp); + first = get_a_page(rq, gfp); if (!first) { - give_pages(vi, list); + give_pages(rq, list); return -ENOMEM; } p = page_address(first); - /* vi->rx_sg[0], vi->rx_sg[1] share the same page */ - /* a separated vi->rx_sg[0] for virtio_net_hdr only due to QEMU bug */ - sg_set_buf(&vi->rx_sg[0], p, sizeof(struct virtio_net_hdr)); + /* rq->sg[0], rq->sg[1] share the same page */ + /* a separated rq->sg[0] for virtio_net_hdr only due to QEMU bug */ + sg_set_buf(&rq->sg[0], p, sizeof(struct virtio_net_hdr)); - /* vi->rx_sg[1] for data packet, from offset */ + /* rq->sg[1] for data packet, from offset */ offset = sizeof(struct padded_vnet_hdr); - sg_set_buf(&vi->rx_sg[1], p + offset, PAGE_SIZE - offset); + sg_set_buf(&rq->sg[1], p + offset, PAGE_SIZE - offset); /* chain first in list head */ first->private = (unsigned long)list; - err = virtqueue_add_buf_gfp(vi->rvq, vi->rx_sg, 0, MAX_SKB_FRAGS + 2, + err = virtqueue_add_buf_gfp(rq->vq, rq->sg, 0, MAX_SKB_FRAGS + 2, first, gfp); if (err < 0) - give_pages(vi, first); + give_pages(rq, first); return err; } -static int add_recvbuf_mergeable(struct virtnet_info *vi, gfp_t gfp) +static int add_recvbuf_mergeable(struct receive_queue *rq, gfp_t gfp) { struct page *page; int err; - page = get_a_page(vi, gfp); + page = get_a_page(rq, gfp); if (!page) return -ENOMEM; - sg_init_one(vi->rx_sg, page_address(page), PAGE_SIZE); + sg_init_one(rq->sg, page_address(page), PAGE_SIZE); - err = virtqueue_add_buf_gfp(vi->rvq, vi->rx_sg, 0, 1, page, gfp); + err = virtqueue_add_buf_gfp(rq->vq, rq->sg, 0, 1, page, gfp); if (err < 0) - give_pages(vi, page); + give_pages(rq, page); return err; } /* Returns false if we couldn't fill entirely (OOM). 
*/ -static bool try_fill_recv(struct virtnet_info *vi, gfp_t gfp) +static bool try_fill_recv(struct receive_queue *rq, gfp_t gfp) { + struct virtnet_info *vi = rq->vi; int err; bool oom; do { if (vi->mergeable_rx_bufs) - err = add_recvbuf_mergeable(vi, gfp); + err = add_recvbuf_mergeable(rq, gfp); else if (vi->big_packets) - err = add_recvbuf_big(vi, gfp); + err = add_recvbuf_big(rq, gfp); else - err = add_recvbuf_small(vi, gfp); + err = add_recvbuf_small(rq, gfp); oom = err == -ENOMEM; if (err < 0) break; - ++vi->num; + ++rq->num; } while (err > 0); - if (unlikely(vi->num > vi->max)) - vi->max = vi->num; - virtqueue_kick(vi->rvq); + if (unlikely(rq->num > rq->max)) + rq->max = rq->num; + virtqueue_kick(rq->vq); return !oom; } -static void skb_recv_done(struct virtqueue *rvq) +static void skb_recv_done(struct virtqueue *vq) { - struct virtnet_info *vi = rvq->vdev->priv; + int qnum = vq->queue_index / 2; /* RX/TX vqs are allocated in pairs */ + struct virtnet_info *vi = vq->vdev->priv; + struct napi_struct *napi = &vi->rq[qnum]->napi; + /* Schedule NAPI, Suppress further interrupts if successful. */ - if (napi_schedule_prep(&vi->napi)) { - virtqueue_disable_cb(rvq); - __napi_schedule(&vi->napi); + if (napi_schedule_prep(napi)) { + virtqueue_disable_cb(vq); + __napi_schedule(napi); } } -static void virtnet_napi_enable(struct virtnet_info *vi) +static void virtnet_napi_enable(struct receive_queue *rq) { - napi_enable(&vi->napi); + napi_enable(&rq->napi); /* If all buffers were filled by other side before we napi_enabled, we * won't get another interrupt, so process any outstanding packets * now. virtnet_poll wants re-enable the queue, so we disable here. 
* We synchronize against interrupts via NAPI_STATE_SCHED */ - if (napi_schedule_prep(&vi->napi)) { - virtqueue_disable_cb(vi->rvq); - __napi_schedule(&vi->napi); + if (napi_schedule_prep(&rq->napi)) { + virtqueue_disable_cb(rq->vq); + __napi_schedule(&rq->napi); } } +static void virtnet_napi_enable_all_queues(struct virtnet_info *vi) +{ + int i; + + for (i = 0; i < vi->num_queue_pairs; i++) + virtnet_napi_enable(vi->rq[i]); +} + static void refill_work(struct work_struct *work) { - struct virtnet_info *vi; + struct napi_struct *napi; + struct receive_queue *rq; bool still_empty; - vi = container_of(work, struct virtnet_info, refill.work); - napi_disable(&vi->napi); - still_empty = !try_fill_recv(vi, GFP_KERNEL); - virtnet_napi_enable(vi); + rq = container_of(work, struct receive_queue, refill.work); + napi = &rq->napi; + + napi_disable(napi); + still_empty = !try_fill_recv(rq, GFP_KERNEL); + virtnet_napi_enable(rq); /* In theory, this can happen: if we don't get any buffers in * we will *never* try to fill again. */ if (still_empty) - schedule_delayed_work(&vi->refill, HZ/2); + schedule_delayed_work(&rq->refill, HZ/2); } static int virtnet_poll(struct napi_struct *napi, int budget) { - struct virtnet_info *vi = container_of(napi, struct virtnet_info, napi); + struct receive_queue *rq = container_of(napi, struct receive_queue, + napi); void *buf; unsigned int len, received = 0; again: while (received < budget && - (buf = virtqueue_get_buf(vi->rvq, &len)) != NULL) { - receive_buf(vi->dev, buf, len); - --vi->num; + (buf = virtqueue_get_buf(rq->vq, &len)) != NULL) { + receive_buf(rq, buf, len); + --rq->num; received++; } - if (vi->num < vi->max / 2) { - if (!try_fill_recv(vi, GFP_ATOMIC)) - schedule_delayed_work(&vi->refill, 0); + if (rq->num < rq->max / 2) { + if (!try_fill_recv(rq, GFP_ATOMIC)) + schedule_delayed_work(&rq->refill, 0); } /* Out of packets? 
*/ if (received < budget) { napi_complete(napi); - if (unlikely(!virtqueue_enable_cb(vi->rvq)) && + if (unlikely(!virtqueue_enable_cb(rq->vq)) && napi_schedule_prep(napi)) { - virtqueue_disable_cb(vi->rvq); + virtqueue_disable_cb(rq->vq); __napi_schedule(napi); goto again; } @@ -537,13 +585,14 @@ again: return received; } -static unsigned int free_old_xmit_skbs(struct virtnet_info *vi) +static unsigned int free_old_xmit_skbs(struct send_queue *sq, + struct virtqueue *vq) { struct sk_buff *skb; unsigned int len, tot_sgs = 0; - struct virtnet_stats __percpu *stats = this_cpu_ptr(vi->stats); + struct virtnet_send_stats __percpu *stats = this_cpu_ptr(sq->stats); - while ((skb = virtqueue_get_buf(vi->svq, &len)) != NULL) { + while ((skb = virtqueue_get_buf(vq, &len)) != NULL) { pr_debug("Sent skb %p\n", skb); u64_stats_update_begin(&stats->syncp); @@ -557,7 +606,8 @@ static unsigned int free_old_xmit_skbs(s return tot_sgs; } -static int xmit_skb(struct virtnet_info *vi, struct sk_buff *skb) +static int xmit_skb(struct virtnet_info *vi, struct sk_buff *skb, + struct virtqueue *vq, struct scatterlist *sg) { struct skb_vnet_hdr *hdr = skb_vnet_hdr(skb); const unsigned char *dest = ((struct ethhdr *)skb->data)->h_dest; @@ -595,44 +645,47 @@ static int xmit_skb(struct virtnet_info /* Encode metadata header at front. 
*/ if (vi->mergeable_rx_bufs) - sg_set_buf(vi->tx_sg, &hdr->mhdr, sizeof hdr->mhdr); + sg_set_buf(sg, &hdr->mhdr, sizeof hdr->mhdr); else - sg_set_buf(vi->tx_sg, &hdr->hdr, sizeof hdr->hdr); + sg_set_buf(sg, &hdr->hdr, sizeof hdr->hdr); - hdr->num_sg = skb_to_sgvec(skb, vi->tx_sg + 1, 0, skb->len) + 1; - return virtqueue_add_buf(vi->svq, vi->tx_sg, hdr->num_sg, + hdr->num_sg = skb_to_sgvec(skb, sg + 1, 0, skb->len) + 1; + return virtqueue_add_buf(vq, sg, hdr->num_sg, 0, skb); } static netdev_tx_t start_xmit(struct sk_buff *skb, struct net_device *dev) { struct virtnet_info *vi = netdev_priv(dev); + int qnum = skb_get_queue_mapping(skb); + struct virtqueue *vq = vi->sq[qnum]->vq; int capacity; /* Free up any pending old buffers before queueing new ones. */ - free_old_xmit_skbs(vi); + free_old_xmit_skbs(vi->sq[qnum], vq); /* Try to transmit */ - capacity = xmit_skb(vi, skb); + capacity = xmit_skb(vi, skb, vq, vi->sq[qnum]->sg); /* This can happen with OOM and indirect buffers. */ if (unlikely(capacity < 0)) { if (net_ratelimit()) { if (likely(capacity == -ENOMEM)) { dev_warn(&dev->dev, - "TX queue failure: out of memory\n"); + "TXQ (%d) failure: out of memory\n", + qnum); } else { dev->stats.tx_fifo_errors++; dev_warn(&dev->dev, - "Unexpected TX queue failure: %d\n", - capacity); + "Unexpected TXQ (%d) failure: %d\n", + qnum, capacity); } } dev->stats.tx_dropped++; kfree_skb(skb); return NETDEV_TX_OK; } - virtqueue_kick(vi->svq); + virtqueue_kick(vq); /* Don't wait up for transmitted skbs to be freed. */ skb_orphan(skb); @@ -641,13 +694,13 @@ static netdev_tx_t start_xmit(struct sk_ /* Apparently nice girls don't return TX_BUSY; stop the queue * before it gets out of hand. Naturally, this wastes entries. */ if (capacity < 2+MAX_SKB_FRAGS) { - netif_stop_queue(dev); - if (unlikely(!virtqueue_enable_cb_delayed(vi->svq))) { + netif_stop_subqueue(dev, qnum); + if (unlikely(!virtqueue_enable_cb_delayed(vq))) { /* More just got used, free them then recheck. 
*/ - capacity += free_old_xmit_skbs(vi); + capacity += free_old_xmit_skbs(vi->sq[qnum], vq); if (capacity >= 2+MAX_SKB_FRAGS) { - netif_start_queue(dev); - virtqueue_disable_cb(vi->svq); + netif_start_subqueue(dev, qnum); + virtqueue_disable_cb(vq); } } } @@ -677,25 +730,35 @@ static struct rtnl_link_stats64 *virtnet { struct virtnet_info *vi = netdev_priv(dev); int cpu; - unsigned int start; for_each_possible_cpu(cpu) { - struct virtnet_stats __percpu *stats - = per_cpu_ptr(vi->stats, cpu); - u64 tpackets, tbytes, rpackets, rbytes; - - do { - start = u64_stats_fetch_begin(&stats->syncp); - tpackets = stats->tx_packets; - tbytes = stats->tx_bytes; - rpackets = stats->rx_packets; - rbytes = stats->rx_bytes; - } while (u64_stats_fetch_retry(&stats->syncp, start)); - - tot->rx_packets += rpackets; - tot->tx_packets += tpackets; - tot->rx_bytes += rbytes; - tot->tx_bytes += tbytes; + int qpair; + + for (qpair = 0; qpair < vi->num_queue_pairs; qpair++) { + struct virtnet_send_stats __percpu *tx_stat; + struct virtnet_recv_stats __percpu *rx_stat; + u64 tpackets, tbytes, rpackets, rbytes; + unsigned int start; + + tx_stat = per_cpu_ptr(vi->sq[qpair]->stats, cpu); + do { + start = u64_stats_fetch_begin(&tx_stat->syncp); + tpackets = tx_stat->tx_packets; + tbytes = tx_stat->tx_bytes; + } while (u64_stats_fetch_retry(&tx_stat->syncp, start)); + + rx_stat = per_cpu_ptr(vi->rq[qpair]->stats, cpu); + do { + start = u64_stats_fetch_begin(&rx_stat->syncp); + rpackets = rx_stat->rx_packets; + rbytes = rx_stat->rx_bytes; + } while (u64_stats_fetch_retry(&rx_stat->syncp, start)); + + tot->rx_packets += rpackets; + tot->tx_packets += tpackets; + tot->rx_bytes += rbytes; + tot->tx_bytes += tbytes; + } } tot->tx_dropped = dev->stats.tx_dropped; @@ -710,16 +773,35 @@ static struct rtnl_link_stats64 *virtnet static void virtnet_netpoll(struct net_device *dev) { struct virtnet_info *vi = netdev_priv(dev); + int i; - napi_schedule(&vi->napi); + for (i = 0; i < vi->num_queue_pairs; i++) + 
napi_schedule(&vi->rq[i]->napi); } #endif +static void free_stats(struct virtnet_info *vi) +{ + int i; + + for (i = 0; i < vi->num_queue_pairs; i++) { + if (vi->sq && vi->sq[i]) { + free_percpu(vi->sq[i]->stats); + vi->sq[i]->stats = NULL; + } + + if (vi->rq && vi->rq[i]) { + free_percpu(vi->rq[i]->stats); + vi->rq[i]->stats = NULL; + } + } +} + static int virtnet_open(struct net_device *dev) { struct virtnet_info *vi = netdev_priv(dev); - virtnet_napi_enable(vi); + virtnet_napi_enable_all_queues(vi); return 0; } @@ -771,8 +853,10 @@ static bool virtnet_send_command(struct static int virtnet_close(struct net_device *dev) { struct virtnet_info *vi = netdev_priv(dev); + int i; - napi_disable(&vi->napi); + for (i = 0; i < vi->num_queue_pairs; i++) + napi_disable(&vi->rq[i]->napi); return 0; } @@ -882,11 +966,10 @@ static void virtnet_get_ringparam(struct { struct virtnet_info *vi = netdev_priv(dev); - ring->rx_max_pending = virtqueue_get_vring_size(vi->rvq); - ring->tx_max_pending = virtqueue_get_vring_size(vi->svq); + ring->rx_max_pending = virtqueue_get_vring_size(vi->rq[0]->vq); + ring->tx_max_pending = virtqueue_get_vring_size(vi->sq[0]->vq); ring->rx_pending = ring->rx_max_pending; ring->tx_pending = ring->tx_max_pending; - } static const struct ethtool_ops virtnet_ethtool_ops = { @@ -940,10 +1023,10 @@ static void virtnet_update_status(struct if (vi->status & VIRTIO_NET_S_LINK_UP) { netif_carrier_on(vi->dev); - netif_wake_queue(vi->dev); + netif_tx_wake_all_queues(vi->dev); } else { netif_carrier_off(vi->dev); - netif_stop_queue(vi->dev); + netif_tx_stop_all_queues(vi->dev); } } @@ -954,18 +1037,232 @@ static void virtnet_config_changed(struc virtnet_update_status(vi); } +static void free_receive_bufs(struct virtnet_info *vi) +{ + int i; + + for (i = 0; i < vi->num_queue_pairs; i++) { + while (vi->rq[i]->pages) + __free_pages(get_a_page(vi->rq[i], GFP_KERNEL), 0); + } +} + +/* Free memory allocated for send and receive queues */ +static void free_rq_sq(struct 
virtnet_info *vi) +{ + int i; + + free_stats(vi); + + if (vi->rq) { + for (i = 0; i < vi->num_queue_pairs; i++) + kfree(vi->rq[i]); + kfree(vi->rq); + } + + if (vi->sq) { + for (i = 0; i < vi->num_queue_pairs; i++) + kfree(vi->sq[i]); + kfree(vi->sq); + } +} + +static void free_unused_bufs(struct virtnet_info *vi) +{ + void *buf; + int i; + + for (i = 0; i < vi->num_queue_pairs; i++) { + struct virtqueue *vq = vi->sq[i]->vq; + + while ((buf = virtqueue_detach_unused_buf(vq)) != NULL) + dev_kfree_skb(buf); + } + + for (i = 0; i < vi->num_queue_pairs; i++) { + struct virtqueue *vq = vi->rq[i]->vq; + + while ((buf = virtqueue_detach_unused_buf(vq)) != NULL) { + if (vi->mergeable_rx_bufs || vi->big_packets) + give_pages(vi->rq[i], buf); + else + dev_kfree_skb(buf); + --vi->rq[i]->num; + } + BUG_ON(vi->rq[i]->num != 0); + } +} + +static void setup_rx_vqs(struct virtnet_info *vi, struct virtqueue **vqs, + int total_vqs) +{ + int i; + + for (i = 0; i < total_vqs; i += 2) + vi->rq[i/2]->vq = vqs[i]; +} + +static void setup_tx_vqs(struct virtnet_info *vi, struct virtqueue **vqs, + int total_vqs) +{ + int i; + + for (i = 0; i < total_vqs; i += 2) + vi->sq[i/2]->vq = vqs[i + 1]; +} + +static void setup_cvq(struct virtnet_info *vi, struct virtqueue **vqs, + int index) +{ + if (virtio_has_feature(vi->vdev, VIRTIO_NET_F_CTRL_VQ)) + vi->cvq = vqs[index]; +} + +static int invoke_find_vqs(struct virtnet_info *vi) +{ + vq_callback_t **callbacks; + struct virtqueue **vqs; + int ret = -ENOMEM; + int i, total_vqs; + char **names; + + /* + * We expect 1 RX virtqueue followed by 1 TX virtqueue, followed + * by the same 'vi->num_queue_pairs-1' more times, and optionally + * one control virtqueue. 
+ */ + total_vqs = vi->num_queue_pairs * 2 + + virtio_has_feature(vi->vdev, VIRTIO_NET_F_CTRL_VQ); + + /* Allocate space for find_vqs parameters */ + vqs = kmalloc(total_vqs * sizeof(*vqs), GFP_KERNEL); + callbacks = kmalloc(total_vqs * sizeof(*callbacks), GFP_KERNEL); + names = kmalloc(total_vqs * sizeof(*names), GFP_KERNEL); + if (!vqs || !callbacks || !names) + goto err; + + /* Allocate/initialize parameters for recv virtqueues */ + for (i = 0; i < vi->num_queue_pairs * 2; i += 2) { + callbacks[i] = skb_recv_done; + names[i] = kasprintf(GFP_KERNEL, "input.%d", i / 2); + if (!names[i]) + goto err; + } + + /* Allocate/initialize parameters for send virtqueues */ + for (i = 1; i < vi->num_queue_pairs * 2; i += 2) { + callbacks[i] = skb_xmit_done; + names[i] = kasprintf(GFP_KERNEL, "output.%d", i / 2); + if (!names[i]) + goto err; + } + + /* Parameters for control virtqueue, if any */ + if (virtio_has_feature(vi->vdev, VIRTIO_NET_F_CTRL_VQ)) { + callbacks[i - 1] = NULL; + names[i - 1] = "control"; + } + + ret = vi->vdev->config->find_vqs(vi->vdev, total_vqs, vqs, callbacks, + (const char **)names); + + if (ret) + goto err; + + setup_rx_vqs(vi, vqs, vi->num_queue_pairs * 2); + setup_tx_vqs(vi, vqs, vi->num_queue_pairs * 2); + setup_cvq(vi, vqs, vi->num_queue_pairs * 2); + +err: + if (ret && names) + for (i = 0; i < vi->num_queue_pairs * 2; i++) + kfree(names[i]); + + kfree(names); + kfree(callbacks); + kfree(vqs); + + return ret; +} + +static int allocate_queues(struct virtnet_info *vi) +{ + int ret = -ENOMEM; + int i; + + vi->rq = kcalloc(vi->num_queue_pairs, sizeof(*vi->rq), GFP_KERNEL); + vi->sq = kcalloc(vi->num_queue_pairs, sizeof(*vi->sq), GFP_KERNEL); + if (!vi->sq || !vi->rq) + goto err; + + for (i = 0; i < vi->num_queue_pairs; i++) { + vi->rq[i] = kzalloc(sizeof(*vi->rq[i]), GFP_KERNEL); + vi->sq[i] = kzalloc(sizeof(*vi->sq[i]), GFP_KERNEL); + if (!vi->rq[i] || !vi->sq[i]) + goto err; + + vi->rq[i]->stats = alloc_percpu(struct virtnet_recv_stats); + 
vi->sq[i]->stats = alloc_percpu(struct virtnet_send_stats); + if (!vi->rq[i]->stats || !vi->sq[i]->stats) + goto err; + } + + ret = 0; + + /* setup initial receive and send queue parameters */ + for (i = 0; i < vi->num_queue_pairs; i++) { + vi->rq[i]->vi = vi; + vi->rq[i]->pages = NULL; + INIT_DELAYED_WORK(&vi->rq[i]->refill, refill_work); + netif_napi_add(vi->dev, &vi->rq[i]->napi, virtnet_poll, + napi_weight); + + sg_init_table(vi->rq[i]->sg, ARRAY_SIZE(vi->rq[i]->sg)); + sg_init_table(vi->sq[i]->sg, ARRAY_SIZE(vi->sq[i]->sg)); + } + +err: + if (ret) + free_rq_sq(vi); + + return ret; +} + +static int virtnet_setup_vqs(struct virtnet_info *vi) +{ + int ret; + + /* Allocate send & receive queues */ + ret = allocate_queues(vi); + if (!ret) { + ret = invoke_find_vqs(vi); + if (ret) + free_rq_sq(vi); + } + + return ret; +} + static int virtnet_probe(struct virtio_device *vdev) { - int err; + int i, err; struct net_device *dev; struct virtnet_info *vi; - struct virtqueue *vqs[3]; - vq_callback_t *callbacks[] = { skb_recv_done, skb_xmit_done, NULL}; - const char *names[] = { "input", "output", "control" }; - int nvqs; + u16 num_queues, num_queue_pairs; + + /* Find if host supports multiqueue virtio_net device */ + err = virtio_config_val(vdev, VIRTIO_NET_F_MULTIQUEUE, + offsetof(struct virtio_net_config, + num_queues), &num_queues); + + /* We need atleast 2 queue's */ + if (err || num_queues < 2) + num_queues = 2; + + num_queue_pairs = num_queues / 2; /* Allocate ourselves a network device with room for our info */ - dev = alloc_etherdev(sizeof(struct virtnet_info)); + dev = alloc_etherdev_mq(sizeof(struct virtnet_info), num_queue_pairs); if (!dev) return -ENOMEM; @@ -1011,19 +1308,10 @@ static int virtnet_probe(struct virtio_d /* Set up our device-specific information */ vi = netdev_priv(dev); - netif_napi_add(dev, &vi->napi, virtnet_poll, napi_weight); vi->dev = dev; vi->vdev = vdev; vdev->priv = vi; - vi->pages = NULL; - vi->stats = alloc_percpu(struct 
virtnet_stats); - err = -ENOMEM; - if (vi->stats == NULL) - goto free; - - INIT_DELAYED_WORK(&vi->refill, refill_work); - sg_init_table(vi->rx_sg, ARRAY_SIZE(vi->rx_sg)); - sg_init_table(vi->tx_sg, ARRAY_SIZE(vi->tx_sg)); + vi->num_queue_pairs = num_queue_pairs; /* If we can receive ANY GSO packets, we must allocate large ones. */ if (virtio_has_feature(vdev, VIRTIO_NET_F_GUEST_TSO4) || @@ -1034,23 +1322,14 @@ static int virtnet_probe(struct virtio_d if (virtio_has_feature(vdev, VIRTIO_NET_F_MRG_RXBUF)) vi->mergeable_rx_bufs = true; - /* We expect two virtqueues, receive then send, - * and optionally control. */ - nvqs = virtio_has_feature(vi->vdev, VIRTIO_NET_F_CTRL_VQ) ? 3 : 2; - - err = vdev->config->find_vqs(vdev, nvqs, vqs, callbacks, names); + /* Allocate/initialize the rx/tx queues, and invoke find_vqs */ + err = virtnet_setup_vqs(vi); if (err) - goto free_stats; - - vi->rvq = vqs[0]; - vi->svq = vqs[1]; + goto free_netdev; - if (virtio_has_feature(vi->vdev, VIRTIO_NET_F_CTRL_VQ)) { - vi->cvq = vqs[2]; - - if (virtio_has_feature(vi->vdev, VIRTIO_NET_F_CTRL_VLAN)) - dev->features |= NETIF_F_HW_VLAN_FILTER; - } + if (virtio_has_feature(vi->vdev, VIRTIO_NET_F_CTRL_VQ) && + virtio_has_feature(vi->vdev, VIRTIO_NET_F_CTRL_VLAN)) + dev->features |= NETIF_F_HW_VLAN_FILTER; err = register_netdev(dev); if (err) { @@ -1059,12 +1338,15 @@ static int virtnet_probe(struct virtio_d } /* Last of all, set up some receive buffers. */ - try_fill_recv(vi, GFP_KERNEL); + for (i = 0; i < num_queue_pairs; i++) { + try_fill_recv(vi->rq[i], GFP_KERNEL); - /* If we didn't even get one input buffer, we're useless. */ - if (vi->num == 0) { - err = -ENOMEM; - goto unregister; + /* If we didn't even get one input buffer, we're useless. 
*/ + if (vi->rq[i]->num == 0) { + free_unused_bufs(vi); + err = -ENOMEM; + goto free_recv_bufs; + } } /* Assume link up if device can't report link status, @@ -1077,63 +1359,51 @@ static int virtnet_probe(struct virtio_d netif_carrier_on(dev); } - pr_debug("virtnet: registered device %s\n", dev->name); + pr_debug("virtnet: registered device %s with %d RX and TX vq's\n", + dev->name, num_queue_pairs); + return 0; -unregister: +free_recv_bufs: + free_receive_bufs(vi); unregister_netdev(dev); - cancel_delayed_work_sync(&vi->refill); + free_vqs: + for (i = 0; i < num_queue_pairs; i++) + cancel_delayed_work_sync(&vi->rq[i]->refill); vdev->config->del_vqs(vdev); -free_stats: - free_percpu(vi->stats); -free: + +free_netdev: + free_rq_sq(vi); + free_netdev(dev); return err; } -static void free_unused_bufs(struct virtnet_info *vi) -{ - void *buf; - while (1) { - buf = virtqueue_detach_unused_buf(vi->svq); - if (!buf) - break; - dev_kfree_skb(buf); - } - while (1) { - buf = virtqueue_detach_unused_buf(vi->rvq); - if (!buf) - break; - if (vi->mergeable_rx_bufs || vi->big_packets) - give_pages(vi, buf); - else - dev_kfree_skb(buf); - --vi->num; - } - BUG_ON(vi->num != 0); -} - static void __devexit virtnet_remove(struct virtio_device *vdev) { struct virtnet_info *vi = vdev->priv; + int i; /* Stop all the virtqueues. */ vdev->config->reset(vdev); unregister_netdev(vi->dev); - cancel_delayed_work_sync(&vi->refill); + + for (i = 0; i < vi->num_queue_pairs; i++) + cancel_delayed_work_sync(&vi->rq[i]->refill); /* Free unused buffers in both send and recv, if any. 
*/ free_unused_bufs(vi); vdev->config->del_vqs(vi->vdev); - while (vi->pages) - __free_pages(get_a_page(vi, GFP_KERNEL), 0); + free_receive_bufs(vi); + + /* Free memory for send and receive queues */ + free_rq_sq(vi); - free_percpu(vi->stats); free_netdev(vi->dev); } @@ -1149,7 +1419,7 @@ static unsigned int features[] = { VIRTIO_NET_F_HOST_ECN, VIRTIO_NET_F_GUEST_TSO4, VIRTIO_NET_F_GUEST_TSO6, VIRTIO_NET_F_GUEST_ECN, VIRTIO_NET_F_GUEST_UFO, VIRTIO_NET_F_MRG_RXBUF, VIRTIO_NET_F_STATUS, VIRTIO_NET_F_CTRL_VQ, - VIRTIO_NET_F_CTRL_RX, VIRTIO_NET_F_CTRL_VLAN, + VIRTIO_NET_F_CTRL_RX, VIRTIO_NET_F_CTRL_VLAN, VIRTIO_NET_F_MULTIQUEUE, }; static struct virtio_driver virtio_net_driver = { diff -ruNp org/include/linux/virtio_net.h new/include/linux/virtio_net.h --- org/include/linux/virtio_net.h 2011-11-11 16:44:38.000000000 +0530 +++ new/include/linux/virtio_net.h 2011-11-11 16:44:59.000000000 +0530 @@ -58,6 +58,8 @@ struct virtio_net_config { __u8 mac[6]; /* See VIRTIO_NET_F_STATUS and VIRTIO_NET_S_* above */ __u16 status; + /* Total number of RX/TX queues */ + __u16 num_queues; } __attribute__((packed)); /* This is the first element of the scatter-gather list. If you don't
Changes for multiqueue vhost_net driver. Signed-off-by: krkumar2 at in.ibm.com --- drivers/vhost/net.c | 253 +++++++++++++++++++++++++--------------- drivers/vhost/vhost.c | 225 ++++++++++++++++++++++++----------- drivers/vhost/vhost.h | 26 +++- 3 files changed, 340 insertions(+), 164 deletions(-) diff -ruNp org/drivers/vhost/net.c new/drivers/vhost/net.c --- org/drivers/vhost/net.c 2011-11-11 16:44:56.000000000 +0530 +++ new/drivers/vhost/net.c 2011-11-11 16:45:11.000000000 +0530 @@ -41,12 +41,6 @@ MODULE_PARM_DESC(experimental_zcopytx, " #define VHOST_MAX_PEND 128 #define VHOST_GOODCOPY_LEN 256 -enum { - VHOST_NET_VQ_RX = 0, - VHOST_NET_VQ_TX = 1, - VHOST_NET_VQ_MAX = 2, -}; - enum vhost_net_poll_state { VHOST_NET_POLL_DISABLED = 0, VHOST_NET_POLL_STARTED = 1, @@ -55,12 +49,13 @@ enum vhost_net_poll_state { struct vhost_net { struct vhost_dev dev; - struct vhost_virtqueue vqs[VHOST_NET_VQ_MAX]; - struct vhost_poll poll[VHOST_NET_VQ_MAX]; + struct vhost_virtqueue *vqs; + struct vhost_poll *poll; + struct socket **socks; /* Tells us whether we are polling a socket for TX. * We only do this when socket buffer fills up. * Protected by tx vq lock. 
*/ - enum vhost_net_poll_state tx_poll_state; + enum vhost_net_poll_state *tx_poll_state; }; static bool vhost_sock_zcopy(struct socket *sock) @@ -108,28 +103,28 @@ static void copy_iovec_hdr(const struct } /* Caller must have TX VQ lock */ -static void tx_poll_stop(struct vhost_net *net) +static void tx_poll_stop(struct vhost_net *net, int qnum) { - if (likely(net->tx_poll_state != VHOST_NET_POLL_STARTED)) + if (likely(net->tx_poll_state[qnum / 2] != VHOST_NET_POLL_STARTED)) return; - vhost_poll_stop(net->poll + VHOST_NET_VQ_TX); - net->tx_poll_state = VHOST_NET_POLL_STOPPED; + vhost_poll_stop(&net->poll[qnum]); + net->tx_poll_state[qnum / 2] = VHOST_NET_POLL_STOPPED; } /* Caller must have TX VQ lock */ -static void tx_poll_start(struct vhost_net *net, struct socket *sock) +static void tx_poll_start(struct vhost_net *net, struct socket *sock, int qnum) { - if (unlikely(net->tx_poll_state != VHOST_NET_POLL_STOPPED)) + if (unlikely(net->tx_poll_state[qnum / 2] != VHOST_NET_POLL_STOPPED)) return; - vhost_poll_start(net->poll + VHOST_NET_VQ_TX, sock->file); - net->tx_poll_state = VHOST_NET_POLL_STARTED; + vhost_poll_start(&net->poll[qnum], sock->file); + net->tx_poll_state[qnum / 2] = VHOST_NET_POLL_STARTED; } /* Expects to be always run from workqueue - which acts as * read-size critical section for our kind of RCU. 
*/ -static void handle_tx(struct vhost_net *net) +static void handle_tx(struct vhost_virtqueue *vq) { - struct vhost_virtqueue *vq = &net->dev.vqs[VHOST_NET_VQ_TX]; + struct vhost_net *net = container_of(vq->dev, struct vhost_net, dev); unsigned out, in, s; int head; struct msghdr msg = { @@ -155,7 +150,7 @@ static void handle_tx(struct vhost_net * wmem = atomic_read(&sock->sk->sk_wmem_alloc); if (wmem >= sock->sk->sk_sndbuf) { mutex_lock(&vq->mutex); - tx_poll_start(net, sock); + tx_poll_start(net, sock, vq->qnum); mutex_unlock(&vq->mutex); return; } @@ -164,7 +159,7 @@ static void handle_tx(struct vhost_net * vhost_disable_notify(&net->dev, vq); if (wmem < sock->sk->sk_sndbuf / 2) - tx_poll_stop(net); + tx_poll_stop(net, vq->qnum); hdr_size = vq->vhost_hlen; zcopy = vhost_sock_zcopy(sock); @@ -186,7 +181,7 @@ static void handle_tx(struct vhost_net * wmem = atomic_read(&sock->sk->sk_wmem_alloc); if (wmem >= sock->sk->sk_sndbuf * 3 / 4) { - tx_poll_start(net, sock); + tx_poll_start(net, sock, vq->qnum); set_bit(SOCK_ASYNC_NOSPACE, &sock->flags); break; } @@ -197,7 +192,7 @@ static void handle_tx(struct vhost_net * (vq->upend_idx - vq->done_idx) : (vq->upend_idx + UIO_MAXIOV - vq->done_idx); if (unlikely(num_pends > VHOST_MAX_PEND)) { - tx_poll_start(net, sock); + tx_poll_start(net, sock, vq->qnum); set_bit(SOCK_ASYNC_NOSPACE, &sock->flags); break; } @@ -257,7 +252,7 @@ static void handle_tx(struct vhost_net * UIO_MAXIOV; } vhost_discard_vq_desc(vq, 1); - tx_poll_start(net, sock); + tx_poll_start(net, sock, vq->qnum); break; } if (err != len) @@ -353,9 +348,9 @@ err: /* Expects to be always run from workqueue - which acts as * read-size critical section for our kind of RCU. 
*/ -static void handle_rx(struct vhost_net *net) +static void handle_rx(struct vhost_virtqueue *vq) { - struct vhost_virtqueue *vq = &net->dev.vqs[VHOST_NET_VQ_RX]; + struct vhost_net *net = container_of(vq->dev, struct vhost_net, dev); unsigned uninitialized_var(in), log; struct vhost_log *vq_log; struct msghdr msg = { @@ -464,87 +459,155 @@ static void handle_tx_kick(struct vhost_ { struct vhost_virtqueue *vq = container_of(work, struct vhost_virtqueue, poll.work); - struct vhost_net *net = container_of(vq->dev, struct vhost_net, dev); - handle_tx(net); + handle_tx(vq); } static void handle_rx_kick(struct vhost_work *work) { struct vhost_virtqueue *vq = container_of(work, struct vhost_virtqueue, poll.work); - struct vhost_net *net = container_of(vq->dev, struct vhost_net, dev); - handle_rx(net); + handle_rx(vq); } static void handle_tx_net(struct vhost_work *work) { - struct vhost_net *net = container_of(work, struct vhost_net, - poll[VHOST_NET_VQ_TX].work); - handle_tx(net); + struct vhost_virtqueue *vq = container_of(work, struct vhost_poll, + work)->vq; + + handle_tx(vq); } static void handle_rx_net(struct vhost_work *work) { - struct vhost_net *net = container_of(work, struct vhost_net, - poll[VHOST_NET_VQ_RX].work); - handle_rx(net); + struct vhost_virtqueue *vq = container_of(work, struct vhost_poll, + work)->vq; + + handle_rx(vq); } -static int vhost_net_open(struct inode *inode, struct file *f) +void vhost_free_vqs(struct vhost_dev *dev) { - struct vhost_net *n = kmalloc(sizeof *n, GFP_KERNEL); - struct vhost_dev *dev; - int r; + struct vhost_net *n = container_of(dev, struct vhost_net, dev); - if (!n) - return -ENOMEM; + if (!n->vqs) + return; - dev = &n->dev; - n->vqs[VHOST_NET_VQ_TX].handle_kick = handle_tx_kick; - n->vqs[VHOST_NET_VQ_RX].handle_kick = handle_rx_kick; - r = vhost_dev_init(dev, n->vqs, VHOST_NET_VQ_MAX); - if (r < 0) { - kfree(n); - return r; + kfree(n->socks); + kfree(n->tx_poll_state); + kfree(n->poll); + kfree(n->vqs); + + /* + * 
Reset so that vhost_net_release (which gets called when + * vhost_dev_set_owner() call fails) will notice. + */ + n->vqs = NULL; +} + +int vhost_setup_vqs(struct vhost_dev *dev, int numtxqs) +{ + struct vhost_net *n = container_of(dev, struct vhost_net, dev); + int i, nvqs; + int ret = -ENOMEM; + + if (numtxqs < 0) + return -EINVAL; + + if (numtxqs == 0) { + /* Old qemu doesn't pass arguments to set_owner, use 1 txq */ + numtxqs = 1; + } + + /* Get total number of virtqueues */ + nvqs = numtxqs * 2; + + n->vqs = kmalloc(nvqs * sizeof(*n->vqs), GFP_KERNEL); + n->poll = kmalloc(nvqs * sizeof(*n->poll), GFP_KERNEL); + n->socks = kmalloc(nvqs * sizeof(*n->socks), GFP_KERNEL); + n->tx_poll_state = kmalloc(numtxqs * sizeof(*n->tx_poll_state), + GFP_KERNEL); + if (!n->vqs || !n->poll || !n->socks || !n->tx_poll_state) + goto err; + + /* RX followed by TX queues */ + for (i = 0; i < nvqs; i += 2) { + n->vqs[i].handle_kick = handle_rx_kick; + n->vqs[i + 1].handle_kick = handle_tx_kick; } - vhost_poll_init(n->poll + VHOST_NET_VQ_TX, handle_tx_net, POLLOUT, dev); - vhost_poll_init(n->poll + VHOST_NET_VQ_RX, handle_rx_net, POLLIN, dev); - n->tx_poll_state = VHOST_NET_POLL_DISABLED; + ret = vhost_dev_init(dev, n->vqs, nvqs); + if (ret < 0) + goto err; - f->private_data = n; + for (i = 0; i < nvqs; i += 2) { + vhost_poll_init(&n->poll[i], handle_rx_net, POLLIN, + &n->vqs[i]); + vhost_poll_init(&n->poll[i+1], handle_tx_net, POLLOUT, + &n->vqs[i+1]); + if (i / 2 < numtxqs) + n->tx_poll_state[i/2] = VHOST_NET_POLL_DISABLED; + } return 0; + +err: + /* Free all pointers that may have been allocated */ + vhost_free_vqs(dev); + + return ret; +} + +static int vhost_net_open(struct inode *inode, struct file *f) +{ + struct vhost_net *n = kzalloc(sizeof *n, GFP_KERNEL); + int ret = -ENOMEM; + + if (n) { + struct vhost_dev *dev = &n->dev; + + f->private_data = n; + mutex_init(&dev->mutex); + + /* Defer all other initialization till user does SET_OWNER */ + ret = 0; + } + + return ret; } 
static void vhost_net_disable_vq(struct vhost_net *n, struct vhost_virtqueue *vq) { + int qnum = vq->qnum; + if (!vq->private_data) return; - if (vq == n->vqs + VHOST_NET_VQ_TX) { - tx_poll_stop(n); - n->tx_poll_state = VHOST_NET_POLL_DISABLED; - } else - vhost_poll_stop(n->poll + VHOST_NET_VQ_RX); + if (qnum & 1) { /* Odd qnum -> TX */ + tx_poll_stop(n, qnum); + n->tx_poll_state[qnum / 2] = VHOST_NET_POLL_DISABLED; + } else { /* Even qnum -> RX */ + vhost_poll_stop(&n->poll[qnum]); + } } static void vhost_net_enable_vq(struct vhost_net *n, struct vhost_virtqueue *vq) { struct socket *sock; + int qnum = vq->qnum; sock = rcu_dereference_protected(vq->private_data, lockdep_is_held(&vq->mutex)); if (!sock) return; - if (vq == n->vqs + VHOST_NET_VQ_TX) { - n->tx_poll_state = VHOST_NET_POLL_STOPPED; - tx_poll_start(n, sock); - } else - vhost_poll_start(n->poll + VHOST_NET_VQ_RX, sock->file); + if (qnum & 1) { /* Odd qnum -> TX */ + n->tx_poll_state[qnum / 2] = VHOST_NET_POLL_STOPPED; + tx_poll_start(n, sock, qnum); + } else { /* Even qnum -> RX */ + vhost_poll_start(&n->poll[qnum], sock->file); + } } static struct socket *vhost_net_stop_vq(struct vhost_net *n, @@ -561,11 +624,12 @@ static struct socket *vhost_net_stop_vq( return sock; } -static void vhost_net_stop(struct vhost_net *n, struct socket **tx_sock, - struct socket **rx_sock) +static void vhost_net_stop(struct vhost_net *n) { - *tx_sock = vhost_net_stop_vq(n, n->vqs + VHOST_NET_VQ_TX); - *rx_sock = vhost_net_stop_vq(n, n->vqs + VHOST_NET_VQ_RX); + int i; + + for (i = 0; i < n->dev.nvqs; i++) + n->socks[i] = vhost_net_stop_vq(n, &n->vqs[i]); } static void vhost_net_flush_vq(struct vhost_net *n, int index) @@ -576,26 +640,33 @@ static void vhost_net_flush_vq(struct vh static void vhost_net_flush(struct vhost_net *n) { - vhost_net_flush_vq(n, VHOST_NET_VQ_TX); - vhost_net_flush_vq(n, VHOST_NET_VQ_RX); + int i; + + for (i = 0; i < n->dev.nvqs; i++) + vhost_net_flush_vq(n, i); } static int vhost_net_release(struct 
inode *inode, struct file *f) { struct vhost_net *n = f->private_data; - struct socket *tx_sock; - struct socket *rx_sock; + struct vhost_dev *dev = &n->dev; + int i; - vhost_net_stop(n, &tx_sock, &rx_sock); + vhost_net_stop(n); vhost_net_flush(n); - vhost_dev_cleanup(&n->dev); - if (tx_sock) - fput(tx_sock->file); - if (rx_sock) - fput(rx_sock->file); + vhost_dev_cleanup(dev); + + for (i = 0; i < n->dev.nvqs; i++) + if (n->socks[i]) + fput(n->socks[i]->file); + /* We do an extra flush before freeing memory, * since jobs can re-queue themselves. */ vhost_net_flush(n); + + /* Free all old pointers */ + vhost_free_vqs(dev); + kfree(n); return 0; } @@ -677,7 +748,7 @@ static long vhost_net_set_backend(struct if (r) goto err; - if (index >= VHOST_NET_VQ_MAX) { + if (index >= n->dev.nvqs) { r = -ENOBUFS; goto err; } @@ -743,23 +814,25 @@ err: static long vhost_net_reset_owner(struct vhost_net *n) { - struct socket *tx_sock = NULL; - struct socket *rx_sock = NULL; long err; + int i; mutex_lock(&n->dev.mutex); err = vhost_dev_check_owner(&n->dev); - if (err) - goto done; - vhost_net_stop(n, &tx_sock, &rx_sock); + if (err) { + mutex_unlock(&n->dev.mutex); + return err; + } + + vhost_net_stop(n); vhost_net_flush(n); err = vhost_dev_reset_owner(&n->dev); -done: mutex_unlock(&n->dev.mutex); - if (tx_sock) - fput(tx_sock->file); - if (rx_sock) - fput(rx_sock->file); + + for (i = 0; i < n->dev.nvqs; i++) + if (n->socks[i]) + fput(n->socks[i]->file); + return err; } @@ -788,7 +861,7 @@ static int vhost_net_set_features(struct } n->dev.acked_features = features; smp_wmb(); - for (i = 0; i < VHOST_NET_VQ_MAX; ++i) { + for (i = 0; i < n->dev.nvqs; ++i) { mutex_lock(&n->vqs[i].mutex); n->vqs[i].vhost_hlen = vhost_hlen; n->vqs[i].sock_hlen = sock_hlen; @@ -864,7 +937,7 @@ static struct miscdevice vhost_net_misc static int vhost_net_init(void) { if (experimental_zcopytx) - vhost_enable_zcopy(VHOST_NET_VQ_TX); + vhost_enable_zcopy(VHOST_NET_TX_VQS); return 
misc_register(&vhost_net_misc); } module_init(vhost_net_init); diff -ruNp org/drivers/vhost/vhost.c new/drivers/vhost/vhost.c --- org/drivers/vhost/vhost.c 2011-11-11 16:44:56.000000000 +0530 +++ new/drivers/vhost/vhost.c 2011-11-11 16:45:11.000000000 +0530 @@ -75,12 +75,12 @@ static void vhost_work_init(struct vhost /* Init poll structure */ void vhost_poll_init(struct vhost_poll *poll, vhost_work_fn_t fn, - unsigned long mask, struct vhost_dev *dev) + unsigned long mask, struct vhost_virtqueue *vq) { init_waitqueue_func_entry(&poll->wait, vhost_poll_wakeup); init_poll_funcptr(&poll->table, vhost_poll_func); poll->mask = mask; - poll->dev = dev; + poll->vq = vq; vhost_work_init(&poll->work, fn); } @@ -103,30 +103,31 @@ void vhost_poll_stop(struct vhost_poll * remove_wait_queue(poll->wqh, &poll->wait); } -static bool vhost_work_seq_done(struct vhost_dev *dev, struct vhost_work *work, - unsigned seq) +static bool vhost_work_seq_done(struct vhost_virtqueue *vq, + struct vhost_work *work, unsigned seq) { int left; - spin_lock_irq(&dev->work_lock); + spin_lock_irq(vq->work_lock); left = seq - work->done_seq; - spin_unlock_irq(&dev->work_lock); + spin_unlock_irq(vq->work_lock); return left <= 0; } -static void vhost_work_flush(struct vhost_dev *dev, struct vhost_work *work) +static void vhost_work_flush(struct vhost_virtqueue *vq, + struct vhost_work *work) { unsigned seq; int flushing; - spin_lock_irq(&dev->work_lock); + spin_lock_irq(vq->work_lock); seq = work->queue_seq; work->flushing++; - spin_unlock_irq(&dev->work_lock); - wait_event(work->done, vhost_work_seq_done(dev, work, seq)); - spin_lock_irq(&dev->work_lock); + spin_unlock_irq(vq->work_lock); + wait_event(work->done, vhost_work_seq_done(vq, work, seq)); + spin_lock_irq(vq->work_lock); flushing = --work->flushing; - spin_unlock_irq(&dev->work_lock); + spin_unlock_irq(vq->work_lock); BUG_ON(flushing < 0); } @@ -134,26 +135,26 @@ static void vhost_work_flush(struct vhos * locks that are also used by the 
callback. */ void vhost_poll_flush(struct vhost_poll *poll) { - vhost_work_flush(poll->dev, &poll->work); + vhost_work_flush(poll->vq, &poll->work); } -static inline void vhost_work_queue(struct vhost_dev *dev, +static inline void vhost_work_queue(struct vhost_virtqueue *vq, struct vhost_work *work) { unsigned long flags; - spin_lock_irqsave(&dev->work_lock, flags); + spin_lock_irqsave(vq->work_lock, flags); if (list_empty(&work->node)) { - list_add_tail(&work->node, &dev->work_list); + list_add_tail(&work->node, vq->work_list); work->queue_seq++; - wake_up_process(dev->worker); + wake_up_process(vq->worker); } - spin_unlock_irqrestore(&dev->work_lock, flags); + spin_unlock_irqrestore(vq->work_lock, flags); } void vhost_poll_queue(struct vhost_poll *poll) { - vhost_work_queue(poll->dev, &poll->work); + vhost_work_queue(poll->vq, &poll->work); } static void vhost_vq_reset(struct vhost_dev *dev, @@ -188,17 +189,17 @@ static void vhost_vq_reset(struct vhost_ static int vhost_worker(void *data) { - struct vhost_dev *dev = data; + struct vhost_virtqueue *vq = data; struct vhost_work *work = NULL; unsigned uninitialized_var(seq); - use_mm(dev->mm); + use_mm(vq->dev->mm); for (;;) { /* mb paired w/ kthread_stop */ set_current_state(TASK_INTERRUPTIBLE); - spin_lock_irq(&dev->work_lock); + spin_lock_irq(vq->work_lock); if (work) { work->done_seq = seq; if (work->flushing) @@ -206,18 +207,18 @@ static int vhost_worker(void *data) } if (kthread_should_stop()) { - spin_unlock_irq(&dev->work_lock); + spin_unlock_irq(vq->work_lock); __set_current_state(TASK_RUNNING); break; } - if (!list_empty(&dev->work_list)) { - work = list_first_entry(&dev->work_list, + if (!list_empty(vq->work_list)) { + work = list_first_entry(vq->work_list, struct vhost_work, node); list_del_init(&work->node); seq = work->queue_seq; } else work = NULL; - spin_unlock_irq(&dev->work_lock); + spin_unlock_irq(vq->work_lock); if (work) { __set_current_state(TASK_RUNNING); @@ -226,7 +227,7 @@ static int 
vhost_worker(void *data) schedule(); } - unuse_mm(dev->mm); + unuse_mm(vq->dev->mm); return 0; } @@ -260,7 +261,7 @@ static long vhost_dev_alloc_iovecs(struc GFP_KERNEL); dev->vqs[i].heads = kmalloc(sizeof *dev->vqs[i].heads * UIO_MAXIOV, GFP_KERNEL); - zcopy = vhost_zcopy_mask & (0x1 << i); + zcopy = vhost_zcopy_mask & (0x1 << (i & VHOST_NET_TX_VQS)); if (zcopy) dev->vqs[i].ubuf_info = kmalloc(sizeof *dev->vqs[i].ubuf_info * @@ -286,6 +287,30 @@ static void vhost_dev_free_iovecs(struct vhost_vq_free_iovecs(&dev->vqs[i]); } +/* + * Get index of an existing thread that will handle this rx/tx queue pair. + * The same thread handles both rx and tx. + */ +static int vhost_get_thread_index(int index) +{ + return (index / 2) % MAX_VHOST_THREADS; +} + +/* Get index of an earlier vq that we can share with */ +static int vhost_get_vq_index(int index) +{ + return vhost_get_thread_index(index) * 2; +} + +/* + * This is needed to determine whether work_list/work_lock needs + * initialization; or to start a new worker thread.
+ */ +static int vhost_needs_init(int i, int j) +{ + return i == j * 2; +} + long vhost_dev_init(struct vhost_dev *dev, struct vhost_virtqueue *vqs, int nvqs) { @@ -298,21 +323,31 @@ long vhost_dev_init(struct vhost_dev *de dev->log_file = NULL; dev->memory = NULL; dev->mm = NULL; - spin_lock_init(&dev->work_lock); - INIT_LIST_HEAD(&dev->work_list); - dev->worker = NULL; for (i = 0; i < dev->nvqs; ++i) { - dev->vqs[i].log = NULL; - dev->vqs[i].indirect = NULL; - dev->vqs[i].heads = NULL; - dev->vqs[i].ubuf_info = NULL; - dev->vqs[i].dev = dev; - mutex_init(&dev->vqs[i].mutex); + struct vhost_virtqueue *vq = &dev->vqs[i]; + int j = vhost_get_thread_index(i); + + if (vhost_needs_init(i, j)) { + spin_lock_init(&dev->work[j].work_lock); + INIT_LIST_HEAD(&dev->work[j].work_list); + } + + vq->ubuf_info = NULL; + vq->work_lock = &dev->work[j].work_lock; + vq->work_list = &dev->work[j].work_list; + + vq->worker = NULL; + vq->qnum = i; + vq->log = NULL; + vq->indirect = NULL; + vq->heads = NULL; + vq->dev = dev; + mutex_init(&vq->mutex); vhost_vq_reset(dev, dev->vqs + i); - if (dev->vqs[i].handle_kick) - vhost_poll_init(&dev->vqs[i].poll, - dev->vqs[i].handle_kick, POLLIN, dev); + if (vq->handle_kick) + vhost_poll_init(&vq->poll, + vq->handle_kick, POLLIN, vq); } return 0; @@ -339,21 +374,83 @@ static void vhost_attach_cgroups_work(st s->ret = cgroup_attach_task_all(s->owner, current); } -static int vhost_attach_cgroups(struct vhost_dev *dev) +static int vhost_attach_cgroups(struct vhost_virtqueue *vq) { struct vhost_attach_cgroups_struct attach; attach.owner = current; vhost_work_init(&attach.work, vhost_attach_cgroups_work); - vhost_work_queue(dev, &attach.work); - vhost_work_flush(dev, &attach.work); + vhost_work_queue(vq, &attach.work); + vhost_work_flush(vq, &attach.work); return attach.ret; } +static void __vhost_stop_workers(struct vhost_dev *dev, int nvhosts) +{ + int i; + + for (i = 0; i < dev->nvqs; i++) { + if (i < nvhosts) { + WARN_ON(!list_empty(dev->vqs[i * 
2].work_list)); + if (dev->vqs[i * 2].worker) + kthread_stop(dev->vqs[i * 2].worker); + } + dev->vqs[i].worker = NULL; + } + + if (dev->mm) + mmput(dev->mm); + dev->mm = NULL; +} + +static void vhost_stop_workers(struct vhost_dev *dev) +{ + int nthreads = min_t(int, dev->nvqs / 2, MAX_VHOST_THREADS); + + __vhost_stop_workers(dev, nthreads); +} + +static int vhost_start_workers(struct vhost_dev *dev) +{ + int i, err; + + for (i = 0; i < dev->nvqs; ++i) { + struct vhost_virtqueue *vq = &dev->vqs[i]; + int j = vhost_get_thread_index(i); + + if (vhost_needs_init(i, j)) { + /* Start a new thread */ + vq->worker = kthread_create(vhost_worker, vq, + "vhost-%d-%d", + current->pid, j); + if (IS_ERR(vq->worker)) { + err = PTR_ERR(vq->worker); + goto err; + } + + wake_up_process(vq->worker); + + /* avoid contributing to loadavg */ + err = vhost_attach_cgroups(vq); + if (err) + goto err; + } else { + /* Share work with an existing thread */ + int j = vhost_get_vq_index(i); + + vq->worker = dev->vqs[j].worker; + } + } + return 0; + +err: + __vhost_stop_workers(dev, i / 2); + return err; +} + /* Caller should have device mutex */ -static long vhost_dev_set_owner(struct vhost_dev *dev) +static long vhost_dev_set_owner(struct vhost_dev *dev, int numtxqs) { - struct task_struct *worker; int err; /* Is there an owner already? 
*/ @@ -362,33 +459,30 @@ static long vhost_dev_set_owner(struct v goto err_mm; } + err = vhost_setup_vqs(dev, numtxqs); + if (err) + goto err_mm; + /* No owner, become one */ dev->mm = get_task_mm(current); - worker = kthread_create(vhost_worker, dev, "vhost-%d", current->pid); - if (IS_ERR(worker)) { - err = PTR_ERR(worker); - goto err_worker; - } - - dev->worker = worker; - wake_up_process(worker); /* avoid contributing to loadavg */ - err = vhost_attach_cgroups(dev); + /* Start threads */ + err = vhost_start_workers(dev); if (err) - goto err_cgroup; + goto free_vqs; err = vhost_dev_alloc_iovecs(dev); if (err) - goto err_cgroup; + goto clean_workers; return 0; -err_cgroup: - kthread_stop(worker); - dev->worker = NULL; -err_worker: +clean_workers: + vhost_stop_workers(dev); +free_vqs: if (dev->mm) mmput(dev->mm); dev->mm = NULL; + vhost_free_vqs(dev); err_mm: return err; } @@ -474,14 +568,7 @@ void vhost_dev_cleanup(struct vhost_dev kfree(rcu_dereference_protected(dev->memory, lockdep_is_held(&dev->mutex))); RCU_INIT_POINTER(dev->memory, NULL); - WARN_ON(!list_empty(&dev->work_list)); - if (dev->worker) { - kthread_stop(dev->worker); - dev->worker = NULL; - } - if (dev->mm) - mmput(dev->mm); - dev->mm = NULL; + vhost_stop_workers(dev); } static int log_access_ok(void __user *log_base, u64 addr, unsigned long sz) @@ -835,7 +922,7 @@ long vhost_dev_ioctl(struct vhost_dev *d /* If you are not the owner, you can become one */ if (ioctl == VHOST_SET_OWNER) { - r = vhost_dev_set_owner(d); + r = vhost_dev_set_owner(d, arg); goto done; } diff -ruNp org/drivers/vhost/vhost.h new/drivers/vhost/vhost.h --- org/drivers/vhost/vhost.h 2011-11-11 16:44:56.000000000 +0530 +++ new/drivers/vhost/vhost.h 2011-11-11 16:45:11.000000000 +0530 @@ -18,6 +18,9 @@ #define VHOST_DMA_DONE_LEN 1 #define VHOST_DMA_CLEAR_LEN 0 +/* TX vqs are those vq's whose qnum's are odd */ +#define VHOST_NET_TX_VQS 0x1 + struct vhost_device; struct vhost_work; @@ -40,11 +43,11 @@ struct vhost_poll { 
wait_queue_t wait; struct vhost_work work; unsigned long mask; - struct vhost_dev *dev; + struct vhost_virtqueue *vq; /* points back to vq */ }; void vhost_poll_init(struct vhost_poll *poll, vhost_work_fn_t fn, - unsigned long mask, struct vhost_dev *dev); + unsigned long mask, struct vhost_virtqueue *vq); void vhost_poll_start(struct vhost_poll *poll, struct file *file); void vhost_poll_stop(struct vhost_poll *poll); void vhost_poll_flush(struct vhost_poll *poll); @@ -141,8 +144,21 @@ struct vhost_virtqueue { /* Reference counting for outstanding ubufs. * Protected by vq mutex. Writers must also take device mutex. */ struct vhost_ubuf_ref *ubufs; + + struct task_struct *worker; /* worker for this vq */ + spinlock_t *work_lock; /* points to a dev->work[].work_lock entry */ + struct list_head *work_list; /* points to a dev->work[].work_list entry */ + int qnum; /* 0 for RX, 1 for TX, and so on alternately */ }; +/* work entry and the lock */ +struct work_lock_list { + spinlock_t work_lock; + struct list_head work_list; +} ____cacheline_aligned_in_smp; + +#define MAX_VHOST_THREADS 4 + struct vhost_dev { /* Readers use RCU to access memory table pointer * log base pointer and features. @@ -155,11 +171,11 @@ struct vhost_dev { int nvqs; struct file *log_file; struct eventfd_ctx *log_ctx; - spinlock_t work_lock; - struct list_head work_list; - struct task_struct *worker; + struct work_lock_list work[MAX_VHOST_THREADS]; }; +int vhost_setup_vqs(struct vhost_dev *dev, int numtxqs); +void vhost_free_vqs(struct vhost_dev *dev); long vhost_dev_init(struct vhost_dev *, struct vhost_virtqueue *vqs, int nvqs); long vhost_dev_check_owner(struct vhost_dev *); long vhost_dev_reset_owner(struct vhost_dev *);
Krishna Kumar
2011-Nov-11 13:06 UTC
[RFC] [ver3 PATCH 5/6] virtio: Implement find_vqs_irq()
Implement find_vqs_irq() to reduce number of vectors. It can be used to specify which vq's need their own irqs, and which can share irqs with other vq's. Signed-off-by: krkumar2 at in.ibm.com --- drivers/virtio/virtio_pci.c | 108 ++++++++++++++++++++++++-------- include/linux/virtio_config.h | 14 ++++ 2 files changed, 95 insertions(+), 27 deletions(-) diff -ruNp org/drivers/virtio/virtio_pci.c new/drivers/virtio/virtio_pci.c --- org/drivers/virtio/virtio_pci.c 2011-11-11 16:45:09.000000000 +0530 +++ new/drivers/virtio/virtio_pci.c 2011-11-11 16:54:35.000000000 +0530 @@ -40,7 +40,7 @@ struct virtio_pci_device /* the IO mapping for the PCI config space */ void __iomem *ioaddr; - /* a list of queues so we can dispatch IRQs */ + /* a list of queues which have registered to receive IRQs */ spinlock_t lock; struct list_head virtqueues; @@ -196,7 +196,7 @@ static irqreturn_t vp_config_changed(int return IRQ_HANDLED; } -/* Notify all virtqueues on an interrupt. */ +/* Notify all vq's on 'virtqueues' list on an interrupt. 
*/ static irqreturn_t vp_vring_interrupt(int irq, void *opaque) { struct virtio_pci_device *vp_dev = opaque; @@ -358,7 +358,7 @@ static struct virtqueue *setup_vq(struct struct virtio_pci_device *vp_dev = to_vp_device(vdev); struct virtio_pci_vq_info *info; struct virtqueue *vq; - unsigned long flags, size; + unsigned long size; u16 num; int err; @@ -378,6 +378,7 @@ static struct virtqueue *setup_vq(struct info->num = num; info->msix_vector = msix_vec; + INIT_LIST_HEAD(&info->node); size = PAGE_ALIGN(vring_size(num, VIRTIO_PCI_VRING_ALIGN)); info->queue = alloc_pages_exact(size, GFP_KERNEL|__GFP_ZERO); @@ -411,14 +412,6 @@ static struct virtqueue *setup_vq(struct } } - if (callback) { - spin_lock_irqsave(&vp_dev->lock, flags); - list_add(&info->node, &vp_dev->virtqueues); - spin_unlock_irqrestore(&vp_dev->lock, flags); - } else { - INIT_LIST_HEAD(&info->node); - } - return vq; out_assign: @@ -472,7 +465,8 @@ static void vp_del_vqs(struct virtio_dev if (vp_dev->per_vq_vectors && info->msix_vector != VIRTIO_MSI_NO_VECTOR) free_irq(vp_dev->msix_entries[info->msix_vector].vector, - vq); + list_empty(&info->node) ? + (void *)vq : (void *)vp_dev); vp_del_vq(vq); } vp_dev->per_vq_vectors = false; @@ -480,16 +474,37 @@ static void vp_del_vqs(struct virtio_dev vp_free_vectors(vdev); } +static void add_vq_to_list(struct virtqueue *vq, + struct virtio_pci_device *vp_dev, + vq_callback_t *cb) +{ + struct virtio_pci_vq_info *info = vq->priv; + unsigned long flags; + + if (cb) { + spin_lock_irqsave(&vp_dev->lock, flags); + list_add(&info->node, &vp_dev->virtqueues); + spin_unlock_irqrestore(&vp_dev->lock, flags); + } +} + +/* Return true if flags is NULL, or 'bit'# in flags is clear */ +static bool bit_clear(unsigned long *flags, int bit) +{ + return flags ? 
!test_bit(bit, flags) : true; +} + static int vp_try_to_find_vqs(struct virtio_device *vdev, unsigned nvqs, struct virtqueue *vqs[], vq_callback_t *callbacks[], const char *names[], bool use_msix, - bool per_vq_vectors) + bool per_vq_vectors, unsigned long *flags) { struct virtio_pci_device *vp_dev = to_vp_device(vdev); u16 msix_vec; int i, err, nvectors, allocated_vectors; + int count = 0; /* Count of vq's using shared irq's */ if (!use_msix) { /* Old style: one normal interrupt for change and all vqs. */ @@ -500,9 +515,19 @@ static int vp_try_to_find_vqs(struct vir if (per_vq_vectors) { /* Best option: one for change interrupt, one per vq. */ nvectors = 1; - for (i = 0; i < nvqs; ++i) - if (callbacks[i]) + for (i = 0; i < nvqs; ++i) { + bool alloc_irq = bit_clear(flags, i); + + /* + * We allocate a vector if cb is present, + * AND (driver requested a vector OR this + * is the first shared vector). + */ + if (callbacks[i] && + (alloc_irq || ++count == 1)) ++nvectors; + } + count = 0; } else { /* Second best: one for change, shared for all vqs. 
*/ nvectors = 2; @@ -516,20 +541,38 @@ static int vp_try_to_find_vqs(struct vir vp_dev->per_vq_vectors = per_vq_vectors; allocated_vectors = vp_dev->msix_used_vectors; for (i = 0; i < nvqs; ++i) { - if (!callbacks[i] || !vp_dev->msix_enabled) + bool alloc_irq = bit_clear(flags, i); + irq_handler_t irq_handler; + void *data; + + if (!callbacks[i] || !vp_dev->msix_enabled || + !(alloc_irq || ++count == 1)) msix_vec = VIRTIO_MSI_NO_VECTOR; else if (vp_dev->per_vq_vectors) msix_vec = allocated_vectors++; else msix_vec = VP_MSIX_VQ_VECTOR; + vqs[i] = setup_vq(vdev, i, callbacks[i], names[i], msix_vec); if (IS_ERR(vqs[i])) { err = PTR_ERR(vqs[i]); goto error_find; } - if (!vp_dev->per_vq_vectors || msix_vec == VIRTIO_MSI_NO_VECTOR) + if (!vp_dev->per_vq_vectors || + msix_vec == VIRTIO_MSI_NO_VECTOR) { + add_vq_to_list(vqs[i], vp_dev, callbacks[i]); continue; + } + + if (alloc_irq) { + irq_handler = vring_interrupt; + data = vqs[i]; + } else { + add_vq_to_list(vqs[i], vp_dev, callbacks[i]); + irq_handler = vp_vring_interrupt; + data = vp_dev; + } /* allocate per-vq irq if available and necessary */ snprintf(vp_dev->msix_names[msix_vec], @@ -537,9 +580,9 @@ static int vp_try_to_find_vqs(struct vir "%s-%s", dev_name(&vp_dev->vdev.dev), names[i]); err = request_irq(vp_dev->msix_entries[msix_vec].vector, - vring_interrupt, 0, + irq_handler, 0, vp_dev->msix_names[msix_vec], - vqs[i]); + data); if (err) { vp_del_vq(vqs[i]); goto error_find; @@ -554,26 +597,36 @@ error_request: return err; } -/* the config->find_vqs() implementation */ -static int vp_find_vqs(struct virtio_device *vdev, unsigned nvqs, - struct virtqueue *vqs[], - vq_callback_t *callbacks[], - const char *names[]) +/* the config->find_vqs_irq() implementation */ +static int vp_find_vqs_irq(struct virtio_device *vdev, unsigned nvqs, + struct virtqueue *vqs[], + vq_callback_t *callbacks[], + const char *names[], unsigned long *flags) { int err; /* Try MSI-X with one vector per queue. 
*/ - err = vp_try_to_find_vqs(vdev, nvqs, vqs, callbacks, names, true, true); + err = vp_try_to_find_vqs(vdev, nvqs, vqs, callbacks, names, true, true, + flags); if (!err) return 0; /* Fallback: MSI-X with one vector for config, one shared for queues. */ err = vp_try_to_find_vqs(vdev, nvqs, vqs, callbacks, names, - true, false); + true, false, NULL); if (!err) return 0; /* Finally fall back to regular interrupts. */ return vp_try_to_find_vqs(vdev, nvqs, vqs, callbacks, names, - false, false); + false, false, NULL); +} + +/* the config->find_vqs() implementation */ +static int vp_find_vqs(struct virtio_device *vdev, unsigned nvqs, + struct virtqueue *vqs[], + vq_callback_t *callbacks[], + const char *names[]) +{ + return vp_find_vqs_irq(vdev, nvqs, vqs, callbacks, names, NULL); } static struct virtio_config_ops virtio_pci_config_ops = { @@ -583,6 +636,7 @@ static struct virtio_config_ops virtio_p .set_status = vp_set_status, .reset = vp_reset, .find_vqs = vp_find_vqs, + .find_vqs_irq = vp_find_vqs_irq, .del_vqs = vp_del_vqs, .get_features = vp_get_features, .finalize_features = vp_finalize_features, diff -ruNp org/include/linux/virtio_config.h new/include/linux/virtio_config.h --- org/include/linux/virtio_config.h 2011-11-11 16:45:09.000000000 +0530 +++ new/include/linux/virtio_config.h 2011-11-11 16:45:21.000000000 +0530 @@ -92,6 +92,16 @@ * callbacks: array of callbacks, for each virtqueue * names: array of virtqueue names (mainly for debugging) * Returns 0 on success or error status + * @find_vqs_irq: find virtqueues and instantiate them. The flags parameter + * indicates the vq's that can share irq's. + * vdev: the virtio_device + * nvqs: the number of virtqueues to find + * vqs: on success, includes new virtqueues + * callbacks: array of callbacks, for each virtqueue + * names: array of virtqueue names (mainly for debugging) + * flags: indicates which vq's need their own irq and which can share. 
+ * See example usage in virtio_net.c + * Returns 0 on success or error status * @del_vqs: free virtqueues found by find_vqs(). * @get_features: get the array of feature bits for this device. * vdev: the virtio_device @@ -114,6 +124,10 @@ struct virtio_config_ops { struct virtqueue *vqs[], vq_callback_t *callbacks[], const char *names[]); + int (*find_vqs_irq)(struct virtio_device *vdev, unsigned nvqs, + struct virtqueue *vqs[], + vq_callback_t *callbacks[], + const char *names[], unsigned long *flags); void (*del_vqs)(struct virtio_device *); u32 (*get_features)(struct virtio_device *vdev); void (*finalize_features)(struct virtio_device *vdev);
Krishna Kumar
2011-Nov-11 13:07 UTC
[RFC] [ver3 PATCH 6/6] virtio_net: Convert virtio_net driver to use find_vqs_irq
Convert virtio_net driver to use find_vqs_irq(). The TX vq's share a single irq, while the RX vq's have individual irq's. The skb_xmit_done handler also checks if any work is required. Signed-off-by: krkumar2 at in.ibm.com --- drivers/net/virtio_net.c | 29 ++++++++++++++++++++++------- 1 file changed, 22 insertions(+), 7 deletions(-) diff -ruNp org/drivers/net/virtio_net.c new/drivers/net/virtio_net.c --- org/drivers/net/virtio_net.c 2011-11-11 16:45:17.000000000 +0530 +++ new/drivers/net/virtio_net.c 2011-11-11 16:48:45.000000000 +0530 @@ -163,11 +163,13 @@ static void skb_xmit_done(struct virtque struct virtnet_info *vi = vq->vdev->priv; int qnum = vq->queue_index / 2; /* RX/TX vqs are allocated in pairs */ - /* Suppress further interrupts. */ - virtqueue_disable_cb(vq); + if (__netif_subqueue_stopped(vi->dev, qnum)) { + /* Suppress further interrupts. */ + virtqueue_disable_cb(vq); - /* We were probably waiting for more output buffers. */ - netif_wake_subqueue(vi->dev, qnum); + /* We were probably waiting for more output buffers. 
*/ + netif_wake_subqueue(vi->dev, qnum); + } } static void set_skb_frag(struct sk_buff *skb, struct page *page, @@ -1120,6 +1122,7 @@ static void setup_cvq(struct virtnet_inf static int invoke_find_vqs(struct virtnet_info *vi) { + unsigned long *flags = NULL; vq_callback_t **callbacks; struct virtqueue **vqs; int ret = -ENOMEM; @@ -1141,6 +1144,14 @@ static int invoke_find_vqs(struct virtne if (!vqs || !callbacks || !names) goto err; + if (vi->num_queue_pairs > 1) { + int num = (total_vqs + BITS_PER_LONG - 1) / BITS_PER_LONG; + + flags = kzalloc(num * sizeof(*flags), GFP_KERNEL); + if (!flags) + goto err; + } + /* Allocate/initialize parameters for recv virtqueues */ for (i = 0; i < vi->num_queue_pairs * 2; i += 2) { callbacks[i] = skb_recv_done; @@ -1155,6 +1166,8 @@ static int invoke_find_vqs(struct virtne names[i] = kasprintf(GFP_KERNEL, "output.%d", i / 2); if (!names[i]) goto err; + if (flags) + set_bit(i, flags); } /* Parameters for control virtqueue, if any */ @@ -1163,9 +1176,9 @@ static int invoke_find_vqs(struct virtne names[i - 1] = "control"; } - ret = vi->vdev->config->find_vqs(vi->vdev, total_vqs, vqs, callbacks, - (const char **)names); - + ret = vi->vdev->config->find_vqs_irq(vi->vdev, total_vqs, vqs, + callbacks, (const char **)names, + flags); if (ret) goto err; @@ -1174,6 +1187,8 @@ static int invoke_find_vqs(struct virtne setup_cvq(vi, vqs, vi->num_queue_pairs * 2); err: + kfree(flags); + if (ret && names) for (i = 0; i < vi->num_queue_pairs * 2; i++) kfree(names[i]);
Hi, I'm seeing this BUG() sometimes when running it using a small patch I did for KVM tool: [ 1.280766] BUG: unable to handle kernel NULL pointer dereference at 0000000000000010 [ 1.281531] IP: [<ffffffff810b3ac7>] free_percpu+0x9a/0x104 [ 1.281531] PGD 0 [ 1.281531] Oops: 0000 [#1] PREEMPT SMP [ 1.281531] CPU 0 [ 1.281531] Pid: 1, comm: swapper Not tainted 3.1.0-sasha-19665-gef3d2b7 #39 [ 1.281531] RIP: 0010:[<ffffffff810b3ac7>] [<ffffffff810b3ac7>] free_percpu+0x9a/0x104 [ 1.281531] RSP: 0018:ffff88001383fd50 EFLAGS: 00010046 [ 1.281531] RAX: 0000000000000000 RBX: 0000000000000282 RCX: 00000000000f4400 [ 1.281531] RDX: 00003ffffffff000 RSI: ffff880000000240 RDI: 0000000001c06063 [ 1.281531] RBP: ffff880013fcb7c0 R08: ffffea00004e30c0 R09: ffffffff8138ba64 [ 1.281531] R10: 0000000000001880 R11: 0000000000001880 R12: ffff881213c00000 [ 1.281531] R13: ffff8800138c0e00 R14: 0000000000000010 R15: ffff8800138c0d00 [ 1.281531] FS: 0000000000000000(0000) GS:ffff880013c00000(0000) knlGS:0000000000000000 [ 1.281531] CS: 0010 DS: 0000 ES: 0000 CR0: 000000008005003b [ 1.281531] CR2: 0000000000000010 CR3: 0000000001c05000 CR4: 00000000000406f0 [ 1.281531] DR0: 0000000000000000 DR1: 0000000000000000 DR2: 0000000000000000 [ 1.281531] DR3: 0000000000000000 DR6: 00000000ffff0ff0 DR7: 0000000000000400 [ 1.281531] Process swapper (pid: 1, threadinfo ffff88001383e000, task ffff880013848000) [ 1.281531] Stack: [ 1.281531] ffff880013846ec0 0000000000000000 0000000000000000 ffffffff8138a0e5 [ 1.281531] ffff880013846ec0 ffff880013846800 ffff880013b6c000 ffffffff8138bb63 [ 1.281531] 0000000000000011 000000000000000f ffff8800fffffff0 0000000181239bcd [ 1.281531] Call Trace: [ 1.281531] [<ffffffff8138a0e5>] ? free_rq_sq+0x2c/0xce [ 1.281531] [<ffffffff8138bb63>] ? virtnet_probe+0x81c/0x855 [ 1.281531] [<ffffffff8129c9e7>] ? virtio_dev_probe+0xa7/0xc6 [ 1.281531] [<ffffffff8134d2c3>] ? driver_probe_device+0xb2/0x142 [ 1.281531] [<ffffffff8134d3a2>] ? 
__driver_attach+0x4f/0x6f [ 1.281531] [<ffffffff8134d353>] ? driver_probe_device+0x142/0x142 [ 1.281531] [<ffffffff8134c3ab>] ? bus_for_each_dev+0x47/0x72 [ 1.281531] [<ffffffff8134c90d>] ? bus_add_driver+0xa2/0x1e6 [ 1.281531] [<ffffffff81cc1b36>] ? tun_init+0x89/0x89 [ 1.281531] [<ffffffff8134db59>] ? driver_register+0x8d/0xf8 [ 1.281531] [<ffffffff81cc1b36>] ? tun_init+0x89/0x89 [ 1.281531] [<ffffffff81c98ac1>] ? do_one_initcall+0x78/0x130 [ 1.281531] [<ffffffff81c98c0e>] ? kernel_init+0x95/0x113 [ 1.281531] [<ffffffff81658274>] ? kernel_thread_helper+0x4/0x10 [ 1.281531] [<ffffffff81c98b79>] ? do_one_initcall+0x130/0x130 [ 1.281531] [<ffffffff81658270>] ? gs_change+0x13/0x13 [ 1.281531] Code: c2 85 d2 48 0f 45 2d d1 39 ce 00 eb 22 65 8b 14 25 90 cc 00 00 48 8b 05 f0 a6 bc 00 48 63 d2 4c 89 e7 48 03 3c d0 e8 83 dd 00 00 [ 1.281531] 8b 68 10 44 89 e6 48 89 ef 2b 75 18 e8 e4 f1 ff ff 8b 05 fd [ 1.281531] RIP [<ffffffff810b3ac7>] free_percpu+0x9a/0x104 [ 1.281531] RSP <ffff88001383fd50> [ 1.281531] CR2: 0000000000000010 [ 1.281531] ---[ end trace 68cbc23dfe2fe62a ]--- I don't have time today to dig into it, sorry. On Fri, 2011-11-11 at 18:32 +0530, Krishna Kumar wrote:> This patch series resurrects the earlier multiple TX/RX queues > functionality for virtio_net, and addresses the issues pointed > out. It also includes an API to share irq's, f.e. amongst the > TX vqs. > > I plan to run TCP/UDP STREAM and RR tests for local->host and > local->remote, and send the results in the next couple of days. > > > patch #1: Introduce VIRTIO_NET_F_MULTIQUEUE > patch #2: Move 'num_queues' to virtqueue > patch #3: virtio_net driver changes > patch #4: vhost_net changes > patch #5: Implement find_vqs_irq() > patch #6: Convert virtio_net driver to use find_vqs_irq() > > > Changes from rev2: > Michael: > ------- > 1. Added functions to handle setting RX/TX/CTRL vq's. > 2. num_queue_pairs instead of numtxqs. > 3. Experimental support for fewer irq's in find_vqs. 
> > Rusty: > ------ > 4. Cleaned up some existing "while (1)". > 5. rvq/svq and rx_sg/tx_sg changed to vq and sg respectively. > 6. Cleaned up some "#if 1" code. > > > Issue when using patch5: > ------------------------- > > The new API is designed to minimize code duplication. E.g. > vp_find_vqs() is implemented as: > > static int vp_find_vqs(...) > { > return vp_find_vqs_irq(vdev, nvqs, vqs, callbacks, names, NULL); > } > > In my testing, when multiple tx/rx is used with multiple netperf > sessions, all the device tx queues stops a few thousand times and > subsequently woken up by skb_xmit_done. But after some 40K-50K > iterations of stop/wake, some of the txq's stop and no wake > interrupt comes. (modprobe -r followed by modprobe solves this, so > it is not a system hang). At the time of the hang (#txqs=#rxqs=4): > > # egrep "CPU|virtio0" /proc/interrupts | grep -v config > CPU0 CPU1 CPU2 CPU3 > 41: 49057 49262 48828 49421 PCI-MSI-edge virtio0-input.0 > 42: 5066 5213 5221 5109 PCI-MSI-edge virtio0-output.0 > 43: 43380 43770 43007 43148 PCI-MSI-edge virtio0-input.1 > 44: 41433 41727 42101 41175 PCI-MSI-edge virtio0-input.2 > 45: 38465 37629 38468 38768 PCI-MSI-edge virtio0-input.3 > > # tc -s qdisc show dev eth0 > qdisc mq 0: root > Sent 393196939897 bytes 271191624 pkt (dropped 59897, > overlimits 0 requeues 67156) backlog 25375720b 1601p > requeues 67156 > > I am not sure if patch #5 is responsible for the hang. Also, without > patch #5/patch #6, I changed vp_find_vqs() to: > static int vp_find_vqs(...) > { > return vp_try_to_find_vqs(vdev, nvqs, vqs, callbacks, names, > false, false); > } > No packets were getting TX'd with this change when #txqs>1. This is > with the MQ-only patch that doesn't touch drivers/virtio/ directory. > > Also, the MQ patch works reasonably well with 2 vectors - with > use_msix=1 and per_vq_vectors=0 in vp_find_vqs(). > > Patch against net-next - please review. 
> > Signed-off-by: krkumar2 at in.ibm.com > --- > > -- > To unsubscribe from this list: send the line "unsubscribe kvm" in > the body of a message to majordomo at vger.kernel.org > More majordomo info at http://vger.kernel.org/majordomo-info.html -- Sasha.
Krishna Kumar
2011-Nov-12 05:45 UTC
[RFC] [ver3 PATCH 0/6] Implement multiqueue virtio-net
Sasha Levin <levinsasha928 at gmail.com> wrote on 11/12/2011 03:32:04 AM:> I'm seeing this BUG() sometimes when running it using a small patch I > did for KVM tool: > > [ 1.281531] Call Trace: > [ 1.281531] [<ffffffff8138a0e5>] ? free_rq_sq+0x2c/0xce > [ 1.281531] [<ffffffff8138bb63>] ? virtnet_probe+0x81c/0x855 > [ 1.281531] [<ffffffff8129c9e7>] ? virtio_dev_probe+0xa7/0xc6 > [ 1.281531] [<ffffffff8134d2c3>] ? driver_probe_device+0xb2/0x142 > [ 1.281531] [<ffffffff8134d3a2>] ? __driver_attach+0x4f/0x6f > [ 1.281531] [<ffffffff8134d353>] ? driver_probe_device+0x142/0x142 > [ 1.281531] [<ffffffff8134c3ab>] ? bus_for_each_dev+0x47/0x72 > [ 1.281531] [<ffffffff8134c90d>] ? bus_add_driver+0xa2/0x1e6 > [ 1.281531] [<ffffffff81cc1b36>] ? tun_init+0x89/0x89 > [ 1.281531] [<ffffffff8134db59>] ? driver_register+0x8d/0xf8 > [ 1.281531] [<ffffffff81cc1b36>] ? tun_init+0x89/0x89 > [ 1.281531] [<ffffffff81c98ac1>] ? do_one_initcall+0x78/0x130 > [ 1.281531] [<ffffffff81c98c0e>] ? kernel_init+0x95/0x113 > [ 1.281531] [<ffffffff81658274>] ? kernel_thread_helper+0x4/0x10 > [ 1.281531] [<ffffffff81c98b79>] ? do_one_initcall+0x130/0x130 > [ 1.281531] [<ffffffff81658270>] ? gs_change+0x13/0x13 > [ 1.281531] Code: c2 85 d2 48 0f 45 2d d1 39 ce 00 eb 22 65 8b 14 25 > 90 cc 00 00 48 8b 05 f0 a6 bc 00 48 63 d2 4c 89 e7 48 03 3c d0 e8 83 dd > 00 00 > [ 1.281531] 8b 68 10 44 89 e6 48 89 ef 2b 75 18 e8 e4 f1 ff ff 8b 05 > fd > [ 1.281531] RIP [<ffffffff810b3ac7>] free_percpu+0x9a/0x104 > [ 1.281531] RSP <ffff88001383fd50> > [ 1.281531] CR2: 0000000000000010 > [ 1.281531] ---[ end trace 68cbc23dfe2fe62a ]--- > > I don't have time today to dig into it, sorry.Thanks for the report. free_rq_sq() was being called twice in the failure path. The second call panic'd since it had freed the same pointers earlier. 1. free_rq_sq() was being called twice in the failure path. virtnet_setup_vqs() had already freed up rq/sq on error, and virtnet_probe() tried to do it again. 
Fix it in virtnet_probe by moving the call up. 2. Make free_rq_sq() re-entrant by setting freed pointers to NULL. 3. Remove free_stats() as it was being called only once. Sasha, could you please try this patch on top of existing patches? thanks! Signed-off-by: krkumar2 at in.ibm.com --- drivers/net/virtio_net.c | 41 +++++++++++-------------------------- 1 file changed, 13 insertions(+), 28 deletions(-) diff -ruNp n6/drivers/net/virtio_net.c n7/drivers/net/virtio_net.c --- n6/drivers/net/virtio_net.c 2011-11-12 11:03:48.000000000 +0530 +++ n7/drivers/net/virtio_net.c 2011-11-12 10:39:28.000000000 +0530 @@ -782,23 +782,6 @@ static void virtnet_netpoll(struct net_d } #endif -static void free_stats(struct virtnet_info *vi) -{ - int i; - - for (i = 0; i < vi->num_queue_pairs; i++) { - if (vi->sq && vi->sq[i]) { - free_percpu(vi->sq[i]->stats); - vi->sq[i]->stats = NULL; - } - - if (vi->rq && vi->rq[i]) { - free_percpu(vi->rq[i]->stats); - vi->rq[i]->stats = NULL; - } - } -} - static int virtnet_open(struct net_device *dev) { struct virtnet_info *vi = netdev_priv(dev); @@ -1054,19 +1037,22 @@ static void free_rq_sq(struct virtnet_in { int i; - free_stats(vi); - - if (vi->rq) { - for (i = 0; i < vi->num_queue_pairs; i++) + for (i = 0; i < vi->num_queue_pairs; i++) { + if (vi->rq && vi->rq[i]) { + free_percpu(vi->rq[i]->stats); kfree(vi->rq[i]); - kfree(vi->rq); - } + vi->rq[i] = NULL; + } - if (vi->sq) { - for (i = 0; i < vi->num_queue_pairs; i++) + if (vi->sq && vi->sq[i]) { + free_percpu(vi->sq[i]->stats); kfree(vi->sq[i]); - kfree(vi->sq); + vi->sq[i] = NULL; + } } + + kfree(vi->rq); + kfree(vi->sq); } static void free_unused_bufs(struct virtnet_info *vi) @@ -1387,10 +1373,9 @@ free_vqs: for (i = 0; i < num_queue_pairs; i++) cancel_delayed_work_sync(&vi->rq[i]->refill); vdev->config->del_vqs(vdev); - -free_netdev: free_rq_sq(vi); +free_netdev: free_netdev(dev); return err; }
Michael S. Tsirkin
2011-Nov-13 11:40 UTC
[RFC] [ver3 PATCH 0/6] Implement multiqueue virtio-net
On Fri, Nov 11, 2011 at 06:32:23PM +0530, Krishna Kumar wrote: > This patch series resurrects the earlier multiple TX/RX queues > functionality for virtio_net, and addresses the issues pointed > out. Some general questions/issues with the approach this patchset takes: 1. Lack of host-guest synchronization for flow hash. On the host side, things will scale if the same vhost thread handles both transmit and receive for a specific flow. Further, things will scale if packets from distinct guest queues get routed to distinct queues on the NIC and tap devices in the host. It seems that to achieve both, host and guest need to pass the flow hash information to each other. Ben Hutchings suggested effectively pushing the guest's RFS socket map out to the host. Any thoughts on this? 2. Reduced batching/increased number of exits. It's easy to see that the amount of work per VQ is reduced with this patch. Thus it's easy to imagine that under some workloads, where we previously had X packets per VM exit/interrupt, we'll now have X/N with N the number of virtqueues. Since both a VM exit and an interrupt are expensive operations, one wonders whether this can lead to performance regressions. It seems that to reduce the chance of such, some adaptive strategy would work better. But how would we ensure packets aren't reordered then? Any thoughts? 3. Lack of userspace resource control. A vhost-net device already uses quite a lot of resources. This patch seems to make the problem worse. At the moment, management can to some extent control that by using a file descriptor per virtio device. So using a file descriptor per VQ has an advantage of limiting the amount of resources qemu can consume. In April, Jason posted a qemu patch that supported a multiqueue guest by using existing vhost interfaces, by opening multiple devices, one per queue. It seems that this can be improved upon, if we allow e.g. sharing of memory maps between file descriptors. 
This might also make adaptive queueing strategies possible. Would it be possible to do this instead? > It also includes an API to share irq's, f.e. amongst the > TX vqs. > I plan to run TCP/UDP STREAM and RR tests for local->host and > local->remote, and send the results in the next couple of days. Please do. Small message throughput would be especially interesting. > patch #1: Introduce VIRTIO_NET_F_MULTIQUEUE > patch #2: Move 'num_queues' to virtqueue > patch #3: virtio_net driver changes > patch #4: vhost_net changes > patch #5: Implement find_vqs_irq() > patch #6: Convert virtio_net driver to use find_vqs_irq() > > > Changes from rev2: > Michael: > ------- > 1. Added functions to handle setting RX/TX/CTRL vq's. > 2. num_queue_pairs instead of numtxqs. > 3. Experimental support for fewer irq's in find_vqs. > > Rusty: > ------ > 4. Cleaned up some existing "while (1)". > 5. rvq/svq and rx_sg/tx_sg changed to vq and sg respectively. > 6. Cleaned up some "#if 1" code. > > > Issue when using patch5: > ------------------------- > > The new API is designed to minimize code duplication. E.g. > vp_find_vqs() is implemented as: > > static int vp_find_vqs(...) > { > return vp_find_vqs_irq(vdev, nvqs, vqs, callbacks, names, NULL); > } > > In my testing, when multiple tx/rx is used with multiple netperf > sessions, all the device tx queues stops a few thousand times and > subsequently woken up by skb_xmit_done. But after some 40K-50K > iterations of stop/wake, some of the txq's stop and no wake > interrupt comes. (modprobe -r followed by modprobe solves this, so > it is not a system hang). 
At the time of the hang (#txqs=#rxqs=4): > > # egrep "CPU|virtio0" /proc/interrupts | grep -v config > CPU0 CPU1 CPU2 CPU3 > 41: 49057 49262 48828 49421 PCI-MSI-edge virtio0-input.0 > 42: 5066 5213 5221 5109 PCI-MSI-edge virtio0-output.0 > 43: 43380 43770 43007 43148 PCI-MSI-edge virtio0-input.1 > 44: 41433 41727 42101 41175 PCI-MSI-edge virtio0-input.2 > 45: 38465 37629 38468 38768 PCI-MSI-edge virtio0-input.3 > > # tc -s qdisc show dev eth0 > qdisc mq 0: root > Sent 393196939897 bytes 271191624 pkt (dropped 59897, > overlimits 0 requeues 67156) backlog 25375720b 1601p > requeues 67156 > > I am not sure if patch #5 is responsible for the hang. Also, without > patch #5/patch #6, I changed vp_find_vqs() to: > static int vp_find_vqs(...) > { > return vp_try_to_find_vqs(vdev, nvqs, vqs, callbacks, names, > false, false); > } > No packets were getting TX'd with this change when #txqs>1. This is > with the MQ-only patch that doesn't touch drivers/virtio/ directory. > > Also, the MQ patch works reasonably well with 2 vectors - with > use_msix=1 and per_vq_vectors=0 in vp_find_vqs(). > > Patch against net-next - please review. > > Signed-off-by: krkumar2 at in.ibm.com > ---
Michael S. Tsirkin
2011-Nov-13 17:48 UTC
[PATCH RFC] ndo: ndo_queue_xmit/ndo_flush_xmit (was Re: [RFC] [ver3 PATCH 0/6] Implement multiqueue virtio-net)
On Fri, Nov 11, 2011 at 06:32:23PM +0530, Krishna Kumar wrote:> This patch series resurrects the earlier multiple TX/RX queues > functionality for virtio_net, and addresses the issues pointed > out.I'm guessing the biggest source of contention for transmit is keeping the TX hardware lock across VM exit. I wonder whether, for transmit, it's not a good idea to pass all traffic through a single queue to improve batching, and then if necessary multiplex it out on the host. The following is a stub at supporting this in the guest - it needs to be split up, and cleaned up, and I'm not sure about the trick of returning NETDEV_TX_QUEUED, but should give you the idea. Compile-tested only, sent out for early flames/feedback. This is on top of Rusty's unlocked kick patches. ----> - add optional ndo_queue_xmit/ndo_flush_xmit netdev ops - ndo_queue_xmit can transmit skb and return NETDEV_TX_OK, or it can return NETDEV_TX_QUEUED to signal that the skb was queued and ndo_flush_xmit needs to be called to actually transmit it. The point is to improve batching by calling ndo_flush_xmit once after multiple ndo_queue_xmit calls, and reduce lock contention by calling ndo_flush_xmit outside any locks. Signed-off-by: Michael S. Tsirkin <mst at redhat.com> Compile-tested only. --- diff --git a/include/linux/netdevice.h b/include/linux/netdevice.h index cbeb586..a7df098 100644 --- a/include/linux/netdevice.h +++ b/include/linux/netdevice.h @@ -105,6 +105,7 @@ struct wireless_dev; enum netdev_tx { __NETDEV_TX_MIN = INT_MIN, /* make sure enum is signed */ NETDEV_TX_OK = 0x00, /* driver took care of packet */ + NETDEV_TX_QUEUED = 0x04, /* queued, need to flush */ NETDEV_TX_BUSY = 0x10, /* driver tx path was busy*/ NETDEV_TX_LOCKED = 0x20, /* driver tx lock was already taken */ }; @@ -712,6 +713,14 @@ struct netdev_tc_txq { * Must return NETDEV_TX_OK , NETDEV_TX_BUSY. * (can also return NETDEV_TX_LOCKED iff NETIF_F_LLTX) * Required can not be NULL. 
+ * netdev_tx_t (*ndo_queue_xmit)(struct sk_buff *skb, + * struct net_device *dev); + * Same as ndo_start_xmit but allows batching packet transmission. + * Must return NETDEV_TX_QUEUED , NETDEV_TX_OK , NETDEV_TX_BUSY. + * (can also return NETDEV_TX_LOCKED iff NETIF_F_LLTX) + * + * void (*ndo_flush_xmit)(struct net_device *dev); + * Called after queueing a batch of packets. * * u16 (*ndo_select_queue)(struct net_device *dev, struct sk_buff *skb); * Called to decide which queue to when device supports multiple @@ -863,6 +872,9 @@ struct net_device_ops { int (*ndo_stop)(struct net_device *dev); netdev_tx_t (*ndo_start_xmit) (struct sk_buff *skb, struct net_device *dev); + netdev_tx_t (*ndo_queue_xmit)(struct sk_buff *skb, + struct net_device *dev); + void (*ndo_flush_xmit)(struct net_device *dev); u16 (*ndo_select_queue)(struct net_device *dev, struct sk_buff *skb); void (*ndo_change_rx_flags)(struct net_device *dev, @@ -2099,6 +2111,10 @@ extern int dev_set_mac_address(struct net_device *, extern int dev_hard_start_xmit(struct sk_buff *skb, struct net_device *dev, struct netdev_queue *txq); +extern int dev_queue_start_xmit(struct sk_buff *skb, + struct net_device *dev, + struct netdev_queue *txq); +extern void dev_flush_start_xmit(struct net_device *dev); extern int dev_forward_skb(struct net_device *dev, struct sk_buff *skb); diff --git a/net/core/dev.c b/net/core/dev.c index 6ba50a1..608b67c 100644 --- a/net/core/dev.c +++ b/net/core/dev.c @@ -2167,8 +2167,8 @@ static inline int skb_needs_linearize(struct sk_buff *skb, !(features & NETIF_F_SG))); } -int dev_hard_start_xmit(struct sk_buff *skb, struct net_device *dev, - struct netdev_queue *txq) +static int __dev_hard_start_xmit(struct sk_buff *skb, struct net_device *dev, + struct netdev_queue *txq, bool queue) { const struct net_device_ops *ops = dev->netdev_ops; int rc = NETDEV_TX_OK; @@ -2224,9 +2224,12 @@ int dev_hard_start_xmit(struct sk_buff *skb, struct net_device *dev, } skb_len = skb->len; - rc = 
ops->ndo_start_xmit(skb, dev); + if (queue && ops->ndo_queue_xmit) + rc = ops->ndo_queue_xmit(skb, dev); + else + rc = ops->ndo_start_xmit(skb, dev); trace_net_dev_xmit(skb, rc, dev, skb_len); - if (rc == NETDEV_TX_OK) + if (rc == NETDEV_TX_OK || rc == NETDEV_TX_QUEUED) txq_trans_update(txq); return rc; } @@ -2246,9 +2249,12 @@ gso: skb_dst_drop(nskb); skb_len = nskb->len; - rc = ops->ndo_start_xmit(nskb, dev); + if (queue && ops->ndo_queue_xmit) + rc = ops->ndo_queue_xmit(nskb, dev); + else + rc = ops->ndo_start_xmit(nskb, dev); trace_net_dev_xmit(nskb, rc, dev, skb_len); - if (unlikely(rc != NETDEV_TX_OK)) { + if (unlikely(rc != NETDEV_TX_OK && rc != NETDEV_TX_QUEUED)) { if (rc & ~NETDEV_TX_MASK) goto out_kfree_gso_skb; nskb->next = skb->next; @@ -2269,6 +2275,25 @@ out: return rc; } +int dev_hard_start_xmit(struct sk_buff *skb, struct net_device *dev, + struct netdev_queue *txq) +{ + return __dev_hard_start_xmit(skb, dev, txq, false); +} + +int dev_queue_start_xmit(struct sk_buff *skb, struct net_device *dev, + struct netdev_queue *txq) +{ + return __dev_hard_start_xmit(skb, dev, txq, true); +} + +void dev_flush_start_xmit(struct net_device *dev) +{ + const struct net_device_ops *ops = dev->netdev_ops; + if (ops->ndo_flush_xmit) + ops->ndo_flush_xmit(dev); +} + static u32 hashrnd __read_mostly; /* diff --git a/net/sched/sch_generic.c b/net/sched/sch_generic.c index 69fca27..83b3758 100644 --- a/net/sched/sch_generic.c +++ b/net/sched/sch_generic.c @@ -102,18 +102,9 @@ static inline int handle_dev_cpu_collision(struct sk_buff *skb, return ret; } -/* - * Transmit one skb, and handle the return status as required. Holding the - * __QDISC_STATE_RUNNING bit guarantees that only one CPU can execute this - * function. - * - * Returns to the caller: - * 0 - queue is empty or throttled. - * >0 - queue is not empty. 
- */ -int sch_direct_xmit(struct sk_buff *skb, struct Qdisc *q, +static int __sch_direct_xmit(struct sk_buff *skb, struct Qdisc *q, struct net_device *dev, struct netdev_queue *txq, - spinlock_t *root_lock) + spinlock_t *root_lock, bool *queued) { int ret = NETDEV_TX_BUSY; @@ -122,10 +113,13 @@ int sch_direct_xmit(struct sk_buff *skb, struct Qdisc *q, HARD_TX_LOCK(dev, txq, smp_processor_id()); if (!netif_tx_queue_frozen_or_stopped(txq)) - ret = dev_hard_start_xmit(skb, dev, txq); + ret = dev_queue_start_xmit(skb, dev, txq); HARD_TX_UNLOCK(dev, txq); + if (ret == NETDEV_TX_QUEUED) + *queued = true; + spin_lock(root_lock); if (dev_xmit_complete(ret)) { @@ -150,6 +144,30 @@ int sch_direct_xmit(struct sk_buff *skb, struct Qdisc *q, } /* + * Transmit one skb, and handle the return status as required. Holding the + * __QDISC_STATE_RUNNING bit guarantees that only one CPU can execute this + * function. + * + * Returns to the caller: + * 0 - queue is empty or throttled. + * >0 - queue is not empty. + */ +int sch_direct_xmit(struct sk_buff *skb, struct Qdisc *q, + struct net_device *dev, struct netdev_queue *txq, + spinlock_t *root_lock) +{ + bool queued = false; + int ret; + ret = __sch_direct_xmit(skb, q, dev, txq, root_lock, &queued); + if (queued) { + spin_unlock(root_lock); + dev_flush_start_xmit(dev); + spin_lock(root_lock); + } + return ret; +} + +/* * NOTE: Called under qdisc_lock(q) with locally disabled BH. * * __QDISC_STATE_RUNNING guarantees only one CPU can process @@ -168,7 +186,7 @@ int sch_direct_xmit(struct sk_buff *skb, struct Qdisc *q, * >0 - queue is not empty. 
* */ -static inline int qdisc_restart(struct Qdisc *q) +static inline int qdisc_restart(struct Qdisc *q, bool *queued) { struct netdev_queue *txq; struct net_device *dev; @@ -184,14 +202,22 @@ static inline int qdisc_restart(struct Qdisc *q) dev = qdisc_dev(q); txq = netdev_get_tx_queue(dev, skb_get_queue_mapping(skb)); - return sch_direct_xmit(skb, q, dev, txq, root_lock); + return __sch_direct_xmit(skb, q, dev, txq, root_lock, queued); +} + +static inline void qdisc_flush_start(struct Qdisc *q) +{ + spin_unlock(qdisc_lock(q)); + dev_flush_start_xmit(qdisc_dev(q)); + spin_lock(qdisc_lock(q)); } void __qdisc_run(struct Qdisc *q) { int quota = weight_p; + bool queued = false; - while (qdisc_restart(q)) { + while (qdisc_restart(q, &queued)) { /* * Ordered by possible occurrence: Postpone processing if * 1. we've exceeded packet quota @@ -203,6 +229,9 @@ void __qdisc_run(struct Qdisc *q) } } + if (queued) + qdisc_flush_start(q); + qdisc_run_end(q); } diff --git a/drivers/net/virtio_net.c b/drivers/net/virtio_net.c index d6f6f40..6d28c26 100644 --- a/drivers/net/virtio_net.c +++ b/drivers/net/virtio_net.c @@ -604,9 +604,11 @@ static int xmit_skb(struct virtnet_info *vi, struct sk_buff *skb) 0, skb, GFP_ATOMIC); } -static netdev_tx_t start_xmit(struct sk_buff *skb, struct net_device *dev) +static netdev_tx_t __start_xmit(struct sk_buff *skb, struct net_device *dev, + bool queue) { struct virtnet_info *vi = netdev_priv(dev); + bool notify; int capacity; /* Free up any pending old buffers before queueing new ones. */ @@ -632,7 +634,12 @@ static netdev_tx_t start_xmit(struct sk_buff *skb, struct net_device *dev) kfree_skb(skb); return NETDEV_TX_OK; } - virtqueue_kick(vi->svq); + + notify = virtqueue_kick_prepare(vi->svq); + if (!queue && notify) { + virtqueue_notify(vi->svq); + notify = false; + } /* Don't wait up for transmitted skbs to be freed. 
*/ skb_orphan(skb); @@ -652,7 +659,23 @@ static netdev_tx_t start_xmit(struct sk_buff *skb, struct net_device *dev) } } - return NETDEV_TX_OK; + return notify ? NETDEV_TX_QUEUED : NETDEV_TX_OK; +} + +static netdev_tx_t start_xmit(struct sk_buff *skb, struct net_device *dev) +{ + return __start_xmit(skb, dev, false); +} + +static netdev_tx_t queue_xmit(struct sk_buff *skb, struct net_device *dev) +{ + return __start_xmit(skb, dev, true); +} + +static void flush_xmit(struct net_device *dev) +{ + struct virtnet_info *vi = netdev_priv(dev); + virtqueue_notify(vi->svq); } static int virtnet_set_mac_address(struct net_device *dev, void *p) @@ -909,6 +932,8 @@ static const struct net_device_ops virtnet_netdev = { .ndo_open = virtnet_open, .ndo_stop = virtnet_close, .ndo_start_xmit = start_xmit, + .ndo_queue_xmit = queue_xmit, + .ndo_flush_xmit = flush_xmit, .ndo_validate_addr = eth_validate_addr, .ndo_set_mac_address = virtnet_set_mac_address, .ndo_set_rx_mode = virtnet_set_rx_mode,