lists.openwall.net   lists  /  announce  owl-users  owl-dev  john-users  john-dev  passwdqc-users  yescrypt  popa3d-users  /  oss-security  kernel-hardening  musl  sabotage  tlsify  passwords  /  crypt-dev  xvendor  /  Bugtraq  Full-Disclosure  linux-kernel  linux-netdev  linux-ext4  linux-hardening  linux-cve-announce  PHC 
Open Source and information security mailing list archives
 
Hash Suite: Windows password security audit tool. GUI, reports in PDF.
[<prev] [next>] [<thread-prev] [thread-next>] [day] [month] [year] [list]
Message-ID: <20240914085327.32912-4-yongli-oc@zhaoxin.com>
Date: Sat, 14 Sep 2024 16:53:26 +0800
From: yongli-oc <yongli-oc@...oxin.com>
To: <peterz@...radead.org>, <mingo@...hat.com>, <will@...nel.org>,
	<longman@...hat.com>, <boqun.feng@...il.com>
CC: <linux-kernel@...r.kernel.org>, <yongli@...oxin.com>,
	<louisqi@...oxin.com>, <cobechen@...oxin.com>, <jiangbowang@...oxin.com>
Subject: [PATCH 3/4] locking/osq_lock: From x_osq_lock/unlock to numa-aware lock/unlock.

Depending on the contention level, switch a lock from x_osq_lock to
the NUMA-aware osq_lock.
The NUMA-aware lock is a two-level osq_lock.
Also add the Makefile rule for the dynamic NUMA-aware osq lock.

Signed-off-by: yongli-oc <yongli-oc@...oxin.com>
---
 kernel/locking/Makefile      |   1 +
 kernel/locking/numa.h        |  98 ++++++++
 kernel/locking/numa_osq.h    |  32 +++
 kernel/locking/x_osq_lock.c  | 332 +++++++++++++++++++++++++++
 kernel/locking/zx_numa_osq.c | 433 +++++++++++++++++++++++++++++++++++
 5 files changed, 896 insertions(+)
 create mode 100644 kernel/locking/numa.h
 create mode 100644 kernel/locking/numa_osq.h
 create mode 100644 kernel/locking/x_osq_lock.c
 create mode 100644 kernel/locking/zx_numa_osq.c

diff --git a/kernel/locking/Makefile b/kernel/locking/Makefile
index 0db4093d17b8..00c1d5bb6eb9 100644
--- a/kernel/locking/Makefile
+++ b/kernel/locking/Makefile
@@ -22,6 +22,7 @@ obj-$(CONFIG_LOCKDEP) += lockdep_proc.o
 endif
 obj-$(CONFIG_SMP) += spinlock.o
 obj-$(CONFIG_LOCK_SPIN_ON_OWNER) += osq_lock.o
+obj-$(CONFIG_LOCK_SPIN_ON_OWNER_NUMA) += x_osq_lock.o zx_numa_osq.o zx_numa.o
 obj-$(CONFIG_PROVE_LOCKING) += spinlock.o
 obj-$(CONFIG_QUEUED_SPINLOCKS) += qspinlock.o
 obj-$(CONFIG_RT_MUTEXES) += rtmutex_api.o
