lists.openwall.net   lists  /  announce  owl-users  owl-dev  john-users  john-dev  passwdqc-users  yescrypt  popa3d-users  /  oss-security  kernel-hardening  musl  sabotage  tlsify  passwords  /  crypt-dev  xvendor  /  Bugtraq  Full-Disclosure  linux-kernel  linux-netdev  linux-ext4  linux-hardening  linux-cve-announce  PHC 
Open Source and information security mailing list archives
 
Hash Suite: Windows password security audit tool. GUI, reports in PDF.
[<prev] [next>] [<thread-prev] [thread-next>] [day] [month] [year] [list]
Message-ID: <4dde6d41-2a26-47b8-aef1-4967f7fc94ab@tu-dortmund.de>
Date: Sun, 28 Sep 2025 23:27:25 +0200
From: Simon Schippers <simon.schippers@...dortmund.de>
To: "Michael S. Tsirkin" <mst@...hat.com>
Cc: willemdebruijn.kernel@...il.com, jasowang@...hat.com, eperezma@...hat.com,
        stephen@...workplumber.org, leiyang@...hat.com, netdev@...r.kernel.org,
        linux-kernel@...r.kernel.org, virtualization@...ts.linux.dev,
        kvm@...r.kernel.org, Tim Gebauer <tim.gebauer@...dortmund.de>
Subject: [PATCH net-next v5 4/8] TUN & TAP: Wake netdev queue after consuming
 an entry

