lists.openwall.net   lists  /  announce  owl-users  owl-dev  john-users  john-dev  passwdqc-users  yescrypt  popa3d-users  /  oss-security  kernel-hardening  musl  sabotage  tlsify  passwords  /  crypt-dev  xvendor  /  Bugtraq  Full-Disclosure  linux-kernel  linux-netdev  linux-ext4  linux-hardening  linux-cve-announce  PHC 
Open Source and information security mailing list archives
 
Hash Suite: Windows password security audit tool. GUI, reports in PDF.
[<prev] [next>] [<thread-prev] [thread-next>] [day] [month] [year] [list]
Message-ID: <20241202143057.378147-17-dhowells@redhat.com>
Date: Mon,  2 Dec 2024 14:30:34 +0000
From: David Howells <dhowells@...hat.com>
To: netdev@...r.kernel.org
Cc: David Howells <dhowells@...hat.com>,
	Marc Dionne <marc.dionne@...istor.com>,
	Yunsheng Lin <linyunsheng@...wei.com>,
	"David S. Miller" <davem@...emloft.net>,
	Eric Dumazet <edumazet@...gle.com>,
	Jakub Kicinski <kuba@...nel.org>,
	Paolo Abeni <pabeni@...hat.com>,
	linux-afs@...ts.infradead.org,
	linux-kernel@...r.kernel.org
Subject: [PATCH net-next 16/37] rxrpc: Implement progressive transmission queue struct

We need to scan the buffers in the transmission queue occasionally when
processing ACKs, but the transmission queue is currently a linked list of
transmission buffers which, when we eventually expand the Tx window to 8192
packets will be very slow to walk.

Instead, pull the fields we need to examine a lot (last sent time,
retransmitted flag) into a new struct rxrpc_txqueue and make each one hold
an array of 32 or 64 packets.

The transmission queue is then a list of these structs, each pointing to a
contiguous set of packets.  Scanning is then a lot faster as the flags and
timestamps are concentrated in the CPU dcache.

The transmission timestamps are stored as a number of microseconds from a
base ktime to reduce memory requirements.  This should be fine provided we
manage to transmit an entire buffer within an hour.

This will make implementing RACK-TLP [RFC8985] easier as it will be less
costly to scan the transmission buffers.

Signed-off-by: David Howells <dhowells@...hat.com>
cc: Marc Dionne <marc.dionne@...istor.com>
cc: "David S. Miller" <davem@...emloft.net>
cc: Eric Dumazet <edumazet@...gle.com>
cc: Jakub Kicinski <kuba@...nel.org>
cc: Paolo Abeni <pabeni@...hat.com>
cc: linux-afs@...ts.infradead.org
cc: netdev@...r.kernel.org
---
 include/trace/events/rxrpc.h |  98 ++++++++++++++---
 net/rxrpc/ar-internal.h      |  47 ++++++--
 net/rxrpc/call_event.c       | 204 ++++++++++++++++++++++-------------
 net/rxrpc/call_object.c      |  38 ++++---
 net/rxrpc/input.c            |  72 ++++++++++---
 net/rxrpc/output.c           | 156 ++++++++++++++-------------
 net/rxrpc/sendmsg.c          |  69 +++++++++---
 net/rxrpc/txbuf.c            |  41 +------
 8 files changed, 468 insertions(+), 257 deletions(-)

diff --git a/include/trace/events/rxrpc.h b/include/trace/events/rxrpc.h
index 28fa7be31ff8..e6cf9ec940aa 100644
--- a/include/trace/events/rxrpc.h
+++ b/include/trace/events/rxrpc.h
@@ -297,7 +297,6 @@
 
 #define rxrpc_txqueue_traces \
 	EM(rxrpc_txqueue_await_reply,		"AWR") \
-	EM(rxrpc_txqueue_dequeue,		"DEQ") \
 	EM(rxrpc_txqueue_end,			"END") \
 	EM(rxrpc_txqueue_queue,			"QUE") \
 	EM(rxrpc_txqueue_queue_last,		"QLS") \
@@ -482,6 +481,19 @@
 	EM(rxrpc_txbuf_see_send_more,		"SEE SEND+  ")	\
 	E_(rxrpc_txbuf_see_unacked,		"SEE UNACKED")
 
+#define rxrpc_tq_traces \
+	EM(rxrpc_tq_alloc,			"ALLOC") \
+	EM(rxrpc_tq_cleaned,			"CLEAN") \
+	EM(rxrpc_tq_decant,			"DCNT ") \
+	EM(rxrpc_tq_decant_advance,		"DCNT>") \
+	EM(rxrpc_tq_queue,			"QUEUE") \
+	EM(rxrpc_tq_queue_dup,			"QUE!!") \
+	EM(rxrpc_tq_rotate,			"ROT  ") \
+	EM(rxrpc_tq_rotate_and_free,		"ROT-F") \
+	EM(rxrpc_tq_rotate_and_keep,		"ROT-K") \
+	EM(rxrpc_tq_transmit,			"XMIT ") \
+	E_(rxrpc_tq_transmit_advance,		"XMIT>")
+
 #define rxrpc_pmtud_reduce_traces \
 	EM(rxrpc_pmtud_reduce_ack,		"Ack  ")	\
 	EM(rxrpc_pmtud_reduce_icmp,		"Icmp ")	\
@@ -518,6 +530,7 @@ enum rxrpc_rtt_tx_trace		{ rxrpc_rtt_tx_traces } __mode(byte);
 enum rxrpc_sack_trace		{ rxrpc_sack_traces } __mode(byte);
 enum rxrpc_skb_trace		{ rxrpc_skb_traces } __mode(byte);
 enum rxrpc_timer_trace		{ rxrpc_timer_traces } __mode(byte);
+enum rxrpc_tq_trace		{ rxrpc_tq_traces } __mode(byte);
 enum rxrpc_tx_point		{ rxrpc_tx_points } __mode(byte);
 enum rxrpc_txbuf_trace		{ rxrpc_txbuf_traces } __mode(byte);
 enum rxrpc_txqueue_trace	{ rxrpc_txqueue_traces } __mode(byte);
@@ -554,6 +567,7 @@ rxrpc_rtt_tx_traces;
 rxrpc_sack_traces;
 rxrpc_skb_traces;
 rxrpc_timer_traces;
+rxrpc_tq_traces;
 rxrpc_tx_points;
 rxrpc_txbuf_traces;
 rxrpc_txqueue_traces;
@@ -881,7 +895,7 @@ TRACE_EVENT(rxrpc_txqueue,
 		    __field(rxrpc_seq_t,		acks_hard_ack)
 		    __field(rxrpc_seq_t,		tx_bottom)
 		    __field(rxrpc_seq_t,		tx_top)
-		    __field(rxrpc_seq_t,		tx_prepared)
+		    __field(rxrpc_seq_t,		send_top)
 		    __field(int,			tx_winsize)
 			     ),
 
@@ -891,7 +905,7 @@ TRACE_EVENT(rxrpc_txqueue,
 		    __entry->acks_hard_ack = call->acks_hard_ack;
 		    __entry->tx_bottom = call->tx_bottom;
 		    __entry->tx_top = call->tx_top;
-		    __entry->tx_prepared = call->tx_prepared;
+		    __entry->send_top = call->send_top;
 		    __entry->tx_winsize = call->tx_winsize;
 			   ),
 
