lists.openwall.net   lists  /  announce  owl-users  owl-dev  john-users  john-dev  passwdqc-users  yescrypt  popa3d-users  /  oss-security  kernel-hardening  musl  sabotage  tlsify  passwords  /  crypt-dev  xvendor  /  Bugtraq  Full-Disclosure  linux-kernel  linux-netdev  linux-ext4  linux-hardening  linux-cve-announce  PHC 
Open Source and information security mailing list archives
 
Hash Suite for Android: free password hash cracker in your pocket
[<prev] [next>] [thread-next>] [day] [month] [year] [list]
Message-Id: <20220830121705.8618-1-maciej.fijalkowski@intel.com>
Date:   Tue, 30 Aug 2022 14:17:05 +0200
From:   Maciej Fijalkowski <maciej.fijalkowski@...el.com>
To:     bpf@...r.kernel.org, ast@...nel.org, daniel@...earbox.net,
        andrii@...nel.org
Cc:     netdev@...r.kernel.org, magnus.karlsson@...el.com,
        bjorn@...nel.org, Maciej Fijalkowski <maciej.fijalkowski@...el.com>
Subject: [PATCH bpf] xsk: fix backpressure mechanism on Tx

Commit d678cbd2f867 ("xsk: Fix handling of invalid descriptors in XSK TX
batching API") fixed batch API usage against set of descriptors with
invalid ones but introduced a problem when AF_XDP SW rings are smaller
than HW ones. Mismatch of reported Tx'ed frames between HW generator and
user space app was observed. It turned out that backpressure mechanism
became a bottleneck when the amount of produced descriptors to CQ is
lower than what we grabbed from XSK Tx ring.

Say that 512 entries had been taken from XSK Tx ring but we had only 490
free entries in CQ. Then callsite (ZC driver) will produce only 490
entries onto HW Tx ring but 512 entries will be released from Tx ring
and this is what will be seen by the user space.

In order to fix this case, mix XSK Tx/CQ ring interractions by moving
around internal functions and changing call order:
*  pull out xskq_prod_nb_free() from xskq_prod_reserve_addr_batch() up to
   xsk_tx_peek_release_desc_batch();
** move xskq_cons_release_n() into xskq_cons_read_desc_batch()

After doing so, algorithm can be described as follows:
1. lookup Tx entries
2. use value from 1. to reserve space in CQ (*)
3. Read from Tx ring as much descriptors as value from 2
 3a. release descriptors from XSK Tx ring (**)
4. Finally produce addresses to CQ

Fixes: d678cbd2f867 ("xsk: Fix handling of invalid descriptors in XSK TX batching API")
Signed-off-by: Magnus Karlsson <magnus.karlsson@...el.com>
Signed-off-by: Maciej Fijalkowski <maciej.fijalkowski@...el.com>
---
 net/xdp/xsk.c       | 22 +++++++++++-----------
 net/xdp/xsk_queue.h | 22 ++++++++++------------
 2 files changed, 21 insertions(+), 23 deletions(-)

diff --git a/net/xdp/xsk.c b/net/xdp/xsk.c
index 5b4ce6ba1bc7..639b2c3beb69 100644
--- a/net/xdp/xsk.c
+++ b/net/xdp/xsk.c
@@ -355,16 +355,15 @@ static u32 xsk_tx_peek_release_fallback(struct xsk_buff_pool *pool, u32 max_entr
 	return nb_pkts;
 }
 
-u32 xsk_tx_peek_release_desc_batch(struct xsk_buff_pool *pool, u32 max_entries)
+u32 xsk_tx_peek_release_desc_batch(struct xsk_buff_pool *pool, u32 nb_pkts)
 {
 	struct xdp_sock *xs;
-	u32 nb_pkts;
 
 	rcu_read_lock();
 	if (!list_is_singular(&pool->xsk_tx_list)) {
 		/* Fallback to the non-batched version */
 		rcu_read_unlock();
-		return xsk_tx_peek_release_fallback(pool, max_entries);
+		return xsk_tx_peek_release_fallback(pool, nb_pkts);
 	}
 
 	xs = list_first_or_null_rcu(&pool->xsk_tx_list, struct xdp_sock, tx_list);
@@ -373,12 +372,7 @@ u32 xsk_tx_peek_release_desc_batch(struct xsk_buff_pool *pool, u32 max_entries)
 		goto out;
 	}
 
-	max_entries = xskq_cons_nb_entries(xs->tx, max_entries);
-	nb_pkts = xskq_cons_read_desc_batch(xs->tx, pool, max_entries);
-	if (!nb_pkts) {
-		xs->tx->queue_empty_descs++;
-		goto out;
-	}
+	nb_pkts = xskq_cons_nb_entries(xs->tx, nb_pkts);
 
 	/* This is the backpressure mechanism for the Tx path. Try to
 	 * reserve space in the completion queue for all packets, but
@@ -386,12 +380,18 @@ u32 xsk_tx_peek_release_desc_batch(struct xsk_buff_pool *pool, u32 max_entries)
 	 * packets. This avoids having to implement any buffering in
 	 * the Tx path.
 	 */
-	nb_pkts = xskq_prod_reserve_addr_batch(pool->cq, pool->tx_descs, nb_pkts);
+	nb_pkts = xskq_prod_nb_free(pool->cq, nb_pkts);
 	if (!nb_pkts)
 		goto out;
 
-	xskq_cons_release_n(xs->tx, max_entries);
+	nb_pkts = xskq_cons_read_desc_batch(xs->tx, pool, nb_pkts);
+	if (!nb_pkts) {
+		xs->tx->queue_empty_descs++;
+		goto out;
+	}
+
 	__xskq_cons_release(xs->tx);
+	xskq_prod_write_addr_batch(pool->cq, pool->tx_descs, nb_pkts);
 	xs->sk.sk_write_space(&xs->sk);
 
 out:
diff --git a/net/xdp/xsk_queue.h b/net/xdp/xsk_queue.h
index fb20bf7207cf..c6fb6b763658 100644
--- a/net/xdp/xsk_queue.h
+++ b/net/xdp/xsk_queue.h
@@ -205,6 +205,11 @@ static inline bool xskq_cons_read_desc(struct xsk_queue *q,
 	return false;
 }
 
+static inline void xskq_cons_release_n(struct xsk_queue *q, u32 cnt)
+{
+	q->cached_cons += cnt;
+}
+
 static inline u32 xskq_cons_read_desc_batch(struct xsk_queue *q, struct xsk_buff_pool *pool,
 					    u32 max)
 {
@@ -226,6 +231,8 @@ static inline u32 xskq_cons_read_desc_batch(struct xsk_queue *q, struct xsk_buff
 		cached_cons++;
 	}
 
+	/* Release valid plus any invalid entries */
+	xskq_cons_release_n(q, cached_cons - q->cached_cons);
 	return nb_entries;
 }
 
@@ -291,11 +298,6 @@ static inline void xskq_cons_release(struct xsk_queue *q)
 	q->cached_cons++;
 }
 
