lists.openwall.net   lists  /  announce  owl-users  owl-dev  john-users  john-dev  passwdqc-users  yescrypt  popa3d-users  /  oss-security  kernel-hardening  musl  sabotage  tlsify  passwords  /  crypt-dev  xvendor  /  Bugtraq  Full-Disclosure  linux-kernel  linux-netdev  linux-ext4  linux-hardening  linux-cve-announce  PHC 
Open Source and information security mailing list archives
 
Hash Suite: Windows password security audit tool. GUI, reports in PDF.
[<prev] [next>] [<thread-prev] [thread-next>] [day] [month] [year] [list]
Message-ID: <20170901122320.nwrtaixslsfzr6ls@hirez.programming.kicks-ass.net>
Date:   Fri, 1 Sep 2017 14:23:20 +0200
From:   Peter Zijlstra <peterz@...radead.org>
To:     Nicholas Piggin <npiggin@...il.com>
Cc:     Ingo Molnar <mingo@...hat.com>, linux-kernel@...r.kernel.org
Subject: Re: [PATCH 1/2] locking: Use spin primitives for busy loops


This really should have been many patches.

On Sun, Aug 20, 2017 at 07:25:01PM +1000, Nicholas Piggin wrote:

> @@ -108,12 +109,10 @@ static inline unsigned __read_seqcount_begin(const seqcount_t *s)
>  {
>  	unsigned ret;
>  
> -repeat:
>  	ret = READ_ONCE(s->sequence);
> -	if (unlikely(ret & 1)) {
> -		cpu_relax();
> -		goto repeat;
> -	}
> +	if (unlikely(ret & 1))
> +		spin_until_cond( !((ret = READ_ONCE(s->sequence)) & 1) );
> +

That seems to suggest something like: cond_load(), similar to
smp_cond_load_acquire(), except without the smp/acquire parts.

>  	return ret;
>  }
>  
> diff --git a/kernel/locking/mcs_spinlock.h b/kernel/locking/mcs_spinlock.h
> index 6a385aabcce7..a91a0cc46a4c 100644
> --- a/kernel/locking/mcs_spinlock.h
> +++ b/kernel/locking/mcs_spinlock.h
> @@ -27,8 +27,7 @@ struct mcs_spinlock {
>   */
>  #define arch_mcs_spin_lock_contended(l)					\
>  do {									\
> -	while (!(smp_load_acquire(l)))					\
> -		cpu_relax();						\
> +	spin_until_cond(smp_load_acquire(l));				\
>  } while (0)

Torn on whether it makes sense to use smp_cond_load_acquire() here
instead. That avoids issuing the barrier on each loop.

>  #endif
>  
> @@ -107,8 +106,7 @@ void mcs_spin_unlock(struct mcs_spinlock **lock, struct mcs_spinlock *node)
>  		if (likely(cmpxchg_release(lock, node, NULL) == node))
>  			return;
>  		/* Wait until the next pointer is set */
> -		while (!(next = READ_ONCE(node->next)))
> -			cpu_relax();
> +		spin_until_cond((next = READ_ONCE(node->next)) != 0);
>  	}
>  
>  	/* Pass lock to next waiter. */

Hmm, you could've used that same pattern above too I suppose,

	spin_until_cond(!((ret = READ_ONCE(s->sequence)) & 1));

