[<prev] [next>] [thread-next>] [day] [month] [year] [list]
Message-Id: <1551695569-3796-1-git-send-email-ka-cheong.poon@oracle.com>
Date: Mon, 4 Mar 2019 02:32:49 -0800
From: Ka-Cheong Poon <ka-cheong.poon@...cle.com>
To: netdev@...r.kernel.org
Cc: santosh.shilimkar@...cle.com
Subject: [PATCH UEK5-{Master,U2,U1},UEK4-QU7] rds: Add per peer RDS socket send buffer
Currently, an RDS socket's send buffer is shared by all the peers this
socket communicates with. When one or more peers are slow to receive,
their unacknowledged data can consume a large portion of the socket's
buffer. This will affect the communication with other peers of the
socket.
To resolve this issue, an RDS socket's send buffer is now per peer.
This works like opening multiple TCP sockets to different peers, each
peer has its own send buffer (in each socket). In RDS, each peer has
its own buffer space in the socket. With this per peer send buffer,
when one or more peers are slow, their data will not interfere with
data sent to other peers.
A sysctl parameter, sock_max_peers, is added to limit the number of
peers a socket can communicate with. The default is 8192. Its range
of valid value is 128 to 65536.
Orabug: 28314151
Tested-by: Rose Wang <rose.wang@...cle.com>
Signed-off-by: Ka-Cheong Poon <ka-cheong.poon@...cle.com>
---
net/rds/af_rds.c | 129 +++++++++++++++++++++++++++++++++++++++++++++++++++++--
net/rds/rds.h | 49 +++++++++++++++++----
net/rds/recv.c | 6 +--
net/rds/send.c | 90 +++++++++++++++++++++++++-------------
net/rds/sysctl.c | 15 ++++++-
5 files changed, 244 insertions(+), 45 deletions(-)
diff --git a/net/rds/af_rds.c b/net/rds/af_rds.c
index 0b86de5..ad51714 100644
--- a/net/rds/af_rds.c
+++ b/net/rds/af_rds.c
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2006, 2018 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2006, 2019 Oracle and/or its affiliates. All rights reserved.
*
* This software is available to you under a choice of one of two
* licenses. You may choose to be licensed under the terms of the GNU
@@ -80,6 +80,17 @@ char *rds_str_array(char **array, size_t elements, size_t index)
static LIST_HEAD(rds_sock_list);
DECLARE_WAIT_QUEUE_HEAD(rds_poll_waitq);
+/* kmem cache slab for struct rds_buf_info */
+static struct kmem_cache *rds_rs_buf_info_slab;
+
+/* Helper function to be passed to rhashtable_free_and_destroy() to free a
+ * struct rs_buf_info.
+ */
+static void rds_buf_info_free(void *rsbi, void *arg)
+{
+ kmem_cache_free(rds_rs_buf_info_slab, rsbi);
+}
+
/*
* This is called as the final descriptor referencing this socket is closed.
* We have to unbind the socket so that another socket can be bound to the
@@ -112,6 +123,9 @@ static int rds_release(struct socket *sock)
rds_rdma_drop_keys(rs);
rds_notify_queue_get(rs, NULL);
+ rhashtable_free_and_destroy(&rs->rs_buf_info_tbl, rds_buf_info_free,
+ NULL);
+
spin_lock_bh(&rds_sock_lock);
list_del_init(&rs->rs_item);
rds_sock_count--;
@@ -272,10 +286,18 @@ static unsigned int rds_poll(struct file *file, struct socket *sock,
if (!list_empty(&rs->rs_recv_queue)
|| !list_empty(&rs->rs_notify_queue))
mask |= (POLLIN | POLLRDNORM);
- if (rs->rs_snd_bytes < rds_sk_sndbuf(rs))
- mask |= (POLLOUT | POLLWRNORM);
read_unlock_irqrestore(&rs->rs_recv_lock, flags);
+ /* Use the number of destination this socket has to estimate the
+ * send buffer size. When there is no peer yet, return the default
+ * send buffer size.
+ */
+ spin_lock_irqsave(&rs->rs_snd_lock, flags);
+ if (rs->rs_snd_bytes < max(rs->rs_buf_info_dest_cnt, (u32)1) *
+ rds_sk_sndbuf(rs))
+ mask |= (POLLOUT | POLLWRNORM);
+ spin_unlock_irqrestore(&rs->rs_snd_lock, flags);
+
/* clear state any time we wake a seen-congested socket */
if (mask)
rs->rs_seen_congestion = 0;
@@ -712,6 +734,80 @@ static int rds_getsockopt(struct socket *sock, int level, int optname,
}
+/* Check if there is a rs_buf_info associated with the given address. If not,
+ * add one to the rds_sock. The found or added rs_buf_info is returned. If
+ * there is no rs_buf_info found and a new rs_buf_info cannot be allocated,
+ * NULL is returned and ret is set to the error. Once an address' rs_buf_info
+ * is added, it will not be removed until the rs_sock is closed.
+ */
+struct rs_buf_info *rds_add_buf_info(struct rds_sock *rs, struct in6_addr *addr,
+ int *ret, gfp_t gfp)
+{
+ struct rs_buf_info *info, *tmp_info;
+ unsigned long flags;
+
+ /* Normal path, peer is expected to be found most of the time. */
+ rcu_read_lock();
+ info = rhashtable_lookup_fast(&rs->rs_buf_info_tbl, addr,
+ rs_buf_info_params);
+ if (info) {
+ rcu_read_unlock();
+ *ret = 0;
+ return info;
+ }
+ rcu_read_unlock();
+
+ /* Allocate the buffer outside of lock first. */
+ tmp_info = kmem_cache_alloc(rds_rs_buf_info_slab, gfp);
+ if (!tmp_info) {
+ *ret = -ENOMEM;
+ return NULL;
+ }
+
+ spin_lock_irqsave(&rs->rs_snd_lock, flags);
+
+ /* Cannot add more peer. */
+ if (rs->rs_buf_info_dest_cnt + 1 > rds_sock_max_peers) {
+ spin_unlock_irqrestore(&rs->rs_snd_lock, flags);
+ *ret = -ENFILE;
+ return NULL;
+ }
+
+ tmp_info->rsbi_key = *addr;
+ tmp_info->rsbi_snd_bytes = 0;
+ *ret = rhashtable_insert_fast(&rs->rs_buf_info_tbl,
+ &tmp_info->rsbi_link, rs_buf_info_params);
+ if (!*ret) {
+ rs->rs_buf_info_dest_cnt++;
+ spin_unlock_irqrestore(&rs->rs_snd_lock, flags);
+ return tmp_info;
+ } else if (*ret != -EEXIST) {
+ spin_unlock_irqrestore(&rs->rs_snd_lock, flags);
+ kmem_cache_free(rds_rs_buf_info_slab, tmp_info);
+ /* Very unlikely to happen... */
+ printk(KERN_ERR "%s: cannot add rs_buf_info for %pI6c: %d\n",
+ __func__, addr, *ret);
+ return NULL;
+ }
+
+ /* Another thread beats us in adding the rs_buf_info.... */
+ info = rhashtable_lookup_fast(&rs->rs_buf_info_tbl, addr,
+ rs_buf_info_params);
+ spin_unlock_irqrestore(&rs->rs_snd_lock, flags);
+ kmem_cache_free(rds_rs_buf_info_slab, tmp_info);
+
+ if (info) {
+ *ret = 0;
+ return info;
+ }
+
+ /* Should not happen... */
+ printk(KERN_ERR "%s: cannot find rs_buf_info for %pI6c\n", __func__,
+ addr);
+ *ret = -EINVAL;
+ return NULL;
+}
+
static int rds_connect(struct socket *sock, struct sockaddr *uaddr,
int addr_len, int flags)
{
@@ -800,6 +896,12 @@ static int rds_connect(struct socket *sock, struct sockaddr *uaddr,
break;
}
+ if (!ret &&
+ !rds_add_buf_info(rs, &rs->rs_conn_addr, &ret, GFP_KERNEL)) {
+ /* Need to clear the connected info in case of error. */
+ rs->rs_conn_addr = in6addr_any;
+ rs->rs_conn_port = 0;
+ }
release_sock(sk);
return ret;
}
@@ -842,6 +944,7 @@ static void rds_sock_destruct(struct sock *sk)
static int __rds_create(struct socket *sock, struct sock *sk, int protocol)
{
struct rds_sock *rs;
+ int ret;
sock_init_data(sock, sk);
sock->ops = &rds_proto_ops;
@@ -863,6 +966,11 @@ static int __rds_create(struct socket *sock, struct sock *sk, int protocol)
rs->rs_netfilter_enabled = 0;
rs->rs_rx_traces = 0;
+ spin_lock_init(&rs->rs_snd_lock);
+ ret = rhashtable_init(&rs->rs_buf_info_tbl, &rs_buf_info_params);
+ if (ret)
+ return ret;
+
if (!ipv6_addr_any(&rs->rs_bound_addr)) {
printk(KERN_CRIT "bound addr %pI6c at create\n",
&rs->rs_bound_addr);
@@ -879,6 +987,7 @@ static int __rds_create(struct socket *sock, struct sock *sk, int protocol)
static int rds_create(struct net *net, struct socket *sock, int protocol, int kern)
{
struct sock *sk;
+ int ret;
if (sock->type != SOCK_SEQPACKET ||
(protocol && IPPROTO_OKA != protocol))
@@ -888,7 +997,10 @@ static int rds_create(struct net *net, struct socket *sock, int protocol, int ke
if (!sk)
return -ENOMEM;
- return __rds_create(sock, sk, protocol);
+ ret = __rds_create(sock, sk, protocol);
+ if (ret)
+ sk_free(sk);
+ return ret;
}
void debug_sock_hold(struct sock *sk)
@@ -1194,6 +1306,7 @@ static void __exit rds_exit(void)
rds_info_deregister_func(RDS6_INFO_SOCKETS, rds6_sock_info);
rds_info_deregister_func(RDS6_INFO_RECV_MESSAGES, rds6_sock_inc_info);
#endif
+ kmem_cache_destroy(rds_rs_buf_info_slab);
}
module_exit(rds_exit);
@@ -1204,6 +1317,14 @@ static int __init rds_init(void)
{
int ret;
+ rds_rs_buf_info_slab = kmem_cache_create("rds_rs_buf_info",
+ sizeof(struct rs_buf_info),
+ 0, SLAB_HWCACHE_ALIGN, NULL);
+ if (!rds_rs_buf_info_slab) {
+ ret = -ENOMEM;
+ goto out;
+ }
+
net_get_random_once(&rds_gen_num, sizeof(rds_gen_num));
rds_bind_lock_init();
diff --git a/net/rds/rds.h b/net/rds/rds.h
index 31ab639..435caa5 100644
--- a/net/rds/rds.h
+++ b/net/rds/rds.h
@@ -12,6 +12,7 @@
#include <uapi/linux/rds.h>
#include <linux/in6.h>
#include <linux/sizes.h>
+#include <linux/rhashtable.h>
#include "info.h"
@@ -725,6 +726,13 @@ struct rds_transport {
struct rdma_cm_event *event);
};
+/* Used to store per peer socket buffer info. */
+struct rs_buf_info {
+ struct in6_addr rsbi_key;
+ struct rhash_head rsbi_link;
+ u32 rsbi_snd_bytes;
+};
+
struct rds_sock {
struct sock rs_sk;
@@ -757,28 +765,34 @@ struct rds_sock {
/* seen congestion (ENOBUFS) when sending? */
int rs_seen_congestion;
- /* rs_lock protects all these adjacent members before the newline */
- spinlock_t rs_lock;
- struct list_head rs_send_queue;
- u32 rs_snd_bytes;
- int rs_rcv_bytes;
- struct list_head rs_notify_queue; /* currently used for failed RDMAs */
-
- /* Congestion wake_up. If rs_cong_monitor is set, we use cong_mask
+ /* rs_lock protects all these adjacent members before the newline.
+ *
+ * Congestion wake_up. If rs_cong_monitor is set, we use cong_mask
* to decide whether the application should be woken up.
* If not set, we use rs_cong_track to find out whether a cong map
* update arrived.
*/
+ spinlock_t rs_lock;
uint64_t rs_cong_mask;
uint64_t rs_cong_notify;
struct list_head rs_cong_list;
unsigned long rs_cong_track;
+ struct list_head rs_notify_queue; /* currently used for failed RDMAs */
+
+ /* rs_snd_lock protects all these adjacent members before the
+ * newline */
+ spinlock_t rs_snd_lock;
+ struct list_head rs_send_queue;
+ u32 rs_snd_bytes; /* Total bytes to all peers */
+ u32 rs_buf_info_dest_cnt;
+ struct rhashtable rs_buf_info_tbl;
/*
* rs_recv_lock protects the receive queue, and is
* used to serialize with rds_release.
*/
rwlock_t rs_recv_lock;
+ int rs_rcv_bytes;
struct list_head rs_recv_queue;
/* just for stats reporting */
@@ -867,6 +881,25 @@ struct rds_statistics {
};
/* af_rds.c */
+#define RDS_SOCK_BUF_INFO_HTBL_SIZE 512
+static const struct rhashtable_params rs_buf_info_params = {
+ .nelem_hint = RDS_SOCK_BUF_INFO_HTBL_SIZE,
+ .key_len = sizeof(struct in6_addr),
+ .key_offset = offsetof(struct rs_buf_info, rsbi_key),
+ .head_offset = offsetof(struct rs_buf_info, rsbi_link),
+};
+
+/* Maximum number of peers a socket can communicate with */
+extern unsigned int rds_sock_max_peers;
+
+struct rs_buf_info *rds_add_buf_info(struct rds_sock *, struct in6_addr *,
+ int *, gfp_t);
+static inline struct rs_buf_info *rds_get_buf_info(struct rds_sock *rs,
+ struct in6_addr *addr)
+{
+ return rhashtable_lookup_fast(&rs->rs_buf_info_tbl, addr,
+ rs_buf_info_params);
+}
char *rds_str_array(char **array, size_t elements, size_t index);
void rds_sock_addref(struct rds_sock *rs);
void rds_sock_put(struct rds_sock *rs);
diff --git a/net/rds/recv.c b/net/rds/recv.c
index 7138f75..4402b02 100644
--- a/net/rds/recv.c
+++ b/net/rds/recv.c
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2006, 2018 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2006, 2019 Oracle and/or its affiliates. All rights reserved.
*
* This software is available to you under a choice of one of two
* licenses. You may choose to be licensed under the terms of the GNU
@@ -900,9 +900,9 @@ static int rds_notify_cong(struct rds_sock *rs, struct msghdr *msghdr)
if (err)
return err;
- spin_lock_irqsave(&rs->rs_lock, flags);
+ spin_lock_irqsave(&rs->rs_snd_lock, flags);
rs->rs_cong_notify &= ~notify;
- spin_unlock_irqrestore(&rs->rs_lock, flags);
+ spin_unlock_irqrestore(&rs->rs_snd_lock, flags);
return 0;
}
diff --git a/net/rds/send.c b/net/rds/send.c
index 676f38b..1f1a54c 100644
--- a/net/rds/send.c
+++ b/net/rds/send.c
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2006, 2018 Oracle and/or its affiliates. All rights reserved.
+ * Copyright (c) 2006, 2019 Oracle and/or its affiliates. All rights reserved.
*
* This software is available to you under a choice of one of two
* licenses. You may choose to be licensed under the terms of the GNU
@@ -548,10 +548,16 @@ int rds_send_xmit(struct rds_conn_path *cp)
static void rds_send_sndbuf_remove(struct rds_sock *rs, struct rds_message *rm)
{
u32 len = be32_to_cpu(rm->m_inc.i_hdr.h_len);
+ struct rs_buf_info *bufi;
- assert_spin_locked(&rs->rs_lock);
+ assert_spin_locked(&rs->rs_snd_lock);
+ bufi = rds_get_buf_info(rs, &rm->m_daddr);
+ /* bufi cannot be NULL as an address's rs_buf_info is never deleted. */
+ BUG_ON(!bufi);
+ BUG_ON(bufi->rsbi_snd_bytes < len);
BUG_ON(rs->rs_snd_bytes < len);
+ bufi->rsbi_snd_bytes -= len;
rs->rs_snd_bytes -= len;
if (rs->rs_snd_bytes == 0)
@@ -764,11 +770,12 @@ void rds_send_remove_from_sock(struct list_head *messages, int status)
rs = rm->m_rs;
debug_sock_hold(rds_rs_to_sk(rs));
}
- spin_lock(&rs->rs_lock);
+ spin_lock(&rs->rs_snd_lock);
if (test_and_clear_bit(RDS_MSG_ON_SOCK, &rm->m_flags)) {
list_del_init(&rm->m_sock_item);
rds_send_sndbuf_remove(rs, rm);
+ spin_unlock(&rs->rs_snd_lock);
if (rm->rdma.op_active && rm->rdma.op_notifier) {
struct rm_rdma_op *ro = &rm->rdma;
@@ -776,8 +783,10 @@ void rds_send_remove_from_sock(struct list_head *messages, int status)
if (ro->op_notify || status) {
notifier = ro->op_notifier;
+ spin_lock(&rs->rs_lock);
list_add_tail(¬ifier->n_list,
&rs->rs_notify_queue);
+ spin_unlock(&rs->rs_lock);
if (!notifier->n_status)
notifier->n_status = status;
} else
@@ -789,8 +798,10 @@ void rds_send_remove_from_sock(struct list_head *messages, int status)
if (ao->op_notify || status) {
notifier = ao->op_notifier;
+ spin_lock(&rs->rs_lock);
list_add_tail(¬ifier->n_list,
&rs->rs_notify_queue);
+ spin_unlock(&rs->rs_lock);
if (!notifier->n_status)
notifier->n_status = status;
} else
@@ -802,8 +813,10 @@ void rds_send_remove_from_sock(struct list_head *messages, int status)
if (so->op_notify || status) {
notifier = so->op_notifier;
+ spin_lock(&rs->rs_lock);
list_add_tail(¬ifier->n_list,
&rs->rs_notify_queue);
+ spin_unlock(&rs->rs_lock);
if (!notifier->n_status)
notifier->n_status = status;
} else
@@ -813,8 +826,8 @@ void rds_send_remove_from_sock(struct list_head *messages, int status)
was_on_sock = 1;
rm->m_rs = NULL;
- }
- spin_unlock(&rs->rs_lock);
+ } else
+ spin_unlock(&rs->rs_snd_lock);
unlock_and_drop:
spin_unlock_irqrestore(&rm->m_rs_lock, flags);
@@ -885,8 +898,8 @@ void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in6 *dest)
LIST_HEAD(list);
int conn_dropped = 0;
- /* get all the messages we're dropping under the rs lock */
- spin_lock_irqsave(&rs->rs_lock, flags);
+ /* get all the messages we're dropping under the rs_snd_lock */
+ spin_lock_irqsave(&rs->rs_snd_lock, flags);
list_for_each_entry_safe(rm, tmp, &rs->rs_send_queue, m_sock_item) {
if (dest &&
@@ -899,10 +912,10 @@ void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in6 *dest)
clear_bit(RDS_MSG_ON_SOCK, &rm->m_flags);
}
- /* order flag updates with the rs lock */
+ /* order flag updates with the rs_snd_lock */
smp_mb__after_atomic();
- spin_unlock_irqrestore(&rs->rs_lock, flags);
+ spin_unlock_irqrestore(&rs->rs_snd_lock, flags);
if (list_empty(&list))
return;
@@ -935,9 +948,9 @@ void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in6 *dest)
*/
spin_lock_irqsave(&rm->m_rs_lock, flags);
- spin_lock(&rs->rs_lock);
+ spin_lock(&rs->rs_snd_lock);
__rds_send_complete(rs, rm, RDS_RDMA_SEND_CANCELED);
- spin_unlock(&rs->rs_lock);
+ spin_unlock(&rs->rs_snd_lock);
rm->m_rs = NULL;
spin_unlock_irqrestore(&rm->m_rs_lock, flags);
@@ -970,9 +983,9 @@ void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in6 *dest)
*/
spin_lock_irqsave(&rm->m_rs_lock, flags);
- spin_lock(&rs->rs_lock);
+ spin_lock(&rs->rs_snd_lock);
__rds_send_complete(rs, rm, RDS_RDMA_SEND_CANCELED);
- spin_unlock(&rs->rs_lock);
+ spin_unlock(&rs->rs_snd_lock);
rm->m_rs = NULL;
spin_unlock_irqrestore(&rm->m_rs_lock, flags);
@@ -989,7 +1002,8 @@ void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in6 *dest)
static int rds_send_queue_rm(struct rds_sock *rs, struct rds_connection *conn,
struct rds_conn_path *cp,
struct rds_message *rm, __be16 sport,
- __be16 dport, int *queued)
+ __be16 dport, int *queued,
+ struct rs_buf_info *bufi)
{
unsigned long flags;
u32 len;
@@ -999,9 +1013,9 @@ static int rds_send_queue_rm(struct rds_sock *rs, struct rds_connection *conn,
len = be32_to_cpu(rm->m_inc.i_hdr.h_len);
- /* this is the only place which holds both the socket's rs_lock
+ /* this is the only place which holds both the socket's rs_snd_lock
* and the connection's c_lock */
- spin_lock_irqsave(&rs->rs_lock, flags);
+ spin_lock_irqsave(&rs->rs_snd_lock, flags);
/*
* If there is a little space in sndbuf, we don't queue anything,
@@ -1011,7 +1025,10 @@ static int rds_send_queue_rm(struct rds_sock *rs, struct rds_connection *conn,
* rs_snd_bytes here to allow the last msg to exceed the buffer,
* and poll() now knows no more data can be sent.
*/
- if (rs->rs_snd_bytes < rds_sk_sndbuf(rs)) {
+ if (bufi->rsbi_snd_bytes < rds_sk_sndbuf(rs)) {
+ bufi->rsbi_snd_bytes += len;
+
+ /* Record the total number of snd_bytes of all peers. */
rs->rs_snd_bytes += len;
/* let recv side know we are close to send space exhaustion.
@@ -1019,7 +1036,7 @@ static int rds_send_queue_rm(struct rds_sock *rs, struct rds_connection *conn,
* means we set the flag on *all* messages as soon as our
* throughput hits a certain threshold.
*/
- if (rs->rs_snd_bytes >= rds_sk_sndbuf(rs) / 2)
+ if (bufi->rsbi_snd_bytes >= rds_sk_sndbuf(rs) / 2)
set_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags);
list_add_tail(&rm->m_sock_item, &rs->rs_send_queue);
@@ -1037,7 +1054,7 @@ static int rds_send_queue_rm(struct rds_sock *rs, struct rds_connection *conn,
spin_lock(&cp->cp_lock);
if (cp->cp_pending_flush) {
spin_unlock(&cp->cp_lock);
- spin_unlock_irqrestore(&rs->rs_lock, flags);
+ spin_unlock_irqrestore(&rs->rs_snd_lock, flags);
goto out;
}
rm->m_inc.i_hdr.h_sequence = cpu_to_be64(cp->cp_next_tx_seq++);
@@ -1046,14 +1063,14 @@ static int rds_send_queue_rm(struct rds_sock *rs, struct rds_connection *conn,
spin_unlock(&cp->cp_lock);
- rdsdebug("queued msg %p len %d, rs %p bytes %d seq %llu\n",
- rm, len, rs, rs->rs_snd_bytes,
+ rdsdebug("queued msg %p len %d, rs %p bytes %u (%u) seq %llu\n",
+ rm, len, rs, rs->rs_snd_bytes, bufi->rsbi_snd_bytes,
(unsigned long long)be64_to_cpu(rm->m_inc.i_hdr.h_sequence));
*queued = 1;
}
- spin_unlock_irqrestore(&rs->rs_lock, flags);
+ spin_unlock_irqrestore(&rs->rs_snd_lock, flags);
out:
return *queued;
}
@@ -1257,6 +1274,7 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
long timeo = sock_sndtimeo(sk, nonblock);
size_t total_payload_len = payload_len, rdma_payload_len = 0;
struct rds_conn_path *cpath;
+ struct rs_buf_info *bufi;
struct in6_addr daddr;
__u32 scope_id = 0;
int namelen;
@@ -1395,19 +1413,25 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
goto out;
}
+
+ bufi = rds_add_buf_info(rs, &daddr, &ret, GFP_KERNEL);
+ if (!bufi)
+ goto out;
+
/*
* Avoid copying the message from user-space if we already
* know there's no space in the send buffer.
* The check is a negated version of the condition used inside
- * function "rds_send_queue_rm": "if (rs->rs_snd_bytes < rds_sk_sndbuf(rs))",
+ * function "rds_send_queue_rm":
+ * "if (bufi->rsbi_snd_bytes < rds_sk_sndbuf(rs))",
* which needs some reconsideration, as it unexpectedly checks
* if half of the send-buffer space is available, instead of
* checking if the given message would fit.
*/
if (nonblock) {
- spin_lock_irqsave(&rs->rs_lock, flags);
- no_space = rs->rs_snd_bytes >= rds_sk_sndbuf(rs);
- spin_unlock_irqrestore(&rs->rs_lock, flags);
+ spin_lock_irqsave(&rs->rs_snd_lock, flags);
+ no_space = bufi->rsbi_snd_bytes >= rds_sk_sndbuf(rs);
+ spin_unlock_irqrestore(&rs->rs_snd_lock, flags);
if (no_space) {
ret = -EAGAIN;
goto out;
@@ -1525,7 +1549,7 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
}
while (!rds_send_queue_rm(rs, conn, cpath, rm, rs->rs_bound_port,
- dport, &queued)) {
+ dport, &queued, bufi)) {
rds_stats_inc(s_send_queue_full);
if (nonblock) {
@@ -1541,7 +1565,7 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
rds_send_queue_rm(rs, conn, cpath, rm,
rs->rs_bound_port,
dport,
- &queued),
+ &queued, bufi),
timeo);
rdsdebug("sendmsg woke queued %d timeo %ld\n", queued, timeo);
if (timeo > 0 || timeo == MAX_SCHEDULE_TIMEOUT)
@@ -1591,6 +1615,7 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
int rds_send_internal(struct rds_connection *conn, struct rds_sock *rs,
struct sk_buff *skb, gfp_t gfp)
{
+ struct rs_buf_info *bufi;
struct rds_nf_hdr *dst;
struct rds_message *rm = NULL;
struct scatterlist *sg;
@@ -1609,6 +1634,13 @@ int rds_send_internal(struct rds_connection *conn, struct rds_sock *rs,
if (ret < 0)
goto out;
+ bufi = rds_add_buf_info(rs, &dst->daddr, &ret, gfp);
+ if (!bufi) {
+ rds_rtd(RDS_RTD_ERR, "failed to allocate rs %p sbuf to %pI6c",
+ rs, &dst->daddr);
+ goto out;
+ }
+
/* create ourselves a new message to send out the data */
rm = rds_message_alloc(ret, gfp);
if (!rm) {
@@ -1677,7 +1709,7 @@ int rds_send_internal(struct rds_connection *conn, struct rds_sock *rs,
/* only take a single pass */
if (!rds_send_queue_rm(rs, conn, &conn->c_path[0], rm,
- rs->rs_bound_port, dst->dport, &queued)) {
+ rs->rs_bound_port, dst->dport, &queued, bufi)) {
rds_rtd(RDS_RTD_SND, "cannot block on internal send rs %p", rs);
rds_stats_inc(s_send_queue_full);
diff --git a/net/rds/sysctl.c b/net/rds/sysctl.c
index b22e8b8..21bc056e 100644
--- a/net/rds/sysctl.c
+++ b/net/rds/sysctl.c
@@ -1,5 +1,5 @@
/*
- * Copyright (c) 2006 Oracle. All rights reserved.
+ * Copyright (c) 2006, 2019 Oracle and/or its affiliates. All rights reserved.
*
* This software is available to you under a choice of one of two
* licenses. You may choose to be licensed under the terms of the GNU
@@ -52,6 +52,10 @@
unsigned int rds_sysctl_shutdown_trace_start_time;
unsigned int rds_sysctl_shutdown_trace_end_time;
+unsigned int rds_sock_max_peers_min = 128;
+unsigned int rds_sock_max_peers_max = 65536;
+unsigned int rds_sock_max_peers = 8192;
+
/*
* We have official values, but must maintain the sysctl interface for existing
* software that expects to find these values here.
@@ -127,6 +131,15 @@
.mode = 0644,
.proc_handler = &proc_dointvec,
},
+ {
+ .procname = "sock_max_peers",
+ .data = &rds_sock_max_peers,
+ .maxlen = sizeof(int),
+ .mode = 0644,
+ .proc_handler = &proc_dointvec_minmax,
+ .extra1 = &rds_sock_max_peers_min,
+ .extra2 = &rds_sock_max_peers_max
+ },
{ }
};
--
1.8.3.1
Powered by blists - more mailing lists