Message-Id: <20250811131236.56206-3-kerneljasonxing@gmail.com>
Date: Mon, 11 Aug 2025 21:12:36 +0800
From: Jason Xing <kerneljasonxing@...il.com>
To: davem@...emloft.net,
edumazet@...gle.com,
kuba@...nel.org,
pabeni@...hat.com,
bjorn@...nel.org,
magnus.karlsson@...el.com,
maciej.fijalkowski@...el.com,
jonathan.lemon@...il.com,
sdf@...ichev.me,
ast@...nel.org,
daniel@...earbox.net,
hawk@...nel.org,
john.fastabend@...il.com,
horms@...nel.org,
andrew+netdev@...n.ch
Cc: bpf@...r.kernel.org,
netdev@...r.kernel.org,
Jason Xing <kernelxing@...cent.com>
Subject: [PATCH net-next 2/2] xsk: support generic batch xmit in copy mode
From: Jason Xing <kernelxing@...cent.com>
Zerocopy mode has a useful feature, multi-buffer batching, while copy mode has
to transmit skbs one by one just like the normal path. The latter therefore
loses part of its bypass advantage because it grabs/releases the same tx
queue lock and disables/enables bh on a per-packet basis. Contention on
that queue lock makes the results even worse.
This patch adds batch support by holding the queue lock once to send up to
generic_xmit_batch packets at a time (see the simplified sketch after the
list below). To further improve the result, some code[1] is intentionally
omitted from xsk_direct_xmit_batch() compared to __dev_direct_xmit().
[1]
1. The device checks are hoisted to the granularity of the sendto syscall.
2. Packet validation is removed because it is not needed here.
3. The softnet_data.xmit.recursion handling is removed because it is not
necessary.
4. BQL flow control is removed because it would likely limit the speed.
The ideal scenario is a standalone, clean tx queue used only to send xsk
packets. Less contention yields better performance.
Experiments:
1) Tested on virtio_net:
With this patch series applied, the throughput of xdpsock[2] goes up by
33%: from 767743 pps before to 1021486 pps after. With another thread
competing for the same queue, a 28% increase (from 405466 pps to
521076 pps) can be observed.
2) Tested on ixgbe:
The results of zerocopy and copy mode are 1303277 pps and 1187347 pps
respectively. With this socket option in effect, copy mode reaches
1472367 pps, impressively surpassing zerocopy mode.
[2]: ./xdpsock -i eth1 -t -S -s 64
It is worth mentioning that batch processing might introduce higher latency
in certain cases. The recommended batch value is 32.
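For reference, a minimal userspace sketch of how an application could opt
in. The option name XDP_GENERIC_XMIT_BATCH and its value are assumptions
made only for illustration here; the real interface is introduced by
patch 1/2 of this series and may differ:

  #include <sys/socket.h>
  #include <linux/if_xdp.h>

  #ifndef SOL_XDP
  #define SOL_XDP 283
  #endif
  /* Hypothetical option name/value, for illustration only. */
  #ifndef XDP_GENERIC_XMIT_BATCH
  #define XDP_GENERIC_XMIT_BATCH 14
  #endif

  static int xsk_set_batch(int xsk_fd)
  {
          int batch = 32; /* recommended batch size */

          /* Enable generic (copy mode) batch xmit on an AF_XDP socket. */
          return setsockopt(xsk_fd, SOL_XDP, XDP_GENERIC_XMIT_BATCH,
                            &batch, sizeof(batch));
  }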
Signed-off-by: Jason Xing <kernelxing@...cent.com>
---
include/linux/netdevice.h | 2 +
net/core/dev.c | 18 +++++++
net/xdp/xsk.c | 103 ++++++++++++++++++++++++++++++++++++--
3 files changed, 120 insertions(+), 3 deletions(-)
diff --git a/include/linux/netdevice.h b/include/linux/netdevice.h
index 5e5de4b0a433..27738894daa7 100644
--- a/include/linux/netdevice.h
+++ b/include/linux/netdevice.h
@@ -3352,6 +3352,8 @@ u16 dev_pick_tx_zero(struct net_device *dev, struct sk_buff *skb,
int __dev_queue_xmit(struct sk_buff *skb, struct net_device *sb_dev);
int __dev_direct_xmit(struct sk_buff *skb, u16 queue_id);
+int xsk_direct_xmit_batch(struct sk_buff **skb, struct net_device *dev,
+ struct netdev_queue *txq, u32 max_batch, u32 *cur);
static inline int dev_queue_xmit(struct sk_buff *skb)
{
diff --git a/net/core/dev.c b/net/core/dev.c
index 68dc47d7e700..7a512bd38806 100644
--- a/net/core/dev.c
+++ b/net/core/dev.c
@@ -4742,6 +4742,24 @@ int __dev_queue_xmit(struct sk_buff *skb, struct net_device *sb_dev)
}
EXPORT_SYMBOL(__dev_queue_xmit);
+int xsk_direct_xmit_batch(struct sk_buff **skb, struct net_device *dev,
+ struct netdev_queue *txq, u32 max_batch, u32 *cur)
+{
+ int ret = NETDEV_TX_BUSY;
+
+ local_bh_disable();
+ HARD_TX_LOCK(dev, txq, smp_processor_id());
+ for (; *cur < max_batch; (*cur)++) {
+ ret = netdev_start_xmit(skb[*cur], dev, txq, false);
+ if (ret != NETDEV_TX_OK)
+ break;
+ }
+ HARD_TX_UNLOCK(dev, txq);
+ local_bh_enable();
+
+ return ret;
+}
+
int __dev_direct_xmit(struct sk_buff *skb, u16 queue_id)
{
struct net_device *dev = skb->dev;
diff --git a/net/xdp/xsk.c b/net/xdp/xsk.c
index 7a149f4ac273..92ad82472776 100644
--- a/net/xdp/xsk.c
+++ b/net/xdp/xsk.c
@@ -780,9 +780,102 @@ static struct sk_buff *xsk_build_skb(struct xdp_sock *xs,
return ERR_PTR(err);
}
-static int __xsk_generic_xmit(struct sock *sk)
+static int __xsk_generic_xmit_batch(struct xdp_sock *xs)
+{
+ u32 max_batch = READ_ONCE(xs->generic_xmit_batch);
+ struct sk_buff **skb = xs->skb_batch;
+ struct net_device *dev = xs->dev;
+ struct netdev_queue *txq;
+ bool sent_frame = false;
+ struct xdp_desc desc;
+ u32 i = 0, j = 0;
+ u32 max_budget;
+ int err = 0;
+
+ mutex_lock(&xs->mutex);
+
+ /* Since we dropped the RCU read lock, the socket state might have changed. */
+ if (unlikely(!xsk_is_bound(xs))) {
+ err = -ENXIO;
+ goto out;
+ }
+
+ if (xs->queue_id >= dev->real_num_tx_queues)
+ goto out;
+
+ if (unlikely(!netif_running(dev) ||
+ !netif_carrier_ok(dev)))
+ goto out;
+
+ max_budget = READ_ONCE(xs->max_tx_budget);
+ txq = netdev_get_tx_queue(dev, xs->queue_id);
+ do {
+ for (; i < max_batch && xskq_cons_peek_desc(xs->tx, &desc, xs->pool); i++) {
+ if (max_budget-- == 0) {
+ err = -EAGAIN;
+ break;
+ }
+ /* This is the backpressure mechanism for the Tx path.
+ * Reserve space in the completion queue and only proceed
+ * if there is space in it. This avoids having to implement
+ * any buffering in the Tx path.
+ */
+ err = xsk_cq_reserve_addr_locked(xs->pool, desc.addr);
+ if (err) {
+ err = -EAGAIN;
+ break;
+ }
+
+ skb[i] = xsk_build_skb(xs, &desc);
+ if (IS_ERR(skb[i])) {
+ err = PTR_ERR(skb[i]);
+ break;
+ }
+
+ xskq_cons_release(xs->tx);
+
+ if (xp_mb_desc(&desc))
+ xs->skb = skb[i];
+ }
+
+ if (i) {
+ err = xsk_direct_xmit_batch(skb, dev, txq, i, &j);
+ if (err == NETDEV_TX_BUSY) {
+ err = -EAGAIN;
+ } else if (err == NET_XMIT_DROP) {
+ j++;
+ err = -EBUSY;
+ }
+
+ sent_frame = true;
+ xs->skb = NULL;
+ }
+
+ if (err)
+ goto out;
+ i = j = 0;
+ } while (xskq_cons_peek_desc(xs->tx, &desc, xs->pool));
+
+ if (xskq_has_descs(xs->tx)) {
+ if (xs->skb)
+ xsk_drop_skb(xs->skb);
+ xskq_cons_release(xs->tx);
+ }
+
+out:
+ for (; j < i; j++) {
+ xskq_cons_cancel_n(xs->tx, xsk_get_num_desc(skb[j]));
+ xsk_consume_skb(skb[j]);
+ }
+ if (sent_frame)
+ __xsk_tx_release(xs);
+
+ mutex_unlock(&xs->mutex);
+ return err;
+}
+
+static int __xsk_generic_xmit(struct xdp_sock *xs)
{
- struct xdp_sock *xs = xdp_sk(sk);
bool sent_frame = false;
struct xdp_desc desc;
struct sk_buff *skb;
@@ -871,11 +964,15 @@ static int __xsk_generic_xmit(struct sock *sk)
static int xsk_generic_xmit(struct sock *sk)
{
+ struct xdp_sock *xs = xdp_sk(sk);
int ret;
/* Drop the RCU lock since the SKB path might sleep. */
rcu_read_unlock();
- ret = __xsk_generic_xmit(sk);
+ if (READ_ONCE(xs->generic_xmit_batch))
+ ret = __xsk_generic_xmit_batch(xs);
+ else
+ ret = __xsk_generic_xmit(xs);
/* Reaquire RCU lock before going into common code. */
rcu_read_lock();
--
2.41.3