Message-ID: <20250411180207.450312-6-allison.henderson@oracle.com>
Date: Fri, 11 Apr 2025 11:02:04 -0700
From: allison.henderson@...cle.com
To: netdev@...r.kernel.org
Subject: [PATCH v2 5/8] net/rds: rds_tcp_accept_one ought to not discard messages

From: Gerd Rausch <gerd.rausch@...cle.com>

RDS/TCP differs from RDS/RDMA in that message acknowledgment
is done based on TCP sequence numbers:
As soon as the last byte of a message has been acknowledged
by the TCP stack of a peer, "rds_tcp_write_space()" goes on
to discard prior messages from the send queue.

This is fine as long as the receiver never throws any messages away.

Unfortunately, that is *not* the case since the introduction of MPRDS:
commit "RDS: TCP: Enable multipath RDS for TCP"

A new function "rds_tcp_accept_one_path" was introduced,
which may return "NULL" if no connection path is currently
available.

Unfortunately, this happens after the "->accept()" call, and the new socket
often already contains messages, since the peer already transitioned
to "RDS_CONN_UP" in response to "TCP_ESTABLISHED".

That's also the case after this [1]:
commit "RDS: TCP: Force every connection to be initiated by numerically
smaller IP address"

which tried to address the situation of pending data by only transitioning
connections from a smaller IP address to "RDS_CONN_UP".

But even in those cases, and in particular if the "RDS_EXTHDR_NPATHS"
handshake has not occurred yet, and therefore we're working with
"c_npaths <= 1", "c_conn[0]" may be in a state distinct from
"RDS_CONN_DOWN", and therefore all messages on the just accepted socket
will be tossed away.

This fix changes "rds_tcp_accept_one":

* If connected from a peer with a larger IP address, the new socket
  will continue to get closed right away.
  With commit [1] above, there should not be any messages
  in the socket receive buffer, since the peer never transitioned
  to "RDS_CONN_UP".
  Therefore it should be okay to not make any efforts to dispatch
  the socket receive buffer.

* If connected from a peer with a smaller IP address,
  we call "rds_tcp_accept_one_path" to find a free slot/"path".
  If found, business goes on as usual.
  If none was found, we save/stash the newly accepted socket
  into "rds_tcp_accepted_sock", in order to not lose any
  messages that may have arrived already.
  We then return from "rds_tcp_accept_one" with "-ENOBUFS".
  Later on, when a slot/"path" does become available again
  (e.g. state transitioned to "RDS_CONN_DOWN",
   or HS extension header was received with "c_npaths > 1"),
  we call "rds_tcp_conn_slots_available", which simply re-queues
  the accept worker. The worker calls "rds_tcp_accept_one" again,
  which picks up the stashed socket from "rds_tcp_accepted_sock"
  and continues where it left off with "-ENOBUFS" last time.
  Because a slot is now available, those messages are not lost:
  processing proceeds as if that slot had been available
  the first time around.

Signed-off-by: Gerd Rausch <gerd.rausch@...cle.com>
Signed-off-by: Jack Vogel <jack.vogel@...cle.com>
Signed-off-by: Allison Henderson <allison.henderson@...cle.com>
---
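Editorial note, not part of the patch: the stash-and-retry behaviour
described in the commit message can be sketched in user space as below.
This is a minimal model under stated assumptions, not the kernel
implementation. All "sketch_*" names, the "path_state" enum and the
"queued_msgs" counter are hypothetical stand-ins. The retry is also
synchronous here, whereas the patch queues a worker.

```c
#include <assert.h>
#include <errno.h>
#include <stddef.h>

/* Hypothetical user-space stand-ins; none of these are kernel types. */
enum path_state { PATH_DOWN, PATH_CONNECTING, PATH_UP };

struct sketch_sock {
	int queued_msgs;		/* data the peer's TCP stack already saw ACKed */
};

struct sketch_net {
	struct sketch_sock *pending;	/* connection waiting on the listener */
	struct sketch_sock *stashed;	/* models "rds_tcp_accepted_sock" */
	enum path_state path[2];
	int npaths;
	int delivered;			/* messages handed over to a path */
};

/* Models "rds_tcp_accept_one_path": claim the first DOWN path, if any. */
static int sketch_find_slot(struct sketch_net *n)
{
	for (int i = 0; i < n->npaths; i++) {
		if (n->path[i] == PATH_DOWN) {
			n->path[i] = PATH_CONNECTING;
			return i;
		}
	}
	return -1;
}

/* Models "rds_tcp_accept_one": prefer a stashed socket over a new accept. */
static int sketch_accept_one(struct sketch_net *n)
{
	struct sketch_sock *s = n->stashed;
	int slot;

	n->stashed = NULL;
	if (!s) {
		s = n->pending;
		n->pending = NULL;
		if (!s)
			return -EAGAIN;	/* nothing to accept */
	}

	slot = sketch_find_slot(n);
	if (slot < 0) {
		n->stashed = s;		/* keep the socket and its queued data */
		return -ENOBUFS;
	}

	n->delivered += s->queued_msgs;
	s->queued_msgs = 0;
	n->path[slot] = PATH_UP;
	return 0;
}

/* Models the effect of "conn_slots_available". For brevity the sketch
 * folds the path going down into this call and retries immediately.
 */
static int sketch_slots_available(struct sketch_net *n, int slot)
{
	n->path[slot] = PATH_DOWN;
	return sketch_accept_one(n);
}

/* Usage: the only path is busy, so the first accept stashes the socket. */
static void sketch_demo(void)
{
	struct sketch_sock s = { .queued_msgs = 3 };
	struct sketch_net n = {
		.pending = &s, .path = { PATH_UP, PATH_DOWN }, .npaths = 1,
	};

	assert(sketch_accept_one(&n) == -ENOBUFS);
	assert(n.stashed == &s && s.queued_msgs == 3);
	assert(sketch_slots_available(&n, 0) == 0);
	assert(n.delivered == 3 && n.stashed == NULL);
}
```

Calling "sketch_demo()" from a "main" walks through the "-ENOBUFS" case
and the later retry. The three queued messages survive because the
socket is kept rather than released.
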
 net/rds/connection.c |   4 ++
 net/rds/rds.h        |  65 ++++++++++--------
 net/rds/recv.c       |   4 ++
 net/rds/tcp.c        |  27 +++-----
 net/rds/tcp.h        |  22 +++++-
 net/rds/tcp_listen.c | 156 ++++++++++++++++++++++++++++++-------------
 6 files changed, 186 insertions(+), 92 deletions(-)
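
Editorial note, not part of the patch: the commit message relies on the
sender discarding messages once TCP has acknowledged their last byte.
The sketch below illustrates that idea in user space. It is an
assumption-laden model, not the code behind "rds_tcp_write_space".
"sketch_msg", "sketch_drop_acked" and "seq_before_eq" are hypothetical
names, "end_seq" is taken to be the sequence number just past a
message's last byte, and "snd_una" is the first unacknowledged byte.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

/* Hypothetical send-queue entry: sequence number just past the last byte. */
struct sketch_msg {
	uint32_t end_seq;
};

/* Serial-number comparison that tolerates 32-bit sequence wraparound. */
static int seq_before_eq(uint32_t a, uint32_t b)
{
	return (int32_t)(a - b) <= 0;
}

/* Drop every message at the head of the queue whose last byte has been
 * acknowledged (end_seq <= snd_una). Returns the number dropped.
 */
static size_t sketch_drop_acked(struct sketch_msg *q, size_t *len,
				uint32_t snd_una)
{
	size_t dropped = 0;

	while (dropped < *len && seq_before_eq(q[dropped].end_seq, snd_una))
		dropped++;

	/* Shift the surviving messages to the front of the array. */
	for (size_t i = dropped; i < *len; i++)
		q[i - dropped] = q[i];
	*len -= dropped;
	return dropped;
}

/* Usage: the peer acknowledged everything below sequence number 200. */
static void sketch_ack_demo(void)
{
	struct sketch_msg q[] = { { 100 }, { 200 }, { 300 } };
	size_t len = 3;

	assert(sketch_drop_acked(q, &len, 200) == 2);
	assert(len == 1 && q[0].end_seq == 300);
}
```

Once a message is dropped this way the sender cannot retransmit it, which
is why a receiver that releases an accepted socket with queued data loses
those messages for good.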

diff --git a/net/rds/connection.c b/net/rds/connection.c
index cfea54740219..05adbcd3c7b3 100644
--- a/net/rds/connection.c
+++ b/net/rds/connection.c
@@ -435,6 +435,10 @@ void rds_conn_shutdown(struct rds_conn_path *cp)
 	 * to the conn hash, so we never trigger a reconnect on this
 	 * conn - the reconnect is always triggered by the active peer. */
 	cancel_delayed_work_sync(&cp->cp_conn_w);
+
+	if (conn->c_trans->conn_slots_available)
+		conn->c_trans->conn_slots_available(conn);
+
 	rcu_read_lock();
 	if (!hlist_unhashed(&conn->c_hash_node)) {
 		rcu_read_unlock();
diff --git a/net/rds/rds.h b/net/rds/rds.h
index d8d3abb2d5a6..0d66d4f4cc63 100644
--- a/net/rds/rds.h
+++ b/net/rds/rds.h
@@ -510,33 +510,6 @@ struct rds_notifier {
  */
 #define	RDS_TRANS_LOOP	3
 
-/**
- * struct rds_transport -  transport specific behavioural hooks
- *
- * @xmit: .xmit is called by rds_send_xmit() to tell the transport to send
- *        part of a message.  The caller serializes on the send_sem so this
- *        doesn't need to be reentrant for a given conn.  The header must be
- *        sent before the data payload.  .xmit must be prepared to send a
- *        message with no data payload.  .xmit should return the number of
- *        bytes that were sent down the connection, including header bytes.
- *        Returning 0 tells the caller that it doesn't need to perform any
- *        additional work now.  This is usually the case when the transport has
- *        filled the sending queue for its connection and will handle
- *        triggering the rds thread to continue the send when space becomes
- *        available.  Returning -EAGAIN tells the caller to retry the send
- *        immediately.  Returning -ENOMEM tells the caller to retry the send at
- *        some point in the future.
- *
- * @conn_shutdown: conn_shutdown stops traffic on the given connection.  Once
- *                 it returns the connection can not call rds_recv_incoming().
- *                 This will only be called once after conn_connect returns
- *                 non-zero success and will The caller serializes this with
- *                 the send and connecting paths (xmit_* and conn_*).  The
- *                 transport is responsible for other serialization, including
- *                 rds_recv_incoming().  This is called in process context but
- *                 should try hard not to block.
- */
-
 struct rds_transport {
 	char			t_name[TRANSNAMSIZ];
 	struct list_head	t_item;
@@ -549,10 +522,48 @@ struct rds_transport {
 			   __u32 scope_id);
 	int (*conn_alloc)(struct rds_connection *conn, gfp_t gfp);
 	void (*conn_free)(void *data);
+
+	/*
+	 * conn_slots_available is invoked when a previously unavailable connection slot
+	 * becomes available again. rds_tcp_accept_one may return -ENOBUFS if it
+	 * cannot find an available slot, and then stashes the new socket in
+	 * "rds_tcp_accepted_sock". This function re-queues the accept worker, which
+	 * calls rds_tcp_accept_one again to pick up the stashed socket and continue
+	 * where it left off.  This ensures messages received on the new socket are not
+	 * discarded when no connection path was available at the time.
+	 */
+	void (*conn_slots_available)(struct rds_connection *conn);
 	int (*conn_path_connect)(struct rds_conn_path *cp);
+
+	/*
+	 * conn_shutdown stops traffic on the given connection.  Once
+	 * it returns the connection can not call rds_recv_incoming().
+	 * This will only be called once after conn_connect returns
+	 * non-zero success and will The caller serializes this with
+	 * the send and connecting paths (xmit_* and conn_*).  The
+	 * transport is responsible for other serialization, including
+	 * rds_recv_incoming().  This is called in process context but
+	 * should try hard not to block.
+	 */
 	void (*conn_path_shutdown)(struct rds_conn_path *conn);
 	void (*xmit_path_prepare)(struct rds_conn_path *cp);
 	void (*xmit_path_complete)(struct rds_conn_path *cp);
+
+	/*
+	 * .xmit is called by rds_send_xmit() to tell the transport to send
+	 * part of a message.  The caller serializes on the send_sem so this
+	 * doesn't need to be reentrant for a given conn.  The header must be
+	 * sent before the data payload.  .xmit must be prepared to send a
+	 * message with no data payload.  .xmit should return the number of
+	 * bytes that were sent down the connection, including header bytes.
+	 * Returning 0 tells the caller that it doesn't need to perform any
+	 * additional work now.  This is usually the case when the transport has
+	 * filled the sending queue for its connection and will handle
+	 * triggering the rds thread to continue the send when space becomes
+	 * available.  Returning -EAGAIN tells the caller to retry the send
+	 * immediately.  Returning -ENOMEM tells the caller to retry the send at
+	 * some point in the future.
+	 */
 	int (*xmit)(struct rds_connection *conn, struct rds_message *rm,
 		    unsigned int hdr_off, unsigned int sg, unsigned int off);
 	int (*xmit_rdma)(struct rds_connection *conn, struct rm_rdma_op *op);
diff --git a/net/rds/recv.c b/net/rds/recv.c
index 5627f80013f8..c0a89af29d1b 100644
--- a/net/rds/recv.c
+++ b/net/rds/recv.c
@@ -230,6 +230,10 @@ static void rds_recv_hs_exthdrs(struct rds_header *hdr,
 	conn->c_npaths = max_t(int, conn->c_npaths, 1);
 	conn->c_ping_triggered = 0;
 	rds_conn_peer_gen_update(conn, new_peer_gen_num);
+
+	if (conn->c_npaths > 1 &&
+	    conn->c_trans->conn_slots_available)
+		conn->c_trans->conn_slots_available(conn);
 }
 
 /* rds_start_mprds() will synchronously start multiple paths when appropriate.
diff --git a/net/rds/tcp.c b/net/rds/tcp.c
index 3cc2f303bf78..31e7425e2da9 100644
--- a/net/rds/tcp.c
+++ b/net/rds/tcp.c
@@ -213,6 +213,8 @@ void rds_tcp_set_callbacks(struct socket *sock, struct rds_conn_path *cp)
 		sock->sk->sk_data_ready = sock->sk->sk_user_data;
 
 	tc->t_sock = sock;
+	if (!tc->t_rtn)
+		tc->t_rtn = net_generic(sock_net(sock->sk), rds_tcp_netid);
 	tc->t_cpath = cp;
 	tc->t_orig_data_ready = sock->sk->sk_data_ready;
 	tc->t_orig_write_space = sock->sk->sk_write_space;
@@ -378,6 +380,7 @@ static int rds_tcp_conn_alloc(struct rds_connection *conn, gfp_t gfp)
 		}
 		mutex_init(&tc->t_conn_path_lock);
 		tc->t_sock = NULL;
+		tc->t_rtn = NULL;
 		tc->t_tinc = NULL;
 		tc->t_tinc_hdr_rem = sizeof(struct rds_header);
 		tc->t_tinc_data_rem = 0;
@@ -458,6 +461,7 @@ struct rds_transport rds_tcp_transport = {
 	.recv_path		= rds_tcp_recv_path,
 	.conn_alloc		= rds_tcp_conn_alloc,
 	.conn_free		= rds_tcp_conn_free,
+	.conn_slots_available	= rds_tcp_conn_slots_available,
 	.conn_path_connect	= rds_tcp_conn_path_connect,
 	.conn_path_shutdown	= rds_tcp_conn_path_shutdown,
 	.inc_copy_to_user	= rds_tcp_inc_copy_to_user,
@@ -473,17 +477,7 @@ struct rds_transport rds_tcp_transport = {
 	.t_unloading		= rds_tcp_is_unloading,
 };
 
-static unsigned int rds_tcp_netid;
-
-/* per-network namespace private data for this module */
-struct rds_tcp_net {
-	struct socket *rds_tcp_listen_sock;
-	struct work_struct rds_tcp_accept_w;
-	struct ctl_table_header *rds_tcp_sysctl;
-	struct ctl_table *ctl_table;
-	int sndbuf_size;
-	int rcvbuf_size;
-};
+int rds_tcp_netid;
 
 /* All module specific customizations to the RDS-TCP socket should be done in
  * rds_tcp_tune() and applied after socket creation.
@@ -526,15 +520,12 @@ static void rds_tcp_accept_worker(struct work_struct *work)
 					       struct rds_tcp_net,
 					       rds_tcp_accept_w);
 
-	while (rds_tcp_accept_one(rtn->rds_tcp_listen_sock) == 0)
+	while (rds_tcp_accept_one(rtn) == 0)
 		cond_resched();
 }
 
-void rds_tcp_accept_work(struct sock *sk)
+void rds_tcp_accept_work(struct rds_tcp_net *rtn)
 {
-	struct net *net = sock_net(sk);
-	struct rds_tcp_net *rtn = net_generic(net, rds_tcp_netid);
-
 	queue_work(rds_wq, &rtn->rds_tcp_accept_w);
 }
 
@@ -546,6 +537,8 @@ static __net_init int rds_tcp_init_net(struct net *net)
 
 	memset(rtn, 0, sizeof(*rtn));
 
+	mutex_init(&rtn->rds_tcp_accept_lock);
+
 	/* {snd, rcv}buf_size default to 0, which implies we let the
 	 * stack pick the value, and permit auto-tuning of buffer size.
 	 */
@@ -609,6 +602,8 @@ static void rds_tcp_kill_sock(struct net *net)
 
 	rtn->rds_tcp_listen_sock = NULL;
 	rds_tcp_listen_stop(lsock, &rtn->rds_tcp_accept_w);
+	if (rtn->rds_tcp_accepted_sock)
+		sock_release(rtn->rds_tcp_accepted_sock);
 	spin_lock_irq(&rds_tcp_conn_lock);
 	list_for_each_entry_safe(tc, _tc, &rds_tcp_conn_list, t_tcp_node) {
 		struct net *c_net = read_pnet(&tc->t_cpath->cp_conn->c_net);
diff --git a/net/rds/tcp.h b/net/rds/tcp.h
index 053aa7da87ef..2000f4acd57a 100644
--- a/net/rds/tcp.h
+++ b/net/rds/tcp.h
@@ -4,6 +4,21 @@
 
 #define RDS_TCP_PORT	16385
 
+/* per-network namespace private data for this module */
+struct rds_tcp_net {
+	/* serialize "rds_tcp_accept_one" with "rds_tcp_accept_lock"
+	 * to protect "rds_tcp_accepted_sock"
+	 */
+	struct mutex		rds_tcp_accept_lock;
+	struct socket		*rds_tcp_listen_sock;
+	struct socket		*rds_tcp_accepted_sock;
+	struct work_struct	rds_tcp_accept_w;
+	struct ctl_table_header	*rds_tcp_sysctl;
+	struct ctl_table	*ctl_table;
+	int			sndbuf_size;
+	int			rcvbuf_size;
+};
+
 struct rds_tcp_incoming {
 	struct rds_incoming	ti_inc;
 	struct sk_buff_head	ti_skb_list;
@@ -19,6 +34,7 @@ struct rds_tcp_connection {
 	 */
 	struct mutex		t_conn_path_lock;
 	struct socket		*t_sock;
+	struct rds_tcp_net	*t_rtn;
 	void			*t_orig_write_space;
 	void			*t_orig_data_ready;
 	void			*t_orig_state_change;
@@ -49,6 +65,7 @@ struct rds_tcp_statistics {
 };
 
 /* tcp.c */
+extern int rds_tcp_netid;
 bool rds_tcp_tune(struct socket *sock);
 void rds_tcp_set_callbacks(struct socket *sock, struct rds_conn_path *cp);
 void rds_tcp_reset_callbacks(struct socket *sock, struct rds_conn_path *cp);
@@ -57,7 +74,7 @@ void rds_tcp_restore_callbacks(struct socket *sock,
 u32 rds_tcp_write_seq(struct rds_tcp_connection *tc);
 u32 rds_tcp_snd_una(struct rds_tcp_connection *tc);
 extern struct rds_transport rds_tcp_transport;
-void rds_tcp_accept_work(struct sock *sk);
+void rds_tcp_accept_work(struct rds_tcp_net *rtn);
 int rds_tcp_laddr_check(struct net *net, const struct in6_addr *addr,
 			__u32 scope_id);
 /* tcp_connect.c */
@@ -69,7 +86,8 @@ void rds_tcp_state_change(struct sock *sk);
 struct socket *rds_tcp_listen_init(struct net *net, bool isv6);
 void rds_tcp_listen_stop(struct socket *sock, struct work_struct *acceptor);
 void rds_tcp_listen_data_ready(struct sock *sk);
-int rds_tcp_accept_one(struct socket *sock);
+void rds_tcp_conn_slots_available(struct rds_connection *conn);
+int rds_tcp_accept_one(struct rds_tcp_net *rtn);
 void rds_tcp_keepalive(struct socket *sock);
 void *rds_tcp_listen_sock_def_readable(struct net *net);
 
diff --git a/net/rds/tcp_listen.c b/net/rds/tcp_listen.c
index fced9e286f79..78d4990b2086 100644
--- a/net/rds/tcp_listen.c
+++ b/net/rds/tcp_listen.c
@@ -35,6 +35,8 @@
 #include <linux/in.h>
 #include <net/tcp.h>
 #include <trace/events/sock.h>
+#include <net/net_namespace.h>
+#include <net/netns/generic.h>
 
 #include "rds.h"
 #include "tcp.h"
@@ -66,32 +68,46 @@ struct rds_tcp_connection *rds_tcp_accept_one_path(struct rds_connection *conn)
 	int i;
 	int npaths = max_t(int, 1, conn->c_npaths);
 
-	/* for mprds, all paths MUST be initiated by the peer
-	 * with the smaller address.
-	 */
-	if (rds_addr_cmp(&conn->c_faddr, &conn->c_laddr) >= 0) {
-		/* Make sure we initiate at least one path if this
-		 * has not already been done; rds_start_mprds() will
-		 * take care of additional paths, if necessary.
-		 */
-		if (npaths == 1)
-			rds_conn_path_connect_if_down(&conn->c_path[0]);
-		return NULL;
-	}
-
 	for (i = 0; i < npaths; i++) {
 		struct rds_conn_path *cp = &conn->c_path[i];
 
 		if (rds_conn_path_transition(cp, RDS_CONN_DOWN,
-					     RDS_CONN_CONNECTING)) {
+					     RDS_CONN_CONNECTING))
 			return cp->cp_transport_data;
-		}
 	}
 	return NULL;
 }
 
-int rds_tcp_accept_one(struct socket *sock)
+void rds_tcp_conn_slots_available(struct rds_connection *conn)
+{
+	struct rds_tcp_connection *tc;
+	struct rds_tcp_net *rtn;
+
+	if (rds_destroy_pending(conn))
+		return;
+
+	tc = conn->c_path->cp_transport_data;
+	rtn = tc->t_rtn;
+	if (!rtn)
+		return;
+
+	/* As soon as a connection went down,
+	 * it is safe to schedule a "rds_tcp_accept_one"
+	 * attempt even if there are no connections pending:
+	 * Function "rds_tcp_accept_one" won't block
+	 * but simply return -EAGAIN in that case.
+	 *
+	 * Doing so is necessary to address the case where an
+	 * incoming connection on "rds_tcp_listen_sock" is ready
+	 * to be accepted prior to a free slot being available:
+	 * the -ENOBUFS case in "rds_tcp_accept_one".
+	 */
+	rds_tcp_accept_work(rtn);
+}
+
+int rds_tcp_accept_one(struct rds_tcp_net *rtn)
 {
+	struct socket *listen_sock = rtn->rds_tcp_listen_sock;
 	struct socket *new_sock = NULL;
 	struct rds_connection *conn;
 	int ret;
@@ -109,33 +125,40 @@ int rds_tcp_accept_one(struct socket *sock)
 #endif
 	int dev_if = 0;
 
-	if (!sock) /* module unload or netns delete in progress */
+	if (!listen_sock)
 		return -ENETUNREACH;
 
-	ret = sock_create_lite(sock->sk->sk_family,
-			       sock->sk->sk_type, sock->sk->sk_protocol,
-			       &new_sock);
-	if (ret)
-		goto out;
-
-	ret = sock->ops->accept(sock, new_sock, &arg);
-	if (ret < 0)
-		goto out;
-
-	/* sock_create_lite() does not get a hold on the owner module so we
-	 * need to do it here.  Note that sock_release() uses sock->ops to
-	 * determine if it needs to decrement the reference count.  So set
-	 * sock->ops after calling accept() in case that fails.  And there's
-	 * no need to do try_module_get() as the listener should have a hold
-	 * already.
-	 */
-	new_sock->ops = sock->ops;
-	__module_get(new_sock->ops->owner);
+	mutex_lock(&rtn->rds_tcp_accept_lock);
+	new_sock = rtn->rds_tcp_accepted_sock;
+	rtn->rds_tcp_accepted_sock = NULL;
+
+	if (!new_sock) {
+		ret = sock_create_lite(listen_sock->sk->sk_family,
+				       listen_sock->sk->sk_type,
+				       listen_sock->sk->sk_protocol,
+				       &new_sock);
+		if (ret)
+			goto out;
+
+		ret = listen_sock->ops->accept(listen_sock, new_sock, &arg);
+		if (ret < 0)
+			goto out;
+
+		/* sock_create_lite() does not get a hold on the owner module so we
+		 * need to do it here.  Note that sock_release() uses sock->ops to
+		 * determine if it needs to decrement the reference count.  So set
+		 * sock->ops after calling accept() in case that fails.  And there's
+		 * no need to do try_module_get() as the listener should have a hold
+		 * already.
+		 */
+		new_sock->ops = listen_sock->ops;
+		__module_get(new_sock->ops->owner);
 
-	rds_tcp_keepalive(new_sock);
-	if (!rds_tcp_tune(new_sock)) {
-		ret = -EINVAL;
-		goto out;
+		rds_tcp_keepalive(new_sock);
+		if (!rds_tcp_tune(new_sock)) {
+			ret = -EINVAL;
+			goto out;
+		}
 	}
 
 	inet = inet_sk(new_sock->sk);
@@ -150,7 +173,7 @@ int rds_tcp_accept_one(struct socket *sock)
 	peer_addr = &daddr;
 #endif
 	rdsdebug("accepted family %d tcp %pI6c:%u -> %pI6c:%u\n",
-		 sock->sk->sk_family,
+		 listen_sock->sk->sk_family,
 		 my_addr, ntohs(inet->inet_sport),
 		 peer_addr, ntohs(inet->inet_dport));
 
@@ -170,13 +193,13 @@ int rds_tcp_accept_one(struct socket *sock)
 	}
 #endif
 
-	if (!rds_tcp_laddr_check(sock_net(sock->sk), peer_addr, dev_if)) {
+	if (!rds_tcp_laddr_check(sock_net(listen_sock->sk), peer_addr, dev_if)) {
 		/* local address connection is only allowed via loopback */
 		ret = -EOPNOTSUPP;
 		goto out;
 	}
 
-	conn = rds_conn_create(sock_net(sock->sk),
+	conn = rds_conn_create(sock_net(listen_sock->sk),
 			       my_addr, peer_addr,
 			       &rds_tcp_transport, 0, GFP_KERNEL, dev_if);
 
@@ -189,15 +212,51 @@ int rds_tcp_accept_one(struct socket *sock)
 	 * If the client reboots, this conn will need to be cleaned up.
 	 * rds_tcp_state_change() will do that cleanup
 	 */
-	rs_tcp = rds_tcp_accept_one_path(conn);
-	if (!rs_tcp)
+	if (rds_addr_cmp(&conn->c_faddr, &conn->c_laddr) < 0) {
+		/* Try to obtain a free connection slot.
+		 * If unsuccessful, we need to preserve "new_sock"
+		 * that we just accepted, since its "sk_receive_queue"
+		 * may contain messages already that have been acknowledged
+		 * to and discarded by the sender.
+		 * We must not throw those away!
+		 */
+		rs_tcp = rds_tcp_accept_one_path(conn);
+		if (!rs_tcp) {
+			/* It's okay to stash "new_sock", since
+			 * "rds_tcp_conn_slots_available" triggers "rds_tcp_accept_one"
+			 * again as soon as one of the connection slots
+			 * becomes available again
+			 */
+			rtn->rds_tcp_accepted_sock = new_sock;
+			new_sock = NULL;
+			ret = -ENOBUFS;
+			goto out;
+		}
+	} else {
+		/* This connection request came from a peer with
+		 * a larger address.
+		 * Function "rds_tcp_state_change" makes sure
+		 * that the connection doesn't transition
+		 * to state "RDS_CONN_UP", and therefore
+		 * we should not have received any messages
+		 * on this socket yet.
+		 * This is the only case where it's okay to
+		 * not dequeue messages from "sk_receive_queue".
+		 */
+		if (conn->c_npaths <= 1)
+			rds_conn_path_connect_if_down(&conn->c_path[0]);
+		rs_tcp = NULL;
 		goto rst_nsk;
+	}
+
 	mutex_lock(&rs_tcp->t_conn_path_lock);
 	cp = rs_tcp->t_cpath;
 	conn_state = rds_conn_path_state(cp);
 	WARN_ON(conn_state == RDS_CONN_UP);
-	if (conn_state != RDS_CONN_CONNECTING && conn_state != RDS_CONN_ERROR)
+	if (conn_state != RDS_CONN_CONNECTING && conn_state != RDS_CONN_ERROR) {
+		rds_conn_path_drop(cp, 0);
 		goto rst_nsk;
+	}
 	if (rs_tcp->t_sock) {
 		/* Duelling SYN has been handled in rds_tcp_accept_one() */
 		rds_tcp_reset_callbacks(new_sock, cp);
@@ -227,6 +286,9 @@ int rds_tcp_accept_one(struct socket *sock)
 		mutex_unlock(&rs_tcp->t_conn_path_lock);
 	if (new_sock)
 		sock_release(new_sock);
+
+	mutex_unlock(&rtn->rds_tcp_accept_lock);
+
 	return ret;
 }
 
@@ -254,7 +316,7 @@ void rds_tcp_listen_data_ready(struct sock *sk)
 	 * the listen socket is being torn down.
 	 */
 	if (sk->sk_state == TCP_LISTEN)
-		rds_tcp_accept_work(sk);
+		rds_tcp_accept_work(net_generic(sock_net(sk), rds_tcp_netid));
 	else
 		ready = rds_tcp_listen_sock_def_readable(sock_net(sk));
 
-- 
2.43.0

