Message-Id: <1193bb432985523ff75715ce68eb7126dac7f018.1516793252.git.sowmini.varadhan@oracle.com>
Date: Wed, 24 Jan 2018 03:45:59 -0800
From: Sowmini Varadhan <sowmini.varadhan@...cle.com>
To: sowmini.varadhan@...cle.com, willemdebruijn.kernel@...il.com,
netdev@...r.kernel.org
Cc: davem@...emloft.net, rds-devel@....oracle.com,
santosh.shilimkar@...cle.com
Subject: [PATCH net-next 4/7] rds: support for zcopy completion notification
RDS removes a datagram (rds_message) from the retransmit queue when
an ACK is received. The ACK indicates that the receiver has queued
the RDS datagram, so the sender can safely forget the datagram.
When all references to the rds_message have been quiesced,
rds_message_purge() is called to release the resources used by the
rds_message.

If the datagram to be removed had pinned pages set up, add
an entry to the rs->rs_znotify_queue so that the notification
will be sent up via rds_rm_zerocopy_callback() when the
rds_message is eventually freed by rds_message_purge().

rds_rm_zerocopy_callback() batches up to SO_EE_ORIGIN_MAX_ZCOOKIES
cookies into each notification. Each time a cookie is released by
rds_message_purge(), the rs_znotify_queue is checked to see whether
the batch limit has been reached (in which case the accumulated
cookies are sent up as a single notification). If the limit has not
been reached, the cookie is added to the rs_znotify_queue and a timer
is armed to guarantee that the notification is sent within an upper
bound of RDS_REAP_TIMEOUT, should the traffic rate slow down. (A
userspace sketch of draining these notifications follows the "---"
separator below.)

Signed-off-by: Sowmini Varadhan <sowmini.varadhan@...cle.com>
---
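Notes: for illustration only, not part of this patch -- a minimal
userspace sketch of draining these batched cookie notifications. It
assumes the layout implemented below (origin SO_EE_ORIGIN_ZCOOKIE, the
cookie count in ee_data, and a u32 cookie array returned in the message
payload by sock_recv_errqueue()); the fd and function names are
illustrative.

#include <stdio.h>
#include <stdint.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <linux/errqueue.h>

static void drain_zcookies(int fd)
{
	uint32_t cookies[SO_EE_ORIGIN_MAX_ZCOOKIES];
	char control[CMSG_SPACE(sizeof(struct sock_extended_err))];
	struct iovec iov = {
		.iov_base = cookies,
		.iov_len = sizeof(cookies),
	};
	struct msghdr msg = {
		.msg_iov = &iov,
		.msg_iovlen = 1,
		.msg_control = control,
		.msg_controllen = sizeof(control),
	};
	struct cmsghdr *cmsg;
	struct sock_extended_err *serr;
	uint32_t i;

	if (recvmsg(fd, &msg, MSG_ERRQUEUE) < 0)
		return;	/* e.g. EAGAIN: nothing queued yet */

	for (cmsg = CMSG_FIRSTHDR(&msg); cmsg;
	     cmsg = CMSG_NXTHDR(&msg, cmsg)) {
		if (cmsg->cmsg_level != SOL_IP ||
		    cmsg->cmsg_type != IP_RECVERR)
			continue;
		serr = (struct sock_extended_err *)CMSG_DATA(cmsg);
		if (serr->ee.ee_origin != SO_EE_ORIGIN_ZCOOKIE)
			continue;
		/* ee_data carries the number of cookies in the payload */
		for (i = 0; i < serr->ee.ee_data; i++)
			printf("completed cookie %u\n", cookies[i]);
	}
}
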
include/uapi/linux/errqueue.h | 2 +
net/rds/af_rds.c | 7 +++
net/rds/message.c | 104 ++++++++++++++++++++++++++++++++++++++---
net/rds/rds.h | 20 ++++++++
net/rds/recv.c | 2 +
5 files changed, 128 insertions(+), 7 deletions(-)
diff --git a/include/uapi/linux/errqueue.h b/include/uapi/linux/errqueue.h
index dc64cfa..28812ed 100644
--- a/include/uapi/linux/errqueue.h
+++ b/include/uapi/linux/errqueue.h
@@ -20,11 +20,13 @@ struct sock_extended_err {
#define SO_EE_ORIGIN_ICMP6 3
#define SO_EE_ORIGIN_TXSTATUS 4
#define SO_EE_ORIGIN_ZEROCOPY 5
+#define SO_EE_ORIGIN_ZCOOKIE 6
#define SO_EE_ORIGIN_TIMESTAMPING SO_EE_ORIGIN_TXSTATUS
#define SO_EE_OFFENDER(ee) ((struct sockaddr*)((ee)+1))
#define SO_EE_CODE_ZEROCOPY_COPIED 1
+#define SO_EE_ORIGIN_MAX_ZCOOKIES 8
/**
* struct scm_timestamping - timestamps exposed through cmsg
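(Illustrative, not part of the patch: an error-queue consumer tells the
notification types apart by ee_origin, so with this patch an RDS
consumer additionally checks for SO_EE_ORIGIN_ZCOOKIE. A minimal
classifier sketch:)

#include <linux/errqueue.h>

static const char *ee_origin_name(const struct sock_extended_err *serr)
{
	switch (serr->ee.ee_origin) {
	case SO_EE_ORIGIN_ZEROCOPY:
		/* MSG_ZEROCOPY: ee_info..ee_data is a range of send IDs */
		return "zerocopy-range";
	case SO_EE_ORIGIN_ZCOOKIE:
		/* this patch: ee_data counts u32 cookies in the payload */
		return "zcookie-batch";
	default:
		return "other";
	}
}
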
diff --git a/net/rds/af_rds.c b/net/rds/af_rds.c
index b405f77..49a81e8 100644
--- a/net/rds/af_rds.c
+++ b/net/rds/af_rds.c
@@ -66,6 +66,8 @@ static int rds_release(struct socket *sock)
rs = rds_sk_to_rs(sk);
sock_orphan(sk);
+
+ del_timer_sync(&rs->rs_cookie_timer);
/* Note - rds_clear_recv_queue grabs rs_recv_lock, so
* that ensures the recv path has completed messing
* with the socket. */
@@ -183,6 +185,8 @@ static unsigned int rds_poll(struct file *file, struct socket *sock,
mask |= (POLLIN | POLLRDNORM);
if (rs->rs_snd_bytes < rds_sk_sndbuf(rs))
mask |= (POLLOUT | POLLWRNORM);
+ if (sk->sk_err || !skb_queue_empty(&sk->sk_error_queue))
+ mask |= POLLERR;
read_unlock_irqrestore(&rs->rs_recv_lock, flags);
/* clear state any time we wake a seen-congested socket */
@@ -511,6 +515,9 @@ static int __rds_create(struct socket *sock, struct sock *sk, int protocol)
INIT_LIST_HEAD(&rs->rs_send_queue);
INIT_LIST_HEAD(&rs->rs_recv_queue);
INIT_LIST_HEAD(&rs->rs_notify_queue);
+ INIT_LIST_HEAD(&rs->rs_znotify_queue);
+ rs->rs_ncookies = 0;
+ timer_setup(&rs->rs_cookie_timer, rs_zcopy_notify, 0);
INIT_LIST_HEAD(&rs->rs_cong_list);
spin_lock_init(&rs->rs_rdma_lock);
rs->rs_rdma_keys = RB_ROOT;
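(For reference, not RDS code: a minimal sketch of the
timer_setup()/from_timer() pattern behind rs_cookie_timer, with
illustrative struct and function names. from_timer() is container_of()
specialized for timer callbacks, and mod_timer() takes an absolute
jiffies expiry:)

#include <linux/timer.h>
#include <linux/jiffies.h>

struct foo {
	struct timer_list reap_timer;
	int pending;
};

static void foo_reap(struct timer_list *t)
{
	/* recover the enclosing struct from the timer_list pointer */
	struct foo *f = from_timer(f, t, reap_timer);

	f->pending = 0;
}

static void foo_arm(struct foo *f)
{
	f->pending = 1;
	timer_setup(&f->reap_timer, foo_reap, 0);
	/* absolute expiry: jiffies + delta, cf. RDS_REAP_TIMEOUT below */
	mod_timer(&f->reap_timer, jiffies + msecs_to_jiffies(10));
}
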
diff --git a/net/rds/message.c b/net/rds/message.c
index ef3daaf..7ca968a 100644
--- a/net/rds/message.c
+++ b/net/rds/message.c
@@ -33,6 +33,9 @@
#include <linux/kernel.h>
#include <linux/slab.h>
#include <linux/export.h>
+#include <linux/skbuff.h>
+#include <linux/list.h>
+#include <linux/errqueue.h>
#include "rds.h"
@@ -53,28 +56,115 @@ void rds_message_addref(struct rds_message *rm)
}
EXPORT_SYMBOL_GPL(rds_message_addref);
+static void rds_rm_zerocopy_callback(struct rds_sock *rs,
+ struct rds_znotifier *znotifier,
+ bool force)
+{
+ struct sock *sk = rds_rs_to_sk(rs);
+ struct sk_buff *skb;
+ struct sock_exterr_skb *serr;
+ unsigned long flags;
+ u32 *ptr;
+ int ncookies = 0, i;
+ struct rds_znotifier *znotif, *ztmp, *first;
+ LIST_HEAD(tmp_list);
+
+ spin_lock_irqsave(&rs->rs_lock, flags);
+ ncookies = rs->rs_ncookies;
+ if (ncookies < SO_EE_ORIGIN_MAX_ZCOOKIES && !force) {
+ if (znotifier) { /* add this cookie to the list and return */
+ list_add_tail(&znotifier->z_list,
+ &rs->rs_znotify_queue);
+ rs->rs_ncookies++;
+ }
+ spin_unlock_irqrestore(&rs->rs_lock, flags);
+ return;
+ }
+ if (!ncookies) { /* timer finds a reaped list */
+ spin_unlock_irqrestore(&rs->rs_lock, flags);
+ return;
+ }
+ /* reap existing cookie list if we have hit the max, then add
+ * new cookie to the list for next round of reaping.
+ */
+ list_splice(&rs->rs_znotify_queue, &tmp_list); /* reap now */
+ INIT_LIST_HEAD(&rs->rs_znotify_queue);
+ rs->rs_ncookies = 0;
+ if (znotifier) { /* for next round */
+ list_add_tail(&znotifier->z_list, &rs->rs_znotify_queue);
+ rs->rs_ncookies++;
+ }
+ spin_unlock_irqrestore(&rs->rs_lock, flags);
+
+ first = list_first_entry(&tmp_list, struct rds_znotifier, z_list);
+ list_del(&first->z_list);
+
+ skb = rds_skb_from_znotifier(first);
+ ptr = skb_put(skb, ncookies * sizeof(u32));
+ i = 0;
+ ptr[i++] = first->z_cookie;
+ mm_unaccount_pinned_pages(&first->z_mmp);
+
+ list_for_each_entry_safe(znotif, ztmp, &tmp_list, z_list) {
+ list_del(&znotif->z_list);
+ ptr[i++] = znotif->z_cookie;
+ mm_unaccount_pinned_pages(&znotif->z_mmp);
+ consume_skb(rds_skb_from_znotifier(znotif));
+ }
+ WARN_ON(!list_empty(&tmp_list));
+
+ serr = SKB_EXT_ERR(skb);
+ serr->ee.ee_errno = 0;
+ serr->ee.ee_origin = SO_EE_ORIGIN_ZCOOKIE;
+ serr->ee.ee_data = ncookies;
+ serr->ee.ee_info = 0;
+ serr->ee.ee_code |= SO_EE_CODE_ZEROCOPY_COPIED;
+
+ if (sock_queue_err_skb(sk, skb))
+ consume_skb(skb);
+}
+
+void rs_zcopy_notify(struct timer_list *t)
+{
+ struct rds_sock *rs = from_timer(rs, t, rs_cookie_timer);
+
+ rds_rm_zerocopy_callback(rs, NULL, true);
+}
+
/*
* This relies on dma_map_sg() not touching sg[].page during merging.
*/
static void rds_message_purge(struct rds_message *rm)
{
unsigned long i, flags;
+ bool zcopy = false;
if (unlikely(test_bit(RDS_MSG_PAGEVEC, &rm->m_flags)))
return;
+ spin_lock_irqsave(&rm->m_rs_lock, flags);
+ if (rm->m_rs) {
+ struct rds_sock *rs = rm->m_rs;
+
+ if (rm->data.op_mmp_znotifier) {
+ zcopy = true;
+ rds_rm_zerocopy_callback(rs, rm->data.op_mmp_znotifier, false);
+ rm->data.op_mmp_znotifier = NULL;
+ (void)mod_timer(&rs->rs_cookie_timer, jiffies + RDS_REAP_TIMEOUT);
+ }
+ sock_put(rds_rs_to_sk(rs));
+ rm->m_rs = NULL;
+ }
+ spin_unlock_irqrestore(&rm->m_rs_lock, flags);
+
for (i = 0; i < rm->data.op_nents; i++) {
rdsdebug("putting data page %p\n", (void *)sg_page(&rm->data.op_sg[i]));
/* XXX will have to put_page for page refs */
- __free_page(sg_page(&rm->data.op_sg[i]));
+ if (!zcopy)
+ __free_page(sg_page(&rm->data.op_sg[i]));
+ else
+ put_page(sg_page(&rm->data.op_sg[i]));
}
rm->data.op_nents = 0;
- spin_lock_irqsave(&rm->m_rs_lock, flags);
- if (rm->m_rs) {
- sock_put(rds_rs_to_sk(rm->m_rs));
- rm->m_rs = NULL;
- }
- spin_unlock_irqrestore(&rm->m_rs_lock, flags);
if (rm->rdma.op_active)
rds_rdma_free_op(&rm->rdma);
diff --git a/net/rds/rds.h b/net/rds/rds.h
index 374ae83..c375dd8 100644
--- a/net/rds/rds.h
+++ b/net/rds/rds.h
@@ -356,6 +356,19 @@ static inline u32 rds_rdma_cookie_offset(rds_rdma_cookie_t cookie)
#define RDS_MSG_PAGEVEC 7
#define RDS_MSG_FLUSH 8
+struct rds_znotifier {
+ struct list_head z_list;
+ struct mmpin z_mmp;
+ u32 z_cookie;
+};
+
+#define RDS_ZCOPY_SKB(__skb) ((struct rds_znotifier *)&((__skb)->cb[0]))
+
+static inline struct sk_buff *rds_skb_from_znotifier(struct rds_znotifier *z)
+{
+ return container_of((void *)z, struct sk_buff, cb);
+}
+
struct rds_message {
refcount_t m_refcount;
struct list_head m_sock_item;
@@ -436,6 +449,7 @@ struct rds_message {
unsigned int op_count;
unsigned int op_dmasg;
unsigned int op_dmaoff;
+ struct rds_znotifier *op_mmp_znotifier;
struct scatterlist *op_sg;
} data;
};
@@ -588,6 +602,11 @@ struct rds_sock {
/* Socket receive path trace points*/
u8 rs_rx_traces;
u8 rs_rx_trace[RDS_MSG_RX_DGRAM_TRACE_MAX];
+
+ struct list_head rs_znotify_queue; /* zerocopy completion */
+ int rs_ncookies;
+ struct timer_list rs_cookie_timer;
+#define RDS_REAP_TIMEOUT ((HZ / 100) + 1)
};
static inline struct rds_sock *rds_sk_to_rs(const struct sock *sk)
@@ -785,6 +804,7 @@ int rds_message_next_extension(struct rds_header *hdr,
void rds_message_put(struct rds_message *rm);
void rds_message_wait(struct rds_message *rm);
void rds_message_unmapped(struct rds_message *rm);
+void rs_zcopy_notify(struct timer_list *t);
static inline void rds_message_make_checksum(struct rds_header *hdr)
{
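(Also illustrative, an assumption about the send side elsewhere in this
series rather than code from this patch: how a notifier might be
overlaid on an skb with RDS_ZCOPY_SKB() and the skb recovered later
through rds_skb_from_znotifier(). The alloc sizing, BUILD_BUG_ON, and
function name are assumptions:)

/* sketch; assumes the net/rds/rds.h definitions above */
static struct rds_znotifier *zcopy_notifier_alloc(u32 cookie)
{
	struct sk_buff *skb;
	struct rds_znotifier *znotif;

	/* reserve tailroom for a full batch of u32 cookies up front */
	skb = alloc_skb(SO_EE_ORIGIN_MAX_ZCOOKIES * sizeof(u32), GFP_KERNEL);
	if (!skb)
		return NULL;

	/* the notifier must fit inside the skb control buffer */
	BUILD_BUG_ON(sizeof(*znotif) > sizeof(skb->cb));
	znotif = RDS_ZCOPY_SKB(skb);	/* overlay on skb->cb[0] */
	znotif->z_cookie = cookie;
	INIT_LIST_HEAD(&znotif->z_list);
	/* later: rds_skb_from_znotifier(znotif) returns skb */
	return znotif;
}
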
diff --git a/net/rds/recv.c b/net/rds/recv.c
index b25bcfe..b080961 100644
--- a/net/rds/recv.c
+++ b/net/rds/recv.c
@@ -594,6 +594,8 @@ int rds_recvmsg(struct socket *sock, struct msghdr *msg, size_t size,
if (msg_flags & MSG_OOB)
goto out;
+ if (msg_flags & MSG_ERRQUEUE)
+ return sock_recv_errqueue(sk, msg, size, SOL_IP, IP_RECVERR);
while (1) {
/* If there are pending notifications, do those - and nothing else */
--
1.7.1