There are a number of 'interesting' problems with FUTEX_UNLOCK_PI, all
caused by holding hb->lock while doing the rt_mutex_unlock() equivalent.

This patch doesn't attempt to fix any of the actual problems, but instead
reworks the code to not hold hb->lock across the unlock, paving the way to
actually fix the problems later.

The current reason we hold hb->lock over unlock is that it serializes
against FUTEX_LOCK_PI and prevents new waiters from coming in. This ensures
the rt_mutex_next_owner() value is stable and can be written into the
user-space futex value before doing the unlock, such that the unlock will
indeed end up at new_owner.

This patch recognises that holding rt_mutex::wait_lock results in the very
same guarantee: no new waiters can come in while we hold that lock -- after
all, waiters would need this lock to queue themselves. It therefore
restructures the code to keep rt_mutex::wait_lock held instead.

This (of course) is not entirely straightforward either; see the comment in
rt_mutex_slowunlock(): doing the unlock itself might drop wait_lock, letting
new waiters in. To cure this, rt_mutex_futex_unlock() becomes a variant of
rt_mutex_slowunlock() that returns -EAGAIN in that case. This ensures the
FUTEX_UNLOCK_PI code aborts and restarts the entire operation.

Signed-off-by: Peter Zijlstra (Intel)
---
 kernel/futex.c                  | 63 +++++++++++++++++--------------
 kernel/locking/rtmutex.c        | 81 ++++++++++++++++++++++++++++++++++++----
 kernel/locking/rtmutex_common.h |  4 -
 3 files changed, 110 insertions(+), 38 deletions(-)

--- a/kernel/futex.c
+++ b/kernel/futex.c
@@ -1294,24 +1294,21 @@ static void mark_wake_futex(struct wake_
 static int wake_futex_pi(u32 __user *uaddr, u32 uval, struct futex_q *top_waiter,
			 struct futex_hash_bucket *hb)
 {
-	struct task_struct *new_owner;
 	struct futex_pi_state *pi_state = top_waiter->pi_state;
 	u32 uninitialized_var(curval), newval;
+	struct task_struct *new_owner;
 	WAKE_Q(wake_q);
-	bool deboost;
 	int ret = 0;
 
-	if (!pi_state)
-		return -EINVAL;
+	raw_spin_lock_irq(&pi_state->pi_mutex.wait_lock);
+	WARN_ON_ONCE(!atomic_inc_not_zero(&pi_state->refcount));
 
 	/*
-	 * If current does not own the pi_state then the futex is
-	 * inconsistent and user space fiddled with the futex value.
+	 * Now that we hold wait_lock, no new waiters can come in on the
+	 * rt_mutex and the new owner is stable. Drop hb->lock.
 	 */
-	if (pi_state->owner != current)
-		return -EINVAL;
+	spin_unlock(&hb->lock);
 
-	raw_spin_lock_irq(&pi_state->pi_mutex.wait_lock);
 	new_owner = rt_mutex_next_owner(&pi_state->pi_mutex);
 
 	/*
@@ -1334,6 +1331,7 @@ static int wake_futex_pi(u32 __user *uad
 
 	if (cmpxchg_futex_value_locked(&curval, uaddr, uval, newval)) {
 		ret = -EFAULT;
+
 	} else if (curval != uval) {
 		/*
 		 * If a unconditional UNLOCK_PI operation (user space did not
@@ -1346,10 +1344,9 @@ static int wake_futex_pi(u32 __user *uad
 		else
 			ret = -EINVAL;
 	}
-	if (ret) {
-		raw_spin_unlock_irq(&pi_state->pi_mutex.wait_lock);
-		return ret;
-	}
+
+	if (ret)
+		goto out_unlock;
 
 	raw_spin_lock(&pi_state->owner->pi_lock);
 	WARN_ON(list_empty(&pi_state->list));
@@ -1362,22 +1359,20 @@ static int wake_futex_pi(u32 __user *uad
 	pi_state->owner = new_owner;
 	raw_spin_unlock(&new_owner->pi_lock);
 
-	raw_spin_unlock_irq(&pi_state->pi_mutex.wait_lock);
+	ret = rt_mutex_futex_unlock(&pi_state->pi_mutex, &wake_q);
 
-	deboost = rt_mutex_futex_unlock(&pi_state->pi_mutex, &wake_q);
+out_unlock:
+	raw_spin_unlock_irq(&pi_state->pi_mutex.wait_lock);
 
-	/*
-	 * First unlock HB so the waiter does not spin on it once he got woken
-	 * up. Second wake up the waiter before the priority is adjusted. If we
-	 * deboost first (and lose our higher priority), then the task might get
-	 * scheduled away before the wake up can take place.
-	 */
-	spin_unlock(&hb->lock);
-	wake_up_q(&wake_q);
-	if (deboost)
+	if (ret > 0) {
+		wake_up_q(&wake_q);
 		rt_mutex_adjust_prio(current);
+		ret = 0;
+	}
 
-	return 0;
+	put_pi_state(pi_state);
+
+	return ret;
 }
 
 /*
@@ -2656,6 +2651,20 @@ static int futex_unlock_pi(u32 __user *u
 	 */
 	top_waiter = futex_top_waiter(hb, &key);
 	if (top_waiter) {
+
+		if (!top_waiter->pi_state)
+			goto out_unlock;
+
+		/*
+		 * If current does not own the pi_state then the futex is
+		 * inconsistent and user space fiddled with the futex value.
+		 */
+		if (top_waiter->pi_state->owner != current)
+			goto out_unlock;
+
+		/*
+		 * wake_futex_pi() _will_ drop hb->lock
+		 */
 		ret = wake_futex_pi(uaddr, uval, top_waiter, hb);
 		/*
 		 * In case of success wake_futex_pi dropped the hash
@@ -2674,7 +2683,6 @@ static int futex_unlock_pi(u32 __user *u
 		 * setting the FUTEX_WAITERS bit. Try again.
 		 */
 		if (ret == -EAGAIN) {
-			spin_unlock(&hb->lock);
 			put_futex_key(&key);
 			goto retry;
 		}
@@ -2682,7 +2690,7 @@ static int futex_unlock_pi(u32 __user *u
 		 * wake_futex_pi has detected invalid state. Tell user
 		 * space.
 		 */
-		goto out_unlock;
+		goto out_putkey;
 	}
 
 	/*
@@ -2707,7 +2715,6 @@ static int futex_unlock_pi(u32 __user *u
 		return ret;
 
 pi_faulted:
-	spin_unlock(&hb->lock);
 	put_futex_key(&key);
 
 	ret = fault_in_user_writeable(uaddr);
--- a/kernel/locking/rtmutex.c
+++ b/kernel/locking/rtmutex.c
@@ -1489,19 +1489,84 @@ void __sched rt_mutex_unlock(struct rt_m
 EXPORT_SYMBOL_GPL(rt_mutex_unlock);
 
 /**
- * rt_mutex_futex_unlock - Futex variant of rt_mutex_unlock
+ * rt_mutex_futex_unlock - Futex variant of rt_mutex_slowunlock
  * @lock: the rt_mutex to be unlocked
+ * @wake_q: wake queue
 *
- * Returns: true/false indicating whether priority adjustment is
- *	    required or not.
+ * This cannot just use rt_mutex_slowunlock() since that does the wait_lock
+ * dance; the futex code is tricky (ha!) and uses the wait_lock
+ * to hold off new waiters, so dropping this lock invalidates prior state
+ * and we need to redo all of it.
+ *
+ * Returns:
+ *  -EAGAIN: if we need to retry the whole operation;
+ *        1: if we need to wake and deboost;
+ *        0: if we're done.
 */
-bool __sched rt_mutex_futex_unlock(struct rt_mutex *lock,
-				   struct wake_q_head *wqh)
+int __sched rt_mutex_futex_unlock(struct rt_mutex *lock,
+				  struct wake_q_head *wake_q)
 {
-	if (likely(rt_mutex_cmpxchg_release(lock, current, NULL)))
-		return false;
+	lockdep_assert_held(&lock->wait_lock);
+
+	debug_rt_mutex_unlock(lock);
+
+	/*
+	 * We must be careful here if the fast path is enabled. If we
+	 * have no waiters queued we cannot set owner to NULL here
+	 * because of:
+	 *
+	 * foo->lock->owner = NULL;
+	 *			rtmutex_lock(foo->lock);   <- fast path
+	 *			free = atomic_dec_and_test(foo->refcnt);
+	 *			rtmutex_unlock(foo->lock); <- fast path
+	 *			if (free)
+	 *				kfree(foo);
+	 * raw_spin_unlock(foo->lock->wait_lock);
+	 *
+	 * So for the fastpath enabled kernel:
+	 *
+	 * Nothing can set the waiters bit as long as we hold
+	 * lock->wait_lock. So we do the following sequence:
+	 *
+	 *	owner = rt_mutex_owner(lock);
+	 *	clear_rt_mutex_waiters(lock);
+	 *	raw_spin_unlock(&lock->wait_lock);
+	 *	if (cmpxchg(&lock->owner, owner, 0) == owner)
+	 *		return;
+	 *	goto retry;
+	 *
+	 * The fastpath disabled variant is simple as all access to
+	 * lock->owner is serialized by lock->wait_lock:
+	 *
+	 *	lock->owner = NULL;
+	 *	raw_spin_unlock(&lock->wait_lock);
+	 */
+	if (!rt_mutex_has_waiters(lock)) {
+		unsigned long flags;
+		int ret = 0;
+
+		local_save_flags(flags);
+
+		/* Drops lock->wait_lock ! */
+		if (unlock_rt_mutex_safe(lock, flags) == false)
+			ret = -EAGAIN;
+
+		/* Relock the rtmutex and try again */
+		raw_spin_lock_irq(&lock->wait_lock);
+
+		return ret;
+	}
+
+	/*
+	 * The wakeup next waiter path does not suffer from the above
+	 * race. See the comments there.
+	 *
+	 * Queue the next waiter for wakeup once we release the wait_lock.
+	 */
+	mark_wakeup_next_waiter(wake_q, lock);
 
-	return rt_mutex_slowunlock(lock, wqh);
+	/* Do wakeups and deboost. */
+	return 1;
 }
 
 /**
--- a/kernel/locking/rtmutex_common.h
+++ b/kernel/locking/rtmutex_common.h
@@ -109,8 +109,8 @@ extern int rt_mutex_finish_proxy_lock(st
				      struct hrtimer_sleeper *to,
				      struct rt_mutex_waiter *waiter);
 extern int rt_mutex_timed_futex_lock(struct rt_mutex *l, struct hrtimer_sleeper *to);
-extern bool rt_mutex_futex_unlock(struct rt_mutex *lock,
-				  struct wake_q_head *wqh);
+extern int rt_mutex_futex_unlock(struct rt_mutex *lock,
+				 struct wake_q_head *wqh);
 extern void rt_mutex_adjust_prio(struct task_struct *task);
 
 #ifdef CONFIG_DEBUG_RT_MUTEXES
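
Purely as an illustration, not part of the patch itself: the sketch below
condenses the wake_futex_pi() shape the patch arrives at, so the new locking
order is visible at a glance. The function name unlock_pi_sketch() is
invented for this sketch; the futex word cmpxchg, the fault handling and the
pi_state->owner fixup are elided, and everything the sketch calls appears in
the hunks above.

	/* Condensed sketch of the patched wake_futex_pi() flow. */
	static int unlock_pi_sketch(u32 __user *uaddr, struct futex_q *top_waiter,
				    struct futex_hash_bucket *hb)
	{
		struct futex_pi_state *pi_state = top_waiter->pi_state;
		WAKE_Q(wake_q);
		int ret;

		/*
		 * wait_lock, not hb->lock, now holds off new rt_mutex waiters,
		 * so rt_mutex_next_owner() stays stable while *uaddr is updated.
		 */
		raw_spin_lock_irq(&pi_state->pi_mutex.wait_lock);
		spin_unlock(&hb->lock);

		/* ... write the new owner TID into *uaddr, fix up pi_state ... */

		ret = rt_mutex_futex_unlock(&pi_state->pi_mutex, &wake_q);
		raw_spin_unlock_irq(&pi_state->pi_mutex.wait_lock);

		if (ret > 0) {
			/* Wake and deboost only after wait_lock is dropped. */
			wake_up_q(&wake_q);
			rt_mutex_adjust_prio(current);
			ret = 0;
		}

		put_pi_state(pi_state);

		/*
		 * -EAGAIN means rt_mutex_futex_unlock() had to drop wait_lock
		 * and new waiters may have slipped in; futex_unlock_pi() then
		 * restarts the whole operation (goto retry).
		 */
		return ret;
	}

The point of the ordering is that nothing can enqueue on the rt_mutex between
taking wait_lock and the unlock call, so the next-owner value written to user
space cannot be invalidated; the only path that has to give up wait_lock is
the no-waiter case inside rt_mutex_futex_unlock(), and that is exactly what
the -EAGAIN restart covers.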