@@ -902,14 +916,14 @@ TRACE_EVENT(rxrpc_txqueue,
 		      __entry->acks_hard_ack,
 		      __entry->tx_top - __entry->tx_bottom,
 		      __entry->tx_top - __entry->acks_hard_ack,
-		      __entry->tx_prepared - __entry->tx_bottom,
+		      __entry->send_top - __entry->tx_top,
 		      __entry->tx_winsize)
 	    );
 
 TRACE_EVENT(rxrpc_transmit,
-	    TP_PROTO(struct rxrpc_call *call, int space),
+	    TP_PROTO(struct rxrpc_call *call, rxrpc_seq_t send_top, int space),
 
-	    TP_ARGS(call, space),
+	    TP_ARGS(call, send_top, space),
 
 	    TP_STRUCT__entry(
 		    __field(unsigned int,	call)
@@ -925,12 +939,12 @@ TRACE_EVENT(rxrpc_transmit,
 
 	    TP_fast_assign(
 		    __entry->call	= call->debug_id;
-		    __entry->seq	= call->tx_bottom;
+		    __entry->seq	= call->tx_top + 1;
 		    __entry->space	= space;
 		    __entry->tx_winsize	= call->tx_winsize;
 		    __entry->cong_cwnd	= call->cong_cwnd;
 		    __entry->cong_extra	= call->cong_extra;
-		    __entry->prepared	= call->tx_prepared - call->tx_bottom;
+		    __entry->prepared	= send_top - call->tx_bottom;
 		    __entry->in_flight	= call->tx_top - call->acks_hard_ack;
 		    __entry->pmtud_jumbo = call->peer->pmtud_jumbo;
 			   ),
@@ -947,6 +961,32 @@ TRACE_EVENT(rxrpc_transmit,
 		      __entry->pmtud_jumbo)
 	    );
 
+TRACE_EVENT(rxrpc_tx_rotate,
+	    TP_PROTO(struct rxrpc_call *call, rxrpc_seq_t seq, rxrpc_seq_t to),
+
+	    TP_ARGS(call, seq, to),
+
+	    TP_STRUCT__entry(
+		    __field(unsigned int,	call)
+		    __field(rxrpc_seq_t,	seq)
+		    __field(rxrpc_seq_t,	to)
+		    __field(rxrpc_seq_t,	top)
+			     ),
+
+	    TP_fast_assign(
+		    __entry->call	= call->debug_id;
+		    __entry->seq	= seq;
+		    __entry->to		= to;
+		    __entry->top	= call->tx_top;
+			   ),
+
+	    TP_printk("c=%08x q=%08x-%08x-%08x",
+		      __entry->call,
+		      __entry->seq,
+		      __entry->to,
+		      __entry->top)
+	    );
+
 TRACE_EVENT(rxrpc_rx_data,
 	    TP_PROTO(unsigned int call, rxrpc_seq_t seq,
 		     rxrpc_serial_t serial, u8 flags),
@@ -1621,10 +1661,11 @@ TRACE_EVENT(rxrpc_drop_ack,
 	    );
 
 TRACE_EVENT(rxrpc_retransmit,
-	    TP_PROTO(struct rxrpc_call *call, rxrpc_seq_t seq,
-		     rxrpc_serial_t serial, ktime_t expiry),
+	    TP_PROTO(struct rxrpc_call *call,
+		     struct rxrpc_send_data_req *req,
+		     struct rxrpc_txbuf *txb, ktime_t expiry),
 
-	    TP_ARGS(call, seq, serial, expiry),
+	    TP_ARGS(call, req, txb, expiry),
 
 	    TP_STRUCT__entry(
 		    __field(unsigned int,	call)
@@ -1635,8 +1676,8 @@ TRACE_EVENT(rxrpc_retransmit,
 
 	    TP_fast_assign(
 		    __entry->call = call->debug_id;
-		    __entry->seq = seq;
-		    __entry->serial = serial;
+		    __entry->seq = req->seq;
+		    __entry->serial = txb->serial;
 		    __entry->expiry = expiry;
 			   ),
 
@@ -1714,9 +1755,9 @@ TRACE_EVENT(rxrpc_reset_cwnd,
 		    __entry->cwnd	= call->cong_cwnd;
 		    __entry->extra	= call->cong_extra;
 		    __entry->hard_ack	= call->acks_hard_ack;
-		    __entry->prepared	= call->tx_prepared - call->tx_bottom;
+		    __entry->prepared	= call->send_top - call->tx_bottom;
 		    __entry->since_last_tx = ktime_sub(now, call->tx_last_sent);
-		    __entry->has_data	= !list_empty(&call->tx_sendmsg);
+		    __entry->has_data	= call->tx_bottom != call->tx_top;
 			   ),
 
 	    TP_printk("c=%08x q=%08x %s cw=%u+%u pr=%u tm=%llu d=%u",
@@ -2024,6 +2065,33 @@ TRACE_EVENT(rxrpc_txbuf,
 		      __entry->ref)
 	    );
 
+TRACE_EVENT(rxrpc_tq,
+	    TP_PROTO(struct rxrpc_call *call, struct rxrpc_txqueue *tq,
+		     rxrpc_seq_t seq, enum rxrpc_tq_trace trace),
+
+	    TP_ARGS(call, tq, seq, trace),
+
+	    TP_STRUCT__entry(
+		    __field(unsigned int,		call_debug_id)
+		    __field(rxrpc_seq_t,		qbase)
+		    __field(rxrpc_seq_t,		seq)
+		    __field(enum rxrpc_tq_trace,	trace)
+			     ),
+
+	    TP_fast_assign(
+		    __entry->call_debug_id = call->debug_id;
+		    __entry->qbase = tq ? tq->qbase : call->tx_qbase;
+		    __entry->seq = seq;
+		    __entry->trace = trace;
+			   ),
+
+	    TP_printk("c=%08x bq=%08x q=%08x %s",
+		      __entry->call_debug_id,
+		      __entry->qbase,
+		      __entry->seq,
+		      __print_symbolic(__entry->trace, rxrpc_tq_traces))
+	    );
+
 TRACE_EVENT(rxrpc_poke_call,
 	    TP_PROTO(struct rxrpc_call *call, bool busy,
 		     enum rxrpc_call_poke_trace what),
diff --git a/net/rxrpc/ar-internal.h b/net/rxrpc/ar-internal.h
index 84efa21f176c..bcce4862b0b7 100644
--- a/net/rxrpc/ar-internal.h
+++ b/net/rxrpc/ar-internal.h
@@ -30,6 +30,7 @@ struct rxrpc_crypt {
 struct key_preparsed_payload;
 struct rxrpc_connection;
 struct rxrpc_txbuf;
+struct rxrpc_txqueue;
 
 /*
  * Mark applied to socket buffers in skb->mark.  skb->priority is used
@@ -691,13 +692,17 @@ struct rxrpc_call {
 	unsigned short		rx_pkt_offset;	/* Current recvmsg packet offset */
 	unsigned short		rx_pkt_len;	/* Current recvmsg packet len */
 
+	/* Sendmsg data tracking. */
+	rxrpc_seq_t		send_top;	/* Highest Tx slot filled by sendmsg. */
+	struct rxrpc_txqueue	*send_queue;	/* Queue that sendmsg is writing into */
+
 	/* Transmitted data tracking. */
 	spinlock_t		tx_lock;	/* Transmit queue lock */
-	struct list_head	tx_sendmsg;	/* Sendmsg prepared packets */
-	struct list_head	tx_buffer;	/* Buffer of transmissible packets */
+	struct rxrpc_txqueue	*tx_queue;	/* Start of transmission buffers */
+	struct rxrpc_txqueue	*tx_qtail;	/* End of transmission buffers */
+	rxrpc_seq_t		tx_qbase;	/* First slot in tx_queue */
 	rxrpc_seq_t		tx_bottom;	/* First packet in buffer */
 	rxrpc_seq_t		tx_transmitted;	/* Highest packet transmitted */
-	rxrpc_seq_t		tx_prepared;	/* Highest Tx slot prepared. */
 	rxrpc_seq_t		tx_top;		/* Highest Tx slot allocated. */
 	u16			tx_backoff;	/* Delay to insert due to Tx failure (ms) */
 	u8			tx_winsize;	/* Maximum size of Tx window */
@@ -815,9 +820,6 @@ struct rxrpc_send_params {
  * Buffer of data to be output as a packet.
  */
 struct rxrpc_txbuf {
-	struct list_head	call_link;	/* Link in call->tx_sendmsg/tx_buffer */
-	struct list_head	tx_link;	/* Link in live Enc queue or Tx queue */
-	ktime_t			last_sent;	/* Time at which last transmitted */
 	refcount_t		ref;
 	rxrpc_seq_t		seq;		/* Sequence number of this packet */
 	rxrpc_serial_t		serial;		/* Last serial number transmitted with */
@@ -849,6 +851,36 @@ static inline bool rxrpc_sending_to_client(const struct rxrpc_txbuf *txb)
 	return !rxrpc_sending_to_server(txb);
 }
 
+/*
+ * Transmit queue element, including RACK [RFC8985] per-segment metadata.  The
+ * transmission timestamp is in usec from the base.
+ */
+struct rxrpc_txqueue {
+	/* Start with the members we want to prefetch. */
+	struct rxrpc_txqueue	*next;
+	ktime_t			xmit_ts_base;
+	rxrpc_seq_t		qbase;
+
+	/* The arrays we want to pack into as few cache lines as possible. */
+	struct {
+#define RXRPC_NR_TXQUEUE BITS_PER_LONG
+#define RXRPC_TXQ_MASK (RXRPC_NR_TXQUEUE - 1)
+		struct rxrpc_txbuf *bufs[RXRPC_NR_TXQUEUE];
+		unsigned int	segment_xmit_ts[RXRPC_NR_TXQUEUE];
+	} ____cacheline_aligned;
+};
+
+/*
+ * Data transmission request.
+ */
+struct rxrpc_send_data_req {
+	ktime_t			now;		/* Current time */
+	struct rxrpc_txqueue	*tq;		/* Tx queue segment holding first DATA */
+	rxrpc_seq_t		seq;		/* Sequence of first data */
+	int			n;		/* Number of DATA packets to glue into jumbo */
+	bool			did_send;	/* T if did actually send */
+};
+
 #include <trace/events/rxrpc.h>
 
 /*
@@ -905,7 +937,6 @@ void rxrpc_propose_ping(struct rxrpc_call *call, u32 serial,
 			enum rxrpc_propose_ack_trace why);
 void rxrpc_propose_delay_ACK(struct rxrpc_call *, rxrpc_serial_t,
 			     enum rxrpc_propose_ack_trace);
-void rxrpc_shrink_call_tx_buffer(struct rxrpc_call *);
 void rxrpc_resend(struct rxrpc_call *call, struct sk_buff *ack_skb);
 
 bool rxrpc_input_call_event(struct rxrpc_call *call);
@@ -1191,10 +1222,10 @@ void rxrpc_send_ACK(struct rxrpc_call *call, u8 ack_reason,
 		    rxrpc_serial_t serial, enum rxrpc_propose_ack_trace why);
 void rxrpc_send_probe_for_pmtud(struct rxrpc_call *call);
 int rxrpc_send_abort_packet(struct rxrpc_call *);
+void rxrpc_send_data_packet(struct rxrpc_call *call, struct rxrpc_send_data_req *req);
 void rxrpc_send_conn_abort(struct rxrpc_connection *conn);
 void rxrpc_reject_packet(struct rxrpc_local *local, struct sk_buff *skb);
 void rxrpc_send_keepalive(struct rxrpc_peer *);
-void rxrpc_transmit_data(struct rxrpc_call *call, struct rxrpc_txbuf *txb, int n);
 
 /*
  * peer_event.c
diff --git a/net/rxrpc/call_event.c b/net/rxrpc/call_event.c
index ef47de3f41c6..c97b85f34e4f 100644
--- a/net/rxrpc/call_event.c
+++ b/net/rxrpc/call_event.c
@@ -62,57 +62,85 @@ static void rxrpc_congestion_timeout(struct rxrpc_call *call)
 	set_bit(RXRPC_CALL_RETRANS_TIMEOUT, &call->flags);
 }
 
+/*
+ * Retransmit one or more packets.
+ */
+static void rxrpc_retransmit_data(struct rxrpc_call *call,
+				  struct rxrpc_send_data_req *req,
+				  ktime_t rto)
+{
+	struct rxrpc_txqueue *tq = req->tq;
+	unsigned int ix = req->seq & RXRPC_TXQ_MASK;
+	struct rxrpc_txbuf *txb = tq->bufs[ix];
+	ktime_t xmit_ts, resend_at;
+
+	_enter("%x,%x,%x,%px", tq->qbase, req->seq, ix, txb);
+
+	xmit_ts = ktime_add_us(tq->xmit_ts_base, tq->segment_xmit_ts[ix]);
+	resend_at = ktime_add(xmit_ts, rto);
+	trace_rxrpc_retransmit(call, req, txb,
+			       ktime_sub(resend_at, req->now));
+
+	txb->flags |= RXRPC_TXBUF_RESENT;
+	rxrpc_send_data_packet(call, req);
+	rxrpc_inc_stat(call->rxnet, stat_tx_data_retrans);
+
+	req->tq		= NULL;
+	req->n		= 0;
+	req->did_send	= true;
+	req->now	= ktime_get_real();
+}
+
 /*
  * Perform retransmission of NAK'd and unack'd packets.
  */
 void rxrpc_resend(struct rxrpc_call *call, struct sk_buff *ack_skb)
 {
+	struct rxrpc_send_data_req req = {
+		.now	= ktime_get_real(),
+	};
 	struct rxrpc_ackpacket *ack = NULL;
 	struct rxrpc_skb_priv *sp;
+	struct rxrpc_txqueue *tq;
 	struct rxrpc_txbuf *txb;
-	rxrpc_seq_t transmitted = call->tx_transmitted;
+	rxrpc_seq_t transmitted = call->tx_transmitted, seq;
 	ktime_t next_resend = KTIME_MAX, rto = ns_to_ktime(call->peer->rto_us * NSEC_PER_USEC);
-	ktime_t resend_at = KTIME_MAX, now, delay;
+	ktime_t resend_at = KTIME_MAX, delay;
 	bool unacked = false, did_send = false;
-	unsigned int i;
+	unsigned int qix;
 
 	_enter("{%d,%d}", call->acks_hard_ack, call->tx_top);
 
-	now = ktime_get_real();
-
-	if (list_empty(&call->tx_buffer))
+	if (call->tx_bottom == call->tx_top)
 		goto no_resend;
 
 	trace_rxrpc_resend(call, ack_skb);
-	txb = list_first_entry(&call->tx_buffer, struct rxrpc_txbuf, call_link);
+	tq = call->tx_queue;
+	seq = call->tx_bottom;
 
-	/* Scan the soft ACK table without dropping the lock and resend any
-	 * explicitly NAK'd packets.
-	 */
+	/* Scan the soft ACK table and resend any explicitly NAK'd packets. */
 	if (ack_skb) {
 		sp = rxrpc_skb(ack_skb);
 		ack = (void *)ack_skb->data + sizeof(struct rxrpc_wire_header);
 
-		for (i = 0; i < sp->ack.nr_acks; i++) {
-			rxrpc_seq_t seq;
+		for (int i = 0; i < sp->ack.nr_acks; i++) {
+			rxrpc_seq_t aseq;
 
 			if (ack->acks[i] & 1)
 				continue;
-			seq = sp->ack.first_ack + i;
-			if (after(txb->seq, transmitted))
-				break;
-			if (after(txb->seq, seq))
-				continue; /* A new hard ACK probably came in */
-			list_for_each_entry_from(txb, &call->tx_buffer, call_link) {
-				if (txb->seq == seq)
-					goto found_txb;
-			}
-			goto no_further_resend;
+			aseq = sp->ack.first_ack + i;
+			while (after_eq(aseq, tq->qbase + RXRPC_NR_TXQUEUE))
+				tq = tq->next;
+			seq = aseq;
+			qix = seq - tq->qbase;
+			txb = tq->bufs[qix];
+			if (after(seq, transmitted))
+				goto no_further_resend;
 
-		found_txb:
-			resend_at = ktime_add(txb->last_sent, rto);
+			resend_at = ktime_add_us(tq->xmit_ts_base, tq->segment_xmit_ts[qix]);
+			resend_at = ktime_add(resend_at, rto);
 			if (after(txb->serial, call->acks_highest_serial)) {
-				if (ktime_after(resend_at, now) &&
+				if (ktime_after(resend_at, req.now) &&
 				    ktime_before(resend_at, next_resend))
 					next_resend = resend_at;
 				continue; /* Ack point not yet reached */
@@ -120,17 +148,13 @@ void rxrpc_resend(struct rxrpc_call *call, struct sk_buff *ack_skb)
 
 			rxrpc_see_txbuf(txb, rxrpc_txbuf_see_unacked);
 
-			trace_rxrpc_retransmit(call, txb->seq, txb->serial,
-					       ktime_sub(resend_at, now));
-
-			txb->flags |= RXRPC_TXBUF_RESENT;
-			rxrpc_transmit_data(call, txb, 1);
-			did_send = true;
-			now = ktime_get_real();
+			req.tq  = tq;
+			req.seq = seq;
+			req.n   = 1;
+			rxrpc_retransmit_data(call, &req, rto);
 
-			if (list_is_last(&txb->call_link, &call->tx_buffer))
+			if (after_eq(seq, call->tx_top))
 				goto no_further_resend;
-			txb = list_next_entry(txb, call_link);
 		}
 	}
 
@@ -139,35 +163,43 @@ void rxrpc_resend(struct rxrpc_call *call, struct sk_buff *ack_skb)
 	 * ACK'd or NACK'd in due course, so don't worry about it here; here we
 	 * need to consider retransmitting anything beyond that point.
 	 */
-	if (after_eq(call->acks_prev_seq, call->tx_transmitted))
+	seq = call->acks_prev_seq;
+	if (after_eq(seq, call->tx_transmitted))
 		goto no_further_resend;
+	seq++;
 
-	list_for_each_entry_from(txb, &call->tx_buffer, call_link) {
-		resend_at = ktime_add(txb->last_sent, rto);
+	while (after_eq(seq, tq->qbase + RXRPC_NR_TXQUEUE))
+		tq = tq->next;
 
-		if (before_eq(txb->seq, call->acks_prev_seq))
+	while (before_eq(seq, call->tx_transmitted)) {
+		qix = seq - tq->qbase;
+		if (qix >= RXRPC_NR_TXQUEUE) {
+			tq = tq->next;
 			continue;
-		if (after(txb->seq, call->tx_transmitted))
-			break; /* Not transmitted yet */
+		}
+		txb = tq->bufs[qix];
+		resend_at = ktime_add_us(tq->xmit_ts_base, tq->segment_xmit_ts[qix]);
+		resend_at = ktime_add(resend_at, rto);
 
 		if (ack && ack->reason == RXRPC_ACK_PING_RESPONSE &&
 		    before(txb->serial, ntohl(ack->serial)))
 			goto do_resend; /* Wasn't accounted for by a more recent ping. */
 
-		if (ktime_after(resend_at, now)) {
+		if (ktime_after(resend_at, req.now)) {
 			if (ktime_before(resend_at, next_resend))
 				next_resend = resend_at;
+			seq++;
 			continue;
 		}
 
 	do_resend:
 		unacked = true;
 
-		txb->flags |= RXRPC_TXBUF_RESENT;
-		rxrpc_transmit_data(call, txb, 1);
-		did_send = true;
-		rxrpc_inc_stat(call->rxnet, stat_tx_data_retrans);
-		now = ktime_get_real();
+		req.tq  = tq;
+		req.seq = seq;
+		req.n   = 1;
+		rxrpc_retransmit_data(call, &req, rto);
+		seq++;
 	}
 
 no_further_resend:
@@ -175,7 +207,8 @@ void rxrpc_resend(struct rxrpc_call *call, struct sk_buff *ack_skb)
 	if (resend_at < KTIME_MAX) {
 		delay = rxrpc_get_rto_backoff(call->peer, did_send);
 		resend_at = ktime_add(resend_at, delay);
-		trace_rxrpc_timer_set(call, resend_at - now, rxrpc_timer_trace_resend_reset);
+		trace_rxrpc_timer_set(call, resend_at - req.now,
+				      rxrpc_timer_trace_resend_reset);
 	}
 	call->resend_at = resend_at;
 
@@ -186,11 +219,11 @@ void rxrpc_resend(struct rxrpc_call *call, struct sk_buff *ack_skb)
 	 * that an ACK got lost somewhere.  Send a ping to find out instead of
 	 * retransmitting data.
 	 */
-	if (!did_send) {
+	if (!req.did_send) {
 		ktime_t next_ping = ktime_add_us(call->acks_latest_ts,
 						 call->peer->srtt_us >> 3);
 
-		if (ktime_sub(next_ping, now) <= 0)
+		if (ktime_sub(next_ping, req.now) <= 0)
 			rxrpc_send_ACK(call, RXRPC_ACK_PING, 0,
 				       rxrpc_propose_ack_ping_for_0_retrans);
 	}
@@ -240,47 +273,68 @@ static unsigned int rxrpc_tx_window_space(struct rxrpc_call *call)
 }
 
 /*
- * Decant some if the sendmsg prepared queue into the transmission buffer.
+ * Transmit some as-yet untransmitted data.
  */
-static void rxrpc_decant_prepared_tx(struct rxrpc_call *call)
+static void rxrpc_transmit_fresh_data(struct rxrpc_call *call)
 {
 	int space = rxrpc_tx_window_space(call);
 
 	if (!test_bit(RXRPC_CALL_EXPOSED, &call->flags)) {
-		if (list_empty(&call->tx_sendmsg))
+		if (call->send_top == call->tx_top)
 			return;
 		rxrpc_expose_client_call(call);
 	}
 
 	while (space > 0) {
-		struct rxrpc_txbuf *head = NULL, *txb;
-		int count = 0, limit = min(space, 1);
-
-		if (list_empty(&call->tx_sendmsg))
+		struct rxrpc_send_data_req req = {
+			.now	= ktime_get_real(),
+			.seq	= call->tx_transmitted + 1,
+			.n	= 0,
+		};
+		struct rxrpc_txqueue *tq;
+		struct rxrpc_txbuf *txb;
+		rxrpc_seq_t send_top, seq;
+		int limit = min(space, 1);
+
+		/* Order send_top before the contents of the new txbufs and
+		 * txqueue pointers
+		 */
+		send_top = smp_load_acquire(&call->send_top);
+		if (call->tx_top == send_top)
 			break;
 
-		trace_rxrpc_transmit(call, space);
+		trace_rxrpc_transmit(call, send_top, space);
+
+		tq = call->tx_qtail;
+		seq = call->tx_top;
+		trace_rxrpc_tq(call, tq, seq, rxrpc_tq_decant);
 
-		spin_lock(&call->tx_lock);
 		do {
-			txb = list_first_entry(&call->tx_sendmsg,
-					       struct rxrpc_txbuf, call_link);
-			if (!head)
-				head = txb;
-			list_move_tail(&txb->call_link, &call->tx_buffer);
-			count++;
+			int ix;
+
+			seq++;
+			ix = seq & RXRPC_TXQ_MASK;
+			if (!ix) {
+				tq = tq->next;
+				trace_rxrpc_tq(call, tq, seq, rxrpc_tq_decant_advance);
+			}
+			if (!req.tq)
+				req.tq = tq;
+			txb = tq->bufs[ix];
+			req.n++;
 			if (!txb->jumboable)
 				break;
-		} while (count < limit && !list_empty(&call->tx_sendmsg));
+		} while (req.n < limit && before(seq, send_top));
 
-		spin_unlock(&call->tx_lock);
-
-		call->tx_top = txb->seq;
-		if (txb->flags & RXRPC_LAST_PACKET)
+		if (txb->flags & RXRPC_LAST_PACKET) {
 			rxrpc_close_tx_phase(call);
+			tq = NULL;
+		}
+		call->tx_qtail = tq;
+		call->tx_top = seq;
 
-		space -= count;
-		rxrpc_transmit_data(call, head, count);
+		space -= req.n;
+		rxrpc_send_data_packet(call, &req);
 	}
 }
 
@@ -288,7 +342,7 @@ static void rxrpc_transmit_some_data(struct rxrpc_call *call)
 {
 	switch (__rxrpc_call_state(call)) {
 	case RXRPC_CALL_SERVER_ACK_REQUEST:
-		if (list_empty(&call->tx_sendmsg))
+		if (call->tx_bottom == READ_ONCE(call->send_top))
 			return;
 		rxrpc_begin_service_reply(call);
 		fallthrough;
@@ -297,11 +351,11 @@ static void rxrpc_transmit_some_data(struct rxrpc_call *call)
 	case RXRPC_CALL_CLIENT_SEND_REQUEST:
 		if (!rxrpc_tx_window_space(call))
 			return;
-		if (list_empty(&call->tx_sendmsg)) {
+		if (call->tx_bottom == READ_ONCE(call->send_top)) {
 			rxrpc_inc_stat(call->rxnet, stat_tx_data_underflow);
 			return;
 		}
-		rxrpc_decant_prepared_tx(call);
+		rxrpc_transmit_fresh_data(call);
 		break;
 	default:
 		return;
@@ -503,8 +557,6 @@ bool rxrpc_input_call_event(struct rxrpc_call *call)
 		    call->peer->pmtud_pending)
 			rxrpc_send_probe_for_pmtud(call);
 	}
-	if (call->acks_hard_ack != call->tx_bottom)
-		rxrpc_shrink_call_tx_buffer(call);
 	_leave("");
 	return true;
 
diff --git a/net/rxrpc/call_object.c b/net/rxrpc/call_object.c
index c026f16f891e..a9682b31a4f9 100644
--- a/net/rxrpc/call_object.c
+++ b/net/rxrpc/call_object.c
@@ -146,8 +146,6 @@ struct rxrpc_call *rxrpc_alloc_call(struct rxrpc_sock *rx, gfp_t gfp,
 	INIT_LIST_HEAD(&call->recvmsg_link);
 	INIT_LIST_HEAD(&call->sock_link);
 	INIT_LIST_HEAD(&call->attend_link);
-	INIT_LIST_HEAD(&call->tx_sendmsg);
-	INIT_LIST_HEAD(&call->tx_buffer);
 	skb_queue_head_init(&call->rx_queue);
 	skb_queue_head_init(&call->recvmsg_queue);
 	skb_queue_head_init(&call->rx_oos_queue);
@@ -532,9 +530,26 @@ void rxrpc_get_call(struct rxrpc_call *call, enum rxrpc_call_trace why)
 }
 
 /*
- * Clean up the Rx skb ring.
+ * Clean up the transmission buffers.
  */
-static void rxrpc_cleanup_ring(struct rxrpc_call *call)
+static void rxrpc_cleanup_tx_buffers(struct rxrpc_call *call)
+{
+	struct rxrpc_txqueue *tq, *next;
+
+	for (tq = call->tx_queue; tq; tq = next) {
+		next = tq->next;
+		for (int i = 0; i < RXRPC_NR_TXQUEUE; i++)
+			if (tq->bufs[i])
+				rxrpc_put_txbuf(tq->bufs[i], rxrpc_txbuf_put_cleaned);
+		trace_rxrpc_tq(call, tq, 0, rxrpc_tq_cleaned);
+		kfree(tq);
+	}
+}
+
+/*
+ * Clean up the receive buffers.
+ */
+static void rxrpc_cleanup_rx_buffers(struct rxrpc_call *call)
 {
 	rxrpc_purge_queue(&call->recvmsg_queue);
 	rxrpc_purge_queue(&call->rx_queue);
@@ -673,23 +688,12 @@ static void rxrpc_rcu_free_call(struct rcu_head *rcu)
 static void rxrpc_destroy_call(struct work_struct *work)
 {
 	struct rxrpc_call *call = container_of(work, struct rxrpc_call, destroyer);
-	struct rxrpc_txbuf *txb;
 
 	del_timer_sync(&call->timer);
 
 	rxrpc_free_skb(call->cong_last_nack, rxrpc_skb_put_last_nack);
-	rxrpc_cleanup_ring(call);
-	while ((txb = list_first_entry_or_null(&call->tx_sendmsg,
-					       struct rxrpc_txbuf, call_link))) {
-		list_del(&txb->call_link);
-		rxrpc_put_txbuf(txb, rxrpc_txbuf_put_cleaned);
-	}
-	while ((txb = list_first_entry_or_null(&call->tx_buffer,
-					       struct rxrpc_txbuf, call_link))) {
-		list_del(&txb->call_link);
-		rxrpc_put_txbuf(txb, rxrpc_txbuf_put_cleaned);
-	}
-
+	rxrpc_cleanup_tx_buffers(call);
+	rxrpc_cleanup_rx_buffers(call);
 	rxrpc_put_txbuf(call->tx_pending, rxrpc_txbuf_put_cleaned);
 	rxrpc_put_connection(call->conn, rxrpc_conn_put_call);
 	rxrpc_deactivate_bundle(call->bundle);
diff --git a/net/rxrpc/input.c b/net/rxrpc/input.c
index 96fe005c5e81..03fd11a9bd31 100644
--- a/net/rxrpc/input.c
+++ b/net/rxrpc/input.c
@@ -214,24 +214,71 @@ void rxrpc_congestion_degrade(struct rxrpc_call *call)
 static bool rxrpc_rotate_tx_window(struct rxrpc_call *call, rxrpc_seq_t to,
 				   struct rxrpc_ack_summary *summary)
 {
-	struct rxrpc_txbuf *txb;
+	struct rxrpc_txqueue *tq = call->tx_queue;
+	rxrpc_seq_t seq = call->tx_bottom + 1;
 	bool rot_last = false;
 
-	list_for_each_entry_rcu(txb, &call->tx_buffer, call_link, false) {
-		if (before_eq(txb->seq, call->acks_hard_ack))
-			continue;
-		if (txb->flags & RXRPC_LAST_PACKET) {
+	_enter("%x,%x,%x", call->tx_bottom, call->acks_hard_ack, to);
+
+	trace_rxrpc_tx_rotate(call, seq, to);
+	trace_rxrpc_tq(call, tq, seq, rxrpc_tq_rotate);
+
+	/* We may have a left over fully-consumed buffer at the front that we
+	 * couldn't drop before (rotate_and_keep below).
+	 */
+	if (seq == call->tx_qbase + RXRPC_NR_TXQUEUE) {
+		call->tx_qbase += RXRPC_NR_TXQUEUE;
+		call->tx_queue = tq->next;
+		trace_rxrpc_tq(call, tq, seq, rxrpc_tq_rotate_and_free);
+		kfree(tq);
+		tq = call->tx_queue;
+	}
+
+	do {
+		unsigned int ix = seq - call->tx_qbase;
+
+		_debug("tq=%x seq=%x i=%d f=%x", tq->qbase, seq, ix, tq->bufs[ix]->flags);
+		if (tq->bufs[ix]->flags & RXRPC_LAST_PACKET) {
 			set_bit(RXRPC_CALL_TX_LAST, &call->flags);
 			rot_last = true;
 		}
-		if (txb->seq == to)
-			break;
-	}
+		rxrpc_put_txbuf(tq->bufs[ix], rxrpc_txbuf_put_rotated);
+		tq->bufs[ix] = NULL;
+
+		smp_store_release(&call->tx_bottom, seq);
+		smp_store_release(&call->acks_hard_ack, seq);
+		trace_rxrpc_txqueue(call, (rot_last ?
+					   rxrpc_txqueue_rotate_last :
+					   rxrpc_txqueue_rotate));
 
-	if (rot_last)
+		seq++;
+		if (!(seq & RXRPC_TXQ_MASK)) {
+			prefetch(tq->next);
+			if (tq != call->tx_qtail) {
+				call->tx_qbase += RXRPC_NR_TXQUEUE;
+				call->tx_queue = tq->next;
+				trace_rxrpc_tq(call, tq, seq, rxrpc_tq_rotate_and_free);
+				kfree(tq);
+				tq = call->tx_queue;
+			} else {
+				trace_rxrpc_tq(call, tq, seq, rxrpc_tq_rotate_and_keep);
+				tq = NULL;
+				break;
+			}
+		}
+
+	} while (before_eq(seq, to));
+
+	if (rot_last) {
 		set_bit(RXRPC_CALL_TX_ALL_ACKED, &call->flags);
+		if (tq) {
+			trace_rxrpc_tq(call, tq, seq, rxrpc_tq_rotate_and_free);
+			kfree(tq);
+			call->tx_queue = NULL;
+		}
+	}
 
-	_enter("%x,%x,%x,%d", to, call->acks_hard_ack, call->tx_top, rot_last);
+	_debug("%x,%x,%x,%d", to, call->acks_hard_ack, call->tx_top, rot_last);
 
 	if (call->acks_lowest_nak == call->acks_hard_ack) {
 		call->acks_lowest_nak = to;
@@ -240,11 +287,6 @@ static bool rxrpc_rotate_tx_window(struct rxrpc_call *call, rxrpc_seq_t to,
 		call->acks_lowest_nak = to;
 	}
 
-	smp_store_release(&call->acks_hard_ack, to);
-
-	trace_rxrpc_txqueue(call, (rot_last ?
-				   rxrpc_txqueue_rotate_last :
-				   rxrpc_txqueue_rotate));
 	wake_up(&call->waitq);
 	return rot_last;
 }
diff --git a/net/rxrpc/output.c b/net/rxrpc/output.c
index 7b068a33eb21..f785ea0ade43 100644
--- a/net/rxrpc/output.c
+++ b/net/rxrpc/output.c
@@ -374,10 +374,10 @@ int rxrpc_send_abort_packet(struct rxrpc_call *call)
 /*
  * Prepare a (sub)packet for transmission.
  */
-static size_t rxrpc_prepare_data_subpacket(struct rxrpc_call *call, struct rxrpc_txbuf *txb,
-					   rxrpc_serial_t serial,
-					   int subpkt, int nr_subpkts,
-					   ktime_t now)
+static size_t rxrpc_prepare_data_subpacket(struct rxrpc_call *call,
+					   struct rxrpc_send_data_req *req,
+					   struct rxrpc_txbuf *txb,
+					   rxrpc_serial_t serial, int subpkt)
 {
 	struct rxrpc_wire_header *whdr = txb->kvec[0].iov_base;
 	struct rxrpc_jumbo_header *jumbo = (void *)(whdr + 1) - sizeof(*jumbo);
@@ -385,7 +385,7 @@ static size_t rxrpc_prepare_data_subpacket(struct rxrpc_call *call, struct rxrpc
 	struct rxrpc_connection *conn = call->conn;
 	struct kvec *kv = &call->local->kvec[subpkt];
 	size_t len = txb->pkt_len;
-	bool last, more;
+	bool last;
 	u8 flags;
 
 	_enter("%x,%zd", txb->seq, len);
@@ -400,14 +400,11 @@ static size_t rxrpc_prepare_data_subpacket(struct rxrpc_call *call, struct rxrpc
 	flags = txb->flags & RXRPC_TXBUF_WIRE_FLAGS;
 	last = txb->flags & RXRPC_LAST_PACKET;
 
-	if (subpkt < nr_subpkts - 1) {
+	if (subpkt < req->n - 1) {
 		len = RXRPC_JUMBO_DATALEN;
 		goto dont_set_request_ack;
 	}
 
-	more = (!list_is_last(&txb->call_link, &call->tx_buffer) ||
-		!list_empty(&call->tx_sendmsg));
-
 	/* If our RTT cache needs working on, request an ACK.  Also request
 	 * ACKs if a DATA packet appears to have been lost.
 	 *
@@ -429,7 +426,7 @@ static size_t rxrpc_prepare_data_subpacket(struct rxrpc_call *call, struct rxrpc
 		why = rxrpc_reqack_more_rtt;
 	else if (ktime_before(ktime_add_ms(call->peer->rtt_last_req, 1000), ktime_get_real()))
 		why = rxrpc_reqack_old_rtt;
-	else if (!last && !more)
+	else if (!last && !after(READ_ONCE(call->send_top), txb->seq))
 		why = rxrpc_reqack_app_stall;
 	else
 		goto dont_set_request_ack;
@@ -438,13 +435,13 @@ static size_t rxrpc_prepare_data_subpacket(struct rxrpc_call *call, struct rxrpc
 	trace_rxrpc_req_ack(call->debug_id, txb->seq, why);
 	if (why != rxrpc_reqack_no_srv_last) {
 		flags |= RXRPC_REQUEST_ACK;
-		rxrpc_begin_rtt_probe(call, serial, now, rxrpc_rtt_tx_data);
-		call->peer->rtt_last_req = now;
+		rxrpc_begin_rtt_probe(call, serial, req->now, rxrpc_rtt_tx_data);
+		call->peer->rtt_last_req = req->now;
 	}
 dont_set_request_ack:
 
 	/* The jumbo header overlays the wire header in the txbuf. */
-	if (subpkt < nr_subpkts - 1)
+	if (subpkt < req->n - 1)
 		flags |= RXRPC_JUMBO_PACKET;
 	else
 		flags &= ~RXRPC_JUMBO_PACKET;
@@ -468,62 +465,100 @@ static size_t rxrpc_prepare_data_subpacket(struct rxrpc_call *call, struct rxrpc
 	return len;
 }
 
+/*
+ * Prepare a transmission queue object for initial transmission.  Returns the
+ * number of microseconds since the transmission queue base timestamp.
+ */
+static unsigned int rxrpc_prepare_txqueue(struct rxrpc_txqueue *tq,
+					  struct rxrpc_send_data_req *req)
+{
+	if (!tq)
+		return 0;
+	if (tq->xmit_ts_base == KTIME_MIN) {
+		tq->xmit_ts_base = req->now;
+		return 0;
+	}
+	return ktime_to_us(ktime_sub(req->now, tq->xmit_ts_base));
+}
+
 /*
  * Prepare a (jumbo) packet for transmission.
  */
-static size_t rxrpc_prepare_data_packet(struct rxrpc_call *call, struct rxrpc_txbuf *head, int n)
+static size_t rxrpc_prepare_data_packet(struct rxrpc_call *call, struct rxrpc_send_data_req *req)
 {
-	struct rxrpc_txbuf *txb = head;
+	struct rxrpc_txqueue *tq = req->tq;
 	rxrpc_serial_t serial;
-	ktime_t now = ktime_get_real();
+	unsigned int xmit_ts;
+	rxrpc_seq_t seq = req->seq;
 	size_t len = 0;
 
+	trace_rxrpc_tq(call, tq, seq, rxrpc_tq_transmit);
+
 	/* Each transmission of a Tx packet needs a new serial number */
-	serial = rxrpc_get_next_serials(call->conn, n);
+	serial = rxrpc_get_next_serials(call->conn, req->n);
 
-	for (int i = 0; i < n; i++) {
-		txb->last_sent = now;
-		len += rxrpc_prepare_data_subpacket(call, txb, serial, i, n, now);
-		serial++;
-		txb = list_next_entry(txb, call_link);
-	}
+	call->tx_last_sent = req->now;
+	xmit_ts = rxrpc_prepare_txqueue(tq, req);
+	prefetch(tq->next);
 
-	/* Set timeouts */
-	if (call->peer->rtt_count > 1) {
-		ktime_t delay = rxrpc_get_rto_backoff(call->peer, false);
+	for (int i = 0;;) {
+		int ix = seq & RXRPC_TXQ_MASK;
+		struct rxrpc_txbuf *txb = tq->bufs[seq & RXRPC_TXQ_MASK];
 
-		call->ack_lost_at = ktime_add(now, delay);
-		trace_rxrpc_timer_set(call, delay, rxrpc_timer_trace_lost_ack);
+		_debug("prep[%u] tq=%x q=%x", i, tq->qbase, seq);
+		tq->segment_xmit_ts[ix] = xmit_ts;
+		len += rxrpc_prepare_data_subpacket(call, req, txb, serial, i);
+		serial++;
+		seq++;
+		i++;
+		if (i >= req->n)
+			break;
+		if (!(seq & RXRPC_TXQ_MASK)) {
+			tq = tq->next;
+			trace_rxrpc_tq(call, tq, seq, rxrpc_tq_transmit_advance);
+			xmit_ts = rxrpc_prepare_txqueue(tq, req);
+		}
 	}
 
+	/* Set timeouts */
 	if (!test_and_set_bit(RXRPC_CALL_BEGAN_RX_TIMER, &call->flags)) {
 		ktime_t delay = ms_to_ktime(READ_ONCE(call->next_rx_timo));
 
-		call->expect_rx_by = ktime_add(now, delay);
+		call->expect_rx_by = ktime_add(req->now, delay);
 		trace_rxrpc_timer_set(call, delay, rxrpc_timer_trace_expect_rx);
 	}
+	if (call->resend_at == KTIME_MAX) {
+		ktime_t delay = rxrpc_get_rto_backoff(call->peer, false);
 
-	rxrpc_set_keepalive(call, now);
+		call->resend_at = ktime_add(req->now, delay);
+		trace_rxrpc_timer_set(call, delay, rxrpc_timer_trace_resend);
+	}
+
+	rxrpc_set_keepalive(call, req->now);
 	return len;
 }
 
 /*
- * send a packet through the transport endpoint
+ * Send one or more packets through the transport endpoint
  */
-static int rxrpc_send_data_packet(struct rxrpc_call *call, struct rxrpc_txbuf *txb, int n)
+void rxrpc_send_data_packet(struct rxrpc_call *call, struct rxrpc_send_data_req *req)
 {
 	struct rxrpc_connection *conn = call->conn;
 	enum rxrpc_tx_point frag;
+	struct rxrpc_txqueue *tq = req->tq;
+	struct rxrpc_txbuf *txb;
 	struct msghdr msg;
+	rxrpc_seq_t seq = req->seq;
 	size_t len;
 	bool new_call = test_bit(RXRPC_CALL_BEGAN_RX_TIMER, &call->flags);
 	int ret;
 
-	_enter("%x,{%d}", txb->seq, txb->pkt_len);
+	_enter("%x,%x-%x", tq->qbase, seq, seq + req->n - 1);
 
-	len = rxrpc_prepare_data_packet(call, txb, n);
+	len = rxrpc_prepare_data_packet(call, req);
+	txb = tq->bufs[seq & RXRPC_TXQ_MASK];
 
-	iov_iter_kvec(&msg.msg_iter, WRITE, call->local->kvec, n, len);
+	iov_iter_kvec(&msg.msg_iter, WRITE, call->local->kvec, req->n, len);
 
 	msg.msg_name	= &call->peer->srx.transport;
 	msg.msg_namelen	= call->peer->srx.transport_len;
@@ -534,7 +569,7 @@ static int rxrpc_send_data_packet(struct rxrpc_call *call, struct rxrpc_txbuf *t
 	/* Send the packet with the don't fragment bit set unless we think it's
 	 * too big or if this is a retransmission.
 	 */
-	if (txb->seq == call->tx_transmitted + 1 &&
+	if (seq == call->tx_transmitted + 1 &&
 	    len >= sizeof(struct rxrpc_wire_header) + call->peer->max_data) {
 		rxrpc_local_dont_fragment(conn->local, false);
 		frag = rxrpc_tx_point_call_data_frag;
@@ -547,8 +582,8 @@ static int rxrpc_send_data_packet(struct rxrpc_call *call, struct rxrpc_txbuf *t
 	 * retransmission algorithm doesn't try to resend what we haven't sent
 	 * yet.
 	 */
-	if (txb->seq == call->tx_transmitted + 1)
-		call->tx_transmitted = txb->seq + n - 1;
+	if (seq == call->tx_transmitted + 1)
+		call->tx_transmitted = seq + req->n - 1;
 
 	if (IS_ENABLED(CONFIG_AF_RXRPC_INJECT_LOSS)) {
 		static int lose;
@@ -584,19 +619,21 @@ static int rxrpc_send_data_packet(struct rxrpc_call *call, struct rxrpc_txbuf *t
 	rxrpc_tx_backoff(call, ret);
 
 	if (ret < 0) {
-		/* Cancel the call if the initial transmission fails,
-		 * particularly if that's due to network routing issues that
-		 * aren't going away anytime soon.  The layer above can arrange
-		 * the retransmission.
+		/* Cancel the call if the initial transmission fails or if we
+		 * hit due to network routing issues that aren't going away
+		 * anytime soon.  The layer above can arrange the
+		 * retransmission.
 		 */
-		if (new_call)
+		if (new_call ||
+		    ret == -ENETUNREACH ||
+		    ret == -EHOSTUNREACH ||
+		    ret == -ECONNREFUSED)
 			rxrpc_set_call_completion(call, RXRPC_CALL_LOCAL_ERROR,
 						  RX_USER_ABORT, ret);
 	}
 
 done:
 	_leave(" = %d [%u]", ret, call->peer->max_data);
-	return ret;
 }
 
 /*
@@ -775,37 +812,8 @@ void rxrpc_send_keepalive(struct rxrpc_peer *peer)
 /*
  * Schedule an instant Tx resend.
  */
-static inline void rxrpc_instant_resend(struct rxrpc_call *call,
-					struct rxrpc_txbuf *txb)
+static inline void rxrpc_instant_resend(struct rxrpc_call *call)
 {
 	if (!__rxrpc_call_is_complete(call))
 		kdebug("resend");
 }
-
-/*
- * Transmit a packet, possibly gluing several subpackets together.
- */
-void rxrpc_transmit_data(struct rxrpc_call *call, struct rxrpc_txbuf *txb, int n)
-{
-	int ret;
-
-	ret = rxrpc_send_data_packet(call, txb, n);
-	if (ret < 0) {
-		switch (ret) {
-		case -ENETUNREACH:
-		case -EHOSTUNREACH:
-		case -ECONNREFUSED:
-			rxrpc_set_call_completion(call, RXRPC_CALL_LOCAL_ERROR,
-						  0, ret);
-			break;
-		default:
-			_debug("need instant resend %d", ret);
-			rxrpc_instant_resend(call, txb);
-		}
-	} else {
-		ktime_t delay = ns_to_ktime(call->peer->rto_us * NSEC_PER_USEC);
-
-		call->resend_at = ktime_add(ktime_get_real(), delay);
-		trace_rxrpc_timer_set(call, delay, rxrpc_timer_trace_resend_tx);
-	}
-}
diff --git a/net/rxrpc/sendmsg.c b/net/rxrpc/sendmsg.c
index 786c1fb1369a..cb9b38fc5cf8 100644
--- a/net/rxrpc/sendmsg.c
+++ b/net/rxrpc/sendmsg.c
@@ -96,7 +96,7 @@ static bool rxrpc_check_tx_space(struct rxrpc_call *call, rxrpc_seq_t *_tx_win)
 {
 	if (_tx_win)
 		*_tx_win = call->tx_bottom;
-	return call->tx_prepared - call->tx_bottom < 256;
+	return call->send_top - call->tx_bottom < 256;
 }
 
 /*
@@ -240,36 +240,74 @@ static void rxrpc_queue_packet(struct rxrpc_sock *rx, struct rxrpc_call *call,
 			       struct rxrpc_txbuf *txb,
 			       rxrpc_notify_end_tx_t notify_end_tx)
 {
+	struct rxrpc_txqueue *sq = call->send_queue;
 	rxrpc_seq_t seq = txb->seq;
 	bool poke, last = txb->flags & RXRPC_LAST_PACKET;
-
+	int ix = seq & RXRPC_TXQ_MASK;
 	rxrpc_inc_stat(call->rxnet, stat_tx_data);
 
-	ASSERTCMP(txb->seq, ==, call->tx_prepared + 1);
-
-	/* We have to set the timestamp before queueing as the retransmit
-	 * algorithm can see the packet as soon as we queue it.
-	 */
-	txb->last_sent = ktime_get_real();
+	ASSERTCMP(txb->seq, ==, call->send_top + 1);
 
 	if (last)
 		trace_rxrpc_txqueue(call, rxrpc_txqueue_queue_last);
 	else
 		trace_rxrpc_txqueue(call, rxrpc_txqueue_queue);
 
+	if (WARN_ON_ONCE(sq->bufs[ix]))
+		trace_rxrpc_tq(call, sq, seq, rxrpc_tq_queue_dup);
+	else
+		trace_rxrpc_tq(call, sq, seq, rxrpc_tq_queue);
+
 	/* Add the packet to the call's output buffer */
 	spin_lock(&call->tx_lock);
-	poke = list_empty(&call->tx_sendmsg);
-	list_add_tail(&txb->call_link, &call->tx_sendmsg);
-	call->tx_prepared = seq;
-	if (last)
+	poke = (call->tx_bottom == call->send_top);
+	sq->bufs[ix] = txb;
+	/* Order send_top after the queue->next pointer and txb content. */
+	smp_store_release(&call->send_top, seq);
+	if (last) {
 		rxrpc_notify_end_tx(rx, call, notify_end_tx);
+		call->send_queue = NULL;
+	}
 	spin_unlock(&call->tx_lock);
 
 	if (poke)
 		rxrpc_poke_call(call, rxrpc_call_poke_start);
 }
 
+/*
+ * Allocate a new txqueue unit and add it to the transmission queue.
+ */
+static int rxrpc_alloc_txqueue(struct sock *sk, struct rxrpc_call *call)
+{
+	struct rxrpc_txqueue *tq;
+
+	tq = kzalloc(sizeof(*tq), sk->sk_allocation);
+	if (!tq)
+		return -ENOMEM;
+
+	tq->xmit_ts_base = KTIME_MIN;
+	for (int i = 0; i < RXRPC_NR_TXQUEUE; i++)
+		tq->segment_xmit_ts[i] = UINT_MAX;
+
+	if (call->send_queue) {
+		tq->qbase = call->send_top + 1;
+		call->send_queue->next = tq;
+		call->send_queue = tq;
+	} else if (WARN_ON(call->tx_queue)) {
+		kfree(tq);
+		return -ENOMEM;
+	} else {
+		tq->qbase = 0;
+		call->tx_qbase = 0;
+		call->send_queue = tq;
+		call->tx_qtail = tq;
+		call->tx_queue = tq;
+	}
+
+	trace_rxrpc_tq(call, tq, call->send_top, rxrpc_tq_alloc);
+	return 0;
+}
+
 /*
  * send data through a socket
  * - must be called in process context
@@ -344,6 +382,13 @@ static int rxrpc_send_data(struct rxrpc_sock *rx,
 			if (!rxrpc_check_tx_space(call, NULL))
 				goto wait_for_space;
 
+			/* See if we need to begin/extend the Tx queue. */
+			if (!call->send_queue || !((call->send_top + 1) & RXRPC_TXQ_MASK)) {
+				ret = rxrpc_alloc_txqueue(sk, call);
+				if (ret < 0)
+					goto maybe_error;
+			}
+
 			/* Work out the maximum size of a packet.  Assume that
 			 * the security header is going to be in the padded
 			 * region (enc blocksize), but the trailer is not.
diff --git a/net/rxrpc/txbuf.c b/net/rxrpc/txbuf.c
index 8b7c854ed3d7..067223c8c35f 100644
--- a/net/rxrpc/txbuf.c
+++ b/net/rxrpc/txbuf.c
@@ -43,17 +43,14 @@ struct rxrpc_txbuf *rxrpc_alloc_data_txbuf(struct rxrpc_call *call, size_t data_
 
 	whdr = buf + hoff;
 
-	INIT_LIST_HEAD(&txb->call_link);
-	INIT_LIST_HEAD(&txb->tx_link);
 	refcount_set(&txb->ref, 1);
-	txb->last_sent		= KTIME_MIN;
 	txb->call_debug_id	= call->debug_id;
 	txb->debug_id		= atomic_inc_return(&rxrpc_txbuf_debug_ids);
 	txb->alloc_size		= data_size;
 	txb->space		= data_size;
 	txb->offset		= sizeof(*whdr);
 	txb->flags		= call->conn->out_clientflag;
-	txb->seq		= call->tx_prepared + 1;
+	txb->seq		= call->send_top + 1;
 	txb->nr_kvec		= 1;
 	txb->kvec[0].iov_base	= whdr;
 	txb->kvec[0].iov_len	= sizeof(*whdr);
@@ -114,8 +111,6 @@ struct rxrpc_txbuf *rxrpc_alloc_ack_txbuf(struct rxrpc_call *call, size_t sack_s
 	filler	= buf + sizeof(*whdr) + sizeof(*ack) + 1;
 	trailer	= buf + sizeof(*whdr) + sizeof(*ack) + 1 + 3;
 
-	INIT_LIST_HEAD(&txb->call_link);
-	INIT_LIST_HEAD(&txb->tx_link);
 	refcount_set(&txb->ref, 1);
 	txb->call_debug_id	= call->debug_id;
 	txb->debug_id		= atomic_inc_return(&rxrpc_txbuf_debug_ids);
@@ -200,37 +195,3 @@ void rxrpc_put_txbuf(struct rxrpc_txbuf *txb, enum rxrpc_txbuf_trace what)
 			rxrpc_free_txbuf(txb);
 	}
 }
-
-/*
- * Shrink the transmit buffer.
- */
-void rxrpc_shrink_call_tx_buffer(struct rxrpc_call *call)
-{
-	struct rxrpc_txbuf *txb;
-	rxrpc_seq_t hard_ack = smp_load_acquire(&call->acks_hard_ack);
-	bool wake = false;
-
-	_enter("%x/%x/%x", call->tx_bottom, call->acks_hard_ack, call->tx_top);
-
-	while ((txb = list_first_entry_or_null(&call->tx_buffer,
-					       struct rxrpc_txbuf, call_link))) {
-		hard_ack = smp_load_acquire(&call->acks_hard_ack);
-		if (before(hard_ack, txb->seq))
-			break;
-
-		if (txb->seq != call->tx_bottom + 1)
-			rxrpc_see_txbuf(txb, rxrpc_txbuf_see_out_of_step);
-		ASSERTCMP(txb->seq, ==, call->tx_bottom + 1);
-		smp_store_release(&call->tx_bottom, call->tx_bottom + 1);
-		list_del_rcu(&txb->call_link);
-
-		trace_rxrpc_txqueue(call, rxrpc_txqueue_dequeue);
-
-		rxrpc_put_txbuf(txb, rxrpc_txbuf_put_rotated);
-		if (after(call->acks_hard_ack, call->tx_bottom + 128))
-			wake = true;
-	}
-
-	if (wake)
-		wake_up(&call->waitq);
-}


Powered by blists - more mailing lists

Powered by Openwall GNU/*/Linux Powered by OpenVZ