On Fri, Apr 13, 2018 at 12:30:24PM +0800, Jason Wang wrote:> On 2018?04?01? 22:12, Tiwei Bie wrote: > > Hello everyone, > > > > This RFC implements packed ring support for virtio driver. > > > > The code was tested with DPDK vhost (testpmd/vhost-PMD) implemented > > by Jens at http://dpdk.org/ml/archives/dev/2018-January/089417.html > > Minor changes are needed for the vhost code, e.g. to kick the guest. > > > > TODO: > > - Refinements and bug fixes; > > - Split into small patches; > > - Test indirect descriptor support; > > - Test/fix event suppression support; > > - Test devices other than net; > > > > RFC v1 -> RFC v2: > > - Add indirect descriptor support - compile test only; > > - Add event suppression supprt - compile test only; > > - Move vring_packed_init() out of uapi (Jason, MST); > > - Merge two loops into one in virtqueue_add_packed() (Jason); > > - Split vring_unmap_one() for packed ring and split ring (Jason); > > - Avoid using '%' operator (Jason); > > - Rename free_head -> next_avail_idx (Jason); > > - Add comments for virtio_wmb() in virtqueue_add_packed() (Jason); > > - Some other refinements and bug fixes; > > > > Thanks! > > > > Signed-off-by: Tiwei Bie <tiwei.bie at intel.com> > > --- > > drivers/virtio/virtio_ring.c | 1094 +++++++++++++++++++++++++++++------- > > include/linux/virtio_ring.h | 8 +- > > include/uapi/linux/virtio_config.h | 12 +- > > include/uapi/linux/virtio_ring.h | 61 ++ > > 4 files changed, 980 insertions(+), 195 deletions(-)[...]> > +static struct vring_packed_desc *alloc_indirect_packed(struct virtqueue *_vq, > > + unsigned int total_sg, > > + gfp_t gfp) > > +{ > > + struct vring_packed_desc *desc; > > + > > + /* > > + * We require lowmem mappings for the descriptors because > > + * otherwise virt_to_phys will give us bogus addresses in the > > + * virtqueue. > > + */ > > + gfp &= ~__GFP_HIGHMEM; > > + > > + desc = kmalloc(total_sg * sizeof(struct vring_packed_desc), gfp); > > Can we simply check vq->packed here to avoid duplicating helpers?Then it would be something like this: static void *alloc_indirect(struct virtqueue *_vq, unsigned int total_sg, gfp_t gfp) { struct vring_virtqueue *vq = to_vvq(_vq); void *data; /* * We require lowmem mappings for the descriptors because * otherwise virt_to_phys will give us bogus addresses in the * virtqueue. */ gfp &= ~__GFP_HIGHMEM; if (vq->packed) { data = kmalloc(total_sg * sizeof(struct vring_packed_desc), gfp); if (!data) return NULL; } else { struct vring_desc *desc; unsigned int i; desc = kmalloc(total_sg * sizeof(struct vring_desc), gfp); if (!desc) return NULL; for (i = 0; i < total_sg; i++) desc[i].next = cpu_to_virtio16(_vq->vdev, i + 1); data = desc; } return data; } I would prefer to have two simpler helpers (and to the callers, it's already very clear about which one they should call), i.e. the current implementation: static struct vring_packed_desc *alloc_indirect_packed(struct virtqueue *_vq, unsigned int total_sg, gfp_t gfp) { struct vring_packed_desc *desc; /* * We require lowmem mappings for the descriptors because * otherwise virt_to_phys will give us bogus addresses in the * virtqueue. */ gfp &= ~__GFP_HIGHMEM; desc = kmalloc(total_sg * sizeof(struct vring_packed_desc), gfp); return desc; } static struct vring_desc *alloc_indirect_split(struct virtqueue *_vq, unsigned int total_sg, gfp_t gfp) { struct vring_desc *desc; unsigned int i; /* * We require lowmem mappings for the descriptors because * otherwise virt_to_phys will give us bogus addresses in the * virtqueue. */ gfp &= ~__GFP_HIGHMEM; desc = kmalloc(total_sg * sizeof(struct vring_desc), gfp); if (!desc) return NULL; for (i = 0; i < total_sg; i++) desc[i].next = cpu_to_virtio16(_vq->vdev, i + 1); return desc; }> > > + > > + return desc; > > +}[...]> > +static inline int virtqueue_add_packed(struct virtqueue *_vq, > > + struct scatterlist *sgs[], > > + unsigned int total_sg, > > + unsigned int out_sgs, > > + unsigned int in_sgs, > > + void *data, > > + void *ctx, > > + gfp_t gfp) > > +{ > > + struct vring_virtqueue *vq = to_vvq(_vq); > > + struct vring_packed_desc *desc; > > + struct scatterlist *sg; > > + unsigned int i, n, descs_used, uninitialized_var(prev), err_idx; > > + __virtio16 uninitialized_var(head_flags), flags; > > + int head, wrap_counter; > > + bool indirect; > > + > > + START_USE(vq); > > + > > + BUG_ON(data == NULL); > > + BUG_ON(ctx && vq->indirect); > > + > > + if (unlikely(vq->broken)) { > > + END_USE(vq); > > + return -EIO; > > + } > > + > > +#ifdef DEBUG > > + { > > + ktime_t now = ktime_get(); > > + > > + /* No kick or get, with .1 second between? Warn. */ > > + if (vq->last_add_time_valid) > > + WARN_ON(ktime_to_ms(ktime_sub(now, vq->last_add_time)) > > + > 100); > > + vq->last_add_time = now; > > + vq->last_add_time_valid = true; > > + } > > +#endif > > + > > + BUG_ON(total_sg == 0); > > + > > + head = vq->next_avail_idx; > > + wrap_counter = vq->wrap_counter; > > + > > + /* If the host supports indirect descriptor tables, and we have multiple > > + * buffers, then go indirect. FIXME: tune this threshold */ > > + if (vq->indirect && total_sg > 1 && vq->vq.num_free) > > Let's introduce a helper like virtqueue_need_indirect() to avoid duplicating > codes and FIXME.Okay.> > > + desc = alloc_indirect_packed(_vq, total_sg, gfp); > > + else { > > + desc = NULL; > > + WARN_ON_ONCE(total_sg > vq->vring_packed.num && !vq->indirect); > > + } > > + > > + if (desc) { > > + /* Use a single buffer which doesn't continue */ > > + indirect = true; > > + /* Set up rest to use this indirect table. */ > > + i = 0; > > + descs_used = 1; > > + } else { > > + indirect = false; > > + desc = vq->vring_packed.desc; > > + i = head; > > + descs_used = total_sg; > > + } > > + > > + if (vq->vq.num_free < descs_used) { > > + pr_debug("Can't add buf len %i - avail = %i\n", > > + descs_used, vq->vq.num_free); > > + /* FIXME: for historical reasons, we force a notify here if > > + * there are outgoing parts to the buffer. Presumably the > > + * host should service the ring ASAP. */ > > + if (out_sgs) > > + vq->notify(&vq->vq); > > + if (indirect) > > + kfree(desc); > > + END_USE(vq); > > + return -ENOSPC; > > + } > > + > > + for (n = 0; n < out_sgs + in_sgs; n++) { > > + for (sg = sgs[n]; sg; sg = sg_next(sg)) { > > + dma_addr_t addr = vring_map_one_sg(vq, sg, n < out_sgs ? > > + DMA_TO_DEVICE : DMA_FROM_DEVICE); > > + if (vring_mapping_error(vq, addr)) > > + goto unmap_release; > > + > > + flags = cpu_to_virtio16(_vq->vdev, VRING_DESC_F_NEXT | > > + (n < out_sgs ? 0 : VRING_DESC_F_WRITE) | > > + VRING_DESC_F_AVAIL(vq->wrap_counter) | > > + VRING_DESC_F_USED(!vq->wrap_counter)); > > + if (!indirect && i == head) > > + head_flags = flags; > > + else > > + desc[i].flags = flags; > > + > > + desc[i].addr = cpu_to_virtio64(_vq->vdev, addr); > > + desc[i].len = cpu_to_virtio32(_vq->vdev, sg->length); > > + desc[i].id = cpu_to_virtio32(_vq->vdev, head); > > Similar to V1, we only need this for the last descriptor.Okay, will just set it for the last desc.> > > + prev = i; > > It looks to me there's no need to track prev inside the loop here. > > > + i++; > > + if (!indirect && i >= vq->vring_packed.num) { > > + i = 0; > > + vq->wrap_counter ^= 1; > > + } > > + } > > + } > > + /* Last one doesn't continue. */ > > + if (total_sg == 1) > > + head_flags &= cpu_to_virtio16(_vq->vdev, ~VRING_DESC_F_NEXT); > > + else > > + desc[prev].flags &= cpu_to_virtio16(_vq->vdev, ~VRING_DESC_F_NEXT); > > The only case when prev != i - 1 is i == 0, we can add a if here.It's just a mirror of the existing implementation in split ring. It seems that split ring implementation needs this just because it's much harder for it to find the prev, which is not true for packed ring. So I'll take your suggestion. Thanks!>[...]> > +static bool virtqueue_kick_prepare_packed(struct virtqueue *_vq) > > +{ > > + struct vring_virtqueue *vq = to_vvq(_vq); > > + u16 new, old, off_wrap; > > + bool needs_kick; > > + > > + START_USE(vq); > > + /* We need to expose the new flags value before checking notification > > + * suppressions. */ > > + virtio_mb(vq->weak_barriers); > > + > > + old = vq->next_avail_idx - vq->num_added; > > + new = vq->next_avail_idx; > > + vq->num_added = 0; > > + > > +#ifdef DEBUG > > + if (vq->last_add_time_valid) { > > + WARN_ON(ktime_to_ms(ktime_sub(ktime_get(), > > + vq->last_add_time)) > 100); > > + } > > + vq->last_add_time_valid = false; > > +#endif > > + > > + off_wrap = virtio16_to_cpu(_vq->vdev, vq->vring_packed.device->off_wrap); > > + > > + if (vq->event) { > > It looks to me we should examine RING_EVENT_FLAGS_DESC in desc_event_flags > instead of vq->event here. Spec does not forces to use evenf_off and > event_wrap if event index is enabled. > > > + // FIXME: fix this! > > + needs_kick = ((off_wrap >> 15) == vq->wrap_counter) && > > + vring_need_event(off_wrap & ~(1<<15), new, old); > > Why need a & here?Because wrap_counter (the most significant bit in off_wrap) isn't part of the index.> > > + } else { > > Need a smp_rmb() to make sure desc_event_flags was checked before flags.I don't get your point, if my understanding is correct, desc_event_flags is vq->vring_packed.device->flags. So what's the "flags"?> > > + needs_kick = (vq->vring_packed.device->flags !> > + cpu_to_virtio16(_vq->vdev, VRING_EVENT_F_DISABLE)); > > + } > > + END_USE(vq); > > + return needs_kick; > > +}[...]> > +static int detach_buf_packed(struct vring_virtqueue *vq, unsigned int head, > > + void **ctx) > > +{ > > + struct vring_packed_desc *desc; > > + unsigned int i, j; > > + > > + /* Clear data ptr. */ > > + vq->desc_state[head].data = NULL; > > + > > + i = head; > > + > > + for (j = 0; j < vq->desc_state[head].num; j++) { > > + desc = &vq->vring_packed.desc[i]; > > + vring_unmap_one_packed(vq, desc); > > + desc->flags = 0x0; > > Looks like this is unnecessary.It's safer to zero it. If we don't zero it, after we call virtqueue_detach_unused_buf_packed() which calls this function, the desc is still available to the device.> > > + i++; > > + if (i >= vq->vring_packed.num) > > + i = 0; > > + }[...]> > +static unsigned virtqueue_enable_cb_prepare_packed(struct virtqueue *_vq) > > +{ > > + struct vring_virtqueue *vq = to_vvq(_vq); > > + u16 last_used_idx, wrap_counter, off_wrap; > > + > > + START_USE(vq); > > + > > + last_used_idx = vq->last_used_idx; > > + wrap_counter = vq->wrap_counter; > > + > > + if (last_used_idx > vq->next_avail_idx) > > + wrap_counter ^= 1; > > + > > + off_wrap = last_used_idx | (wrap_counter << 15); > > + > > + /* We optimistically turn back on interrupts, then check if there was > > + * more to do. */ > > + /* Depending on the VIRTIO_RING_F_EVENT_IDX feature, we need to > > + * either clear the flags bit or point the event index at the next > > + * entry. Always do both to keep code simple. */ > > + if (vq->event_flags_shadow == VRING_EVENT_F_DISABLE) { > > + vq->event_flags_shadow = vq->event ? VRING_EVENT_F_DESC: > > + VRING_EVENT_F_ENABLE; > > + vq->vring_packed.driver->flags = cpu_to_virtio16(_vq->vdev, > > + vq->event_flags_shadow); > > + } > > A smp_wmb() is missed here? > > > + vq->vring_packed.driver->off_wrap = cpu_to_virtio16(_vq->vdev, off_wrap); > > And according to the spec, it looks to me write a VRING_EVENT_F_ENABLE is > sufficient here.I didn't think much when implementing the event suppression for packed ring previously. After I saw your comments, I found something new. Indeed, unlike the split ring, for the packed ring, spec doesn't say we must use VRING_EVENT_F_DESC when EVENT_IDX is negotiated. So do you think below thought is right or makes sense? - For virtqueue_enable_cb_prepare(), we just need to enable the ring by setting flags to VRING_EVENT_F_ENABLE in any case. - We will try to use VRING_EVENT_F_DESC (if EVENT_IDX is negotiated) only when we want to delay the interrupts virtqueue_enable_cb_delayed().> > > + END_USE(vq); > > + return last_used_idx; > > +} > > +[...]> > @@ -1157,14 +1852,18 @@ void vring_transport_features(struct virtio_device *vdev) > > for (i = VIRTIO_TRANSPORT_F_START; i < VIRTIO_TRANSPORT_F_END; i++) { > > switch (i) { > > - case VIRTIO_RING_F_INDIRECT_DESC: > > +#if 0 > > + case VIRTIO_RING_F_INDIRECT_DESC: // FIXME not tested yet. > > break; > > - case VIRTIO_RING_F_EVENT_IDX: > > + case VIRTIO_RING_F_EVENT_IDX: // FIXME probably not work. > > break; > > +#endif > > It would be better if you can split EVENT_IDX and INDIRECT_DESC into > separate patches too.Sure. Will do it in the next version. Thanks for the review!> > Thanks >
On 2018?04?13? 15:15, Tiwei Bie wrote:> On Fri, Apr 13, 2018 at 12:30:24PM +0800, Jason Wang wrote: >> On 2018?04?01? 22:12, Tiwei Bie wrote: >>> Hello everyone, >>> >>> This RFC implements packed ring support for virtio driver. >>> >>> The code was tested with DPDK vhost (testpmd/vhost-PMD) implemented >>> by Jens at http://dpdk.org/ml/archives/dev/2018-January/089417.html >>> Minor changes are needed for the vhost code, e.g. to kick the guest. >>> >>> TODO: >>> - Refinements and bug fixes; >>> - Split into small patches; >>> - Test indirect descriptor support; >>> - Test/fix event suppression support; >>> - Test devices other than net; >>> >>> RFC v1 -> RFC v2: >>> - Add indirect descriptor support - compile test only; >>> - Add event suppression supprt - compile test only; >>> - Move vring_packed_init() out of uapi (Jason, MST); >>> - Merge two loops into one in virtqueue_add_packed() (Jason); >>> - Split vring_unmap_one() for packed ring and split ring (Jason); >>> - Avoid using '%' operator (Jason); >>> - Rename free_head -> next_avail_idx (Jason); >>> - Add comments for virtio_wmb() in virtqueue_add_packed() (Jason); >>> - Some other refinements and bug fixes; >>> >>> Thanks! >>> >>> Signed-off-by: Tiwei Bie <tiwei.bie at intel.com> >>> --- >>> drivers/virtio/virtio_ring.c | 1094 +++++++++++++++++++++++++++++------- >>> include/linux/virtio_ring.h | 8 +- >>> include/uapi/linux/virtio_config.h | 12 +- >>> include/uapi/linux/virtio_ring.h | 61 ++ >>> 4 files changed, 980 insertions(+), 195 deletions(-) > [...] >>> +static struct vring_packed_desc *alloc_indirect_packed(struct virtqueue *_vq, >>> + unsigned int total_sg, >>> + gfp_t gfp) >>> +{ >>> + struct vring_packed_desc *desc; >>> + >>> + /* >>> + * We require lowmem mappings for the descriptors because >>> + * otherwise virt_to_phys will give us bogus addresses in the >>> + * virtqueue. >>> + */ >>> + gfp &= ~__GFP_HIGHMEM; >>> + >>> + desc = kmalloc(total_sg * sizeof(struct vring_packed_desc), gfp); >> Can we simply check vq->packed here to avoid duplicating helpers? > Then it would be something like this: > > static void *alloc_indirect(struct virtqueue *_vq, unsigned int total_sg, > gfp_t gfp) > { > struct vring_virtqueue *vq = to_vvq(_vq); > void *data; > > /* > * We require lowmem mappings for the descriptors because > * otherwise virt_to_phys will give us bogus addresses in the > * virtqueue. > */ > gfp &= ~__GFP_HIGHMEM; > > if (vq->packed) { > data = kmalloc(total_sg * sizeof(struct vring_packed_desc), > gfp); > if (!data) > return NULL; > } else { > struct vring_desc *desc; > unsigned int i; > > desc = kmalloc(total_sg * sizeof(struct vring_desc), gfp); > if (!desc) > return NULL; > > for (i = 0; i < total_sg; i++) > desc[i].next = cpu_to_virtio16(_vq->vdev, i + 1); > > data = desc; > } > > return data; > } > > I would prefer to have two simpler helpers (and to the callers, > it's already very clear about which one they should call), i.e. > the current implementation: > > static struct vring_packed_desc *alloc_indirect_packed(struct virtqueue *_vq, > unsigned int total_sg, > gfp_t gfp) > { > struct vring_packed_desc *desc; > > /* > * We require lowmem mappings for the descriptors because > * otherwise virt_to_phys will give us bogus addresses in the > * virtqueue. > */ > gfp &= ~__GFP_HIGHMEM; > > desc = kmalloc(total_sg * sizeof(struct vring_packed_desc), gfp); > > return desc; > } > > static struct vring_desc *alloc_indirect_split(struct virtqueue *_vq, > unsigned int total_sg, > gfp_t gfp) > { > struct vring_desc *desc; > unsigned int i; > > /* > * We require lowmem mappings for the descriptors because > * otherwise virt_to_phys will give us bogus addresses in the > * virtqueue. > */ > gfp &= ~__GFP_HIGHMEM; > > desc = kmalloc(total_sg * sizeof(struct vring_desc), gfp); > if (!desc) > return NULL; > > for (i = 0; i < total_sg; i++) > desc[i].next = cpu_to_virtio16(_vq->vdev, i + 1); > return desc; > }Yeah, I miss that split version needs a desc list.> >>> + >>> + return desc; >>> +} > [...] >>> +static inline int virtqueue_add_packed(struct virtqueue *_vq, >>> + struct scatterlist *sgs[], >>> + unsigned int total_sg, >>> + unsigned int out_sgs, >>> + unsigned int in_sgs, >>> + void *data, >>> + void *ctx, >>> + gfp_t gfp) >>> +{ >>> + struct vring_virtqueue *vq = to_vvq(_vq); >>> + struct vring_packed_desc *desc; >>> + struct scatterlist *sg; >>> + unsigned int i, n, descs_used, uninitialized_var(prev), err_idx; >>> + __virtio16 uninitialized_var(head_flags), flags; >>> + int head, wrap_counter; >>> + bool indirect; >>> + >>> + START_USE(vq); >>> + >>> + BUG_ON(data == NULL); >>> + BUG_ON(ctx && vq->indirect); >>> + >>> + if (unlikely(vq->broken)) { >>> + END_USE(vq); >>> + return -EIO; >>> + } >>> + >>> +#ifdef DEBUG >>> + { >>> + ktime_t now = ktime_get(); >>> + >>> + /* No kick or get, with .1 second between? Warn. */ >>> + if (vq->last_add_time_valid) >>> + WARN_ON(ktime_to_ms(ktime_sub(now, vq->last_add_time)) >>> + > 100); >>> + vq->last_add_time = now; >>> + vq->last_add_time_valid = true; >>> + } >>> +#endif >>> + >>> + BUG_ON(total_sg == 0); >>> + >>> + head = vq->next_avail_idx; >>> + wrap_counter = vq->wrap_counter; >>> + >>> + /* If the host supports indirect descriptor tables, and we have multiple >>> + * buffers, then go indirect. FIXME: tune this threshold */ >>> + if (vq->indirect && total_sg > 1 && vq->vq.num_free) >> Let's introduce a helper like virtqueue_need_indirect() to avoid duplicating >> codes and FIXME. > Okay. > >>> + desc = alloc_indirect_packed(_vq, total_sg, gfp); >>> + else { >>> + desc = NULL; >>> + WARN_ON_ONCE(total_sg > vq->vring_packed.num && !vq->indirect); >>> + } >>> + >>> + if (desc) { >>> + /* Use a single buffer which doesn't continue */ >>> + indirect = true; >>> + /* Set up rest to use this indirect table. */ >>> + i = 0; >>> + descs_used = 1; >>> + } else { >>> + indirect = false; >>> + desc = vq->vring_packed.desc; >>> + i = head; >>> + descs_used = total_sg; >>> + } >>> + >>> + if (vq->vq.num_free < descs_used) { >>> + pr_debug("Can't add buf len %i - avail = %i\n", >>> + descs_used, vq->vq.num_free); >>> + /* FIXME: for historical reasons, we force a notify here if >>> + * there are outgoing parts to the buffer. Presumably the >>> + * host should service the ring ASAP. */ >>> + if (out_sgs) >>> + vq->notify(&vq->vq); >>> + if (indirect) >>> + kfree(desc); >>> + END_USE(vq); >>> + return -ENOSPC; >>> + } >>> + >>> + for (n = 0; n < out_sgs + in_sgs; n++) { >>> + for (sg = sgs[n]; sg; sg = sg_next(sg)) { >>> + dma_addr_t addr = vring_map_one_sg(vq, sg, n < out_sgs ? >>> + DMA_TO_DEVICE : DMA_FROM_DEVICE); >>> + if (vring_mapping_error(vq, addr)) >>> + goto unmap_release; >>> + >>> + flags = cpu_to_virtio16(_vq->vdev, VRING_DESC_F_NEXT | >>> + (n < out_sgs ? 0 : VRING_DESC_F_WRITE) | >>> + VRING_DESC_F_AVAIL(vq->wrap_counter) | >>> + VRING_DESC_F_USED(!vq->wrap_counter)); >>> + if (!indirect && i == head) >>> + head_flags = flags; >>> + else >>> + desc[i].flags = flags; >>> + >>> + desc[i].addr = cpu_to_virtio64(_vq->vdev, addr); >>> + desc[i].len = cpu_to_virtio32(_vq->vdev, sg->length); >>> + desc[i].id = cpu_to_virtio32(_vq->vdev, head); >> Similar to V1, we only need this for the last descriptor. > Okay, will just set it for the last desc. > >>> + prev = i; >> It looks to me there's no need to track prev inside the loop here. >> >>> + i++; >>> + if (!indirect && i >= vq->vring_packed.num) { >>> + i = 0; >>> + vq->wrap_counter ^= 1; >>> + } >>> + } >>> + } >>> + /* Last one doesn't continue. */ >>> + if (total_sg == 1) >>> + head_flags &= cpu_to_virtio16(_vq->vdev, ~VRING_DESC_F_NEXT); >>> + else >>> + desc[prev].flags &= cpu_to_virtio16(_vq->vdev, ~VRING_DESC_F_NEXT); >> The only case when prev != i - 1 is i == 0, we can add a if here. > It's just a mirror of the existing implementation in split ring. > It seems that split ring implementation needs this just because > it's much harder for it to find the prev, which is not true for > packed ring. So I'll take your suggestion. Thanks! > > [...] >>> +static bool virtqueue_kick_prepare_packed(struct virtqueue *_vq) >>> +{ >>> + struct vring_virtqueue *vq = to_vvq(_vq); >>> + u16 new, old, off_wrap; >>> + bool needs_kick; >>> + >>> + START_USE(vq); >>> + /* We need to expose the new flags value before checking notification >>> + * suppressions. */ >>> + virtio_mb(vq->weak_barriers); >>> + >>> + old = vq->next_avail_idx - vq->num_added; >>> + new = vq->next_avail_idx; >>> + vq->num_added = 0; >>> + >>> +#ifdef DEBUG >>> + if (vq->last_add_time_valid) { >>> + WARN_ON(ktime_to_ms(ktime_sub(ktime_get(), >>> + vq->last_add_time)) > 100); >>> + } >>> + vq->last_add_time_valid = false; >>> +#endif >>> + >>> + off_wrap = virtio16_to_cpu(_vq->vdev, vq->vring_packed.device->off_wrap); >>> + >>> + if (vq->event) { >> It looks to me we should examine RING_EVENT_FLAGS_DESC in desc_event_flags >> instead of vq->event here. Spec does not forces to use evenf_off and >> event_wrap if event index is enabled. >> >>> + // FIXME: fix this! >>> + needs_kick = ((off_wrap >> 15) == vq->wrap_counter) && >>> + vring_need_event(off_wrap & ~(1<<15), new, old); >> Why need a & here? > Because wrap_counter (the most significant bit in off_wrap) > isn't part of the index. > >>> + } else { >> Need a smp_rmb() to make sure desc_event_flags was checked before flags. > I don't get your point, if my understanding is correct, > desc_event_flags is vq->vring_packed.device->flags. So > what's the "flags"?Sorry, I mean we need check device.flags before off_warp. So it needs an smp_rmb() in the middle. It looks to me there's no guarantee that VRING_EVENT_F_DESC is set if event index is supported.> >>> + needs_kick = (vq->vring_packed.device->flags !>>> + cpu_to_virtio16(_vq->vdev, VRING_EVENT_F_DISABLE)); >>> + } >>> + END_USE(vq); >>> + return needs_kick; >>> +} > [...] >>> +static int detach_buf_packed(struct vring_virtqueue *vq, unsigned int head, >>> + void **ctx) >>> +{ >>> + struct vring_packed_desc *desc; >>> + unsigned int i, j; >>> + >>> + /* Clear data ptr. */ >>> + vq->desc_state[head].data = NULL; >>> + >>> + i = head; >>> + >>> + for (j = 0; j < vq->desc_state[head].num; j++) { >>> + desc = &vq->vring_packed.desc[i]; >>> + vring_unmap_one_packed(vq, desc); >>> + desc->flags = 0x0; >> Looks like this is unnecessary. > It's safer to zero it. If we don't zero it, after we > call virtqueue_detach_unused_buf_packed() which calls > this function, the desc is still available to the > device.Well detach_unused_buf_packed() should be called after device is stopped, otherwise even if you try to clear, there will still be a window that device may use it.> >>> + i++; >>> + if (i >= vq->vring_packed.num) >>> + i = 0; >>> + } > [...] >>> +static unsigned virtqueue_enable_cb_prepare_packed(struct virtqueue *_vq) >>> +{ >>> + struct vring_virtqueue *vq = to_vvq(_vq); >>> + u16 last_used_idx, wrap_counter, off_wrap; >>> + >>> + START_USE(vq); >>> + >>> + last_used_idx = vq->last_used_idx; >>> + wrap_counter = vq->wrap_counter; >>> + >>> + if (last_used_idx > vq->next_avail_idx) >>> + wrap_counter ^= 1; >>> + >>> + off_wrap = last_used_idx | (wrap_counter << 15); >>> + >>> + /* We optimistically turn back on interrupts, then check if there was >>> + * more to do. */ >>> + /* Depending on the VIRTIO_RING_F_EVENT_IDX feature, we need to >>> + * either clear the flags bit or point the event index at the next >>> + * entry. Always do both to keep code simple. */ >>> + if (vq->event_flags_shadow == VRING_EVENT_F_DISABLE) { >>> + vq->event_flags_shadow = vq->event ? VRING_EVENT_F_DESC: >>> + VRING_EVENT_F_ENABLE; >>> + vq->vring_packed.driver->flags = cpu_to_virtio16(_vq->vdev, >>> + vq->event_flags_shadow); >>> + } >> A smp_wmb() is missed here? >> >>> + vq->vring_packed.driver->off_wrap = cpu_to_virtio16(_vq->vdev, off_wrap); >> And according to the spec, it looks to me write a VRING_EVENT_F_ENABLE is >> sufficient here. > I didn't think much when implementing the event suppression > for packed ring previously. After I saw your comments, I found > something new. Indeed, unlike the split ring, for the packed > ring, spec doesn't say we must use VRING_EVENT_F_DESC when > EVENT_IDX is negotiated. So do you think below thought is > right or makes sense? > > - For virtqueue_enable_cb_prepare(), we just need to enable > the ring by setting flags to VRING_EVENT_F_ENABLE in any > case. > > - We will try to use VRING_EVENT_F_DESC (if EVENT_IDX is > negotiated) only when we want to delay the interrupts > virtqueue_enable_cb_delayed().This looks good to me.> >>> + END_USE(vq); >>> + return last_used_idx; >>> +} >>> + > [...] >>> @@ -1157,14 +1852,18 @@ void vring_transport_features(struct virtio_device *vdev) >>> for (i = VIRTIO_TRANSPORT_F_START; i < VIRTIO_TRANSPORT_F_END; i++) { >>> switch (i) { >>> - case VIRTIO_RING_F_INDIRECT_DESC: >>> +#if 0 >>> + case VIRTIO_RING_F_INDIRECT_DESC: // FIXME not tested yet. >>> break; >>> - case VIRTIO_RING_F_EVENT_IDX: >>> + case VIRTIO_RING_F_EVENT_IDX: // FIXME probably not work. >>> break; >>> +#endif >> It would be better if you can split EVENT_IDX and INDIRECT_DESC into >> separate patches too. > Sure. Will do it in the next version. > > Thanks for the review!Thanks.>> Thanks >>
On Tue, Apr 17, 2018 at 10:11:58AM +0800, Jason Wang wrote:> > > On 2018?04?13? 15:15, Tiwei Bie wrote: > > On Fri, Apr 13, 2018 at 12:30:24PM +0800, Jason Wang wrote: > > > On 2018?04?01? 22:12, Tiwei Bie wrote: > > > > Hello everyone, > > > > > > > > This RFC implements packed ring support for virtio driver. > > > > > > > > The code was tested with DPDK vhost (testpmd/vhost-PMD) implemented > > > > by Jens at http://dpdk.org/ml/archives/dev/2018-January/089417.html > > > > Minor changes are needed for the vhost code, e.g. to kick the guest. > > > > > > > > TODO: > > > > - Refinements and bug fixes; > > > > - Split into small patches; > > > > - Test indirect descriptor support; > > > > - Test/fix event suppression support; > > > > - Test devices other than net; > > > > > > > > RFC v1 -> RFC v2: > > > > - Add indirect descriptor support - compile test only; > > > > - Add event suppression supprt - compile test only; > > > > - Move vring_packed_init() out of uapi (Jason, MST); > > > > - Merge two loops into one in virtqueue_add_packed() (Jason); > > > > - Split vring_unmap_one() for packed ring and split ring (Jason); > > > > - Avoid using '%' operator (Jason); > > > > - Rename free_head -> next_avail_idx (Jason); > > > > - Add comments for virtio_wmb() in virtqueue_add_packed() (Jason); > > > > - Some other refinements and bug fixes; > > > > > > > > Thanks! > > > > > > > > Signed-off-by: Tiwei Bie <tiwei.bie at intel.com> > > > > --- > > > > drivers/virtio/virtio_ring.c | 1094 +++++++++++++++++++++++++++++------- > > > > include/linux/virtio_ring.h | 8 +- > > > > include/uapi/linux/virtio_config.h | 12 +- > > > > include/uapi/linux/virtio_ring.h | 61 ++ > > > > 4 files changed, 980 insertions(+), 195 deletions(-) > > [...] > > > > +static struct vring_packed_desc *alloc_indirect_packed(struct virtqueue *_vq, > > > > + unsigned int total_sg, > > > > + gfp_t gfp) > > > > +{ > > > > + struct vring_packed_desc *desc; > > > > + > > > > + /* > > > > + * We require lowmem mappings for the descriptors because > > > > + * otherwise virt_to_phys will give us bogus addresses in the > > > > + * virtqueue. > > > > + */ > > > > + gfp &= ~__GFP_HIGHMEM; > > > > + > > > > + desc = kmalloc(total_sg * sizeof(struct vring_packed_desc), gfp); > > > Can we simply check vq->packed here to avoid duplicating helpers? > > Then it would be something like this: > > > > static void *alloc_indirect(struct virtqueue *_vq, unsigned int total_sg, > > gfp_t gfp) > > { > > struct vring_virtqueue *vq = to_vvq(_vq); > > void *data; > > > > /* > > * We require lowmem mappings for the descriptors because > > * otherwise virt_to_phys will give us bogus addresses in the > > * virtqueue. > > */ > > gfp &= ~__GFP_HIGHMEM; > > > > if (vq->packed) { > > data = kmalloc(total_sg * sizeof(struct vring_packed_desc), > > gfp); > > if (!data) > > return NULL; > > } else { > > struct vring_desc *desc; > > unsigned int i; > > > > desc = kmalloc(total_sg * sizeof(struct vring_desc), gfp); > > if (!desc) > > return NULL; > > > > for (i = 0; i < total_sg; i++) > > desc[i].next = cpu_to_virtio16(_vq->vdev, i + 1); > > > > data = desc; > > } > > > > return data; > > } > > > > I would prefer to have two simpler helpers (and to the callers, > > it's already very clear about which one they should call), i.e. > > the current implementation: > > > > static struct vring_packed_desc *alloc_indirect_packed(struct virtqueue *_vq, > > unsigned int total_sg, > > gfp_t gfp) > > { > > struct vring_packed_desc *desc; > > > > /* > > * We require lowmem mappings for the descriptors because > > * otherwise virt_to_phys will give us bogus addresses in the > > * virtqueue. > > */ > > gfp &= ~__GFP_HIGHMEM; > > > > desc = kmalloc(total_sg * sizeof(struct vring_packed_desc), gfp); > > > > return desc; > > } > > > > static struct vring_desc *alloc_indirect_split(struct virtqueue *_vq, > > unsigned int total_sg, > > gfp_t gfp) > > { > > struct vring_desc *desc; > > unsigned int i; > > > > /* > > * We require lowmem mappings for the descriptors because > > * otherwise virt_to_phys will give us bogus addresses in the > > * virtqueue. > > */ > > gfp &= ~__GFP_HIGHMEM; > > > > desc = kmalloc(total_sg * sizeof(struct vring_desc), gfp); > > if (!desc) > > return NULL; > > > > for (i = 0; i < total_sg; i++) > > desc[i].next = cpu_to_virtio16(_vq->vdev, i + 1); > > return desc; > > } > > Yeah, I miss that split version needs a desc list. > > > > > > > + > > > > + return desc; > > > > +} > > [...] > > > > +static inline int virtqueue_add_packed(struct virtqueue *_vq, > > > > + struct scatterlist *sgs[], > > > > + unsigned int total_sg, > > > > + unsigned int out_sgs, > > > > + unsigned int in_sgs, > > > > + void *data, > > > > + void *ctx, > > > > + gfp_t gfp) > > > > +{ > > > > + struct vring_virtqueue *vq = to_vvq(_vq); > > > > + struct vring_packed_desc *desc; > > > > + struct scatterlist *sg; > > > > + unsigned int i, n, descs_used, uninitialized_var(prev), err_idx; > > > > + __virtio16 uninitialized_var(head_flags), flags; > > > > + int head, wrap_counter; > > > > + bool indirect; > > > > + > > > > + START_USE(vq); > > > > + > > > > + BUG_ON(data == NULL); > > > > + BUG_ON(ctx && vq->indirect); > > > > + > > > > + if (unlikely(vq->broken)) { > > > > + END_USE(vq); > > > > + return -EIO; > > > > + } > > > > + > > > > +#ifdef DEBUG > > > > + { > > > > + ktime_t now = ktime_get(); > > > > + > > > > + /* No kick or get, with .1 second between? Warn. */ > > > > + if (vq->last_add_time_valid) > > > > + WARN_ON(ktime_to_ms(ktime_sub(now, vq->last_add_time)) > > > > + > 100); > > > > + vq->last_add_time = now; > > > > + vq->last_add_time_valid = true; > > > > + } > > > > +#endif > > > > + > > > > + BUG_ON(total_sg == 0); > > > > + > > > > + head = vq->next_avail_idx; > > > > + wrap_counter = vq->wrap_counter; > > > > + > > > > + /* If the host supports indirect descriptor tables, and we have multiple > > > > + * buffers, then go indirect. FIXME: tune this threshold */ > > > > + if (vq->indirect && total_sg > 1 && vq->vq.num_free) > > > Let's introduce a helper like virtqueue_need_indirect() to avoid duplicating > > > codes and FIXME. > > Okay. > > > > > > + desc = alloc_indirect_packed(_vq, total_sg, gfp); > > > > + else { > > > > + desc = NULL; > > > > + WARN_ON_ONCE(total_sg > vq->vring_packed.num && !vq->indirect); > > > > + } > > > > + > > > > + if (desc) { > > > > + /* Use a single buffer which doesn't continue */ > > > > + indirect = true; > > > > + /* Set up rest to use this indirect table. */ > > > > + i = 0; > > > > + descs_used = 1; > > > > + } else { > > > > + indirect = false; > > > > + desc = vq->vring_packed.desc; > > > > + i = head; > > > > + descs_used = total_sg; > > > > + } > > > > + > > > > + if (vq->vq.num_free < descs_used) { > > > > + pr_debug("Can't add buf len %i - avail = %i\n", > > > > + descs_used, vq->vq.num_free); > > > > + /* FIXME: for historical reasons, we force a notify here if > > > > + * there are outgoing parts to the buffer. Presumably the > > > > + * host should service the ring ASAP. */ > > > > + if (out_sgs) > > > > + vq->notify(&vq->vq); > > > > + if (indirect) > > > > + kfree(desc); > > > > + END_USE(vq); > > > > + return -ENOSPC; > > > > + } > > > > + > > > > + for (n = 0; n < out_sgs + in_sgs; n++) { > > > > + for (sg = sgs[n]; sg; sg = sg_next(sg)) { > > > > + dma_addr_t addr = vring_map_one_sg(vq, sg, n < out_sgs ? > > > > + DMA_TO_DEVICE : DMA_FROM_DEVICE); > > > > + if (vring_mapping_error(vq, addr)) > > > > + goto unmap_release; > > > > + > > > > + flags = cpu_to_virtio16(_vq->vdev, VRING_DESC_F_NEXT | > > > > + (n < out_sgs ? 0 : VRING_DESC_F_WRITE) | > > > > + VRING_DESC_F_AVAIL(vq->wrap_counter) | > > > > + VRING_DESC_F_USED(!vq->wrap_counter)); > > > > + if (!indirect && i == head) > > > > + head_flags = flags; > > > > + else > > > > + desc[i].flags = flags; > > > > + > > > > + desc[i].addr = cpu_to_virtio64(_vq->vdev, addr); > > > > + desc[i].len = cpu_to_virtio32(_vq->vdev, sg->length); > > > > + desc[i].id = cpu_to_virtio32(_vq->vdev, head); > > > Similar to V1, we only need this for the last descriptor. > > Okay, will just set it for the last desc. > > > > > > + prev = i; > > > It looks to me there's no need to track prev inside the loop here. > > > > > > > + i++; > > > > + if (!indirect && i >= vq->vring_packed.num) { > > > > + i = 0; > > > > + vq->wrap_counter ^= 1; > > > > + } > > > > + } > > > > + } > > > > + /* Last one doesn't continue. */ > > > > + if (total_sg == 1) > > > > + head_flags &= cpu_to_virtio16(_vq->vdev, ~VRING_DESC_F_NEXT); > > > > + else > > > > + desc[prev].flags &= cpu_to_virtio16(_vq->vdev, ~VRING_DESC_F_NEXT); > > > The only case when prev != i - 1 is i == 0, we can add a if here. > > It's just a mirror of the existing implementation in split ring. > > It seems that split ring implementation needs this just because > > it's much harder for it to find the prev, which is not true for > > packed ring. So I'll take your suggestion. Thanks! > > > > [...] > > > > +static bool virtqueue_kick_prepare_packed(struct virtqueue *_vq) > > > > +{ > > > > + struct vring_virtqueue *vq = to_vvq(_vq); > > > > + u16 new, old, off_wrap; > > > > + bool needs_kick; > > > > + > > > > + START_USE(vq); > > > > + /* We need to expose the new flags value before checking notification > > > > + * suppressions. */ > > > > + virtio_mb(vq->weak_barriers); > > > > + > > > > + old = vq->next_avail_idx - vq->num_added; > > > > + new = vq->next_avail_idx; > > > > + vq->num_added = 0; > > > > + > > > > +#ifdef DEBUG > > > > + if (vq->last_add_time_valid) { > > > > + WARN_ON(ktime_to_ms(ktime_sub(ktime_get(), > > > > + vq->last_add_time)) > 100); > > > > + } > > > > + vq->last_add_time_valid = false; > > > > +#endif > > > > + > > > > + off_wrap = virtio16_to_cpu(_vq->vdev, vq->vring_packed.device->off_wrap); > > > > + > > > > + if (vq->event) { > > > It looks to me we should examine RING_EVENT_FLAGS_DESC in desc_event_flags > > > instead of vq->event here. Spec does not forces to use evenf_off and > > > event_wrap if event index is enabled. > > > > > > > + // FIXME: fix this! > > > > + needs_kick = ((off_wrap >> 15) == vq->wrap_counter) && > > > > + vring_need_event(off_wrap & ~(1<<15), new, old); > > > Why need a & here? > > Because wrap_counter (the most significant bit in off_wrap) > > isn't part of the index. > > > > > > + } else { > > > Need a smp_rmb() to make sure desc_event_flags was checked before flags. > > I don't get your point, if my understanding is correct, > > desc_event_flags is vq->vring_packed.device->flags. So > > what's the "flags"? > > Sorry, I mean we need check device.flags before off_warp. So it needs an > smp_rmb() in the middle.It's best to just read all flags atomically as u32.> It looks to me there's no guarantee that > VRING_EVENT_F_DESC is set if event index is supported. > > > > > > > + needs_kick = (vq->vring_packed.device->flags !> > > > + cpu_to_virtio16(_vq->vdev, VRING_EVENT_F_DISABLE)); > > > > + } > > > > + END_USE(vq); > > > > + return needs_kick; > > > > +} > > [...] > > > > +static int detach_buf_packed(struct vring_virtqueue *vq, unsigned int head, > > > > + void **ctx) > > > > +{ > > > > + struct vring_packed_desc *desc; > > > > + unsigned int i, j; > > > > + > > > > + /* Clear data ptr. */ > > > > + vq->desc_state[head].data = NULL; > > > > + > > > > + i = head; > > > > + > > > > + for (j = 0; j < vq->desc_state[head].num; j++) { > > > > + desc = &vq->vring_packed.desc[i]; > > > > + vring_unmap_one_packed(vq, desc); > > > > + desc->flags = 0x0; > > > Looks like this is unnecessary. > > It's safer to zero it. If we don't zero it, after we > > call virtqueue_detach_unused_buf_packed() which calls > > this function, the desc is still available to the > > device. > > Well detach_unused_buf_packed() should be called after device is stopped, > otherwise even if you try to clear, there will still be a window that device > may use it. > > > > > > > + i++; > > > > + if (i >= vq->vring_packed.num) > > > > + i = 0; > > > > + } > > [...] > > > > +static unsigned virtqueue_enable_cb_prepare_packed(struct virtqueue *_vq) > > > > +{ > > > > + struct vring_virtqueue *vq = to_vvq(_vq); > > > > + u16 last_used_idx, wrap_counter, off_wrap; > > > > + > > > > + START_USE(vq); > > > > + > > > > + last_used_idx = vq->last_used_idx; > > > > + wrap_counter = vq->wrap_counter; > > > > + > > > > + if (last_used_idx > vq->next_avail_idx) > > > > + wrap_counter ^= 1; > > > > + > > > > + off_wrap = last_used_idx | (wrap_counter << 15); > > > > + > > > > + /* We optimistically turn back on interrupts, then check if there was > > > > + * more to do. */ > > > > + /* Depending on the VIRTIO_RING_F_EVENT_IDX feature, we need to > > > > + * either clear the flags bit or point the event index at the next > > > > + * entry. Always do both to keep code simple. */ > > > > + if (vq->event_flags_shadow == VRING_EVENT_F_DISABLE) { > > > > + vq->event_flags_shadow = vq->event ? VRING_EVENT_F_DESC: > > > > + VRING_EVENT_F_ENABLE; > > > > + vq->vring_packed.driver->flags = cpu_to_virtio16(_vq->vdev, > > > > + vq->event_flags_shadow); > > > > + } > > > A smp_wmb() is missed here? > > > > > > > + vq->vring_packed.driver->off_wrap = cpu_to_virtio16(_vq->vdev, off_wrap); > > > And according to the spec, it looks to me write a VRING_EVENT_F_ENABLE is > > > sufficient here. > > I didn't think much when implementing the event suppression > > for packed ring previously. After I saw your comments, I found > > something new. Indeed, unlike the split ring, for the packed > > ring, spec doesn't say we must use VRING_EVENT_F_DESC when > > EVENT_IDX is negotiated. So do you think below thought is > > right or makes sense? > > > > - For virtqueue_enable_cb_prepare(), we just need to enable > > the ring by setting flags to VRING_EVENT_F_ENABLE in any > > case. > > > > - We will try to use VRING_EVENT_F_DESC (if EVENT_IDX is > > negotiated) only when we want to delay the interrupts > > virtqueue_enable_cb_delayed(). > > This looks good to me.I suspect this will lead to extra interrupts if host is fast. So I think for now we should always use VRING_EVENT_F_DESC if EVENT_IDX is negotiated. VRING_EVENT_F_DISABLE makes more sense to me.> > > > > > + END_USE(vq); > > > > + return last_used_idx; > > > > +} > > > > + > > [...] > > > > @@ -1157,14 +1852,18 @@ void vring_transport_features(struct virtio_device *vdev) > > > > for (i = VIRTIO_TRANSPORT_F_START; i < VIRTIO_TRANSPORT_F_END; i++) { > > > > switch (i) { > > > > - case VIRTIO_RING_F_INDIRECT_DESC: > > > > +#if 0 > > > > + case VIRTIO_RING_F_INDIRECT_DESC: // FIXME not tested yet. > > > > break; > > > > - case VIRTIO_RING_F_EVENT_IDX: > > > > + case VIRTIO_RING_F_EVENT_IDX: // FIXME probably not work. > > > > break; > > > > +#endif > > > It would be better if you can split EVENT_IDX and INDIRECT_DESC into > > > separate patches too. > > Sure. Will do it in the next version. > > > > Thanks for the review! > > Thanks. > > > > Thanks > > >
On Tue, Apr 17, 2018 at 10:11:58AM +0800, Jason Wang wrote:> On 2018?04?13? 15:15, Tiwei Bie wrote: > > On Fri, Apr 13, 2018 at 12:30:24PM +0800, Jason Wang wrote: > > > On 2018?04?01? 22:12, Tiwei Bie wrote:[...]> > > > +static int detach_buf_packed(struct vring_virtqueue *vq, unsigned int head, > > > > + void **ctx) > > > > +{ > > > > + struct vring_packed_desc *desc; > > > > + unsigned int i, j; > > > > + > > > > + /* Clear data ptr. */ > > > > + vq->desc_state[head].data = NULL; > > > > + > > > > + i = head; > > > > + > > > > + for (j = 0; j < vq->desc_state[head].num; j++) { > > > > + desc = &vq->vring_packed.desc[i]; > > > > + vring_unmap_one_packed(vq, desc); > > > > + desc->flags = 0x0; > > > Looks like this is unnecessary. > > It's safer to zero it. If we don't zero it, after we > > call virtqueue_detach_unused_buf_packed() which calls > > this function, the desc is still available to the > > device. > > Well detach_unused_buf_packed() should be called after device is stopped, > otherwise even if you try to clear, there will still be a window that device > may use it.This is not about whether the device has been stopped or not. We don't have other places to re-initialize the ring descriptors and wrap_counter. So they need to be set to the correct values when doing detach_unused_buf. Best regards, Tiwei Bie