lists.openwall.net   lists  /  announce  owl-users  owl-dev  john-users  john-dev  passwdqc-users  yescrypt  popa3d-users  /  oss-security  kernel-hardening  musl  sabotage  tlsify  passwords  /  crypt-dev  xvendor  /  Bugtraq  Full-Disclosure  linux-kernel  linux-netdev  linux-ext4  linux-hardening  linux-cve-announce  PHC 
Open Source and information security mailing list archives
 
Hash Suite: Windows password security audit tool. GUI, reports in PDF.
[<prev] [next>] [<thread-prev] [thread-next>] [day] [month] [year] [list]
Date:   Wed, 14 Feb 2018 02:28:33 -0800
From:   Sowmini Varadhan <sowmini.varadhan@...cle.com>
To:     netdev@...r.kernel.org, willemdebruijn.kernel@...il.com
Cc:     davem@...emloft.net, rds-devel@....oracle.com,
        sowmini.varadhan@...cle.com, santosh.shilimkar@...cle.com
Subject: [PATCH V2 net-next 4/7] rds: support for zcopy completion notification

RDS removes a datagram (rds_message) from the retransmit queue when
an ACK is received. The ACK indicates that the receiver has queued
the RDS datagram, so that the sender can safely forget the datagram.
When all references to the rds_message are quiesced, rds_message_purge
is called to release resources used by the rds_message.

If the datagram to be removed had pinned pages set up, add
an entry to the rs->rs_znotify_queue so that the notification
will be sent up via rds_rm_zerocopy_callback() when the
rds_message is eventually freed by rds_message_purge.

rds_rm_zerocopy_callback() attempts to batch the number of cookies
sent with each notification to a max of SO_EE_ORIGIN_MAX_ZCOOKIES.
This is achieved by checking the tail skb in the sk_error_queue:
if this has room for one more cookie, the cookie from the
current notification is added; else a new skb is added to the
sk_error_queue. Every invocation of rds_rm_zerocopy_callback() will
trigger a ->sk_error_report to notify the application.

Signed-off-by: Sowmini Varadhan <sowmini.varadhan@...cle.com>
---
v2: 
  - make sure to always sock_put m_rs even if there is no znotifier.
  - major rewrite of notification, resulting in much simplification.

 include/uapi/linux/errqueue.h |    2 +
 net/rds/af_rds.c              |    2 +
 net/rds/message.c             |   83 +++++++++++++++++++++++++++++++++++++---
 net/rds/rds.h                 |   14 +++++++
 net/rds/recv.c                |    2 +
 5 files changed, 96 insertions(+), 7 deletions(-)

diff --git a/include/uapi/linux/errqueue.h b/include/uapi/linux/errqueue.h
index dc64cfa..28812ed 100644
--- a/include/uapi/linux/errqueue.h
+++ b/include/uapi/linux/errqueue.h
@@ -20,11 +20,13 @@ struct sock_extended_err {
 #define SO_EE_ORIGIN_ICMP6	3
 #define SO_EE_ORIGIN_TXSTATUS	4
 #define SO_EE_ORIGIN_ZEROCOPY	5
+#define SO_EE_ORIGIN_ZCOOKIE	6
 #define SO_EE_ORIGIN_TIMESTAMPING SO_EE_ORIGIN_TXSTATUS
 
 #define SO_EE_OFFENDER(ee)	((struct sockaddr*)((ee)+1))
 
 #define SO_EE_CODE_ZEROCOPY_COPIED	1
+#define	SO_EE_ORIGIN_MAX_ZCOOKIES	8
 
 /**
  *	struct scm_timestamping - timestamps exposed through cmsg
diff --git a/net/rds/af_rds.c b/net/rds/af_rds.c
index b405f77..74a4bf8 100644
--- a/net/rds/af_rds.c
+++ b/net/rds/af_rds.c
@@ -183,6 +183,8 @@ static unsigned int rds_poll(struct file *file, struct socket *sock,
 		mask |= (POLLIN | POLLRDNORM);
 	if (rs->rs_snd_bytes < rds_sk_sndbuf(rs))
 		mask |= (POLLOUT | POLLWRNORM);
+	if (sk->sk_err || !skb_queue_empty(&sk->sk_error_queue))
+		mask |= POLLERR;
 	read_unlock_irqrestore(&rs->rs_recv_lock, flags);
 
 	/* clear state any time we wake a seen-congested socket */
diff --git a/net/rds/message.c b/net/rds/message.c
index ef3daaf..d874b74 100644
--- a/net/rds/message.c
+++ b/net/rds/message.c
@@ -33,6 +33,9 @@
 #include <linux/kernel.h>
 #include <linux/slab.h>
 #include <linux/export.h>
+#include <linux/skbuff.h>
+#include <linux/list.h>
+#include <linux/errqueue.h>
 
 #include "rds.h"
 
@@ -53,29 +56,95 @@ void rds_message_addref(struct rds_message *rm)
 }
 EXPORT_SYMBOL_GPL(rds_message_addref);
 
