lists.openwall.net   lists  /  announce  owl-users  owl-dev  john-users  john-dev  passwdqc-users  yescrypt  popa3d-users  /  oss-security  kernel-hardening  musl  sabotage  tlsify  passwords  /  crypt-dev  xvendor  /  Bugtraq  Full-Disclosure  linux-kernel  linux-netdev  linux-ext4  linux-hardening  linux-cve-announce  PHC 
Open Source and information security mailing list archives
 
Hash Suite: Windows password security audit tool. GUI, reports in PDF.
[<prev] [next>] [<thread-prev] [thread-next>] [day] [month] [year] [list]
Date:   Tue, 30 Aug 2016 17:08:41 +0200
From:   Peter Zijlstra <peterz@...radead.org>
To:     Waiman Long <Waiman.Long@....com>
Cc:     Ingo Molnar <mingo@...hat.com>, linux-kernel@...r.kernel.org,
        Linus Torvalds <torvalds@...ux-foundation.org>,
        Ding Tianhong <dingtianhong@...wei.com>,
        Jason Low <jason.low2@....com>,
        Davidlohr Bueso <dave@...olabs.net>,
        "Paul E. McKenney" <paulmck@...ibm.com>,
        Thomas Gleixner <tglx@...utronix.de>,
        Will Deacon <Will.Deacon@....com>,
        Tim Chen <tim.c.chen@...ux.intel.com>,
        Imre Deak <imre.deak@...el.com>
Subject: Re: [RFC PATCH-queue/locking/rfc 2/2] locking/mutex: Enable
 optimistic spinning of woken waiter

On Fri, Aug 26, 2016 at 07:35:09PM -0400, Waiman Long wrote:

> @@ -624,13 +649,24 @@ __mutex_lock_common(struct mutex *lock, long state, unsigned int subclass,
>  		/* didn't get the lock, go to sleep: */
>  		spin_unlock_mutex(&lock->wait_lock, flags);
>  		schedule_preempt_disabled();
>  
> +		/*
> +		 * Both __mutex_trylock() and __mutex_waiter_is_first()
> +		 * can be done without the protection of wait_lock.
> +		 */

True, but it took me a little while to figure out why
__mutex_waiter_is_first() is safe without the lock :-)

> +		acquired = __mutex_trylock(lock);
>  
> +		if (!acquired && __mutex_waiter_is_first(lock, &waiter)) {
>  			__mutex_set_flag(lock, MUTEX_FLAG_HANDOFF);
> +			/*
> +			 * Wait until the lock is handed off or the owner
> +			 * sleeps.
> +			 */
> +			acquired = mutex_optimistic_spin(lock, ww_ctx,
> +							 use_ww_ctx, true);
> +		}

That said, I think there are a few problems with this. Since we now poke
at the loop termination conditions outside of the wait_lock, it becomes
important where we do the task->state vs wakeup bits.

Specifically, since we still have state==RUNNING here, it's possible
we'll fail to acquire the lock _and_ miss the wakeup from
mutex_unlock(), leaving us stuck forevermore.

Also, we should do the __mutex_trylock() _after_ we set the handoff;
otherwise it's possible we get the lock handed off (missing the wakeup as
per the above) and fail to notice, again going back to sleep forevermore.

> +
> +		spin_lock_mutex(&lock->wait_lock, flags);
>  	}
>  	__set_task_state(task, TASK_RUNNING);

I'm thinking something like the below, on top of yours, will cure things.
I have not tested it yet, though.

