Message-ID: <20250227042638.82553-2-allison.henderson@oracle.com>
Date: Wed, 26 Feb 2025 21:26:33 -0700
From: allison.henderson@...cle.com
To: netdev@...r.kernel.org
Subject: [PATCH 1/6] net/rds: Avoid queuing superfluous send and recv work

From: Håkon Bugge <haakon.bugge@...cle.com>

Send and receive work is queued on a connection path's workqueue from many
places and contexts.  Define two more bits in cp_flags and, with a pair of
helper functions, queue the work only if it is not already queued.

This avoids overloading the work queue, which would degrade connection
management performance, since connection management shares the same work
queue.
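
For context, the pattern being introduced is a test-and-set gate in front
of queue_delayed_work(): the first caller sets the flag and queues the work,
later callers become no-ops until the worker clears the flag again.  A
minimal userspace sketch of that pattern (C11 atomics stand in for the
kernel bitops; all names below are illustrative only, not part of this
patch):

  #include <stdatomic.h>
  #include <stdio.h>

  /* Illustrative stand-in for the RDS_SEND_WORK_QUEUED bit in cp_flags. */
  static atomic_flag send_work_queued = ATOMIC_FLAG_INIT;

  /* Stand-in for queue_delayed_work(rds_wq, &cp->cp_send_w, delay). */
  static void queue_send_work(void)
  {
          printf("send work queued\n");
  }

  /* Queue only if not already queued: the first caller wins, later callers no-op. */
  static void cond_queue_send_work(void)
  {
          if (!atomic_flag_test_and_set(&send_work_queued))
                  queue_send_work();
  }

  /* The worker clears the flag first, so a new request can queue work again. */
  static void send_worker(void)
  {
          atomic_flag_clear(&send_work_queued);
          /* ... transmit ... */
  }

  int main(void)
  {
          cond_queue_send_work();  /* queues */
          cond_queue_send_work();  /* suppressed: already queued */
          send_worker();           /* flag cleared by the worker */
          cond_queue_send_work();  /* queues again */
          return 0;
  }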

Signed-off-by: Håkon Bugge <haakon.bugge@...cle.com>
Signed-off-by: Gerd Rausch <gerd.rausch@...cle.com>
Signed-off-by: Allison Henderson <allison.henderson@...cle.com>
---
 net/rds/connection.c | 11 ++++++++---
 net/rds/ib_recv.c    |  2 +-
 net/rds/ib_send.c    |  2 +-
 net/rds/rds.h        | 33 +++++++++++++++++++++++++++++++++
 net/rds/send.c       |  6 +++---
 net/rds/tcp.c        | 12 ++++++++++--
 net/rds/tcp_recv.c   |  2 +-
 net/rds/tcp_send.c   |  2 +-
 net/rds/threads.c    | 25 +++++++++++++++++--------
 9 files changed, 75 insertions(+), 20 deletions(-)

diff --git a/net/rds/connection.c b/net/rds/connection.c
index c749c5525b40..1d80586fdda2 100644
--- a/net/rds/connection.c
+++ b/net/rds/connection.c
@@ -445,9 +445,14 @@ static void rds_conn_path_destroy(struct rds_conn_path *cp)
 	if (!cp->cp_transport_data)
 		return;
 
-	/* make sure lingering queued work won't try to ref the conn */
-	cancel_delayed_work_sync(&cp->cp_send_w);
-	cancel_delayed_work_sync(&cp->cp_recv_w);
+	/* make sure lingering queued work won't try to ref the
+	 * conn. If there is work queued, we cancel it (and set the
+	 * bit to avoid any re-queueing)
+	 */
+	if (test_and_set_bit(RDS_SEND_WORK_QUEUED, &cp->cp_flags))
+		cancel_delayed_work_sync(&cp->cp_send_w);
+	if (test_and_set_bit(RDS_RECV_WORK_QUEUED, &cp->cp_flags))
+		cancel_delayed_work_sync(&cp->cp_recv_w);
 
 	rds_conn_path_drop(cp, true);
 	flush_work(&cp->cp_down_w);
diff --git a/net/rds/ib_recv.c b/net/rds/ib_recv.c
index e53b7f266bd7..4ecee1ff1c26 100644
--- a/net/rds/ib_recv.c
+++ b/net/rds/ib_recv.c
@@ -457,7 +457,7 @@ void rds_ib_recv_refill(struct rds_connection *conn, int prefill, gfp_t gfp)
 	    (must_wake ||
 	    (can_wait && rds_ib_ring_low(&ic->i_recv_ring)) ||
 	    rds_ib_ring_empty(&ic->i_recv_ring))) {
-		queue_delayed_work(rds_wq, &conn->c_recv_w, 1);
+		rds_cond_queue_recv_work(conn->c_path + 0, 1);
 	}
 	if (can_wait)
 		cond_resched();
diff --git a/net/rds/ib_send.c b/net/rds/ib_send.c
index 4190b90ff3b1..95edd4cb8528 100644
--- a/net/rds/ib_send.c
+++ b/net/rds/ib_send.c
@@ -419,7 +419,7 @@ void rds_ib_send_add_credits(struct rds_connection *conn, unsigned int credits)
 
 	atomic_add(IB_SET_SEND_CREDITS(credits), &ic->i_credits);
 	if (test_and_clear_bit(RDS_LL_SEND_FULL, &conn->c_flags))
-		queue_delayed_work(rds_wq, &conn->c_send_w, 0);
+		rds_cond_queue_send_work(conn->c_path + 0, 0);
 
 	WARN_ON(IB_GET_SEND_CREDITS(credits) >= 16384);
 
diff --git a/net/rds/rds.h b/net/rds/rds.h
index dc360252c515..c9a22d0e887b 100644
--- a/net/rds/rds.h
+++ b/net/rds/rds.h
@@ -90,6 +90,8 @@ enum {
 #define RDS_IN_XMIT		2
 #define RDS_RECV_REFILL		3
 #define	RDS_DESTROY_PENDING	4
+#define RDS_SEND_WORK_QUEUED	5
+#define RDS_RECV_WORK_QUEUED	6
 
 /* Max number of multipaths per RDS connection. Must be a power of 2 */
 #define	RDS_MPATH_WORKERS	8
@@ -791,6 +793,37 @@ void __rds_conn_path_error(struct rds_conn_path *cp, const char *, ...);
 #define rds_conn_path_error(cp, fmt...) \
 	__rds_conn_path_error(cp, KERN_WARNING "RDS: " fmt)
 
+extern struct workqueue_struct *rds_wq;
+static inline void rds_cond_queue_send_work(struct rds_conn_path *cp, unsigned long delay)
+{
+	if (!test_and_set_bit(RDS_SEND_WORK_QUEUED, &cp->cp_flags))
+		queue_delayed_work(rds_wq, &cp->cp_send_w, delay);
+}
+
+static inline void rds_clear_queued_send_work_bit(struct rds_conn_path *cp)
+{
+	/* clear_bit() does not imply a memory barrier */
+	smp_mb__before_atomic();
+	clear_bit(RDS_SEND_WORK_QUEUED, &cp->cp_flags);
+	/* clear_bit() does not imply a memory barrier */
+	smp_mb__after_atomic();
+}
+
+static inline void rds_cond_queue_recv_work(struct rds_conn_path *cp, unsigned long delay)
+{
+	if (!test_and_set_bit(RDS_RECV_WORK_QUEUED, &cp->cp_flags))
+		queue_delayed_work(rds_wq, &cp->cp_recv_w, delay);
+}
+
+static inline void rds_clear_queued_recv_work_bit(struct rds_conn_path *cp)
+{
+	/* clear_bit() does not imply a memory barrier */
+	smp_mb__before_atomic();
+	clear_bit(RDS_RECV_WORK_QUEUED, &cp->cp_flags);
+	/* clear_bit() does not imply a memory barrier */
+	smp_mb__after_atomic();
+}
+
 static inline int
 rds_conn_path_transition(struct rds_conn_path *cp, int old, int new)
 {
diff --git a/net/rds/send.c b/net/rds/send.c
index 09a280110654..6329cc8ec246 100644
--- a/net/rds/send.c
+++ b/net/rds/send.c
@@ -458,7 +458,7 @@ int rds_send_xmit(struct rds_conn_path *cp)
 			if (rds_destroy_pending(cp->cp_conn))
 				ret = -ENETUNREACH;
 			else
-				queue_delayed_work(rds_wq, &cp->cp_send_w, 1);
+				rds_cond_queue_send_work(cp, 1);
 			rcu_read_unlock();
 		} else if (raced) {
 			rds_stats_inc(s_send_lock_queue_raced);
@@ -1380,7 +1380,7 @@ int rds_sendmsg(struct socket *sock, struct msghdr *msg, size_t payload_len)
 		if (rds_destroy_pending(cpath->cp_conn))
 			ret = -ENETUNREACH;
 		else
-			queue_delayed_work(rds_wq, &cpath->cp_send_w, 1);
+			rds_cond_queue_send_work(cpath, 1);
 		rcu_read_unlock();
 	}
 	if (ret)
@@ -1473,7 +1473,7 @@ rds_send_probe(struct rds_conn_path *cp, __be16 sport,
 	/* schedule the send work on rds_wq */
 	rcu_read_lock();
 	if (!rds_destroy_pending(cp->cp_conn))
-		queue_delayed_work(rds_wq, &cp->cp_send_w, 1);
+		rds_cond_queue_send_work(cp, 0);
 	rcu_read_unlock();
 
 	rds_message_put(rm);
diff --git a/net/rds/tcp.c b/net/rds/tcp.c
index 0581c53e6517..b3f2c6e27b59 100644
--- a/net/rds/tcp.c
+++ b/net/rds/tcp.c
@@ -168,8 +168,16 @@ void rds_tcp_reset_callbacks(struct socket *sock,
 	atomic_set(&cp->cp_state, RDS_CONN_RESETTING);
 	wait_event(cp->cp_waitq, !test_bit(RDS_IN_XMIT, &cp->cp_flags));
 	/* reset receive side state for rds_tcp_data_recv() for osock  */
-	cancel_delayed_work_sync(&cp->cp_send_w);
-	cancel_delayed_work_sync(&cp->cp_recv_w);
+
+	/* make sure lingering queued work won't try to ref the
+	 * conn. If there is work queued, we cancel it (and set the bit
+	 * to avoid any re-queueing)
+	 */
+
+	if (test_and_set_bit(RDS_SEND_WORK_QUEUED, &cp->cp_flags))
+		cancel_delayed_work_sync(&cp->cp_send_w);
+	if (test_and_set_bit(RDS_RECV_WORK_QUEUED, &cp->cp_flags))
+		cancel_delayed_work_sync(&cp->cp_recv_w);
 	lock_sock(osock->sk);
 	if (tc->t_tinc) {
 		rds_inc_put(&tc->t_tinc->ti_inc);
diff --git a/net/rds/tcp_recv.c b/net/rds/tcp_recv.c
index 7997a19d1da3..ab9fc150f974 100644
--- a/net/rds/tcp_recv.c
+++ b/net/rds/tcp_recv.c
@@ -327,7 +327,7 @@ void rds_tcp_data_ready(struct sock *sk)
 	if (rds_tcp_read_sock(cp, GFP_ATOMIC) == -ENOMEM) {
 		rcu_read_lock();
 		if (!rds_destroy_pending(cp->cp_conn))
-			queue_delayed_work(rds_wq, &cp->cp_recv_w, 0);
+			rds_cond_queue_recv_work(cp, 0);
 		rcu_read_unlock();
 	}
 out:
diff --git a/net/rds/tcp_send.c b/net/rds/tcp_send.c
index 7d284ac7e81a..cecd3dbde58d 100644
--- a/net/rds/tcp_send.c
+++ b/net/rds/tcp_send.c
@@ -201,7 +201,7 @@ void rds_tcp_write_space(struct sock *sk)
 	rcu_read_lock();
 	if ((refcount_read(&sk->sk_wmem_alloc) << 1) <= sk->sk_sndbuf &&
 	    !rds_destroy_pending(cp->cp_conn))
-		queue_delayed_work(rds_wq, &cp->cp_send_w, 0);
+		rds_cond_queue_send_work(cp, 0);
 	rcu_read_unlock();
 
 out:
diff --git a/net/rds/threads.c b/net/rds/threads.c
index 1f424cbfcbb4..eedae5653051 100644
--- a/net/rds/threads.c
+++ b/net/rds/threads.c
@@ -89,8 +89,10 @@ void rds_connect_path_complete(struct rds_conn_path *cp, int curr)
 	set_bit(0, &cp->cp_conn->c_map_queued);
 	rcu_read_lock();
 	if (!rds_destroy_pending(cp->cp_conn)) {
-		queue_delayed_work(rds_wq, &cp->cp_send_w, 0);
-		queue_delayed_work(rds_wq, &cp->cp_recv_w, 0);
+		rds_clear_queued_send_work_bit(cp);
+		rds_cond_queue_send_work(cp, 0);
+		rds_clear_queued_recv_work_bit(cp);
+		rds_cond_queue_recv_work(cp, 0);
 	}
 	rcu_read_unlock();
 	cp->cp_conn->c_proposed_version = RDS_PROTOCOL_VERSION;
@@ -193,9 +195,11 @@ void rds_send_worker(struct work_struct *work)
 	struct rds_conn_path *cp = container_of(work,
 						struct rds_conn_path,
 						cp_send_w.work);
+	unsigned long delay;
 	int ret;
 
 	if (rds_conn_path_state(cp) == RDS_CONN_UP) {
+		rds_clear_queued_send_work_bit(cp);
 		clear_bit(RDS_LL_SEND_FULL, &cp->cp_flags);
 		ret = rds_send_xmit(cp);
 		cond_resched();
@@ -203,15 +207,17 @@ void rds_send_worker(struct work_struct *work)
 		switch (ret) {
 		case -EAGAIN:
 			rds_stats_inc(s_send_immediate_retry);
-			queue_delayed_work(rds_wq, &cp->cp_send_w, 0);
+			delay = 0;
 			break;
 		case -ENOMEM:
 			rds_stats_inc(s_send_delayed_retry);
-			queue_delayed_work(rds_wq, &cp->cp_send_w, 2);
+			delay = 2;
 			break;
 		default:
-			break;
+			return;
 		}
+
+		rds_cond_queue_send_work(cp, delay);
 	}
 }
 
@@ -220,23 +226,26 @@ void rds_recv_worker(struct work_struct *work)
 	struct rds_conn_path *cp = container_of(work,
 						struct rds_conn_path,
 						cp_recv_w.work);
+	unsigned long delay;
 	int ret;
 
 	if (rds_conn_path_state(cp) == RDS_CONN_UP) {
+		rds_clear_queued_recv_work_bit(cp);
 		ret = cp->cp_conn->c_trans->recv_path(cp);
 		rdsdebug("conn %p ret %d\n", cp->cp_conn, ret);
 		switch (ret) {
 		case -EAGAIN:
 			rds_stats_inc(s_recv_immediate_retry);
-			queue_delayed_work(rds_wq, &cp->cp_recv_w, 0);
+			delay = 0;
 			break;
 		case -ENOMEM:
 			rds_stats_inc(s_recv_delayed_retry);
-			queue_delayed_work(rds_wq, &cp->cp_recv_w, 2);
+			delay = 2;
 			break;
 		default:
-			break;
+			return;
 		}
+		rds_cond_queue_recv_work(cp, delay);
 	}
 }
 
-- 
2.43.0