+static inline bool skb_zcookie_add(struct sk_buff *skb, u32 cookie)
+{
+	struct sock_exterr_skb *serr = SKB_EXT_ERR(skb);
+	int ncookies;
+	u32 *ptr;
+
+	if (serr->ee.ee_origin != SO_EE_ORIGIN_ZCOOKIE)
+		return false;
+	ncookies = serr->ee.ee_data;
+	if (ncookies == SO_EE_ORIGIN_MAX_ZCOOKIES)
+		return false;
+	ptr = skb_put(skb, sizeof(u32));
+	*ptr = cookie;
+	serr->ee.ee_data = ++ncookies;
+	return true;
+}
+
+static void rds_rm_zerocopy_callback(struct rds_sock *rs,
+				     struct rds_znotifier *znotif)
+{
+	struct sock *sk = rds_rs_to_sk(rs);
+	struct sk_buff *skb, *tail;
+	struct sock_exterr_skb *serr;
+	unsigned long flags;
+	struct sk_buff_head *q;
+	u32 cookie = znotif->z_cookie;
+
+	q = &sk->sk_error_queue;
+	spin_lock_irqsave(&q->lock, flags);
+	tail = skb_peek_tail(q);
+
+	if (tail && skb_zcookie_add(tail, cookie)) {
+		spin_unlock_irqrestore(&q->lock, flags);
+		mm_unaccount_pinned_pages(&znotif->z_mmp);
+		consume_skb(rds_skb_from_znotifier(znotif));
+		sk->sk_error_report(sk);
+		return;
+	}
+
+	skb = rds_skb_from_znotifier(znotif);
+	serr = SKB_EXT_ERR(skb);
+	serr->ee.ee_errno = 0;
+	serr->ee.ee_origin = SO_EE_ORIGIN_ZCOOKIE;
+	serr->ee.ee_info = 0;
+	serr->ee.ee_code |= SO_EE_CODE_ZEROCOPY_COPIED;
+	WARN_ON(!skb_zcookie_add(skb, cookie));
+
+	__skb_queue_tail(q, skb);
+
+	spin_unlock_irqrestore(&q->lock, flags);
+	sk->sk_error_report(sk);
+
+	mm_unaccount_pinned_pages(&znotif->z_mmp);
+}
+
 /*
  * This relies on dma_map_sg() not touching sg[].page during merging.
  */
 static void rds_message_purge(struct rds_message *rm)
 {
 	unsigned long i, flags;
+	bool zcopy = false;
 
 	if (unlikely(test_bit(RDS_MSG_PAGEVEC, &rm->m_flags)))
 		return;
 
-	for (i = 0; i < rm->data.op_nents; i++) {
-		rdsdebug("putting data page %p\n", (void *)sg_page(&rm->data.op_sg[i]));
-		/* XXX will have to put_page for page refs */
-		__free_page(sg_page(&rm->data.op_sg[i]));
-	}
-	rm->data.op_nents = 0;
 	spin_lock_irqsave(&rm->m_rs_lock, flags);
 	if (rm->m_rs) {
-		sock_put(rds_rs_to_sk(rm->m_rs));
+		struct rds_sock *rs = rm->m_rs;
+
+		if (rm->data.op_mmp_znotifier) {
+			zcopy = true;
+			rds_rm_zerocopy_callback(rs, rm->data.op_mmp_znotifier);
+			rm->data.op_mmp_znotifier = NULL;
+		}
+		sock_put(rds_rs_to_sk(rs));
 		rm->m_rs = NULL;
 	}
 	spin_unlock_irqrestore(&rm->m_rs_lock, flags);
 
+	for (i = 0; i < rm->data.op_nents; i++) {
+		/* XXX will have to put_page for page refs */
+		if (!zcopy)
+			__free_page(sg_page(&rm->data.op_sg[i]));
+		else
+			put_page(sg_page(&rm->data.op_sg[i]));
+	}
+	rm->data.op_nents = 0;
+
 	if (rm->rdma.op_active)
 		rds_rdma_free_op(&rm->rdma);
 	if (rm->rdma.op_rdma_mr)
diff --git a/net/rds/rds.h b/net/rds/rds.h
index 374ae83..6e8fc4c 100644
--- a/net/rds/rds.h
+++ b/net/rds/rds.h
@@ -356,6 +356,19 @@ static inline u32 rds_rdma_cookie_offset(rds_rdma_cookie_t cookie)
 #define RDS_MSG_PAGEVEC		7
 #define RDS_MSG_FLUSH		8
 
+struct rds_znotifier {
+	struct list_head	z_list;
+	struct mmpin		z_mmp;
+	u32			z_cookie;
+};
+
+#define	RDS_ZCOPY_SKB(__skb)	((struct rds_znotifier *)&((__skb)->cb[0]))
+
+static inline struct sk_buff *rds_skb_from_znotifier(struct rds_znotifier *z)
+{
+	return container_of((void *)z, struct sk_buff, cb);
+}
+
 struct rds_message {
 	refcount_t		m_refcount;
 	struct list_head	m_sock_item;
@@ -436,6 +449,7 @@ struct rds_message {
 			unsigned int		op_count;
 			unsigned int		op_dmasg;
 			unsigned int		op_dmaoff;
+			struct rds_znotifier	*op_mmp_znotifier;
 			struct scatterlist	*op_sg;
 		} data;
 	};
diff --git a/net/rds/recv.c b/net/rds/recv.c
index b25bcfe..b080961 100644
--- a/net/rds/recv.c
+++ b/net/rds/recv.c
@@ -594,6 +594,8 @@ int rds_recvmsg(struct socket *sock, struct msghdr *msg, size_t size,
 
 	if (msg_flags & MSG_OOB)
 		goto out;
+	if (msg_flags & MSG_ERRQUEUE)
+		return sock_recv_errqueue(sk, msg, size, SOL_IP, IP_RECVERR);
 
 	while (1) {
 		/* If there are pending notifications, do those - and nothing else */
-- 
1.7.1

Powered by blists - more mailing lists

Powered by Openwall GNU/*/Linux Powered by OpenVZ