Message-ID: <bf2948c4-dd6a-1cf6-16b5-39e5e17ef72a@redhat.com>
Date: Thu, 23 Feb 2023 16:38:08 -0500
From: Waiman Long <longman@...hat.com>
To: Peter Zijlstra <peterz@...radead.org>, mingo@...hat.com,
will@...nel.org
Cc: linux-kernel@...r.kernel.org, boqun.feng@...il.com
Subject: Re: [PATCH 3/6] locking/rwsem: Rework writer wakeup
On 2/23/23 07:26, Peter Zijlstra wrote:
> Currently readers and writers have distinctly different wait/wake
> methods. For readers the ->count adjustment happens on the wakeup
> side, while for writers the ->count adjustment happens on the wait
> side.
>
> This asymmetry is unfortunate since the wake side has an additional
> guarantee -- specifically, the wake side has observed the unlocked
> state, and thus it can know that speculative READER_BIAS perbutations
> on ->count are just that, they will be undone.
>
> Additionally, unifying the wait/wake methods allows sharing code.
>
> As such, do a straight-forward transform of the writer wakeup into the
> wake side.
>
> Signed-off-by: Peter Zijlstra (Intel) <peterz@...radead.org>
> ---
> kernel/locking/rwsem.c | 253 ++++++++++++++++++++++---------------------------
> 1 file changed, 115 insertions(+), 138 deletions(-)
>
> --- a/kernel/locking/rwsem.c
> +++ b/kernel/locking/rwsem.c
> @@ -107,7 +107,7 @@
> *
> * There are three places where the lock handoff bit may be set or cleared.
> * 1) rwsem_mark_wake() for readers -- set, clear
> - * 2) rwsem_try_write_lock() for writers -- set, clear
> + * 2) rwsem_writer_wake() for writers -- set, clear
> * 3) rwsem_del_waiter() -- clear
> *
> * For all the above cases, wait_lock will be held. A writer must also
> @@ -377,7 +377,7 @@ rwsem_add_waiter(struct rw_semaphore *se
> /*
> * Remove a waiter from the wait_list and clear flags.
> *
> - * Both rwsem_mark_wake() and rwsem_try_write_lock() contain a full 'copy' of
> + * Both rwsem_mark_wake() and rwsem_writer_wake() contain a full 'copy' of
> * this function. Modify with care.
> *
> * Return: true if wait_list isn't empty and false otherwise
> @@ -394,6 +394,100 @@ rwsem_del_waiter(struct rw_semaphore *se
> return false;
> }
>
> +static inline void
> +rwsem_waiter_wake(struct rwsem_waiter *waiter, struct wake_q_head *wake_q)
> +{
> + struct task_struct *tsk;
> +
> + tsk = waiter->task;
> + get_task_struct(tsk);
> +
> + /*
> + * Ensure calling get_task_struct() before setting the reader
> + * waiter to nil such that rwsem_down_read_slowpath() cannot
> + * race with do_exit() by always holding a reference count
> + * to the task to wakeup.
> + */
> + smp_store_release(&waiter->task, NULL);
> + /*
> + * Ensure issuing the wakeup (either by us or someone else)
> + * after setting the reader waiter to nil.
> + */
> + wake_q_add_safe(wake_q, tsk);
> +}
> +
> +/*
> + * This function must be called with the sem->wait_lock held to prevent
> + * race conditions between checking the rwsem wait list and setting the
> + * sem->count accordingly.
> + *
> + * Implies rwsem_del_waiter() on success.
> + */
> +static void rwsem_writer_wake(struct rw_semaphore *sem,
> + struct rwsem_waiter *waiter,
> + struct wake_q_head *wake_q)
> +{
> + struct rwsem_waiter *first = rwsem_first_waiter(sem);
> + long count, new;
> +
> + lockdep_assert_held(&sem->wait_lock);
> +
> + count = atomic_long_read(&sem->count);
> + do {
> + bool has_handoff = !!(count & RWSEM_FLAG_HANDOFF);
> +
> + if (has_handoff) {
> + /*
> + * Honor handoff bit and yield only when the first
> + * waiter is the one that set it. Otherwisee, we
> + * still try to acquire the rwsem.
> + */
> + if (first->handoff_set && (waiter != first))
> + return;
> + }
This "if" statement if for a non-first waiter that somehow got woken up
to have a chance to steal the lock. Now the handoff is done in the wake
side for the first waiter, this "if" statement is not applicable and can
be removed.
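
IOW, assuming rwsem_writer_wake() is now only ever called on the first
waiter while holding wait_lock, something like the following (untested,
just to illustrate the simplification) should be all that is needed:

	count = atomic_long_read(&sem->count);
	do {
		new = count;
		if (count & RWSEM_LOCK_MASK) {
			/*
			 * Only the handoff/RT/timeout policy remains;
			 * no non-first waiter can get here anymore.
			 */
			if ((count & RWSEM_FLAG_HANDOFF) ||
			    (!rt_task(waiter->task) &&
			     !time_after(jiffies, waiter->timeout)))
				return;

			new |= RWSEM_FLAG_HANDOFF;
		} else {
			new |= RWSEM_WRITER_LOCKED;
			new &= ~RWSEM_FLAG_HANDOFF;

			if (list_is_singular(&sem->wait_list))
				new &= ~RWSEM_FLAG_WAITERS;
		}
	} while (!atomic_long_try_cmpxchg_acquire(&sem->count, &count, new));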
> +
> + new = count;
> +
> + if (count & RWSEM_LOCK_MASK) {
> + /*
> + * A waiter (first or not) can set the handoff bit
> + * if it is an RT task or wait in the wait queue
> + * for too long.
> + */
> + if (has_handoff || (!rt_task(waiter->task) &&
> + !time_after(jiffies, waiter->timeout)))
> + return;
> +
> + new |= RWSEM_FLAG_HANDOFF;
> + } else {
> + new |= RWSEM_WRITER_LOCKED;
> + new &= ~RWSEM_FLAG_HANDOFF;
> +
> + if (list_is_singular(&sem->wait_list))
> + new &= ~RWSEM_FLAG_WAITERS;
> + }
> + } while (!atomic_long_try_cmpxchg_acquire(&sem->count, &count, new));
> +
> + /*
> + * We have either acquired the lock with handoff bit cleared or set
> + * the handoff bit. Only the first waiter can have its handoff_set
> + * set here to enable optimistic spinning in slowpath loop.
> + */
> + if (new & RWSEM_FLAG_HANDOFF) {
> + first->handoff_set = true;
> + lockevent_inc(rwsem_wlock_handoff);
> + return;
> + }
> +
> + /*
> + * Have rwsem_writer_wake() fully imply rwsem_del_waiter() on
> + * success.
> + */
> + list_del(&waiter->list);
> + rwsem_set_owner(sem);
> + rwsem_waiter_wake(waiter, wake_q);
> +}
> +
> /*
> * handle the lock release when processes blocked on it that can now run
> * - if we come here from up_xxxx(), then the RWSEM_FLAG_WAITERS bit must
> @@ -424,23 +518,12 @@ static void rwsem_mark_wake(struct rw_se
> */
> waiter = rwsem_first_waiter(sem);
>
> - if (waiter->type != RWSEM_WAITING_FOR_WRITE)
> - goto wake_readers;
> -
> - if (wake_type == RWSEM_WAKE_ANY) {
> - /*
> - * Mark writer at the front of the queue for wakeup.
> - * Until the task is actually later awoken later by
> - * the caller, other writers are able to steal it.
> - * Readers, on the other hand, will block as they
> - * will notice the queued writer.
> - */
> - wake_q_add(wake_q, waiter->task);
> - lockevent_inc(rwsem_wake_writer);
> + if (waiter->type == RWSEM_WAITING_FOR_WRITE) {
> + if (wake_type == RWSEM_WAKE_ANY)
> + rwsem_writer_wake(sem, waiter, wake_q);
> + return;
> }
> - return;
>
> -wake_readers:
> /*
> * No reader wakeup if there are too many of them already.
> */
> @@ -547,25 +630,8 @@ static void rwsem_mark_wake(struct rw_se
> atomic_long_add(adjustment, &sem->count);
>
> /* 2nd pass */
> - list_for_each_entry_safe(waiter, tmp, &wlist, list) {
> - struct task_struct *tsk;
> -
> - tsk = waiter->task;
> - get_task_struct(tsk);
> -
> - /*
> - * Ensure calling get_task_struct() before setting the reader
> - * waiter to nil such that rwsem_down_read_slowpath() cannot
> - * race with do_exit() by always holding a reference count
> - * to the task to wakeup.
> - */
> - smp_store_release(&waiter->task, NULL);
> - /*
> - * Ensure issuing the wakeup (either by us or someone else)
> - * after setting the reader waiter to nil.
> - */
> - wake_q_add_safe(wake_q, tsk);
> - }
> + list_for_each_entry_safe(waiter, tmp, &wlist, list)
> + rwsem_waiter_wake(waiter, wake_q);
> }
>
> /*
> @@ -596,77 +662,6 @@ rwsem_del_wake_waiter(struct rw_semaphor
> }
>
> /*
> - * This function must be called with the sem->wait_lock held to prevent
> - * race conditions between checking the rwsem wait list and setting the
> - * sem->count accordingly.
> - *
> - * Implies rwsem_del_waiter() on success.
> - */
> -static inline bool rwsem_try_write_lock(struct rw_semaphore *sem,
> - struct rwsem_waiter *waiter)
> -{
> - struct rwsem_waiter *first = rwsem_first_waiter(sem);
> - long count, new;
> -
> - lockdep_assert_held(&sem->wait_lock);
> -
> - count = atomic_long_read(&sem->count);
> - do {
> - bool has_handoff = !!(count & RWSEM_FLAG_HANDOFF);
> -
> - if (has_handoff) {
> - /*
> - * Honor handoff bit and yield only when the first
> - * waiter is the one that set it. Otherwisee, we
> - * still try to acquire the rwsem.
> - */
> - if (first->handoff_set && (waiter != first))
> - return false;
> - }
> -
> - new = count;
> -
> - if (count & RWSEM_LOCK_MASK) {
> - /*
> - * A waiter (first or not) can set the handoff bit
> - * if it is an RT task or wait in the wait queue
> - * for too long.
> - */
> - if (has_handoff || (!rt_task(waiter->task) &&
> - !time_after(jiffies, waiter->timeout)))
> - return false;
> -
> - new |= RWSEM_FLAG_HANDOFF;
> - } else {
> - new |= RWSEM_WRITER_LOCKED;
> - new &= ~RWSEM_FLAG_HANDOFF;
> -
> - if (list_is_singular(&sem->wait_list))
> - new &= ~RWSEM_FLAG_WAITERS;
> - }
> - } while (!atomic_long_try_cmpxchg_acquire(&sem->count, &count, new));
> -
> - /*
> - * We have either acquired the lock with handoff bit cleared or set
> - * the handoff bit. Only the first waiter can have its handoff_set
> - * set here to enable optimistic spinning in slowpath loop.
> - */
> - if (new & RWSEM_FLAG_HANDOFF) {
> - first->handoff_set = true;
> - lockevent_inc(rwsem_wlock_handoff);
> - return false;
> - }
> -
> - /*
> - * Have rwsem_try_write_lock() fully imply rwsem_del_waiter() on
> - * success.
> - */
> - list_del(&waiter->list);
> - rwsem_set_owner(sem);
> - return true;
> -}
> -
> -/*
> * The rwsem_spin_on_owner() function returns the following 4 values
> * depending on the lock owner state.
> * OWNER_NULL : owner is currently NULL
> @@ -1072,7 +1067,7 @@ rwsem_down_read_slowpath(struct rw_semap
> for (;;) {
> set_current_state(state);
> if (!smp_load_acquire(&waiter.task)) {
> - /* Matches rwsem_mark_wake()'s smp_store_release(). */
> + /* Matches rwsem_waiter_wake()'s smp_store_release(). */
> break;
> }
> if (signal_pending_state(state, current)) {
> @@ -1143,54 +1138,36 @@ rwsem_down_write_slowpath(struct rw_sema
> } else {
> atomic_long_or(RWSEM_FLAG_WAITERS, &sem->count);
> }
> + raw_spin_unlock_irq(&sem->wait_lock);
>
> /* wait until we successfully acquire the lock */
> - set_current_state(state);
> trace_contention_begin(sem, LCB_F_WRITE);
>
> for (;;) {
> - if (rwsem_try_write_lock(sem, &waiter)) {
> - /* rwsem_try_write_lock() implies ACQUIRE on success */
> + set_current_state(state);
> + if (!smp_load_acquire(&waiter.task)) {
> + /* Matches rwsem_waiter_wake()'s smp_store_release(). */
> break;
> }
> -
> - raw_spin_unlock_irq(&sem->wait_lock);
> -
> - if (signal_pending_state(state, current))
> - goto out_nolock;
> -
> - /*
> - * After setting the handoff bit and failing to acquire
> - * the lock, attempt to spin on owner to accelerate lock
> - * transfer. If the previous owner is a on-cpu writer and it
> - * has just released the lock, OWNER_NULL will be returned.
> - * In this case, we attempt to acquire the lock again
> - * without sleeping.
> - */
> - if (waiter.handoff_set) {
> - enum owner_state owner_state;
> -
> - owner_state = rwsem_spin_on_owner(sem);
> - if (owner_state == OWNER_NULL)
> - goto trylock_again;
> + if (signal_pending_state(state, current)) {
> + raw_spin_lock_irq(&sem->wait_lock);
> + if (waiter.task)
> + goto out_nolock;
> + raw_spin_unlock_irq(&sem->wait_lock);
> + /* Ordered by sem->wait_lock against rwsem_mark_wake(). */
> + break;
> }
> -
> schedule_preempt_disabled();
> lockevent_inc(rwsem_sleep_writer);
> - set_current_state(state);
> -trylock_again:
> - raw_spin_lock_irq(&sem->wait_lock);
> }
> __set_current_state(TASK_RUNNING);
> - raw_spin_unlock_irq(&sem->wait_lock);
> lockevent_inc(rwsem_wlock);
> trace_contention_end(sem, 0);
> return sem;
>
> out_nolock:
> - __set_current_state(TASK_RUNNING);
> - raw_spin_lock_irq(&sem->wait_lock);
> rwsem_del_wake_waiter(sem, &waiter, &wake_q);
> + __set_current_state(TASK_RUNNING);
> lockevent_inc(rwsem_wlock_fail);
> trace_contention_end(sem, -EINTR);
> return ERR_PTR(-EINTR);
I believe it is better to set the task state back to TASK_RUNNING inside
the wait_lock critical section so that we get a release barrier from the
unlock for free.
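
IOW, something like this (untested) on top of your patch:

 out_nolock:
+	__set_current_state(TASK_RUNNING);
 	rwsem_del_wake_waiter(sem, &waiter, &wake_q);
-	__set_current_state(TASK_RUNNING);
 	lockevent_inc(rwsem_wlock_fail);

That way the state change is ordered against other CPUs by the
wait_lock release inside rwsem_del_wake_waiter().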
Cheers,
Longman