> diff --git a/kernel/locking/mutex.c b/kernel/locking/mutex.c
> index 858a07590e39..0ffa1cd7f12b 100644
> --- a/kernel/locking/mutex.c
> +++ b/kernel/locking/mutex.c
> @@ -427,6 +427,7 @@ bool mutex_spin_on_owner(struct mutex *lock, struct task_struct *owner,
>  	bool ret = true;
>  
>  	rcu_read_lock();
> +	spin_begin();
>  	while (__mutex_owner(lock) == owner) {
>  		/*
>  		 * Ensure we emit the owner->on_cpu, dereference _after_
> @@ -450,8 +451,9 @@ bool mutex_spin_on_owner(struct mutex *lock, struct task_struct *owner,
>  			break;
>  		}
>  
> -		cpu_relax();
> +		spin_cpu_relax();
>  	}
> +	spin_end();
>  	rcu_read_unlock();

Since we just did a trylock we expect to wait, which is why we don't do
a condition test before spin_begin(), right?

>  
>  	return ret;
> @@ -532,6 +534,7 @@ mutex_optimistic_spin(struct mutex *lock, struct ww_acquire_ctx *ww_ctx,
>  			goto fail;
>  	}
>  
> +	spin_begin();
>  	for (;;) {
>  		struct task_struct *owner;
>  
> @@ -553,8 +556,9 @@ mutex_optimistic_spin(struct mutex *lock, struct ww_acquire_ctx *ww_ctx,
>  		 * memory barriers as we'll eventually observe the right
>  		 * values at the cost of a few extra spins.
>  		 */
> -		cpu_relax();
> +		spin_cpu_relax();
>  	}
> +	spin_end();
>  
>  	if (!waiter)
>  		osq_unlock(&lock->osq);
> @@ -563,6 +567,8 @@ mutex_optimistic_spin(struct mutex *lock, struct ww_acquire_ctx *ww_ctx,
>  
>  
>  fail_unlock:
> +	spin_end();
> +
>  	if (!waiter)
>  		osq_unlock(&lock->osq);
>  

The same, we just did a trylock before calling this.

However, I think you just created a nested spin_begin()/spin_end() pair;
looking at how PPC implements that, this doesn't look right.

> diff --git a/kernel/locking/osq_lock.c b/kernel/locking/osq_lock.c
> index a3167941093b..9dd58bbe60b7 100644
> --- a/kernel/locking/osq_lock.c
> +++ b/kernel/locking/osq_lock.c
> @@ -53,6 +53,7 @@ osq_wait_next(struct optimistic_spin_queue *lock,
>  	 */
>  	old = prev ? prev->cpu : OSQ_UNLOCKED_VAL;
>  
> +	spin_begin();
>  	for (;;) {
>  		if (atomic_read(&lock->tail) == curr &&
>  		    atomic_cmpxchg_acquire(&lock->tail, curr, old) == curr) {
> @@ -80,8 +81,9 @@ osq_wait_next(struct optimistic_spin_queue *lock,
>  				break;
>  		}
>  
> -		cpu_relax();
> +		spin_cpu_relax();
>  	}
> +	spin_end();
>  
>  	return next;
>  }

This however doesn't have a fast-path, we unconditionally do
spin_begin(). That looks sub-optimal. From looking at PPC that drops the
HT priority even though we might not end up waiting at all.


> @@ -107,6 +109,8 @@ bool osq_lock(struct optimistic_spin_queue *lock)
>  	if (old == OSQ_UNLOCKED_VAL)
>  		return true;
>  
> +	spin_begin();
> +
>  	prev = decode_cpu(old);
>  	node->prev = prev;
>  	WRITE_ONCE(prev->next, node);

Why so early? You again do HMT_low() even though we might not hit the
cpu_relax().

> @@ -129,8 +133,9 @@ bool osq_lock(struct optimistic_spin_queue *lock)
>  		if (need_resched() || vcpu_is_preempted(node_cpu(node->prev)))
>  			goto unqueue;
>  
> -		cpu_relax();
> +		spin_cpu_relax();
>  	}
> +	spin_end();
>  	return true;
>  
>  unqueue:



> @@ -152,10 +157,12 @@ bool osq_lock(struct optimistic_spin_queue *lock)
>  		 * in which case we should observe @node->locked becomming
>  		 * true.
>  		 */
> -		if (smp_load_acquire(&node->locked))
> +		if (smp_load_acquire(&node->locked)) {
> +			spin_end();
>  			return true;
> +		}
>  
> -		cpu_relax();
> +		spin_cpu_relax();
>  
>  		/*
>  		 * Or we race against a concurrent unqueue()'s step-B, in which
> @@ -164,6 +171,8 @@ bool osq_lock(struct optimistic_spin_queue *lock)
>  		prev = READ_ONCE(node->prev);
>  	}
>  
> +	spin_end();
> +

So here we did all of step-A in low-prio, that doesn't look right.

>  	/*
>  	 * Step - B -- stabilize @next
>  	 *
> diff --git a/kernel/locking/qrwlock.c b/kernel/locking/qrwlock.c
> index 2655f26ec882..186ff495097d 100644
> --- a/kernel/locking/qrwlock.c
> +++ b/kernel/locking/qrwlock.c
> @@ -54,10 +54,12 @@ struct __qrwlock {
>  static __always_inline void
>  rspin_until_writer_unlock(struct qrwlock *lock, u32 cnts)
>  {
> +	spin_begin();
>  	while ((cnts & _QW_WMASK) == _QW_LOCKED) {
> -		cpu_relax();
> +		spin_cpu_relax();
>  		cnts = atomic_read_acquire(&lock->cnts);
>  	}
> +	spin_end();
>  }

Again, unconditional :/

>  /**
> @@ -124,6 +126,7 @@ void queued_write_lock_slowpath(struct qrwlock *lock)
>  	 * Set the waiting flag to notify readers that a writer is pending,
>  	 * or wait for a previous writer to go away.
>  	 */
> +	spin_begin();
>  	for (;;) {
>  		struct __qrwlock *l = (struct __qrwlock *)lock;
>  
> @@ -131,7 +134,7 @@ void queued_write_lock_slowpath(struct qrwlock *lock)
>  		   (cmpxchg_relaxed(&l->wmode, 0, _QW_WAITING) == 0))
>  			break;
>  
> -		cpu_relax();
> +		spin_cpu_relax();
>  	}
>  	/* When no more readers, set the locked flag */
> @@ -142,8 +145,10 @@ void queued_write_lock_slowpath(struct qrwlock *lock)
>  					    _QW_LOCKED) == _QW_WAITING))
>  			break;
>  
> -		cpu_relax();
> +		spin_cpu_relax();
>  	}
> +	spin_end();
> +
>  unlock:
>  	arch_spin_unlock(&lock->wait_lock);
>  }

