[<prev] [next>] [<thread-prev] [thread-next>] [day] [month] [year] [list]
Message-ID: <57397C2B.7000603@redhat.com>
Date: Mon, 16 May 2016 15:52:11 +0800
From: Jason Wang <jasowang@...hat.com>
To: "Michael S. Tsirkin" <mst@...hat.com>
Cc: davem@...emloft.net, netdev@...r.kernel.org,
linux-kernel@...r.kernel.org
Subject: Re: [PATCH net-next] tuntap: introduce tx skb ring
On 2016年05月16日 12:23, Michael S. Tsirkin wrote:
> On Mon, May 16, 2016 at 09:17:01AM +0800, Jason Wang wrote:
>> We used to queue tx packets in sk_receive_queue, this is less
>> efficient since it requires spinlocks to synchronize between producer
>> and consumer.
>>
>> This patch tries to address this by using circular buffer which allows
>> lockless synchronization. This is done by switching from
>> sk_receive_queue to a tx skb ring with a new flag IFF_TX_RING and when
>> this is set:
> Why do we need a new flag? Is there a userspace-visible
> behaviour change?
Probably yes, since tx_queue_len does not work.
>
>> - store pointer to skb in circular buffer in tun_net_xmit(), and read
>> it from the circular buffer in tun_do_read().
>> - introduce a new proto_ops peek which could be implemented by
>> specific socket which does not use sk_receive_queue.
>> - store skb length in circular buffer too, and implement a lockless
>> peek for tuntap.
>> - change vhost_net to use proto_ops->peek() instead
>> - new spinlocks were introduced to synchronize among producers (and so
>> did for consumers).
>>
>> Pktgen test shows about 9% improvement on guest receiving pps:
>>
>> Before: ~1480000pps
>> After : ~1610000pps
>>
>> (I'm not sure noblocking read is still needed, so it was not included
>> in this patch)
> How do you mean? Of course we must support blocking and non-blocking
> read - userspace uses it.
Ok, will add this.
>
>> Signed-off-by: Jason Wang <jasowang@...hat.com>
>> ---
>> ---
>> drivers/net/tun.c | 157 +++++++++++++++++++++++++++++++++++++++++---
>> drivers/vhost/net.c | 16 ++++-
>> include/linux/net.h | 1 +
>> include/uapi/linux/if_tun.h | 1 +
>> 4 files changed, 165 insertions(+), 10 deletions(-)
>>
>> diff --git a/drivers/net/tun.c b/drivers/net/tun.c
>> index 425e983..6001ece 100644
>> --- a/drivers/net/tun.c
>> +++ b/drivers/net/tun.c
>> @@ -71,6 +71,7 @@
>> #include <net/sock.h>
>> #include <linux/seq_file.h>
>> #include <linux/uio.h>
>> +#include <linux/circ_buf.h>
>>
>> #include <asm/uaccess.h>
>>
>> @@ -130,6 +131,8 @@ struct tap_filter {
>> #define MAX_TAP_FLOWS 4096
>>
>> #define TUN_FLOW_EXPIRE (3 * HZ)
>> +#define TUN_RING_SIZE 256
> Can we resize this according to tx_queue_len set by user?
We can, but it needs lots of other changes, e.g. being notified when
tx_queue_len is changed by the user. And if tx_queue_len is not a power of
2, we probably need a modulus to calculate the capacity.
>
>> +#define TUN_RING_MASK (TUN_RING_SIZE - 1)
>>
>> struct tun_pcpu_stats {
>> u64 rx_packets;
>> @@ -142,6 +145,11 @@ struct tun_pcpu_stats {
>> u32 rx_frame_errors;
>> };
>>
>> +struct tun_desc {
>> + struct sk_buff *skb;
>> + int len; /* Cached skb len for peeking */
>> +};
>> +
>> /* A tun_file connects an open character device to a tuntap netdevice. It
>> * also contains all socket related structures (except sock_fprog and tap_filter)
>> * to serve as one transmit queue for tuntap device. The sock_fprog and
>> @@ -167,6 +175,13 @@ struct tun_file {
>> };
>> struct list_head next;
>> struct tun_struct *detached;
>> + /* reader lock */
>> + spinlock_t rlock;
>> + unsigned long tail;
>> + struct tun_desc tx_descs[TUN_RING_SIZE];
>> + /* writer lock */
>> + spinlock_t wlock;
>> + unsigned long head;
>> };
>>
>> struct tun_flow_entry {
>> @@ -515,7 +530,27 @@ static struct tun_struct *tun_enable_queue(struct tun_file *tfile)
>>
>> static void tun_queue_purge(struct tun_file *tfile)
>> {
>> + unsigned long head, tail;
>> + struct tun_desc *desc;
>> + struct sk_buff *skb;
>> skb_queue_purge(&tfile->sk.sk_receive_queue);
>> + spin_lock(&tfile->rlock);
>> +
>> + head = ACCESS_ONCE(tfile->head);
>> + tail = tfile->tail;
>> +
>> + /* read tail before reading descriptor at tail */
>> + smp_rmb();
> I think you mean read *head* here
Right.
>
>
>> +
>> + while (CIRC_CNT(head, tail, TUN_RING_SIZE) >= 1) {
>> + desc = &tfile->tx_descs[tail];
>> + skb = desc->skb;
>> + kfree_skb(skb);
>> + tail = (tail + 1) & TUN_RING_MASK;
>> + /* read descriptor before incrementing tail. */
>> + smp_store_release(&tfile->tail, tail & TUN_RING_MASK);
>> + }
>> + spin_unlock(&tfile->rlock);
>> skb_queue_purge(&tfile->sk.sk_error_queue);
>> }
>>
> Barrier pairing seems messed up. Could you tag
> each barrier with its pair pls?
> E.g. add /* Barrier A for pairing */ Before barrier and
> its pair.
Ok.
for both tun_queue_purge() and tun_do_read():
smp_rmb() is paired with smp_store_release() in tun_net_xmit().
smp_store_release() is paired with spin_unlock()/spin_lock() in
tun_net_xmit().
>
>> @@ -824,6 +859,7 @@ static netdev_tx_t tun_net_xmit(struct sk_buff *skb, struct net_device *dev)
>> int txq = skb->queue_mapping;
>> struct tun_file *tfile;
>> u32 numqueues = 0;
>> + unsigned long flags;
>>
>> rcu_read_lock();
>> tfile = rcu_dereference(tun->tfiles[txq]);
>> @@ -888,8 +924,35 @@ static netdev_tx_t tun_net_xmit(struct sk_buff *skb, struct net_device *dev)
>>
>> nf_reset(skb);
>>
>> - /* Enqueue packet */
>> - skb_queue_tail(&tfile->socket.sk->sk_receive_queue, skb);
>> + if (tun->flags & IFF_TX_RING) {
>> + unsigned long head, tail;
>> +
>> + spin_lock_irqsave(&tfile->wlock, flags);
>> +
>> + head = tfile->head;
>> + tail = ACCESS_ONCE(tfile->tail);
> this should be acquire
Considering there's always one slot left empty, we would need to produce two
skbs here before we could corrupt the consumer. So the
spin_unlock()/spin_lock() provides the ordering here?
>
>> +
>> + if (CIRC_SPACE(head, tail, TUN_RING_SIZE) >= 1) {
>> + struct tun_desc *desc = &tfile->tx_descs[head];
>> +
>> + desc->skb = skb;
>> + desc->len = skb->len;
>> + if (skb_vlan_tag_present(skb))
>> + desc->len += VLAN_HLEN;
>> +
>> + /* read descriptor before incrementing head. */
> write descriptor.
Right.
> so smp_wmb is enough.
I thought smp_store_release() was more lightweight than smp_wmb()?
>
>> + smp_store_release(&tfile->head,
>> + (head + 1) & TUN_RING_MASK);
>> + } else {
>> + spin_unlock_irqrestore(&tfile->wlock, flags);
>> + goto drop;
>> + }
>> +
>> + spin_unlock_irqrestore(&tfile->wlock, flags);
>> + } else {
>> + /* Enqueue packet */
>> + skb_queue_tail(&tfile->socket.sk->sk_receive_queue, skb);
>> + }
>>
>> /* Notify and wake up reader process */
>> if (tfile->flags & TUN_FASYNC)
>> @@ -1085,6 +1148,13 @@ static void tun_net_init(struct net_device *dev)
>> }
>> }
>>
>> +static bool tun_queue_not_empty(struct tun_file *tfile)
>> +{
>> + struct sock *sk = tfile->socket.sk;
>> +
>> + return (!skb_queue_empty(&sk->sk_receive_queue) ||
>> + ACCESS_ONCE(tfile->head) != ACCESS_ONCE(tfile->tail));
>> +}
>> /* Character device part */
>>
>> /* Poll */
>> @@ -1104,7 +1174,7 @@ static unsigned int tun_chr_poll(struct file *file, poll_table *wait)
>>
>> poll_wait(file, sk_sleep(sk), wait);
>>
>> - if (!skb_queue_empty(&sk->sk_receive_queue))
>> + if (tun_queue_not_empty(tfile))
>> mask |= POLLIN | POLLRDNORM;
>>
>> if (sock_writeable(sk) ||
>> @@ -1494,11 +1564,36 @@ static ssize_t tun_do_read(struct tun_struct *tun, struct tun_file *tfile,
>> if (tun->dev->reg_state != NETREG_REGISTERED)
>> return -EIO;
>>
>> - /* Read frames from queue */
>> - skb = __skb_recv_datagram(tfile->socket.sk, noblock ? MSG_DONTWAIT : 0,
>> - &peeked, &off, &err);
>> - if (!skb)
>> - return err;
>> + if (tun->flags & IFF_TX_RING) {
>> + unsigned long head, tail;
>> + struct tun_desc *desc;
>> +
>> + spin_lock(&tfile->rlock);
>> + head = ACCESS_ONCE(tfile->head);
>> + tail = tfile->tail;
>> +
>> + if (CIRC_CNT(head, tail, TUN_RING_SIZE) >= 1) {
>> + /* read tail before reading descriptor at tail */
>> + smp_rmb();
>> + desc = &tfile->tx_descs[tail];
>> + skb = desc->skb;
>> + /* read descriptor before incrementing tail. */
>> + smp_store_release(&tfile->tail,
>> + (tail + 1) & TUN_RING_MASK);
> same comments as purge, also - pls order code consistently.
Ok.
>
>> + } else {
>> + spin_unlock(&tfile->rlock);
>> + return -EAGAIN;
>> + }
>> +
>> + spin_unlock(&tfile->rlock);
>> + } else {
>> + /* Read frames from queue */
>> + skb = __skb_recv_datagram(tfile->socket.sk,
>> + noblock ? MSG_DONTWAIT : 0,
>> + &peeked, &off, &err);
>> + if (!skb)
>> + return err;
>> + }
>>
>> ret = tun_put_user(tun, tfile, skb, to);
>> if (unlikely(ret < 0))
>> @@ -1629,8 +1724,47 @@ out:
>> return ret;
>> }
>>
>> +static int tun_peek(struct socket *sock, bool exact)
>> +{
>> + struct tun_file *tfile = container_of(sock, struct tun_file, socket);
>> + struct sock *sk = sock->sk;
>> + struct tun_struct *tun;
>> + int ret = 0;
>> +
>> + if (!exact)
>> + return tun_queue_not_empty(tfile);
>> +
>> + tun = __tun_get(tfile);
>> + if (!tun)
>> + return 0;
>> +
>> + if (tun->flags & IFF_TX_RING) {
>> + unsigned long head = ACCESS_ONCE(tfile->head);
>> + unsigned long tail = ACCESS_ONCE(tfile->tail);
>> +
>> + if (head != tail)
>> + ret = tfile->tx_descs[tail].len;
> no ordering at all here?
Seems so; we can't be accurate if there are more consumers.
>
>> + } else {
>> + struct sk_buff *head;
>> + unsigned long flags;
>> +
>> + spin_lock_irqsave(&sk->sk_receive_queue.lock, flags);
>> + head = skb_peek(&sk->sk_receive_queue);
>> + if (likely(head)) {
>> + ret = head->len;
>> + if (skb_vlan_tag_present(head))
>> + ret += VLAN_HLEN;
>> + }
>> + spin_unlock_irqrestore(&sk->sk_receive_queue.lock, flags);
>> + }
>> +
>> + tun_put(tun);
>> + return ret;
>> +}
>> +
>> /* Ops structure to mimic raw sockets with tun */
>> static const struct proto_ops tun_socket_ops = {
>> + .peek = tun_peek,
>> .sendmsg = tun_sendmsg,
>> .recvmsg = tun_recvmsg,
>> };
>> @@ -2306,13 +2440,13 @@ static int tun_chr_open(struct inode *inode, struct file * file)
>> {
>> struct net *net = current->nsproxy->net_ns;
>> struct tun_file *tfile;
>> -
>> DBG1(KERN_INFO, "tunX: tun_chr_open\n");
>>
>> tfile = (struct tun_file *)sk_alloc(net, AF_UNSPEC, GFP_KERNEL,
>> &tun_proto, 0);
>> if (!tfile)
>> return -ENOMEM;
>> +
>> RCU_INIT_POINTER(tfile->tun, NULL);
>> tfile->flags = 0;
>> tfile->ifindex = 0;
>> @@ -2332,6 +2466,11 @@ static int tun_chr_open(struct inode *inode, struct file * file)
>> INIT_LIST_HEAD(&tfile->next);
>>
>> sock_set_flag(&tfile->sk, SOCK_ZEROCOPY);
>> + tfile->head = 0;
>> + tfile->tail = 0;
>> +
>> + spin_lock_init(&tfile->rlock);
>> + spin_lock_init(&tfile->wlock);
>>
>> return 0;
>> }
>> diff --git a/drivers/vhost/net.c b/drivers/vhost/net.c
>> index f744eeb..10ff494 100644
>> --- a/drivers/vhost/net.c
>> +++ b/drivers/vhost/net.c
>> @@ -455,10 +455,14 @@ out:
>>
>> static int peek_head_len(struct sock *sk)
>> {
>> + struct socket *sock = sk->sk_socket;
>> struct sk_buff *head;
>> int len = 0;
>> unsigned long flags;
>>
>> + if (sock->ops->peek)
>> + return sock->ops->peek(sock, true);
>> +
>> spin_lock_irqsave(&sk->sk_receive_queue.lock, flags);
>> head = skb_peek(&sk->sk_receive_queue);
>> if (likely(head)) {
>> @@ -471,6 +475,16 @@ static int peek_head_len(struct sock *sk)
>> return len;
>> }
>>
>> +static int sk_has_rx_data(struct sock *sk)
>> +{
>> + struct socket *sock = sk->sk_socket;
>> +
>> + if (sock->ops->peek)
>> + return sock->ops->peek(sock, false);
>> +
>> + return skb_queue_empty(&sk->sk_receive_queue);
>> +}
>> +
>> static int vhost_net_rx_peek_head_len(struct vhost_net *net, struct sock *sk)
>> {
>> struct vhost_net_virtqueue *nvq = &net->vqs[VHOST_NET_VQ_TX];
>> @@ -487,7 +501,7 @@ static int vhost_net_rx_peek_head_len(struct vhost_net *net, struct sock *sk)
>> endtime = busy_clock() + vq->busyloop_timeout;
>>
>> while (vhost_can_busy_poll(&net->dev, endtime) &&
>> - skb_queue_empty(&sk->sk_receive_queue) &&
>> + !sk_has_rx_data(sk) &&
>> vhost_vq_avail_empty(&net->dev, vq))
>> cpu_relax_lowlatency();
>>
>> diff --git a/include/linux/net.h b/include/linux/net.h
>> index 72c1e06..3c4ecd5 100644
>> --- a/include/linux/net.h
>> +++ b/include/linux/net.h
>> @@ -132,6 +132,7 @@ struct module;
>> struct proto_ops {
>> int family;
>> struct module *owner;
>> + int (*peek) (struct socket *sock, bool exact);
>> int (*release) (struct socket *sock);
>> int (*bind) (struct socket *sock,
>> struct sockaddr *myaddr,
>> diff --git a/include/uapi/linux/if_tun.h b/include/uapi/linux/if_tun.h
>> index 3cb5e1d..d64ddc1 100644
>> --- a/include/uapi/linux/if_tun.h
>> +++ b/include/uapi/linux/if_tun.h
>> @@ -61,6 +61,7 @@
>> #define IFF_TUN 0x0001
>> #define IFF_TAP 0x0002
>> #define IFF_NO_PI 0x1000
>> +#define IFF_TX_RING 0x0010
>> /* This flag has no real effect */
>> #define IFF_ONE_QUEUE 0x2000
>> #define IFF_VNET_HDR 0x4000
>> --
>> 2.7.4
Powered by blists - more mailing lists