From 1c76a9c1b9d16d0ceb07f643803035177b4042a5 Mon Sep 17 00:00:00 2001 From: Waiman Long Date: Thu, 11 Nov 2021 13:49:35 -0500 Subject: [PATCH] locking/rwsem: Make handoff bit handling more consistent There are some inconsistencies in the way that the handoff bit is being handled in readers and writers. Firstly, when a queue head writer sets the handoff bit, it will clear it when the writer is being killed or interrupted on its way out without acquiring the lock. That is not the case for a queue head reader. The handoff bit will simply be inherited by the next waiter. Secondly, in the out_nolock path of rwsem_down_read_slowpath(), both the waiter and handoff bits are cleared if the wait queue becomes empty. For rwsem_down_write_slowpath(), however, the handoff bit is not checked and cleared if the wait queue is empty. This can potentially leave the handoff bit set with an empty wait queue. To make the handoff bit handling more consistent and robust, extract out the rwsem flags handling code into a common rwsem_out_nolock_clear_flags() function and call it from both the reader and writer's out_nolock paths. The common function will only use atomic_long_andnot() to clear bits to avoid possible race condition. This will eliminate the case of the handoff bit being set with an empty wait queue as well as the possible race condition that may screw up the count value. More state is stored in the rwsem_waiter structure and all writer handoff bit setting is pushed to rwsem_try_write_lock(). This simplifies the trylock loop in rwsem_down_write_slowpath(). 
Fixes: 4f23dbc1e657 ("locking/rwsem: Implement lock handoff to prevent lock starvation") Suggested-by: Peter Zijlstra Signed-off-by: Waiman Long --- kernel/locking/rwsem.c | 109 ++++++++++++++++++----------------------- 1 file changed, 49 insertions(+), 60 deletions(-) diff --git a/kernel/locking/rwsem.c b/kernel/locking/rwsem.c index c51387a43265..b5fe21d5916d 100644 --- a/kernel/locking/rwsem.c +++ b/kernel/locking/rwsem.c @@ -104,10 +104,11 @@ * atomic_long_fetch_add() is used to obtain reader lock, whereas * atomic_long_cmpxchg() will be used to obtain writer lock. * - * There are three places where the lock handoff bit may be set or cleared. - * 1) rwsem_mark_wake() for readers. - * 2) rwsem_try_write_lock() for writers. - * 3) Error path of rwsem_down_write_slowpath(). + * There are four places where the lock handoff bit may be set or cleared. + * 1) rwsem_mark_wake() for readers -- set, clear + * 2) rwsem_try_write_lock() for writers -- set, clear + * 3) Error path of rwsem_down_write_slowpath() -- clear + * 4) Error path of rwsem_down_read_slowpath() -- clear * * For all the above cases, wait_lock will be held. A writer must also * be the first one in the wait_list to be eligible for setting the handoff @@ -334,6 +335,7 @@ struct rwsem_waiter { struct task_struct *task; enum rwsem_waiter_type type; unsigned long timeout; + bool handoff_set, rt_task; }; #define rwsem_first_waiter(sem) \ list_first_entry(&sem->wait_list, struct rwsem_waiter, list) @@ -344,12 +346,6 @@ enum rwsem_wake_type { RWSEM_WAKE_READ_OWNED /* Waker thread holds the read lock */ }; -enum writer_wait_state { - WRITER_NOT_FIRST, /* Writer is not first in wait list */ - WRITER_FIRST, /* Writer is first in wait list */ - WRITER_HANDOFF /* Writer is first & handoff needed */ -}; - /* * The typical HZ value is either 250 or 1000. 
So set the minimum waiting * time to at least 4ms or 1 jiffy (if it is higher than 4ms) in the wait @@ -434,6 +430,7 @@ static void rwsem_mark_wake(struct rw_semaphore *sem, if (!(oldcount & RWSEM_FLAG_HANDOFF) && time_after(jiffies, waiter->timeout)) { adjustment -= RWSEM_FLAG_HANDOFF; + waiter->handoff_set = true; lockevent_inc(rwsem_rlock_handoff); } @@ -531,14 +528,12 @@ static void rwsem_mark_wake(struct rw_semaphore *sem, * This function must be called with the sem->wait_lock held to prevent * race conditions between checking the rwsem wait list and setting the * sem->count accordingly. - * - * If wstate is WRITER_HANDOFF, it will make sure that either the handoff - * bit is set or the lock is acquired with handoff bit cleared. */ static inline bool rwsem_try_write_lock(struct rw_semaphore *sem, - enum writer_wait_state wstate) + struct rwsem_waiter *waiter) { long count, new; + bool first = rwsem_first_waiter(sem) == waiter; lockdep_assert_held(&sem->wait_lock); @@ -546,13 +541,14 @@ static inline bool rwsem_try_write_lock(struct rw_semaphore *sem, do { bool has_handoff = !!(count & RWSEM_FLAG_HANDOFF); - if (has_handoff && wstate == WRITER_NOT_FIRST) + if (has_handoff && !first) return false; new = count; if (count & RWSEM_LOCK_MASK) { - if (has_handoff || (wstate != WRITER_HANDOFF)) + if (has_handoff || (!waiter->rt_task && + !time_after(jiffies, waiter->timeout))) return false; new |= RWSEM_FLAG_HANDOFF; @@ -569,8 +565,11 @@ static inline bool rwsem_try_write_lock(struct rw_semaphore *sem, * We have either acquired the lock with handoff bit cleared or * set the handoff bit. */ - if (new & RWSEM_FLAG_HANDOFF) + if (new & RWSEM_FLAG_HANDOFF) { + waiter->handoff_set = true; + lockevent_inc(rwsem_wlock_handoff); return false; + } rwsem_set_owner(sem); return true; @@ -889,6 +888,24 @@ rwsem_spin_on_owner(struct rw_semaphore *sem) } #endif +/* + * Common code to handle rwsem flags in out_nolock path with wait_lock held. 
+ */ +static inline void rwsem_out_nolock_clear_flags(struct rw_semaphore *sem, + struct rwsem_waiter *waiter) +{ + long flags = 0; + + list_del(&waiter->list); + if (list_empty(&sem->wait_list)) + flags = RWSEM_FLAG_HANDOFF | RWSEM_FLAG_WAITERS; + else if (waiter->handoff_set) + flags = RWSEM_FLAG_HANDOFF; + + if (flags) + atomic_long_andnot(flags, &sem->count); +} + /* * Wait for the read lock to be granted */ @@ -936,6 +953,7 @@ rwsem_down_read_slowpath(struct rw_semaphore *sem, long count, unsigned int stat waiter.task = current; waiter.type = RWSEM_WAITING_FOR_READ; waiter.timeout = jiffies + RWSEM_WAIT_TIMEOUT; + waiter.handoff_set = false; raw_spin_lock_irq(&sem->wait_lock); if (list_empty(&sem->wait_list)) { @@ -1002,11 +1020,7 @@ rwsem_down_read_slowpath(struct rw_semaphore *sem, long count, unsigned int stat return sem; out_nolock: - list_del(&waiter.list); - if (list_empty(&sem->wait_list)) { - atomic_long_andnot(RWSEM_FLAG_WAITERS|RWSEM_FLAG_HANDOFF, - &sem->count); - } + rwsem_out_nolock_clear_flags(sem, &waiter); raw_spin_unlock_irq(&sem->wait_lock); __set_current_state(TASK_RUNNING); lockevent_inc(rwsem_rlock_fail); @@ -1020,7 +1034,6 @@ static struct rw_semaphore * rwsem_down_write_slowpath(struct rw_semaphore *sem, int state) { long count; - enum writer_wait_state wstate; struct rwsem_waiter waiter; struct rw_semaphore *ret = sem; DEFINE_WAKE_Q(wake_q); @@ -1038,16 +1051,13 @@ rwsem_down_write_slowpath(struct rw_semaphore *sem, int state) waiter.task = current; waiter.type = RWSEM_WAITING_FOR_WRITE; waiter.timeout = jiffies + RWSEM_WAIT_TIMEOUT; + waiter.rt_task = rt_task(current); raw_spin_lock_irq(&sem->wait_lock); - - /* account for this before adding a new element to the list */ - wstate = list_empty(&sem->wait_list) ? 
WRITER_FIRST : WRITER_NOT_FIRST; - list_add_tail(&waiter.list, &sem->wait_list); /* we're now waiting on the lock */ - if (wstate == WRITER_NOT_FIRST) { + if (rwsem_first_waiter(sem) != &waiter) { count = atomic_long_read(&sem->count); /* @@ -1083,7 +1093,7 @@ rwsem_down_write_slowpath(struct rw_semaphore *sem, int state) /* wait until we successfully acquire the lock */ set_current_state(state); for (;;) { - if (rwsem_try_write_lock(sem, wstate)) { + if (rwsem_try_write_lock(sem, &waiter)) { /* rwsem_try_write_lock() implies ACQUIRE on success */ break; } @@ -1098,9 +1108,12 @@ rwsem_down_write_slowpath(struct rw_semaphore *sem, int state) * In this case, we attempt to acquire the lock again * without sleeping. */ - if (wstate == WRITER_HANDOFF) { + if (waiter.handoff_set) { enum owner_state owner_state; + if (signal_pending_state(state, current)) + goto out_nolock; + preempt_disable(); owner_state = rwsem_spin_on_owner(sem); preempt_enable(); @@ -1117,31 +1130,14 @@ rwsem_down_write_slowpath(struct rw_semaphore *sem, int state) schedule(); lockevent_inc(rwsem_sleep_writer); set_current_state(state); - /* - * If HANDOFF bit is set, unconditionally do - * a trylock. - */ - if (wstate == WRITER_HANDOFF) - break; - - if ((wstate == WRITER_NOT_FIRST) && - (rwsem_first_waiter(sem) == &waiter)) - wstate = WRITER_FIRST; - - count = atomic_long_read(&sem->count); - if (!(count & RWSEM_LOCK_MASK)) - break; /* - * The setting of the handoff bit is deferred - * until rwsem_try_write_lock() is called. + * Unconditionally do a trylock and spinning if + * HANDOFF bit is set. 
*/ - if ((wstate == WRITER_FIRST) && (rt_task(current) || - time_after(jiffies, waiter.timeout))) { - wstate = WRITER_HANDOFF; - lockevent_inc(rwsem_wlock_handoff); + if (waiter.handoff_set || + !(atomic_long_read(&sem->count) & RWSEM_LOCK_MASK)) break; - } } trylock_again: raw_spin_lock_irq(&sem->wait_lock); @@ -1156,19 +1152,12 @@ rwsem_down_write_slowpath(struct rw_semaphore *sem, int state) out_nolock: __set_current_state(TASK_RUNNING); raw_spin_lock_irq(&sem->wait_lock); - list_del(&waiter.list); - - if (unlikely(wstate == WRITER_HANDOFF)) - atomic_long_add(-RWSEM_FLAG_HANDOFF, &sem->count); - - if (list_empty(&sem->wait_list)) - atomic_long_andnot(RWSEM_FLAG_WAITERS, &sem->count); - else + rwsem_out_nolock_clear_flags(sem, &waiter); + if (!list_empty(&sem->wait_list)) rwsem_mark_wake(sem, RWSEM_WAKE_ANY, &wake_q); raw_spin_unlock_irq(&sem->wait_lock); wake_up_q(&wake_q); lockevent_inc(rwsem_wlock_fail); - return ERR_PTR(-EINTR); } -- 2.27.0