Message-ID: <20251120152914.1127975-4-simon.schippers@tu-dortmund.de>
Date: Thu, 20 Nov 2025 16:29:08 +0100
From: Simon Schippers <simon.schippers@...dortmund.de>
To: willemdebruijn.kernel@...il.com, jasowang@...hat.com,
        andrew+netdev@...n.ch, davem@...emloft.net, edumazet@...gle.com,
        kuba@...nel.org, pabeni@...hat.com, mst@...hat.com,
        eperezma@...hat.com, jon@...anix.com, tim.gebauer@...dortmund.de,
        simon.schippers@...dortmund.de, netdev@...r.kernel.org,
        linux-kernel@...r.kernel.org, kvm@...r.kernel.org,
        virtualization@...ts.linux.dev
Subject: [PATCH net-next v6 3/8] tun/tap: add synchronized ring produce/consume with queue management

Implement new ring buffer produce and consume functions for tun and tap
drivers that provide lockless producer-consumer synchronization and
netdev queue management to prevent ptr_ring tail drop and permanent
starvation.

- tun_ring_produce(): Produces a packet into the ptr_ring with proper
  memory barriers and proactively stops the netdev queue when the ring
  is about to become full.

- __tun_ring_consume() / __tap_ring_consume(): Internal consume functions
  that check whether the netdev queue was stopped because of a full ring
  and wake it when space becomes available. Memory barriers ensure proper
  ordering between producer and consumer.

- tun_ring_consume() / tap_ring_consume(): Wrapper functions that acquire
  the consumer lock before calling the internal consume functions.

Key features:
- Proactive queue stopping using __ptr_ring_full_next() to stop the queue
  before it becomes completely full.
- No stopping of the queue when the ptr_ring is already full: if the
  consumer empties all entries in the meantime without noticing the
  stopped queue, the queue would stay stopped forever.
- Conditional queue waking using __ptr_ring_consume_created_space() to
  wake the queue only when space is actually created in the ring (see
  the sketch of the assumed helper semantics after this list).
- Prevents permanent starvation by ensuring the queue is also woken when
  the ring becomes empty, which can happen when racing with the producer.
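
For reference, the two ptr_ring helpers used above are introduced earlier
in this series. The following is a minimal sketch of the semantics assumed
here, not the authoritative implementation (which lives in
include/linux/ptr_ring.h and may differ in detail, e.g. around batching):

	/* Sketch only: assumed behaviour of the helpers added earlier in
	 * this series. Callers hold the respective ptr_ring lock.
	 */
	static inline bool __ptr_ring_full_next(struct ptr_ring *r)
	{
		int p = r->producer + 1;

		if (p >= r->size)
			p = 0;
		/* A slot is free iff it holds NULL, so if the slot after
		 * the current producer position is still occupied, one
		 * more successful produce leaves the ring full.
		 */
		return !!r->queue[p];
	}

	static inline bool __ptr_ring_consume_created_space(struct ptr_ring *r)
	{
		/* ptr_ring discards in batches: entries are only NULLed
		 * out, and thereby made reusable by the producer, once
		 * consumer_tail catches up with consumer_head. Only then
		 * has the preceding discard actually created space.
		 */
		return r->consumer_head == r->consumer_tail;
	}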

NB: The new functions are annotated __always_unused because they only
gain callers later in the series; the annotation is removed again there
so that bisectability is not broken.
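
For context, a hypothetical call site in the xmit path (illustration only:
the actual conversion of tun_net_xmit happens in a later patch of this
series, and example_tun_xmit is a made-up name):

	static netdev_tx_t example_tun_xmit(struct tun_file *tfile,
					    struct netdev_queue *txq,
					    struct sk_buff *skb)
	{
		/* tun_ring_produce takes the producer lock internally
		 * and stops the queue proactively when the ring is
		 * about to become full.
		 */
		if (unlikely(tun_ring_produce(&tfile->tx_ring, txq, skb))) {
			/* Ring full despite the proactive stop: drop. */
			kfree_skb(skb);
			return NET_XMIT_DROP;
		}
		return NETDEV_TX_OK;
	}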

Co-developed-by: Tim Gebauer <tim.gebauer@...dortmund.de>
Signed-off-by: Tim Gebauer <tim.gebauer@...dortmund.de>
Co-developed-by: Jon Kohler <jon@...anix.com>
Signed-off-by: Jon Kohler <jon@...anix.com>
Signed-off-by: Simon Schippers <simon.schippers@...dortmund.de>
---
 drivers/net/tap.c |  63 +++++++++++++++++++++++++++++
 drivers/net/tun.c | 101 ++++++++++++++++++++++++++++++++++++++++++++++
 2 files changed, 164 insertions(+)

diff --git a/drivers/net/tap.c b/drivers/net/tap.c
index 1197f245e873..c370a02789eb 100644
--- a/drivers/net/tap.c
+++ b/drivers/net/tap.c
@@ -753,6 +753,69 @@ static ssize_t tap_put_user(struct tap_queue *q,
 	return ret ? ret : total;
 }
 
+/*
+ * Consume a packet from the transmit ring. Callers must hold
+ * the consumer_lock of the ptr_ring. If the ring was full and the
+ * queue was stopped, this may wake up the queue if space is created.
+ */
+static void *__tap_ring_consume(struct tap_queue *q)
+{
+	struct ptr_ring *ring = &q->ring;
+	struct netdev_queue *txq;
+	struct net_device *dev;
+	bool stopped;
+	void *ptr;
+
+	ptr = __ptr_ring_peek(ring);
+	if (!ptr)
+		return ptr;
+
+	/* Paired with smp_wmb() in the ring producer path. Ensures we
+	 * see any updated netdev queue state caused by a full ring.
+	 * Needed for proper synchronization between the ring and the
+	 * netdev queue.
+	 */
+	smp_rmb();
+	rcu_read_lock();
+	dev = rcu_dereference(q->tap)->dev;
+	txq = netdev_get_tx_queue(dev, q->queue_index);
+	stopped = netif_tx_queue_stopped(txq);
+
+	/* Ensures the read for a stopped queue completes before the
+	 * discard, so that we don't miss the window to wake the queue if
+	 * needed.
+	 */
+	smp_rmb();
+	__ptr_ring_discard_one(ring);
+
+	/* If the queue was stopped (meaning the producer couldn't have
+	 * inserted new entries just now), and we have actually created
+	 * space in the ring, or the ring is now empty (due to a race
+	 * with the producer), then it is now safe to wake the queue.
+	 */
+	if (unlikely(stopped &&
+		     (__ptr_ring_consume_created_space(ring) ||
+		      __ptr_ring_empty(ring)))) {
+		/* Paired with smp_rmb() in tun_ring_produce. */
+		smp_wmb();
+		netif_tx_wake_queue(txq);
+	}
+	rcu_read_unlock();
+
+	return ptr;
+}
+
+static __always_unused void *tap_ring_consume(struct tap_queue *q)
+{
+	void *ptr;
+
+	spin_lock(&q->ring.consumer_lock);
+	ptr = __tap_ring_consume(q);
+	spin_unlock(&q->ring.consumer_lock);
+
+	return ptr;
+}
+
 static ssize_t tap_do_read(struct tap_queue *q,
 			   struct iov_iter *to,
 			   int noblock, struct sk_buff *skb)
diff --git a/drivers/net/tun.c b/drivers/net/tun.c
index 8192740357a0..3b9d8d406ff5 100644
--- a/drivers/net/tun.c
+++ b/drivers/net/tun.c
@@ -999,6 +999,107 @@ static unsigned int run_ebpf_filter(struct tun_struct *tun,
 	return len;
 }
 
+/* Produce a packet into the transmit ring. If the ring becomes full, the
+ * netdev queue is stopped until the consumer wakes it again.
+ */
+static __always_unused int tun_ring_produce(struct ptr_ring *ring,
+					    struct netdev_queue *queue,
+					    struct sk_buff *skb)
+{
+	int ret;
+
+	spin_lock(&ring->producer_lock);
+
+	/* Pairs with smp_wmb() in __tun_ring_consume/__tap_ring_consume.
+	 * Ensures that freed space by the consumer is visible.
+	 */
+	smp_rmb();
+
+	/* Do not stop the netdev queue if the ptr_ring is full already.
+	 * The consumer could empty out the ptr_ring in the meantime
+	 * without noticing the stopped netdev queue, resulting in a
+	 * stopped netdev queue and an empty ptr_ring. In this case the
+	 * netdev queue would stay stopped forever.
+	 */
+	if (unlikely(!__ptr_ring_full(ring) &&
+		     __ptr_ring_full_next(ring)))
+		netif_tx_stop_queue(queue);
+
+	/* Note: __ptr_ring_produce has an internal smp_wmb() to synchronize the
+	 * state with the consumer. This ensures that after adding an entry to
+	 * the ring, any stopped queue state is visible to the consumer after
+	 * dequeueing.
+	 */
+	ret = __ptr_ring_produce(ring, skb);
+
+	spin_unlock(&ring->producer_lock);
+
+	return ret;
+}
+
+/*
+ * Consume a packet from the transmit ring. Callers must hold
+ * the consumer_lock of the ptr_ring. If the ring was full and the
+ * queue was stopped, this may wake up the queue if space is created.
+ */
+static void *__tun_ring_consume(struct tun_file *tfile)
+{
+	struct ptr_ring *ring = &tfile->tx_ring;
+	struct netdev_queue *txq;
+	struct net_device *dev;
+	bool stopped;
+	void *ptr;
+
+	ptr = __ptr_ring_peek(ring);
+	if (!ptr)
+		return ptr;
+
+	/* Paired with smp_wmb() in the ring producer path. Ensures we
+	 * see any updated netdev queue state caused by a full ring.
+	 * Needed for proper synchronization between the ring and the
+	 * netdev queue.
+	 */
+	smp_rmb();
+	rcu_read_lock();
+	dev = rcu_dereference(tfile->tun)->dev;
+	txq = netdev_get_tx_queue(dev, tfile->queue_index);
+	stopped = netif_tx_queue_stopped(txq);
+
+	/* Ensures the read for a stopped queue completes before the
+	 * discard, so that we don't miss the window to wake the queue if
+	 * needed.
+	 */
+	smp_rmb();
+	__ptr_ring_discard_one(ring);
+
+	/* If the queue was stopped (meaning the producer couldn't have
+	 * inserted new entries just now), and we have actually created
+	 * space in the ring, or the ring is now empty (due to a race
+	 * with the producer), then it is now safe to wake the queue.
+	 */
+	if (unlikely(stopped &&
+		     (__ptr_ring_consume_created_space(ring) ||
+		      __ptr_ring_empty(ring)))) {
+		/* Paired with smp_rmb() in tun_ring_produce. */
+		smp_wmb();
+		netif_tx_wake_queue(txq);
+	}
+	rcu_read_unlock();
+
+	return ptr;
+}
+
+static void __always_unused *tun_ring_consume(struct tun_file *tfile)
+{
+	void *ptr;
+
+	spin_lock(&tfile->tx_ring.consumer_lock);
+	ptr = __tun_ring_consume(tfile);
+	spin_unlock(&tfile->tx_ring.consumer_lock);
+
+	return ptr;
+}
+
 /* Net device start xmit */
 static netdev_tx_t tun_net_xmit(struct sk_buff *skb, struct net_device *dev)
 {
-- 
2.43.0