diff --git a/kernel/locking/numa.h b/kernel/locking/numa.h
new file mode 100644
index 000000000000..01e5aef3257a
--- /dev/null
+++ b/kernel/locking/numa.h
@@ -0,0 +1,98 @@
+/* SPDX-License-Identifier: GPL-2.0 */
+#ifndef __LINUX_NUMA_LOCK_H
+#define __LINUX_NUMA_LOCK_H
+#include "mcs_spinlock.h"
+
+struct optimistic_spin_node {
+	struct optimistic_spin_node *next, *prev; /* queue links: successor / predecessor */
+	int numa; /* not referenced by the code in this patch */
+	int locked; /* wait/hand-off word; meaning depends on the queue using the node */
+	int cpu; /* encoded CPU # + 1 value */
+	u32 serial; /* x_osq_lock(): 1 if the queue was empty, else prev->serial + 1 */
+};
+/* Element type of the zx_numa_entry[] table; an entry is selected by the
+ * index that zx_numa_lock_ptr_get() returns or that lock->index holds. */
+struct _numa_buf {
+	void *numa_ptr; /* passed to numa_lock_init_data() as the struct _numa_lock storage */
+	struct list_head list;
+	u32 lockaddr;
+	u32 highaddr;
+	u8 idle; /* flipped 0 -> 1 by node_lock_release() */
+	u8 type; /* set to 1 by zx_osq_lock_stopping() */
+	u16 index;
+};
+/* Zero-length member carrying only an alignment attribute; struct _numa_lock ends with one. */
+struct cache_padding {
+	char x[0];
+} ____cacheline_internodealigned_in_smp;
+#define CACHE_PADDING(name)	struct cache_padding name
+/* Queue head; used as an array: entry (cpu >> shift) per node, top-level entry at [numa_nodes]. */
+struct _numa_lock {
+	atomic_t tail ____cacheline_aligned_in_smp; /* encoded queue tail */
+	atomic_t addr; /* read with tail as one atomic64: (addr << 32) | tail */
+	u8 shift; /* cpu >> shift selects the per-node entry */
+	u8 stopping; /* nonzero: zx_numa_osq_lock() falls back to osq_lock() */
+	u16 numa_nodes; /* array index of the top-level entry */
+	u32 accessed;
+	uint64_t totalaccessed;
+	u32 nodeswitched;
+	atomic_t initlock; /* TURNTONUMAREADY once zx_osq_numa_start() has run */
+	atomic_t pending; /* CPUs currently inside zx_osq_turn_numa_waiting() */
+	union {
+		struct mcs_spinlock mcs_node;
+		struct optimistic_spin_node osq_node; /* this entry's node in the top-level queue */
+	};
+	CACHE_PADDING(pad);
+};
+/* CPU identification record; not referenced by the code in this patch. */
+struct numa_cpu_info {
+	__u8	x86_model;
+	/* CPU family */
+	__u8	x86;
+	/* CPU vendor */
+	__u8	x86_vendor;
+	__u8	x86_reserved;
+	u32	feature1;
+};
+
+#define NUMAEXPAND 1
+
+#define COHORT_START 1 /* node->locked of the CPU that starts a hand-off run in a node */
+#define ACQUIRE_NUMALOCK (UINT_MAX-1) /* node->locked / status: go on to take the top-level lock */
+#define NODE_WAIT UINT_MAX /* node->locked while still queued on a node lock */
+#define LOCK_NUMALOCK 1 /* top-level node->locked: waiting */
+#define UNLOCK_NUMALOCK 0 /* top-level node->locked: granted */
+
+#define NUMALOCKDYNAMIC 0xff /* lock->numa_enable: NUMA-aware path in use */
+#define TURNTONUMAREADY 0xa5a5 /* _numa_lock->initlock: switch to the NUMA path is complete */
+#define NUMATURNBACKREADY 0x5a5a
+
+#define NUMA_LOCKED_VAL 0xf5efef /* node tail: closed, lockers fall back to osq_lock() */
+#define NUMA_UNLOCKED_VAL 0 /* node tail: empty queue */
+
+#define NUMASTEERMASK 0xf0000000
+#define HIGH32BITMASK 0xffffffff00000000
+#define LOW32MASK 0xffffffff
+/* Shared state; none of these objects is defined in the files added by this patch. */
+extern int NUMASHIFT;
+extern int NUMACLUSTERS;
+extern int zx_numa_lock_total; /* upper bound for valid zx_numa_entry[] indexes */
+extern struct _numa_buf *zx_numa_entry;
+extern atomic_t numa_count; /* compared against zx_numa_lock_total in x_osq_unlock() */
+extern int enable_zx_numa_osq_lock;
+extern u32 zx_numa_lock;
+extern int dynamic_enable; /* bit 0 gates the osq -> NUMA switch in x_osq_unlock() */
+extern struct kmem_cache *zx_numa_lock_cachep;
+/* Low 32 bits of pointer @s; callers use it as a compact tag for a lock's address. */
+static inline u32 ptrmask(void *s)
+{
+	return (uint64_t)s & LOW32MASK;
+}
+void *get_numa_lock(int index); /* plain prototype: no inline body is visible to includers */
+/* The functions below are likewise defined outside this header. */
+int zx_check_numa_dynamic_locked(u32 lockaddr, struct _numa_lock *_numa_lock,
+		int t);
+int zx_numa_lock_ptr_get(void *p);
+void numa_lock_init_data(struct _numa_lock *s, int clusters, u32 lockval,
+		u32 lockaddr);
+#endif
diff --git a/kernel/locking/numa_osq.h b/kernel/locking/numa_osq.h
new file mode 100644
index 000000000000..4c99ad60c4e0
--- /dev/null
+++ b/kernel/locking/numa_osq.h
@@ -0,0 +1,32 @@
+/* SPDX-License-Identifier: GPL-2.0 */
+#ifndef __LINUX_NUMA_OSQ_H
+#define __LINUX_NUMA_OSQ_H
+
+#include <linux/osq_lock.h>
+#include "mcs_spinlock.h"
+/* Values of lock->numa_enable; OSQ_LOCKED_VAL is a lock->tail16 value instead. */
+#define OSQLOCKINITED 0 /* plain osq queueing */
+#define OSQTONUMADETECT 0x10 /* x_osq_unlock() measures contention */
+#define OSQLOCKSTOPPING 0xfc /* switch to the NUMA-aware lock in progress */
+#define OSQ_LOCKED_VAL 0xffff /* tail16 marker written while the lock is switched */
+/* Tunables: the first two are defined in x_osq_lock.c, the last one in zx_numa_osq.c. */
+extern u16 osq_keep_times;
+extern u16 osq_lock_depth;
+extern int osq_node_max;
+/* CPU <-> queue node helpers, defined outside this header (hence no "inline" here). */
+int encode_cpu(int cpu_nr);
+int node_cpu(struct optimistic_spin_node *node);
+struct optimistic_spin_node *decode_cpu(int encoded_cpu_val);
+/* Entry points shared between x_osq_lock.c and zx_numa_osq.c; plain external prototypes. */
+void zx_osq_lock_stopping(struct optimistic_spin_queue *lock);
+void zx_osq_numa_start(struct optimistic_spin_queue *lock);
+void zx_osq_turn_numa_waiting(struct optimistic_spin_queue *lock);
+
+bool x_osq_lock(struct optimistic_spin_queue *lock);
+void x_osq_unlock(struct optimistic_spin_queue *lock);
+bool x_osq_is_locked(struct optimistic_spin_queue *lock);
+void zx_numa_osq_unlock(struct optimistic_spin_queue *qslock,
+		struct _numa_lock *n);
+bool zx_numa_osq_lock(struct optimistic_spin_queue *qslock,
+		struct _numa_lock *n);
+#endif
diff --git a/kernel/locking/x_osq_lock.c b/kernel/locking/x_osq_lock.c
new file mode 100644
index 000000000000..6da8905ae7d6
--- /dev/null
+++ b/kernel/locking/x_osq_lock.c
@@ -0,0 +1,332 @@
+// SPDX-License-Identifier: GPL-2.0
+/*
+ * crossing from osq_lock to numa-aware lock
+ */
+#include <linux/percpu.h>
+#include <linux/sched.h>
+#include <linux/osq_lock.h>
+#include "numa.h"
+#include "numa_osq.h"
+
+u16 osq_lock_depth = 8; /* x_osq_unlock(): queue depth above which the hand-off count is passed on */
+u16 osq_keep_times = 32; /* x_osq_unlock(): hand-off count above which the NUMA switch is started */
+
+/*
+ * An MCS like lock especially tailored for optimistic spinning for sleeping
+ * lock implementations (mutex, rwsem, etc).
+ *
+ * Using a single mcs node per CPU is safe because sleeping locks should not be
+ * called from interrupt context and we have preemption disabled while
+ * spinning.  The per-CPU node itself is only declared here, not defined.
+ */
+DECLARE_PER_CPU_SHARED_ALIGNED(struct optimistic_spin_node, osq_node);
+
+/*
+ * Get a stable @node->next pointer, either for unlock() or unqueue() purposes.
+ * Can return NULL in case we were the last queued and we updated @lock instead.
+ *
+ * @old_cpu is the value written back to @lock->tail16 in that case: the
+ * previous node's encoded CPU for a cancelled x_osq_lock(), or
+ * OSQ_UNLOCKED_VAL for x_osq_unlock().  While the lock is in the
+ * OSQLOCKSTOPPING state, OSQ_UNLOCKED_VAL is replaced by OSQ_LOCKED_VAL.
+ */
+static inline struct optimistic_spin_node *
+osq_wait_next_stop(struct optimistic_spin_queue *lock,
+	      struct optimistic_spin_node *node,
+	      int old_cpu)
+{
+	u16 curr = encode_cpu(smp_processor_id());
+	u16 old = old_cpu;
+
+	if (lock->numa_enable == OSQLOCKSTOPPING && old == OSQ_UNLOCKED_VAL)
+		old = OSQ_LOCKED_VAL;
+
+	for (;;) {
+		if (READ_ONCE(lock->tail16) == curr &&
+		    cmpxchg(&lock->tail16, curr, old) == curr) {
+
+			/*
+			 * We were the last queued, we moved @lock back. @prev
+			 * will now observe @lock and will complete its
+			 * unlock()/unqueue().
+			 */
+			return NULL;
+		}
+
+		/*
+		 * We must xchg() the @node->next value, because if we were to
+		 * leave it in, a concurrent unlock()/unqueue() from
+		 * @node->next might complete Step-A and think its @prev is
+		 * still valid.
+		 *
+		 * If the concurrent unlock()/unqueue() wins the race, we'll
+		 * wait for either @lock to point to us, through its Step-B, or
+		 * wait for a new @node->next from its Step-C.
+		 */
+		if (node->next) {
+			struct optimistic_spin_node *next;
+
+			next = xchg(&node->next, NULL);
+			if (next)
+				return next;
+		}
+
+		cpu_relax();
+	}
+}
+/* Acquire @lock by queueing; true if acquired, false if the spin was abandoned and unqueued. */
+bool x_osq_lock(struct optimistic_spin_queue *lock)
+{
+	struct optimistic_spin_node *node = this_cpu_ptr(&osq_node);
+	struct optimistic_spin_node *prev, *next;
+	int cpu = smp_processor_id();
+	u16 curr = encode_cpu(cpu);
+	struct optimistic_spin_queue tail;
+	u16 old;
+
+	tail.val = READ_ONCE(lock->val); /* one snapshot of the whole lock word */
+	if (unlikely(tail.numa_enable == OSQLOCKSTOPPING)) {
+		zx_osq_turn_numa_waiting(lock); /* wait for the switch to finish, then retry */
+		return x_osq_lock(lock);
+	}
+
+	if (unlikely(tail.numa_enable == NUMALOCKDYNAMIC)) {
+		struct _numa_lock *_numa_lock = NULL;
+		struct _numa_lock *node_lock = NULL;
+
+		_numa_lock = get_numa_lock(tail.index);
+		node_lock = (struct _numa_lock *) _numa_lock +
+					(cpu >> NUMASHIFT);
+
+		prefetch(node_lock); /* node_lock is only used for this prefetch */
+		return zx_numa_osq_lock(lock, _numa_lock);
+	}
+
+	node->locked = 0;
+	node->next = NULL;
+	node->cpu = curr;
+
+	/*
+	 * We need both ACQUIRE (pairs with corresponding RELEASE in
+	 * unlock() uncontended, or fastpath) and RELEASE (to publish
+	 * the node fields we just initialised) semantics when updating
+	 * the lock tail.
+	 */
+
+	if (likely(tail.numa_enable >= OSQTONUMADETECT)) {
+		struct optimistic_spin_queue ss;
+
+		while (1) {
+			ss.val = atomic_read(&lock->tail);
+			if (ss.tail16 == OSQ_LOCKED_VAL) { /* lock is being switched: never queue behind it */
+				zx_osq_turn_numa_waiting(lock);
+				return x_osq_lock(lock);
+			}
+			if (cmpxchg(&lock->tail16, ss.tail16, curr)
+					== ss.tail16) {
+				old = ss.tail16;
+				break;
+			}
+			cpu_relax();
+		}
+	} else
+		old = xchg(&lock->tail16, curr);
+
+	if (old == OSQ_UNLOCKED_VAL) {
+		node->serial = 1; /* queue was empty: numbering restarts at 1 */
+		return true;
+	}
+
+	prev = decode_cpu(old);
+	node->prev = prev;
+
+	node->serial = prev->serial + 1; /* x_osq_unlock() derives the queue depth from serials */
+	/*
+	 * osq_lock()			unqueue
+	 *
+	 * node->prev = prev		osq_wait_next()
+	 * WMB				MB
+	 * prev->next = node		next->prev = prev // unqueue-C
+	 *
+	 * Here 'node->prev' and 'next->prev' are the same variable and we need
+	 * to ensure these stores happen in-order to avoid corrupting the list.
+	 */
+	smp_wmb();
+
+	WRITE_ONCE(prev->next, node);
+
+	/*
+	 * Normally @prev is untouchable after the above store; because at that
+	 * moment unlock can proceed and wipe the node element from stack.
+	 *
+	 * However, since our nodes are static per-cpu storage, we're
+	 * guaranteed their existence -- this allows us to apply
+	 * cmpxchg in an attempt to undo our queueing.
+	 */
+
+	/*
+	 * Wait to acquire the lock or cancellation. Note that need_resched()
+	 * will come with an IPI, which will wake smp_cond_load_relaxed() if it
+	 * is implemented with a monitor-wait. vcpu_is_preempted() relies on
+	 * polling, be careful.
+	 */
+	if (smp_cond_load_relaxed(&node->locked, VAL || need_resched() ||
+				  vcpu_is_preempted(node_cpu(node->prev))))
+		return true;
+
+	/* unqueue */
+	/*
+	 * Step - A  -- stabilize @prev
+	 *
+	 * Undo our @prev->next assignment; this will make @prev's
+	 * unlock()/unqueue() wait for a next pointer since @lock points to us
+	 * (or later).
+	 */
+
+	for (;;) {
+		/*
+		 * cpu_relax() below implies a compiler barrier which would
+		 * prevent this comparison being optimized away.
+		 */
+		if (data_race(prev->next) == node &&
+		    cmpxchg(&prev->next, node, NULL) == node)
+			break;
+
+		/*
+		 * We can only fail the cmpxchg() racing against an unlock(),
+		 * in which case we should observe @node->locked becoming
+		 * true.
+		 */
+		if (smp_load_acquire(&node->locked))
+			return true;
+
+		cpu_relax();
+
+		/*
+		 * Or we race against a concurrent unqueue()'s step-B, in which
+		 * case its step-C will write us a new @node->prev pointer.
+		 */
+		prev = READ_ONCE(node->prev);
+	}
+
+	/*
+	 * Step - B -- stabilize @next
+	 *
+	 * Similar to unlock(), wait for @node->next or move @lock from @node
+	 * back to @prev.
+	 */
+
+	next = osq_wait_next_stop(lock, node, prev->cpu);
+	if (!next)
+		return false;
+
+	/*
+	 * Step - C -- unlink
+	 *
+	 * @prev is stable because its still waiting for a new @prev->next
+	 * pointer, @next is stable because our @node->next pointer is NULL and
+	 * it will wait in Step-A.
+	 */
+
+	WRITE_ONCE(next->prev, prev);
+	WRITE_ONCE(prev->next, next);
+
+	return false;
+}
+/* Release @lock, handing it to the next queued CPU if any; also drives the osq -> NUMA switch.
+ * NOTE(review): if osq_wait_next_stop() returns NULL in the OSQLOCKSTOPPING state, tail16 is now
+ * OSQ_LOCKED_VAL but zx_osq_numa_start() is not called on that path - confirm initlock gets set. */
+void x_osq_unlock(struct optimistic_spin_queue *lock)
+{
+	struct optimistic_spin_node *node, *next;
+	int threadshold = osq_lock_depth;
+	int cpu = smp_processor_id();
+	u16 curr = encode_cpu(cpu);
+	int depth = 0;
+	u32 count = 0;
+
+	if (unlikely(lock->numa_enable == NUMALOCKDYNAMIC)) {
+		struct _numa_lock *_numa_lock = get_numa_lock(lock->index);
+
+		prefetch((struct _numa_lock *) _numa_lock + (cpu >> NUMASHIFT));
+		return zx_numa_osq_unlock(lock, _numa_lock);
+	}
+	/*
+	 * Fast paths: in each state, release directly if we are still the tail.
+	 */
+	if (unlikely(lock->numa_enable == OSQTONUMADETECT)) {
+		struct optimistic_spin_node *node_last = NULL;
+		u16 tail = 0;
+
+		tail = cmpxchg(&lock->tail16, curr, OSQ_UNLOCKED_VAL);
+		if (tail == curr)
+			return;
+
+		node = this_cpu_ptr(&osq_node);
+		node_last = decode_cpu(tail);
+		depth = node_last->serial - node->serial; /* serial distance from us to the tail */
+		count = READ_ONCE(node->locked); /* hand-off count received with the lock */
+		if (count > osq_keep_times && (dynamic_enable & 0x1))
+			zx_osq_lock_stopping(lock);
+	} else if (unlikely(lock->numa_enable == OSQLOCKSTOPPING)) {
+		if (cmpxchg(&lock->tail16, curr, OSQ_LOCKED_VAL)
+					== curr) {
+			zx_osq_numa_start(lock); /* queue drained: complete the switch */
+			return;
+		}
+	} else {
+		struct optimistic_spin_queue t;
+
+		t.val = 0;
+		if (dynamic_enable & 0x1) {
+			if (atomic_read(&numa_count) < zx_numa_lock_total)
+				t.numa_enable = OSQTONUMADETECT;
+		}
+		if (t.numa_enable == OSQTONUMADETECT) {
+			if (atomic_cmpxchg_release(&lock->tail, curr, /* whole word: also sets numa_enable */
+				(t.val | OSQ_UNLOCKED_VAL)) == curr)
+				return;
+		} else if (cmpxchg(&lock->tail16, curr,
+				OSQ_UNLOCKED_VAL) == curr)
+			return;
+	}
+
+	/*
+	 * Second most likely case.
+	 */
+	node = this_cpu_ptr(&osq_node);
+	next = xchg(&node->next, NULL);
+	if (next) {
+		if (depth > threadshold)
+			WRITE_ONCE(next->locked, count + 1); /* deep queue: keep counting hand-offs */
+		else
+			WRITE_ONCE(next->locked, 1);
+		return;
+	}
+
+	next = osq_wait_next_stop(lock, node, OSQ_UNLOCKED_VAL); /* NULL: we were the last queued */
+	if (next) {
+		if (depth > threadshold)
+			WRITE_ONCE(next->locked, count + 1);
+		else
+			WRITE_ONCE(next->locked, 1);
+	}
+}
+/* True if @lock is held or queued; in NUMA mode zx_check_numa_dynamic_locked() decides. */
+bool x_osq_is_locked(struct optimistic_spin_queue *lock)
+{
+	struct optimistic_spin_queue val;
+
+	val.val = atomic_read(&lock->tail); /* one snapshot of tail16, index and numa_enable */
+	if (val.tail16 == OSQ_UNLOCKED_VAL)
+		return false;
+
+	if (val.tail16 == OSQ_LOCKED_VAL) {
+		if (val.numa_enable != NUMALOCKDYNAMIC)
+			return true; /* marker set but NUMA mode not published yet */
+		return zx_check_numa_dynamic_locked(ptrmask(lock),
+					get_numa_lock(val.index), 0);
+	}
+
+	return true;
+}
diff --git a/kernel/locking/zx_numa_osq.c b/kernel/locking/zx_numa_osq.c
new file mode 100644
index 000000000000..f4c3dba208d3
--- /dev/null
+++ b/kernel/locking/zx_numa_osq.c
@@ -0,0 +1,433 @@
+// SPDX-License-Identifier: GPL-2.0
+/*
+ * Dynamic numa-aware osq lock
+ * Author: LiYong <yongli-oc@...oxin.com>
+ *
+ */
+#include <linux/cpumask.h>
+#include <asm/byteorder.h>
+#include <linux/percpu.h>
+#include <linux/sched.h>
+#include <linux/slab.h>
+#include <linux/osq_lock.h>
+#include "numa.h"
+#include "numa_osq.h"
+
+int osq_node_max = 256; /* zx_numa_osq_unlock(): hand-off count that forces both levels to be released */
+
+/*
+ * Per-CPU queue node for the first-level (per NUMA node) queues.
+ * _zx_node_osq_lock() and zx_numa_osq_unlock() use the local CPU's
+ * instance; decode_curr() maps an encoded CPU value taken from a queue
+ * tail back to the owning CPU's instance.  It is distinct from the
+ * osq_node that x_osq_lock() uses, and its ->locked field carries the
+ * hand-off count while the lock is passed on within a node.
+ */
+
+static DEFINE_PER_CPU_SHARED_ALIGNED(struct optimistic_spin_node, osq_cpu_node);
+
+/*
+ * Encoded CPU values are the CPU number plus one, so that 0 can represent
+ * "no CPU"; decode() converts an encoded value back to the CPU number.
+ */
+static inline int decode(int cpu_nr)
+{
+	return cpu_nr - 1;
+}
+/* Map an encoded CPU value to that CPU's osq_cpu_node. */
+static inline struct optimistic_spin_node *decode_curr(int encoded_cpu_val)
+{
+	int cpu_nr = decode(encoded_cpu_val);
+
+	return per_cpu_ptr(&osq_cpu_node, cpu_nr);
+}
+/* Install @curr as tail of the 64-bit {addr, tail} word at @tail unless it is closed or re-tagged. */
+static int atomic64_cmpxchg_notequal(void *qslock, atomic_t *tail, int curr)
+{
+	u64 ss = 0;
+	u32 addr = ptrmask(qslock);
+	u64 addrcurr = (((u64)addr) << 32) | curr;
+
+	while (1) {
+		ss = atomic64_read((atomic64_t *) tail); /* reads tail plus the adjacent addr field */
+		if ((ss >> 32) != addr) /* word no longer tagged with this lock's address */
+			return NUMA_LOCKED_VAL;
+		if ((ss & LOW32MASK) == NUMA_LOCKED_VAL) /* queue has been closed */
+			return NUMA_LOCKED_VAL;
+		if (atomic64_cmpxchg((atomic64_t *) tail, ss, addrcurr) == ss)
+			return ss & LOW32MASK; /* previous tail; 0 means the queue was empty */
+		cpu_relax();
+	}
+}
+/* Get a slot index for @lock; if in range, set it up and enter OSQLOCKSTOPPING, else OSQLOCKINITED. */
+void zx_osq_lock_stopping(struct optimistic_spin_queue *lock)
+{
+	int s = 0;
+
+	s = zx_numa_lock_ptr_get(lock);
+	if (s < zx_numa_lock_total) {
+		numa_lock_init_data(zx_numa_entry[s].numa_ptr,
+			NUMACLUSTERS, NUMA_UNLOCKED_VAL,
+			ptrmask(lock));
+
+		WRITE_ONCE(lock->index, s);
+		zx_numa_entry[s].type = 1;
+		smp_mb();/*should set these before enable*/
+		prefetchw(&lock->numa_enable);
+		WRITE_ONCE(lock->numa_enable, OSQLOCKSTOPPING);
+	} else {
+		prefetchw(&lock->numa_enable);
+		WRITE_ONCE(lock->numa_enable, OSQLOCKINITED); /* no slot: stay on plain osq */
+	}
+}
+/* Publish NUMA mode for @lock, then set initlock so zx_osq_turn_numa_waiting() spinners proceed. */
+void zx_osq_numa_start(struct optimistic_spin_queue *lock)
+{
+	struct _numa_lock *_numa_lock = get_numa_lock(lock->index);
+
+	prefetchw(&lock->numa_enable);
+	WRITE_ONCE(lock->numa_enable, NUMALOCKDYNAMIC);
+	smp_mb(); /*should keep lock->numa_enable modified first*/
+	atomic_set(&_numa_lock->initlock, TURNTONUMAREADY);
+}
+/* Busy-wait until the slot selected by @lock->index has initlock == TURNTONUMAREADY.
+ * ->pending is raised for the duration of the wait; there is no cancellation path. */
+void zx_osq_turn_numa_waiting(struct optimistic_spin_queue *lock)
+{
+	struct _numa_lock *_numa_lock = get_numa_lock(lock->index);
+
+	atomic_inc(&_numa_lock->pending);
+	while (1) {
+		int s = atomic_read(&_numa_lock->initlock);
+
+		if (s == TURNTONUMAREADY)
+			break;
+		cpu_relax();
+		cpu_relax();
+		cpu_relax();
+		cpu_relax();
+
+	}
+	atomic_dec(&_numa_lock->pending);
+}
+/*
+ * Unqueue helper for a node queue: either move @lock->tail from this CPU back to @prev's
+ * encoded CPU (OSQ_UNLOCKED_VAL if @prev is NULL) and return NULL, or detach and return @node->next.
+ */
+static struct optimistic_spin_node *
+zx_numa_osq_wait_next(struct _numa_lock *lock,
+		struct optimistic_spin_node *node,
+		struct optimistic_spin_node *prev, int cpu)
+{
+	struct optimistic_spin_node *next = NULL;
+	int curr = encode_cpu(cpu);
+	int old;
+
+	old = prev ? prev->cpu : OSQ_UNLOCKED_VAL;
+	for (;;) {
+		if (atomic_read(&lock->tail) == curr &&
+		    atomic_cmpxchg_acquire(&lock->tail, curr, old) == curr) {
+
+			break; /* we were the tail: next stays NULL */
+		}
+		if (node->next) {
+			next = xchg(&node->next, NULL);
+			if (next)
+				break;
+		}
+		cpu_relax();
+	}
+	return next;
+}
+/* Wait until the top-level queue is closed or re-tagged, then reset @lock to plain osq. */
+static void zx_numa_turn_osq_waiting(struct optimistic_spin_queue *lock,
+			struct _numa_lock *_numa_lock)
+{
+	struct _numa_lock *numa_lock = _numa_lock + _numa_lock->numa_nodes;
+	u32 lockaddr = ptrmask(lock); /* unsigned: an int would sign-extend in the u64 compare */
+	u64 s = 0;
+	struct optimistic_spin_queue tail;
+
+	tail.numa_enable = NUMALOCKDYNAMIC;
+	tail.index = lock->index;
+	tail.tail16 = OSQ_LOCKED_VAL;
+	while (1) {
+		cpu_relax(); cpu_relax(); cpu_relax(); cpu_relax();
+		s = atomic64_read((atomic64_t *) &numa_lock->tail);
+		if ((s >> 32) != lockaddr)
+			break;
+		if ((s & LOW32MASK) == NUMA_LOCKED_VAL)
+			break;
+	}
+	prefetchw(&lock->tail);
+	/* Reset the word from {NUMALOCKDYNAMIC, index, OSQ_LOCKED_VAL} to unlocked
+	 * plain osq; the result is ignored: if the word differs, nothing is done.
+	 */
+	atomic_cmpxchg(&lock->tail, tail.val, OSQ_UNLOCKED_VAL);
+}
+/* Unqueue @node from its node queue: 0 if ->locked changed meanwhile (*cur_status set), else -1. */
+static int _zx_node_osq_lock_internal(struct optimistic_spin_queue *qslock,
+	struct optimistic_spin_node *node, struct optimistic_spin_node *prev,
+	struct _numa_lock *node_lock, int cpu, int *cur_status)
+{
+	struct optimistic_spin_node *next = NULL;
+
+	for (;;) {
+		struct optimistic_spin_node *node_prev = NULL;
+
+		if (prev->next == node &&
+		    cmpxchg(&prev->next, node, NULL) == node) {
+			break; /* our link from @prev is undone */
+		}
+		/*load locked first each time*/
+		*cur_status = smp_load_acquire(&node->locked);
+
+		if (*cur_status != NODE_WAIT)
+			return 0; //goto NODE_UNLOCK;
+
+		cpu_relax();
+		node_prev = READ_ONCE(node->prev); /* may have been rewritten by a concurrent unqueue */
+		if (node_prev != prev)
+			prev = node_prev;
+	}
+
+	next = zx_numa_osq_wait_next(node_lock, node, prev, cpu);
+	if (!next)
+		return -1; /* we were the tail; the tail now points at @prev */
+
+	WRITE_ONCE(next->prev, prev);
+	WRITE_ONCE(prev->next, next);
+
+	return -1;
+}
+/* First level: queue on this CPU's node lock; returns the hand-off status, or -1 if abandoned. */
+static int _zx_node_osq_lock(struct optimistic_spin_queue *qslock,
+					struct _numa_lock *_numa_lock)
+{
+	struct optimistic_spin_node *node = this_cpu_ptr(&osq_cpu_node);
+	struct optimistic_spin_node *prev = NULL;
+	int cpu = smp_processor_id();
+	int curr = encode_cpu(cpu);
+	int numa = cpu >> _numa_lock->shift;
+	struct _numa_lock *node_lock = _numa_lock + numa;
+	int cur_status = 0;
+	int old = 0;
+
+	node->locked = NODE_WAIT;
+	node->next = NULL;
+	node->cpu = curr;
+
+	old = atomic64_cmpxchg_notequal(qslock, &node_lock->tail, curr);
+
+	if (old == NUMA_LOCKED_VAL) { /* node queue closed or re-tagged: use plain osq instead */
+		bool s = true;
+
+		zx_numa_turn_osq_waiting(qslock, _numa_lock);
+		s = osq_lock(qslock);
+		if (s == true)
+			return 1;
+		else
+			return -1;
+	}
+
+	if (old == 0) { /* node queue was empty: caller goes on to the top-level lock */
+		node->locked = COHORT_START;
+		return ACQUIRE_NUMALOCK;
+	}
+
+	prev = decode_curr(old);
+	node->prev = prev;
+
+	smp_mb(); /* make sure node set before set pre->next */
+
+	WRITE_ONCE(prev->next, node);
+
+	while ((cur_status = READ_ONCE(node->locked)) == NODE_WAIT) {
+		if (need_resched() || vcpu_is_preempted(node_cpu(node->prev))) {
+			int ddd = _zx_node_osq_lock_internal(qslock, node, prev,
+						node_lock, cpu, &cur_status);
+
+			if (cur_status != NODE_WAIT)
+				goto NODE_UNLOCK; /* lock was handed over while unqueueing */
+			if (ddd == -1)
+				return -1;
+		}
+		cpu_relax();
+	}
+NODE_UNLOCK:
+	if (cur_status == ACQUIRE_NUMALOCK)
+		node->locked = COHORT_START;
+	return cur_status;
+}
+static int _zx_numa_osq_lock(struct optimistic_spin_queue *qslock, int cpu,
+				struct _numa_lock *_numa_lock)
+{
+	int numacpu = cpu >> _numa_lock->shift;
+	int numacurr = encode_cpu(numacpu);
+	/* Second level: queue this node's embedded osq_node on the top-level entry; always returns 0. */
+	struct optimistic_spin_node *node = &(_numa_lock + numacpu)->osq_node;
+	struct _numa_lock *numa_lock = _numa_lock + _numa_lock->numa_nodes;
+	struct optimistic_spin_node *prevnode = NULL;
+	int prev = 0;
+
+	node->next = NULL;
+	node->locked = LOCK_NUMALOCK;
+	node->cpu = numacurr;
+
+	prev = atomic_xchg(&numa_lock->tail, numacurr);
+	if (prev == 0) { /* top-level queue was empty: granted at once */
+		node->locked = UNLOCK_NUMALOCK;
+		return 0;
+	}
+
+	prevnode = &(_numa_lock + prev - 1)->osq_node; /* prev is an encoded node index */
+	node->prev = prevnode;
+	smp_mb(); /*node->prev should be set before next*/
+	WRITE_ONCE(prevnode->next, node);
+
+	while (READ_ONCE(node->locked) == LOCK_NUMALOCK) { /* no cancellation on this level */
+		cpu_relax();
+		cpu_relax();
+		cpu_relax();
+		cpu_relax();
+	}
+	return 0;
+}
+inline bool zx_numa_osq_lock(struct optimistic_spin_queue *qslock,
+		struct _numa_lock *_numa_lock)
+{
+	struct _numa_lock *node_lock = NULL;
+	int cpu = smp_processor_id();
+	int numa = cpu >> _numa_lock->shift;
+	int status = 0;
+	/* Two-level acquire: node queue first, then the top-level queue if required. */
+	node_lock = _numa_lock + numa;
+
+	if (node_lock->stopping) { /* NUMA lock is being retired: wait, then use osq_lock() */
+		zx_numa_turn_osq_waiting(qslock, _numa_lock);
+		return osq_lock(qslock);
+	}
+
+	status = _zx_node_osq_lock(qslock, _numa_lock);
+	if (status == ACQUIRE_NUMALOCK)
+		status = _zx_numa_osq_lock(qslock, smp_processor_id(),
+				_numa_lock);
+
+	if (status == -1) /* only the first level can report failure */
+		return false;
+	return true;
+}
+/* If the node word is still {addr of @qslock, @ctail}, try to empty or close it; nonzero if done. */
+static int atomic64_checktail_osq(struct optimistic_spin_queue *qslock,
+	struct _numa_lock *node_lock, int ctail)
+{
+	u64 addr = ((u64)ptrmask(qslock)) << 32;
+	u64 addrtail = addr | ctail;
+	u64 ss = 0;
+	bool mark;
+
+	ss = atomic64_read((atomic64_t *) &node_lock->tail);
+	if (node_lock->stopping == 0)
+		mark = (ss == addrtail &&
+			atomic64_cmpxchg_acquire(
+				(atomic64_t *) &node_lock->tail,
+				addrtail, addr|NUMA_UNLOCKED_VAL) == addrtail); /* empty, tag kept */
+	else
+		mark = (ss == addrtail &&
+			atomic64_cmpxchg_acquire(
+				(atomic64_t *) &node_lock->tail,
+				addrtail, NUMA_LOCKED_VAL) == addrtail); /* closed, address half zeroed */
+	return mark;
+}
+/* Release the node queue: empty it if this CPU is still the tail, else pass @val to the successor. */
+static void node_lock_release(struct optimistic_spin_queue *qslock,
+		struct _numa_lock *node_lock, struct optimistic_spin_node *node,
+		int val, int cpu, int numa_end)
+{
+	struct optimistic_spin_node *next = NULL;
+	int curr = encode_cpu(cpu);
+
+	while (1) {
+		if (atomic64_checktail_osq(qslock, node_lock, curr)) {
+			if (qslock->numa_enable == NUMALOCKDYNAMIC) {
+				int index = qslock->index;
+
+				if (numa_end == OSQ_UNLOCKED_VAL && /* top-level queue was emptied too */
+					zx_numa_entry[index].idle == 0) {
+					cmpxchg(&zx_numa_entry[index].idle,
+							0, 1);
+				}
+			}
+			return;
+		}
+		if (node->next) {
+			next = xchg(&node->next, NULL);
+			if (next) {
+				WRITE_ONCE(next->locked, val);
+				return;
+			}
+		}
+		cpu_relax();
+	}
+}
+/* Release the top-level queue: OSQ_UNLOCKED_VAL if it became empty, 1 if passed to another node. */
+static int numa_lock_release(struct optimistic_spin_queue *qslock,
+		struct _numa_lock *numa_lock,
+		struct optimistic_spin_node *node, int cpu)
+{
+	struct optimistic_spin_node *next = NULL;
+	int curr = cpu >> numa_lock->shift;
+	int numacurr = encode_cpu(curr);
+
+	while (1) {
+		if (atomic_read(&numa_lock->tail) == numacurr &&
+		    atomic_cmpxchg_acquire(&numa_lock->tail, numacurr,
+					   OSQ_UNLOCKED_VAL) == numacurr) {
+			return OSQ_UNLOCKED_VAL;
+		}
+
+		if (node->next) {
+			next = xchg(&node->next, NULL);
+			if (next) {
+				WRITE_ONCE(next->locked, UNLOCK_NUMALOCK); /* grants the waiting node */
+				return 1;
+			}
+		}
+		cpu_relax();
+	}
+}
+/* NUMA-mode unlock: pass the lock on within the node, or release both levels. */
+inline void zx_numa_osq_unlock(struct optimistic_spin_queue *qslock,
+		 struct _numa_lock *_numa_lock)
+{
+	u32 cpu =  smp_processor_id();
+	struct optimistic_spin_node *node = this_cpu_ptr(&osq_cpu_node);
+	int numa = cpu >> _numa_lock->shift;
+	struct _numa_lock *numa_lock = _numa_lock + _numa_lock->numa_nodes;
+	struct _numa_lock *node_lock = _numa_lock + numa;
+	struct optimistic_spin_node *numa_node =
+						&(_numa_lock + numa)->osq_node;
+	struct optimistic_spin_node *next = NULL;
+	int cur_count = 0;
+	int numa_end = 0;
+
+	cur_count = READ_ONCE(node->locked); /* hand-off count received with the lock */
+
+	if (cur_count >= osq_node_max - 1) { /* limit reached: give other nodes a turn */
+		numa_end = numa_lock_release(qslock,
+				numa_lock, numa_node, cpu);
+		node_lock_release(qslock, node_lock, node,
+				ACQUIRE_NUMALOCK, cpu, numa_end);
+		return;
+	}
+
+	next = xchg(&node->next, NULL);
+	if (next) {
+		WRITE_ONCE(next->locked, cur_count + 1); /* same-node successor inherits the count */
+		return;
+	}
+
+	numa_end = numa_lock_release(qslock, numa_lock, numa_node, cpu);
+	node_lock_release(qslock, node_lock, node, ACQUIRE_NUMALOCK,
+			cpu, numa_end);
+}
-- 
2.34.1


Powered by blists - more mailing lists

Powered by Openwall GNU/*/Linux Powered by OpenVZ