Stefano Garzarella
2021-Jan-29 09:21 UTC
[RFC PATCH v3 03/13] af_vsock: implement SEQPACKET rx loop
On Fri, Jan 29, 2021 at 09:28:49AM +0300, Arseny Krasnov wrote:
>
>On 28.01.2021 19:55, Stefano Garzarella wrote:
>> On Mon, Jan 25, 2021 at 02:12:36PM +0300, Arseny Krasnov wrote:
>>> This adds receive loop for SEQPACKET. It looks like receive loop for
>>> SEQPACKET, but there is a little bit difference:
>>> 1) It doesn't call notify callbacks.
>>> 2) It doesn't care about 'SO_SNDLOWAT' and 'SO_RCVLOWAT' values, because
>>>    there is no sense for these values in SEQPACKET case.
>>> 3) It waits until whole record is received or error is found during
>>>    receiving.
>>> 4) It processes and sets 'MSG_TRUNC' flag.
>>>
>>> So to avoid extra conditions for two types of socket inside one loop, two
>>> independent functions were created.
>>>
>>> Signed-off-by: Arseny Krasnov <arseny.krasnov at kaspersky.com>
>>> ---
>>> include/net/af_vsock.h   |   5 ++
>>> net/vmw_vsock/af_vsock.c | 102 ++++++++++++++++++++++++++++++++++++++-
>>> 2 files changed, 106 insertions(+), 1 deletion(-)
>>>
>>> diff --git a/include/net/af_vsock.h b/include/net/af_vsock.h
>>> index b1c717286993..46073842d489 100644
>>> --- a/include/net/af_vsock.h
>>> +++ b/include/net/af_vsock.h
>>> @@ -135,6 +135,11 @@ struct vsock_transport {
>>>  	bool (*stream_is_active)(struct vsock_sock *);
>>>  	bool (*stream_allow)(u32 cid, u32 port);
>>>
>>> +	/* SEQ_PACKET. */
>>> +	size_t (*seqpacket_seq_get_len)(struct vsock_sock *);
>>> +	ssize_t (*seqpacket_dequeue)(struct vsock_sock *, struct msghdr *,
>>> +				     size_t len, int flags);
>>> +
>>>  	/* Notification. */
>>>  	int (*notify_poll_in)(struct vsock_sock *, size_t, bool *);
>>>  	int (*notify_poll_out)(struct vsock_sock *, size_t, bool *);
>>> diff --git a/net/vmw_vsock/af_vsock.c b/net/vmw_vsock/af_vsock.c
>>> index 524df8fc84cd..3b266880b7c8 100644
>>> --- a/net/vmw_vsock/af_vsock.c
>>> +++ b/net/vmw_vsock/af_vsock.c
>>> @@ -2006,7 +2006,107 @@ static int __vsock_stream_recvmsg(struct sock *sk, struct msghdr *msg,
>>>  static int __vsock_seqpacket_recvmsg(struct sock *sk, struct msghdr *msg,
>>>  				     size_t len, int flags)
>>>  {
>>> -	return -1;
>>> +	const struct vsock_transport *transport;
>>> +	const struct iovec *orig_iov;
>>> +	unsigned long orig_nr_segs;
>>> +	ssize_t dequeued_total = 0;
>>> +	struct vsock_sock *vsk;
>>> +	size_t record_len;
>>> +	long timeout;
>>> +	int err = 0;
>>> +	DEFINE_WAIT(wait);
>>> +
>>> +	vsk = vsock_sk(sk);
>>> +	transport = vsk->transport;
>>> +
>>> +	timeout = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
>>> +	msg->msg_flags &= ~MSG_EOR;
>> Maybe add a comment about why we need to clear MSG_EOR.
>>
>>> +	orig_nr_segs = msg->msg_iter.nr_segs;
>>> +	orig_iov = msg->msg_iter.iov;
>>> +
>>> +	while (1) {
>>> +		ssize_t dequeued;
>>> +		s64 ready;
>>> +
>>> +		prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
>>> +		ready = vsock_stream_has_data(vsk);
>>> +
>>> +		if (ready == 0) {
>>> +			if (vsock_wait_data(sk, &wait, timeout, NULL, 0)) {
>>> +				/* In case of any loop break(timeout, signal
>>> +				 * interrupt or shutdown), we report user that
>>> +				 * nothing was copied.
>>> +				 */
>>> +				dequeued_total = 0;
>>> +				break;
>>> +			}
>>> +			continue;
>>> +		}
>>> +
>>> +		finish_wait(sk_sleep(sk), &wait);
>>> +
>>> +		if (ready < 0) {
>>> +			err = -ENOMEM;
>>> +			goto out;
>>> +		}
>>> +
>>> +		if (dequeued_total == 0) {
>>> +			record_len =
>>> +				transport->seqpacket_seq_get_len(vsk);
>>> +
>>> +			if (record_len == 0)
>>> +				continue;
>>> +		}
>>> +
>>> +		/* 'msg_iter.count' is number of unused bytes in iov.
>>> +		 * On every copy to iov iterator it is decremented at
>>> +		 * size of data.
>>> +		 */
>>> +		dequeued = transport->seqpacket_dequeue(vsk, msg,
>>> +					msg->msg_iter.count, flags);
>>                                         ^
>>                                         Is this needed or 'msg' can be
>>                                         used in the transport?
>Yes, right
>>> +
>>> +		if (dequeued < 0) {
>>> +			dequeued_total = 0;
>>> +
>>> +			if (dequeued == -EAGAIN) {
>>> +				iov_iter_init(&msg->msg_iter, READ,
>>> +					      orig_iov, orig_nr_segs,
>>> +					      len);
>>> +				msg->msg_flags &= ~MSG_EOR;
>>> +				continue;
>> Why we need to reset MSG_EOR here?
>
>Because if previous attempt to receive record was failed, but
>
>MSG_EOR was set, so we clear it for next attempt to get record

Yes, I saw later when I looked at the implementation in the transport.

Maybe better to put a comment saying that seqpacket_dequeue() can set
that flag.

Thanks,
Stefano
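
For readers who want to see the record semantics from points 3) and 4) of the commit message from userspace, here is a minimal sketch. It is only an illustration under stated assumptions: it uses an AF_UNIX SOCK_SEQPACKET socketpair as a stand-in, because AF_VSOCK needs a hypervisor peer, and the vsock SEQPACKET behaviour proposed in this RFC may differ in details. The names 'rx_result', 'recv_record()' and 'demo_seqpacket_trunc()' are hypothetical helpers, not part of the patch.

```c
#include <assert.h>
#include <string.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/uio.h>
#include <unistd.h>

/* Outcome of one recvmsg() call: bytes copied and whether MSG_TRUNC was set. */
struct rx_result {
	ssize_t copied;
	int truncated;
};

/* Receive one record into buf (at most cap bytes) and report MSG_TRUNC. */
static struct rx_result recv_record(int fd, char *buf, size_t cap)
{
	struct iovec iov = { .iov_base = buf, .iov_len = cap };
	struct msghdr msg;
	struct rx_result res;

	memset(&msg, 0, sizeof(msg));
	msg.msg_iov = &iov;
	msg.msg_iovlen = 1;

	res.copied = recvmsg(fd, &msg, 0);
	res.truncated = res.copied >= 0 && (msg.msg_flags & MSG_TRUNC) != 0;
	return res;
}

/*
 * Hypothetical demo: send an 8-byte record and a 2-byte record over an
 * AF_UNIX SOCK_SEQPACKET pair (stand-in for AF_VSOCK), then read both
 * with a 4-byte buffer. Returns 0 on success, -1 on setup failure.
 */
static int demo_seqpacket_trunc(struct rx_result *first,
				struct rx_result *second)
{
	char buf[4];
	int sv[2];
	int ret = -1;

	assert(first && second);

	if (socketpair(AF_UNIX, SOCK_SEQPACKET, 0, sv) < 0)
		return -1;

	if (send(sv[0], "ABCDEFGH", 8, 0) != 8 ||
	    send(sv[0], "xy", 2, 0) != 2)
		goto out;

	/* Buffer is smaller than the first record: the tail is dropped. */
	*first = recv_record(sv[1], buf, sizeof(buf));
	/* The next call starts at the next record boundary. */
	*second = recv_record(sv[1], buf, sizeof(buf));
	ret = 0;
out:
	close(sv[0]);
	close(sv[1]);
	return ret;
}
```

Calling demo_seqpacket_trunc() from a small main() and printing both results shows that the first read is cut to the buffer size with MSG_TRUNC reported, while the second read returns the short record whole. Since 'msg_flags' is an output field that the receive path may set, this is also why the kernel loop above has to clear a stale flag such as MSG_EOR before retrying.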