Message-ID: <20251120152914.1127975-4-simon.schippers@tu-dortmund.de>
Date: Thu, 20 Nov 2025 16:29:08 +0100
From: Simon Schippers <simon.schippers@...dortmund.de>
To: willemdebruijn.kernel@...il.com, jasowang@...hat.com,
andrew+netdev@...n.ch, davem@...emloft.net, edumazet@...gle.com,
kuba@...nel.org, pabeni@...hat.com, mst@...hat.com,
eperezma@...hat.com, jon@...anix.com, tim.gebauer@...dortmund.de,
simon.schippers@...dortmund.de, netdev@...r.kernel.org,
linux-kernel@...r.kernel.org, kvm@...r.kernel.org,
virtualization@...ts.linux.dev
Subject: [PATCH net-next v6 3/8] tun/tap: add synchronized ring produce/consume with queue management
Implement new ring buffer produce and consume functions for tun and tap
drivers that provide lockless producer-consumer synchronization and
netdev queue management to prevent ptr_ring tail drop and permanent
starvation.
- tun_ring_produce(): Produces a packet into the ptr_ring with proper
memory barriers and proactively stops the netdev queue when the ring is
about to become full.
- __tun_ring_consume() / __tap_ring_consume(): Internal consume
functions that check whether the netdev queue was stopped due to a full
ring and wake it when space becomes available. They use memory barriers
to ensure proper ordering between producer and consumer.
- tun_ring_consume() / tap_ring_consume(): Wrapper functions that
acquire the consumer lock before calling the internal consume functions.
Key features:
- Proactive queue stopping: __ptr_ring_full_next() is used to stop the
queue just before the ring becomes completely full (a sketch of the
assumed helper semantics follows below).
- The queue is not stopped when the ptr_ring is already full: the
consumer could empty all entries in the meantime without noticing the
stopped netdev queue, which would leave the queue stopped forever.
- Conditional queue waking: __ptr_ring_consume_created_space() is used
to wake the queue only when space is actually created in the ring.
- Permanent starvation is also prevented by waking the queue when the
ring becomes empty, which can happen when racing with the producer.
NB: The new functions are annotated __always_unused; the annotations
are removed later in the series, once the functions gain callers, so
that bisectability is not broken (see the usage sketch after the
diffstat).
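For reference, a rough sketch of the semantics this patch assumes for
the two ptr_ring helpers named above. The helpers themselves are added
earlier in this series; the bodies below are illustrative
approximations based on the current ptr_ring layout, not the actual
implementation:

  /* Illustrative sketch: would producing one more entry leave the
   * ring full, i.e. is the slot after the current producer position
   * still occupied?
   */
  static inline bool __ptr_ring_full_next(struct ptr_ring *r)
  {
          int p = r->producer + 1;

          if (p >= r->size)
                  p = 0;
          return !!r->queue[p];
  }

  /* Illustrative sketch: did the last discard publish zeroed slots
   * to the producer? ptr_ring consumes in batches, so space only
   * becomes visible once consumer_tail catches up to consumer_head.
   */
  static inline bool __ptr_ring_consume_created_space(struct ptr_ring *r)
  {
          return r->consumer_tail == r->consumer_head;
  }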
Co-developed-by: Tim Gebauer <tim.gebauer@...dortmund.de>
Signed-off-by: Tim Gebauer <tim.gebauer@...dortmund.de>
Co-developed-by: Jon Kohler <jon@...anix.com>
Signed-off-by: Jon Kohler <jon@...anix.com>
Signed-off-by: Simon Schippers <simon.schippers@...dortmund.de>
---
drivers/net/tap.c | 63 +++++++++++++++++++++++++++++
drivers/net/tun.c | 101 ++++++++++++++++++++++++++++++++++++++++++++++
2 files changed, 164 insertions(+)
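As a usage sketch, presumably how a later patch in this series wires up
the producer side (hypothetical call site, not part of this diff):
tun_net_xmit() would produce through the new helper instead of calling
ptr_ring_produce() directly, so that a full ring stops the queue rather
than tail-dropping the skb:

  struct netdev_queue *queue;

  /* Hypothetical: inside tun_net_xmit(), after selecting tfile */
  queue = netdev_get_tx_queue(dev, tfile->queue_index);
  if (tun_ring_produce(&tfile->tx_ring, queue, skb))
          goto drop;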
diff --git a/drivers/net/tap.c b/drivers/net/tap.c
index 1197f245e873..c370a02789eb 100644
--- a/drivers/net/tap.c
+++ b/drivers/net/tap.c
@@ -753,6 +753,69 @@ static ssize_t tap_put_user(struct tap_queue *q,
return ret ? ret : total;
}
+/*
+ * Consume a packet from the transmit ring. Callers must hold
+ * the consumer_lock of the ptr_ring. If the ring was full and the
+ * queue was stopped, this may wake up the queue if space is created.
+ */
+static void *__tap_ring_consume(struct tap_queue *q)
+{
+ struct ptr_ring *ring = &q->ring;
+ struct netdev_queue *txq;
+ struct net_device *dev;
+ bool stopped;
+ void *ptr;
+
+ ptr = __ptr_ring_peek(ring);
+ if (!ptr)
+ return ptr;
+
+ /* Paired with smp_wmb() in the ring producer path. Ensures we
+ * see any updated netdev queue state caused by a full ring.
+ * Needed for proper synchronization between the ring and the
+ * netdev queue.
+ */
+ smp_rmb();
+ rcu_read_lock();
+ dev = rcu_dereference(q->tap)->dev;
+ txq = netdev_get_tx_queue(dev, q->queue_index);
+ stopped = netif_tx_queue_stopped(txq);
+
+ /* Ensures the read for a stopped queue completes before the
+ * discard, so that we don't miss the window to wake the queue if
+ * needed.
+ */
+ smp_rmb();
+ __ptr_ring_discard_one(ring);
+
+ /* If the queue was stopped (meaning the producer couldn't have
+ * inserted new entries just now), and we have actually created
+ * space in the ring, or the ring is now empty (due to a race
+ * with the producer), then it is now safe to wake the queue.
+ */
+ if (unlikely(stopped &&
+ (__ptr_ring_consume_created_space(ring) ||
+ __ptr_ring_empty(ring)))) {
+ /* Paired with smp_rmb() in the ring producer path. */
+ smp_wmb();
+ netif_tx_wake_queue(txq);
+ }
+ rcu_read_unlock();
+
+ return ptr;
+}
+
+static __always_unused void *tap_ring_consume(struct tap_queue *q)
+{
+ void *ptr;
+
+ spin_lock(&q->ring.consumer_lock);
+ ptr = __tap_ring_consume(q);
+ spin_unlock(&q->ring.consumer_lock);
+
+ return ptr;
+}
+
static ssize_t tap_do_read(struct tap_queue *q,
struct iov_iter *to,
int noblock, struct sk_buff *skb)
diff --git a/drivers/net/tun.c b/drivers/net/tun.c
index 8192740357a0..3b9d8d406ff5 100644
--- a/drivers/net/tun.c
+++ b/drivers/net/tun.c
@@ -999,6 +999,107 @@ static unsigned int run_ebpf_filter(struct tun_struct *tun,
return len;
}
+/* Produce a packet into the transmit ring. If the ring becomes full, the
+ * netdev queue is stopped until the consumer wakes it again.
+ */
+static __always_unused int tun_ring_produce(struct ptr_ring *ring,
+ struct netdev_queue *queue,
+ struct sk_buff *skb)
+{
+ int ret;
+
+ spin_lock(&ring->producer_lock);
+
+ /* Pairs with smp_wmb() in __tun_ring_consume/__tap_ring_consume.
+ * Ensures that space freed by the consumer is visible.
+ */
+ smp_rmb();
+
+ /* Do not stop the netdev queue if the ptr_ring is full already.
+ * The consumer could empty out the ptr_ring in the meantime
+ * without noticing the stopped netdev queue, resulting in a
+ * stopped netdev queue and an empty ptr_ring. In this case the
+ * netdev queue would stay stopped forever.
+ */
+ if (unlikely(!__ptr_ring_full(ring) &&
+ __ptr_ring_full_next(ring)))
+ netif_tx_stop_queue(queue);
+
+ /* Note: __ptr_ring_produce has an internal smp_wmb() to synchronize the
+ * state with the consumer. This ensures that after adding an entry to
+ * the ring, any stopped queue state is visible to the consumer after
+ * dequeueing.
+ */
+ ret = __ptr_ring_produce(ring, skb);
+
+ spin_unlock(&ring->producer_lock);
+
+ return ret;
+}
+
+/*
+ * Consume a packet from the transmit ring. Callers must hold
+ * the consumer_lock of the ptr_ring. If the ring was full and the
+ * queue was stopped, this may wake up the queue if space is created.
+ */
+static void *__tun_ring_consume(struct tun_file *tfile)
+{
+ struct ptr_ring *ring = &tfile->tx_ring;
+ struct netdev_queue *txq;
+ struct net_device *dev;
+ bool stopped;
+ void *ptr;
+
+ ptr = __ptr_ring_peek(ring);
+ if (!ptr)
+ return ptr;
+
+ /* Paired with smp_wmb() in the ring producer path. Ensures we
+ * see any updated netdev queue state caused by a full ring.
+ * Needed for proper synchronization between the ring and the
+ * netdev queue.
+ */
+ smp_rmb();
+ rcu_read_lock();
+ dev = rcu_dereference(tfile->tun)->dev;
+ txq = netdev_get_tx_queue(dev, tfile->queue_index);
+ stopped = netif_tx_queue_stopped(txq);
+
+ /* Ensures the read for a stopped queue completes before the
+ * discard, so that we don't miss the window to wake the queue if
+ * needed.
+ */
+ smp_rmb();
+ __ptr_ring_discard_one(ring);
+
+ /* If the queue was stopped (meaning the producer couldn't have
+ * inserted new entries just now), and we have actually created
+ * space in the ring, or the ring is now empty (due to a race
+ * with the producer), then it is now safe to wake the queue.
+ */
+ if (unlikely(stopped &&
+ (__ptr_ring_consume_created_space(ring) ||
+ __ptr_ring_empty(ring)))) {
+ /* Paired with smp_rmb() in tun_ring_produce. */
+ smp_wmb();
+ netif_tx_wake_queue(txq);
+ }
+ rcu_read_unlock();
+
+ return ptr;
+}
+
+static __always_unused void *tun_ring_consume(struct tun_file *tfile)
+{
+ void *ptr;
+
+ spin_lock(&tfile->tx_ring.consumer_lock);
+ ptr = __tun_ring_consume(tfile);
+ spin_unlock(&tfile->tx_ring.consumer_lock);
+
+ return ptr;
+}
+
/* Net device start xmit */
static netdev_tx_t tun_net_xmit(struct sk_buff *skb, struct net_device *dev)
{
--
2.43.0