Message-ID: <32b433a1-31f9-14ba-e8f6-87b69c2c4ac9@quicinc.com>
Date:   Tue, 16 Nov 2021 10:52:42 +0800
From:   "Aiqun(Maria) Yu" <quic_aiquny@...cinc.com>
To:     Waiman Long <longman@...hat.com>,
        Peter Zijlstra <peterz@...radead.org>,
        Ingo Molnar <mingo@...hat.com>, Will Deacon <will@...nel.org>
CC:     <linux-kernel@...r.kernel.org>,
        Davidlohr Bueso <dave@...olabs.net>,
        mazhenhua <mazhenhua@...omi.com>, Hillf Danton <hdanton@...a.com>
Subject: Re: [PATCH v5] locking/rwsem: Make handoff bit handling more
 consistent

On 11/16/2021 9:29 AM, Waiman Long wrote:
> There are some inconsistencies in the way that the handoff bit is being
> handled between readers and writers.
> 
> Firstly, when a queue head writer sets the handoff bit, it will clear it
> when the writer is killed or interrupted on its way out without
> acquiring the lock. That is not the case for a queue head reader: the
> handoff bit will simply be inherited by the next waiter.
> 
> Secondly, in the out_nolock path of rwsem_down_read_slowpath(), both
> the waiter and handoff bits are cleared if the wait queue becomes empty.
> For rwsem_down_write_slowpath(), however, the handoff bit is not checked
> and cleared if the wait queue is empty. This can potentially leave the
> handoff bit set with an empty wait queue.
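
For anyone following along, here is a condensed side-by-side of the two
pre-patch out_nolock cleanups, paraphrased from the hunks removed below
(trimmed for illustration, not the exact code):

    /* reader out_nolock (pre-patch): both flags cleared, but only
     * when the wait queue became empty */
    list_del(&waiter.list);
    if (list_empty(&sem->wait_list))
        atomic_long_andnot(RWSEM_FLAG_WAITERS | RWSEM_FLAG_HANDOFF,
                           &sem->count);

    /* writer out_nolock (pre-patch): the writer drops the handoff bit
     * it set itself, but on an empty queue only WAITERS is cleared */
    list_del(&waiter.list);
    if (unlikely(wstate == WRITER_HANDOFF))
        atomic_long_add(-RWSEM_FLAG_HANDOFF, &sem->count);
    if (list_empty(&sem->wait_list))
        atomic_long_andnot(RWSEM_FLAG_WAITERS, &sem->count);
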
> 
> To make the handoff bit handling more consistent and robust, extract
> the handoff bit clearing code into a new rwsem_del_waiter() helper
> function. The common function will only use atomic_long_andnot() to
> clear bits when the wait queue is empty, to avoid a possible race
> condition.
We do have a race condition that needs to be fixed, and this change addresses it.
> If the first waiter with the handoff bit set is killed or interrupted and
> exits the slowpath without acquiring the lock, the next waiter will
> inherit the handoff bit.
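
As I read it, with this patch the inheritance works roughly as below
(condensed from the rwsem_del_waiter() and rwsem_try_write_lock() hunks
further down, not the exact code):

    /* rwsem_del_waiter(): HANDOFF (and WAITERS) are cleared only when
     * the departing waiter was the last one; otherwise the bit stays */
    list_del(&waiter->list);
    if (list_empty(&sem->wait_list))
        atomic_long_andnot(RWSEM_FLAG_HANDOFF | RWSEM_FLAG_WAITERS,
                           &sem->count);

    /* rwsem_try_write_lock(): the new queue-head writer then adopts the
     * still-set bit as its own */
    if (count & RWSEM_FLAG_HANDOFF) {
        if (!first)
            return false;
        waiter->handoff_set = true;   /* inherits the handoff bit */
    }
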
> 
> While at it, simplify the trylock for-loop in rwsem_down_write_slowpath()
> to make it easier to read.
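
A heavily trimmed skeleton of the flattened loop as it reads after this
patch:

    for (;;) {
        if (rwsem_try_write_lock(sem, &waiter))
            break;                /* implies ACQUIRE on success */

        raw_spin_unlock_irq(&sem->wait_lock);

        if (signal_pending_state(state, current))
            goto out_nolock;

        if (waiter.handoff_set) {
            /* spin on the owner and, if the lock looks free, retry
             * the trylock without sleeping */
        }

        schedule();
        lockevent_inc(rwsem_sleep_writer);
        set_current_state(state);
trylock_again:
        raw_spin_lock_irq(&sem->wait_lock);
    }
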
> 
> Fixes: 4f23dbc1e657 ("locking/rwsem: Implement lock handoff to prevent lock starvation")
> Suggested-by: Peter Zijlstra <peterz@...radead.org>
> Signed-off-by: Waiman Long <longman@...hat.com>
> ---
>   kernel/locking/rwsem.c | 171 ++++++++++++++++++++---------------------
>   1 file changed, 85 insertions(+), 86 deletions(-)
> 
> diff --git a/kernel/locking/rwsem.c b/kernel/locking/rwsem.c
> index c51387a43265..e039cf1605af 100644
> --- a/kernel/locking/rwsem.c
> +++ b/kernel/locking/rwsem.c
> @@ -105,9 +105,9 @@
>    * atomic_long_cmpxchg() will be used to obtain writer lock.
>    *
>    * There are three places where the lock handoff bit may be set or cleared.
> - * 1) rwsem_mark_wake() for readers.
> - * 2) rwsem_try_write_lock() for writers.
> - * 3) Error path of rwsem_down_write_slowpath().
> + * 1) rwsem_mark_wake() for readers		-- set, clear
> + * 2) rwsem_try_write_lock() for writers	-- set, clear
> + * 3) rwsem_del_waiter()			-- clear
>    *
>    * For all the above cases, wait_lock will be held. A writer must also
>    * be the first one in the wait_list to be eligible for setting the handoff
> @@ -334,6 +334,9 @@ struct rwsem_waiter {
>   	struct task_struct *task;
>   	enum rwsem_waiter_type type;
>   	unsigned long timeout;
> +
> +	/* Writer only, not initialized in reader */
> +	bool handoff_set;
>   };
>   #define rwsem_first_waiter(sem) \
>   	list_first_entry(&sem->wait_list, struct rwsem_waiter, list)
> @@ -344,12 +347,6 @@ enum rwsem_wake_type {
>   	RWSEM_WAKE_READ_OWNED	/* Waker thread holds the read lock */
>   };
>   
> -enum writer_wait_state {
> -	WRITER_NOT_FIRST,	/* Writer is not first in wait list */
> -	WRITER_FIRST,		/* Writer is first in wait list     */
> -	WRITER_HANDOFF		/* Writer is first & handoff needed */
> -};
> -
>   /*
>    * The typical HZ value is either 250 or 1000. So set the minimum waiting
>    * time to at least 4ms or 1 jiffy (if it is higher than 4ms) in the wait
> @@ -365,6 +362,31 @@ enum writer_wait_state {
>    */
>   #define MAX_READERS_WAKEUP	0x100
>   
> +static inline void
> +rwsem_add_waiter(struct rw_semaphore *sem, struct rwsem_waiter *waiter)
> +{
> +	lockdep_assert_held(&sem->wait_lock);
> +	list_add_tail(&waiter->list, &sem->wait_list);
> +	/* caller will set RWSEM_FLAG_WAITERS */
> +}
> +
> +/*
> + * Remove a waiter from the wait_list and clear flags.
> + *
> + * Both rwsem_mark_wake() and rwsem_try_write_lock() contain a full 'copy' of
> + * this function. Modify with care.
> + */
> +static inline void
> +rwsem_del_waiter(struct rw_semaphore *sem, struct rwsem_waiter *waiter)
> +{
> +	lockdep_assert_held(&sem->wait_lock);
> +	list_del(&waiter->list);
> +	if (likely(!list_empty(&sem->wait_list)))
> +		return;
> +
> +	atomic_long_andnot(RWSEM_FLAG_HANDOFF | RWSEM_FLAG_WAITERS, &sem->count);
> +}
> +
>   /*
>    * handle the lock release when processes blocked on it that can now run
>    * - if we come here from up_xxxx(), then the RWSEM_FLAG_WAITERS bit must
> @@ -376,6 +398,8 @@ enum writer_wait_state {
>    *   preferably when the wait_lock is released
>    * - woken process blocks are discarded from the list after having task zeroed
>    * - writers are only marked woken if downgrading is false
> + *
> + * Implies rwsem_del_waiter() for all woken readers.
>    */
>   static void rwsem_mark_wake(struct rw_semaphore *sem,
>   			    enum rwsem_wake_type wake_type,
> @@ -490,18 +514,25 @@ static void rwsem_mark_wake(struct rw_semaphore *sem,
>   
>   	adjustment = woken * RWSEM_READER_BIAS - adjustment;
>   	lockevent_cond_inc(rwsem_wake_reader, woken);
> +
> +	oldcount = atomic_long_read(&sem->count);
>   	if (list_empty(&sem->wait_list)) {
> -		/* hit end of list above */
> +		/*
> +		 * Combined with list_move_tail() above, this implies
> +		 * rwsem_del_waiter().
> +		 */
>   		adjustment -= RWSEM_FLAG_WAITERS;
> +		if (oldcount & RWSEM_FLAG_HANDOFF)
> +			adjustment -= RWSEM_FLAG_HANDOFF;
> +	} else if (woken) {
> +		/*
> +		 * When we've woken a reader, we no longer need to force
> +		 * writers to give up the lock and we can clear HANDOFF.
> +		 */
> +		if (oldcount & RWSEM_FLAG_HANDOFF)
> +			adjustment -= RWSEM_FLAG_HANDOFF;
>   	}
>   
> -	/*
> -	 * When we've woken a reader, we no longer need to force writers
> -	 * to give up the lock and we can clear HANDOFF.
> -	 */
> -	if (woken && (atomic_long_read(&sem->count) & RWSEM_FLAG_HANDOFF))
> -		adjustment -= RWSEM_FLAG_HANDOFF;
> -
>   	if (adjustment)
>   		atomic_long_add(adjustment, &sem->count);
>   
> @@ -532,12 +563,12 @@ static void rwsem_mark_wake(struct rw_semaphore *sem,
>    * race conditions between checking the rwsem wait list and setting the
>    * sem->count accordingly.
>    *
> - * If wstate is WRITER_HANDOFF, it will make sure that either the handoff
> - * bit is set or the lock is acquired with handoff bit cleared.
> + * Implies rwsem_del_waiter() on success.
>    */
>   static inline bool rwsem_try_write_lock(struct rw_semaphore *sem,
> -					enum writer_wait_state wstate)
> +					struct rwsem_waiter *waiter)
>   {
> +	bool first = rwsem_first_waiter(sem) == waiter;
>   	long count, new;
>   
>   	lockdep_assert_held(&sem->wait_lock);
> @@ -546,13 +577,19 @@ static inline bool rwsem_try_write_lock(struct rw_semaphore *sem,
>   	do {
>   		bool has_handoff = !!(count & RWSEM_FLAG_HANDOFF);
>   
> -		if (has_handoff && wstate == WRITER_NOT_FIRST)
> -			return false;
> +		if (has_handoff) {
> +			if (!first)
> +				return false;
> +
> +			/* First waiter inherits a previously set handoff bit */
> +			waiter->handoff_set = true;
> +		}
>   
>   		new = count;
>   
>   		if (count & RWSEM_LOCK_MASK) {
> -			if (has_handoff || (wstate != WRITER_HANDOFF))
> +			if (has_handoff || (!rt_task(waiter->task) &&
> +					    !time_after(jiffies, waiter->timeout)))
>   				return false;
>   
>   			new |= RWSEM_FLAG_HANDOFF;
> @@ -569,9 +606,17 @@ static inline bool rwsem_try_write_lock(struct rw_semaphore *sem,
>   	 * We have either acquired the lock with handoff bit cleared or
>   	 * set the handoff bit.
>   	 */
> -	if (new & RWSEM_FLAG_HANDOFF)
> +	if (new & RWSEM_FLAG_HANDOFF) {
> +		waiter->handoff_set = true;
> +		lockevent_inc(rwsem_wlock_handoff);
>   		return false;
> +	}
>   
> +	/*
> +	 * Have rwsem_try_write_lock() fully imply rwsem_del_waiter() on
> +	 * success.
> +	 */
> +	list_del(&waiter->list);
>   	rwsem_set_owner(sem);
>   	return true;
>   }
> @@ -956,7 +1001,7 @@ rwsem_down_read_slowpath(struct rw_semaphore *sem, long count, unsigned int stat
>   		}
>   		adjustment += RWSEM_FLAG_WAITERS;
>   	}
> -	list_add_tail(&waiter.list, &sem->wait_list);
> +	rwsem_add_waiter(sem, &waiter);
>   
>   	/* we're now waiting on the lock, but no longer actively locking */
>   	count = atomic_long_add_return(adjustment, &sem->count);
> @@ -1002,11 +1047,7 @@ rwsem_down_read_slowpath(struct rw_semaphore *sem, long count, unsigned int stat
>   	return sem;
>   
>   out_nolock:
> -	list_del(&waiter.list);
> -	if (list_empty(&sem->wait_list)) {
> -		atomic_long_andnot(RWSEM_FLAG_WAITERS|RWSEM_FLAG_HANDOFF,
> -				   &sem->count);
> -	}
> +	rwsem_del_waiter(sem, &waiter);
>   	raw_spin_unlock_irq(&sem->wait_lock);
>   	__set_current_state(TASK_RUNNING);
>   	lockevent_inc(rwsem_rlock_fail);
> @@ -1020,9 +1061,7 @@ static struct rw_semaphore *
>   rwsem_down_write_slowpath(struct rw_semaphore *sem, int state)
>   {
>   	long count;
> -	enum writer_wait_state wstate;
>   	struct rwsem_waiter waiter;
> -	struct rw_semaphore *ret = sem;
>   	DEFINE_WAKE_Q(wake_q);
>   
>   	/* do optimistic spinning and steal lock if possible */
> @@ -1038,16 +1077,13 @@ rwsem_down_write_slowpath(struct rw_semaphore *sem, int state)
>   	waiter.task = current;
>   	waiter.type = RWSEM_WAITING_FOR_WRITE;
>   	waiter.timeout = jiffies + RWSEM_WAIT_TIMEOUT;
> +	waiter.handoff_set = false;
>   
>   	raw_spin_lock_irq(&sem->wait_lock);
> -
> -	/* account for this before adding a new element to the list */
> -	wstate = list_empty(&sem->wait_list) ? WRITER_FIRST : WRITER_NOT_FIRST;
> -
> -	list_add_tail(&waiter.list, &sem->wait_list);
> +	rwsem_add_waiter(sem, &waiter);
>   
>   	/* we're now waiting on the lock */
> -	if (wstate == WRITER_NOT_FIRST) {
> +	if (rwsem_first_waiter(sem) != &waiter) {
>   		count = atomic_long_read(&sem->count);
>   
>   		/*
> @@ -1083,13 +1119,16 @@ rwsem_down_write_slowpath(struct rw_semaphore *sem, int state)
>   	/* wait until we successfully acquire the lock */
>   	set_current_state(state);
>   	for (;;) {
> -		if (rwsem_try_write_lock(sem, wstate)) {
> +		if (rwsem_try_write_lock(sem, &waiter)) {
>   			/* rwsem_try_write_lock() implies ACQUIRE on success */
>   			break;
>   		}
>   
>   		raw_spin_unlock_irq(&sem->wait_lock);
>   
> +		if (signal_pending_state(state, current))
> +			goto out_nolock;
> +
>   		/*
>   		 * After setting the handoff bit and failing to acquire
>   		 * the lock, attempt to spin on owner to accelerate lock
> @@ -1098,7 +1137,7 @@ rwsem_down_write_slowpath(struct rw_semaphore *sem, int state)
>   		 * In this case, we attempt to acquire the lock again
>   		 * without sleeping.
>   		 */
> -		if (wstate == WRITER_HANDOFF) {
> +		if (waiter.handoff_set) {
>   			enum owner_state owner_state;
>   
>   			preempt_disable();
> @@ -1109,66 +1148,26 @@ rwsem_down_write_slowpath(struct rw_semaphore *sem, int state)
>   				goto trylock_again;
>   		}
>   
> -		/* Block until there are no active lockers. */
> -		for (;;) {
> -			if (signal_pending_state(state, current))
> -				goto out_nolock;
> -
> -			schedule();
> -			lockevent_inc(rwsem_sleep_writer);
> -			set_current_state(state);
> -			/*
> -			 * If HANDOFF bit is set, unconditionally do
> -			 * a trylock.
> -			 */
> -			if (wstate == WRITER_HANDOFF)
> -				break;
> -
> -			if ((wstate == WRITER_NOT_FIRST) &&
> -			    (rwsem_first_waiter(sem) == &waiter))
> -				wstate = WRITER_FIRST;
> -
> -			count = atomic_long_read(&sem->count);
> -			if (!(count & RWSEM_LOCK_MASK))
> -				break;
> -
> -			/*
> -			 * The setting of the handoff bit is deferred
> -			 * until rwsem_try_write_lock() is called.
> -			 */
> -			if ((wstate == WRITER_FIRST) && (rt_task(current) ||
> -			    time_after(jiffies, waiter.timeout))) {
> -				wstate = WRITER_HANDOFF;
> -				lockevent_inc(rwsem_wlock_handoff);
> -				break;
> -			}
> -		}
> +		schedule();
> +		lockevent_inc(rwsem_sleep_writer);
> +		set_current_state(state);
>   trylock_again:
>   		raw_spin_lock_irq(&sem->wait_lock);
>   	}
>   	__set_current_state(TASK_RUNNING);
> -	list_del(&waiter.list);
>   	raw_spin_unlock_irq(&sem->wait_lock);
>   	lockevent_inc(rwsem_wlock);
> -
> -	return ret;
> +	return sem;
>   
>   out_nolock:
>   	__set_current_state(TASK_RUNNING);
>   	raw_spin_lock_irq(&sem->wait_lock);
> -	list_del(&waiter.list);
> -
> -	if (unlikely(wstate == WRITER_HANDOFF))
> -		atomic_long_add(-RWSEM_FLAG_HANDOFF,  &sem->count);
> -
> -	if (list_empty(&sem->wait_list))
> -		atomic_long_andnot(RWSEM_FLAG_WAITERS, &sem->count);
> -	else
> +	rwsem_del_waiter(sem, &waiter);
> +	if (!list_empty(&sem->wait_list))
>   		rwsem_mark_wake(sem, RWSEM_WAKE_ANY, &wake_q);
>   	raw_spin_unlock_irq(&sem->wait_lock);
>   	wake_up_q(&wake_q);
>   	lockevent_inc(rwsem_wlock_fail);
> -
>   	return ERR_PTR(-EINTR);
>   }
>   
> 


-- 
Thx and BRs,
Aiqun(Maria) Yu