-static inline void xskq_cons_release_n(struct xsk_queue *q, u32 cnt)
-{
-	q->cached_cons += cnt;
-}
-
 static inline u32 xskq_cons_present_entries(struct xsk_queue *q)
 {
 	/* No barriers needed since data is not accessed */
@@ -350,21 +352,17 @@ static inline int xskq_prod_reserve_addr(struct xsk_queue *q, u64 addr)
 	return 0;
 }
 
-static inline u32 xskq_prod_reserve_addr_batch(struct xsk_queue *q, struct xdp_desc *descs,
-					       u32 max)
+static inline void xskq_prod_write_addr_batch(struct xsk_queue *q, struct xdp_desc *descs,
+					      u32 nb_entries)
 {
 	struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;
-	u32 nb_entries, i, cached_prod;
-
-	nb_entries = xskq_prod_nb_free(q, max);
+	u32 i, cached_prod;
 
 	/* A, matches D */
 	cached_prod = q->cached_prod;
 	for (i = 0; i < nb_entries; i++)
 		ring->desc[cached_prod++ & q->ring_mask] = descs[i].addr;
 	q->cached_prod = cached_prod;
-
-	return nb_entries;
 }
 
 static inline int xskq_prod_reserve_desc(struct xsk_queue *q,
-- 
2.34.1

Powered by blists - more mailing lists

Powered by Openwall GNU/*/Linux Powered by OpenVZ