Message-Id: <20250705135512.1963216-1-maciej.fijalkowski@intel.com>
Date: Sat, 5 Jul 2025 15:55:12 +0200
From: Maciej Fijalkowski <maciej.fijalkowski@...el.com>
To: bpf@...r.kernel.org,
ast@...nel.org,
daniel@...earbox.net,
andrii@...nel.org
Cc: netdev@...r.kernel.org,
magnus.karlsson@...el.com,
stfomichev@...il.com,
Maciej Fijalkowski <maciej.fijalkowski@...el.com>,
Eryk Kubanski <e.kubanski@...tner.samsung.com>
Subject: [PATCH v2 bpf] xsk: fix immature cq descriptor production
Eryk reported an issue, referenced in the Closes: tag, where umem addrs
were being produced onto the pool's completion queue prematurely.
Let us make the skb's destructor responsible for producing all addrs
that a given skb used.
The commit from the Fixes: tag introduced the buggy behavior; it was
not broken from day 1, but rather when xsk multi-buffer support got
introduced.
Introduce a struct that carries the descriptor count along with an
array of addresses taken from the processed descriptors, and store it
via skb_shared_info::destructor_arg. This way we can refer to it within
xsk_destruct_skb().
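For reference, a minimal sketch of the idea, using only the struct and
calls introduced in the diff below (core steps, not the full code):

  struct xsk_addrs {
  	u32 num_descs;
  	u64 addrs[MAX_SKB_FRAGS + 1];
  };

  /* first frag: allocate the array and attach it to the skb */
  addrs = kzalloc(sizeof(*addrs), GFP_KERNEL);
  skb_shinfo(skb)->destructor_arg = (void *)addrs;

  /* each processed descriptor: remember its umem addr for later */
  addrs->addrs[addrs->num_descs++] = desc->addr;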
To summarize, behavior is changed from:
- produce addr to cq, increase cq's cached_prod
- increment descriptor count and store it on
  skb_shared_info::destructor_arg
- (xmit and rest of path...)
- use destructor_arg on skb destructor to update global state of cq
  producer
to the following:
- increment cq's cached_prod
- increment descriptor count, save xdp_desc::addr in custom array and
  store this custom array on skb_shared_info::destructor_arg
- (xmit and rest of path...)
- use destructor_arg on skb destructor to walk the array of addrs and
  write them to cq and finally update global state of cq producer (see
  the sketch below)
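In other words, the skb destructor is now the only place that makes the
addrs visible to user space. A condensed sketch of that path, mirroring
xsk_cq_submit_addr_locked() from the diff below (core steps only, not a
standalone function):

  struct xsk_addrs *addrs = (struct xsk_addrs *)skb_shinfo(skb)->destructor_arg;
  unsigned long flags;
  u32 idx, i;

  spin_lock_irqsave(&pool->cq_lock, flags);
  idx = xskq_get_prod(pool->cq);
  /* write out every addr this skb consumed, then publish them at once */
  for (i = 0; i < addrs->num_descs; i++, idx++)
  	xskq_prod_write_addr(pool->cq, idx, addrs->addrs[i]);
  xskq_prod_submit_n(pool->cq, addrs->num_descs);
  spin_unlock_irqrestore(&pool->cq_lock, flags);
  kfree(addrs);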
Fixes: b7f72a30e9ac ("xsk: introduce wrappers and helpers for supporting multi-buffer in Tx path")
Reported-by: Eryk Kubanski <e.kubanski@...tner.samsung.com>
Closes: https://lore.kernel.org/netdev/20250530103456.53564-1-e.kubanski@partner.samsung.com/
Signed-off-by: Maciej Fijalkowski <maciej.fijalkowski@...el.com>
---
v1:
https://lore.kernel.org/bpf/20250702101648.1942562-1-maciej.fijalkowski@intel.com/
v1->v2:
* store addrs in an array carried via destructor_arg instead of having
them stored in skb headroom; cleaner and less hacky approach;
---
net/xdp/xsk.c | 79 ++++++++++++++++++++++++++++++++++-----------
net/xdp/xsk_queue.h | 12 +++++++
2 files changed, 73 insertions(+), 18 deletions(-)
diff --git a/net/xdp/xsk.c b/net/xdp/xsk.c
index 72c000c0ae5f..9f0ce87d440f 100644
--- a/net/xdp/xsk.c
+++ b/net/xdp/xsk.c
@@ -36,6 +36,11 @@
#define TX_BATCH_SIZE 32
#define MAX_PER_SOCKET_BUDGET (TX_BATCH_SIZE)
+struct xsk_addrs {
+ u32 num_descs;
+ u64 addrs[MAX_SKB_FRAGS + 1];
+};
+
void xsk_set_rx_need_wakeup(struct xsk_buff_pool *pool)
{
if (pool->cached_need_wakeup & XDP_WAKEUP_RX)
@@ -528,25 +533,38 @@ static int xsk_wakeup(struct xdp_sock *xs, u8 flags)
return dev->netdev_ops->ndo_xsk_wakeup(dev, xs->queue_id, flags);
}
-static int xsk_cq_reserve_addr_locked(struct xsk_buff_pool *pool, u64 addr)
+static int xsk_cq_reserve_locked(struct xsk_buff_pool *pool)
{
unsigned long flags;
int ret;
spin_lock_irqsave(&pool->cq_lock, flags);
- ret = xskq_prod_reserve_addr(pool->cq, addr);
+ ret = xskq_prod_reserve(pool->cq);
spin_unlock_irqrestore(&pool->cq_lock, flags);
return ret;
}
-static void xsk_cq_submit_locked(struct xsk_buff_pool *pool, u32 n)
+static void xsk_cq_submit_addr_locked(struct xsk_buff_pool *pool,
+ struct sk_buff *skb)
{
+ struct xsk_addrs *xsk_addrs;
unsigned long flags;
+ u32 num_desc, i;
+ u32 idx;
+
+ xsk_addrs = (struct xsk_addrs *)skb_shinfo(skb)->destructor_arg;
+ num_desc = xsk_addrs->num_descs;
spin_lock_irqsave(&pool->cq_lock, flags);
- xskq_prod_submit_n(pool->cq, n);
+ idx = xskq_get_prod(pool->cq);
+
+ for (i = 0; i < num_desc; i++, idx++)
+ xskq_prod_write_addr(pool->cq, idx, xsk_addrs->addrs[i]);
+ xskq_prod_submit_n(pool->cq, num_desc);
+
spin_unlock_irqrestore(&pool->cq_lock, flags);
+ kfree(xsk_addrs);
}
static void xsk_cq_cancel_locked(struct xsk_buff_pool *pool, u32 n)
@@ -558,29 +576,37 @@ static void xsk_cq_cancel_locked(struct xsk_buff_pool *pool, u32 n)
spin_unlock_irqrestore(&pool->cq_lock, flags);
}
-static u32 xsk_get_num_desc(struct sk_buff *skb)
-{
- return skb ? (long)skb_shinfo(skb)->destructor_arg : 0;
-}
-
static void xsk_destruct_skb(struct sk_buff *skb)
{
struct xsk_tx_metadata_compl *compl = &skb_shinfo(skb)->xsk_meta;
- if (compl->tx_timestamp) {
+ if (compl->tx_timestamp)
/* sw completion timestamp, not a real one */
*compl->tx_timestamp = ktime_get_tai_fast_ns();
- }
- xsk_cq_submit_locked(xdp_sk(skb->sk)->pool, xsk_get_num_desc(skb));
+ xsk_cq_submit_addr_locked(xdp_sk(skb->sk)->pool, skb);
sock_wfree(skb);
}
-static void xsk_set_destructor_arg(struct sk_buff *skb)
+static u32 xsk_get_num_desc(struct sk_buff *skb)
{
- long num = xsk_get_num_desc(xdp_sk(skb->sk)->skb) + 1;
+ struct xsk_addrs *addrs;
- skb_shinfo(skb)->destructor_arg = (void *)num;
+ addrs = (struct xsk_addrs *)skb_shinfo(skb)->destructor_arg;
+ return addrs->num_descs;
+}
+
+static void xsk_set_destructor_arg(struct sk_buff *skb, struct xsk_addrs *addrs)
+{
+ skb_shinfo(skb)->destructor_arg = (void *)addrs;
+}
+
+static void xsk_inc_skb_descs(struct sk_buff *skb)
+{
+ struct xsk_addrs *addrs;
+
+ addrs = (struct xsk_addrs *)skb_shinfo(skb)->destructor_arg;
+ addrs->num_descs++;
}
static void xsk_consume_skb(struct sk_buff *skb)
@@ -605,6 +631,7 @@ static struct sk_buff *xsk_build_skb_zerocopy(struct xdp_sock *xs,
{
struct xsk_buff_pool *pool = xs->pool;
u32 hr, len, ts, offset, copy, copied;
+ struct xsk_addrs *addrs = NULL;
struct sk_buff *skb = xs->skb;
struct page *page;
void *buffer;
@@ -619,6 +646,12 @@ static struct sk_buff *xsk_build_skb_zerocopy(struct xdp_sock *xs,
return ERR_PTR(err);
skb_reserve(skb, hr);
+
+ addrs = kzalloc(sizeof(*addrs), GFP_KERNEL);
+ if (!addrs)
+ return ERR_PTR(-ENOMEM);
+
+ xsk_set_destructor_arg(skb, addrs);
}
addr = desc->addr;
@@ -658,6 +691,7 @@ static struct sk_buff *xsk_build_skb(struct xdp_sock *xs,
{
struct xsk_tx_metadata *meta = NULL;
struct net_device *dev = xs->dev;
+ struct xsk_addrs *addrs = NULL;
struct sk_buff *skb = xs->skb;
bool first_frag = false;
int err;
@@ -690,6 +724,13 @@ static struct sk_buff *xsk_build_skb(struct xdp_sock *xs,
err = skb_store_bits(skb, 0, buffer, len);
if (unlikely(err))
goto free_err;
+
+ addrs = kzalloc(sizeof(*addrs), GFP_KERNEL);
+ if (!addrs)
+ goto free_err;
+
+ xsk_set_destructor_arg(skb, addrs);
+
} else {
int nr_frags = skb_shinfo(skb)->nr_frags;
struct page *page;
@@ -755,7 +796,9 @@ static struct sk_buff *xsk_build_skb(struct xdp_sock *xs,
skb->mark = READ_ONCE(xs->sk.sk_mark);
skb->destructor = xsk_destruct_skb;
xsk_tx_metadata_to_compl(meta, &skb_shinfo(skb)->xsk_meta);
- xsk_set_destructor_arg(skb);
+
+ addrs = (struct xsk_addrs *)skb_shinfo(skb)->destructor_arg;
+ addrs->addrs[addrs->num_descs++] = desc->addr;
return skb;
@@ -765,7 +808,7 @@ static struct sk_buff *xsk_build_skb(struct xdp_sock *xs,
if (err == -EOVERFLOW) {
/* Drop the packet */
- xsk_set_destructor_arg(xs->skb);
+ xsk_inc_skb_descs(xs->skb);
xsk_drop_skb(xs->skb);
xskq_cons_release(xs->tx);
} else {
@@ -807,7 +850,7 @@ static int __xsk_generic_xmit(struct sock *sk)
* if there is space in it. This avoids having to implement
* any buffering in the Tx path.
*/
- err = xsk_cq_reserve_addr_locked(xs->pool, desc.addr);
+ err = xsk_cq_reserve_locked(xs->pool);
if (err) {
err = -EAGAIN;
goto out;
diff --git a/net/xdp/xsk_queue.h b/net/xdp/xsk_queue.h
index 46d87e961ad6..f16f390370dc 100644
--- a/net/xdp/xsk_queue.h
+++ b/net/xdp/xsk_queue.h
@@ -344,6 +344,11 @@ static inline u32 xskq_cons_present_entries(struct xsk_queue *q)
/* Functions for producers */
+static inline u32 xskq_get_prod(struct xsk_queue *q)
+{
+ return READ_ONCE(q->ring->producer);
+}
+
static inline u32 xskq_prod_nb_free(struct xsk_queue *q, u32 max)
{
u32 free_entries = q->nentries - (q->cached_prod - q->cached_cons);
@@ -390,6 +395,13 @@ static inline int xskq_prod_reserve_addr(struct xsk_queue *q, u64 addr)
return 0;
}
+static inline void xskq_prod_write_addr(struct xsk_queue *q, u32 idx, u64 addr)
+{
+ struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;
+
+ ring->desc[idx & q->ring_mask] = addr;
+}
+
static inline void xskq_prod_write_addr_batch(struct xsk_queue *q, struct xdp_desc *descs,
u32 nb_entries)
{
--
2.34.1