[<prev] [next>] [<thread-prev] [thread-next>] [day] [month] [year] [list]
Message-ID: <51bc1c38-da20-1090-e3ef-1972f28adfee@redhat.com>
Date: Thu, 27 May 2021 14:53:02 +0800
From: Jason Wang <jasowang@...hat.com>
To: Yunsheng Lin <linyunsheng@...wei.com>, davem@...emloft.net,
kuba@...nel.org
Cc: will@...nel.org, peterz@...radead.org, paulmck@...nel.org,
linux-kernel@...r.kernel.org, netdev@...r.kernel.org,
mst@...hat.com, brouer@...hat.com
Subject: Re: [PATCH net-next] ptr_ring: make __ptr_ring_empty() checking more
reliable
在 2021/5/27 下午2:07, Yunsheng Lin 写道:
> On 2021/5/27 12:57, Jason Wang wrote:
>> 在 2021/5/26 下午8:29, Yunsheng Lin 写道:
>>> Currently r->queue[] is cleared after r->consumer_head is moved
>>> forward, which makes the __ptr_ring_empty() checking called in
>>> page_pool_refill_alloc_cache() unreliable if the checking is done
>>> after the r->queue clearing and before the consumer_head moving
>>> forward.
>>>
>>> Move the r->queue[] clearing after consumer_head moving forward
>>> to make __ptr_ring_empty() checking more reliable.
>>
>> If I understand this correctly, this can only happens if you run __ptr_ring_empty() in parallel with ptr_ring_discard_one().
> Yes.
>
>> I think those two needs to be serialized. Or did I miss anything?
> As the below comment in __ptr_ring_discard_one, if the above is true, I
> do not think we need to keep consumer_head valid at all times, right?
>
>
> /* Note: we must keep consumer_head valid at all times for __ptr_ring_empty
> * to work correctly.
> */
I'm not sure I understand. But my point is that you need to synchronize
the __ptr_ring_discard_one() and __ptr_empty() as explained in the
comment above __ptr_ring_empty():
/*
* Test ring empty status without taking any locks.
*
* NB: This is only safe to call if ring is never resized.
*
* However, if some other CPU consumes ring entries at the same time,
the value
* returned is not guaranteed to be correct.
*
* In this case - to avoid incorrectly detecting the ring
* as empty - the CPU consuming the ring entries is responsible
* for either consuming all ring entries until the ring is empty,
* or synchronizing with some other CPU and causing it to
* re-test __ptr_ring_empty and/or consume the ring enteries
* after the synchronization point.
*
* Note: callers invoking this in a loop must use a compiler barrier,
* for example cpu_relax().
*/
Thanks
>> Thanks
>>
>>
>>> Signed-off-by: Yunsheng Lin <linyunsheng@...wei.com>
>>> ---
>>> include/linux/ptr_ring.h | 26 +++++++++++++++++---------
>>> 1 file changed, 17 insertions(+), 9 deletions(-)
>>>
>>> diff --git a/include/linux/ptr_ring.h b/include/linux/ptr_ring.h
>>> index 808f9d3..f32f052 100644
>>> --- a/include/linux/ptr_ring.h
>>> +++ b/include/linux/ptr_ring.h
>>> @@ -261,8 +261,7 @@ static inline void __ptr_ring_discard_one(struct ptr_ring *r)
>>> /* Note: we must keep consumer_head valid at all times for __ptr_ring_empty
>>> * to work correctly.
>>> */
>>> - int consumer_head = r->consumer_head;
>>> - int head = consumer_head++;
>>> + int consumer_head = r->consumer_head + 1;
>>> /* Once we have processed enough entries invalidate them in
>>> * the ring all at once so producer can reuse their space in the ring.
>>> @@ -271,19 +270,28 @@ static inline void __ptr_ring_discard_one(struct ptr_ring *r)
>>> */
>>> if (unlikely(consumer_head - r->consumer_tail >= r->batch ||
>>> consumer_head >= r->size)) {
>>> + int tail = r->consumer_tail;
>>> + int head = consumer_head;
>>> +
>>> + if (unlikely(consumer_head >= r->size)) {
>>> + r->consumer_tail = 0;
>>> + WRITE_ONCE(r->consumer_head, 0);
>>> + } else {
>>> + r->consumer_tail = consumer_head;
>>> + WRITE_ONCE(r->consumer_head, consumer_head);
>>> + }
>>> +
>>> /* Zero out entries in the reverse order: this way we touch the
>>> * cache line that producer might currently be reading the last;
>>> * producer won't make progress and touch other cache lines
>>> * besides the first one until we write out all entries.
>>> */
>>> - while (likely(head >= r->consumer_tail))
>>> - r->queue[head--] = NULL;
>>> - r->consumer_tail = consumer_head;
>>> - }
>>> - if (unlikely(consumer_head >= r->size)) {
>>> - consumer_head = 0;
>>> - r->consumer_tail = 0;
>>> + while (likely(--head >= tail))
>>> + r->queue[head] = NULL;
>>> +
>>> + return;
>>> }
>>> +
>>> /* matching READ_ONCE in __ptr_ring_empty for lockless tests */
>>> WRITE_ONCE(r->consumer_head, consumer_head);
>>> }
>>
>> .
>>
Powered by blists - more mailing lists