lists.openwall.net   lists  /  announce  owl-users  owl-dev  john-users  john-dev  passwdqc-users  yescrypt  popa3d-users  /  oss-security  kernel-hardening  musl  sabotage  tlsify  passwords  /  crypt-dev  xvendor  /  Bugtraq  Full-Disclosure  linux-kernel  linux-netdev  linux-ext4  linux-hardening  linux-cve-announce  PHC 
Open Source and information security mailing list archives
 
Hash Suite for Android: free password hash cracker in your pocket
[<prev] [next>] [<thread-prev] [thread-next>] [day] [month] [year] [list]
Message-ID: <57397C2B.7000603@redhat.com>
Date:	Mon, 16 May 2016 15:52:11 +0800
From:	Jason Wang <jasowang@...hat.com>
To:	"Michael S. Tsirkin" <mst@...hat.com>
Cc:	davem@...emloft.net, netdev@...r.kernel.org,
	linux-kernel@...r.kernel.org
Subject: Re: [PATCH net-next] tuntap: introduce tx skb ring



On 2016年05月16日 12:23, Michael S. Tsirkin wrote:
> On Mon, May 16, 2016 at 09:17:01AM +0800, Jason Wang wrote:
>> We used to queue tx packets in sk_receive_queue, this is less
>> efficient since it requires spinlocks to synchronize between producer
>> and consumer.
>>
>> This patch tries to address this by using circular buffer which allows
>> lockless synchronization. This is done by switching from
>> sk_receive_queue to a tx skb ring with a new flag IFF_TX_RING and when
>> this is set:
> Why do we need a new flag? Is there a userspace-visible
> behaviour change?

Probably yes, since tx_queue_len does not work.

>
>> - store pointer to skb in circular buffer in tun_net_xmit(), and read
>>    it from the circular buffer in tun_do_read().
>> - introduce a new proto_ops peek which could be implemented by
>>    specific socket which does not use sk_receive_queue.
>> - store skb length in circular buffer too, and implement a lockless
>>    peek for tuntap.
>> - change vhost_net to use proto_ops->peek() instead
>> - new spinlocks were introduced to synchronize among producers (and so
>>    did for consumers).
>>
>> Pktgen test shows about 9% improvement on guest receiving pps:
>>
>> Before: ~1480000pps
>> After : ~1610000pps
>>
>> (I'm not sure noblocking read is still needed, so it was not included
>>   in this patch)
> How do you mean? Of course we must support blocking and non-blocking
> read - userspace uses it.

Ok, will add this.

