Message-ID: <a68fec8a-cede-448a-1f7b-024047e7d73d@oracle.com>
Date:   Mon, 4 Mar 2019 18:35:55 +0800
From:   Ka-Cheong Poon <ka-cheong.poon@...cle.com>
To:     netdev@...r.kernel.org
Cc:     santosh.shilimkar@...cle.com
Subject: Re: [PATCH UEK5-{Master,U2,U1},UEK4-QU7] rds: Add per peer RDS socket
 send buffer

Please ignore this review request.  It was sent to the
wrong alias.  I'm sorry about this.



On 3/4/19 6:32 PM, Ka-Cheong Poon wrote:
> Currently, an RDS socket's send buffer is shared by all the peers this
> socket communicates with.  When one or more peers are slow to receive,
> their unacknowledged data can consume a large portion of the socket's
> buffer.  This degrades communication with the socket's other peers.
> 
> To resolve this issue, an RDS socket's send buffer is now maintained
> per peer.  This works like opening a separate TCP socket to each peer,
> where every peer gets its own send buffer; in RDS, every peer instead
> gets its own buffer space within the one socket.  With this per-peer
> send buffer, a slow peer's unacknowledged data no longer interferes
> with data sent to other peers.
> 
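> As a rough userspace sketch of the behaviour this enables (the
> addresses, port and payload size below are made up for illustration,
> and error handling is omitted), one socket sending to two peers now
> draws on two independent send-buffer budgets:
> 
>     #include <arpa/inet.h>
>     #include <netinet/in.h>
>     #include <sys/socket.h>
>     #include <unistd.h>
> 
>     #ifndef AF_RDS
>     #define AF_RDS 21                  /* value from linux/socket.h */
>     #endif
> 
>     int main(void)
>     {
>             struct sockaddr_in laddr = { 0 }, fast = { 0 }, slow = { 0 };
>             char buf[1024] = { 0 };
>             int fd = socket(AF_RDS, SOCK_SEQPACKET, 0);
> 
>             /* RDS requires an explicit bind before sending. */
>             laddr.sin_family = AF_INET;
>             laddr.sin_addr.s_addr = inet_addr("192.0.2.1");
>             laddr.sin_port = htons(4000);
>             bind(fd, (struct sockaddr *)&laddr, sizeof(laddr));
> 
>             fast = laddr;
>             fast.sin_addr.s_addr = inet_addr("192.0.2.2");
>             slow = laddr;
>             slow.sin_addr.s_addr = inet_addr("192.0.2.3");
> 
>             /* If the peer at 192.0.2.3 stops draining its queue and
>              * its per-peer budget (rsbi_snd_bytes) fills up, sends to
>              * 192.0.2.2 still find space; previously both peers drew
>              * on the single shared rs_snd_bytes budget.
>              */
>             sendto(fd, buf, sizeof(buf), 0,
>                    (struct sockaddr *)&fast, sizeof(fast));
>             sendto(fd, buf, sizeof(buf), 0,
>                    (struct sockaddr *)&slow, sizeof(slow));
> 
>             close(fd);
>             return 0;
>     }
> 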
> A sysctl parameter, sock_max_peers, is added to limit the number of
> peers a socket can communicate with.  The default is 8192, and the
> range of valid values is 128 to 65536.
> 
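> The new entry sits in the existing RDS sysctl table (see the sysctl.c
> hunk below), so writes outside the valid range are rejected by
> proc_dointvec_minmax.  As a minimal sketch, assuming this table keeps
> its usual /proc/sys/net/rds/ location, the current limit can be read
> back with:
> 
>     #include <stdio.h>
> 
>     int main(void)
>     {
>             unsigned int max_peers;
>             FILE *f = fopen("/proc/sys/net/rds/sock_max_peers", "r");
> 
>             if (!f)
>                     return 1;
>             if (fscanf(f, "%u", &max_peers) == 1)
>                     printf("sock_max_peers = %u\n", max_peers);
>             fclose(f);
>             return 0;
>     }
> 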
> Orabug: 28314151
> 
> Tested-by: Rose Wang <rose.wang@...cle.com>
> Signed-off-by: Ka-Cheong Poon <ka-cheong.poon@...cle.com>
> ---
>   net/rds/af_rds.c | 129 +++++++++++++++++++++++++++++++++++++++++++++++++++++--
>   net/rds/rds.h    |  49 +++++++++++++++++----
>   net/rds/recv.c   |   6 +--
>   net/rds/send.c   |  90 +++++++++++++++++++++++++-------------
>   net/rds/sysctl.c |  15 ++++++-
>   5 files changed, 244 insertions(+), 45 deletions(-)
> 
> diff --git a/net/rds/af_rds.c b/net/rds/af_rds.c
> index 0b86de5..ad51714 100644
> --- a/net/rds/af_rds.c
> +++ b/net/rds/af_rds.c
> @@ -1,5 +1,5 @@
>   /*
> - * Copyright (c) 2006, 2018 Oracle and/or its affiliates. All rights reserved.
> + * Copyright (c) 2006, 2019 Oracle and/or its affiliates. All rights reserved.
>    *
>    * This software is available to you under a choice of one of two
>    * licenses.  You may choose to be licensed under the terms of the GNU
> @@ -80,6 +80,17 @@ char *rds_str_array(char **array, size_t elements, size_t index)
>   static LIST_HEAD(rds_sock_list);
>   DECLARE_WAIT_QUEUE_HEAD(rds_poll_waitq);
>   
> +/* kmem cache slab for struct rs_buf_info */
> +static struct kmem_cache *rds_rs_buf_info_slab;
> +
> +/* Helper function to be passed to rhashtable_free_and_destroy() to free a
> + * struct rs_buf_info.
> + */
> +static void rds_buf_info_free(void *rsbi, void *arg)
> +{
> +	kmem_cache_free(rds_rs_buf_info_slab, rsbi);
> +}
> +
>   /*
>    * This is called as the final descriptor referencing this socket is closed.
>    * We have to unbind the socket so that another socket can be bound to the
> @@ -112,6 +123,9 @@ static int rds_release(struct socket *sock)
>   	rds_rdma_drop_keys(rs);
>   	rds_notify_queue_get(rs, NULL);
>   
> +	rhashtable_free_and_destroy(&rs->rs_buf_info_tbl, rds_buf_info_free,
> +				    NULL);
> +
>   	spin_lock_bh(&rds_sock_lock);
>   	list_del_init(&rs->rs_item);
>   	rds_sock_count--;
> @@ -272,10 +286,18 @@ static unsigned int rds_poll(struct file *file, struct socket *sock,
>   	if (!list_empty(&rs->rs_recv_queue)
>   	 || !list_empty(&rs->rs_notify_queue))
>   		mask |= (POLLIN | POLLRDNORM);
> -	if (rs->rs_snd_bytes < rds_sk_sndbuf(rs))
> -		mask |= (POLLOUT | POLLWRNORM);
>   	read_unlock_irqrestore(&rs->rs_recv_lock, flags);
>   
> +	/* Use the number of destinations this socket has to estimate the
> +	 * send buffer size.  When there is no peer yet, return the default
> +	 * send buffer size.
> +	 */
> +	spin_lock_irqsave(&rs->rs_snd_lock, flags);
> +	if (rs->rs_snd_bytes < max(rs->rs_buf_info_dest_cnt, (u32)1) *
> +	    rds_sk_sndbuf(rs))
> +		mask |= (POLLOUT | POLLWRNORM);
> +	spin_unlock_irqrestore(&rs->rs_snd_lock, flags);
> +
>   	/* clear state any time we wake a seen-congested socket */
>   	if (mask)
>   		rs->rs_seen_congestion = 0;
> @@ -712,6 +734,80 @@ static int rds_getsockopt(struct socket *sock, int level, int optname,
>   
>   }
>   
> +/* Check if there is an rs_buf_info associated with the given address.  If not,
> + * add one to the rds_sock.  The found or added rs_buf_info is returned.  If
> + * no rs_buf_info is found and a new one cannot be allocated, NULL is
> + * returned and *ret is set to the error.  Once an address's rs_buf_info
> + * is added, it will not be removed until the rds_sock is closed.
> + */
> +struct rs_buf_info *rds_add_buf_info(struct rds_sock *rs, struct in6_addr *addr,
> +				     int *ret, gfp_t gfp)
> +{
> +	struct rs_buf_info *info, *tmp_info;
> +	unsigned long flags;
> +
> +	/* Normal path: the peer is expected to be found most of the time. */
> +	rcu_read_lock();
> +	info = rhashtable_lookup_fast(&rs->rs_buf_info_tbl, addr,
> +				      rs_buf_info_params);
> +	if (info) {
> +		rcu_read_unlock();
> +		*ret = 0;
> +		return info;
> +	}
> +	rcu_read_unlock();
> +
> +	/* Allocate the buffer outside of lock first. */
> +	tmp_info = kmem_cache_alloc(rds_rs_buf_info_slab, gfp);
> +	if (!tmp_info) {
> +		*ret = -ENOMEM;
> +		return NULL;
> +	}
> +
> +	spin_lock_irqsave(&rs->rs_snd_lock, flags);
> +
> +	/* Cannot add more peers. */
> +	if (rs->rs_buf_info_dest_cnt + 1 > rds_sock_max_peers) {
> +		spin_unlock_irqrestore(&rs->rs_snd_lock, flags);
> +		*ret = -ENFILE;
> +		return NULL;
> +	}
> +
> +	tmp_info->rsbi_key = *addr;
> +	tmp_info->rsbi_snd_bytes = 0;
> +	*ret = rhashtable_insert_fast(&rs->rs_buf_info_tbl,
> +				     &tmp_info->rsbi_link, rs_buf_info_params);
> +	if (!*ret) {
> +		rs->rs_buf_info_dest_cnt++;
> +		spin_unlock_irqrestore(&rs->rs_snd_lock, flags);
> +		return tmp_info;
> +	} else if (*ret != -EEXIST) {
> +		spin_unlock_irqrestore(&rs->rs_snd_lock, flags);
> +		kmem_cache_free(rds_rs_buf_info_slab, tmp_info);
> +		/* Very unlikely to happen... */
> +		printk(KERN_ERR "%s: cannot add rs_buf_info for %pI6c: %d\n",
> +		       __func__, addr, *ret);
> +		return NULL;
> +	}
> +
> +	/* Another thread beat us to adding the rs_buf_info. */
> +	info = rhashtable_lookup_fast(&rs->rs_buf_info_tbl, addr,
> +				      rs_buf_info_params);
> +	spin_unlock_irqrestore(&rs->rs_snd_lock, flags);
> +	kmem_cache_free(rds_rs_buf_info_slab, tmp_info);
> +
> +	if (info) {
> +		*ret = 0;
> +		return info;
> +	}
> +
> +	/* Should not happen... */
> +	printk(KERN_ERR "%s: cannot find rs_buf_info for %pI6c\n", __func__,
> +	       addr);
> +	*ret = -EINVAL;
> +	return NULL;
> +}
> +
>   static int rds_connect(struct socket *sock, struct sockaddr *uaddr,
>   		       int addr_len, int flags)
>   {
> @@ -800,6 +896,12 @@ static int rds_connect(struct socket *sock, struct sockaddr *uaddr,
>   		break;
>   	}
>   
> +	if (!ret &&
> +	    !rds_add_buf_info(rs, &rs->rs_conn_addr, &ret, GFP_KERNEL)) {
> +		/* Need to clear the connected info in case of error. */
> +		rs->rs_conn_addr = in6addr_any;
> +		rs->rs_conn_port = 0;
> +	}
>   	release_sock(sk);
>   	return ret;
>   }
> @@ -842,6 +944,7 @@ static void rds_sock_destruct(struct sock *sk)
>   static int __rds_create(struct socket *sock, struct sock *sk, int protocol)
>   {
>   	struct rds_sock *rs;
> +	int ret;
>   
>   	sock_init_data(sock, sk);
>   	sock->ops		= &rds_proto_ops;
> @@ -863,6 +966,11 @@ static int __rds_create(struct socket *sock, struct sock *sk, int protocol)
>   	rs->rs_netfilter_enabled = 0;
>   	rs->rs_rx_traces = 0;
>   
> +	spin_lock_init(&rs->rs_snd_lock);
> +	ret = rhashtable_init(&rs->rs_buf_info_tbl, &rs_buf_info_params);
> +	if (ret)
> +		return ret;
> +
>   	if (!ipv6_addr_any(&rs->rs_bound_addr)) {
>   		printk(KERN_CRIT "bound addr %pI6c at create\n",
>   		       &rs->rs_bound_addr);
> @@ -879,6 +987,7 @@ static int __rds_create(struct socket *sock, struct sock *sk, int protocol)
>   static int rds_create(struct net *net, struct socket *sock, int protocol, int kern)
>   {
>   	struct sock *sk;
> +	int ret;
>   
>   	if (sock->type != SOCK_SEQPACKET ||
>   	    (protocol && IPPROTO_OKA != protocol))
> @@ -888,7 +997,10 @@ static int rds_create(struct net *net, struct socket *sock, int protocol, int ke
>   	if (!sk)
>   		return -ENOMEM;
>   
> -	return __rds_create(sock, sk, protocol);
> +	ret = __rds_create(sock, sk, protocol);
> +	if (ret)
> +		sk_free(sk);
> +	return ret;
>   }
>   
>   void debug_sock_hold(struct sock *sk)
> @@ -1194,6 +1306,7 @@ static void __exit rds_exit(void)
>   	rds_info_deregister_func(RDS6_INFO_SOCKETS, rds6_sock_info);
>   	rds_info_deregister_func(RDS6_INFO_RECV_MESSAGES, rds6_sock_inc_info);
>   #endif
> +	kmem_cache_destroy(rds_rs_buf_info_slab);
>   }
>   
>   module_exit(rds_exit);
> @@ -1204,6 +1317,14 @@ static int __init rds_init(void)
>   {
>   	int ret;
>   
> +	rds_rs_buf_info_slab = kmem_cache_create("rds_rs_buf_info",
> +						 sizeof(struct rs_buf_info),
> +						 0, SLAB_HWCACHE_ALIGN, NULL);
> +	if (!rds_rs_buf_info_slab) {
> +		ret = -ENOMEM;
> +		goto out;
> +	}
> +
>   	net_get_random_once(&rds_gen_num, sizeof(rds_gen_num));
>   
>   	rds_bind_lock_init();
> diff --git a/net/rds/rds.h b/net/rds/rds.h
> index 31ab639..435caa5 100644
> --- a/net/rds/rds.h
> +++ b/net/rds/rds.h
> @@ -12,6 +12,7 @@
>   #include <uapi/linux/rds.h>
>   #include <linux/in6.h>
>   #include <linux/sizes.h>
> +#include <linux/rhashtable.h>
>   
>   #include "info.h"
>   
> @@ -725,6 +726,13 @@ struct rds_transport {
>   				struct rdma_cm_event *event);
>   };
>   
> +/* Used to store per peer socket buffer info. */
> +struct rs_buf_info {
> +	struct in6_addr		rsbi_key;
> +	struct rhash_head	rsbi_link;
> +	u32			rsbi_snd_bytes;
> +};
> +
>   struct rds_sock {
>   	struct sock		rs_sk;
>   
> @@ -757,28 +765,34 @@ struct rds_sock {
>   	/* seen congestion (ENOBUFS) when sending? */
>   	int			rs_seen_congestion;
>   
> -	/* rs_lock protects all these adjacent members before the newline */
> -	spinlock_t		rs_lock;
> -	struct list_head	rs_send_queue;
> -	u32			rs_snd_bytes;
> -	int			rs_rcv_bytes;
> -	struct list_head	rs_notify_queue;	/* currently used for failed RDMAs */
> -
> -	/* Congestion wake_up. If rs_cong_monitor is set, we use cong_mask
> +	/* rs_lock protects all these adjacent members before the newline.
> +	 *
> +	 * Congestion wake_up. If rs_cong_monitor is set, we use cong_mask
>   	 * to decide whether the application should be woken up.
>   	 * If not set, we use rs_cong_track to find out whether a cong map
>   	 * update arrived.
>   	 */
> +	spinlock_t		rs_lock;
>   	uint64_t		rs_cong_mask;
>   	uint64_t		rs_cong_notify;
>   	struct list_head	rs_cong_list;
>   	unsigned long		rs_cong_track;
> +	struct list_head	rs_notify_queue; /* currently used for failed RDMAs */
> +
> +	/* rs_snd_lock protects all these adjacent members before the
> +	 * newline. */
> +	spinlock_t		rs_snd_lock;
> +	struct list_head	rs_send_queue;
> +	u32			rs_snd_bytes; /* Total bytes to all peers */
> +	u32			rs_buf_info_dest_cnt;
> +	struct rhashtable	rs_buf_info_tbl;
>   
>   	/*
>   	 * rs_recv_lock protects the receive queue, and is
>   	 * used to serialize with rds_release.
>   	 */
>   	rwlock_t		rs_recv_lock;
> +	int			rs_rcv_bytes;
>   	struct list_head	rs_recv_queue;
>   
>   	/* just for stats reporting */
> @@ -867,6 +881,25 @@ struct rds_statistics {
>   };
>   
>   /* af_rds.c */
> +#define	RDS_SOCK_BUF_INFO_HTBL_SIZE	512
> +static const struct rhashtable_params rs_buf_info_params = {
> +	.nelem_hint = RDS_SOCK_BUF_INFO_HTBL_SIZE,
> +	.key_len = sizeof(struct in6_addr),
> +	.key_offset = offsetof(struct rs_buf_info, rsbi_key),
> +	.head_offset = offsetof(struct rs_buf_info, rsbi_link),
> +};
> +
> +/* Maximum number of peers a socket can communicate with */
> +extern unsigned int rds_sock_max_peers;
> +
> +struct rs_buf_info *rds_add_buf_info(struct rds_sock *, struct in6_addr *,
> +				     int *, gfp_t);
> +static inline struct rs_buf_info *rds_get_buf_info(struct rds_sock *rs,
> +						   struct in6_addr *addr)
> +{
> +	return rhashtable_lookup_fast(&rs->rs_buf_info_tbl, addr,
> +				      rs_buf_info_params);
> +}
>   char *rds_str_array(char **array, size_t elements, size_t index);
>   void rds_sock_addref(struct rds_sock *rs);
>   void rds_sock_put(struct rds_sock *rs);
> diff --git a/net/rds/recv.c b/net/rds/recv.c
> index 7138f75..4402b02 100644
> --- a/net/rds/recv.c
> +++ b/net/rds/recv.c
> @@ -1,5 +1,5 @@
>   /*
> - * Copyright (c) 2006, 2018 Oracle and/or its affiliates. All rights reserved.
> + * Copyright (c) 2006, 2019 Oracle and/or its affiliates. All rights reserved.
>    *
>    * This software is available to you under a choice of one of two
>    * licenses.  You may choose to be licensed under the terms of the GNU
> @@ -900,9 +900,9 @@ static int rds_notify_cong(struct rds_sock *rs, struct msghdr *msghdr)
>   	if (err)
>   		return err;
>   
> -	spin_lock_irqsave(&rs->rs_lock, flags);
> +	spin_lock_irqsave(&rs->rs_snd_lock, flags);
>   	rs->rs_cong_notify &= ~notify;
> -	spin_unlock_irqrestore(&rs->rs_lock, flags);
> +	spin_unlock_irqrestore(&rs->rs_snd_lock, flags);
>   
>   	return 0;
>   }
> diff --git a/net/rds/send.c b/net/rds/send.c
> index 676f38b..1f1a54c 100644
> --- a/net/rds/send.c
> +++ b/net/rds/send.c
> @@ -1,5 +1,5 @@
>   /*
> - * Copyright (c) 2006, 2018 Oracle and/or its affiliates. All rights reserved.
> + * Copyright (c) 2006, 2019 Oracle and/or its affiliates. All rights reserved.
>    *
>    * This software is available to you under a choice of one of two
>    * licenses.  You may choose to be licensed under the terms of the GNU
> @@ -548,10 +548,16 @@ int rds_send_xmit(struct rds_conn_path *cp)
>   static void rds_send_sndbuf_remove(struct rds_sock *rs, struct rds_message *rm)
>   {
>   	u32 len = be32_to_cpu(rm->m_inc.i_hdr.h_len);
> +	struct rs_buf_info *bufi;
>   
> -	assert_spin_locked(&rs->rs_lock);
> +	assert_spin_locked(&rs->rs_snd_lock);
>   
> +	bufi = rds_get_buf_info(rs, &rm->m_daddr);
> +	/* bufi cannot be NULL as an address's rs_buf_info is never deleted. */
> +	BUG_ON(!bufi);
> +	BUG_ON(bufi->rsbi_snd_bytes < len);
>   	BUG_ON(rs->rs_snd_bytes < len);
> +	bufi->rsbi_snd_bytes -= len;
>   	rs->rs_snd_bytes -= len;
>   
>   	if (rs->rs_snd_bytes == 0)
> @@ -764,11 +770,12 @@ void rds_send_remove_from_sock(struct list_head *messages, int status)
>   			rs = rm->m_rs;
>   			debug_sock_hold(rds_rs_to_sk(rs));
>   		}
> -		spin_lock(&rs->rs_lock);
>   
> +		spin_lock(&rs->rs_snd_lock);
>   		if (test_and_clear_bit(RDS_MSG_ON_SOCK, &rm->m_flags)) {
>   			list_del_init(&rm->m_sock_item);
>   			rds_send_sndbuf_remove(rs, rm);
> +			spin_unlock(&rs->rs_snd_lock);
>   
>   			if (rm->rdma.op_active && rm->rdma.op_notifier) {
>   				struct rm_rdma_op *ro = &rm->rdma;
> @@ -776,8 +783,10 @@ void rds_send_remove_from_sock(struct list_head *messages, int status)
>   
>   				if (ro->op_notify || status) {
>   					notifier = ro->op_notifier;
> +					spin_lock(&rs->rs_lock);
>   					list_add_tail(&notifier->n_list,
>   							&rs->rs_notify_queue);
> +					spin_unlock(&rs->rs_lock);
>   					if (!notifier->n_status)
>   						notifier->n_status = status;
>   				} else
> @@ -789,8 +798,10 @@ void rds_send_remove_from_sock(struct list_head *messages, int status)
>   
>   				if (ao->op_notify || status) {
>   					notifier = ao->op_notifier;
> +					spin_lock(&rs->rs_lock);
>   					list_add_tail(&notifier->n_list,
>   						&rs->rs_notify_queue);
> +					spin_unlock(&rs->rs_lock);
>   					if (!notifier->n_status)
>   						notifier->n_status = status;
>   				} else
> @@ -802,8 +813,10 @@ void rds_send_remove_from_sock(struct list_head *messages, int status)
>   
>   				if (so->op_notify || status) {
>   					notifier = so->op_notifier;
> +					spin_lock(&rs->rs_lock);
>   					list_add_tail(&notifier->n_list,
>   						&rs->rs_notify_queue);
> +					spin_unlock(&rs->rs_lock);
>   					if (!notifier->n_status)
>   						notifier->n_status = status;
>   				} else
> @@ -813,8 +826,8 @@ void rds_send_remove_from_sock(struct list_head *messages, int status)
>   
>   			was_on_sock = 1;
>   			rm->m_rs = NULL;
> -		}
> -		spin_unlock(&rs->rs_lock);
> +		} else
> +			spin_unlock(&rs->rs_snd_lock);
>   
>   unlock_and_drop:
>   		spin_unlock_irqrestore(&rm->m_rs_lock, flags);
> @@ -885,8 +898,8 @@ void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in6 *dest)
>   	LIST_HEAD(list);
>   	int conn_dropped = 0;
>   
> -	/* get all the messages we're dropping under the rs lock */
> -	spin_lock_irqsave(&rs->rs_lock, flags);
> +	/* get all the messages we're dropping under the rs_snd_lock */
> +	spin_lock_irqsave(&rs->rs_snd_lock, flags);
>   
>   	list_for_each_entry_safe(rm, tmp, &rs->rs_send_queue, m_sock_item) {
>   		if (dest &&
> @@ -899,10 +912,10 @@ void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in6 *dest)
>   		clear_bit(RDS_MSG_ON_SOCK, &rm->m_flags);
>   	}
>   
> -	/* order flag updates with the rs lock */
> +	/* order flag updates with the rs_snd_lock */
>   	smp_mb__after_atomic();
>   
> -	spin_unlock_irqrestore(&rs->rs_lock, flags);
> +	spin_unlock_irqrestore(&rs->rs_snd_lock, flags);
>   
>   	if (list_empty(&list))
>   		return;
> @@ -935,9 +948,9 @@ void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in6 *dest)
>   		 */
>   		spin_lock_irqsave(&rm->m_rs_lock, flags);
>   
> -		spin_lock(&rs->rs_lock);
> +		spin_lock(&rs->rs_snd_lock);
>   		__rds_send_complete(rs, rm, RDS_RDMA_SEND_CANCELED);
> -		spin_unlock(&rs->rs_lock);
> +		spin_unlock(&rs->rs_snd_lock);
>   
>   		rm->m_rs = NULL;
>   		spin_unlock_irqrestore(&rm->m_rs_lock, flags);
> @@ -970,9 +983,9 @@ void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in6 *dest)
>   		 */
>   		spin_lock_irqsave(&rm->m_rs_lock, flags);
>   
> -		spin_lock(&rs->rs_lock);
> +		spin_lock(&rs->rs_snd_lock);
>   		__rds_send_complete(rs, rm, RDS_RDMA_SEND_CANCELED);
> -		spin_unlock(&rs->rs_lock);
> +		spin_unlock(&rs->rs_snd_lock);
>   
>   		rm->m_rs = NULL;
>   		spin_unlock_irqrestore(&rm->m_rs_lock, flags);
> @@ -989,7 +1002,8 @@ void rds_send_drop_to(struct rds_sock *rs, struct sockaddr_in6 *dest)
>   static int rds_send_queue_rm(struct rds_sock *rs, struct rds_connection *conn,
>   			     struct rds_conn_path *cp,
>   			     struct rds_message *rm, __be16 sport,
> -			     __be16 dport, int *queued)
> +			     __be16 dport, int *queued,
> +			     struct rs_buf_info *bufi)
>   {
>   	unsigned long flags;
>   	u32 len;
> @@ -999,9 +1013,9 @@ static int rds_send_queue_rm(struct rds_sock *rs, struct rds_connection *conn,
>   
>   	len = be32_to_cpu(rm->m_inc.i_hdr.h_len);
>   
> -	/* this is the only place which holds both the socket's rs_lock
> +	/* this is the only place which holds both the socket's rs_snd_lock
>   	 * and the connection's c_lock */
> -	spin_lock_irqsave(&rs->rs_lock, flags);
> +	spin_lock_irqsave(&rs->rs_snd_lock, flags);
>   
>   	/*
>   	 * If there is a little space in sndbuf, we don't queue anything,
> @@ -1011,7 +1025,10 @@ static int rds_send_queue_rm(struct rds_sock *rs, struct rds_connection *conn,
>   	 * rs_snd_bytes here to allow the last msg to exceed the buffer,
>   	 * and poll() now knows no more data can be sent.
>   	 */
> -	if (rs->rs_snd_bytes < rds_sk_sndbuf(rs)) {
> +	if (bufi->rsbi_snd_bytes < rds_sk_sndbuf(rs)) {
> +		bufi->rsbi_snd_bytes += len;
> +
> +		/* Record the total snd_bytes across all peers. */
>   		rs->rs_snd_bytes += len;
>   
>   		/* let recv side know we are close to send space exhaustion.
> @@ -1019,7 +1036,7 @@ static int rds_send_queue_rm(struct rds_sock *rs, struct rds_connection *conn,
>   		 * means we set the flag on *all* messages as soon as our
>   		 * throughput hits a certain threshold.
>   		 */
> -		if (rs->rs_snd_bytes >= rds_sk_sndbuf(rs) / 2)
> +		if (bufi->rsbi_snd_bytes >= rds_sk_sndbuf(rs) / 2)
>   			set_bit(RDS_MSG_ACK_REQUIRED, &rm->m_flags);
>   
>   		list_add_tail(&rm->m_sock_item, &rs->rs_send_queue);
> @@ -1037,7 +1054,7 @@ static int rds_send_queue_rm(struct rds_sock *rs, struct rds_connection *conn,
>   		spin_lock(&cp->cp_lock);
>   		if (cp->cp_pending_flush) {
>   			spin_unlock(&cp->cp_lock);
> -			spin_unlock_irqrestore(&rs->rs_lock, flags);
> +			spin_unlock_irqrestore(&rs->rs_snd_lock, flags);
>   			goto out;
>   		}
>   		rm->m_inc.i_hdr.h_sequence = cpu_to_be64(cp->cp_next_tx_seq++);
> @@ -1046,14 +1063,14 @@ static int rds_send_queue_rm(struct rds_sock *rs, struct rds_connection *conn,
>   
>   		spin_unlock(&cp->cp_lock);
>   
> -		rdsdebug("queued msg %p len %d, rs %p bytes %d seq %llu\n",
> -			 rm, len, rs, rs->rs_snd_bytes,
> +		rdsdebug("queued msg %p len %d, rs %p bytes %u (%u) seq %llu\n",
> +			 rm, len, rs, rs->rs_snd_bytes, bufi->rsbi_snd_bytes,
>   			 (unsigned long long)be64_to_cpu(rm->m_inc.i_hdr.h_sequence));
>   
>   		*queued = 1;
>   	}
>   
> -	spin_unlock_irqrestore(&rs->rs_lock, flags);
> +	spin_unlock_irqrestore(&rs->rs_snd_lock, flags);
>   out:
>   	return *queued;
>   }
> @@ -1257,6 +1274,7 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
>   	long timeo = sock_sndtimeo(sk, nonblock);
>   	size_t total_payload_len = payload_len, rdma_payload_len = 0;
>   	struct rds_conn_path *cpath;
> +	struct rs_buf_info *bufi;
>   	struct in6_addr daddr;
>   	__u32 scope_id = 0;
>   	int namelen;
> @@ -1395,19 +1413,25 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
>   		goto out;
>   	}
>   
> +
> +	bufi = rds_add_buf_info(rs, &daddr, &ret, GFP_KERNEL);
> +	if (!bufi)
> +		goto out;
> +
>   	/*
>   	 * Avoid copying the message from user-space if we already
>   	 * know there's no space in the send buffer.
>   	 * The check is a negated version of the condition used inside
> -	 * function "rds_send_queue_rm": "if (rs->rs_snd_bytes < rds_sk_sndbuf(rs))",
> +	 * function "rds_send_queue_rm":
> +	 * 	"if (bufi->rsbi_snd_bytes < rds_sk_sndbuf(rs))",
>   	 * which needs some reconsideration, as it unexpectedly checks
>   	 * if half of the send-buffer space is available, instead of
>   	 * checking if the given message would fit.
>   	 */
>   	if (nonblock) {
> -		spin_lock_irqsave(&rs->rs_lock, flags);
> -		no_space = rs->rs_snd_bytes >= rds_sk_sndbuf(rs);
> -		spin_unlock_irqrestore(&rs->rs_lock, flags);
> +		spin_lock_irqsave(&rs->rs_snd_lock, flags);
> +		no_space = bufi->rsbi_snd_bytes >= rds_sk_sndbuf(rs);
> +		spin_unlock_irqrestore(&rs->rs_snd_lock, flags);
>   		if (no_space) {
>   			ret = -EAGAIN;
>   			goto out;
> @@ -1525,7 +1549,7 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
>   	}
>   
>   	while (!rds_send_queue_rm(rs, conn, cpath, rm, rs->rs_bound_port,
> -				  dport, &queued)) {
> +				  dport, &queued, bufi)) {
>   		rds_stats_inc(s_send_queue_full);
>   
>   		if (nonblock) {
> @@ -1541,7 +1565,7 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
>   					rds_send_queue_rm(rs, conn, cpath, rm,
>   							  rs->rs_bound_port,
>   							  dport,
> -							  &queued),
> +							  &queued, bufi),
>   					timeo);
>   		rdsdebug("sendmsg woke queued %d timeo %ld\n", queued, timeo);
>   		if (timeo > 0 || timeo == MAX_SCHEDULE_TIMEOUT)
> @@ -1591,6 +1615,7 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
>   int rds_send_internal(struct rds_connection *conn, struct rds_sock *rs,
>   		      struct sk_buff *skb, gfp_t gfp)
>   {
> +	struct rs_buf_info *bufi;
>   	struct rds_nf_hdr *dst;
>   	struct rds_message *rm = NULL;
>   	struct scatterlist *sg;
> @@ -1609,6 +1634,13 @@ int rds_send_internal(struct rds_connection *conn, struct rds_sock *rs,
>   	if (ret < 0)
>   		goto out;
>   
> +	bufi = rds_add_buf_info(rs, &dst->daddr, &ret, gfp);
> +	if (!bufi) {
> +		rds_rtd(RDS_RTD_ERR, "failed to allocate rs %p sbuf to %pI6c",
> +			rs, &dst->daddr);
> +		goto out;
> +	}
> +
>   	/* create ourselves a new message to send out the data */
>   	rm = rds_message_alloc(ret, gfp);
>   	if (!rm) {
> @@ -1677,7 +1709,7 @@ int rds_send_internal(struct rds_connection *conn, struct rds_sock *rs,
>   
>   	/* only take a single pass */
>   	if (!rds_send_queue_rm(rs, conn, &conn->c_path[0], rm,
> -			       rs->rs_bound_port, dst->dport, &queued)) {
> +			       rs->rs_bound_port, dst->dport, &queued, bufi)) {
>   		rds_rtd(RDS_RTD_SND, "cannot block on internal send rs %p", rs);
>   		rds_stats_inc(s_send_queue_full);
>   
> diff --git a/net/rds/sysctl.c b/net/rds/sysctl.c
> index b22e8b8..21bc056e 100644
> --- a/net/rds/sysctl.c
> +++ b/net/rds/sysctl.c
> @@ -1,5 +1,5 @@
>   /*
> - * Copyright (c) 2006 Oracle.  All rights reserved.
> + * Copyright (c) 2006, 2019 Oracle and/or its affiliates. All rights reserved.
>    *
>    * This software is available to you under a choice of one of two
>    * licenses.  You may choose to be licensed under the terms of the GNU
> @@ -52,6 +52,10 @@
>   unsigned int rds_sysctl_shutdown_trace_start_time;
>   unsigned int rds_sysctl_shutdown_trace_end_time;
>   
> +unsigned int rds_sock_max_peers_min = 128;
> +unsigned int rds_sock_max_peers_max = 65536;
> +unsigned int rds_sock_max_peers = 8192;
> +
>   /*
>    * We have official values, but must maintain the sysctl interface for existing
>    * software that expects to find these values here.
> @@ -127,6 +131,15 @@
>   		.mode           = 0644,
>   		.proc_handler   = &proc_dointvec,
>   	},
> +	{
> +		.procname       = "sock_max_peers",
> +		.data           = &rds_sock_max_peers,
> +		.maxlen         = sizeof(int),
> +		.mode           = 0644,
> +		.proc_handler   = &proc_dointvec_minmax,
> +		.extra1		= &rds_sock_max_peers_min,
> +		.extra2		= &rds_sock_max_peers_max
> +	},
>   	{ }
>   };
>   
> 


-- 
K. Poon
ka-cheong.poon@...cle.com

