lists.openwall.net   lists  /  announce  owl-users  owl-dev  john-users  john-dev  passwdqc-users  yescrypt  popa3d-users  /  oss-security  kernel-hardening  musl  sabotage  tlsify  passwords  /  crypt-dev  xvendor  /  Bugtraq  Full-Disclosure  linux-kernel  linux-netdev  linux-ext4  linux-hardening  linux-cve-announce  PHC 
Open Source and information security mailing list archives
 
Hash Suite: Windows password security audit tool. GUI, reports in PDF.
[<prev] [next>] [<thread-prev] [thread-next>] [day] [month] [year] [list]
Message-Id: <1483466430-8028-6-git-send-email-longman@redhat.com>
Date:   Tue,  3 Jan 2017 13:00:28 -0500
From:   Waiman Long <longman@...hat.com>
To:     Peter Zijlstra <peterz@...radead.org>,
        Ingo Molnar <mingo@...hat.com>,
        Thomas Gleixner <tglx@...utronix.de>,
        "H. Peter Anvin" <hpa@...or.com>
Cc:     linux-kernel@...r.kernel.org, Waiman Long <longman@...hat.com>
Subject: [RFC PATCH 5/7] locking/rtqspinlock: Handle priority boosting

As the priority of a task may get boosted due to an acquired rtmutex,
we will need to periodically check the task priority to see if it
gets boosted. For an originally non-RT task, that means unqueuing from
the MCS wait queue before doing an RT spinning. So the unqueuing code
from osq_lock is borrowed to handle the unqueuing aspect of it.

Signed-off-by: Waiman Long <longman@...hat.com>
---
 kernel/locking/qspinlock.c    |  25 ++++-
 kernel/locking/qspinlock_rt.h | 235 +++++++++++++++++++++++++++++++++++++++++-
 2 files changed, 253 insertions(+), 7 deletions(-)