>
>> Signed-off-by: Jason Wang <jasowang@...hat.com>
>> ---
>> ---
>>   drivers/net/tun.c           | 157 +++++++++++++++++++++++++++++++++++++++++---
>>   drivers/vhost/net.c         |  16 ++++-
>>   include/linux/net.h         |   1 +
>>   include/uapi/linux/if_tun.h |   1 +
>>   4 files changed, 165 insertions(+), 10 deletions(-)
>>
>> diff --git a/drivers/net/tun.c b/drivers/net/tun.c
>> index 425e983..6001ece 100644
>> --- a/drivers/net/tun.c
>> +++ b/drivers/net/tun.c
>> @@ -71,6 +71,7 @@
>>   #include <net/sock.h>
>>   #include <linux/seq_file.h>
>>   #include <linux/uio.h>
>> +#include <linux/circ_buf.h>
>>   
>>   #include <asm/uaccess.h>
>>   
>> @@ -130,6 +131,8 @@ struct tap_filter {
>>   #define MAX_TAP_FLOWS  4096
>>   
>>   #define TUN_FLOW_EXPIRE (3 * HZ)
>> +#define TUN_RING_SIZE 256
> Can we resize this according to tx_queue_len set by user?

We can, but it needs lots of other changes, e.g. being notified when 
tx_queue_len is changed by the user. And if tx_queue_len is not a power of 
2, we probably need a modulus to calculate the capacity.

>
>> +#define TUN_RING_MASK (TUN_RING_SIZE - 1)
>>   
>>   struct tun_pcpu_stats {
>>   	u64 rx_packets;
>> @@ -142,6 +145,11 @@ struct tun_pcpu_stats {
>>   	u32 rx_frame_errors;
>>   };
>>   
>> +struct tun_desc {
>> +	struct sk_buff *skb;
>> +	int len; /* Cached skb len for peeking */
>> +};
>> +
>>   /* A tun_file connects an open character device to a tuntap netdevice. It
>>    * also contains all socket related structures (except sock_fprog and tap_filter)
>>    * to serve as one transmit queue for tuntap device. The sock_fprog and
>> @@ -167,6 +175,13 @@ struct tun_file {
>>   	};
>>   	struct list_head next;
>>   	struct tun_struct *detached;
>> +	/* reader lock */
>> +	spinlock_t rlock;
>> +	unsigned long tail;
>> +	struct tun_desc tx_descs[TUN_RING_SIZE];
>> +	/* writer lock */
>> +	spinlock_t wlock;
>> +	unsigned long head;
>>   };
>>   
>>   struct tun_flow_entry {
>> @@ -515,7 +530,27 @@ static struct tun_struct *tun_enable_queue(struct tun_file *tfile)
>>   
>>   static void tun_queue_purge(struct tun_file *tfile)
>>   {
>> +	unsigned long head, tail;
>> +	struct tun_desc *desc;
>> +	struct sk_buff *skb;
>>   	skb_queue_purge(&tfile->sk.sk_receive_queue);
>> +	spin_lock(&tfile->rlock);
>> +
>> +	head = ACCESS_ONCE(tfile->head);
>> +	tail = tfile->tail;
>> +
>> +	/* read tail before reading descriptor at tail */
>> +	smp_rmb();
> I think you mean read *head* here

Right.

>
>
>> +
>> +	while (CIRC_CNT(head, tail, TUN_RING_SIZE) >= 1) {
>> +		desc = &tfile->tx_descs[tail];
>> +		skb = desc->skb;
>> +		kfree_skb(skb);
>> +		tail = (tail + 1) & TUN_RING_MASK;
>> +		/* read descriptor before incrementing tail. */
>> +		smp_store_release(&tfile->tail, tail & TUN_RING_MASK);
>> +	}
>> +	spin_unlock(&tfile->rlock);
>>   	skb_queue_purge(&tfile->sk.sk_error_queue);
>>   }
>>
> Barrier pairing seems messed up. Could you tag
> each barrier with its pair pls?
> E.g. add /* Barrier A for pairing */ Before barrier and
> its pair.

Ok.

for both tun_queue_purge() and tun_do_read():

smp_rmb() is paired with smp_store_release() in tun_net_xmit().
smp_store_release() is paired with spin_unlock()/spin_lock() in 
tun_net_xmit().

>    
>> @@ -824,6 +859,7 @@ static netdev_tx_t tun_net_xmit(struct sk_buff *skb, struct net_device *dev)
>>   	int txq = skb->queue_mapping;
>>   	struct tun_file *tfile;
>>   	u32 numqueues = 0;
>> +	unsigned long flags;
>>   
>>   	rcu_read_lock();
>>   	tfile = rcu_dereference(tun->tfiles[txq]);
>> @@ -888,8 +924,35 @@ static netdev_tx_t tun_net_xmit(struct sk_buff *skb, struct net_device *dev)
>>   
>>   	nf_reset(skb);
>>   
>> -	/* Enqueue packet */
>> -	skb_queue_tail(&tfile->socket.sk->sk_receive_queue, skb);
>> +	if (tun->flags & IFF_TX_RING) {
>> +		unsigned long head, tail;
>> +
>> +		spin_lock_irqsave(&tfile->wlock, flags);
>> +
>> +		head = tfile->head;
>> +		tail = ACCESS_ONCE(tfile->tail);
> this should be acquire

Consider that there's always one slot left empty, so we need to produce two 
skbs here before we could corrupt the consumer. So the 
spin_unlock()/spin_lock() provides ordering here?

>
>> +
>> +		if (CIRC_SPACE(head, tail, TUN_RING_SIZE) >= 1) {
>> +			struct tun_desc *desc = &tfile->tx_descs[head];
>> +
>> +			desc->skb = skb;
>> +			desc->len = skb->len;
>> +			if (skb_vlan_tag_present(skb))
>> +				desc->len += VLAN_HLEN;
>> +
>> +			/* read descriptor before incrementing head. */
> write descriptor.

Right.

>   so smp_wmb is enough.

I thought smp_store_release() was more lightweight than smp_wmb()?

>
>> +			smp_store_release(&tfile->head,
>> +					  (head + 1) & TUN_RING_MASK);
>> +		} else {
>> +			spin_unlock_irqrestore(&tfile->wlock, flags);
>> +			goto drop;
>> +		}
>> +
>> +		spin_unlock_irqrestore(&tfile->wlock, flags);
>> +	} else {
>> +		/* Enqueue packet */
>> +		skb_queue_tail(&tfile->socket.sk->sk_receive_queue, skb);
>> +	}
>>   
>>   	/* Notify and wake up reader process */
>>   	if (tfile->flags & TUN_FASYNC)
>> @@ -1085,6 +1148,13 @@ static void tun_net_init(struct net_device *dev)
>>   	}
>>   }
>>   
>> +static bool tun_queue_not_empty(struct tun_file *tfile)
>> +{
>> +	struct sock *sk = tfile->socket.sk;
>> +
>> +	return (!skb_queue_empty(&sk->sk_receive_queue) ||
>> +		ACCESS_ONCE(tfile->head) != ACCESS_ONCE(tfile->tail));
>> +}
>>   /* Character device part */
>>   
>>   /* Poll */
>> @@ -1104,7 +1174,7 @@ static unsigned int tun_chr_poll(struct file *file, poll_table *wait)
>>   
>>   	poll_wait(file, sk_sleep(sk), wait);
>>   
>> -	if (!skb_queue_empty(&sk->sk_receive_queue))
>> +	if (tun_queue_not_empty(tfile))
>>   		mask |= POLLIN | POLLRDNORM;
>>   
>>   	if (sock_writeable(sk) ||
>> @@ -1494,11 +1564,36 @@ static ssize_t tun_do_read(struct tun_struct *tun, struct tun_file *tfile,
>>   	if (tun->dev->reg_state != NETREG_REGISTERED)
>>   		return -EIO;
>>   
>> -	/* Read frames from queue */
>> -	skb = __skb_recv_datagram(tfile->socket.sk, noblock ? MSG_DONTWAIT : 0,
>> -				  &peeked, &off, &err);
>> -	if (!skb)
>> -		return err;
>> +	if (tun->flags & IFF_TX_RING) {
>> +		unsigned long head, tail;
>> +		struct tun_desc *desc;
>> +
>> +		spin_lock(&tfile->rlock);
>> +		head = ACCESS_ONCE(tfile->head);
>> +		tail = tfile->tail;
>> +
>> +		if (CIRC_CNT(head, tail, TUN_RING_SIZE) >= 1) {
>> +			/* read tail before reading descriptor at tail */
>> +			smp_rmb();
>> +			desc = &tfile->tx_descs[tail];
>> +			skb = desc->skb;
>> +			/* read descriptor before incrementing tail. */
>> +			smp_store_release(&tfile->tail,
>> +					  (tail + 1) & TUN_RING_MASK);
> same comments as purge, also - pls order code consistently.

Ok.

>
>> +		} else {
>> +			spin_unlock(&tfile->rlock);
>> +			return -EAGAIN;
>> +		}
>> +
>> +		spin_unlock(&tfile->rlock);
>> +	} else {
>> +		/* Read frames from queue */
>> +		skb = __skb_recv_datagram(tfile->socket.sk,
>> +					  noblock ? MSG_DONTWAIT : 0,
>> +					  &peeked, &off, &err);
>> +		if (!skb)
>> +			return err;
>> +	}
>>   
>>   	ret = tun_put_user(tun, tfile, skb, to);
>>   	if (unlikely(ret < 0))
>> @@ -1629,8 +1724,47 @@ out:
>>   	return ret;
>>   }
>>   
>> +static int tun_peek(struct socket *sock, bool exact)
>> +{
>> +	struct tun_file *tfile = container_of(sock, struct tun_file, socket);
>> +	struct sock *sk = sock->sk;
>> +	struct tun_struct *tun;
>> +	int ret = 0;
>> +
>> +	if (!exact)
>> +		return tun_queue_not_empty(tfile);
>> +
>> +	tun = __tun_get(tfile);
>> +	if (!tun)
>> +		return 0;
>> +
>> +	if (tun->flags & IFF_TX_RING) {
>> +		unsigned long head = ACCESS_ONCE(tfile->head);
>> +		unsigned long tail = ACCESS_ONCE(tfile->tail);
>> +
>> +		if (head != tail)
>> +			ret = tfile->tx_descs[tail].len;
> no ordering at all here?

Seems yes, we can't be accurate if there are more consumers.

>
>> +	} else {
>> +		struct sk_buff *head;
>> +		unsigned long flags;
>> +
>> +		spin_lock_irqsave(&sk->sk_receive_queue.lock, flags);
>> +		head = skb_peek(&sk->sk_receive_queue);
>> +		if (likely(head)) {
>> +			ret = head->len;
>> +			if (skb_vlan_tag_present(head))
>> +				ret += VLAN_HLEN;
>> +		}
>> +		spin_unlock_irqrestore(&sk->sk_receive_queue.lock, flags);
>> +	}
>> +
>> +	tun_put(tun);
>> +	return ret;
>> +}
>> +
>>   /* Ops structure to mimic raw sockets with tun */
>>   static const struct proto_ops tun_socket_ops = {
>> +	.peek    = tun_peek,
>>   	.sendmsg = tun_sendmsg,
>>   	.recvmsg = tun_recvmsg,
>>   };
>> @@ -2306,13 +2440,13 @@ static int tun_chr_open(struct inode *inode, struct file * file)
>>   {
>>   	struct net *net = current->nsproxy->net_ns;
>>   	struct tun_file *tfile;
>> -
>>   	DBG1(KERN_INFO, "tunX: tun_chr_open\n");
>>   
>>   	tfile = (struct tun_file *)sk_alloc(net, AF_UNSPEC, GFP_KERNEL,
>>   					    &tun_proto, 0);
>>   	if (!tfile)
>>   		return -ENOMEM;
>> +
>>   	RCU_INIT_POINTER(tfile->tun, NULL);
>>   	tfile->flags = 0;
>>   	tfile->ifindex = 0;
>> @@ -2332,6 +2466,11 @@ static int tun_chr_open(struct inode *inode, struct file * file)
>>   	INIT_LIST_HEAD(&tfile->next);
>>   
>>   	sock_set_flag(&tfile->sk, SOCK_ZEROCOPY);
>> +	tfile->head = 0;
>> +	tfile->tail = 0;
>> +
>> +	spin_lock_init(&tfile->rlock);
>> +	spin_lock_init(&tfile->wlock);
>>   
>>   	return 0;
>>   }
>> diff --git a/drivers/vhost/net.c b/drivers/vhost/net.c
>> index f744eeb..10ff494 100644
>> --- a/drivers/vhost/net.c
>> +++ b/drivers/vhost/net.c
>> @@ -455,10 +455,14 @@ out:
>>   
>>   static int peek_head_len(struct sock *sk)
>>   {
>> +	struct socket *sock = sk->sk_socket;
>>   	struct sk_buff *head;
>>   	int len = 0;
>>   	unsigned long flags;
>>   
>> +	if (sock->ops->peek)
>> +		return sock->ops->peek(sock, true);
>> +
>>   	spin_lock_irqsave(&sk->sk_receive_queue.lock, flags);
>>   	head = skb_peek(&sk->sk_receive_queue);
>>   	if (likely(head)) {
>> @@ -471,6 +475,16 @@ static int peek_head_len(struct sock *sk)
>>   	return len;
>>   }
>>   
>> +static int sk_has_rx_data(struct sock *sk)
>> +{
>> +	struct socket *sock = sk->sk_socket;
>> +
>> +	if (sock->ops->peek)
>> +		return sock->ops->peek(sock, false);
>> +
>> +	return skb_queue_empty(&sk->sk_receive_queue);
>> +}
>> +
>>   static int vhost_net_rx_peek_head_len(struct vhost_net *net, struct sock *sk)
>>   {
>>   	struct vhost_net_virtqueue *nvq = &net->vqs[VHOST_NET_VQ_TX];
>> @@ -487,7 +501,7 @@ static int vhost_net_rx_peek_head_len(struct vhost_net *net, struct sock *sk)
>>   		endtime = busy_clock() + vq->busyloop_timeout;
>>   
>>   		while (vhost_can_busy_poll(&net->dev, endtime) &&
>> -		       skb_queue_empty(&sk->sk_receive_queue) &&
>> +		       !sk_has_rx_data(sk) &&
>>   		       vhost_vq_avail_empty(&net->dev, vq))
>>   			cpu_relax_lowlatency();
>>   
>> diff --git a/include/linux/net.h b/include/linux/net.h
>> index 72c1e06..3c4ecd5 100644
>> --- a/include/linux/net.h
>> +++ b/include/linux/net.h
>> @@ -132,6 +132,7 @@ struct module;
>>   struct proto_ops {
>>   	int		family;
>>   	struct module	*owner;
>> +	int		(*peek) (struct socket *sock, bool exact);
>>   	int		(*release)   (struct socket *sock);
>>   	int		(*bind)	     (struct socket *sock,
>>   				      struct sockaddr *myaddr,
>> diff --git a/include/uapi/linux/if_tun.h b/include/uapi/linux/if_tun.h
>> index 3cb5e1d..d64ddc1 100644
>> --- a/include/uapi/linux/if_tun.h
>> +++ b/include/uapi/linux/if_tun.h
>> @@ -61,6 +61,7 @@
>>   #define IFF_TUN		0x0001
>>   #define IFF_TAP		0x0002
>>   #define IFF_NO_PI	0x1000
>> +#define IFF_TX_RING	0x0010
>>   /* This flag has no real effect */
>>   #define IFF_ONE_QUEUE	0x2000
>>   #define IFF_VNET_HDR	0x4000
>> -- 
>> 2.7.4

Powered by blists - more mailing lists

Powered by Openwall GNU/*/Linux Powered by OpenVZ