---
--- a/kernel/locking/mutex.c
+++ b/kernel/locking/mutex.c
@@ -394,10 +394,9 @@ static inline int mutex_can_spin_on_owne
  */
 static bool mutex_optimistic_spin(struct mutex *lock,
 				  struct ww_acquire_ctx *ww_ctx,
-				  const bool use_ww_ctx, bool waiter)
+				  const bool use_ww_ctx, const bool waiter)
 {
 	struct task_struct *task = current;
-	bool acquired = false;
 
 	if (!waiter) {
 		/*
@@ -408,7 +407,7 @@ static bool mutex_optimistic_spin(struct
 		 * to call mutex_can_spin_on_owner().
 		 */
 		if (!mutex_can_spin_on_owner(lock))
-			goto done;
+			goto fail;
 
 		/*
 		 * In order to avoid a stampede of mutex spinners trying to
@@ -416,10 +415,10 @@ static bool mutex_optimistic_spin(struct
 		 * MCS (queued) lock first before spinning on the owner field.
 		 */
 		if (!osq_lock(&lock->osq))
-			goto done;
+			goto fail;
 	}
 
-	while (true) {
+	for (;;) {
 		struct task_struct *owner;
 
 		if (use_ww_ctx && ww_ctx->acquired > 0) {
@@ -435,7 +434,7 @@ static bool mutex_optimistic_spin(struct
 			 * performed the optimistic spinning cannot be done.
 			 */
 			if (READ_ONCE(ww->ctx))
-				break;
+				goto fail_unlock;
 		}
 
 		/*
@@ -443,23 +442,16 @@ static bool mutex_optimistic_spin(struct
 		 * release the lock or go to sleep.
 		 */
 		owner = __mutex_owner(lock);
-
-		if (owner == task)
-			goto gotlock;
-
 		if (owner) {
+			if (waiter && owner == task)
+				goto gotlock_acquire;
+
 			if (!mutex_spin_on_owner(lock, owner))
-				break;
-			/*
-			 * For waiter-spinner, recheck the owner field
-			 * as it may have been changed to itself.
-			 */
-			if (waiter && (__mutex_owner(lock) == task))
-				goto gotlock;
+				goto fail_unlock;
 		}
 
 		/* Try to acquire the mutex if it is unlocked. */
-		if (__mutex_trylock(lock, false))
+		if (__mutex_trylock(lock, waiter))
 			goto gotlock;
 
 		/*
@@ -469,21 +461,28 @@ static bool mutex_optimistic_spin(struct
 		 * values at the cost of a few extra spins.
 		 */
 		cpu_relax_lowlatency();
-		continue;
-gotlock:
-		acquired = true;
-		break;
 	}
 
+gotlock_acquire:
+	smp_mb(); /* ACQUIRE */
+gotlock:
+	if (!waiter)
+		osq_unlock(&lock->osq);
+
+	return true;
+
+
+fail_unlock:
 	if (!waiter)
 		osq_unlock(&lock->osq);
-done:
+
+fail:
 	/*
 	 * If we fell out of the spin path because of need_resched(),
 	 * reschedule now, before we try-lock the mutex. This avoids getting
 	 * scheduled out right after we obtained the mutex.
 	 */
-	if (!acquired && need_resched()) {
+	if (need_resched()) {
 		/*
 		 * We _should_ have TASK_RUNNING here, but just in case
 		 * we do not, make it so, otherwise we might get stuck.
@@ -492,12 +491,12 @@ static bool mutex_optimistic_spin(struct
 		schedule_preempt_disabled();
 	}
 
-	return acquired;
+	return false;
 }
 #else
 static bool mutex_optimistic_spin(struct mutex *lock,
 				  struct ww_acquire_ctx *ww_ctx,
-				  const bool use_ww_ctx, bool waiter)
+				  const bool use_ww_ctx, const bool waiter)
 {
 	return false;
 }
@@ -590,7 +589,6 @@ __mutex_lock_common(struct mutex *lock,
 	unsigned long flags;
 	struct ww_mutex *ww;
 	int ret;
-	bool acquired;
 
 	if (use_ww_ctx) {
 		ww = container_of(lock, struct ww_mutex, base);
@@ -606,7 +604,7 @@ __mutex_lock_common(struct mutex *lock,
 	 * possible to allow recursive lock attempts by accident.
 	 */
 	if (__mutex_trylock(lock, false) ||
-	    mutex_optimistic_spin(lock, ww_ctx, use_ww_ctx)) {
+	    mutex_optimistic_spin(lock, ww_ctx, use_ww_ctx, false)) {
 		/* got the lock, yay! */
 		lock_acquired(&lock->dep_map, ip);
 		if (use_ww_ctx)
@@ -638,7 +636,8 @@ __mutex_lock_common(struct mutex *lock,
 
 	lock_contended(&lock->dep_map, ip);
 
-	for (acquired = false; !acquired; ) {
+	set_task_state(task, state);
+	for (;;) {
 		/*
 		 * got a signal? (This code gets eliminated in the
 		 * TASK_UNINTERRUPTIBLE case.)
@@ -654,30 +653,23 @@ __mutex_lock_common(struct mutex *lock,
 				goto err;
 		}
 
-		__set_task_state(task, state);
-
-		/* didn't get the lock, go to sleep: */
 		spin_unlock_mutex(&lock->wait_lock, flags);
 		schedule_preempt_disabled();
 
-		/*
-		 * Both __mutex_trylock() and __mutex_waiter_is_first()
-		 * can be done without the protection of wait_lock.
-		 */
-		acquired = __mutex_trylock(lock, true);
+		set_task_state(task, state);
 
-		if (!acquired && __mutex_waiter_is_first(lock, &waiter)) {
+		if (__mutex_waiter_is_first(lock, &waiter)) {
 			__mutex_set_flag(lock, MUTEX_FLAG_HANDOFF);
-			/*
-			 * Wait until the lock is handed off or the owner
-			 * sleeps.
-			 */
-			acquired = mutex_optimistic_spin(lock, ww_ctx,
-							 use_ww_ctx, true);
+			if (mutex_optimistic_spin(lock, ww_ctx, use_ww_ctx, true))
+				break;
 		}
 
+		if (__mutex_trylock(lock, true))
+			break;
+
 		spin_lock_mutex(&lock->wait_lock, flags);
 	}
+	spin_lock_mutex(&lock->wait_lock, flags);
 	__set_task_state(task, TASK_RUNNING);
 
 remove_waiter:
@@ -700,6 +692,7 @@ __mutex_lock_common(struct mutex *lock,
 	return 0;
 
 err:
+	__set_task_state(task, TASK_RUNNING);
 	mutex_remove_waiter(lock, &waiter, task);
 	spin_unlock_mutex(&lock->wait_lock, flags);
 	debug_mutex_free_waiter(&waiter);

Powered by blists - more mailing lists

Powered by Openwall GNU/*/Linux Powered by OpenVZ