diff --git a/kernel/locking/qspinlock.c b/kernel/locking/qspinlock.c
index cb5c2e5..b25ad58 100644
--- a/kernel/locking/qspinlock.c
+++ b/kernel/locking/qspinlock.c
@@ -270,10 +270,18 @@ static __always_inline u32  __pv_wait_head_or_lock(struct qspinlock *lock,
 /*
  * Realtime queued spinlock is mutual exclusive with native and PV spinlocks.
  */
+#define RT_RETRY	((u32)-1)
+
 #ifdef CONFIG_REALTIME_QUEUED_SPINLOCKS
 #include "qspinlock_rt.h"
 #else
-static __always_inline u32 rt_wait_head_or_retry(struct qspinlock *lock)
+static __always_inline void rt_init_node(struct mcs_spinlock *node, u32 tail) {}
+static __always_inline bool rt_wait_node_or_unqueue(struct qspinlock *lock,
+						    struct mcs_spinlock *node,
+						    struct mcs_spinlock *prev)
+						{ return false; }
+static __always_inline u32  rt_spin_lock_or_retry(struct qspinlock *lock,
+						  struct mcs_spinlock *node)
 						{ return 0; }
 #define rt_pending(v)		0
 #define rt_enabled()		false
@@ -514,6 +522,7 @@ void queued_spin_lock_slowpath(struct qspinlock *lock, u32 val)
 	node->locked = 0;
 	node->next = NULL;
 	pv_init_node(node);
+	rt_init_node(node, tail);
 
 	/*
 	 * We touched a (possibly) cold cacheline in the per-cpu queue node;
@@ -552,6 +561,10 @@ void queued_spin_lock_slowpath(struct qspinlock *lock, u32 val)
 		WRITE_ONCE(prev->next, node);
 
 		pv_wait_node(node, prev);
+
+		if (rt_wait_node_or_unqueue(lock, node, prev))
+			goto queue;	/* Retry RT locking */
+
 		arch_mcs_spin_lock_contended(&node->locked);
 
 		/*
@@ -591,10 +604,14 @@ void queued_spin_lock_slowpath(struct qspinlock *lock, u32 val)
 
 	/*
 	 * The RT rt_wait_head_or_lock function, if active, will acquire the
-	 * lock and return a non-zero value.
+	 * lock, release the MCS lock and return a non-zero value. We need to
+	 * retry with RT spinning when RT_RETRY is returned.
 	 */
-	if ((val = rt_wait_head_or_retry(lock)))
-		goto locked;
+	val = rt_spin_lock_or_retry(lock, node);
+	if (val == RT_RETRY)
+		goto queue;
+	else if (val)
+		return;
 
 	val = smp_cond_load_acquire(&lock->val.counter, !(VAL & _Q_LOCKED_PENDING_MASK));
 
diff --git a/kernel/locking/qspinlock_rt.h b/kernel/locking/qspinlock_rt.h
index c2f2722..0c4d051 100644
--- a/kernel/locking/qspinlock_rt.h
+++ b/kernel/locking/qspinlock_rt.h
@@ -37,6 +37,12 @@
  * doing so, we can query the highest priority task that is waiting on the
  * outer lock and adjust our waiting priority accordingly. To speed up nested
  * spinlock calls, they will have a minimum RT priority of 1 to begin with.
+ *
+ * To handle priority boosting due to an acquired rt-mutex, The task->prio
+ * field is queried in each iteration of the loop. For originally non-RT tasks,
+ * it will have to break out of the MCS wait queue just like what is done
+ * in the OSQ lock. Then it has to retry RT spinning if it has been boosted
+ * to RT priority.
  */
 #include <linux/sched.h>
 
@@ -45,9 +51,56 @@
 #endif
 
 /*
+ * For proper unqueuing from the MCS wait queue, we need to store the encoded
+ * tail code as well the previous node pointer into the extra MCS node. Since
+ * CPUs in interrupt context won't use the per-CPU MCS nodes anymore. So only
+ * one is needed for process context CPUs. As a result, we can use the
+ * additional nodes for data storage. Here, we allow 2 nodes per cpu in case
+ * we want to put softIRQ CPUs into the queue as well.
+ */
+struct rt_node {
+	struct mcs_spinlock	mcs;
+	struct mcs_spinlock	__reserved;
+	struct mcs_spinlock	*prev;
+	u32			tail;
+};
+
+/*
  * =========================[ Helper Functions ]=========================
  */
 
+static u32 cmpxchg_tail_acquire(struct qspinlock *lock, u32 old, u32 new)
+{
+	struct __qspinlock *l = (void *)lock;
+
+	return cmpxchg_acquire(&l->tail, old >> _Q_TAIL_OFFSET,
+			       new >> _Q_TAIL_OFFSET) << _Q_TAIL_OFFSET;
+}
+
+static u32 cmpxchg_tail_release(struct qspinlock *lock, u32 old, u32 new)
+{
+	struct __qspinlock *l = (void *)lock;
+
+	return cmpxchg_release(&l->tail, old >> _Q_TAIL_OFFSET,
+			       new >> _Q_TAIL_OFFSET) << _Q_TAIL_OFFSET;
+}
+
+static inline void rt_write_prev(struct mcs_spinlock *node,
+				 struct mcs_spinlock *prev)
+{
+	WRITE_ONCE(((struct rt_node *)node)->prev, prev);
+}
+
+static inline u32 rt_read_tail(struct mcs_spinlock *node)
+{
+	return READ_ONCE(((struct rt_node *)node)->tail);
+}
+
+static inline struct mcs_spinlock *rt_read_prev(struct mcs_spinlock *node)
+{
+	return READ_ONCE(((struct rt_node *)node)->prev);
+}
+
 /*
  * Translate the priority of a task to an equivalent RT priority
  */
@@ -141,6 +194,56 @@ static bool __rt_spin_trylock(struct qspinlock *lock,
 }
 
 /*
+ * MCS wait queue unqueuing code, borrow mostly from osq_lock.c.
+ */
+static struct mcs_spinlock *
+mcsq_wait_next(struct qspinlock *lock, struct mcs_spinlock *node,
+	       struct mcs_spinlock *prev)
+{
+	 struct mcs_spinlock *next = NULL;
+	 u32 tail = rt_read_tail(node);
+	 u32 old;
+
+	/*
+	 * If there is a prev node in queue, the 'old' value will be
+	 * the prev node's tail value. Otherwise, it's set to 0 since if
+	 * we're the only one in queue, the queue will then become empty.
+	 */
+	old = prev ? rt_read_tail(prev) : 0;
+
+	for (;;) {
+		if ((atomic_read(&lock->val) & _Q_TAIL_MASK) == tail &&
+		    cmpxchg_tail_acquire(lock, tail, old) == tail) {
+			/*
+			 * We are at the queue tail, we moved the @lock back.
+			 * @prev will now observe @lock and will complete its
+			 * unlock()/unqueue().
+			 */
+			break;
+		}
+
+		/*
+		 * We must xchg() the @node->next value, because if we were to
+		 * leave it in, a concurrent unlock()/unqueue() from
+		 * @node->next might complete Step-A and think its @prev is
+		 * still valid.
+		 *
+		 * If the concurrent unlock()/unqueue() wins the race, we'll
+		 * wait for either @lock to point to us, through its Step-B, or
+		 * wait for a new @node->next from its Step-C.
+		 */
+		if (node->next) {
+			next = xchg(&node->next, NULL);
+			if (next)
+				break;
+		}
+
+		cpu_relax();
+	}
+	return next;
+}
+
+/*
  * ====================[ Functions Used by qspinlock.c ]====================
  */
 
@@ -158,6 +261,17 @@ static inline int rt_pending(int val)
 }
 
 /*
+ * Initialize the RT fields of a MCS node.
+ */
+static inline void rt_init_node(struct mcs_spinlock *node, u32 tail)
+{
+	struct rt_node *n = (struct rt_node *)node;
+
+	n->prev = NULL;
+	n->tail = tail;
+}
+
+/*
  * Return: true if locked acquired
  *	   false if queuing in the MCS wait queue is needed.
  */
