Message-Id: <1483466430-8028-3-git-send-email-longman@redhat.com>
Date: Tue, 3 Jan 2017 13:00:25 -0500
From: Waiman Long <longman@...hat.com>
To: Peter Zijlstra <peterz@...radead.org>,
Ingo Molnar <mingo@...hat.com>,
Thomas Gleixner <tglx@...utronix.de>,
"H. Peter Anvin" <hpa@...or.com>
Cc: linux-kernel@...r.kernel.org, Waiman Long <longman@...hat.com>
Subject: [RFC PATCH 2/7] locking/rtqspinlock: Introduce realtime queued spinlocks
Realtime queued spinlock is a variant of the queued spinlock that is
suitable for use in a realtime environment where the highest priority
task should always get its work done as soon as possible, which means
a minimal wait for spinlocks. To make that happen, RT tasks will not
wait in the MCS wait queue of the queued spinlock. Instead, they will
spin directly on the lock and put the RT priority value of the highest
priority spinning RT task into the pending byte of the lock.
With the assumption that the number of RT tasks actively running on
a system is relatively small, the performance overhead of such an RT
variant should be small too.
Realtime queued spinlock is currently mutually exclusive with
paravirtual spinlocks.
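As a rough stand-alone illustration (not part of the patch), the sketch
below mirrors the priority encoding used in qspinlock_rt.h: the pending
byte holds an inverted task priority, so a larger value means a higher
priority RT waiter. It assumes the usual MAX_RT_PRIO value of 100; the
helper name rt_pending_byte() is made up for the example.

#include <stdio.h>

#define MAX_RT_PRIO	100	/* assumed to match the kernel's value */

/*
 * Mirrors rt_task_priority(): task->prio is 0..99 for RT tasks (lower
 * means higher priority), so the pending byte gets MAX_RT_PRIO - prio,
 * and 0 for non-RT tasks.
 */
static unsigned char rt_pending_byte(int prio)
{
	return (prio >= MAX_RT_PRIO) ? 0 : (unsigned char)(MAX_RT_PRIO - prio);
}

int main(void)
{
	/* prio 0 (highest RT) -> 100, prio 98 -> 2, prio 120 (non-RT) -> 0 */
	printf("%u %u %u\n", rt_pending_byte(0), rt_pending_byte(98),
	       rt_pending_byte(120));
	return 0;
}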
Signed-off-by: Waiman Long <longman@...hat.com>
---
arch/x86/Kconfig | 2 +-
kernel/Kconfig.locks | 9 +++
kernel/locking/qspinlock.c | 34 +++++++++-
kernel/locking/qspinlock_rt.h | 145 ++++++++++++++++++++++++++++++++++++++++++
4 files changed, 186 insertions(+), 4 deletions(-)
create mode 100644 kernel/locking/qspinlock_rt.h
diff --git a/arch/x86/Kconfig b/arch/x86/Kconfig
index e487493..7a97b31 100644
--- a/arch/x86/Kconfig
+++ b/arch/x86/Kconfig
@@ -723,7 +723,7 @@ config PARAVIRT_DEBUG
config PARAVIRT_SPINLOCKS
bool "Paravirtualization layer for spinlocks"
- depends on PARAVIRT && SMP
+ depends on PARAVIRT && SMP && !REALTIME_QUEUED_SPINLOCKS
---help---
Paravirtualized spinlocks allow a pvops backend to replace the
spinlock implementation with something virtualization-friendly
diff --git a/kernel/Kconfig.locks b/kernel/Kconfig.locks
index 84d882f..93f5bb5 100644
--- a/kernel/Kconfig.locks
+++ b/kernel/Kconfig.locks
@@ -248,3 +248,12 @@ config ARCH_USE_QUEUED_RWLOCKS
config QUEUED_RWLOCKS
def_bool y if ARCH_USE_QUEUED_RWLOCKS
depends on SMP
+
+config REALTIME_QUEUED_SPINLOCKS
+ bool "Realtime queued spinlocks"
+ default n
+ depends on QUEUED_SPINLOCKS && PREEMPT && NR_CPUS < 16384
+ help
+ This enables support for a realtime environment where tasks with
+ realtime priority will acquire spinlocks ASAP instead of waiting
+ in a queue.
diff --git a/kernel/locking/qspinlock.c b/kernel/locking/qspinlock.c
index b2caec7..cb5c2e5 100644
--- a/kernel/locking/qspinlock.c
+++ b/kernel/locking/qspinlock.c
@@ -268,6 +268,19 @@ static __always_inline u32 __pv_wait_head_or_lock(struct qspinlock *lock,
#endif
/*
+ * Realtime queued spinlock is mutually exclusive with native and PV spinlocks.
+ */
+#ifdef CONFIG_REALTIME_QUEUED_SPINLOCKS
+#include "qspinlock_rt.h"
+#else
+static __always_inline u32 rt_wait_head_or_retry(struct qspinlock *lock)
+ { return 0; }
+#define rt_pending(v) 0
+#define rt_enabled() false
+#define rt_spin_trylock(l) false
+#endif
+
+/*
* Various notes on spin_is_locked() and spin_unlock_wait(), which are
* 'interesting' functions:
*
@@ -415,7 +428,7 @@ void queued_spin_lock_slowpath(struct qspinlock *lock, u32 val)
BUILD_BUG_ON(CONFIG_NR_CPUS >= (1U << _Q_TAIL_CPU_BITS));
- if (pv_enabled())
+ if (pv_enabled() || rt_enabled())
goto queue;
if (virt_spin_lock(lock))
@@ -490,6 +503,9 @@ void queued_spin_lock_slowpath(struct qspinlock *lock, u32 val)
* queuing.
*/
queue:
+ if (rt_spin_trylock(lock))
+ return;
+
node = this_cpu_ptr(&mcs_nodes[0]);
idx = node->count++;
tail = encode_tail(smp_processor_id(), idx);
@@ -573,6 +589,13 @@ void queued_spin_lock_slowpath(struct qspinlock *lock, u32 val)
if ((val = pv_wait_head_or_lock(lock, node)))
goto locked;
+ /*
+ * The RT rt_wait_head_or_retry() function, if active, will acquire the
+ * lock and return a non-zero value.
+ */
+ if ((val = rt_wait_head_or_retry(lock)))
+ goto locked;
+
val = smp_cond_load_acquire(&lock->val.counter, !(VAL & _Q_LOCKED_PENDING_MASK));
locked:
@@ -587,7 +610,9 @@ void queued_spin_lock_slowpath(struct qspinlock *lock, u32 val)
* to grab the lock.
*/
for (;;) {
- /* In the PV case we might already have _Q_LOCKED_VAL set */
+ /*
+ * In the PV & RT cases we might already have _Q_LOCKED_VAL set.
+ */
if ((val & _Q_TAIL_MASK) != tail) {
set_locked(lock);
break;
@@ -596,8 +621,11 @@ void queued_spin_lock_slowpath(struct qspinlock *lock, u32 val)
* The smp_cond_load_acquire() call above has provided the
* necessary acquire semantics required for locking. At most
* two iterations of this loop may be ran.
+ *
+ * In the RT case, the pending byte needs to be preserved.
*/
- old = atomic_cmpxchg_relaxed(&lock->val, val, _Q_LOCKED_VAL);
+ old = atomic_cmpxchg_relaxed(&lock->val, val,
+ rt_pending(val) | _Q_LOCKED_VAL);
if (old == val)
goto release; /* No contention */
diff --git a/kernel/locking/qspinlock_rt.h b/kernel/locking/qspinlock_rt.h
new file mode 100644
index 0000000..69513d6
--- /dev/null
+++ b/kernel/locking/qspinlock_rt.h
@@ -0,0 +1,145 @@
+/*
+ * Realtime queued spinlocks
+ * -------------------------
+ *
+ * By Waiman Long <longman@...hat.com>
+ *
+ * This is a variant of queued spinlocks that is designed to meet the
+ * requirement of a realtime environment. Tasks with realtime priority will
+ * spin on the lock instead of waiting in the queue like the other non-RT
+ * tasks. Those RT tasks make use of the pending byte to store the rt_priority
+ * of the highest priority task that is currently spinning. That task will
+ * then acquire the lock and reset the pending priority if set previously when
+ * it becomes free effectively jumping the queue ahead of the other lower
+ * priority RT tasks as well as non-RT tasks. The other spinning RT tasks
+ * should then bid to set this pending byte to their rt_priority level again.
+ *
+ * Assuming that the number of RT tasks in a system is limited, the
+ * performance overhead of RT tasks spinning on the lock should be small.
+ *
+ * As RT qspinlock needs the whole pending byte, it cannot be used on kernels
+ * configured to support 16K or more CPUs (CONFIG_NR_CPUS).
+ */
+#include <linux/sched.h>
+
+/*
+ * =========================[ Helper Functions ]=========================
+ */
+
+/*
+ * Translate the priority of a task to an equivalent RT priority
+ */
+static u8 rt_task_priority(struct task_struct *task)
+{
+ int prio = READ_ONCE(task->prio);
+
+ return (prio >= MAX_RT_PRIO) ? 0 : (u8)(MAX_RT_PRIO - prio);
+}
+
+/*
+ * ====================[ Functions Used by qspinlock.c ]====================
+ */
+
+static inline bool rt_enabled(void)
+{
+ return true;
+}
+
+/*
+ * Return the pending byte portion of the integer value of the lock.
+ */
+static inline int rt_pending(int val)
+{
+ return val & _Q_PENDING_MASK;
+}
+
+/*
+ * Return: true if lock acquired
+ * false if queuing in the MCS wait queue is needed.
+ */
+static bool rt_spin_trylock(struct qspinlock *lock)
+{
+ struct __qspinlock *l = (void *)lock;
+ u8 prio = rt_task_priority(current);
+
+ BUILD_BUG_ON(_Q_PENDING_BITS != 8);
+
+ if (!prio)
+ return false;
+
+ /*
+ * Spin on the lock and try to set its priority into the pending byte.
+ */
+ for (;;) {
+ u16 lockpend = READ_ONCE(l->locked_pending);
+ u8 pdprio = (u8)(lockpend >> _Q_PENDING_OFFSET);
+
+ if (prio < pdprio) {
+ /*
+ * Higher priority task present, one more cpu_relax()
+ * before the next attempt.
+ */
+ cpu_relax();
+ goto next;
+ }
+
+ if (!(lockpend & _Q_LOCKED_MASK)) {
+ u16 old = lockpend;
+ u16 new = (pdprio == prio)
+ ? _Q_LOCKED_VAL : (lockpend | _Q_LOCKED_VAL);
+
+ /*
+ * Lock is free and priority <= prio, try to acquire
+ * the lock and clear the priority if it matches my
+ * prio.
+ */
+ lockpend = cmpxchg_acquire(&l->locked_pending,
+ old, new);
+ if (old == lockpend)
+ break;
+
+ pdprio = (u8)(lockpend >> _Q_PENDING_OFFSET);
+ }
+
+ if (pdprio < prio)
+ cmpxchg_relaxed(&l->pending, pdprio, prio);
+next:
+ cpu_relax();
+ }
+ return true;
+}
+
+/*
+ * We need to make the non-RT tasks wait longer if RT tasks are spinning for
+ * the lock. This is done to reduce the chance that a non-RT task may
+ * accidentally grab the lock away from the RT tasks in the short interval
+ * where the pending priority may be reset after an RT task acquires the lock.
+ *
+ * Return: Current value of the lock.
+ */
+static u32 rt_wait_head_or_retry(struct qspinlock *lock)
+{
+ struct __qspinlock *l = (void *)lock;
+
+ for (;;) {
+ u16 lockpend = READ_ONCE(l->locked_pending);
+
+ if (!lockpend) {
+ lockpend = cmpxchg_acquire(&l->locked_pending, 0,
+ _Q_LOCKED_VAL);
+ if (!lockpend)
+ break;
+ }
+
+ /*
+ * Do 4 cpu_relax()'s if RT tasks are present.
+ */
+ if (lockpend & _Q_PENDING_MASK) {
+ cpu_relax();
+ cpu_relax();
+ cpu_relax();
+ }
+ cpu_relax();
+ }
+ return atomic_read(&lock->val);
+}
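For reference, here is a stand-alone user-space sketch (not kernel code)
of the locked_pending word transitions that rt_spin_trylock() performs,
assuming the <16K-CPUs layout where bits 0-7 are the locked byte and bits
8-15 the pending byte; the constants are redefined locally just for the
example.

#include <stdint.h>
#include <stdio.h>

#define _Q_LOCKED_VAL		1U
#define _Q_LOCKED_MASK		0x00ffU
#define _Q_PENDING_OFFSET	8

int main(void)
{
	uint16_t locked_pending = _Q_LOCKED_VAL;	/* lock held, no RT waiter */
	uint8_t prio = 60;				/* our encoded RT priority */

	/* Bid for the pending byte while the lock is still held. */
	if ((uint8_t)(locked_pending >> _Q_PENDING_OFFSET) < prio)
		locked_pending = (locked_pending & _Q_LOCKED_MASK) |
				 ((uint16_t)prio << _Q_PENDING_OFFSET);
	printf("spinning: 0x%04x\n", locked_pending);	/* 0x3c01 */

	/* The holder releases the lock by clearing the locked byte. */
	locked_pending &= ~_Q_LOCKED_MASK;

	/* Pending priority matches ours: take the lock and clear the
	 * pending byte in a single update. */
	if (!(locked_pending & _Q_LOCKED_MASK) &&
	    (uint8_t)(locked_pending >> _Q_PENDING_OFFSET) == prio)
		locked_pending = _Q_LOCKED_VAL;
	printf("acquired: 0x%04x\n", locked_pending);	/* 0x0001 */

	return 0;
}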
--
1.8.3.1