Message-Id: <20251031093230.82386-3-kerneljasonxing@gmail.com>
Date: Fri, 31 Oct 2025 17:32:30 +0800
From: Jason Xing <kerneljasonxing@...il.com>
To: davem@...emloft.net,
	edumazet@...gle.com,
	kuba@...nel.org,
	pabeni@...hat.com,
	bjorn@...nel.org,
	magnus.karlsson@...el.com,
	maciej.fijalkowski@...el.com,
	jonathan.lemon@...il.com,
	sdf@...ichev.me,
	ast@...nel.org,
	daniel@...earbox.net,
	hawk@...nel.org,
	john.fastabend@...il.com,
	joe@...a.to,
	willemdebruijn.kernel@...il.com,
	fmancera@...e.de,
	csmate@....hu
Cc: bpf@...r.kernel.org,
	netdev@...r.kernel.org,
	Jason Xing <kernelxing@...cent.com>
Subject: [PATCH RFC net-next 2/2] xsk: introduce a cached cq to temporarily store descriptor addrs
From: Jason Xing <kernelxing@...cent.com>
Before commit 30f241fcf52a ("xsk: Fix immature cq descriptor
production"), there was an issue[1] that caused descriptors to be
published to the cq incorrectly under a race condition. The above
commit fixes the issue, but it adds more memory operations in the xmit
hot path and in interrupt context, which can hurt performance.
This patch proposes a new solution that fixes the problem without
adding memory allocation and deallocation. One key point is borrowed
from the above commit: updating the ring->descs is postponed from
__xsk_generic_xmit() to xsk_destruct_skb().
The core logic is as follows:
1. Allocate a new local queue. Only its cached_prod member is used.
2. Write the descriptors into the local queue in the xmit path, and
   record the cached_prod as @start_addr, which reflects the start
   position in this queue, so that the skb can easily find where its
   addrs were written in the destruction phase.
3. Initialize the upper 24 bits of destructor_arg to store @start_addr
   in xsk_skb_init_misc().
4. Initialize the lower 8 bits of destructor_arg to store the number
   of descriptors the skb owns in xsk_update_num_desc().
5. Write the desc addr(s), starting from @start_addr in the cached cq,
   one by one into the real cq in xsk_destruct_skb(), and then sync
   the global state of the cq (a condensed sketch follows this list).
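For illustration only, here is a minimal userspace model of steps 1, 2
and 5. The stand-in types and names (struct ring, stage_addr(),
flush_to_cq()) are purely illustrative and are not the kernel
structures or helpers touched by the diff below:

  #include <stdint.h>
  #include <stdio.h>

  #define RING_SIZE 8                  /* power of two, like a real cq */
  #define RING_MASK (RING_SIZE - 1)

  struct ring {
  	uint32_t cached_prod;          /* local producer position */
  	uint32_t prod;                 /* what userspace would observe */
  	uint64_t desc[RING_SIZE];
  };

  static struct ring cq;               /* models the real completion queue */
  static struct ring cached_cq;        /* staging queue: only cached_prod + desc used */

  /* xmit path (step 2): stash the addr in the staging ring, return its slot */
  static uint32_t stage_addr(uint64_t addr)
  {
  	uint32_t slot = cached_cq.cached_prod++ & RING_MASK;

  	cached_cq.desc[slot] = addr;
  	return slot;
  }

  /* skb destructor (step 5): copy num staged addrs into the real cq,
   * then publish them with a single visible producer update
   */
  static void flush_to_cq(uint32_t start_addr, uint8_t num)
  {
  	for (uint8_t i = 0; i < num; i++)
  		cq.desc[cq.cached_prod++ & RING_MASK] =
  			cached_cq.desc[(start_addr + i) & RING_MASK];
  	cq.prod = cq.cached_prod;
  }

  int main(void)
  {
  	uint32_t start = stage_addr(0x1000); /* first desc of a 3-desc skb */

  	stage_addr(0x2000);
  	stage_addr(0x3000);
  	flush_to_cq(start, 3);
  	printf("cq.prod=%u, cq.desc[0]=0x%llx\n",
  	       cq.prod, (unsigned long long)cq.desc[0]);
  	return 0;
  }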
The format of destructor_arg is designed as:
 ------------------------ --------
|       start_addr       |  num   |
 ------------------------ --------
The upper 24 bits are enough to index the temporary descriptors, and
the lower 8 bits are enough to store the number of descriptors that
one skb owns.
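To make the encoding concrete, a standalone sketch of the pack/unpack
arithmetic follows. pack_arg() and the unpack helpers are illustrative
names only; the actual accessors in the diff are xsk_get_start_addr()
and xsk_get_num_desc():

  #include <assert.h>
  #include <stdint.h>

  #define DESCS_SHIFT 8
  #define DESCS_MASK  ((1UL << DESCS_SHIFT) - 1)

  /* pack: upper bits carry the start slot, lower 8 bits the desc count */
  static unsigned long pack_arg(uint32_t start_addr, uint8_t num)
  {
  	return ((unsigned long)start_addr << DESCS_SHIFT) | num;
  }

  static uint32_t unpack_start_addr(unsigned long arg)
  {
  	return arg >> DESCS_SHIFT;
  }

  static uint8_t unpack_num(unsigned long arg)
  {
  	return arg & DESCS_MASK;
  }

  int main(void)
  {
  	unsigned long arg = pack_arg(42, 3);

  	assert(unpack_start_addr(arg) == 42);
  	assert(unpack_num(arg) == 3);
  	return 0;
  }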
[1]: https://lore.kernel.org/all/20250530095957.43248-1-e.kubanski@partner.samsung.com/
Signed-off-by: Jason Xing <kernelxing@...cent.com>
---
I posted the series as an RFC because I'd like to hear more opinions on
the current rough approach, so that the fix[2] can be avoided and the
performance impact mitigated. This patch might still have bugs; I plan
to spend more time on it once we reach an agreement. Please review the
overall concept. Thanks!
Maciej, could you share how you tested jumbo frames? I used
./xdpsock -i enp2s0f1 -t -q 1 -S -s 9728, but xdpsock already utilizes
more than 90% of the NIC, which means I cannot see the performance impact.
[2]: https://lore.kernel.org/all/20251030140355.4059-1-fmancera@suse.de/
---
 include/net/xdp_sock.h      |   1 +
 include/net/xsk_buff_pool.h |   1 +
 net/xdp/xsk.c               | 104 ++++++++++++++++++++++++++++--------
 net/xdp/xsk_buff_pool.c     |   1 +
 4 files changed, 84 insertions(+), 23 deletions(-)
diff --git a/include/net/xdp_sock.h b/include/net/xdp_sock.h
index ce587a225661..0d90d97e0a62 100644
--- a/include/net/xdp_sock.h
+++ b/include/net/xdp_sock.h
@@ -89,6 +89,7 @@ struct xdp_sock {
 	struct mutex mutex;
 	struct xsk_queue *fq_tmp; /* Only as tmp storage before bind */
 	struct xsk_queue *cq_tmp; /* Only as tmp storage before bind */
+	struct xsk_queue *cached_cq;
 };
 
 /*
diff --git a/include/net/xsk_buff_pool.h b/include/net/xsk_buff_pool.h
index cac56e6b0869..b52491f93c7d 100644
--- a/include/net/xsk_buff_pool.h
+++ b/include/net/xsk_buff_pool.h
@@ -63,6 +63,7 @@ struct xsk_buff_pool {
 	/* Data path members as close to free_heads at the end as possible. */
 	struct xsk_queue *fq ____cacheline_aligned_in_smp;
 	struct xsk_queue *cq;
+	struct xsk_queue *cached_cq;
 	/* For performance reasons, each buff pool has its own array of dma_pages
 	 * even when they are identical.
 	 */
diff --git a/net/xdp/xsk.c b/net/xdp/xsk.c
index 05010c1bbfbd..3951c2bc9d97 100644
--- a/net/xdp/xsk.c
+++ b/net/xdp/xsk.c
@@ -532,25 +532,33 @@ static int xsk_wakeup(struct xdp_sock *xs, u8 flags)
 	return dev->netdev_ops->ndo_xsk_wakeup(dev, xs->queue_id, flags);
 }
 
-static int xsk_cq_reserve_addr_locked(struct xsk_buff_pool *pool, u64 addr)
+static int xsk_cq_reserve_addr_locked(struct xsk_buff_pool *pool, u64 addr,
+				      u32 *start_addr)
 {
+	struct xdp_umem_ring *ring;
 	unsigned long flags;
 	int ret;
 
+	ring = (struct xdp_umem_ring *)pool->cached_cq->ring;
 	spin_lock_irqsave(&pool->cq_lock, flags);
-	ret = xskq_prod_reserve_addr(pool->cq, addr);
+	ret = xskq_prod_reserve(pool->cq);
+	if (!ret) {
+		*start_addr = pool->cached_cq->cached_prod++ & pool->cq->ring_mask;
+		ring->desc[*start_addr] = addr;
+	}
 	spin_unlock_irqrestore(&pool->cq_lock, flags);
 
 	return ret;
 }
 
-static void xsk_cq_submit_locked(struct xsk_buff_pool *pool, u32 n)
+static void xsk_cq_write_addr(struct xsk_buff_pool *pool, u32 addr)
 {
-	unsigned long flags;
+	struct xsk_queue *cq = pool->cq;
+	struct xsk_queue *ccq = pool->cached_cq;
+	struct xdp_umem_ring *ccqr = (struct xdp_umem_ring *)ccq->ring;
+	struct xdp_umem_ring *cqr = (struct xdp_umem_ring *)cq->ring;
 
-	spin_lock_irqsave(&pool->cq_lock, flags);
-	xskq_prod_submit_n(pool->cq, n);
-	spin_unlock_irqrestore(&pool->cq_lock, flags);
+	cqr->desc[cq->cached_prod++ & cq->ring_mask] = ccqr->desc[addr & ccq->ring_mask];
 }
 
 static void xsk_cq_cancel_locked(struct xsk_buff_pool *pool, u32 n)
@@ -562,9 +570,41 @@ static void xsk_cq_cancel_locked(struct xsk_buff_pool *pool, u32 n)
 	spin_unlock_irqrestore(&pool->cq_lock, flags);
 }
 
-static u32 xsk_get_num_desc(struct sk_buff *skb)
+#define XSK_DESTRUCTOR_DESCS_SHIFT 8
+#define XSK_DESTRUCTOR_DESCS_MASK \
+	((1ULL << XSK_DESTRUCTOR_DESCS_SHIFT) - 1)
+
+static u8 xsk_get_num_desc(struct sk_buff *skb)
+{
+	long val = (long)skb_shinfo(skb)->destructor_arg;
+
+	return (u8)val & XSK_DESTRUCTOR_DESCS_MASK;
+}
+
+static u32 xsk_get_start_addr(struct sk_buff *skb)
 {
-	return skb ? (long)skb_shinfo(skb)->destructor_arg : 0;
+	long val = (long)skb_shinfo(skb)->destructor_arg;
+
+	return val >> XSK_DESTRUCTOR_DESCS_SHIFT;
+}
+
+static long xsk_get_destructor_arg(struct sk_buff *skb)
+{
+	return (long)skb_shinfo(skb)->destructor_arg;
+}
+
+static void xsk_cq_submit_locked(struct sk_buff *skb)
+{
+	struct xsk_buff_pool *pool = xdp_sk(skb->sk)->pool;
+	u32 addr = xsk_get_start_addr(skb);
+	u8 num = xsk_get_num_desc(skb), i;
+	unsigned long flags;
+
+	spin_lock_irqsave(&pool->cq_lock, flags);
+	for (i = 0; i < num; i++)
+		xsk_cq_write_addr(pool, addr + i);
+	xskq_prod_submit_n(pool->cq, num);
+	spin_unlock_irqrestore(&pool->cq_lock, flags);
 }
 
 static void xsk_destruct_skb(struct sk_buff *skb)
@@ -576,23 +616,30 @@ static void xsk_destruct_skb(struct sk_buff *skb)
 		*compl->tx_timestamp = ktime_get_tai_fast_ns();
 	}
 
-	xsk_cq_submit_locked(xdp_sk(skb->sk)->pool, xsk_get_num_desc(skb));
+	xsk_cq_submit_locked(skb);
 	sock_wfree(skb);
 }
 
-static void xsk_set_destructor_arg(struct sk_buff *skb)
+static void xsk_update_num_desc(struct sk_buff *skb)
 {
-	long num = xsk_get_num_desc(xdp_sk(skb->sk)->skb) + 1;
+	long val = xsk_get_destructor_arg(skb);
+	u8 num = xsk_get_num_desc(skb) + 1;
 
-	skb_shinfo(skb)->destructor_arg = (void *)num;
+	val = (val & ~(XSK_DESTRUCTOR_DESCS_MASK)) | num;
+	skb_shinfo(skb)->destructor_arg = (void *)val;
 }
 
-static void xsk_skb_init_misc(struct sk_buff *skb, struct xdp_sock *xs)
+static void xsk_skb_init_misc(struct sk_buff *skb, struct xdp_sock *xs,
+			      u32 start_addr)
 {
+	long val;
+
 	skb->dev = xs->dev;
 	skb->priority = READ_ONCE(xs->sk.sk_priority);
 	skb->mark = READ_ONCE(xs->sk.sk_mark);
 	skb->destructor = xsk_destruct_skb;
+	val = start_addr << XSK_DESTRUCTOR_DESCS_SHIFT;
+	skb_shinfo(skb)->destructor_arg = (void *)val;
 }
 
 static void xsk_consume_skb(struct sk_buff *skb)
@@ -652,7 +699,8 @@ static int xsk_skb_metadata(struct sk_buff *skb, void *buffer,
 }
 
 static struct sk_buff *xsk_build_skb_zerocopy(struct xdp_sock *xs,
-					      struct xdp_desc *desc)
+					      struct xdp_desc *desc,
+					      u32 start_addr)
 {
 	struct xsk_buff_pool *pool = xs->pool;
 	u32 hr, len, ts, offset, copy, copied;
@@ -674,7 +722,7 @@ static struct sk_buff *xsk_build_skb_zerocopy(struct xdp_sock *xs,
 
 		skb_reserve(skb, hr);
 
-		xsk_skb_init_misc(skb, xs);
+		xsk_skb_init_misc(skb, xs, start_addr);
 		if (desc->options & XDP_TX_METADATA) {
 			err = xsk_skb_metadata(skb, buffer, desc, pool, hr);
 			if (unlikely(err))
@@ -713,14 +761,15 @@ static struct sk_buff *xsk_build_skb_zerocopy(struct xdp_sock *xs,
 }
 
 static struct sk_buff *xsk_build_skb(struct xdp_sock *xs,
-				     struct xdp_desc *desc)
+				     struct xdp_desc *desc,
+				     u32 start_addr)
 {
 	struct net_device *dev = xs->dev;
 	struct sk_buff *skb = xs->skb;
 	int err;
 
 	if (dev->priv_flags & IFF_TX_SKB_NO_LINEAR) {
-		skb = xsk_build_skb_zerocopy(xs, desc);
+		skb = xsk_build_skb_zerocopy(xs, desc, start_addr);
 		if (IS_ERR(skb)) {
 			err = PTR_ERR(skb);
 			skb = NULL;
@@ -747,7 +796,7 @@ static struct sk_buff *xsk_build_skb(struct xdp_sock *xs,
 			if (unlikely(err))
 				goto free_err;
 
-			xsk_skb_init_misc(skb, xs);
+			xsk_skb_init_misc(skb, xs, start_addr);
 			if (desc->options & XDP_TX_METADATA) {
 				err = xsk_skb_metadata(skb, buffer, desc,
 						       xs->pool, hr);
@@ -779,7 +828,7 @@ static struct sk_buff *xsk_build_skb(struct xdp_sock *xs,
 		}
 	}
 
-	xsk_set_destructor_arg(skb);
+	xsk_update_num_desc(skb);
 
 	return skb;
 
@@ -789,7 +838,7 @@ static struct sk_buff *xsk_build_skb(struct xdp_sock *xs,
 
 	if (err == -EOVERFLOW) {
 		/* Drop the packet */
-		xsk_set_destructor_arg(xs->skb);
+		xsk_update_num_desc(xs->skb);
 		xsk_drop_skb(xs->skb);
 		xskq_cons_release(xs->tx);
 	} else {
@@ -822,6 +871,8 @@ static int __xsk_generic_xmit(struct sock *sk)
 
 	max_batch = READ_ONCE(xs->max_tx_budget);
 	while (xskq_cons_peek_desc(xs->tx, &desc, xs->pool)) {
+		u32 start_addr;
+
 		if (max_batch-- == 0) {
 			err = -EAGAIN;
 			goto out;
@@ -832,13 +883,14 @@ static int __xsk_generic_xmit(struct sock *sk)
 		 * if there is space in it. This avoids having to implement
 		 * any buffering in the Tx path.
 		 */
-		err = xsk_cq_reserve_addr_locked(xs->pool, desc.addr);
+		err = xsk_cq_reserve_addr_locked(xs->pool, desc.addr,
+						 &start_addr);
 		if (err) {
 			err = -EAGAIN;
 			goto out;
 		}
 
-		skb = xsk_build_skb(xs, &desc);
+		skb = xsk_build_skb(xs, &desc, start_addr);
 		if (IS_ERR(skb)) {
 			err = PTR_ERR(skb);
 			if (err != -EOVERFLOW)
@@ -1142,6 +1194,7 @@ static int xsk_release(struct socket *sock)
 	xskq_destroy(xs->tx);
 	xskq_destroy(xs->fq_tmp);
 	xskq_destroy(xs->cq_tmp);
+	xskq_destroy(xs->cached_cq);
 
 	sock_orphan(sk);
 	sock->sk = NULL;
@@ -1321,6 +1374,7 @@ static int xsk_bind(struct socket *sock, struct sockaddr *addr, int addr_len)
 	/* FQ and CQ are now owned by the buffer pool and cleaned up with it. */
 	xs->fq_tmp = NULL;
 	xs->cq_tmp = NULL;
+	xs->cached_cq = NULL;
 
 	xs->dev = dev;
 	xs->zc = xs->umem->zc;
@@ -1458,6 +1512,10 @@ static int xsk_setsockopt(struct socket *sock, int level, int optname,
 		q = (optname == XDP_UMEM_FILL_RING) ? &xs->fq_tmp :
 			&xs->cq_tmp;
 		err = xsk_init_queue(entries, q, true);
+		if (!err && optname == XDP_UMEM_COMPLETION_RING) {
+			q = &xs->cached_cq;
+			err = xsk_init_queue(entries, q, true);
+		}
 		mutex_unlock(&xs->mutex);
 		return err;
 	}
diff --git a/net/xdp/xsk_buff_pool.c b/net/xdp/xsk_buff_pool.c
index aa9788f20d0d..6e170107dec7 100644
--- a/net/xdp/xsk_buff_pool.c
+++ b/net/xdp/xsk_buff_pool.c
@@ -99,6 +99,7 @@ struct xsk_buff_pool *xp_create_and_assign_umem(struct xdp_sock *xs,
 
 	pool->fq = xs->fq_tmp;
 	pool->cq = xs->cq_tmp;
+	pool->cached_cq = xs->cached_cq;
 
 	for (i = 0; i < pool->free_heads_cnt; i++) {
 		xskb = &pool->heads[i];
-- 
2.41.3