lists.openwall.net   lists  /  announce  owl-users  owl-dev  john-users  john-dev  passwdqc-users  yescrypt  popa3d-users  /  oss-security  kernel-hardening  musl  sabotage  tlsify  passwords  /  crypt-dev  xvendor  /  Bugtraq  Full-Disclosure  linux-kernel  linux-netdev  linux-ext4  linux-hardening  linux-cve-announce  PHC 
Open Source and information security mailing list archives
 
Hash Suite: Windows password security audit tool. GUI, reports in PDF.
[<prev] [next>] [day] [month] [year] [list]
Date:   Mon, 19 Apr 2021 10:46:15 +0800
From:   Jason Wang <jasowang@...hat.com>
To:     Xuan Zhuo <xuanzhuo@...ux.alibaba.com>
Cc:     "Michael S. Tsirkin" <mst@...hat.com>,
        "David S. Miller" <davem@...emloft.net>,
        Jakub Kicinski <kuba@...nel.org>,
        Björn Töpel <bjorn@...nel.org>,
        Magnus Karlsson <magnus.karlsson@...el.com>,
        Jonathan Lemon <jonathan.lemon@...il.com>,
        Alexei Starovoitov <ast@...nel.org>,
        Daniel Borkmann <daniel@...earbox.net>,
        Jesper Dangaard Brouer <hawk@...nel.org>,
        John Fastabend <john.fastabend@...il.com>,
        virtualization@...ts.linux-foundation.org, bpf@...r.kernel.org,
        "dust.li" <dust.li@...ux.alibaba.com>, netdev@...r.kernel.org
Subject: Re: [PATCH net-next v4 09/10] virtio-net: xsk zero copy xmit
 implement wakeup and xmit