@@ -167,16 +281,98 @@ static inline bool rt_spin_trylock(struct qspinlock *lock)
 }
 
 /*
+ * Return: true if it has been unqueued and need to retry locking.
+ *	   false if it becomes the wait queue head & proceed to next step.
+ */
+static bool rt_wait_node_or_unqueue(struct qspinlock *lock,
+				    struct mcs_spinlock *node,
+				    struct mcs_spinlock *prev)
+{
+	struct mcs_spinlock *next;
+
+	rt_write_prev(node, prev);	/* Save previous node pointer */
+
+	while (!READ_ONCE(node->locked)) {
+		if (rt_task_priority(current, 0))
+			goto unqueue;
+		cpu_relax();
+	}
+	return false;
+
+unqueue:
+	/*
+	 * Step - A  -- stabilize @prev
+	 *
+	 * Undo our @prev->next assignment; this will make @prev's
+	 * unlock()/unqueue() wait for a next pointer since @lock points
+	 * to us (or later).
+	 */
+	for (;;) {
+		if (prev->next == node &&
+		    cmpxchg(&prev->next, node, NULL) == node)
+			break;
+
+		/*
+		 * We can only fail the cmpxchg() racing against an unlock(),
+		 * in which case we should observe @node->locked becoming
+		 * true.
+		 */
+		if (smp_load_acquire(&node->locked))
+			return false;
+
+		cpu_relax();
+
+		/*
+		 * Or we race against a concurrent unqueue()'s step-B, in which
+		 * case its step-C will write us a new @node->prev pointer.
+		 */
+		prev = rt_read_prev(node);
+	}
+
+	/*
+	 * Step - B -- stabilize @next
+	 *
+	 * Similar to unlock(), wait for @node->next or move @lock from @node
+	 * back to @prev.
+	 */
+	next = mcsq_wait_next(lock, node, prev);
+
+	/*
+	 * Step - C -- unlink
+	 *
+	 * @prev is stable because its still waiting for a new @prev->next
+	 * pointer, @next is stable because our @node->next pointer is NULL and
+	 * it will wait in Step-A.
+	 */
+	if (next) {
+		rt_write_prev(next, prev);
+		WRITE_ONCE(prev->next, next);
+	}
+
+	/*
+	 * Release the node.
+	 */
+	__this_cpu_dec(mcs_nodes[0].count);
+
+	return true;	/* Need to retry RT spinning */
+}
+
+/*
  * We need to make the non-RT tasks wait longer if RT tasks are spinning for
  * the lock. This is done to reduce the chance that a non-RT task may
  * accidently grab the lock away from the RT tasks in the short interval
  * where the pending priority may be reset after an RT task acquires the lock.
  *
- * Return: Current value of the lock.
+ * Return: RT_RETRY if it needs to retry locking.
+ *	   1 if lock acquired.
  */
-static u32 rt_wait_head_or_retry(struct qspinlock *lock)
+static u32 rt_spin_lock_or_retry(struct qspinlock *lock,
+				 struct mcs_spinlock *node)
 {
 	struct __qspinlock *l = (void *)lock;
+	struct mcs_spinlock *next;
+	bool retry = false;
+	u32 tail;
 
 	for (;;) {
 		u16 lockpend = READ_ONCE(l->locked_pending);
@@ -187,6 +383,14 @@ static u32 rt_wait_head_or_retry(struct qspinlock *lock)
 			if (!lockpend)
 				break;
 		}
+		/*
+		 * We need to break out of the non-RT wait queue and do
+		 * RT spinnning if we become an RT task.
+		 */
+		if (rt_task_priority(current, 0)) {
+			retry = true;
+			goto unlock;
+		}
 
 		/*
 		 * 4 cpu_relax's if RT tasks present.
@@ -198,7 +402,32 @@ static u32 rt_wait_head_or_retry(struct qspinlock *lock)
 		}
 		cpu_relax();
 	}
-	return atomic_read(&lock->val);
+
+unlock:
+	/*
+	 * Remove itself from the MCS wait queue (unlock).
+	 */
+	tail = rt_read_tail(node);
+	if (cmpxchg_tail_release(lock, tail, 0) == tail)
+		goto release;
+
+	/*
+	 * Second case.
+	 */
+	next = xchg(&node->next, NULL);
+	if (!next)
+		next = mcsq_wait_next(lock, node, NULL);
+
+	if (next)
+		WRITE_ONCE(next->locked, 1);
+
+release:
+	/*
+	 * Release the node.
+	 */
+	__this_cpu_dec(mcs_nodes[0].count);
+
+	return retry ? RT_RETRY : 1;
 }
 
 /*
-- 
1.8.3.1

Powered by blists - more mailing lists

Powered by Openwall GNU/*/Linux Powered by OpenVZ