On 23.09.25 18:36, Michael S. Tsirkin wrote:
> On Tue, Sep 23, 2025 at 12:15:49AM +0200, Simon Schippers wrote:
>> The new wrappers tun_ring_consume/tap_ring_consume deal with consuming an
>> entry of the ptr_ring and then waking the netdev queue when entries got
>> invalidated to be used again by the producer.
>> To avoid waking the netdev queue when the ptr_ring is full, it is checked
>> if the netdev queue is stopped before invalidating entries. Like that the
>> netdev queue can be safely woken after invalidating entries.
>>
>> The READ_ONCE in __ptr_ring_peek, paired with the smp_wmb() in
>> __ptr_ring_produce within tun_net_xmit guarantees that the information
>> about the netdev queue being stopped is visible after __ptr_ring_peek is
>> called.
>>
>> The netdev queue is also woken after resizing the ptr_ring.
>>
>> Co-developed-by: Tim Gebauer <tim.gebauer@...dortmund.de>
>> Signed-off-by: Tim Gebauer <tim.gebauer@...dortmund.de>
>> Signed-off-by: Simon Schippers <simon.schippers@...dortmund.de>
>> ---
>>  drivers/net/tap.c | 44 +++++++++++++++++++++++++++++++++++++++++++-
>>  drivers/net/tun.c | 47 +++++++++++++++++++++++++++++++++++++++++++++--
>>  2 files changed, 88 insertions(+), 3 deletions(-)
>>
>> diff --git a/drivers/net/tap.c b/drivers/net/tap.c
>> index 1197f245e873..f8292721a9d6 100644
>> --- a/drivers/net/tap.c
>> +++ b/drivers/net/tap.c
>> @@ -753,6 +753,46 @@ static ssize_t tap_put_user(struct tap_queue *q,
>>  	return ret ? ret : total;
>>  }
>>  
>> +static struct sk_buff *tap_ring_consume(struct tap_queue *q)
>> +{
>> +	struct netdev_queue *txq;
>> +	struct net_device *dev;
>> +	bool will_invalidate;
>> +	bool stopped;
>> +	void *ptr;
>> +
>> +	spin_lock(&q->ring.consumer_lock);
>> +	ptr = __ptr_ring_peek(&q->ring);
>> +	if (!ptr) {
>> +		spin_unlock(&q->ring.consumer_lock);
>> +		return ptr;
>> +	}
>> +
>> +	/* Check if the queue stopped before zeroing out, so no ptr get
>> +	 * produced in the meantime, because this could result in waking
>> +	 * even though the ptr_ring is full.
> 
> So what? Maybe it would be a bit suboptimal? But with your design, I do
> not get what prevents this:
> 
> 
> 	stopped? -> No
> 		ring is stopped
> 	discard
> 
> and queue stays stopped forever
> 

I think I found a solution to this problem, see below:

> 
>> The order of the operations
>> +	 * is ensured by barrier().
>> +	 */
>> +	will_invalidate = __ptr_ring_will_invalidate(&q->ring);
>> +	if (unlikely(will_invalidate)) {
>> +		rcu_read_lock();
>> +		dev = rcu_dereference(q->tap)->dev;
>> +		txq = netdev_get_tx_queue(dev, q->queue_index);
>> +		stopped = netif_tx_queue_stopped(txq);
>> +	}
>> +	barrier();
>> +	__ptr_ring_discard_one(&q->ring, will_invalidate);
>> +
>> +	if (unlikely(will_invalidate)) {

Here I just check for

	if (will_invalidate || __ptr_ring_empty(&q->ring)) {

instead because, if the ptr_ring is empty and the netdev queue stopped,
the race must have occurred. Then it is safe to wake the netdev queue,
because it is known that space in the ptr_ring was freed when the race
occurred. Also, it is guaranteed that tap_ring_consume is called at least
once after the race, because a new entry is generated by the producer at
the race.
In my adjusted implementation, it tests fine with pktgen without any lost
packets.


Generally now I think that the whole implementation can be fine without
using spinlocks at all. I am currently adjusting the implementation
regarding SMP memory barrier pairings, and I have a question:
In the v4 you mentioned "the stop -> wake bounce involves enough barriers
already". Does it, for instance, mean that netif_tx_wake_queue already
ensures memory ordering, and I do not have to use an smp_wmb() in front of
netif_tx_wake_queue() and smp_rmb() in front of the ptr_ring operations
in tun_net_xmit?
I dug through net/core/netdevice.h and dev.c but could not really
answer this question by myself...
Thanks :)

>> +		if (stopped)
>> +			netif_tx_wake_queue(txq);
>> +		rcu_read_unlock();
>> +	}
> 
> 
> After an entry is consumed, you can detect this by checking
> 
> 	                r->consumer_head >= r->consumer_tail
> 
> 
> so it seems you could keep calling regular ptr_ring_consume
> and check afterwards?
> 
> 
> 
> 
>> +	spin_unlock(&q->ring.consumer_lock);
>> +
>> +	return ptr;
>> +}
>> +
>>  static ssize_t tap_do_read(struct tap_queue *q,
>>  			   struct iov_iter *to,
>>  			   int noblock, struct sk_buff *skb)
>> @@ -774,7 +814,7 @@ static ssize_t tap_do_read(struct tap_queue *q,
>>  					TASK_INTERRUPTIBLE);
>>  
>>  		/* Read frames from the queue */
>> -		skb = ptr_ring_consume(&q->ring);
>> +		skb = tap_ring_consume(q);
>>  		if (skb)
>>  			break;
>>  		if (noblock) {
>> @@ -1207,6 +1247,8 @@ int tap_queue_resize(struct tap_dev *tap)
>>  	ret = ptr_ring_resize_multiple_bh(rings, n,
>>  					  dev->tx_queue_len, GFP_KERNEL,
>>  					  __skb_array_destroy_skb);
>> +	if (netif_running(dev))
>> +		netif_tx_wake_all_queues(dev);
>>  
>>  	kfree(rings);
>>  	return ret;
>> diff --git a/drivers/net/tun.c b/drivers/net/tun.c
>> index c6b22af9bae8..682df8157b55 100644
>> --- a/drivers/net/tun.c
>> +++ b/drivers/net/tun.c
>> @@ -2114,13 +2114,53 @@ static ssize_t tun_put_user(struct tun_struct *tun,
>>  	return total;
>>  }
>>  
>> +static void *tun_ring_consume(struct tun_file *tfile)
>> +{
>> +	struct netdev_queue *txq;
>> +	struct net_device *dev;
>> +	bool will_invalidate;
>> +	bool stopped;
>> +	void *ptr;
>> +
>> +	spin_lock(&tfile->tx_ring.consumer_lock);
>> +	ptr = __ptr_ring_peek(&tfile->tx_ring);
>> +	if (!ptr) {
>> +		spin_unlock(&tfile->tx_ring.consumer_lock);
>> +		return ptr;
>> +	}
>> +
>> +	/* Check if the queue stopped before zeroing out, so no ptr get
>> +	 * produced in the meantime, because this could result in waking
>> +	 * even though the ptr_ring is full. The order of the operations
>> +	 * is ensured by barrier().
>> +	 */
>> +	will_invalidate = __ptr_ring_will_invalidate(&tfile->tx_ring);
>> +	if (unlikely(will_invalidate)) {
>> +		rcu_read_lock();
>> +		dev = rcu_dereference(tfile->tun)->dev;
>> +		txq = netdev_get_tx_queue(dev, tfile->queue_index);
>> +		stopped = netif_tx_queue_stopped(txq);
>> +	}
>> +	barrier();
>> +	__ptr_ring_discard_one(&tfile->tx_ring, will_invalidate);
>> +
>> +	if (unlikely(will_invalidate)) {
>> +		if (stopped)
>> +			netif_tx_wake_queue(txq);
>> +		rcu_read_unlock();
>> +	}
>> +	spin_unlock(&tfile->tx_ring.consumer_lock);
>> +
>> +	return ptr;
>> +}
>> +
>>  static void *tun_ring_recv(struct tun_file *tfile, int noblock, int *err)
>>  {
>>  	DECLARE_WAITQUEUE(wait, current);
>>  	void *ptr = NULL;
>>  	int error = 0;
>>  
>> -	ptr = ptr_ring_consume(&tfile->tx_ring);
>> +	ptr = tun_ring_consume(tfile);
>>  	if (ptr)
>>  		goto out;
>>  	if (noblock) {
>> @@ -2132,7 +2172,7 @@ static void *tun_ring_recv(struct tun_file *tfile, int noblock, int *err)
>>  
>>  	while (1) {
>>  		set_current_state(TASK_INTERRUPTIBLE);
>> -		ptr = ptr_ring_consume(&tfile->tx_ring);
>> +		ptr = tun_ring_consume(tfile);
>>  		if (ptr)
>>  			break;
>>  		if (signal_pending(current)) {
>> @@ -3621,6 +3661,9 @@ static int tun_queue_resize(struct tun_struct *tun)
>>  					  dev->tx_queue_len, GFP_KERNEL,
>>  					  tun_ptr_free);
>>  
>> +	if (netif_running(dev))
>> +		netif_tx_wake_all_queues(dev);
>> +
>>  	kfree(rings);
>>  	return ret;
>>  }
>> -- 
>> 2.43.0
> 

Powered by blists - more mailing lists

Powered by Openwall GNU/*/Linux Powered by OpenVZ