Would not something like:

	do {
		spin_until_cond(!READ_ONCE(l->wmode));
	} while (try_cmpxchg_relaxed(&l->wmode, 0, _QW_WAITING));

Be a better pattern here? Then we have a fast-path without low-medium
flips.


> diff --git a/kernel/locking/qspinlock_paravirt.h b/kernel/locking/qspinlock_paravirt.h
> index 4ccfcaae5b89..88817e41fadf 100644
> --- a/kernel/locking/qspinlock_paravirt.h
> +++ b/kernel/locking/qspinlock_paravirt.h
> @@ -293,15 +293,19 @@ static void pv_wait_node(struct mcs_spinlock *node, struct mcs_spinlock *prev)
>  	bool wait_early;
>  
>  	for (;;) {
> +		spin_begin();

again, unconditional..

>  		for (wait_early = false, loop = SPIN_THRESHOLD; loop; loop--) {
> -			if (READ_ONCE(node->locked))
> +			if (READ_ONCE(node->locked)) {
> +				spin_end();
>  				return;
> +			}
>  			if (pv_wait_early(pp, loop)) {
>  				wait_early = true;
>  				break;
>  			}
> -			cpu_relax();
> +			spin_cpu_relax();
>  		}
> +		spin_end();
>  
>  		/*
>  		 * Order pn->state vs pn->locked thusly:
> @@ -417,11 +421,15 @@ pv_wait_head_or_lock(struct qspinlock *lock, struct mcs_spinlock *node)
>  		 * disable lock stealing before attempting to acquire the lock.
>  		 */
>  		set_pending(lock);
> +		spin_begin();

and again...

>  		for (loop = SPIN_THRESHOLD; loop; loop--) {
> -			if (trylock_clear_pending(lock))
> +			if (trylock_clear_pending(lock)) {
> +				spin_end();
>  				goto gotlock;
> -			cpu_relax();
> +			}
> +			spin_cpu_relax();
>  		}
> +		spin_end();
>  		clear_pending(lock);
>  



> diff --git a/kernel/locking/rwsem-xadd.c b/kernel/locking/rwsem-xadd.c
> index 34e727f18e49..2d0e539f1a95 100644
> --- a/kernel/locking/rwsem-xadd.c
> +++ b/kernel/locking/rwsem-xadd.c
> @@ -358,6 +358,7 @@ static noinline bool rwsem_spin_on_owner(struct rw_semaphore *sem)
>  		goto out;
>  
>  	rcu_read_lock();
> +	spin_begin();
>  	while (sem->owner == owner) {
>  		/*
>  		 * Ensure we emit the owner->on_cpu, dereference _after_
> @@ -373,12 +374,14 @@ static noinline bool rwsem_spin_on_owner(struct rw_semaphore *sem)
>  		 */
>  		if (!owner->on_cpu || need_resched() ||
>  				vcpu_is_preempted(task_cpu(owner))) {
> +			spin_end();
>  			rcu_read_unlock();
>  			return false;
>  		}
>  
> -		cpu_relax();
> +		spin_cpu_relax();
>  	}
> +	spin_end();
>  	rcu_read_unlock();
>  out:
>  	/*

rwsem_spin_on_owner() is different from the mutex one in that we don't
first do a trylock(), which then makes this unconditional.

So I think we want the rwsem_optimistic_spin() main loop re-ordered to
first do the trylock and _then_ wait if it fails. Not first wait and
then try.

> @@ -408,6 +411,7 @@ static bool rwsem_optimistic_spin(struct rw_semaphore *sem)
>  	 *  2) readers own the lock as we can't determine if they are
>  	 *     actively running or not.
>  	 */
> +	spin_begin();
>  	while (rwsem_spin_on_owner(sem)) {
>  		/*
>  		 * Try to acquire the lock
> @@ -432,8 +436,9 @@ static bool rwsem_optimistic_spin(struct rw_semaphore *sem)
>  		 * memory barriers as we'll eventually observe the right
>  		 * values at the cost of a few extra spins.
>  		 */
> -		cpu_relax();
> +		spin_cpu_relax();
>  	}
> +	spin_end();
>  	osq_unlock(&sem->osq);
>  done:
>  	preempt_enable();

And I think you did the recursive spin_begin thing again here.

Powered by blists - more mailing lists

Powered by Openwall GNU/*/Linux Powered by OpenVZ