[<prev] [next>] [<thread-prev] [thread-next>] [day] [month] [year] [list]
Message-ID: <20210118150433.kj4wuoecddyng632@steredhat>
Date: Mon, 18 Jan 2021 16:04:33 +0100
From: Stefano Garzarella <sgarzare@...hat.com>
To: Arseny Krasnov <arseny.krasnov@...persky.com>
Cc: Stefan Hajnoczi <stefanha@...hat.com>,
"Michael S. Tsirkin" <mst@...hat.com>,
Jason Wang <jasowang@...hat.com>,
"David S. Miller" <davem@...emloft.net>,
Jakub Kicinski <kuba@...nel.org>,
Jorgen Hansen <jhansen@...are.com>,
Colin Ian King <colin.king@...onical.com>,
Andra Paraschiv <andraprs@...zon.com>,
Jeff Vander Stoep <jeffv@...gle.com>, kvm@...r.kernel.org,
virtualization@...ts.linux-foundation.org, netdev@...r.kernel.org,
linux-kernel@...r.kernel.org, stsp2@...dex.ru, oxffffaa@...il.com
Subject: Re: [RFC PATCH v2 02/13] af_vsock: separate rx loops for
STREAM/SEQPACKET.
On Fri, Jan 15, 2021 at 08:40:50AM +0300, Arseny Krasnov wrote:
>This adds two receive loops: for SOCK_STREAM and SOCK_SEQPACKET. Both
>look like twins, but SEQPACKET is a little bit different from STREAM:
>1) It doesn't call notify callbacks.
>2) It doesn't care about 'SO_SNDLOWAT' and 'SO_RCVLOWAT' values, because
> these values make no sense in the SEQPACKET case.
>3) It waits until whole record is received or error is found during
> receiving.
>4) It processes and sets 'MSG_TRUNC' flag.
>
>So to avoid extra conditions for two types of socket inside one loop, two
>independent functions were created.
>
>Signed-off-by: Arseny Krasnov <arseny.krasnov@...persky.com>
>---
> include/net/af_vsock.h | 5 +
> net/vmw_vsock/af_vsock.c | 202 +++++++++++++++++++++++++++++++++++++++
> 2 files changed, 207 insertions(+)
>
>diff --git a/include/net/af_vsock.h b/include/net/af_vsock.h
>index b1c717286993..46073842d489 100644
>--- a/include/net/af_vsock.h
>+++ b/include/net/af_vsock.h
>@@ -135,6 +135,11 @@ struct vsock_transport {
> bool (*stream_is_active)(struct vsock_sock *);
> bool (*stream_allow)(u32 cid, u32 port);
>
>+ /* SEQ_PACKET. */
>+ size_t (*seqpacket_seq_get_len)(struct vsock_sock *);
>+ ssize_t (*seqpacket_dequeue)(struct vsock_sock *, struct msghdr *,
>+ size_t len, int flags);
>+
> /* Notification. */
> int (*notify_poll_in)(struct vsock_sock *, size_t, bool *);
> int (*notify_poll_out)(struct vsock_sock *, size_t, bool *);
>diff --git a/net/vmw_vsock/af_vsock.c b/net/vmw_vsock/af_vsock.c
>index af716f5a93a4..afacbe9f4231 100644
>--- a/net/vmw_vsock/af_vsock.c
>+++ b/net/vmw_vsock/af_vsock.c
>@@ -1870,6 +1870,208 @@ static int vsock_wait_data(struct sock *sk, struct wait_queue_entry *wait,
> return err;
> }
>
>+static int __vsock_seqpacket_recvmsg(struct sock *sk, struct msghdr *msg,
>+ size_t len, int flags)
>+{
>+ int err = 0;
>+ size_t record_len;
>+ struct vsock_sock *vsk;
>+ const struct vsock_transport *transport;
>+ long timeout;
>+ ssize_t dequeued_total = 0;
>+ unsigned long orig_nr_segs;
>+ const struct iovec *orig_iov;
>+ DEFINE_WAIT(wait);
>+
>+ vsk = vsock_sk(sk);
>+ transport = vsk->transport;
>+
>+ timeout = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
>+ msg->msg_flags &= ~MSG_EOR;
>+ orig_nr_segs = msg->msg_iter.nr_segs;
>+ orig_iov = msg->msg_iter.iov;
>+
>+ while (1) {
>+ s64 ready;
>+
>+ prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
>+ ready = vsock_stream_has_data(vsk);
>+
>+ if (ready == 0) {
>+ if (vsock_wait_data(sk, &wait, timeout, NULL, 0)) {
>+ /* In case of any loop break(timeout, signal
>+ * interrupt or shutdown), we report user that
>+ * nothing was copied.
>+ */
>+ dequeued_total = 0;
>+ break;
>+ }
Maybe here we can do 'continue', remove the next line, and reduce the
indentation on the next block.
>+ } else {
>+ ssize_t dequeued;
>+
>+ finish_wait(sk_sleep(sk), &wait);
>+
>+ if (ready < 0) {
>+ err = -ENOMEM;
>+ goto out;
>+ }
>+
>+ if (dequeued_total == 0) {
>+ record_len =
>+ transport->seqpacket_seq_get_len(vsk);
>+
>+ if (record_len == 0)
>+ continue;
>+ }
>+
>+ /* 'msg_iter.count' is number of unused bytes in iov.
>+ * On every copy to iov iterator it is decremented at
>+ * size of data.
>+ */
>+ dequeued = transport->seqpacket_dequeue(vsk, msg,
>+ msg->msg_iter.count, flags);
>+
>+ if (dequeued < 0) {
>+ dequeued_total = 0;
>+
>+ if (dequeued == -EAGAIN) {
>+ iov_iter_init(&msg->msg_iter, READ,
>+ orig_iov, orig_nr_segs,
>+ len);
>+ msg->msg_flags &= ~MSG_EOR;
>+ continue;
>+ }
>+
>+ err = -ENOMEM;
>+ break;
>+ }
>+
>+ dequeued_total += dequeued;
>+
>+ if (dequeued_total >= record_len)
>+ break;
>+ }
>+ }
>+ if (sk->sk_err)
>+ err = -sk->sk_err;
>+ else if (sk->sk_shutdown & RCV_SHUTDOWN)
>+ err = 0;
>+
>+ if (dequeued_total > 0) {
>+ /* User sets MSG_TRUNC, so return real length of
>+ * packet.
>+ */
>+ if (flags & MSG_TRUNC)
>+ err = record_len;
>+ else
>+ err = len - msg->msg_iter.count;
>+
>+ /* Always set MSG_TRUNC if real length of packet is
>+ * bigger than user buffer.
>+ */
>+ if (record_len > len)
>+ msg->msg_flags |= MSG_TRUNC;
>+ }
>+out:
>+ return err;
>+}
>+
>+static int __vsock_stream_recvmsg(struct sock *sk, struct msghdr *msg,
>+ size_t len, int flags)
>+{
>+ int err;
>+ const struct vsock_transport *transport;
>+ struct vsock_sock *vsk;
>+ size_t target;
>+ struct vsock_transport_recv_notify_data recv_data;
>+ long timeout;
>+ ssize_t copied;
>+
>+ DEFINE_WAIT(wait);
>+
>+ vsk = vsock_sk(sk);
>+ transport = vsk->transport;
>+
>+ /* We must not copy less than target bytes into the user's buffer
>+ * before returning successfully, so we wait for the consume queue to
>+ * have that much data to consume before dequeueing. Note that this
>+ * makes it impossible to handle cases where target is greater than the
>+ * queue size.
>+ */
>+ target = sock_rcvlowat(sk, flags & MSG_WAITALL, len);
>+ if (target >= transport->stream_rcvhiwat(vsk)) {
>+ err = -ENOMEM;
>+ goto out;
>+ }
>+ timeout = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
>+ copied = 0;
>+
>+ err = transport->notify_recv_init(vsk, target, &recv_data);
>+ if (err < 0)
>+ goto out;
>+
>+ while (1) {
>+ s64 ready;
>+
>+ prepare_to_wait(sk_sleep(sk), &wait, TASK_INTERRUPTIBLE);
>+ ready = vsock_stream_has_data(vsk);
>+
>+ if (ready == 0) {
>+ if (vsock_wait_data(sk, &wait, timeout, &recv_data, target))
>+ break;
The same also here.
>+ } else {
>+ ssize_t read;
>+
>+ finish_wait(sk_sleep(sk), &wait);
>+
>+ if (ready < 0) {
>+ /* Invalid queue pair content. XXX This should
>+ * be changed to a connection reset in a later
>+ * change.
>+ */
>+
>+ err = -ENOMEM;
>+ goto out;
>+ }
>+
>+ err = transport->notify_recv_pre_dequeue(vsk,
>+ target, &recv_data);
>+ if (err < 0)
>+ break;
>+ read = transport->stream_dequeue(vsk, msg, len - copied, flags);
>+
>+ if (read < 0) {
>+ err = -ENOMEM;
>+ break;
>+ }
>+
>+ copied += read;
>+
>+ err = transport->notify_recv_post_dequeue(vsk,
>+ target, read,
>+ !(flags & MSG_PEEK), &recv_data);
>+ if (err < 0)
>+ goto out;
>+
>+ if (read >= target || flags & MSG_PEEK)
>+ break;
>+
>+ target -= read;
>+ }
>+ }
>+
>+ if (sk->sk_err)
>+ err = -sk->sk_err;
>+ else if (sk->sk_shutdown & RCV_SHUTDOWN)
>+ err = 0;
>+ if (copied > 0)
>+ err = copied;
>+
>+out:
>+ release_sock(sk);
>+ return err;
>+}
>+
> static int
> vsock_stream_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
> int flags)
>--
>2.25.1
>
Powered by blists - more mailing lists