Message-ID: <41806261-1633-4ec6-8cca-fc3d61d5d4bc@zhaoxin.com>
Date: Thu, 19 Sep 2024 17:35:30 +0800
From: yongli-os <yongli-oc@...oxin.com>
To: Waiman Long <longman@...hat.com>, <peterz@...radead.org>,
<mingo@...hat.com>, <will@...nel.org>, <boqun.feng@...il.com>
CC: <linux-kernel@...r.kernel.org>, <yongli@...oxin.com>,
<louisqi@...oxin.com>, <cobechen@...oxin.com>, <jiangbowang@...oxin.com>
Subject: Re: [PATCH 3/4] locking/osq_lock: From x_osq_lock/unlock to
numa-aware lock/unlock.
On 2024/9/15 01:11, Waiman Long wrote:
>
>
> [This message is from an external sender; be cautious of risks.]
>
> On 9/14/24 04:53, yongli-oc wrote:
>> According to the contention level, switches from x_osq_lock to
>> numa-aware osq_lock.
>> The numa-aware lock is a two level osq_lock.
>> The Makefile for dynamic numa-aware osq lock.
>>
>> Signed-off-by: yongli-oc <yongli-oc@...oxin.com>
>> ---
>> kernel/locking/Makefile | 1 +
>> kernel/locking/numa.h | 98 ++++++++
>> kernel/locking/numa_osq.h | 32 +++
>> kernel/locking/x_osq_lock.c | 332 +++++++++++++++++++++++++++
>> kernel/locking/zx_numa_osq.c | 433 +++++++++++++++++++++++++++++++++++
>> 5 files changed, 896 insertions(+)
>> create mode 100644 kernel/locking/numa.h
>> create mode 100644 kernel/locking/numa_osq.h
>> create mode 100644 kernel/locking/x_osq_lock.c
>> create mode 100644 kernel/locking/zx_numa_osq.c
>>
>> diff --git a/kernel/locking/Makefile b/kernel/locking/Makefile
>> index 0db4093d17b8..00c1d5bb6eb9 100644
>> --- a/kernel/locking/Makefile
>> +++ b/kernel/locking/Makefile
>> @@ -22,6 +22,7 @@ obj-$(CONFIG_LOCKDEP) += lockdep_proc.o
>> endif
>> obj-$(CONFIG_SMP) += spinlock.o
>> obj-$(CONFIG_LOCK_SPIN_ON_OWNER) += osq_lock.o
>> +obj-$(CONFIG_LOCK_SPIN_ON_OWNER_NUMA) += x_osq_lock.o zx_numa_osq.o
>> zx_numa.o
>> obj-$(CONFIG_PROVE_LOCKING) += spinlock.o
>> obj-$(CONFIG_QUEUED_SPINLOCKS) += qspinlock.o
>> obj-$(CONFIG_RT_MUTEXES) += rtmutex_api.o
>> diff --git a/kernel/locking/numa.h b/kernel/locking/numa.h
>> new file mode 100644
>> index 000000000000..01e5aef3257a
>> --- /dev/null
>> +++ b/kernel/locking/numa.h
>> @@ -0,0 +1,98 @@
>> +/* SPDX-License-Identifier: GPL-2.0 */
>> +#ifndef __LINUX_NUMA_LOCK_H
>> +#define __LINUX_NUMA_LOCK_H
>> +#include "mcs_spinlock.h"
>> +
>> +struct optimistic_spin_node {
>> + struct optimistic_spin_node *next, *prev;
>> + int numa;
>> + int locked; /* 1 if lock acquired */
>> + int cpu; /* encoded CPU # + 1 value */
>> + u32 serial;
>> +};
>> +
>> +
>> +struct _numa_buf {
>> + void *numa_ptr;
>> + struct list_head list;
>> + u32 lockaddr;
>> + u32 highaddr;
>> + u8 idle;
>> + u8 type;
>> + u16 index;
>> +};
>> +
>> +struct cache_padding {
>> + char x[0];
>> +} ____cacheline_internodealigned_in_smp;
>> +#define CACHE_PADDING(name) struct cache_padding name
> We have struct cacheline_padding and CACHELINE_PADDING() in
> include/linux/cache. What is the point of reinventing the same thing
> here?
I will include the linux/cache.h header and delete the local definition above.
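Roughly, a sketch of how the struct in numa.h could look after dropping the local helper (untested; it assumes the rest of the layout stays as in this patch, and also switches uint64_t to u64 for consistency):

#include <linux/cache.h>	/* for CACHELINE_PADDING() */

struct _numa_lock {
	atomic_t tail ____cacheline_aligned_in_smp;
	atomic_t addr;
	u8 shift;
	u8 stopping;
	u16 numa_nodes;
	u32 accessed;
	u64 totalaccessed;
	u32 nodeswitched;
	atomic_t initlock;
	atomic_t pending;
	union {
		struct mcs_spinlock mcs_node;
		struct optimistic_spin_node osq_node;
	};
	CACHELINE_PADDING(pad);	/* struct cacheline_padding from linux/cache.h */
};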
>> +
>> +struct _numa_lock {
>> + atomic_t tail ____cacheline_aligned_in_smp;
>> + atomic_t addr;
>> + u8 shift;
>> + u8 stopping;
>> + u16 numa_nodes;
>> + u32 accessed;
>> + uint64_t totalaccessed;
>> + u32 nodeswitched;
>> + atomic_t initlock;
>> + atomic_t pending;
>> + union {
>> + struct mcs_spinlock mcs_node;
>> + struct optimistic_spin_node osq_node;
>> + };
>> + CACHE_PADDING(pad);
>> +};
>> +
>> +struct numa_cpu_info {
>> + __u8 x86_model;
>> + /* CPU family */
>> + __u8 x86;
>> + /* CPU vendor */
>> + __u8 x86_vendor;
>> + __u8 x86_reserved;
>> + u32 feature1;
>> +};
>> +
>> +#define NUMAEXPAND 1
>> +
>> +#define COHORT_START 1
>> +#define ACQUIRE_NUMALOCK (UINT_MAX-1)
>> +#define NODE_WAIT UINT_MAX
>> +#define LOCK_NUMALOCK 1
>> +#define UNLOCK_NUMALOCK 0
>> +
>> +#define NUMALOCKDYNAMIC 0xff
>> +#define TURNTONUMAREADY 0xa5a5
>> +#define NUMATURNBACKREADY 0x5a5a
>> +
>> +#define NUMA_LOCKED_VAL 0xf5efef
> What are these special bit values for?
It is just a marker for the NUMA lock state; the exact number has no
special meaning beyond being distinguishable from an encoded CPU number
stored in the same tail word. I will change it to a less magic-looking value.
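For illustration only, any marker that can never equal encode_cpu(cpu) would do, e.g.:

#define NUMA_UNLOCKED_VAL	0
#define NUMA_LOCKED_VAL		INT_MAX	/* arbitrary, just outside the encoded-CPU range */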
>> +#define NUMA_UNLOCKED_VAL 0
>> +
>> +#define NUMASTEERMASK 0xf0000000
>> +#define HIGH32BITMASK 0xffffffff00000000
>> +#define LOW32MASK 0xffffffff
>> +
>> +extern int NUMASHIFT;
>> +extern int NUMACLUSTERS;
> In the kernel, uppercase names are used for macros. Variables should use
> all lowercase names to avoid confusion.
Yes, I will rename them to lowercase.
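Roughly, with the names being only a suggestion and the types unchanged:

extern int numa_shift;		/* was NUMASHIFT */
extern int numa_clusters;	/* was NUMACLUSTERS */

Callers such as x_osq_lock() would then use cpu >> numa_shift.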
>> +extern int zx_numa_lock_total;
>> +extern struct _numa_buf *zx_numa_entry;
>> +extern atomic_t numa_count;
>> +extern int enable_zx_numa_osq_lock;
>> +extern u32 zx_numa_lock;
>> +extern int dynamic_enable;
>> +extern struct kmem_cache *zx_numa_lock_cachep;
>> +
>> +static inline u32 ptrmask(void *s)
>> +{
>> + return (uint64_t)s & LOW32MASK;
>> +}
>> +inline void *get_numa_lock(int index);
>> +
>> +int zx_check_numa_dynamic_locked(u32 lockaddr, struct _numa_lock
>> *_numa_lock,
>> + int t);
>> +int zx_numa_lock_ptr_get(void *p);
>> +void numa_lock_init_data(struct _numa_lock *s, int clusters, u32
>> lockval,
>> + u32 lockaddr);
>> +#endif
>> diff --git a/kernel/locking/numa_osq.h b/kernel/locking/numa_osq.h
>> new file mode 100644
>> index 000000000000..4c99ad60c4e0
>> --- /dev/null
>> +++ b/kernel/locking/numa_osq.h
>> @@ -0,0 +1,32 @@
>> +/* SPDX-License-Identifier: GPL-2.0 */
>> +#ifndef __LINUX_NUMA_OSQ_H
>> +#define __LINUX_NUMA_OSQ_H
>> +
>> +#include <linux/osq_lock.h>
>> +#include "mcs_spinlock.h"
>> +
>> +#define OSQLOCKINITED 0
>> +#define OSQTONUMADETECT 0x10
>> +#define OSQLOCKSTOPPING 0xfc
>> +#define OSQ_LOCKED_VAL 0xffff
>> +
>> +extern u16 osq_keep_times;
>> +extern u16 osq_lock_depth;
>> +extern int osq_node_max;
>> +
>> +inline int encode_cpu(int cpu_nr);
>> +inline int node_cpu(struct optimistic_spin_node *node);
>> +inline struct optimistic_spin_node *decode_cpu(int encoded_cpu_val);
>> +
>> +void zx_osq_lock_stopping(struct optimistic_spin_queue *lock);
>> +void zx_osq_numa_start(struct optimistic_spin_queue *lock);
>> +void zx_osq_turn_numa_waiting(struct optimistic_spin_queue *lock);
>> +
>> +bool x_osq_lock(struct optimistic_spin_queue *lock);
>> +void x_osq_unlock(struct optimistic_spin_queue *lock);
>> +bool x_osq_is_locked(struct optimistic_spin_queue *lock);
>> +inline void zx_numa_osq_unlock(struct optimistic_spin_queue *qslock,
>> + struct _numa_lock *n);
>> +inline bool zx_numa_osq_lock(struct optimistic_spin_queue *qslock,
>> + struct _numa_lock *n);
>> +#endif
>> diff --git a/kernel/locking/x_osq_lock.c b/kernel/locking/x_osq_lock.c
>> new file mode 100644
>> index 000000000000..6da8905ae7d6
>> --- /dev/null
>> +++ b/kernel/locking/x_osq_lock.c
>> @@ -0,0 +1,332 @@
>> +// SPDX-License-Identifier: GPL-2.0
>> +/*
>> + * crossing from osq_lock to numa-aware lock
>> + */
>> +#include <linux/percpu.h>
>> +#include <linux/sched.h>
>> +#include <linux/osq_lock.h>
>> +#include "numa.h"
>> +#include "numa_osq.h"
>> +
>> +u16 osq_lock_depth = 8;
>> +u16 osq_keep_times = 32;
>> +
>> +/*
>> + * An MCS like lock especially tailored for optimistic spinning for
>> sleeping
>> + * lock implementations (mutex, rwsem, etc).
>> + *
>> + * Using a single mcs node per CPU is safe because sleeping locks
>> should not be
>> + * called from interrupt context and we have preemption disabled while
>> + * spinning.
>> + */
>> +DECLARE_PER_CPU_SHARED_ALIGNED(struct optimistic_spin_node, osq_node);
>> +
>> +/*
>> + * Get a stable @node->next pointer, either for unlock() or
>> unqueue() purposes.
>> + * Can return NULL in case we were the last queued and we updated
>> @lock instead.
>> + *
>> + * If osq_lock() is being cancelled there must be a previous node
>> + * and 'old_cpu' is its CPU #.
>> + * For osq_unlock() there is never a previous node and old_cpu is
>> + * set to OSQ_UNLOCKED_VAL.
>> + */
>> +static inline struct optimistic_spin_node *
>> +osq_wait_next_stop(struct optimistic_spin_queue *lock,
>> + struct optimistic_spin_node *node,
>> + int old_cpu)
>> +{
>> + u16 curr = encode_cpu(smp_processor_id());
>> + u16 old = old_cpu;
>> +
>> + if (lock->numa_enable == OSQLOCKSTOPPING && old ==
>> OSQ_UNLOCKED_VAL)
>> + old = OSQ_LOCKED_VAL;
>> +
>> + for (;;) {
>> + if (READ_ONCE(lock->tail16) == curr &&
>> + cmpxchg(&lock->tail16, curr, old) == curr) {
>> +
>> + /*
>> + * We were the last queued, we moved @lock
>> back. @prev
>> + * will now observe @lock and will complete its
>> + * unlock()/unqueue().
>> + */
>> + return NULL;
>> + }
>> +
>> + /*
>> + * We must xchg() the @node->next value, because if we
>> were to
>> + * leave it in, a concurrent unlock()/unqueue() from
>> + * @node->next might complete Step-A and think its
>> @prev is
>> + * still valid.
>> + *
>> + * If the concurrent unlock()/unqueue() wins the race,
>> we'll
>> + * wait for either @lock to point to us, through its
>> Step-B, or
>> + * wait for a new @node->next from its Step-C.
>> + */
>> + if (node->next) {
>> + struct optimistic_spin_node *next;
>> +
>> + next = xchg(&node->next, NULL);
>> + if (next)
>> + return next;
>> + }
>> +
>> + cpu_relax();
>> + }
>> +}
>> +
>> +bool x_osq_lock(struct optimistic_spin_queue *lock)
>> +{
>> + struct optimistic_spin_node *node = this_cpu_ptr(&osq_node);
>> + struct optimistic_spin_node *prev, *next;
>> + int cpu = smp_processor_id();
>> + u16 curr = encode_cpu(cpu);
>> + struct optimistic_spin_queue tail;
>> + u16 old;
>> +
>> + tail.val = READ_ONCE(lock->val);
>> + if (unlikely(tail.numa_enable == OSQLOCKSTOPPING)) {
>> + zx_osq_turn_numa_waiting(lock);
>> + return x_osq_lock(lock);
>> + }
>> +
>> + if (unlikely(tail.numa_enable == NUMALOCKDYNAMIC)) {
>> + struct _numa_lock *_numa_lock = NULL;
>> + struct _numa_lock *node_lock = NULL;
>> +
>> + _numa_lock = get_numa_lock(tail.index);
>> + node_lock = (struct _numa_lock *) _numa_lock +
>> + (cpu >> NUMASHIFT);
>> +
>> + prefetch(node_lock);
>> + return zx_numa_osq_lock(lock, _numa_lock);
>> + }
>> +
>> + node->locked = 0;
>> + node->next = NULL;
>> + node->cpu = curr;
>> +
>> + /*
>> + * We need both ACQUIRE (pairs with corresponding RELEASE in
>> + * unlock() uncontended, or fastpath) and RELEASE (to publish
>> + * the node fields we just initialised) semantics when updating
>> + * the lock tail.
>> + */
>> +
>> + if (likely(tail.numa_enable >= OSQTONUMADETECT)) {
>> + struct optimistic_spin_queue ss;
>> +
>> + while (1) {
>> + ss.val = atomic_read(&lock->tail);
>> + if (ss.tail16 == OSQ_LOCKED_VAL) {
>> + zx_osq_turn_numa_waiting(lock);
>> + return x_osq_lock(lock);
>> + }
>> + if (cmpxchg(&lock->tail16, ss.tail16, curr)
>> + == ss.tail16) {
>> + old = ss.tail16;
>> + break;
>> + }
>> + cpu_relax();
>> + }
>> + } else
>> + old = xchg(&lock->tail16, curr);
>> +
>> + if (old == OSQ_UNLOCKED_VAL) {
>> + node->serial = 1;
>> + return true;
>> + }
>> +
>> + prev = decode_cpu(old);
>> + node->prev = prev;
>> +
>> + node->serial = prev->serial + 1;
>> + /*
>> + * osq_lock() unqueue
>> + *
>> + * node->prev = prev osq_wait_next()
>> + * WMB MB
>> + * prev->next = node next->prev = prev // unqueue-C
>> + *
>> + * Here 'node->prev' and 'next->prev' are the same variable and
>> we need
>> + * to ensure these stores happen in-order to avoid corrupting
>> the list.
>> + */
>> + smp_wmb();
>> +
>> + WRITE_ONCE(prev->next, node);
>> +
>> + /*
>> + * Normally @prev is untouchable after the above store; because
>> at that
>> + * moment unlock can proceed and wipe the node element from stack.
>> + *
>> + * However, since our nodes are static per-cpu storage, we're
>> + * guaranteed their existence -- this allows us to apply
>> + * cmpxchg in an attempt to undo our queueing.
>> + */
>> +
>> + /*
>> + * Wait to acquire the lock or cancellation. Note that
>> need_resched()
>> + * will come with an IPI, which will wake
>> smp_cond_load_relaxed() if it
>> + * is implemented with a monitor-wait. vcpu_is_preempted()
>> relies on
>> + * polling, be careful.
>> + */
>> + if (smp_cond_load_relaxed(&node->locked, VAL || need_resched() ||
>> + vcpu_is_preempted(node_cpu(node->prev))))
>> + return true;
>> +
>> + /* unqueue */
>> + /*
>> + * Step - A -- stabilize @prev
>> + *
>> + * Undo our @prev->next assignment; this will make @prev's
>> + * unlock()/unqueue() wait for a next pointer since @lock
>> points to us
>> + * (or later).
>> + */
>> +
>> + for (;;) {
>> + /*
>> + * cpu_relax() below implies a compiler barrier which
>> would
>> + * prevent this comparison being optimized away.
>> + */
>> + if (data_race(prev->next) == node &&
>> + cmpxchg(&prev->next, node, NULL) == node)
>> + break;
>> +
>> + /*
>> + * We can only fail the cmpxchg() racing against an
>> unlock(),
>> + * in which case we should observe @node->locked becoming
>> + * true.
>> + */
>> + if (smp_load_acquire(&node->locked))
>> + return true;
>> +
>> + cpu_relax();
>> +
>> + /*
>> + * Or we race against a concurrent unqueue()'s step-B,
>> in which
>> + * case its step-C will write us a new @node->prev
>> pointer.
>> + */
>> + prev = READ_ONCE(node->prev);
>> + }
>> +
>> + /*
>> + * Step - B -- stabilize @next
>> + *
>> + * Similar to unlock(), wait for @node->next or move @lock from
>> @node
>> + * back to @prev.
>> + */
>> +
>> + next = osq_wait_next_stop(lock, node, prev->cpu);
>> + if (!next)
>> + return false;
>> +
>> + /*
>> + * Step - C -- unlink
>> + *
>> + * @prev is stable because its still waiting for a new @prev->next
>> + * pointer, @next is stable because our @node->next pointer is
>> NULL and
>> + * it will wait in Step-A.
>> + */
>> +
>> + WRITE_ONCE(next->prev, prev);
>> + WRITE_ONCE(prev->next, next);
>> +
>> + return false;
>> +}
>> +
>> +
>> +
>> +void x_osq_unlock(struct optimistic_spin_queue *lock)
>> +{
>> + struct optimistic_spin_node *node, *next;
>> + int threadshold = osq_lock_depth;
>> + int cpu = smp_processor_id();
>> + u16 curr = encode_cpu(cpu);
>> + int depth = 0;
>> + u32 count = 0;
>> +
>> + if (unlikely(lock->numa_enable == NUMALOCKDYNAMIC)) {
>> + struct _numa_lock *_numa_lock =
>> get_numa_lock(lock->index);
>> +
>> + prefetch((struct _numa_lock *) _numa_lock + (cpu >>
>> NUMASHIFT));
>> + return zx_numa_osq_unlock(lock, _numa_lock);
>> + }
>> + /*
>> + * Fast path for the uncontended case.
>> + */
>> + if (unlikely(lock->numa_enable == OSQTONUMADETECT)) {
>> + struct optimistic_spin_node *node_last = NULL;
>> + u16 tail = 0;
>> +
>> + tail = cmpxchg(&lock->tail16, curr, OSQ_UNLOCKED_VAL);
>> + if (tail == curr)
>> + return;
>> +
>> + node = this_cpu_ptr(&osq_node);
>> + node_last = decode_cpu(tail);
>> + depth = node_last->serial - node->serial;
>> + count = READ_ONCE(node->locked);
>> + if (count > osq_keep_times && (dynamic_enable & 0x1))
>> + zx_osq_lock_stopping(lock);
>> + } else if (unlikely(lock->numa_enable == OSQLOCKSTOPPING)) {
>> + if (cmpxchg(&lock->tail16, curr, OSQ_LOCKED_VAL)
>> + == curr) {
>> + zx_osq_numa_start(lock);
>> + return;
>> + }
>> + } else {
>> + struct optimistic_spin_queue t;
>> +
>> + t.val = 0;
>> + if (dynamic_enable & 0x1) {
>> + if (atomic_read(&numa_count) < zx_numa_lock_total)
>> + t.numa_enable = OSQTONUMADETECT;
>> + }
>> + if (t.numa_enable == OSQTONUMADETECT) {
>> + if (atomic_cmpxchg_release(&lock->tail, curr,
>> + (t.val | OSQ_UNLOCKED_VAL)) == curr)
>> + return;
>> + } else if (cmpxchg(&lock->tail16, curr,
>> + OSQ_UNLOCKED_VAL) == curr)
>> + return;
>> + }
>> +
>> + /*
>> + * Second most likely case.
>> + */
>> + node = this_cpu_ptr(&osq_node);
>> + next = xchg(&node->next, NULL);
>> + if (next) {
>> + if (depth > threadshold)
>> + WRITE_ONCE(next->locked, count + 1);
>> + else
>> + WRITE_ONCE(next->locked, 1);
>> + return;
>> + }
>> +
>> + next = osq_wait_next_stop(lock, node, OSQ_UNLOCKED_VAL);
>> + if (next) {
>> + if (depth > threadshold)
>> + WRITE_ONCE(next->locked, count + 1);
>> + else
>> + WRITE_ONCE(next->locked, 1);
>> + }
>> +}
>> +
>> +bool x_osq_is_locked(struct optimistic_spin_queue *lock)
>> +{
>> + struct optimistic_spin_queue val;
>> +
>> + val.val = atomic_read(&lock->tail);
>> + if (val.tail16 == OSQ_UNLOCKED_VAL)
>> + return false;
>> +
>> + if (val.tail16 == OSQ_LOCKED_VAL) {
>> + if (val.numa_enable != NUMALOCKDYNAMIC)
>> + return true;
>> + return zx_check_numa_dynamic_locked(ptrmask(lock),
>> + get_numa_lock(val.index), 0);
>> + }
>> +
>> + return true;
>> +}
> These functions are mostly based on the osq_* functions with some extra
> code stuffed in. Common code should be shared instead of duplicated.
The new path uses a 16-bit tail; merging it into osq_lock would require
changing osq_lock too much.
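To illustrate, mainline osq_lock() publishes the queue tail through the whole atomic_t:

	old = atomic_xchg(&lock->tail, curr);

while x_osq_lock() has to go through the 16-bit tail16 field, so that the numa_enable/index state sharing the same 32-bit word is preserved:

	old = xchg(&lock->tail16, curr);

Sharing the code would mean hiding every tail access in osq_lock()/osq_unlock()/osq_wait_next() behind such helpers, so I kept the crossing path separate for now.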
>> diff --git a/kernel/locking/zx_numa_osq.c b/kernel/locking/zx_numa_osq.c
>> new file mode 100644
>> index 000000000000..f4c3dba208d3
>> --- /dev/null
>> +++ b/kernel/locking/zx_numa_osq.c
>> @@ -0,0 +1,433 @@
>> +// SPDX-License-Identifier: GPL-2.0
>> +/*
>> + * Dynamic numa-aware osq lock
>> + * Author: LiYong <yongli-oc@...oxin.com>
>> + *
>> + */
>> +#include <linux/cpumask.h>
>> +#include <asm/byteorder.h>
>> +#include <linux/percpu.h>
>> +#include <linux/sched.h>
>> +#include <linux/slab.h>
>> +#include <linux/osq_lock.h>
>> +#include "numa.h"
>> +#include "numa_osq.h"
>> +
>> +int osq_node_max = 256;
>> +
>> +/*
>> + * The pending bit spinning loop count.
>> + * This heuristic is used to limit the number of lockword accesses
>> + * made by atomic_cond_read_relaxed when waiting for the lock to
>> + * transition out of the "== _Q_PENDING_VAL" state. We don't spin
>> + * indefinitely because there's no guarantee that we'll make forward
>> + * progress.
>> + */
>> +
>> +static DEFINE_PER_CPU_SHARED_ALIGNED(struct optimistic_spin_node,
>> osq_cpu_node);
>> +
>> +/*
>> + * We use the value 0 to represent "no CPU", thus the encoded value
>> + * will be the CPU number incremented by 1.
>> + */
>> +static inline int decode(int cpu_nr)
>> +{
>> + return cpu_nr - 1;
>> +}
>> +
>> +static inline struct optimistic_spin_node *decode_curr(int
>> encoded_cpu_val)
>> +{
>> + int cpu_nr = decode(encoded_cpu_val);
>> +
>> + return per_cpu_ptr(&osq_cpu_node, cpu_nr);
>> +}
>> +
>> +static int atomic64_cmpxchg_notequal(void *qslock, atomic_t *tail,
>> int curr)
>> +{
>> + u64 ss = 0;
>> + u32 addr = ptrmask(qslock);
>> + u64 addrcurr = (((u64)addr) << 32) | curr;
>> +
>> + while (1) {
>> + ss = atomic64_read((atomic64_t *) tail);
>> + if ((ss >> 32) != addr)
>> + return NUMA_LOCKED_VAL;
>> + if ((ss & LOW32MASK) == NUMA_LOCKED_VAL)
>> + return NUMA_LOCKED_VAL;
>> + if (atomic64_cmpxchg((atomic64_t *) tail, ss, addrcurr)
>> == ss)
>> + return ss & LOW32MASK;
>> + cpu_relax();
>> + }
>> +}
>> +
>> +void zx_osq_lock_stopping(struct optimistic_spin_queue *lock)
>> +{
>> + int s = 0;
>> +
>> + s = zx_numa_lock_ptr_get(lock);
>> + if (s < zx_numa_lock_total) {
>> + numa_lock_init_data(zx_numa_entry[s].numa_ptr,
>> + NUMACLUSTERS, NUMA_UNLOCKED_VAL,
>> + ptrmask(lock));
>> +
>> + WRITE_ONCE(lock->index, s);
>> + zx_numa_entry[s].type = 1;
>> + smp_mb();/*should set these before enable*/
>> + prefetchw(&lock->numa_enable);
>> + WRITE_ONCE(lock->numa_enable, OSQLOCKSTOPPING);
>> + } else {
>> + prefetchw(&lock->numa_enable);
>> + WRITE_ONCE(lock->numa_enable, OSQLOCKINITED);
>> + }
>> +}
>> +
>> +void zx_osq_numa_start(struct optimistic_spin_queue *lock)
>> +{
>> + struct _numa_lock *_numa_lock = get_numa_lock(lock->index);
>> +
>> + prefetchw(&lock->numa_enable);
>> + WRITE_ONCE(lock->numa_enable, NUMALOCKDYNAMIC);
>> + smp_mb(); /*should keep lock->numa_enable modified first*/
>> + atomic_set(&_numa_lock->initlock, TURNTONUMAREADY);
>> +}
>> +
>> +
>> +void zx_osq_turn_numa_waiting(struct optimistic_spin_queue *lock)
>> +{
>> + struct _numa_lock *_numa_lock = get_numa_lock(lock->index);
>> +
>> + atomic_inc(&_numa_lock->pending);
>> + while (1) {
>> + int s = atomic_read(&_numa_lock->initlock);
>> +
>> + if (s == TURNTONUMAREADY)
>> + break;
>> + cpu_relax();
>> + cpu_relax();
>> + cpu_relax();
>> + cpu_relax();
> We don't usually call cpu_relax() multiple times like that as the actual
> delay is CPU dependent and it is hard to get it right for all. Can you
> explain why it is called exactly 4 times here?
cpu_relax() is REP NOP; per the Intel instruction set reference, REP NOP
is PAUSE. The PAUSE instruction saves some power on the processor, so I
often write several cpu_relax() calls in a row when busy waiting.
I will change it to a single cpu_relax(), following Linux kernel convention.
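I.e. something like this (same logic as above, one PAUSE per iteration):

void zx_osq_turn_numa_waiting(struct optimistic_spin_queue *lock)
{
	struct _numa_lock *_numa_lock = get_numa_lock(lock->index);

	atomic_inc(&_numa_lock->pending);
	/* wait until the numa-aware path is fully initialized */
	while (atomic_read(&_numa_lock->initlock) != TURNTONUMAREADY)
		cpu_relax();
	atomic_dec(&_numa_lock->pending);
}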
>> +
>> + }
>> + atomic_dec(&_numa_lock->pending);
>> +}
>> +
>> +
>> +
>> +
>> +static struct optimistic_spin_node *
>> +zx_numa_osq_wait_next(struct _numa_lock *lock,
>> + struct optimistic_spin_node *node,
>> + struct optimistic_spin_node *prev, int cpu)
>> +{
>> + struct optimistic_spin_node *next = NULL;
>> + int curr = encode_cpu(cpu);
>> + int old;
>> +
>> + old = prev ? prev->cpu : OSQ_UNLOCKED_VAL;
>> + for (;;) {
>> + if (atomic_read(&lock->tail) == curr &&
>> + atomic_cmpxchg_acquire(&lock->tail, curr, old) ==
>> curr) {
>> +
>> + break;
>> + }
>> + if (node->next) {
>> + next = xchg(&node->next, NULL);
>> + if (next)
>> + break;
>> + }
>> + cpu_relax();
>> + }
>> + return next;
>> +}
>> +static void zx_numa_turn_osq_waiting(struct optimistic_spin_queue
>> *lock,
>> + struct _numa_lock *_numa_lock)
>> +{
>> + struct _numa_lock *numa_lock = _numa_lock +
>> _numa_lock->numa_nodes;
>> + int lockaddr = ptrmask(lock);
>> + u64 s = 0;
>> + struct optimistic_spin_queue tail;
>> +
>> + tail.numa_enable = NUMALOCKDYNAMIC;
>> + tail.index = lock->index;
>> + tail.tail16 = OSQ_LOCKED_VAL;
>> + while (1) {
>> + cpu_relax(); cpu_relax(); cpu_relax(); cpu_relax();
>> + s = atomic64_read((atomic64_t *) &numa_lock->tail);
>> + if ((s >> 32) != lockaddr)
>> + break;
>> + if ((s & LOW32MASK) == NUMA_LOCKED_VAL)
>> + break;
>> + }
>> + prefetchw(&lock->tail);
>> + if (atomic_cmpxchg(&lock->tail, tail.val, OSQ_UNLOCKED_VAL)
>> + == tail.val) {
>> + ;
>> + }
>> +
>> +}
>> +
>> +static int _zx_node_osq_lock_internal(struct optimistic_spin_queue
>> *qslock,
>> + struct optimistic_spin_node *node, struct optimistic_spin_node
>> *prev,
>> + struct _numa_lock *node_lock, int cpu, int *cur_status)
>> +{
>> + struct optimistic_spin_node *next = NULL;
>> +
>> + for (;;) {
>> + struct optimistic_spin_node *node_prev = NULL;
>> +
>> + if (prev->next == node &&
>> + cmpxchg(&prev->next, node, NULL) == node) {
>> + break;
>> + }
>> + /*load locked first each time*/
>> + *cur_status = smp_load_acquire(&node->locked);
>> +
>> + if (*cur_status != NODE_WAIT)
>> + return 0; //goto NODE_UNLOCK;
>> +
>> + cpu_relax();
>> + node_prev = READ_ONCE(node->prev);
>> + if (node_prev != prev)
>> + prev = node_prev;
>> + }
>> +
>> + next = zx_numa_osq_wait_next(node_lock, node, prev, cpu);
>> + if (!next)
>> + return -1;
>> +
>> + WRITE_ONCE(next->prev, prev);
>> + WRITE_ONCE(prev->next, next);
>> +
>> + return -1;
>> +}
>> +
>> +static int _zx_node_osq_lock(struct optimistic_spin_queue *qslock,
>> + struct _numa_lock *_numa_lock)
>> +{
>> + struct optimistic_spin_node *node = this_cpu_ptr(&osq_cpu_node);
>> + struct optimistic_spin_node *prev = NULL;
>> + int cpu = smp_processor_id();
>> + int curr = encode_cpu(cpu);
>> + int numa = cpu >> _numa_lock->shift;
>> + struct _numa_lock *node_lock = _numa_lock + numa;
>> + int cur_status = 0;
>> + int old = 0;
>> +
>> + node->locked = NODE_WAIT;
>> + node->next = NULL;
>> + node->cpu = curr;
>> +
>> + old = atomic64_cmpxchg_notequal(qslock, &node_lock->tail, curr);
>> +
>> + if (old == NUMA_LOCKED_VAL) {
>> + bool s = true;
>> +
>> + zx_numa_turn_osq_waiting(qslock, _numa_lock);
>> + s = osq_lock(qslock);
>> + if (s == true)
>> + return 1;
>> + else
>> + return -1;
>> + }
>> +
>> + if (old == 0) {
>> + node->locked = COHORT_START;
>> + return ACQUIRE_NUMALOCK;
>> + }
>> +
>> + prev = decode_curr(old);
>> + node->prev = prev;
>> +
>> + smp_mb(); /* make sure node set before set pre->next */
>> +
>> + WRITE_ONCE(prev->next, node);
>> +
>> + while ((cur_status = READ_ONCE(node->locked)) == NODE_WAIT) {
>> + if (need_resched() ||
>> vcpu_is_preempted(node_cpu(node->prev))) {
>> + int ddd = _zx_node_osq_lock_internal(qslock,
>> node, prev,
>> + node_lock, cpu,
>> &cur_status);
>> +
>> + if (cur_status != NODE_WAIT)
>> + goto NODE_UNLOCK;
>> + if (ddd == -1)
>> + return -1;
>> + }
>> + cpu_relax();
>> + }
>> +NODE_UNLOCK:
>> + if (cur_status == ACQUIRE_NUMALOCK)
>> + node->locked = COHORT_START;
>> + return cur_status;
>> +}
>> +static int _zx_numa_osq_lock(struct optimistic_spin_queue *qslock,
>> int cpu,
>> + struct _numa_lock *_numa_lock)
>> +{
>> + int numacpu = cpu >> _numa_lock->shift;
>> + int numacurr = encode_cpu(numacpu);
>> +
>> + struct optimistic_spin_node *node = &(_numa_lock +
>> numacpu)->osq_node;
>> + struct _numa_lock *numa_lock = _numa_lock +
>> _numa_lock->numa_nodes;
>> + struct optimistic_spin_node *prevnode = NULL;
>> + int prev = 0;
>> +
>> + node->next = NULL;
>> + node->locked = LOCK_NUMALOCK;
>> + node->cpu = numacurr;
>> +
>> + prev = atomic_xchg(&numa_lock->tail, numacurr);
>> + if (prev == 0) {
>> + node->locked = UNLOCK_NUMALOCK;
>> + return 0;
>> + }
>> +
>> + prevnode = &(_numa_lock + prev - 1)->osq_node;
>> + node->prev = prevnode;
>> + smp_mb(); /*node->prev should be set before next*/
>> + WRITE_ONCE(prevnode->next, node);
>> +
>> + while (READ_ONCE(node->locked) == LOCK_NUMALOCK) {
>> + cpu_relax();
>> + cpu_relax();
>> + cpu_relax();
>> + cpu_relax();
>> + }
>> + return 0;
>> +}
>> +inline bool zx_numa_osq_lock(struct optimistic_spin_queue *qslock,
>> + struct _numa_lock *_numa_lock)
>> +{
>> + struct _numa_lock *node_lock = NULL;
>> + int cpu = smp_processor_id();
>> + int numa = cpu >> _numa_lock->shift;
>> + int status = 0;
>> +
>> + node_lock = _numa_lock + numa;
>> +
>> + if (node_lock->stopping) {
>> + zx_numa_turn_osq_waiting(qslock, _numa_lock);
>> + return osq_lock(qslock);
>> + }
>> +
>> + status = _zx_node_osq_lock(qslock, _numa_lock);
>> + if (status == ACQUIRE_NUMALOCK)
>> + status = _zx_numa_osq_lock(qslock, smp_processor_id(),
>> + _numa_lock);
>> +
>> + if (status == -1)
>> + return false;
>> + return true;
>> +}
>> +
>> +static int atomic64_checktail_osq(struct optimistic_spin_queue *qslock,
>> + struct _numa_lock *node_lock, int ctail)
>> +{
>> + u64 addr = ((u64)ptrmask(qslock)) << 32;
>> + u64 addrtail = addr | ctail;
>> + u64 ss = 0;
>> + bool mark;
>> +
>> + ss = atomic64_read((atomic64_t *) &node_lock->tail);
>> + if (node_lock->stopping == 0)
>> + mark = (ss == addrtail &&
>> + atomic64_cmpxchg_acquire(
>> + (atomic64_t *) &node_lock->tail,
>> + addrtail, addr|NUMA_UNLOCKED_VAL) ==
>> addrtail);
>> + else
>> + mark = (ss == addrtail &&
>> + atomic64_cmpxchg_acquire(
>> + (atomic64_t *) &node_lock->tail,
>> + addrtail, NUMA_LOCKED_VAL) == addrtail);
>> + return mark;
>> +}
>> +
>> +static void node_lock_release(struct optimistic_spin_queue *qslock,
>> + struct _numa_lock *node_lock, struct
>> optimistic_spin_node *node,
>> + int val, int cpu, int numa_end)
>> +{
>> + struct optimistic_spin_node *next = NULL;
>> + int curr = encode_cpu(cpu);
>> +
>> + while (1) {
>> + if (atomic64_checktail_osq(qslock, node_lock, curr)) {
>> + if (qslock->numa_enable == NUMALOCKDYNAMIC) {
>> + int index = qslock->index;
>> +
>> + if (numa_end == OSQ_UNLOCKED_VAL &&
>> + zx_numa_entry[index].idle == 0) {
>> + cmpxchg(&zx_numa_entry[index].idle,
>> + 0, 1);
>> + }
>> + }
>> + return;
>> + }
>> + if (node->next) {
>> + next = xchg(&node->next, NULL);
>> + if (next) {
>> + WRITE_ONCE(next->locked, val);
>> + return;
>> + }
>> + }
>> + cpu_relax();
>> + }
>> +}
>> +
>> +static int numa_lock_release(struct optimistic_spin_queue *qslock,
>> + struct _numa_lock *numa_lock,
>> + struct optimistic_spin_node *node, int cpu)
>> +{
>> + struct optimistic_spin_node *next = NULL;
>> + int curr = cpu >> numa_lock->shift;
>> + int numacurr = encode_cpu(curr);
>> +
>> + while (1) {
>> + if (atomic_read(&numa_lock->tail) == numacurr &&
>> + atomic_cmpxchg_acquire(&numa_lock->tail, numacurr,
>> + OSQ_UNLOCKED_VAL) ==
>> numacurr) {
>> + return OSQ_UNLOCKED_VAL;
>> + }
>> +
>> + if (node->next) {
>> + next = xchg(&node->next, NULL);
>> + if (next) {
>> + WRITE_ONCE(next->locked, UNLOCK_NUMALOCK);
>> + return 1;
>> + }
>> + }
>> + cpu_relax();
>> + }
>> +}
>> +
>> +inline void zx_numa_osq_unlock(struct optimistic_spin_queue *qslock,
>> + struct _numa_lock *_numa_lock)
>> +{
>> + u32 cpu = smp_processor_id();
>> + struct optimistic_spin_node *node = this_cpu_ptr(&osq_cpu_node);
>> + int numa = cpu >> _numa_lock->shift;
>> + struct _numa_lock *numa_lock = _numa_lock +
>> _numa_lock->numa_nodes;
>> + struct _numa_lock *node_lock = _numa_lock + numa;
>> + struct optimistic_spin_node *numa_node =
>> + &(_numa_lock +
>> numa)->osq_node;
>> + struct optimistic_spin_node *next = NULL;
>> + int cur_count = 0;
>> + int numa_end = 0;
>> +
>> + cur_count = READ_ONCE(node->locked);
>> +
>> + if (cur_count >= osq_node_max - 1) {
>> + numa_end = numa_lock_release(qslock,
>> + numa_lock, numa_node, cpu);
>> + node_lock_release(qslock, node_lock, node,
>> + ACQUIRE_NUMALOCK, cpu, numa_end);
>> + return;
>> + }
>> +
>> + next = xchg(&node->next, NULL);
>> + if (next) {
>> + WRITE_ONCE(next->locked, cur_count + 1);
>> + return;
>> + }
>> +
>> + numa_end = numa_lock_release(qslock, numa_lock, numa_node, cpu);
>> + node_lock_release(qslock, node_lock, node, ACQUIRE_NUMALOCK,
>> + cpu, numa_end);
>> +}
>
> Overall speaking, there are not enough comments to explain exactly what
> are you trying to achieve with these new functions. It makes them hard
> to review.
>
Apologies for the sparse comments. I will add more in the next version.
> Cheers,
> Longman
>