Message-ID: <a68fec8a-cede-448a-1f7b-024047e7d73d@oracle.com>
Date: Mon, 4 Mar 2019 18:35:55 +0800
From: Ka-Cheong Poon <ka-cheong.poon@...cle.com>
To: netdev@...r.kernel.org
Cc: santosh.shilimkar@...cle.com
Subject: Re: [PATCH UEK5-{Master,U2,U1},UEK4-QU7] rds: Add per peer RDS socket
send buffer
Please ignore this review request. It was sent to the
wrong alias. I'm sorry about this.
On 3/4/19 6:32 PM, Ka-Cheong Poon wrote:
> Currently, an RDS socket's send buffer is shared by all the peers this
> socket communicates with. When one or more peers are slow to receive,
> their unacknowledged data can consume a large portion of the socket's
> buffer, degrading the socket's communication with its other peers.
>
> To resolve this issue, an RDS socket's send buffer is now maintained
> per peer. This works like opening multiple TCP sockets to different
> peers, where each peer gets its own send buffer (one per socket). In
> RDS, each peer instead gets its own buffer space within the single
> socket, as sketched below. With this per-peer send buffer, a slow
> peer's data no longer interferes with data sent to other peers.
>
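> A minimal, hypothetical sketch of that idea (not the patch code
> itself): a lookup-or-insert on a hash table keyed by the peer's
> address, with one byte counter per entry. The names here (peer_buf,
> peer_buf_get, peer_params) are illustrative only, and entries are
> assumed to live until the table is destroyed, as in this patch:
>
>     #include <linux/rhashtable.h>
>     #include <linux/in6.h>
>     #include <linux/slab.h>
>
>     struct peer_buf {
>             struct in6_addr key;        /* peer address (hash key) */
>             struct rhash_head link;     /* hash table linkage */
>             u32 snd_bytes;              /* bytes queued to this peer */
>     };
>
>     static const struct rhashtable_params peer_params = {
>             .key_len     = sizeof(struct in6_addr),
>             .key_offset  = offsetof(struct peer_buf, key),
>             .head_offset = offsetof(struct peer_buf, link),
>     };
>
>     /* Find a peer's entry, creating it on first contact. */
>     static struct peer_buf *peer_buf_get(struct rhashtable *tbl,
>                                          const struct in6_addr *addr,
>                                          gfp_t gfp)
>     {
>             struct peer_buf *pb, *tmp;
>
>             pb = rhashtable_lookup_fast(tbl, addr, peer_params);
>             if (pb)
>                     return pb;          /* common case: known peer */
>
>             tmp = kzalloc(sizeof(*tmp), gfp);
>             if (!tmp)
>                     return NULL;
>             tmp->key = *addr;
>             if (!rhashtable_insert_fast(tbl, &tmp->link, peer_params))
>                     return tmp;         /* we inserted first */
>
>             kfree(tmp);                 /* lost a race; use the winner */
>             return rhashtable_lookup_fast(tbl, addr, peer_params);
>     }
>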
> A sysctl parameter, sock_max_peers, is added to limit the number of
> peers a socket can communicate with. The default is 8192; the valid
> range is 128 to 65536.
>
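> For reference, a small user-space sketch of reading and raising the
> limit; it assumes the parameter shows up under /proc/sys/net/rds/
> alongside the existing RDS sysctls:
>
>     #include <stdio.h>
>
>     int main(void)
>     {
>             unsigned int max_peers;
>             FILE *fp;
>
>             fp = fopen("/proc/sys/net/rds/sock_max_peers", "r+");
>             if (!fp) {
>                     perror("sock_max_peers");
>                     return 1;
>             }
>             if (fscanf(fp, "%u", &max_peers) == 1)
>                     printf("current limit: %u\n", max_peers);
>             rewind(fp);
>             fprintf(fp, "%u\n", 16384u); /* must stay in 128..65536 */
>             fclose(fp);
>             return 0;
>     }
>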
> Orabug: 28314151
>
> Tested-by: Rose Wang <rose.wang@...cle.com>
> Signed-off-by: Ka-Cheong Poon <ka-cheong.poon@...cle.com>
> ---
> net/rds/af_rds.c | 129 +++++++++++++++++++++++++++++++++++++++++++++++++++++--
> net/rds/rds.h | 49 +++++++++++++++++----
> net/rds/recv.c | 6 +--
> net/rds/send.c | 90 +++++++++++++++++++++++++-------------
> net/rds/sysctl.c | 15 ++++++-
> 5 files changed, 244 insertions(+), 45 deletions(-)
>
> diff --git a/net/rds/af_rds.c b/net/rds/af_rds.c
> index 0b86de5..ad51714 100644
> --- a/net/rds/af_rds.c
> +++ b/net/rds/af_rds.c
> @@ -1,5 +1,5 @@
> /*
> - * Copyright (c) 2006, 2018 Oracle and/or its affiliates. All rights reserved.
> + * Copyright (c) 2006, 2019 Oracle and/or its affiliates. All rights reserved.
> *
> * This software is available to you under a choice of one of two
> * licenses. You may choose to be licensed under the terms of the GNU
> @@ -80,6 +80,17 @@ char *rds_str_array(char **array, size_t elements, size_t index)
> static LIST_HEAD(rds_sock_list);
> DECLARE_WAIT_QUEUE_HEAD(rds_poll_waitq);
>
> +/* kmem cache slab for struct rs_buf_info */
> +static struct kmem_cache *rds_rs_buf_info_slab;
> +
> +/* Helper function to be passed to rhashtable_free_and_destroy() to free a
> + * struct rs_buf_info.
> + */
> +static void rds_buf_info_free(void *rsbi, void *arg)
> +{
> + kmem_cache_free(rds_rs_buf_info_slab, rsbi);
> +}
> +
> /*
> * This is called as the final descriptor referencing this socket is closed.
> * We have to unbind the socket so that another socket can be bound to the
> @@ -112,6 +123,9 @@ static int rds_release(struct socket *sock)
> rds_rdma_drop_keys(rs);
> rds_notify_queue_get(rs, NULL);
>
> + rhashtable_free_and_destroy(&rs->rs_buf_info_tbl, rds_buf_info_free,
> + NULL);
> +
> spin_lock_bh(&rds_sock_lock);
> list_del_init(&rs->rs_item);
> rds_sock_count--;
> @@ -272,10 +286,18 @@ static unsigned int rds_poll(struct file *file, struct socket *sock,
> if (!list_empty(&rs->rs_recv_queue)
> || !list_empty(&rs->rs_notify_queue))
> mask |= (POLLIN | POLLRDNORM);
> - if (rs->rs_snd_bytes < rds_sk_sndbuf(rs))
> - mask |= (POLLOUT | POLLWRNORM);
> read_unlock_irqrestore(&rs->rs_recv_lock, flags);
>
> + /* Use the number of destinations this socket has to estimate the
> + * total send buffer size. When there is no peer yet, use the
> + * default send buffer size.
> + */
> + spin_lock_irqsave(&rs->rs_snd_lock, flags);
> + if (rs->rs_snd_bytes < max(rs->rs_buf_info_dest_cnt, (u32)1) *
> + rds_sk_sndbuf(rs))
> + mask |= (POLLOUT | POLLWRNORM);
> + spin_unlock_irqrestore(&rs->rs_snd_lock, flags);
> +
> /* clear state any time we wake a seen-congested socket */
> if (mask)
> rs->rs_seen_congestion = 0;
> @@ -712,6 +734,80 @@ static int rds_getsockopt(struct socket *sock, int level, int optname,
>
> }
>
> +/* Check if there is an rs_buf_info associated with the given address. If
> + * not, add one to the rds_sock. The found or added rs_buf_info is returned.
> + * If no rs_buf_info is found and a new one cannot be allocated, NULL is
> + * returned and ret is set to the error. Once an address's rs_buf_info is
> + * added, it will not be removed until the rds_sock is closed.
> + */
> +struct rs_buf_info *rds_add_buf_info(struct rds_sock *rs, struct in6_addr *addr,
> + int *ret, gfp_t gfp)
> +{
> + struct rs_buf_info *info, *tmp_info;
> + unsigned long flags;
> +
> + /* Normal path, peer is expected to be found most of the time. */
> + rcu_read_lock();
> + info = rhashtable_lookup_fast(&rs->rs_buf_info_tbl, addr,
> + rs_buf_info_params);
> + if (info) {
> + rcu_read_unlock();
> + *ret = 0;
> + return info;
> + }
> + rcu_read_unlock();
> +
> + /* Allocate the entry outside the lock first. */
> + tmp_info = kmem_cache_alloc(rds_rs_buf_info_slab, gfp);
> + if (!tmp_info) {
> + *ret = -ENOMEM;
> + return NULL;
> + }
> +
> + spin_lock_irqsave(&rs->rs_snd_lock, flags);
> +
> + /* Cannot add more peers. */
> + if (rs->rs_buf_info_dest_cnt + 1 > rds_sock_max_peers) {
> + spin_unlock_irqrestore(&rs->rs_snd_lock, flags);
> + *ret = -ENFILE;
> + return NULL;
> + }
> +
> + tmp_info->rsbi_key = *addr;
> + tmp_info->rsbi_snd_bytes = 0;
> + *ret = rhashtable_insert_fast(&rs->rs_buf_info_tbl,
> + &tmp_info->rsbi_link, rs_buf_info_params);
> + if (!*ret) {
> + rs->rs_buf_info_dest_cnt++;
> + spin_unlock_irqrestore(&rs->rs_snd_lock, flags);
> + return tmp_info;
> + } else if (*ret != -EEXIST) {
> + spin_unlock_irqrestore(&rs->rs_snd_lock, flags);
> + kmem_cache_free(rds_rs_buf_info_slab, tmp_info);
> + /* Very unlikely to happen... */
> + printk(KERN_ERR "%s: cannot add rs_buf_info for %pI6c: %d\n",
> + __func__, addr, *ret);
> + return NULL;
> + }
> +
> + /* Another thread beat us in adding the rs_buf_info... */
> + info = rhashtable_lookup_fast(&rs->rs_buf_info_tbl, addr,
> + rs_buf_info_params);
> + spin_unlock_irqrestore(&rs->rs_snd_lock, flags);
> + kmem_cache_free(rds_rs_buf_info_slab, tmp_info);
> +
> + if (info) {
> + *ret = 0;
> + return info;
> + }
> +
> + /* Should not happen... */
> + printk(KERN_ERR "%s: cannot find rs_buf_info for %pI6c\n", __func__,
> + addr);
> + *ret = -EINVAL;
> + return NULL;
> +}
> +
> static int rds_connect(struct socket *sock, struct sockaddr *uaddr,
> int addr_len, int flags)
> {
> @@ -800,6 +896,12 @@ static int rds_connect(struct socket *sock, struct sockaddr *uaddr,
> break;
> }
>
> + if (!ret &&
> + !rds_add_buf_info(rs, &rs->rs_conn_addr, &ret, GFP_KERNEL)) {
> + /* Need to clear the connected info in case of error. */
> + rs->rs_conn_addr = in6addr_any;
> + rs->rs_conn_port = 0;
> + }
> release_sock(sk);
> return ret;
> }
> @@ -842,6 +944,7 @@ static void rds_sock_destruct(struct sock *sk)
> static int __rds_create(struct socket *sock, struct sock *sk, int protocol)
> {
> struct rds_sock *rs;
> + int ret;
>
> sock_init_data(sock, sk);
> sock->ops = &rds_proto_ops;
> @@ -863,6 +966,11 @@ static int __rds_create(struct socket *sock, struct sock *sk, int protocol)
> rs->rs_netfilter_enabled = 0;
> rs->rs_rx_traces = 0;
>
> + spin_lock_init(&rs->rs_snd_lock);
> + ret = rhashtable_init(&rs->rs_buf_info_tbl, &rs_buf_info_params);
> + if (ret)
> + return ret;
> +
> if (!ipv6_addr_any(&rs->rs_bound_addr)) {
> printk(KERN_CRIT "bound addr %pI6c at create\n",
> &rs->rs_bound_addr);
> @@ -879,6 +987,7 @@ static int __rds_create(struct socket *sock, struct sock *sk, int protocol)
> static int rds_create(struct net *net, struct socket *sock, int protocol, int kern)
> {
> struct sock *sk;
> + int ret;
>
> if (sock->type != SOCK_SEQPACKET ||
> (protocol && IPPROTO_OKA != protocol))
> @@ -888,7 +997,10 @@ static int rds_create(struct net *net, struct socket *sock, int protocol, int ke
> if (!sk)
> return -ENOMEM;
>
> - return __rds_create(sock, sk, protocol);
> + ret = __rds_create(sock, sk, protocol);
> + if (ret)
> + sk_free(sk);
> + return ret;
> }
>
> void debug_sock_hold(struct sock *sk)
> @@ -1194,6 +1306,7 @@ static void __exit rds_exit(void)
> rds_info_deregister_func(RDS6_INFO_SOCKETS, rds6_sock_info);
> rds_info_deregister_func(RDS6_INFO_RECV_MESSAGES, rds6_sock_inc_info);
> #endif
> + kmem_cache_destroy(rds_rs_buf_info_slab);
> }
>
> module_exit(rds_exit);
> @@ -1204,6 +1317,14 @@ static int __init rds_init(void)
> {
> int ret;
>
> + rds_rs_buf_info_slab = kmem_cache_create("rds_rs_buf_info",
> + sizeof(struct rs_buf_info),
> + 0, SLAB_HWCACHE_ALIGN, NULL);
> + if (!rds_rs_buf_info_slab) {
> + ret = -ENOMEM;
> + goto out;
> + }
> +
> net_get_random_once(&rds_gen_num, sizeof(rds_gen_num));
>
> rds_bind_lock_init();
> diff --git a/net/rds/rds.h b/net/rds/rds.h
> index 31ab639..435caa5 100644
> --- a/net/rds/rds.h
> +++ b/net/rds/rds.h
> @@ -12,6 +12,7 @@
> #include <uapi/linux/rds.h>
> #include <linux/in6.h>
> #include <linux/sizes.h>
> +#include <linux/rhashtable.h>
>
> #include "info.h"
>
> @@ -725,6 +726,13 @@ struct rds_transport {
> struct rdma_cm_event *event);
> };
>
> +/* Used to store per peer socket buffer info. */
> +struct rs_buf_info {
> + struct in6_addr rsbi_key;
> + struct rhash_head rsbi_link;
> + u32 rsbi_snd_bytes;
> +};
> +
> struct rds_sock {
> struct sock rs_sk;
>
> @@ -757,28 +765,34 @@ struct rds_sock {
> /* seen congestion (ENOBUFS) when sending? */
> int rs_seen_congestion;
>
> - /* rs_lock protects all these adjacent members before the newline */
> - spinlock_t rs_lock;
> - struct list_head rs_send_queue;
> - u32 rs_snd_bytes;
> - int rs_rcv_bytes;
> - struct list_head rs_notify_queue; /* currently used for failed RDMAs */
> -
> - /* Congestion wake_up. If rs_cong_monitor is set, we use cong_mask
> + /* rs_lock protects all these adjacent members before the newline.
> + *
> + * Congestion wake_up. If rs_cong_monitor is set, we use cong_mask
> * to decide whether the application should be woken up.
> * If not set, we use rs_cong_track to find out whether a cong map
> * update arrived.
> */
> + spinlock_t rs_lock;
> uint64_t rs_cong_mask;
> uint64_t rs_cong_notify;
> struct list_head rs_cong_list;
> unsigned long rs_cong_track;
> + struct list_head rs_notify_queue; /* currently used for failed RDMAs */
> +
> + /* rs_snd_lock protects all these adjacent members before the
> + * newline.
> + */
> + spinlock_t rs_snd_lock;
> + struct list_head rs_send_queue;
> + u32 rs_snd_bytes; /* Total bytes to all peers */
> + u32 rs_buf_info_dest_cnt;
> + struct rhashtable rs_buf_info_tbl;
>
> /*
> * rs_recv_lock protects the receive queue, and is
> * used to serialize with rds_release.
> */
> rwlock_t rs_recv_lock;
> + int rs_rcv_bytes;
> struct list_head rs_recv_queue;
>
> /* just for stats reporting */
> @@ -867,6 +881,25 @@ struct rds_statistics {
> };
>
> /* af_rds.c */
> +#define RDS_SOCK_BUF_INFO_HTBL_SIZE 512
> +static const struct rhashtable_params rs_buf_info_params = {
> + .nelem_hint = RDS_SOCK_BUF_INFO_HTBL_SIZE,
> + .key_len = sizeof(struct in6_addr),
> + .key_offset = offsetof(struct rs_buf_info, rsbi_key),
> + .head_offset = offsetof(struct rs_buf_info, rsbi_link),
> +};
> +
> +/* Maximum number of peers a socket can communicate with */
> +extern unsigned int rds_sock_max_peers;
> +
> +struct rs_buf_info *rds_add_buf_info(struct rds_sock *, struct in6_addr *,
> + int *, gfp_t);
> +static inline struct rs_buf_info *rds_get_buf_info(struct rds_sock *rs,
> + struct in6_addr *addr)
> +{
> + return rhashtable_lookup_fast(&rs->rs_buf_info_tbl, addr,
> + rs_buf_info_params);
> +}
> char *rds_str_array(char **array, size_t elements, size_t index);
> void rds_sock_addref(struct rds_sock *rs);
> void rds_sock_put(struct rds_sock *rs);
> diff --git a/net/rds/recv.c b/net/rds/recv.c
> index 7138f75..4402b02 100644
> --- a/net/rds/recv.c
> +++ b/net/rds/recv.c
> @@ -1,5 +1,5 @@
> /*
> - * Copyright (c) 2006, 2018 Oracle and/or its affiliates. All rights reserved.
> + * Copyright (c) 2006, 2019 Oracle and/or its affiliates. All rights reserved.
> *
> * This software is available to you under a choice of one of two
> * licenses. You may choose to be licensed under the terms of the GNU
> @@ -900,9 +900,9 @@ static int rds_notify_cong(struct rds_sock *rs, struct msghdr *msghdr)
> if (err)
> return err;
>
> - spin_lock_irqsave(&rs->rs_lock, flags);
> + spin_lock_irqsave(&rs->rs_snd_lock, flags);
> rs->rs_cong_notify &= ~notify;
> - spin_unlock_irqrestore(&rs->rs_lock, flags);
> + spin_unlock_irqrestore(&rs->rs_snd_lock, flags);
>
> return 0;
> }
> diff --git a/net/rds/send.c b/net/rds/send.c
> index 676f38b..1f1a54c 100644
> --- a/net/rds/send.c
> +++ b/net/rds/send.c
> @@ -1,5 +1,5 @@
> /*
> - * Copyright (c) 2006, 2018 Oracle and/or its affiliates. All rights reserved.
> + * Copyright (c) 2006, 2019 Oracle and/or its affiliates. All rights reserved.
> *
> * This software is available to you under a choice of one of two
> * licenses. You may choose to be licensed under the terms of the GNU
> @@ -548,10 +548,16 @@ int rds_send_xmit(struct rds_conn_path *cp)
> static void rds_send_sndbuf_remove(struct rds_sock *rs, struct rds_message *rm)
> {
> u32 len = be32_to_cpu(rm->m_inc.i_hdr.h_len);
> + struct rs_buf_info *bufi;
>
> - assert_spin_locked(&rs->rs_lock);
> + assert_spin_locked(&rs->rs_snd_lock);
>
> + bufi = rds_get_buf_info(rs, &rm->m_daddr);
> + /* bufi cannot be NULL as an address's rs_buf_info is never deleted. */
> + BUG_ON(!bufi);
> + BUG_ON(bufi->rsbi_snd_bytes < len);
> BUG_ON(rs->rs_snd_bytes < len);
> + bufi->rsbi_snd_bytes -= len;
> rs->rs_snd_bytes -= len;
>
> if (rs->rs_snd_bytes == 0)
> @@ -764,11 +770,12 @@ void rds_send_remove_from_sock(struct list_head *messages, int status)
> rs = rm->m_rs;
> debug_sock_hold(rds_rs_to_sk(rs));
> }
> - spin_lock(&rs->rs_lock);
>
> + spin_lock(&rs->rs_snd_lock);
> if (test_and_clear_bit(RDS_MSG_ON_SOCK, &rm->m_flags)) {
> list_del_init(&rm->m_sock_item);
> rds_send_sndbuf_remove(rs, rm);
> + spin_unlock(&rs->rs_snd_lock);
>
> if (rm->rdma.op_active && rm->rdma.op_notifier) {
> struct rm_rdma_op *ro = &rm->rdma;
> @@ -776,8 +783,10 @@ void rds_send_remove_from_sock(struct list_head *messages, int status)
>
> if (ro->op_notify || status) {
> notifier = ro->op_notifier;
> + spin_lock(&rs->rs_lock);
> list_add_tail(¬ifier->n_list,
> &rs->rs_notify_queue);
> + spin_unlock(&rs->rs_lock);
> if (!notifier->n_status)
> notifier->n_status = status;
> } else
> @@ -789,8 +798,10 @@ void rds_send_remove_from_sock(struct list_head *messages, int status)
>
> if (ao->op_notify || status) {
> notifier = ao->op_notifier;
> + spin_lock(&rs->rs_lock);
> list_add_tail(¬ifier->n_list,
> &rs->rs_notify_queue);
> + spin_unlock(&rs->rs_lock);
> if (!notifier->n_status)
> notifier->n_status = status;
> } else
> @@ -802,8 +813,10 @@ void rds_send_remove_from_sock(struct list_head *messages, int status)
>
> if (so->op_notify || status) {
> notifier = so->op_notifier;
> + spin_lock(&rs->rs_lock);
> list_add_tail(¬ifier->n_list,
> &rs->rs_notify_queue);
> + spin_unlock(&rs->rs_lock);
> if (!notifier->n_status)
> notifier->n_status = status;
> } else
> @@ -813,8 +826,8 @@ void rds_send_remove_from_sock(struct list_head *messages, int status)
>
> was_on_sock = 1;
> rm->m_rs = NULL;
> - }
> - spin_unlock(&rs->rs_lock);
> + } else
> + spin_unlock(&rs->rs_snd_lock);
>
> unlock_and_drop:
> spin_unlock_irqrestore(&rm->m_rs_lock, flags);
> @@ -885,8 +898,8 @@ void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in6 *dest)
> LIST_HEAD(list);
> int conn_dropped = 0;
>
> - /* get all the messages we're dropping under the rs lock */
> - spin_lock_irqsave(&rs->rs_lock, flags);
> + /* get all the messages we're dropping under the rs_snd_lock */
> + spin_lock_irqsave(&rs->rs_snd_lock, flags);
>
> list_for_each_entry_safe(rm, tmp, &rs->rs_send_queue, m_sock_item) {
> if (dest &&
> @@ -899,10 +912,10 @@ void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in6 *dest)
> clear_bit(RDS_MSG_ON_SOCK, &rm->m_flags);
> }
>
> - /* order flag updates with the rs lock */
> + /* order flag updates with the rs_snd_lock */
> smp_mb__after_atomic();
>
> - spin_unlock_irqrestore(&rs->rs_lock, flags);
> + spin_unlock_irqrestore(&rs->rs_snd_lock, flags);
>
> if (list_empty(&list))
> return;
> @@ -935,9 +948,9 @@ void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in6 *dest)
> */
> spin_lock_irqsave(&rm->m_rs_lock, flags);
>
> - spin_lock(&rs->rs_lock);
> + spin_lock(&rs->rs_snd_lock);
> __rds_send_complete(rs, rm, RDS_RDMA_SEND_CANCELED);
> - spin_unlock(&rs->rs_lock);
> + spin_unlock(&rs->rs_snd_lock);
>
> rm->m_rs = NULL;
> spin_unlock_irqrestore(&rm->m_rs_lock, flags);
> @@ -970,9 +983,9 @@ void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in6 *dest)
> */
> spin_lock_irqsave(&rm->m_rs_lock, flags);
>
> - spin_lock(&rs->rs_lock);
> + spin_lock(&rs->rs_snd_lock);
> __rds_send_complete(rs, rm, RDS_RDMA_SEND_CANCELED);
> - spin_unlock(&rs->rs_lock);
> + spin_unlock(&rs->rs_snd_lock);
>
> rm->m_rs = NULL;
> spin_unlock_irqrestore(&rm->m_rs_lock, flags);
> @@ -989,7 +1002,8 @@ void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in6 *dest)
> static int rds_send_queue_rm(struct rds_sock *rs, struct rds_connection *conn,
> struct rds_conn_path *cp,
> struct rds_message *rm, __be16 sport,
> - __be16 dport, int *queued)
> + __be16 dport, int *queued,
> + struct rs_buf_info *bufi)
> {
> unsigned long flags;
> u32 len;
> @@ -999,9 +1013,9 @@ static int rds_send_queue_rm(struct rds_sock *rs, struct rds_connection *conn,
>
> len = be32_to_cpu(rm->m_inc.i_hdr.h_len);
>
> - /* this is the only place which holds both the socket's rs_lock
> + /* this is the only place which holds both the socket's rs_snd_lock
> * and the connection's c_lock */
> - spin_lock_irqsave(&rs->rs_lock, flags);
> + spin_lock_irqsave(&rs->rs_snd_lock, flags);
>
> /*
> * If there is a little space in sndbuf, we don't queue anything,
> @@ -1011,7 +1025,10 @@ static int rds_send_queue_rm(struct rds_sock *rs, struct rds_connection *conn,
> * rs_snd_bytes here to allow the last msg to exceed the buffer,
> * and poll() now knows no more data can be sent.
> */
> - if (rs->rs_snd_bytes < rds_sk_sndbuf(rs)) {
> + if (bufi->rsbi_snd_bytes < rds_sk_sndbuf(rs)) {
> + bufi->rsbi_snd_bytes += len;
> +
> + /* Track the total snd_bytes across all peers. */
> rs->rs_snd_bytes += len;
>
> /* let recv side know we are close to send space exhaustion.
> @@ -1019,7 +1036,7 @@ static int rds_send_queue_rm(struct rds_sock *rs, struct rds_connection *conn,
> * means we set the flag on *all* messages as soon as our
> * throughput hits a certain threshold.
> */
> - if (rs->rs_snd_bytes >= rds_sk_sndbuf(rs) / 2)
> + if (bufi->rsbi_snd_bytes >= rds_sk_sndbuf(rs) / 2)
> set_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags);
>
> list_add_tail(&rm->m_sock_item, &rs->rs_send_queue);
> @@ -1037,7 +1054,7 @@ static int rds_send_queue_rm(struct rds_sock *rs, struct rds_connection *conn,
> spin_lock(&cp->cp_lock);
> if (cp->cp_pending_flush) {
> spin_unlock(&cp->cp_lock);
> - spin_unlock_irqrestore(&rs->rs_lock, flags);
> + spin_unlock_irqrestore(&rs->rs_snd_lock, flags);
> goto out;
> }
> rm->m_inc.i_hdr.h_sequence = cpu_to_be64(cp->cp_next_tx_seq++);
> @@ -1046,14 +1063,14 @@ static int rds_send_queue_rm(struct rds_sock *rs, struct rds_connection *conn,
>
> spin_unlock(&cp->cp_lock);
>
> - rdsdebug("queued msg %p len %d, rs %p bytes %d seq %llu\n",
> - rm, len, rs, rs->rs_snd_bytes,
> + rdsdebug("queued msg %p len %d, rs %p bytes %u (%u) seq %llu\n",
> + rm, len, rs, rs->rs_snd_bytes, bufi->rsbi_snd_bytes,
> (unsigned long long)be64_to_cpu(rm->m_inc.i_hdr.h_sequence));
>
> *queued = 1;
> }
>
> - spin_unlock_irqrestore(&rs->rs_lock, flags);
> + spin_unlock_irqrestore(&rs->rs_snd_lock, flags);
> out:
> return *queued;
> }
> @@ -1257,6 +1274,7 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
> long timeo = sock_sndtimeo(sk, nonblock);
> size_t total_payload_len = payload_len, rdma_payload_len = 0;
> struct rds_conn_path *cpath;
> + struct rs_buf_info *bufi;
> struct in6_addr daddr;
> __u32 scope_id = 0;
> int namelen;
> @@ -1395,19 +1413,25 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
> goto out;
> }
>
> + bufi = rds_add_buf_info(rs, &daddr, &ret, GFP_KERNEL);
> + if (!bufi)
> + goto out;
> +
> /*
> * Avoid copying the message from user-space if we already
> * know there's no space in the send buffer.
> * The check is a negated version of the condition used inside
> - * function "rds_send_queue_rm": "if (rs->rs_snd_bytes < rds_sk_sndbuf(rs))",
> + * function "rds_send_queue_rm":
> + * "if (bufi->rsbi_snd_bytes < rds_sk_sndbuf(rs))",
> * which needs some reconsideration, as it unexpectedly checks
> * if half of the send-buffer space is available, instead of
> * checking if the given message would fit.
> */
> if (nonblock) {
> - spin_lock_irqsave(&rs->rs_lock, flags);
> - no_space = rs->rs_snd_bytes >= rds_sk_sndbuf(rs);
> - spin_unlock_irqrestore(&rs->rs_lock, flags);
> + spin_lock_irqsave(&rs->rs_snd_lock, flags);
> + no_space = bufi->rsbi_snd_bytes >= rds_sk_sndbuf(rs);
> + spin_unlock_irqrestore(&rs->rs_snd_lock, flags);
> if (no_space) {
> ret = -EAGAIN;
> goto out;
> @@ -1525,7 +1549,7 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
> }
>
> while (!rds_send_queue_rm(rs, conn, cpath, rm, rs->rs_bound_port,
> - dport, &queued)) {
> + dport, &queued, bufi)) {
> rds_stats_inc(s_send_queue_full);
>
> if (nonblock) {
> @@ -1541,7 +1565,7 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
> rds_send_queue_rm(rs, conn, cpath, rm,
> rs->rs_bound_port,
> dport,
> - &queued),
> + &queued, bufi),
> timeo);
> rdsdebug("sendmsg woke queued %d timeo %ld\n", queued, timeo);
> if (timeo > 0 || timeo == MAX_SCHEDULE_TIMEOUT)
> @@ -1591,6 +1615,7 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
> int rds_send_internal(struct rds_connection *conn, struct rds_sock *rs,
> struct sk_buff *skb, gfp_t gfp)
> {
> + struct rs_buf_info *bufi;
> struct rds_nf_hdr *dst;
> struct rds_message *rm = NULL;
> struct scatterlist *sg;
> @@ -1609,6 +1634,13 @@ int rds_send_internal(struct rds_connection *conn, struct rds_sock *rs,
> if (ret < 0)
> goto out;
>
> + bufi = rds_add_buf_info(rs, &dst->daddr, &ret, gfp);
> + if (!bufi) {
> + rds_rtd(RDS_RTD_ERR, "failed to allocate rs %p sbuf to %pI6c",
> + rs, &dst->daddr);
> + goto out;
> + }
> +
> /* create ourselves a new message to send out the data */
> rm = rds_message_alloc(ret, gfp);
> if (!rm) {
> @@ -1677,7 +1709,7 @@ int rds_send_internal(struct rds_connection *conn, struct rds_sock *rs,
>
> /* only take a single pass */
> if (!rds_send_queue_rm(rs, conn, &conn->c_path[0], rm,
> - rs->rs_bound_port, dst->dport, &queued)) {
> + rs->rs_bound_port, dst->dport, &queued, bufi)) {
> rds_rtd(RDS_RTD_SND, "cannot block on internal send rs %p", rs);
> rds_stats_inc(s_send_queue_full);
>
> diff --git a/net/rds/sysctl.c b/net/rds/sysctl.c
> index b22e8b8..21bc056e 100644
> --- a/net/rds/sysctl.c
> +++ b/net/rds/sysctl.c
> @@ -1,5 +1,5 @@
> /*
> - * Copyright (c) 2006 Oracle. All rights reserved.
> + * Copyright (c) 2006, 2019 Oracle and/or its affiliates. All rights reserved.
> *
> * This software is available to you under a choice of one of two
> * licenses. You may choose to be licensed under the terms of the GNU
> @@ -52,6 +52,10 @@
> unsigned int rds_sysctl_shutdown_trace_start_time;
> unsigned int rds_sysctl_shutdown_trace_end_time;
>
> +unsigned int rds_sock_max_peers_min = 128;
> +unsigned int rds_sock_max_peers_max = 65536;
> +unsigned int rds_sock_max_peers = 8192;
> +
> /*
> * We have official values, but must maintain the sysctl interface for existing
> * software that expects to find these values here.
> @@ -127,6 +131,15 @@
> .mode = 0644,
> .proc_handler = &proc_dointvec,
> },
> + {
> + .procname = "sock_max_peers",
> + .data = &rds_sock_max_peers,
> + .maxlen = sizeof(int),
> + .mode = 0644,
> + .proc_handler = &proc_dointvec_minmax,
> + .extra1 = &rds_sock_max_peers_min,
> + .extra2 = &rds_sock_max_peers_max
> + },
> { }
> };
>
>
--
K. Poon
ka-cheong.poon@...cle.com