lists.openwall.net   lists  /  announce  owl-users  owl-dev  john-users  john-dev  passwdqc-users  yescrypt  popa3d-users  /  oss-security  kernel-hardening  musl  sabotage  tlsify  passwords  /  crypt-dev  xvendor  /  Bugtraq  Full-Disclosure  linux-kernel  linux-netdev  linux-ext4  linux-hardening  linux-cve-announce  PHC 
Open Source and information security mailing list archives
 
Hash Suite: Windows password security audit tool. GUI, reports in PDF.
[<prev] [next>] [day] [month] [year] [list]
Message-ID: <95556bf6-9f78-670e-0f36-1de1e955c7ae@redhat.com>
Date:   Fri, 16 Apr 2021 13:35:33 +0800
From:   Jason Wang <jasowang@...hat.com>
To:     Xuan Zhuo <xuanzhuo@...ux.alibaba.com>
Cc:     "Michael S. Tsirkin" <mst@...hat.com>,
        "David S. Miller" <davem@...emloft.net>,
        Jakub Kicinski <kuba@...nel.org>,
        Björn Töpel <bjorn@...nel.org>,
        Magnus Karlsson <magnus.karlsson@...el.com>,
        Jonathan Lemon <jonathan.lemon@...il.com>,
        Alexei Starovoitov <ast@...nel.org>,
        Daniel Borkmann <daniel@...earbox.net>,
        Jesper Dangaard Brouer <hawk@...nel.org>,
        John Fastabend <john.fastabend@...il.com>,
        virtualization@...ts.linux-foundation.org, bpf@...r.kernel.org,
        "dust.li" <dust.li@...ux.alibaba.com>, netdev@...r.kernel.org
Subject: Re: [PATCH net-next v4 09/10] virtio-net: xsk zero copy xmit
 implement wakeup and xmit