在 2021/4/16 下午2:29, Xuan Zhuo 写道:
> On Fri, 16 Apr 2021 13:35:33 +0800, Jason Wang <jasowang@...hat.com> wrote:
>> 在 2021/4/15 下午6:27, Xuan Zhuo 写道:
>>> On Wed, 14 Apr 2021 13:46:45 +0800, Jason Wang <jasowang@...hat.com> wrote:
>>>> 在 2021/4/13 上午11:15, Xuan Zhuo 写道:
>>>>> This patch implements the core part of xsk zerocopy xmit.
>>>>>
>>>>> When the user calls sendto to consume the data in the xsk tx queue,
>>>>> virtnet_xsk_wakeup() will be called.
>>>>>
>>>>> In wakeup, it will try to send a part of the data directly. There are
>>>>> two purposes for this realization:
>>>>>
>>>>> 1. Send part of the data quickly to reduce the transmission delay of the
>>>>>       first packet.
>>>>> 2. Trigger tx interrupt, start napi to consume xsk tx data.
>>>>>
>>>>> All sent xsk packets share the virtio-net header of xsk_hdr. If xsk
>>>>> needs to support csum and other functions later, consider assigning xsk
>>>>> hdr separately for each sent packet.
>>>>>
>>>>> There are now three situations in free_old_xmit(): skb, xdp frame, xsk
>>>>> desc.  Based on the last two bit of ptr returned by virtqueue_get_buf():
>>>>>        00 is skb by default.
>>>>>        01 represents the packet sent by xdp
>>>>>        10 is the packet sent by xsk
>>>>>
>>>>> If the xmit work of xsk has not been completed, but the ring is full,
>>>>> napi must first exit and wait for the ring to be available, so
>>>>> need_wakeup() is set. If free_old_xmit() is called first by start_xmit(),
>>>>> we can quickly wake up napi to execute xsk xmit task.
>>>>>
>>>>> When recycling, we need to count the number of bytes sent, so put xsk
>>>>> desc->len into the ptr pointer. Because ptr does not point to meaningful
>>>>> objects in xsk.
>>>>>
>>>>> Signed-off-by: Xuan Zhuo <xuanzhuo@...ux.alibaba.com>
>>>>> Reviewed-by: Dust Li <dust.li@...ux.alibaba.com>
>>>>> ---
>>>>>     drivers/net/virtio_net.c | 296 ++++++++++++++++++++++++++++++++++++++-
>>>>>     1 file changed, 292 insertions(+), 4 deletions(-)
>>>>>
>>>>> diff --git a/drivers/net/virtio_net.c b/drivers/net/virtio_net.c
>>>>> index 8242a9e9f17d..c441d6bf1510 100644
>>>>> --- a/drivers/net/virtio_net.c
>>>>> +++ b/drivers/net/virtio_net.c
>>>>> @@ -46,6 +46,11 @@ module_param(napi_tx, bool, 0644);
>>>>>     #define VIRTIO_XDP_REDIR	BIT(1)
>>>>>
>>>>>     #define VIRTIO_XDP_FLAG	BIT(0)
>>>>> +#define VIRTIO_XSK_FLAG	BIT(1)
>>>>> +
>>>>> +#define VIRTIO_XSK_PTR_SHIFT       4
>>>>> +
>>>>> +static struct virtio_net_hdr_mrg_rxbuf xsk_hdr;
>>>>>
>>>>>     /* RX packet size EWMA. The average packet size is used to determine the packet
>>>>>      * buffer size when refilling RX rings. As the entire RX ring may be refilled
>>>>> @@ -138,6 +143,12 @@ struct send_queue {
>>>>>     	struct {
>>>>>     		/* xsk pool */
>>>>>     		struct xsk_buff_pool __rcu *pool;
>>>>> +
>>>>> +		/* save the desc for next xmit, when xmit fail. */
>>>>> +		struct xdp_desc last_desc;
>>>> As replied in the previous version this looks tricky. I think we need to
>>>> make sure to reserve some slots as skb path did.
>>>>
>>>> This looks exactly like what stmmac did which also shares XDP and skb
>>>> for the same ring.
>>>>
>>>>
>>>>> +
>>>>> +		/* xsk wait for tx inter or softirq */
>>>>> +		bool need_wakeup;
>>>>>     	} xsk;
>>>>>     };
>>>>>
>>>>> @@ -255,6 +266,15 @@ struct padded_vnet_hdr {
>>>>>     	char padding[4];
>>>>>     };
>>>>>
>>>>> +static int virtnet_xsk_run(struct send_queue *sq, struct xsk_buff_pool *pool,
>>>>> +			   int budget, bool in_napi);
>>>>> +static void virtnet_xsk_complete(struct send_queue *sq, u32 num);
>>>>> +
>>>>> +static bool is_skb_ptr(void *ptr)
>>>>> +{
>>>>> +	return !((unsigned long)ptr & (VIRTIO_XDP_FLAG | VIRTIO_XSK_FLAG));
>>>>> +}
>>>>> +
>>>>>     static bool is_xdp_frame(void *ptr)
>>>>>     {
>>>>>     	return (unsigned long)ptr & VIRTIO_XDP_FLAG;
>>>>> @@ -265,6 +285,19 @@ static void *xdp_to_ptr(struct xdp_frame *ptr)
>>>>>     	return (void *)((unsigned long)ptr | VIRTIO_XDP_FLAG);
>>>>>     }
>>>>>
>>>>> +static void *xsk_to_ptr(struct xdp_desc *desc)
>>>>> +{
>>>>> +	/* save the desc len to ptr */
>>>>> +	u64 p = desc->len << VIRTIO_XSK_PTR_SHIFT;
>>>>> +
>>>>> +	return (void *)((unsigned long)p | VIRTIO_XSK_FLAG);
>>>>> +}
>>>>> +
>>>>> +static void ptr_to_xsk(void *ptr, struct xdp_desc *desc)
>>>>> +{
>>>>> +	desc->len = ((unsigned long)ptr) >> VIRTIO_XSK_PTR_SHIFT;
>>>>> +}
>>>>> +
>>>>>     static struct xdp_frame *ptr_to_xdp(void *ptr)
>>>>>     {
>>>>>     	return (struct xdp_frame *)((unsigned long)ptr & ~VIRTIO_XDP_FLAG);
>>>>> @@ -273,25 +306,35 @@ static struct xdp_frame *ptr_to_xdp(void *ptr)
>>>>>     static void __free_old_xmit(struct send_queue *sq, bool in_napi,
>>>>>     			    struct virtnet_sq_stats *stats)
>>>>>     {
>>>>> +	unsigned int xsknum = 0;
>>>>>     	unsigned int len;
>>>>>     	void *ptr;
>>>>>
>>>>>     	while ((ptr = virtqueue_get_buf(sq->vq, &len)) != NULL) {
>>>>> -		if (likely(!is_xdp_frame(ptr))) {
>>>>> +		if (is_skb_ptr(ptr)) {
>>>>>     			struct sk_buff *skb = ptr;
>>>>>
>>>>>     			pr_debug("Sent skb %p\n", skb);
>>>>>
>>>>>     			stats->bytes += skb->len;
>>>>>     			napi_consume_skb(skb, in_napi);
>>>>> -		} else {
>>>>> +		} else if (is_xdp_frame(ptr)) {
>>>>>     			struct xdp_frame *frame = ptr_to_xdp(ptr);
>>>>>
>>>>>     			stats->bytes += frame->len;
>>>>>     			xdp_return_frame(frame);
>>>>> +		} else {
>>>>> +			struct xdp_desc desc;
>>>>> +
>>>>> +			ptr_to_xsk(ptr, &desc);
>>>>> +			stats->bytes += desc.len;
>>>>> +			++xsknum;
>>>>>     		}
>>>>>     		stats->packets++;
>>>>>     	}
>>>>> +
>>>>> +	if (xsknum)
>>>>> +		virtnet_xsk_complete(sq, xsknum);
>>>>>     }
>>>>>
>>>>>     /* Converting between virtqueue no. and kernel tx/rx queue no.
>>>>> @@ -1529,6 +1572,19 @@ static int virtnet_open(struct net_device *dev)
>>>>>     	return 0;
>>>>>     }
>>>>>
>>>>> +static int virtnet_poll_xsk(struct send_queue *sq, int budget)
>>>>> +{
>>>>> +	struct xsk_buff_pool *pool;
>>>>> +	int work_done = 0;
>>>>> +
>>>>> +	rcu_read_lock();
>>>>> +	pool = rcu_dereference(sq->xsk.pool);
>>>>> +	if (pool)
>>>>> +		work_done = virtnet_xsk_run(sq, pool, budget, true);
>>>>> +	rcu_read_unlock();
>>>>> +	return work_done;
>>>>> +}
>>>>> +
>>>>>     static int virtnet_poll_tx(struct napi_struct *napi, int budget)
>>>>>     {
>>>>>     	struct send_queue *sq = container_of(napi, struct send_queue, napi);
>>>>> @@ -1545,6 +1601,7 @@ static int virtnet_poll_tx(struct napi_struct *napi, int budget)
>>>>>
>>>>>     	txq = netdev_get_tx_queue(vi->dev, index);
>>>>>     	__netif_tx_lock(txq, raw_smp_processor_id());
>>>>> +	work_done += virtnet_poll_xsk(sq, budget);
>>>>>     	free_old_xmit(sq, true);
>>>>>     	__netif_tx_unlock(txq);
>>>>>
>>>>> @@ -2535,6 +2592,234 @@ static int virtnet_xdp_set(struct net_device *dev, struct bpf_prog *prog,
>>>>>     	return err;
>>>>>     }
>>>>>
>>>>> +static void virtnet_xsk_check_queue(struct send_queue *sq)
>>>>> +{
>>>>> +	struct virtnet_info *vi = sq->vq->vdev->priv;
>>>>> +	struct net_device *dev = vi->dev;
>>>>> +	int qnum = sq - vi->sq;
>>>>> +
>>>>> +	/* If this sq is not the exclusive queue of the current cpu,
>>>>> +	 * then it may be called by start_xmit, so check it running out
>>>>> +	 * of space.
>>>>> +	 *
>>>> I think it's better to move this check after is_xdp_raw_buffer_queue().
>>> Sorry, do not understand.
>>
>> So what I meant is:
>>
>>
>> /* If it is a raw buffer queue, ... */
>>       if (is_xdp_raw_buffer_queue())
>>
>> /* if it is not the exclusive queue */
>>       if (sq->vq->num_free ...)
> I understand.
>
>>
>>>>> +	 * And if it is a raw buffer queue, it does not check whether the status
>>>>> +	 * of the queue is stopped when sending. So there is no need to check
>>>>> +	 * the situation of the raw buffer queue.
>>>>> +	 */
>>>>> +	if (is_xdp_raw_buffer_queue(vi, qnum))
>>>>> +		return;
>>>>> +
>>>>> +	/* Stop the queue to avoid getting packets that we are
>>>>> +	 * then unable to transmit. Then wait the tx interrupt.
>>>>> +	 */
>>>>> +	if (sq->vq->num_free < 2 + MAX_SKB_FRAGS)
>>>>> +		netif_stop_subqueue(dev, qnum);
>>>> Is there any way to stop xsk TX here?
>>> xsk tx is driven by tx napi, so stop has no meaning.
>>
>> So NAPI can be stopped.
>>
>>
>>>
>>>>> +}
>>>>> +
>>>>> +static void virtnet_xsk_complete(struct send_queue *sq, u32 num)
>>>>> +{
>>>>> +	struct xsk_buff_pool *pool;
>>>>> +
>>>>> +	rcu_read_lock();
>>>>> +
>>>>> +	pool = rcu_dereference(sq->xsk.pool);
>>>>> +	if (!pool) {
>>>>> +		rcu_read_unlock();
>>>>> +		return;
>>>>> +	}
>>>>> +	xsk_tx_completed(pool, num);
>>>>> +	rcu_read_unlock();
>>>>> +
>>>>> +	if (sq->xsk.need_wakeup) {
>>>>> +		sq->xsk.need_wakeup = false;
>>>>> +		virtqueue_napi_schedule(&sq->napi, sq->vq);
>>>>> +	}
>>>>> +}
>>>>> +
>>>>> +static int virtnet_xsk_xmit(struct send_queue *sq, struct xsk_buff_pool *pool,
>>>>> +			    struct xdp_desc *desc)
>>>>> +{
>>>>> +	struct virtnet_info *vi;
>>>>> +	u32 offset, n, len;
>>>>> +	struct page *page;
>>>>> +	void *data;
>>>>> +	u64 addr;
>>>>> +	int err;
>>>>> +
>>>>> +	vi = sq->vq->vdev->priv;
>>>>> +	addr = desc->addr;
>>>>> +
>>>>> +	data = xsk_buff_raw_get_data(pool, addr);
>>>>> +	offset = offset_in_page(data);
>>>>> +
>>>>> +	/* xsk unaligned mode, desc may use two pages */
>>>>> +	if (desc->len > PAGE_SIZE - offset)
>>>>> +		n = 3;
>>>>> +	else
>>>>> +		n = 2;
>>>>> +
>>>>> +	sg_init_table(sq->sg, n);
>>>>> +	sg_set_buf(sq->sg, &xsk_hdr, vi->hdr_len);
>>>>> +
>>>>> +	/* handle for xsk first page */
>>>>> +	len = min_t(int, desc->len, PAGE_SIZE - offset);
>>>>> +	page = xsk_buff_xdp_get_page(pool, addr);
>>>>> +	sg_set_page(sq->sg + 1, page, len, offset);
>>>>> +
>>>>> +	/* xsk unaligned mode, handle for the second page */
>>>>> +	if (len < desc->len) {
>>>>> +		page = xsk_buff_xdp_get_page(pool, addr + len);
>>>>> +		len = min_t(int, desc->len - len, PAGE_SIZE);
>>>>> +		sg_set_page(sq->sg + 2, page, len, 0);
>>>>> +	}
>>>>> +
>>>>> +	err = virtqueue_add_outbuf(sq->vq, sq->sg, n, xsk_to_ptr(desc),
>>>>> +				   GFP_ATOMIC);
>>>>> +	if (unlikely(err))
>>>>> +		sq->xsk.last_desc = *desc;
>>>>> +
>>>>> +	return err;
>>>>> +}
>>>>> +
>>>>> +static int virtnet_xsk_xmit_batch(struct send_queue *sq,
>>>>> +				  struct xsk_buff_pool *pool,
>>>>> +				  unsigned int budget,
>>>>> +				  bool in_napi, int *done,
>>>>> +				  struct virtnet_sq_stats *stats)
>>>>> +{
>>>>> +	struct xdp_desc desc;
>>>>> +	int err, packet = 0;
>>>>> +	int ret = -EAGAIN;
>>>>> +
>>>>> +	if (sq->xsk.last_desc.addr) {
>>>>> +		if (sq->vq->num_free < 2 + MAX_SKB_FRAGS)
>>>>> +			return -EBUSY;
>>>>> +
>>>>> +		err = virtnet_xsk_xmit(sq, pool, &sq->xsk.last_desc);
>>>>> +		if (unlikely(err))
>>>>> +			return -EBUSY;
>>>>> +
>>>>> +		++packet;
>>>>> +		--budget;
>>>>> +		sq->xsk.last_desc.addr = 0;
>>>> So I think we don't need to do this since we try always to reserve 2 +
>>>> MAX_SKB_FRAGS, then it means we get -EIO/-ENOMEM which is basically a
>>>> broken device or dma map.
>>>>
>>> Does it mean that when there are enough slots, virtqueue_add_outbuf() returns
>>> failure -EIO/-ENOMEM, which means that the network card is no longer working.
>>
>> Or it could be a bug of the driver. And you don't need to check num_free
>> before if you always try to reserve sufficient slots.
>>
>>
>>> I originally thought that an error was returned, but it may work normally next
>>> time. It should be my fault.
>>>
>>> Thank you very much, I learned it. I will delete the related code.
>>>
>>>>> +	}
>>>>> +
>>>>> +	while (budget-- > 0) {
>>>>> +		if (sq->vq->num_free < 2 + MAX_SKB_FRAGS) {
>>>>> +			ret = -EBUSY;
>>>>> +			break;
>>>>> +		}
>>>>> +
>>>>> +		if (!xsk_tx_peek_desc(pool, &desc)) {
>>>>> +			/* done */
>>>>> +			ret = 0;
>>>>> +			break;
>>>>> +		}
>>>>> +
>>>>> +		err = virtnet_xsk_xmit(sq, pool, &desc);
>>>>> +		if (unlikely(err)) {
>>>>> +			ret = -EBUSY;
>>>> Since the function will be called by NAPI I think we need to report the
>>>> number of packets that are transmitted as well.
>>> A 'desc' points to a packet, so only one packet is sent by virtnet_xsk_xmit().
>>> I have this information in the following variable "packet" statistics. Finally,
>>> use the parameter "done" to pass to the upper layer.
>>
>> I meant you did:
>>
>> virtnet_poll_tx()
>>       work_done += virtnet_poll_xsk()
>>       virtnet_xsk_run()
>>           virtnet_xsk_xmit_batch()
>>
>> If there are not sufficient slots you still need to return the number of
>> packets that have been sent.
> I don’t understand what you mean.
>
> As long as there are packets sent, "++packet" will be counted, and finally
> returned to virtnet_xsk_run() through "done", regardless of whether there is
> insufficient slot.


Right, I misread the code.


>
>>
>>
>>>>> +			break;
>>>>> +		}
>>>>> +
>>>>> +		++packet;
>>>>> +	}
>>>>> +
>>>>> +	if (packet) {
>>>>> +		if (virtqueue_kick_prepare(sq->vq) && virtqueue_notify(sq->vq))
>>>>> +			++stats->kicks;
>>>>> +
>>>>> +		*done = packet;
>>>>> +		stats->xdp_tx += packet;
>>>>> +
>>>>> +		xsk_tx_release(pool);
>>>>> +	}
>>>>> +
>>>>> +	return ret;
>>>>> +}
>>>>> +
>>>>> +static int virtnet_xsk_run(struct send_queue *sq, struct xsk_buff_pool *pool,
>>>>> +			   int budget, bool in_napi)
>>>>> +{
>>>>> +	struct virtnet_sq_stats stats = {};
>>>>> +	int done = 0;
>>>>> +	int err;
>>>>> +
>>>>> +	sq->xsk.need_wakeup = false;
>>>>> +	__free_old_xmit(sq, in_napi, &stats);
>>>>> +
>>>>> +	/* return err:
>>>>> +	 * -EAGAIN: done == budget
>>>>> +	 * -EBUSY:  done < budget
>>>>> +	 *  0    :  done < budget
>>>>> +	 */
>>>>> +	err = virtnet_xsk_xmit_batch(sq, pool, budget, in_napi, &done, &stats);
>>>>> +	if (err == -EBUSY) {
>>>>> +		__free_old_xmit(sq, in_napi, &stats);
>>>>> +
>>>>> +		/* If the space is enough, let napi run again. */
>>>>> +		if (sq->vq->num_free >= 2 + MAX_SKB_FRAGS)
>>>>> +			done = budget;
>>>> Why do you need to run NAPI instead of a netif_tx_wake()?
>>> When virtnet_xsk_run() is called by virtnet_xsk_wakeup(), the return value is
>>> not concerned.
>>>
>>> virtnet_xsk_run() is already in napi when it is called by poll_tx(), so if there
>>> are more slots, I try to return "budget" and let napi call poll_tx() again
>>
>> Well, my question is why you don't need to wakeup the qdisc in this
>> case. Note that the queue could be shared with ndo_start_xmit().
> virtnet_xsk_run() will be called by virtnet_poll_tx() or virtnet_xsk_wakeup().
>
> 1. virtnet_poll_tx() will call netif_tx_wake_queue()
>
> static int virtnet_poll_tx(struct napi_struct *napi, int budget)
> {
> 	.....
>
> 	if (sq->vq->num_free >= 2 + MAX_SKB_FRAGS)
> 		netif_tx_wake_queue(txq);
>
> 2. virtnet_xsk_wakeup()
>       You are right. We can indeed call virtnet_xsk_wakeup() here.
>
>>
>>>>> +		else
>>>>> +			sq->xsk.need_wakeup = true;
>>>> So done is 0, is this intended?
>>> When err == 0, it indicates that the xsk queue has been exhausted. At this time,
>>> done < budget, return done directly, and napi can be stopped.
>>>
>>> When err == -EAGAIN, it indicates that the budget amount of packets has been sent,
>>> and done should be returned to the upper layer. Wait for napi to call poll_tx()
>>> again
>>
>> So the code is only for -EBUSY if I read the code correctly. And in this
>> case you return 0.
> In this case, I return 'done', and done < budget.  'done' may not be 0.
>
> done < budget, so napi will stop.


So the question is about this:

                 /* If the space is enough, let napi run again. */
                 if (sq->vq->num_free >= 2 + MAX_SKB_FRAGS)
                         done = budget;
                 else
                         sq->xsk.need_wakeup = true;

So done is 0 when num_free < 2 + MAX_SKB_FRAGS, I think you should 
return done here.

Note that done is still less than budget in this case.

Another question, when num_free >= 2 + MAX_SKB_FRAGS why not simply 
re-try virtnet_xsk_xmit_batch here?


>
>
>>
>>> In both cases, done is directly returned to the upper layer without special
>>> processing.
>>>
>>>>> +	}
>>>>> +
>>>>> +	virtnet_xsk_check_queue(sq);
>>>>> +
>>>>> +	u64_stats_update_begin(&sq->stats.syncp);
>>>>> +	sq->stats.packets += stats.packets;
>>>>> +	sq->stats.bytes += stats.bytes;
>>>>> +	sq->stats.kicks += stats.kicks;
>>>>> +	sq->stats.xdp_tx += stats.xdp_tx;
>>>>> +	u64_stats_update_end(&sq->stats.syncp);
>>>>> +
>>>>> +	return done;
>>>>> +}
>>>>> +
>>>>> +static int virtnet_xsk_wakeup(struct net_device *dev, u32 qid, u32 flag)
>>>>> +{
>>>>> +	struct virtnet_info *vi = netdev_priv(dev);
>>>>> +	struct xsk_buff_pool *pool;
>>>>> +	struct netdev_queue *txq;
>>>>> +	struct send_queue *sq;
>>>>> +
>>>>> +	if (!netif_running(dev))
>>>>> +		return -ENETDOWN;
>>>>> +
>>>>> +	if (qid >= vi->curr_queue_pairs)
>>>>> +		return -EINVAL;
>>>>> +
>>>>> +	sq = &vi->sq[qid];
>>>>> +
>>>>> +	rcu_read_lock();
>>>>> +
>>>>> +	pool = rcu_dereference(sq->xsk.pool);
>>>>> +	if (!pool)
>>>>> +		goto end;
>>>>> +
>>>>> +	if (napi_if_scheduled_mark_missed(&sq->napi))
>>>>> +		goto end;
>>>>> +
>>>>> +	txq = netdev_get_tx_queue(dev, qid);
>>>>> +
>>>>> +	__netif_tx_lock_bh(txq);
>>>>> +
>>>>> +	/* Send part of the packet directly to reduce the delay in sending the
>>>>> +	 * packet, and this can actively trigger the tx interrupts.
>>>>> +	 *
>>>>> +	 * If no packet is sent out, the ring of the device is full. In this
>>>>> +	 * case, we will still get a tx interrupt response. Then we will deal
>>>>> +	 * with the subsequent packet sending work.
>>>> So stmmac schedule NAPI here, do you have perf numbers for this improvement?
>>> virtnet_xsk_wakeup() is called by sendto(). The purpose is to start consuming
>>> the data in the xsk tx queue. The purpose here is to make napi run.
>>>
>>> When napi is not running, I try to send a certain amount of data:
>>> 1. Reduce the delay of the first packet
>>> 2. Trigger hardware to generate tx interrupt
>>
>> I don't see the code that triggers a hardware interrupt and I don't believe
>> virtio can do this.
>>
>> So the point is we need
>>
>> 1) schedule NAPI when napi_if_scheduled_mark_missed() returns false
>> 2) introduce this optimization on top with perf numbers
> It is true that virtio net does not directly let the hardware generate a tx
> method. So I try to send data here to wait for the tx interrupt notification
> from the hardware to recycle the sent packets.
>
> Is this method not feasible?


I think the answer is yes. But you need to benchmark this optimization 
to make sure it can give us the improvement as you expected.

So I suggest doing something like the following:

1) start with a simple napi schedule here
2) add your optimization on top and benchmark the difference


>
> Do you mean the performance improvement brought by xsk zerocopy? I should put it
> in the patch set cover. Indeed I should put one in the commit log of this patch.
>
> There is a problem with that direct schedule NAPI. It will wait for NAPI to run
> on the current cpu. This will cause an obvious delay. Especially the current
> user process may occupy the cpu.


Yes, but can you simply do:

local_bh_disable();
netif_napi_schedule();
local_bh_enable();

?

Thanks


>
> Thanks.
>
>> Thanks
>>
>>> Thanks.
>>>
>>>> Thanks
>>>>
>>>>
>>>>> +	 */
>>>>> +	virtnet_xsk_run(sq, pool, napi_weight, false);
>>>>> +
>>>>> +	__netif_tx_unlock_bh(txq);
>>>>> +
>>>>> +end:
>>>>> +	rcu_read_unlock();
>>>>> +	return 0;
>>>>> +}
>>>>> +
>>>>>     static int virtnet_xsk_pool_enable(struct net_device *dev,
>>>>>     				   struct xsk_buff_pool *pool,
>>>>>     				   u16 qid)
>>>>> @@ -2559,6 +2844,8 @@ static int virtnet_xsk_pool_enable(struct net_device *dev,
>>>>>     		return -EPERM;
>>>>>
>>>>>     	rcu_read_lock();
>>>>> +	memset(&sq->xsk, 0, sizeof(sq->xsk));
>>>>> +
>>>>>     	/* Here is already protected by rtnl_lock, so rcu_assign_pointer is
>>>>>     	 * safe.
>>>>>     	 */
>>>>> @@ -2658,6 +2945,7 @@ static const struct net_device_ops virtnet_netdev = {
>>>>>     	.ndo_vlan_rx_kill_vid = virtnet_vlan_rx_kill_vid,
>>>>>     	.ndo_bpf		= virtnet_xdp,
>>>>>     	.ndo_xdp_xmit		= virtnet_xdp_xmit,
>>>>> +	.ndo_xsk_wakeup         = virtnet_xsk_wakeup,
>>>>>     	.ndo_features_check	= passthru_features_check,
>>>>>     	.ndo_get_phys_port_name	= virtnet_get_phys_port_name,
>>>>>     	.ndo_set_features	= virtnet_set_features,
>>>>> @@ -2761,9 +3049,9 @@ static void free_unused_bufs(struct virtnet_info *vi)
>>>>>     	for (i = 0; i < vi->max_queue_pairs; i++) {
>>>>>     		struct virtqueue *vq = vi->sq[i].vq;
>>>>>     		while ((buf = virtqueue_detach_unused_buf(vq)) != NULL) {
>>>>> -			if (!is_xdp_frame(buf))
>>>>> +			if (is_skb_ptr(buf))
>>>>>     				dev_kfree_skb(buf);
>>>>> -			else
>>>>> +			else if (is_xdp_frame(buf))
>>>>>     				xdp_return_frame(ptr_to_xdp(buf));
>>>>>     		}
>>>>>     	}

Powered by blists - more mailing lists

Powered by Openwall GNU/*/Linux Powered by OpenVZ