[<prev] [next>] [<thread-prev] [thread-next>] [day] [month] [year] [list]
Message-ID: <CAF=yD-JEYZ=nw0eFNTHJW3zZVCzYRtJ0yGL90_Mgjay4HcVLXQ@mail.gmail.com>
Date: Sun, 25 Feb 2018 10:56:08 -0500
From: Willem de Bruijn <willemdebruijn.kernel@...il.com>
To: Sowmini Varadhan <sowmini.varadhan@...cle.com>
Cc: Network Development <netdev@...r.kernel.org>,
David Miller <davem@...emloft.net>,
Santosh Shilimkar <santosh.shilimkar@...cle.com>
Subject: Re: [PATCH V2 net-next 2/3] rds: deliver zerocopy completion
notification with data
On Fri, Feb 23, 2018 at 5:08 PM, Sowmini Varadhan
<sowmini.varadhan@...cle.com> wrote:
> This commit is an optimization of the commit 01883eda72bd
> ("rds: support for zcopy completion notification") for PF_RDS sockets.
>
> RDS applications are predominantly request-response transactions, so
> it is more efficient to reduce the number of system calls and have
> zerocopy completion notification delivered as ancillary data on the
> POLLIN channel.
>
> Cookies are passed up as ancillary data (at level SOL_RDS) in a
> struct rds_zcopy_cookies when the returned value of recvmsg() is
> greater than, or equal to, 0. A max of RDS_MAX_ZCOOKIES may be passed
> with each message.
>
> This commit removes support for zerocopy completion notification on
> MSG_ERRQUEUE for PF_RDS sockets.
>
> Signed-off-by: Sowmini Varadhan <sowmini.varadhan@...cle.com>
> ---
> diff --git a/net/rds/message.c b/net/rds/message.c
> index 6518345..2e8bdaf 100644
> --- a/net/rds/message.c
> +++ b/net/rds/message.c
> @@ -58,32 +58,26 @@ void rds_message_addref(struct rds_message *rm)
>
> static inline bool skb_zcookie_add(struct sk_buff *skb, u32 cookie)
> {
> - struct sock_exterr_skb *serr = SKB_EXT_ERR(skb);
> - int ncookies;
> - u32 *ptr;
> + struct rds_zcopy_cookies *ck = (struct rds_zcopy_cookies *)skb->cb;
please add a bounds check (not necessarily right here)
BUILD_BUG_ON(sizeof(*ck) > sizeof(skb->cb));
> static void rds_rm_zerocopy_callback(struct rds_sock *rs,
> struct rds_znotifier *znotif)
> {
> - struct sock *sk = rds_rs_to_sk(rs);
> struct sk_buff *skb, *tail;
> - struct sock_exterr_skb *serr;
> unsigned long flags;
> struct sk_buff_head *q;
> u32 cookie = znotif->z_cookie;
> + struct rds_zcopy_cookies *ck;
>
> - q = &sk->sk_error_queue;
> + q = &rs->rs_zcookie_queue;
> spin_lock_irqsave(&q->lock, flags);
> tail = skb_peek_tail(q);
>
> @@ -91,22 +85,19 @@ static void rds_rm_zerocopy_callback(struct rds_sock *rs,
> spin_unlock_irqrestore(&q->lock, flags);
> mm_unaccount_pinned_pages(&znotif->z_mmp);
> consume_skb(rds_skb_from_znotifier(znotif));
> - sk->sk_error_report(sk);
> + /* caller should wake up POLLIN */
sk->sk_data_ready(sk);
> @@ -362,8 +354,7 @@ int rds_message_copy_from_user(struct rds_message *rm, struct iov_iter *from,
> int total_copied = 0;
> struct sk_buff *skb;
>
> - skb = alloc_skb(SO_EE_ORIGIN_MAX_ZCOOKIES * sizeof(u32),
> - GFP_KERNEL);
> + skb = alloc_skb(0, GFP_KERNEL);
Without the error queue, the struct no longer needs to be an skb,
per se. Converting to a different struct with list_head is definitely
a longer patch. But kmalloc will be cheaper than alloc_skb.
Perhaps something to try (as separate follow-on work).
> +static int rds_recvmsg_zcookie(struct rds_sock *rs, struct msghdr *msg)
> +{
> + struct sk_buff *skb;
> + struct sk_buff_head *q = &rs->rs_zcookie_queue;
> + struct rds_zcopy_cookies *done;
> + int ret;
> +
> + if (!sock_flag(rds_rs_to_sk(rs), SOCK_ZEROCOPY) || !skb_peek(q))
> + return 0;
Racy read?
> +
> + if (!msg->msg_control ||
I'd move this first, so that the cookie queue need not even be probed
in the common case.
> + msg->msg_controllen < CMSG_SPACE(sizeof(*done)))
> + return 0;
If the caller does not satisfy the contract on the controllen size, the
function can be more explicit and return an error.
> +
> + skb = skb_dequeue(q);
> + if (!skb)
> + return 0;
> + done = (struct rds_zcopy_cookies *)skb->cb;
> + ret = done->num;
done->num is guaranteed to be >= 1, so ret is not strictly needed.
> + if (put_cmsg(msg, SOL_RDS, RDS_CMSG_ZCOPY_COMPLETION, sizeof(*done),
> + done)) {
> + skb_queue_head(q, skb);
> + ret = 0;
> + } else {
> + consume_skb(skb);
> + }
> + return ret;
> +}
> +
> int rds_recvmsg(struct socket *sock, struct msghdr *msg, size_t size,
> int msg_flags)
> {
> @@ -586,6 +615,7 @@ int rds_recvmsg(struct socket *sock, struct msghdr *msg, size_t size,
> int ret = 0, nonblock = msg_flags & MSG_DONTWAIT;
> DECLARE_SOCKADDR(struct sockaddr_in *, sin, msg->msg_name);
> struct rds_incoming *inc = NULL;
> + int ncookies;
>
> /* udp_recvmsg()->sock_recvtimeo() gets away without locking too.. */
> timeo = sock_rcvtimeo(sk, nonblock);
> @@ -611,7 +641,8 @@ int rds_recvmsg(struct socket *sock, struct msghdr *msg, size_t size,
>
> if (!rds_next_incoming(rs, &inc)) {
> if (nonblock) {
> - ret = -EAGAIN;
> + ncookies = rds_recvmsg_zcookie(rs, msg);
> + ret = ncookies ? 0 : -EAGAIN;
> break;
> }
>
> @@ -660,6 +691,7 @@ int rds_recvmsg(struct socket *sock, struct msghdr *msg, size_t size,
> ret = -EFAULT;
> goto out;
> }
> + ncookies = rds_recvmsg_zcookie(rs, msg);
The return value is not used here. You could make rds_recvmsg_zcookie
return a bool and use the integer count only in the non-blocking case.
>
> rds_stats_inc(s_recv_delivered);
>
> --
> 1.7.1
Powered by blists - more mailing lists