Currently we must drop hb->lock in order to call rt_mutex_timed_futex_lock(),
such that it can block and wait for lock acquisition. The problem with this
is that at this point (and after a failed acquire, but before re-acquiring
hb->lock) the hb queue and the rt_mutex wait list disagree on who is waiting
to acquire the futex. This leads to a 'funny' state in wake_futex_pi() which
we then need to deal with in fixup_owner().

This inconsistent state would become even more of a problem when we move
rt_mutex_unlock() out from under hb->lock, which we want to do because of
PI inversion issues.

Avoid the funny state by reusing the rt_mutex_{start,finish}_proxy_lock()
primitives that were created for requeue-pi. The idea is to enqueue the
rt_mutex_waiter on the rt_mutex wait list before dropping hb->lock; this is
what rt_mutex_start_proxy_lock() allows. We must then wait for the lock with
hb->lock dropped; _however_, we must not do remove_waiter() when the lock
acquire fails. This means we need to split rt_mutex_finish_proxy_lock() into
two parts: wait and cleanup.

With this done we can do something along the lines of:

	__queue_me();
	rt_mutex_start_proxy_lock();
	spin_unlock(q.lock_ptr);

	ret = rt_mutex_wait_proxy_lock();

	spin_lock(q.lock_ptr);
	if (ret)
		rt_mutex_cleanup_proxy_lock();

And the hb queue and rt_mutex wait lists are always in sync, with the one
exception of rt_mutex_futex_unlock(), which removes the new owner from the
rt_mutex wait list while it remains on the hb queue until it returns from
rt_mutex_wait_proxy_lock(). This one case is fine, since unlock has no
concurrency with itself. That is, the new owner must finish and return from
futex_lock_pi() before it can do another unlock (through futex_unlock_pi()),
at which point its state is coherent again.

Signed-off-by: Peter Zijlstra (Intel)
---
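[Editor's note, not part of the patch: the enqueue-before-unlock / wait /
cleanup-on-failure pattern described above can be illustrated outside the
kernel. The following is a minimal, self-contained userspace sketch, not
kernel code: bucket_lock stands in for hb->lock, 'waiters' for the rt_mutex
wait list, and start_proxy()/wait_proxy()/cleanup_proxy() are hypothetical
stand-ins for rt_mutex_{start,wait,cleanup}_proxy_lock().]

/* proxy_demo.c - illustrative userspace analogue; build with: gcc -pthread proxy_demo.c */
#include <errno.h>
#include <pthread.h>
#include <stdbool.h>
#include <stdio.h>
#include <time.h>

struct waiter {
	struct waiter *next;
	bool woken;
};

static pthread_mutex_t bucket_lock = PTHREAD_MUTEX_INITIALIZER; /* plays hb->lock */
static pthread_mutex_t wait_lock = PTHREAD_MUTEX_INITIALIZER;   /* plays rt_mutex ->wait_lock */
static pthread_cond_t wait_cond = PTHREAD_COND_INITIALIZER;
static struct waiter *waiters;          /* plays the rt_mutex wait list */

/* Enqueue while the caller still holds bucket_lock, so both lists agree. */
static void start_proxy(struct waiter *w)
{
	pthread_mutex_lock(&wait_lock);
	w->woken = false;
	w->next = waiters;
	waiters = w;
	pthread_mutex_unlock(&wait_lock);
}

/* Block with bucket_lock dropped; never dequeue on failure here. */
static int wait_proxy(struct waiter *w, const struct timespec *deadline)
{
	int err = 0;

	pthread_mutex_lock(&wait_lock);
	while (!w->woken && !err)
		err = pthread_cond_timedwait(&wait_cond, &wait_lock, deadline);
	pthread_mutex_unlock(&wait_lock);

	return w->woken ? 0 : -err;     /* -ETIMEDOUT on timeout */
}

/* Dequeue a failed waiter; called only after bucket_lock is re-taken. */
static void cleanup_proxy(struct waiter *w)
{
	struct waiter **p;

	pthread_mutex_lock(&wait_lock);
	for (p = &waiters; *p; p = &(*p)->next) {
		if (*p == w) {
			*p = w->next;
			break;
		}
	}
	pthread_mutex_unlock(&wait_lock);
}

int main(void)
{
	struct waiter w;
	struct timespec deadline;
	int ret;

	clock_gettime(CLOCK_REALTIME, &deadline);
	deadline.tv_sec += 1;           /* nobody wakes us: exercise the failure path */

	pthread_mutex_lock(&bucket_lock);
	start_proxy(&w);                /* lists are in sync before we drop the lock */
	pthread_mutex_unlock(&bucket_lock);

	ret = wait_proxy(&w, &deadline);

	pthread_mutex_lock(&bucket_lock);
	if (ret)
		cleanup_proxy(&w);      /* dequeue only with bucket_lock held again */
	pthread_mutex_unlock(&bucket_lock);

	printf("wait_proxy() = %d%s\n", ret, ret == -ETIMEDOUT ? " (-ETIMEDOUT)" : "");
	return 0;
}

[A waker would set w->woken under wait_lock and signal wait_cond, mirroring
how the unlock path hands the lock to the top waiter while that waiter is
still on the hb queue. End of editor's note.]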
 kernel/futex.c                  | 78 ++++++++++++++++++++++++++++------------
 kernel/locking/rtmutex.c        | 47 +++++++++++++-----------
 kernel/locking/rtmutex_common.h | 10 +++--
 3 files changed, 88 insertions(+), 47 deletions(-)

--- a/kernel/futex.c
+++ b/kernel/futex.c
@@ -2088,20 +2088,7 @@ queue_unlock(struct futex_hash_bucket *h
 	hb_waiters_dec(hb);
 }
 
-/**
- * queue_me() - Enqueue the futex_q on the futex_hash_bucket
- * @q:	The futex_q to enqueue
- * @hb:	The destination hash bucket
- *
- * The hb->lock must be held by the caller, and is released here. A call to
- * queue_me() is typically paired with exactly one call to unqueue_me(). The
- * exceptions involve the PI related operations, which may use unqueue_me_pi()
- * or nothing if the unqueue is done as part of the wake process and the unqueue
- * state is implicit in the state of woken task (see futex_wait_requeue_pi() for
- * an example).
- */
-static inline void queue_me(struct futex_q *q, struct futex_hash_bucket *hb)
-	__releases(&hb->lock)
+static inline void __queue_me(struct futex_q *q, struct futex_hash_bucket *hb)
 {
 	int prio;
 
@@ -2118,6 +2105,24 @@ static inline void queue_me(struct futex
 	plist_node_init(&q->list, prio);
 	plist_add(&q->list, &hb->chain);
 	q->task = current;
+}
+
+/**
+ * queue_me() - Enqueue the futex_q on the futex_hash_bucket
+ * @q:	The futex_q to enqueue
+ * @hb:	The destination hash bucket
+ *
+ * The hb->lock must be held by the caller, and is released here. A call to
+ * queue_me() is typically paired with exactly one call to unqueue_me(). The
+ * exceptions involve the PI related operations, which may use unqueue_me_pi()
+ * or nothing if the unqueue is done as part of the wake process and the unqueue
+ * state is implicit in the state of woken task (see futex_wait_requeue_pi() for
+ * an example).
+ */
+static inline void queue_me(struct futex_q *q, struct futex_hash_bucket *hb)
+	__releases(&hb->lock)
+{
+	__queue_me(q, hb);
 	spin_unlock(&hb->lock);
 }
 
@@ -2593,6 +2598,7 @@ static int futex_lock_pi(u32 __user *uad
 			 ktime_t *time, int trylock)
 {
 	struct hrtimer_sleeper timeout, *to = NULL;
+	struct rt_mutex_waiter rt_waiter;
 	struct futex_hash_bucket *hb;
 	struct futex_q q = futex_q_init;
 	int res, ret;
@@ -2645,25 +2651,49 @@ static int futex_lock_pi(u32 __user *uad
 		}
 	}
 
+	WARN_ON(!q.pi_state);
+
 	/*
 	 * Only actually queue now that the atomic ops are done:
 	 */
-	queue_me(&q, hb);
+	__queue_me(&q, hb);
 
-	WARN_ON(!q.pi_state);
-	/*
-	 * Block on the PI mutex:
-	 */
-	if (!trylock) {
-		ret = rt_mutex_timed_futex_lock(&q.pi_state->pi_mutex, to);
-	} else {
+	debug_rt_mutex_init_waiter(&rt_waiter);
+	RB_CLEAR_NODE(&rt_waiter.pi_tree_entry);
+	RB_CLEAR_NODE(&rt_waiter.tree_entry);
+	rt_waiter.task = NULL;
+
+	if (trylock) {
 		ret = rt_mutex_futex_trylock(&q.pi_state->pi_mutex);
 		/* Fixup the trylock return value: */
 		ret = ret ? 0 : -EWOULDBLOCK;
+		goto did_trylock;
 	}
 
+	/*
+	 * We must add ourselves to the rt_mutex waitlist while holding hb->lock
+	 * such that the hb and rt_mutex wait lists match.
+	 */
+	rt_mutex_start_proxy_lock(&q.pi_state->pi_mutex, &rt_waiter, current);
+	spin_unlock(q.lock_ptr);
+
+	if (unlikely(to))
+		hrtimer_start_expires(&to->timer, HRTIMER_MODE_ABS);
+
+	ret = rt_mutex_wait_proxy_lock(&q.pi_state->pi_mutex, to, &rt_waiter);
+	spin_lock(q.lock_ptr);
 	/*
+	 * If we failed to acquire the lock (signal/timeout), we must
+	 * first acquire the hb->lock before removing the lock from the
+	 * rt_mutex waitqueue, such that we can keep the hb and rt_mutex
+	 * wait lists consistent.
+	 */
+	if (ret)
+		rt_mutex_cleanup_proxy_lock(&q.pi_state->pi_mutex, &rt_waiter);
+
+did_trylock:
+	/*
 	 * Fixup the pi_state owner and possibly acquire the lock if we
 	 * haven't already.
 	 */
@@ -3005,10 +3035,12 @@ static int futex_wait_requeue_pi(u32 __u
 	 */
 	WARN_ON(!q.pi_state);
 	pi_mutex = &q.pi_state->pi_mutex;
-	ret = rt_mutex_finish_proxy_lock(pi_mutex, to, &rt_waiter);
+	ret = rt_mutex_wait_proxy_lock(pi_mutex, to, &rt_waiter);
 	debug_rt_mutex_free_waiter(&rt_waiter);
 
 	spin_lock(q.lock_ptr);
+	if (ret)
+		rt_mutex_cleanup_proxy_lock(pi_mutex, &rt_waiter);
 	/*
 	 * Fixup the pi_state owner and possibly acquire the lock if we
 	 * haven't already.
--- a/kernel/locking/rtmutex.c
+++ b/kernel/locking/rtmutex.c
@@ -1485,19 +1485,6 @@ int __sched rt_mutex_lock_interruptible(
 EXPORT_SYMBOL_GPL(rt_mutex_lock_interruptible);
 
 /*
- * Futex variant with full deadlock detection.
- * Futex variants must not use the fast-path, see __rt_mutex_futex_unlock().
- */
-int __sched rt_mutex_timed_futex_lock(struct rt_mutex *lock,
-				      struct hrtimer_sleeper *timeout)
-{
-	might_sleep();
-
-	return rt_mutex_slowlock(lock, TASK_INTERRUPTIBLE,
-				 timeout, RT_MUTEX_FULL_CHAINWALK);
-}
-
-/*
  * Futex variant, must not use fastpath.
  */
 int __sched rt_mutex_futex_trylock(struct rt_mutex *lock)
@@ -1745,21 +1732,23 @@ struct task_struct *rt_mutex_next_owner(
 }
 
 /**
- * rt_mutex_finish_proxy_lock() - Complete lock acquisition
+ * rt_mutex_wait_proxy_lock() - Wait for lock acquisition
  * @lock:		the rt_mutex we were woken on
  * @to:			the timeout, null if none. hrtimer should already have
  *			been started.
  * @waiter:		the pre-initialized rt_mutex_waiter
  *
- * Complete the lock acquisition started our behalf by another thread.
+ * Wait for the lock acquisition started on our behalf by
+ * rt_mutex_start_proxy_lock(). Upon failure, the caller must call
+ * rt_mutex_cleanup_proxy_lock().
  *
  * Returns:
  *  0 - success
  * <0 - error, one of -EINTR, -ETIMEDOUT
  *
- * Special API call for PI-futex requeue support
+ * Special API call for PI-futex support
  */
-int rt_mutex_finish_proxy_lock(struct rt_mutex *lock,
+int rt_mutex_wait_proxy_lock(struct rt_mutex *lock,
 			       struct hrtimer_sleeper *to,
 			       struct rt_mutex_waiter *waiter)
 {
@@ -1772,9 +1761,6 @@ int rt_mutex_finish_proxy_lock(struct rt
 	/* sleep on the mutex */
 	ret = __rt_mutex_slowlock(lock, TASK_INTERRUPTIBLE, to, waiter);
 
-	if (unlikely(ret))
-		remove_waiter(lock, waiter);
-
 	/*
 	 * try_to_take_rt_mutex() sets the waiter bit unconditionally. We might
 	 * have to fix that up.
@@ -1785,3 +1771,24 @@ int rt_mutex_finish_proxy_lock(struct rt
 
 	return ret;
 }
+
+/**
+ * rt_mutex_cleanup_proxy_lock() - Cleanup failed lock acquisition
+ * @lock:		the rt_mutex we were woken on
+ * @waiter:		the pre-initialized rt_mutex_waiter
+ *
+ * Clean up the failed lock acquisition as per rt_mutex_wait_proxy_lock().
+ *
+ * Special API call for PI-futex support
+ */
+void rt_mutex_cleanup_proxy_lock(struct rt_mutex *lock,
+				 struct rt_mutex_waiter *waiter)
+{
+	raw_spin_lock_irq(&lock->wait_lock);
+
+	remove_waiter(lock, waiter);
+	fixup_rt_mutex_waiters(lock);
+
+	raw_spin_unlock_irq(&lock->wait_lock);
+}
+
--- a/kernel/locking/rtmutex_common.h
+++ b/kernel/locking/rtmutex_common.h
@@ -105,11 +105,13 @@ extern void rt_mutex_proxy_unlock(struct
 extern int rt_mutex_start_proxy_lock(struct rt_mutex *lock,
 				     struct rt_mutex_waiter *waiter,
 				     struct task_struct *task);
-extern int rt_mutex_finish_proxy_lock(struct rt_mutex *lock,
-				      struct hrtimer_sleeper *to,
-				      struct rt_mutex_waiter *waiter);
-extern int rt_mutex_timed_futex_lock(struct rt_mutex *l, struct hrtimer_sleeper *to);
+extern int rt_mutex_wait_proxy_lock(struct rt_mutex *lock,
+				    struct hrtimer_sleeper *to,
+				    struct rt_mutex_waiter *waiter);
+extern void rt_mutex_cleanup_proxy_lock(struct rt_mutex *lock,
+					struct rt_mutex_waiter *waiter);
+
 extern int rt_mutex_futex_trylock(struct rt_mutex *l);
 extern void rt_mutex_futex_unlock(struct rt_mutex *lock);