在 2021/4/15 下午6:27, Xuan Zhuo 写道:
> On Wed, 14 Apr 2021 13:46:45 +0800, Jason Wang <jasowang@...hat.com> wrote:
>> 在 2021/4/13 上午11:15, Xuan Zhuo 写道:
>>> This patch implements the core part of xsk zerocopy xmit.
>>>
>>> When the user calls sendto to consume the data in the xsk tx queue,
>>> virtnet_xsk_wakeup() will be called.
>>>
>>> In wakeup, it will try to send a part of the data directly. There are
>>> two purposes for this realization:
>>>
>>> 1. Send part of the data quickly to reduce the transmission delay of the
>>>      first packet.
>>> 2. Trigger tx interrupt, start napi to consume xsk tx data.
>>>
>>> All sent xsk packets share the virtio-net header of xsk_hdr. If xsk
>>> needs to support csum and other functions later, consider assigning xsk
>>> hdr separately for each sent packet.
>>>
>>> There are now three situations in free_old_xmit(): skb, xdp frame, xsk
>>> desc.  Based on the last two bit of ptr returned by virtqueue_get_buf():
>>>       00 is skb by default.
>>>       01 represents the packet sent by xdp
>>>       10 is the packet sent by xsk
>>>
>>> If the xmit work of xsk has not been completed, but the ring is full,
>>> napi must first exit and wait for the ring to be available, so
>>> need_wakeup() is set. If free_old_xmit() is called first by start_xmit(),
>>> we can quickly wake up napi to execute xsk xmit task.
>>>
>>> When recycling, we need to count the number of bytes sent, so put xsk
>>> desc->len into the ptr pointer. Because ptr does not point to meaningful
>>> objects in xsk.
>>>
>>> Signed-off-by: Xuan Zhuo <xuanzhuo@...ux.alibaba.com>
>>> Reviewed-by: Dust Li <dust.li@...ux.alibaba.com>
>>> ---
>>>    drivers/net/virtio_net.c | 296 ++++++++++++++++++++++++++++++++++++++-
>>>    1 file changed, 292 insertions(+), 4 deletions(-)
>>>
>>> diff --git a/drivers/net/virtio_net.c b/drivers/net/virtio_net.c
>>> index 8242a9e9f17d..c441d6bf1510 100644
>>> --- a/drivers/net/virtio_net.c
>>> +++ b/drivers/net/virtio_net.c
>>> @@ -46,6 +46,11 @@ module_param(napi_tx, bool, 0644);
>>>    #define VIRTIO_XDP_REDIR	BIT(1)
>>>
>>>    #define VIRTIO_XDP_FLAG	BIT(0)
>>> +#define VIRTIO_XSK_FLAG	BIT(1)
>>> +
>>> +#define VIRTIO_XSK_PTR_SHIFT       4
>>> +
>>> +static struct virtio_net_hdr_mrg_rxbuf xsk_hdr;
>>>
>>>    /* RX packet size EWMA. The average packet size is used to determine the packet
>>>     * buffer size when refilling RX rings. As the entire RX ring may be refilled
>>> @@ -138,6 +143,12 @@ struct send_queue {
>>>    	struct {
>>>    		/* xsk pool */
>>>    		struct xsk_buff_pool __rcu *pool;
>>> +
>>> +		/* save the desc for next xmit, when xmit fail. */
>>> +		struct xdp_desc last_desc;
>>
>> As replied in the previous version, this looks tricky. I think we need to
>> make sure to reserve some slots as the skb path did.
>>
>> This looks exactly like what stmmac did, which also shares XDP and skb
>> for the same ring.
>>
>>
>>> +
>>> +		/* xsk wait for tx inter or softirq */
>>> +		bool need_wakeup;
>>>    	} xsk;
>>>    };
>>>
>>> @@ -255,6 +266,15 @@ struct padded_vnet_hdr {
>>>    	char padding[4];
>>>    };
>>>
>>> +static int virtnet_xsk_run(struct send_queue *sq, struct xsk_buff_pool *pool,
>>> +			   int budget, bool in_napi);
>>> +static void virtnet_xsk_complete(struct send_queue *sq, u32 num);
>>> +
>>> +static bool is_skb_ptr(void *ptr)
>>> +{
>>> +	return !((unsigned long)ptr & (VIRTIO_XDP_FLAG | VIRTIO_XSK_FLAG));
>>> +}
>>> +
>>>    static bool is_xdp_frame(void *ptr)
>>>    {
>>>    	return (unsigned long)ptr & VIRTIO_XDP_FLAG;
>>> @@ -265,6 +285,19 @@ static void *xdp_to_ptr(struct xdp_frame *ptr)
>>>    	return (void *)((unsigned long)ptr | VIRTIO_XDP_FLAG);
>>>    }
>>>
>>> +static void *xsk_to_ptr(struct xdp_desc *desc)
>>> +{
>>> +	/* save the desc len to ptr */
>>> +	u64 p = desc->len << VIRTIO_XSK_PTR_SHIFT;
>>> +
>>> +	return (void *)((unsigned long)p | VIRTIO_XSK_FLAG);
>>> +}
>>> +
>>> +static void ptr_to_xsk(void *ptr, struct xdp_desc *desc)
>>> +{
>>> +	desc->len = ((unsigned long)ptr) >> VIRTIO_XSK_PTR_SHIFT;
>>> +}
>>> +
>>>    static struct xdp_frame *ptr_to_xdp(void *ptr)
>>>    {
>>>    	return (struct xdp_frame *)((unsigned long)ptr & ~VIRTIO_XDP_FLAG);
>>> @@ -273,25 +306,35 @@ static struct xdp_frame *ptr_to_xdp(void *ptr)
>>>    static void __free_old_xmit(struct send_queue *sq, bool in_napi,
>>>    			    struct virtnet_sq_stats *stats)
>>>    {
>>> +	unsigned int xsknum = 0;
>>>    	unsigned int len;
>>>    	void *ptr;
>>>
>>>    	while ((ptr = virtqueue_get_buf(sq->vq, &len)) != NULL) {
>>> -		if (likely(!is_xdp_frame(ptr))) {
>>> +		if (is_skb_ptr(ptr)) {
>>>    			struct sk_buff *skb = ptr;
>>>
>>>    			pr_debug("Sent skb %p\n", skb);
>>>
>>>    			stats->bytes += skb->len;
>>>    			napi_consume_skb(skb, in_napi);
>>> -		} else {
>>> +		} else if (is_xdp_frame(ptr)) {
>>>    			struct xdp_frame *frame = ptr_to_xdp(ptr);
>>>
>>>    			stats->bytes += frame->len;
>>>    			xdp_return_frame(frame);
>>> +		} else {
>>> +			struct xdp_desc desc;
>>> +
>>> +			ptr_to_xsk(ptr, &desc);
>>> +			stats->bytes += desc.len;
>>> +			++xsknum;
>>>    		}
>>>    		stats->packets++;
>>>    	}
>>> +
>>> +	if (xsknum)
>>> +		virtnet_xsk_complete(sq, xsknum);
>>>    }
>>>
>>>    /* Converting between virtqueue no. and kernel tx/rx queue no.
>>> @@ -1529,6 +1572,19 @@ static int virtnet_open(struct net_device *dev)
>>>    	return 0;
>>>    }
>>>
>>> +static int virtnet_poll_xsk(struct send_queue *sq, int budget)
>>> +{
>>> +	struct xsk_buff_pool *pool;
>>> +	int work_done = 0;
>>> +
>>> +	rcu_read_lock();
>>> +	pool = rcu_dereference(sq->xsk.pool);
>>> +	if (pool)
>>> +		work_done = virtnet_xsk_run(sq, pool, budget, true);
>>> +	rcu_read_unlock();
>>> +	return work_done;
>>> +}
>>> +
>>>    static int virtnet_poll_tx(struct napi_struct *napi, int budget)
>>>    {
>>>    	struct send_queue *sq = container_of(napi, struct send_queue, napi);
>>> @@ -1545,6 +1601,7 @@ static int virtnet_poll_tx(struct napi_struct *napi, int budget)
>>>
>>>    	txq = netdev_get_tx_queue(vi->dev, index);
>>>    	__netif_tx_lock(txq, raw_smp_processor_id());
>>> +	work_done += virtnet_poll_xsk(sq, budget);
>>>    	free_old_xmit(sq, true);
>>>    	__netif_tx_unlock(txq);
>>>
>>> @@ -2535,6 +2592,234 @@ static int virtnet_xdp_set(struct net_device *dev, struct bpf_prog *prog,
>>>    	return err;
>>>    }
>>>
>>> +static void virtnet_xsk_check_queue(struct send_queue *sq)
>>> +{
>>> +	struct virtnet_info *vi = sq->vq->vdev->priv;
>>> +	struct net_device *dev = vi->dev;
>>> +	int qnum = sq - vi->sq;
>>> +
>>> +	/* If this sq is not the exclusive queue of the current cpu,
>>> +	 * then it may be called by start_xmit, so check it running out
>>> +	 * of space.
>>> +	 *
>>
>> I think it's better to move this check after is_xdp_raw_buffer_queue().
> Sorry, do not understand.


So what I meant is:


/* If it is a raw buffer queue, ... */
     if (is_xdp_raw_buffer_queue())

/* if it is not the exclusive queue */
     if (sq->vq->num_free ...)


>>
>>> +	 * And if it is a raw buffer queue, it does not check whether the status
>>> +	 * of the queue is stopped when sending. So there is no need to check
>>> +	 * the situation of the raw buffer queue.
>>> +	 */
>>> +	if (is_xdp_raw_buffer_queue(vi, qnum))
>>> +		return;
>>> +
>>> +	/* Stop the queue to avoid getting packets that we are
>>> +	 * then unable to transmit. Then wait the tx interrupt.
>>> +	 */
>>> +	if (sq->vq->num_free < 2 + MAX_SKB_FRAGS)
>>> +		netif_stop_subqueue(dev, qnum);
>>
>> Is there any way to stop xsk TX here?
>
> xsk tx is driven by tx napi, so stop has no meaning.


So NAPI can be stopped.


>
>
>>
>>> +}
>>> +
>>> +static void virtnet_xsk_complete(struct send_queue *sq, u32 num)
>>> +{
>>> +	struct xsk_buff_pool *pool;
>>> +
>>> +	rcu_read_lock();
>>> +
>>> +	pool = rcu_dereference(sq->xsk.pool);
>>> +	if (!pool) {
>>> +		rcu_read_unlock();
>>> +		return;
>>> +	}
>>> +	xsk_tx_completed(pool, num);
>>> +	rcu_read_unlock();
>>> +
>>> +	if (sq->xsk.need_wakeup) {
>>> +		sq->xsk.need_wakeup = false;
>>> +		virtqueue_napi_schedule(&sq->napi, sq->vq);
>>> +	}
>>> +}
>>> +
>>> +static int virtnet_xsk_xmit(struct send_queue *sq, struct xsk_buff_pool *pool,
>>> +			    struct xdp_desc *desc)
>>> +{
>>> +	struct virtnet_info *vi;
>>> +	u32 offset, n, len;
>>> +	struct page *page;
>>> +	void *data;
>>> +	u64 addr;
>>> +	int err;
>>> +
>>> +	vi = sq->vq->vdev->priv;
>>> +	addr = desc->addr;
>>> +
>>> +	data = xsk_buff_raw_get_data(pool, addr);
>>> +	offset = offset_in_page(data);
>>> +
>>> +	/* xsk unaligned mode, desc may use two pages */
>>> +	if (desc->len > PAGE_SIZE - offset)
>>> +		n = 3;
>>> +	else
>>> +		n = 2;
>>> +
>>> +	sg_init_table(sq->sg, n);
>>> +	sg_set_buf(sq->sg, &xsk_hdr, vi->hdr_len);
>>> +
>>> +	/* handle for xsk first page */
>>> +	len = min_t(int, desc->len, PAGE_SIZE - offset);
>>> +	page = xsk_buff_xdp_get_page(pool, addr);
>>> +	sg_set_page(sq->sg + 1, page, len, offset);
>>> +
>>> +	/* xsk unaligned mode, handle for the second page */
>>> +	if (len < desc->len) {
>>> +		page = xsk_buff_xdp_get_page(pool, addr + len);
>>> +		len = min_t(int, desc->len - len, PAGE_SIZE);
>>> +		sg_set_page(sq->sg + 2, page, len, 0);
>>> +	}
>>> +
>>> +	err = virtqueue_add_outbuf(sq->vq, sq->sg, n, xsk_to_ptr(desc),
>>> +				   GFP_ATOMIC);
>>> +	if (unlikely(err))
>>> +		sq->xsk.last_desc = *desc;
>>> +
>>> +	return err;
>>> +}
>>> +
>>> +static int virtnet_xsk_xmit_batch(struct send_queue *sq,
>>> +				  struct xsk_buff_pool *pool,
>>> +				  unsigned int budget,
>>> +				  bool in_napi, int *done,
>>> +				  struct virtnet_sq_stats *stats)
>>> +{
>>> +	struct xdp_desc desc;
>>> +	int err, packet = 0;
>>> +	int ret = -EAGAIN;
>>> +
>>> +	if (sq->xsk.last_desc.addr) {
>>> +		if (sq->vq->num_free < 2 + MAX_SKB_FRAGS)
>>> +			return -EBUSY;
>>> +
>>> +		err = virtnet_xsk_xmit(sq, pool, &sq->xsk.last_desc);
>>> +		if (unlikely(err))
>>> +			return -EBUSY;
>>> +
>>> +		++packet;
>>> +		--budget;
>>> +		sq->xsk.last_desc.addr = 0;
>>
>> So I think we don't need to do this since we always try to reserve 2 +
>> MAX_SKB_FRAGS, then it means we get -EIO/-ENOMEM which is basically a
>> broken device or dma map.
>>
> Does it mean that when there are enough slots, virtqueue_add_outbuf() returns
> failure -EIO/-ENOMEM, which means that the network card is no longer working.


Or it could be a bug of the driver. And you don't need to check num_free 
before if you always try to reserve sufficient slots.


>
> I originally thought that an error was returned, but it may work normally next
> time. It should be my fault.
>
> Thank you very much, I learned it. I will delete the related code.
>
>>> +	}
>>> +
>>> +	while (budget-- > 0) {
>>> +		if (sq->vq->num_free < 2 + MAX_SKB_FRAGS) {
>>> +			ret = -EBUSY;
>>> +			break;
>>> +		}
>>> +
>>> +		if (!xsk_tx_peek_desc(pool, &desc)) {
>>> +			/* done */
>>> +			ret = 0;
>>> +			break;
>>> +		}
>>> +
>>> +		err = virtnet_xsk_xmit(sq, pool, &desc);
>>> +		if (unlikely(err)) {
>>> +			ret = -EBUSY;
>>
>> Since the function will be called by NAPI, I think we need to report the
>> number of packets that are transmitted as well.
>
> A 'desc' points to a packet, so only one packet is sent by virtnet_xsk_xmit().
> I have this information in the following variable "packet" statistics. Finally,
> use the parameter "done" to pass to the upper layer.


I meant you did:

virtnet_poll_tx()
     work_done += virtnet_poll_xsk()
     virtnet_xsk_run()
         virtnet_xsk_xmit_batch()

If there are not sufficient slots, you still need to return the number of
packets that have been sent.



>
>>
>>> +			break;
>>> +		}
>>> +
>>> +		++packet;
>>> +	}
>>> +
>>> +	if (packet) {
>>> +		if (virtqueue_kick_prepare(sq->vq) && virtqueue_notify(sq->vq))
>>> +			++stats->kicks;
>>> +
>>> +		*done = packet;
>>> +		stats->xdp_tx += packet;
>>> +
>>> +		xsk_tx_release(pool);
>>> +	}
>>> +
>>> +	return ret;
>>> +}
>>> +
>>> +static int virtnet_xsk_run(struct send_queue *sq, struct xsk_buff_pool *pool,
>>> +			   int budget, bool in_napi)
>>> +{
>>> +	struct virtnet_sq_stats stats = {};
>>> +	int done = 0;
>>> +	int err;
>>> +
>>> +	sq->xsk.need_wakeup = false;
>>> +	__free_old_xmit(sq, in_napi, &stats);
>>> +
>>> +	/* return err:
>>> +	 * -EAGAIN: done == budget
>>> +	 * -EBUSY:  done < budget
>>> +	 *  0    :  done < budget
>>> +	 */
>>> +	err = virtnet_xsk_xmit_batch(sq, pool, budget, in_napi, &done, &stats);
>>> +	if (err == -EBUSY) {
>>> +		__free_old_xmit(sq, in_napi, &stats);
>>> +
>>> +		/* If the space is enough, let napi run again. */
>>> +		if (sq->vq->num_free >= 2 + MAX_SKB_FRAGS)
>>> +			done = budget;
>>
>> Why do you need to run NAPI instead of a netif_tx_wake()?
>
> When virtnet_xsk_run() is called by virtnet_xsk_wakeup(), the return value is
> not concerned.
>
> virtnet_xsk_run() is already in napi when it is called by poll_tx(), so if there
> are more slots, I try to return "budget" and let napi call poll_tx() again


Well, my question is why you don't need to wakeup the qdisc in this 
case. Note that the queue could be shared with ndo_start_xmit().


>
>>
>>> +		else
>>> +			sq->xsk.need_wakeup = true;
>>
>> So done is 0, is this intended?
>
> When err == 0, it indicates that the xsk queue has been exhausted. At this time,
> done < budget, return done directly, and napi can be stopped.
>
> When err == -EAGAIN, it indicates that the budget amount of patch has been sent,
> and done should be returned to the upper layer. Wait for napi to call poll_tx()
> again


So the code is only for -EBUSY if I read the code correctly. And in this 
case you return 0.


>
> In both cases, done is directly returned to the upper layer without special
> processing.
>
>>
>>> +	}
>>> +
>>> +	virtnet_xsk_check_queue(sq);
>>> +
>>> +	u64_stats_update_begin(&sq->stats.syncp);
>>> +	sq->stats.packets += stats.packets;
>>> +	sq->stats.bytes += stats.bytes;
>>> +	sq->stats.kicks += stats.kicks;
>>> +	sq->stats.xdp_tx += stats.xdp_tx;
>>> +	u64_stats_update_end(&sq->stats.syncp);
>>> +
>>> +	return done;
>>> +}
>>> +
>>> +static int virtnet_xsk_wakeup(struct net_device *dev, u32 qid, u32 flag)
>>> +{
>>> +	struct virtnet_info *vi = netdev_priv(dev);
>>> +	struct xsk_buff_pool *pool;
>>> +	struct netdev_queue *txq;
>>> +	struct send_queue *sq;
>>> +
>>> +	if (!netif_running(dev))
>>> +		return -ENETDOWN;
>>> +
>>> +	if (qid >= vi->curr_queue_pairs)
>>> +		return -EINVAL;
>>> +
>>> +	sq = &vi->sq[qid];
>>> +
>>> +	rcu_read_lock();
>>> +
>>> +	pool = rcu_dereference(sq->xsk.pool);
>>> +	if (!pool)
>>> +		goto end;
>>> +
>>> +	if (napi_if_scheduled_mark_missed(&sq->napi))
>>> +		goto end;
>>> +
>>> +	txq = netdev_get_tx_queue(dev, qid);
>>> +
>>> +	__netif_tx_lock_bh(txq);
>>> +
>>> +	/* Send part of the packet directly to reduce the delay in sending the
>>> +	 * packet, and this can actively trigger the tx interrupts.
>>> +	 *
>>> +	 * If no packet is sent out, the ring of the device is full. In this
>>> +	 * case, we will still get a tx interrupt response. Then we will deal
>>> +	 * with the subsequent packet sending work.
>>
>> So stmmac schedule NAPI here, do you have perf numbers for this improvement?
>
> virtnet_xsk_wakeup() is called by sendto(). The purpose is to start consuming
> the data in the xsk tx queue. The purpose here is to make napi run.
>
> When napi is not running, I try to send a certain amount of data:
> 1. Reduce the delay of the first packet
> 2. Trigger hardware to generate tx interrupt


I don't see the code that triggers the hardware interrupt, and I don't believe
virtio can do this.

So the point is we need

1) schedule NAPI when napi_if_scheduled_mark_missed() returns false
2) introduce this optimization on top with perf numbers

Thanks

>
> Thanks.
>
>> Thanks
>>
>>
>>> +	 */
>>> +	virtnet_xsk_run(sq, pool, napi_weight, false);
>>> +
>>> +	__netif_tx_unlock_bh(txq);
>>> +
>>> +end:
>>> +	rcu_read_unlock();
>>> +	return 0;
>>> +}
>>> +
>>>    static int virtnet_xsk_pool_enable(struct net_device *dev,
>>>    				   struct xsk_buff_pool *pool,
>>>    				   u16 qid)
>>> @@ -2559,6 +2844,8 @@ static int virtnet_xsk_pool_enable(struct net_device *dev,
>>>    		return -EPERM;
>>>
>>>    	rcu_read_lock();
>>> +	memset(&sq->xsk, 0, sizeof(sq->xsk));
>>> +
>>>    	/* Here is already protected by rtnl_lock, so rcu_assign_pointer is
>>>    	 * safe.
>>>    	 */
>>> @@ -2658,6 +2945,7 @@ static const struct net_device_ops virtnet_netdev = {
>>>    	.ndo_vlan_rx_kill_vid = virtnet_vlan_rx_kill_vid,
>>>    	.ndo_bpf		= virtnet_xdp,
>>>    	.ndo_xdp_xmit		= virtnet_xdp_xmit,
>>> +	.ndo_xsk_wakeup         = virtnet_xsk_wakeup,
>>>    	.ndo_features_check	= passthru_features_check,
>>>    	.ndo_get_phys_port_name	= virtnet_get_phys_port_name,
>>>    	.ndo_set_features	= virtnet_set_features,
>>> @@ -2761,9 +3049,9 @@ static void free_unused_bufs(struct virtnet_info *vi)
>>>    	for (i = 0; i < vi->max_queue_pairs; i++) {
>>>    		struct virtqueue *vq = vi->sq[i].vq;
>>>    		while ((buf = virtqueue_detach_unused_buf(vq)) != NULL) {
>>> -			if (!is_xdp_frame(buf))
>>> +			if (is_skb_ptr(buf))
>>>    				dev_kfree_skb(buf);
>>> -			else
>>> +			else if (is_xdp_frame(buf))
>>>    				xdp_return_frame(ptr_to_xdp(buf));
>>>    		}
>>>    	}

Powered by blists - more mailing lists

Powered by Openwall GNU/*/Linux Powered by OpenVZ