Message-ID: <bf2948c4-dd6a-1cf6-16b5-39e5e17ef72a@redhat.com>
Date:   Thu, 23 Feb 2023 16:38:08 -0500
From:   Waiman Long <longman@...hat.com>
To:     Peter Zijlstra <peterz@...radead.org>, mingo@...hat.com,
        will@...nel.org
Cc:     linux-kernel@...r.kernel.org, boqun.feng@...il.com
Subject: Re: [PATCH 3/6] locking/rwsem: Rework writer wakeup

On 2/23/23 07:26, Peter Zijlstra wrote:
> Currently readers and writers have distinctly different wait/wake
> methods. For readers the ->count adjustment happens on the wakeup
> side, while for writers the ->count adjustment happens on the wait
> side.
>
> This asymmetry is unfortunate since the wake side has an additional
> guarantee -- specifically, the wake side has observed the unlocked
> state, and thus it can know that speculative READER_BIAS perturbations
> on ->count are just that, they will be undone.
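(For context: the speculative READER_BIAS perturbation is the reader
fast path optimistically adding READER_BIAS and backing it out again if
the lock turns out not to be available, roughly as in
rwsem_read_trylock() -- paraphrased here, not part of this patch:

	long cnt = atomic_long_add_return_acquire(RWSEM_READER_BIAS, &sem->count);

	if (!(cnt & RWSEM_READ_FAILED_MASK)) {
		rwsem_set_reader_owned(sem);	/* read lock acquired */
		return true;
	}
	/* failed: the bias just added will be undone in the slowpath */
	return false;

so the waker, having already observed the unlocked state under
wait_lock, knows that any such bias it still sees will go away.)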
>
> Additionally, unifying the wait/wake methods allows sharing code.
>
> As such, do a straight-forward transform of the writer wakeup into the
> wake side.
>
> Signed-off-by: Peter Zijlstra (Intel) <peterz@...radead.org>
> ---
>   kernel/locking/rwsem.c |  253 ++++++++++++++++++++++---------------------------
>   1 file changed, 115 insertions(+), 138 deletions(-)
>
> --- a/kernel/locking/rwsem.c
> +++ b/kernel/locking/rwsem.c
> @@ -107,7 +107,7 @@
>    *
>    * There are three places where the lock handoff bit may be set or cleared.
>    * 1) rwsem_mark_wake() for readers		-- set, clear
> - * 2) rwsem_try_write_lock() for writers	-- set, clear
> + * 2) rwsem_writer_wake() for writers	-- set, clear
>    * 3) rwsem_del_waiter()			-- clear
>    *
>    * For all the above cases, wait_lock will be held. A writer must also
> @@ -377,7 +377,7 @@ rwsem_add_waiter(struct rw_semaphore *se
>   /*
>    * Remove a waiter from the wait_list and clear flags.
>    *
> - * Both rwsem_mark_wake() and rwsem_try_write_lock() contain a full 'copy' of
> + * Both rwsem_mark_wake() and rwsem_writer_wake() contain a full 'copy' of
>    * this function. Modify with care.
>    *
>    * Return: true if wait_list isn't empty and false otherwise
> @@ -394,6 +394,100 @@ rwsem_del_waiter(struct rw_semaphore *se
>   	return false;
>   }
>   
> +static inline void
> +rwsem_waiter_wake(struct rwsem_waiter *waiter, struct wake_q_head *wake_q)
> +{
> +	struct task_struct *tsk;
> +
> +	tsk = waiter->task;
> +	get_task_struct(tsk);
> +
> +	/*
> +	 * Ensure calling get_task_struct() before setting the reader
> +	 * waiter to nil such that rwsem_down_read_slowpath() cannot
> +	 * race with do_exit() by always holding a reference count
> +	 * to the task to wakeup.
> +	 */
> +	smp_store_release(&waiter->task, NULL);
> +	/*
> +	 * Ensure issuing the wakeup (either by us or someone else)
> +	 * after setting the reader waiter to nil.
> +	 */
> +	wake_q_add_safe(wake_q, tsk);
> +}
> +
> +/*
> + * This function must be called with the sem->wait_lock held to prevent
> + * race conditions between checking the rwsem wait list and setting the
> + * sem->count accordingly.
> + *
> + * Implies rwsem_del_waiter() on success.
> + */
> +static void rwsem_writer_wake(struct rw_semaphore *sem,
> +			      struct rwsem_waiter *waiter,
> +			      struct wake_q_head *wake_q)
> +{
> +	struct rwsem_waiter *first = rwsem_first_waiter(sem);
> +	long count, new;
> +
> +	lockdep_assert_held(&sem->wait_lock);
> +
> +	count = atomic_long_read(&sem->count);
> +	do {
> +		bool has_handoff = !!(count & RWSEM_FLAG_HANDOFF);
> +
> +		if (has_handoff) {
> +			/*
> +			 * Honor handoff bit and yield only when the first
> +			 * waiter is the one that set it. Otherwise, we
> +			 * still try to acquire the rwsem.
> +			 */
> +			if (first->handoff_set && (waiter != first))
> +				return;
> +		}
This "if" statement is for giving a non-first waiter that somehow got
woken up a chance to steal the lock. Now that the handoff is done on
the wake side for the first waiter, this "if" statement is no longer
applicable and can be removed.
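I.e. the cmpxchg loop could then be simplified to something like the
following (untested sketch; only that check is dropped, the rest is
unchanged from your patch):

	count = atomic_long_read(&sem->count);
	do {
		/* Only the first waiter ever gets here now */
		bool has_handoff = !!(count & RWSEM_FLAG_HANDOFF);

		new = count;

		if (count & RWSEM_LOCK_MASK) {
			/*
			 * The first waiter can set the handoff bit if it
			 * is an RT task or has waited in the queue for
			 * too long.
			 */
			if (has_handoff || (!rt_task(waiter->task) &&
					    !time_after(jiffies, waiter->timeout)))
				return;

			new |= RWSEM_FLAG_HANDOFF;
		} else {
			new |= RWSEM_WRITER_LOCKED;
			new &= ~RWSEM_FLAG_HANDOFF;

			if (list_is_singular(&sem->wait_list))
				new &= ~RWSEM_FLAG_WAITERS;
		}
	} while (!atomic_long_try_cmpxchg_acquire(&sem->count, &count, new));
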
> +
> +		new = count;
> +
> +		if (count & RWSEM_LOCK_MASK) {
> +			/*
> +			 * A waiter (first or not) can set the handoff bit
> +			 * if it is an RT task or wait in the wait queue
> +			 * for too long.
> +			 */
> +			if (has_handoff || (!rt_task(waiter->task) &&
> +					    !time_after(jiffies, waiter->timeout)))
> +				return;
> +
> +			new |= RWSEM_FLAG_HANDOFF;
> +		} else {
> +			new |= RWSEM_WRITER_LOCKED;
> +			new &= ~RWSEM_FLAG_HANDOFF;
> +
> +			if (list_is_singular(&sem->wait_list))
> +				new &= ~RWSEM_FLAG_WAITERS;
> +		}
> +	} while (!atomic_long_try_cmpxchg_acquire(&sem->count, &count, new));
> +
> +	/*
> +	 * We have either acquired the lock with handoff bit cleared or set
> +	 * the handoff bit. Only the first waiter can have its handoff_set
> +	 * set here to enable optimistic spinning in slowpath loop.
> +	 */
> +	if (new & RWSEM_FLAG_HANDOFF) {
> +		first->handoff_set = true;
> +		lockevent_inc(rwsem_wlock_handoff);
> +		return;
> +	}
> +
> +	/*
> +	 * Have rwsem_writer_wake() fully imply rwsem_del_waiter() on
> +	 * success.
> +	 */
> +	list_del(&waiter->list);
> +	rwsem_set_owner(sem);
> +	rwsem_waiter_wake(waiter, wake_q);
> +}
> +
>   /*
>    * handle the lock release when processes blocked on it that can now run
>    * - if we come here from up_xxxx(), then the RWSEM_FLAG_WAITERS bit must
> @@ -424,23 +518,12 @@ static void rwsem_mark_wake(struct rw_se
>   	 */
>   	waiter = rwsem_first_waiter(sem);
>   
> -	if (waiter->type != RWSEM_WAITING_FOR_WRITE)
> -		goto wake_readers;
> -
> -	if (wake_type == RWSEM_WAKE_ANY) {
> -		/*
> -		 * Mark writer at the front of the queue for wakeup.
> -		 * Until the task is actually later awoken later by
> -		 * the caller, other writers are able to steal it.
> -		 * Readers, on the other hand, will block as they
> -		 * will notice the queued writer.
> -		 */
> -		wake_q_add(wake_q, waiter->task);
> -		lockevent_inc(rwsem_wake_writer);
> +	if (waiter->type == RWSEM_WAITING_FOR_WRITE) {
> +		if (wake_type == RWSEM_WAKE_ANY)
> +			rwsem_writer_wake(sem, waiter, wake_q);
> +		return;
>   	}
> -	return;
>   
> -wake_readers:
>   	/*
>   	 * No reader wakeup if there are too many of them already.
>   	 */
> @@ -547,25 +630,8 @@ static void rwsem_mark_wake(struct rw_se
>   		atomic_long_add(adjustment, &sem->count);
>   
>   	/* 2nd pass */
> -	list_for_each_entry_safe(waiter, tmp, &wlist, list) {
> -		struct task_struct *tsk;
> -
> -		tsk = waiter->task;
> -		get_task_struct(tsk);
> -
> -		/*
> -		 * Ensure calling get_task_struct() before setting the reader
> -		 * waiter to nil such that rwsem_down_read_slowpath() cannot
> -		 * race with do_exit() by always holding a reference count
> -		 * to the task to wakeup.
> -		 */
> -		smp_store_release(&waiter->task, NULL);
> -		/*
> -		 * Ensure issuing the wakeup (either by us or someone else)
> -		 * after setting the reader waiter to nil.
> -		 */
> -		wake_q_add_safe(wake_q, tsk);
> -	}
> +	list_for_each_entry_safe(waiter, tmp, &wlist, list)
> +		rwsem_waiter_wake(waiter, wake_q);
>   }
>   
>   /*
> @@ -596,77 +662,6 @@ rwsem_del_wake_waiter(struct rw_semaphor
>   }
>   
>   /*
> - * This function must be called with the sem->wait_lock held to prevent
> - * race conditions between checking the rwsem wait list and setting the
> - * sem->count accordingly.
> - *
> - * Implies rwsem_del_waiter() on success.
> - */
> -static inline bool rwsem_try_write_lock(struct rw_semaphore *sem,
> -					struct rwsem_waiter *waiter)
> -{
> -	struct rwsem_waiter *first = rwsem_first_waiter(sem);
> -	long count, new;
> -
> -	lockdep_assert_held(&sem->wait_lock);
> -
> -	count = atomic_long_read(&sem->count);
> -	do {
> -		bool has_handoff = !!(count & RWSEM_FLAG_HANDOFF);
> -
> -		if (has_handoff) {
> -			/*
> -			 * Honor handoff bit and yield only when the first
> -			 * waiter is the one that set it. Otherwisee, we
> -			 * still try to acquire the rwsem.
> -			 */
> -			if (first->handoff_set && (waiter != first))
> -				return false;
> -		}
> -
> -		new = count;
> -
> -		if (count & RWSEM_LOCK_MASK) {
> -			/*
> -			 * A waiter (first or not) can set the handoff bit
> -			 * if it is an RT task or wait in the wait queue
> -			 * for too long.
> -			 */
> -			if (has_handoff || (!rt_task(waiter->task) &&
> -					    !time_after(jiffies, waiter->timeout)))
> -				return false;
> -
> -			new |= RWSEM_FLAG_HANDOFF;
> -		} else {
> -			new |= RWSEM_WRITER_LOCKED;
> -			new &= ~RWSEM_FLAG_HANDOFF;
> -
> -			if (list_is_singular(&sem->wait_list))
> -				new &= ~RWSEM_FLAG_WAITERS;
> -		}
> -	} while (!atomic_long_try_cmpxchg_acquire(&sem->count, &count, new));
> -
> -	/*
> -	 * We have either acquired the lock with handoff bit cleared or set
> -	 * the handoff bit. Only the first waiter can have its handoff_set
> -	 * set here to enable optimistic spinning in slowpath loop.
> -	 */
> -	if (new & RWSEM_FLAG_HANDOFF) {
> -		first->handoff_set = true;
> -		lockevent_inc(rwsem_wlock_handoff);
> -		return false;
> -	}
> -
> -	/*
> -	 * Have rwsem_try_write_lock() fully imply rwsem_del_waiter() on
> -	 * success.
> -	 */
> -	list_del(&waiter->list);
> -	rwsem_set_owner(sem);
> -	return true;
> -}
> -
> -/*
>    * The rwsem_spin_on_owner() function returns the following 4 values
>    * depending on the lock owner state.
>    *   OWNER_NULL  : owner is currently NULL
> @@ -1072,7 +1067,7 @@ rwsem_down_read_slowpath(struct rw_semap
>   	for (;;) {
>   		set_current_state(state);
>   		if (!smp_load_acquire(&waiter.task)) {
> -			/* Matches rwsem_mark_wake()'s smp_store_release(). */
> +			/* Matches rwsem_waiter_wake()'s smp_store_release(). */
>   			break;
>   		}
>   		if (signal_pending_state(state, current)) {
> @@ -1143,54 +1138,36 @@ rwsem_down_write_slowpath(struct rw_sema
>   	} else {
>   		atomic_long_or(RWSEM_FLAG_WAITERS, &sem->count);
>   	}
> +	raw_spin_unlock_irq(&sem->wait_lock);
>   
>   	/* wait until we successfully acquire the lock */
> -	set_current_state(state);
>   	trace_contention_begin(sem, LCB_F_WRITE);
>   
>   	for (;;) {
> -		if (rwsem_try_write_lock(sem, &waiter)) {
> -			/* rwsem_try_write_lock() implies ACQUIRE on success */
> +		set_current_state(state);
> +		if (!smp_load_acquire(&waiter.task)) {
> +			/* Matches rwsem_waiter_wake()'s smp_store_release(). */
>   			break;
>   		}
> -
> -		raw_spin_unlock_irq(&sem->wait_lock);
> -
> -		if (signal_pending_state(state, current))
> -			goto out_nolock;
> -
> -		/*
> -		 * After setting the handoff bit and failing to acquire
> -		 * the lock, attempt to spin on owner to accelerate lock
> -		 * transfer. If the previous owner is a on-cpu writer and it
> -		 * has just released the lock, OWNER_NULL will be returned.
> -		 * In this case, we attempt to acquire the lock again
> -		 * without sleeping.
> -		 */
> -		if (waiter.handoff_set) {
> -			enum owner_state owner_state;
> -
> -			owner_state = rwsem_spin_on_owner(sem);
> -			if (owner_state == OWNER_NULL)
> -				goto trylock_again;
> +		if (signal_pending_state(state, current)) {
> +			raw_spin_lock_irq(&sem->wait_lock);
> +			if (waiter.task)
> +				goto out_nolock;
> +			raw_spin_unlock_irq(&sem->wait_lock);
> +			/* Ordered by sem->wait_lock against rwsem_mark_wake(). */
> +			break;
>   		}
> -
>   		schedule_preempt_disabled();
>   		lockevent_inc(rwsem_sleep_writer);
> -		set_current_state(state);
> -trylock_again:
> -		raw_spin_lock_irq(&sem->wait_lock);
>   	}
>   	__set_current_state(TASK_RUNNING);
> -	raw_spin_unlock_irq(&sem->wait_lock);
>   	lockevent_inc(rwsem_wlock);
>   	trace_contention_end(sem, 0);
>   	return sem;
>   
>   out_nolock:
> -	__set_current_state(TASK_RUNNING);
> -	raw_spin_lock_irq(&sem->wait_lock);
>   	rwsem_del_wake_waiter(sem, &waiter, &wake_q);
> +	__set_current_state(TASK_RUNNING);
>   	lockevent_inc(rwsem_wlock_fail);
>   	trace_contention_end(sem, -EINTR);
>   	return ERR_PTR(-EINTR);

I believe it is better to change the task state while still inside the
wait_lock critical section, so that the unlock provides a release
barrier for free.
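
For the out_nolock path that would mean something like this (untested;
assuming rwsem_del_wake_waiter() still releases sem->wait_lock
internally as it does today):

out_nolock:
	/*
	 * Still holding sem->wait_lock here (taken in the
	 * signal_pending branch above), so the unlock inside
	 * rwsem_del_wake_waiter() serves as the release barrier for
	 * the state change.
	 */
	__set_current_state(TASK_RUNNING);
	rwsem_del_wake_waiter(sem, &waiter, &wake_q);
	lockevent_inc(rwsem_wlock_fail);
	trace_contention_end(sem, -EINTR);
	return ERR_PTR(-EINTR);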

Cheers,
Longman
