Message-ID: <45caebb9-3c9a-5a62-3b0b-6310af267687@redhat.com>
Date:   Mon, 15 Nov 2021 11:58:53 -0500
From:   Waiman Long <longman@...hat.com>
To:     "Aiqun(Maria) Yu" <quic_aiquny@...cinc.com>,
        Peter Zijlstra <peterz@...radead.org>
Cc:     Ingo Molnar <mingo@...hat.com>, Will Deacon <will@...nel.org>,
        linux-kernel@...r.kernel.org, Davidlohr Bueso <dave@...olabs.net>,
        mazhenhua <mazhenhua@...omi.com>, Hillf Danton <hdanton@...a.com>
Subject: Re: [PATCH v4] locking/rwsem: Make handoff bit handling more
 consistent


On 11/15/21 10:45, Aiqun(Maria) Yu wrote:
> On 11/12/2021 8:10 PM, Peter Zijlstra wrote:
>> On Thu, Nov 11, 2021 at 11:07:53PM -0500, Waiman Long wrote:
>>> @@ -889,6 +892,20 @@ rwsem_spin_on_owner(struct rw_semaphore *sem)
>>>   }
>>>   #endif
>>>   +/*
>>> + * Common code to handle rwsem flags in out_nolock path with wait_lock held.
>>> + * If there is more than one waiter in the queue and the HANDOFF bit is set,
>>> + * the next waiter will inherit it if the first waiter is removed.
>>> + */
>>> +static inline void rwsem_out_nolock_clear_flags(struct rw_semaphore *sem,
>>> +                        struct rwsem_waiter *waiter)
>>
>> I'm going to rename that, it doesn't just clear flags, it dequeues the
>> waiter.
>>
>> Argh, rwsem_mark_wake() doesn't clear HANDOFF when list_empty(), and
>> write_slowpath() is *far* too clever about all of this.
>>
>>
>>> +{
>>> +    list_del(&waiter->list);
>>     if (list_empty(&sem->wait_list)) {
>>> +        atomic_long_andnot(RWSEM_FLAG_HANDOFF | RWSEM_FLAG_WAITERS,
>>> +                   &sem->count);
>>     }
>>> +}
>>
>>
>>
>>> @@ -1098,7 +1110,7 @@ rwsem_down_write_slowpath(struct rw_semaphore *sem, int state)
>>>            * In this case, we attempt to acquire the lock again
>>>            * without sleeping.
>>>            */
>>> -        if (wstate == WRITER_HANDOFF) {
>>> +        if (waiter.handoff_set) {
>>
>> I'm thinking this wants to be something like:
>>
>>         if (rwsem_first_waiter(sem) == &waiter && waiter.handoff_set) {
>>
>>>               enum owner_state owner_state;
>>>                 preempt_disable();
>>
>> How's this (on top) then?
>>
>> ---
>> --- a/kernel/locking/rwsem.c
>> +++ b/kernel/locking/rwsem.c
>> @@ -104,11 +104,10 @@
>>    * atomic_long_fetch_add() is used to obtain reader lock, whereas
>>    * atomic_long_cmpxchg() will be used to obtain writer lock.
>>    *
>> - * There are four places where the lock handoff bit may be set or cleared.
>> - * 1) rwsem_mark_wake() for readers            -- set, clear
>> - * 2) rwsem_try_write_lock() for writers       -- set, clear
>> - * 3) Error path of rwsem_down_write_slowpath() -- clear
>> - * 4) Error path of rwsem_down_read_slowpath()  -- clear
>> + * There are three places where the lock handoff bit may be set or cleared.
>> + * 1) rwsem_mark_wake() for readers        -- set, clear
>> + * 2) rwsem_try_write_lock() for writers    -- set, clear
>> + * 3) rwsem_del_waiter()            -- clear
>>    *
>>    * For all the above cases, wait_lock will be held. A writer must also
>>    * be the first one in the wait_list to be eligible for setting the handoff
>> @@ -363,6 +362,31 @@ enum rwsem_wake_type {
>>    */
>>   #define MAX_READERS_WAKEUP    0x100
>>   +static inline void
>> +rwsem_add_waiter(struct rw_semaphore *sem, struct rwsem_waiter *waiter)
>> +{
>> +    lockdep_assert_held(&sem->wait_lock);
>> +    list_add_tail(&waiter->list, &sem->wait_list);
>> +    /* caller will set RWSEM_FLAG_WAITERS */
>     /* each time a waiter is added to the list,
>      * handoff_set is initialized to false. */
>     waiter->handoff_set = false;

Waiter initialization is done at entry to the slowpath function; this 
helper just inserts the waiter into the wait list.
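
(For reference, the initialization in question sits at the top of
rwsem_down_write_slowpath(), before wait_lock is taken; roughly, sketching
from the surrounding rwsem.c context rather than the patch itself:

	waiter.task = current;
	waiter.type = RWSEM_WAITING_FOR_WRITE;
	waiter.timeout = jiffies + RWSEM_WAIT_TIMEOUT;
	waiter.handoff_set = false;

	raw_spin_lock_irq(&sem->wait_lock);
	rwsem_add_waiter(sem, &waiter);

so handoff_set is already false by the time rwsem_add_waiter() runs.)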


>> +}
>> +
>> +/*
>> + * Remove a waiter from the wait_list and clear flags.
>> + *
>> + * Both rwsem_mark_wake() and rwsem_try_write_lock() contain a full 'copy' of
>> + * this function. Modify with care.
>> + */
>> +static inline void
>> +rwsem_del_waiter(struct rw_semaphore *sem, struct rwsem_waiter *waiter)
>> +{
>> +    lockdep_assert_held(&sem->wait_lock);
>> +    list_del(&waiter->list);
> What about avoiding unnecessary inheritance of the handoff bit for waiters?
>
> if (waiter->handoff_set)
>     atomic_long_andnot(RWSEM_FLAG_HANDOFF, &sem->count);

We have decided to let the next waiter inherit the handoff bit, so there 
is no need to clear it unless there are no more waiters in the queue.
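
To spell that rule out, here is a toy user-space model (illustrative only,
not kernel code; the flag values mirror RWSEM_FLAG_WAITERS and
RWSEM_FLAG_HANDOFF):

	/* HANDOFF survives removal of the first waiter as long as another
	 * waiter remains; it is cleared together with WAITERS only once
	 * the wait list goes empty. */
	#include <stdio.h>

	#define FLAG_WAITERS	0x2UL
	#define FLAG_HANDOFF	0x4UL

	static unsigned long del_waiter(unsigned long count, int waiters_left)
	{
		if (waiters_left)
			return count;	/* next waiter inherits HANDOFF */
		return count & ~(FLAG_HANDOFF | FLAG_WAITERS);
	}

	int main(void)
	{
		unsigned long count = FLAG_WAITERS | FLAG_HANDOFF;

		count = del_waiter(count, 1);	/* first waiter killed, one left */
		printf("one waiter left: %#lx\n", count);	/* 0x6: inherited */

		count = del_waiter(count, 0);	/* last waiter leaves */
		printf("no waiters left: %#lx\n", count);	/* 0 */
		return 0;
	}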


>> +    if (likely(!list_empty(&sem->wait_list)))
>> +        return;
>> +
>> +    atomic_long_andnot(RWSEM_FLAG_HANDOFF | RWSEM_FLAG_WAITERS, &sem->count);
>> +}
>> +
>>   /*
>>    * handle the lock release when processes blocked on it that can now run
>>    * - if we come here from up_xxxx(), then the RWSEM_FLAG_WAITERS bit must
>> @@ -374,6 +398,8 @@ enum rwsem_wake_type {
>>    *   preferably when the wait_lock is released
>>    * - woken process blocks are discarded from the list after having task zeroed
>>    * - writers are only marked woken if downgrading is false
>> + *
>> + * Implies rwsem_del_waiter() for all woken readers.
>>    */
>>   static void rwsem_mark_wake(struct rw_semaphore *sem,
>>                   enum rwsem_wake_type wake_type,
>> @@ -488,18 +514,25 @@ static void rwsem_mark_wake(struct rw_se
>>         adjustment = woken * RWSEM_READER_BIAS - adjustment;
>>       lockevent_cond_inc(rwsem_wake_reader, woken);
>> +
>> +    oldcount = atomic_long_read(&sem->count);
>>       if (list_empty(&sem->wait_list)) {
>> -        /* hit end of list above */
>> +        /*
>> +         * Combined with list_move_tail() above, this implies
>> +         * rwsem_del_waiter().
>> +         */
>>           adjustment -= RWSEM_FLAG_WAITERS;
>> +        if (oldcount & RWSEM_FLAG_HANDOFF)
>> +            adjustment -= RWSEM_FLAG_HANDOFF;
>> +    } else if (woken) {
>> +        /*
>> +         * When we've woken a reader, we no longer need to force
>> +         * writers to give up the lock and we can clear HANDOFF.
>> +         */
>> +        if (oldcount & RWSEM_FLAG_HANDOFF)
>> +            adjustment -= RWSEM_FLAG_HANDOFF;
>>       }
>>   -    /*
>> -     * When we've woken a reader, we no longer need to force writers
>> -     * to give up the lock and we can clear HANDOFF.
>> -     */
>> -    if (woken && (atomic_long_read(&sem->count) & RWSEM_FLAG_HANDOFF))
>> -        adjustment -= RWSEM_FLAG_HANDOFF;
>> -
>>       if (adjustment)
>>           atomic_long_add(adjustment, &sem->count);
>>   @@ -529,6 +562,8 @@ static void rwsem_mark_wake(struct rw_se
>>    * This function must be called with the sem->wait_lock held to prevent
>>    * race conditions between checking the rwsem wait list and setting the
>>    * sem->count accordingly.
>> + *
>> + * Implies rwsem_del_waiter() on success.
>>    */
>>   static inline bool rwsem_try_write_lock(struct rw_semaphore *sem,
>>                       struct rwsem_waiter *waiter)
>> @@ -575,6 +610,11 @@ static inline bool rwsem_try_write_lock(
>>           return false;
>>       }
>>   +    /*
>> +     * Have rwsem_try_write_lock() fully imply rwsem_del_waiter() on
>> +     * success.
>> +     */
>> +    list_del(&waiter->list);
>>       rwsem_set_owner(sem);
>>       return true;
>>   }
>> @@ -893,20 +933,6 @@ rwsem_spin_on_owner(struct rw_semaphore
>>   #endif
>>     /*
>> - * Common code to handle rwsem flags in out_nolock path with wait_lock held.
>> - * If there is more than one waiter in the queue and the HANDOFF bit is set,
>> - * the next waiter will inherit it if the first waiter is removed.
>> - */
>> -static inline void rwsem_out_nolock_clear_flags(struct rw_semaphore *sem,
>> -                        struct rwsem_waiter *waiter)
>> -{
>> -    list_del(&waiter->list);
>> -    if (list_empty(&sem->wait_list))
>> -        atomic_long_andnot(RWSEM_FLAG_HANDOFF | RWSEM_FLAG_WAITERS,
>> -                   &sem->count);
>> -}
>> -
>> -/*
>>    * Wait for the read lock to be granted
>>    */
>>   static struct rw_semaphore __sched *
>> @@ -973,7 +999,7 @@ rwsem_down_read_slowpath(struct rw_semap
>>           }
>>           adjustment += RWSEM_FLAG_WAITERS;
>>       }
>> -    list_add_tail(&waiter.list, &sem->wait_list);
>> +    rwsem_add_waiter(sem, &waiter);
>>         /* we're now waiting on the lock, but no longer actively locking */
>>       count = atomic_long_add_return(adjustment, &sem->count);
>> @@ -1019,7 +1045,7 @@ rwsem_down_read_slowpath(struct rw_semap
>>       return sem;
>>     out_nolock:
>> -    rwsem_out_nolock_clear_flags(sem, &waiter);
>> +    rwsem_del_waiter(sem, &waiter);
>>       raw_spin_unlock_irq(&sem->wait_lock);
>>       __set_current_state(TASK_RUNNING);
>>       lockevent_inc(rwsem_rlock_fail);
>> @@ -1034,7 +1060,6 @@ rwsem_down_write_slowpath(struct rw_sema
>>   {
>>       long count;
>>       struct rwsem_waiter waiter;
>> -    struct rw_semaphore *ret = sem;
>>       DEFINE_WAKE_Q(wake_q);
>>         /* do optimistic spinning and steal lock if possible */
>> @@ -1053,7 +1078,7 @@ rwsem_down_write_slowpath(struct rw_sema
>>       waiter.handoff_set = false;
>>         raw_spin_lock_irq(&sem->wait_lock);
>> -    list_add_tail(&waiter.list, &sem->wait_list);
>> +    rwsem_add_waiter(sem, &waiter);
>>         /* we're now waiting on the lock */
>>       if (rwsem_first_waiter(sem) != &waiter) {
>> @@ -1110,7 +1135,7 @@ rwsem_down_write_slowpath(struct rw_sema
>>            * In this case, we attempt to acquire the lock again
>>            * without sleeping.
>>            */
>> -        if (waiter.handoff_set) {
>> +        if (rwsem_first_waiter(sem) == &waiter && waiter.handoff_set) {
>>               enum owner_state owner_state;
>>                 preempt_disable();
>> @@ -1128,16 +1153,14 @@ rwsem_down_write_slowpath(struct rw_sema
>>           raw_spin_lock_irq(&sem->wait_lock);
>>       }
>>       __set_current_state(TASK_RUNNING);
>> -    list_del(&waiter.list);
>>       raw_spin_unlock_irq(&sem->wait_lock);
>>       lockevent_inc(rwsem_wlock);
>> -
>> -    return ret;
>> +    return sem;
>>     out_nolock:
>>       __set_current_state(TASK_RUNNING);
>>       raw_spin_lock_irq(&sem->wait_lock);
>> -    rwsem_out_nolock_clear_flags(sem, &waiter);
>> +    rwsem_del_waiter(sem, &waiter);
>>       if (!list_empty(&sem->wait_list))
>>           rwsem_mark_wake(sem, RWSEM_WAKE_ANY, &wake_q);
>>       raw_spin_unlock_irq(&sem->wait_lock);
>>
>
> What about avoiding unnecessary inheritance of the handoff bit for waiters?
>     1. When setting the handoff bit, also set waiter.handoff_set to true;
>     2. when clearing the handoff bit, also set waiter.handoff_set to false.
>     3. rwsem_add_waiter() initializes it to false;
>     4. and rwsem_del_waiter() can also clear the handoff bit according
> to waiter.handoff_set.
>
> The handoff bit gives better performance if it is set and cleared
> accurately.
>
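
For concreteness, the rwsem_del_waiter() part of that proposal (point 4)
would look roughly like this; an illustrative sketch of the suggestion,
not a posted patch:

	static inline void
	rwsem_del_waiter(struct rw_semaphore *sem, struct rwsem_waiter *waiter)
	{
		lockdep_assert_held(&sem->wait_lock);
		list_del(&waiter->list);

		/* departing waiter owns the handoff bit: drop it, no inheritance */
		if (waiter->handoff_set)
			atomic_long_andnot(RWSEM_FLAG_HANDOFF, &sem->count);

		if (list_empty(&sem->wait_list))
			atomic_long_andnot(RWSEM_FLAG_HANDOFF | RWSEM_FLAG_WAITERS,
					   &sem->count);
	}
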
Killing or interrupting a waiter to force it to quit is not considered a 
fast-path operation. It is the exception rather than the rule, so 
performance considerations are less important here.

Cheers,
Longman
