Message-ID: <56A926FB.80609@hpe.com>
Date: Wed, 27 Jan 2016 15:22:19 -0500
From: Waiman Long <waiman.long@....com>
To: Peter Zijlstra <peterz@...radead.org>
CC: Thomas Gleixner <tglx@...utronix.de>,
Ingo Molnar <mingo@...hat.com>,
"H. Peter Anvin" <hpa@...or.com>,
Alexander Viro <viro@...iv.linux.org.uk>,
linux-fsdevel@...r.kernel.org, x86@...nel.org,
linux-kernel@...r.kernel.org, Scott J Norton <scott.norton@...com>,
Douglas Hatch <doug.hatch@...com>
Subject: Re: [RFC PATCH 1/3] lib/list_batch: A simple list insertion/deletion
batching facility
On 01/27/2016 11:34 AM, Peter Zijlstra wrote:
> On Tue, Jan 26, 2016 at 11:03:37AM -0500, Waiman Long wrote:
>> +static __always_inline void _list_batch_cmd(enum list_batch_cmd cmd,
>> + struct list_head *head,
>> + struct list_head *entry)
>> +{
>> + if (cmd == lb_cmd_add)
>> + list_add(entry, head);
>> + else if (cmd == lb_cmd_del)
>> + list_del(entry);
>> + else /* cmd == lb_cmd_del_init */
>> + list_del_init(entry);
> Maybe use switch(), GCC has fancy warns with enums and switch().
OK, I will look at the generated code to see if there is any difference.
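For reference, the switch() form would be something like this (untested, same
behaviour as the if/else chain above):

static __always_inline void _list_batch_cmd(enum list_batch_cmd cmd,
					    struct list_head *head,
					    struct list_head *entry)
{
	switch (cmd) {
	case lb_cmd_add:
		list_add(entry, head);
		break;
	case lb_cmd_del:
		list_del(entry);
		break;
	case lb_cmd_del_init:
		list_del_init(entry);
		break;
	}
}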
>
>> +}
>> +static inline void do_list_batch(enum list_batch_cmd cmd, spinlock_t *lock,
>> + struct list_batch *batch,
>> + struct list_head *entry)
>> +{
>> + /*
>> + * Fast path
>> + */
>> + if (spin_trylock(lock)) {
>> + _list_batch_cmd(cmd, batch->list, entry);
>> + spin_unlock(lock);
> This is still quite a lot of code for an inline function
I expect callers to pass a constant cmd, so the compiler can optimize away
all the conditional checks in _list_batch_cmd(). Taking the inline out would
probably defeat that optimization.
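For example, with a hypothetical caller like the one below (the struct and
field names are made up), the fast path should collapse to a trylock plus a
plain list_add():

/* Hypothetical caller - names are illustrative only. */
static void foo_add(struct foo *f, struct foo_item *item)
{
	/*
	 * cmd is a compile-time constant here, so the compiler can drop
	 * the other branches of _list_batch_cmd() entirely.
	 */
	do_list_batch(lb_cmd_add, &f->lock, &f->batch, &item->link);
}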
>> + return;
>> + }
>> + do_list_batch_slowpath(cmd, lock, batch, entry);
>> +}
>
>
>> +void do_list_batch_slowpath(enum list_batch_cmd cmd, spinlock_t *lock,
>> + struct list_batch *batch, struct list_head *entry)
>> +{
>> + struct list_batch_qnode node, *prev, *next, *nptr;
>> + int loop;
>> +
>> + /*
>> + * Put itself into the list_batch queue
>> + */
>> + node.next = NULL;
>> + node.entry = entry;
>> + node.cmd = cmd;
>> + node.state = lb_state_waiting;
>> +
> Here we rely on the release barrier implied by xchg() to ensure the node
> initialization is complete before the xchg() publishes the thing.
>
> But do we also need the acquire part of this barrier? From what I could
> tell, the primitive as a whole does not imply any ordering.
I think we probably don't need the acquire part, but I don't have a non-x86
machine on which I can really test the more relaxed versions of the atomic
ops. That is why I used the fully ordered versions. We can always relax them
later with additional patches.
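If we do relax it later, the publish step would presumably become something
like this (untested, assuming xchg_release() is sufficient here):

	/*
	 * Release ordering makes the node initialization above visible
	 * before the node is published on the tail; the acquire half of
	 * the full barrier is dropped.
	 */
	prev = xchg_release(&batch->tail, &node);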
>
>> + prev = xchg(&batch->tail, &node);
>> +
>> + if (prev) {
>> + WRITE_ONCE(prev->next, &node);
>> + while (READ_ONCE(node.state) == lb_state_waiting)
>> + cpu_relax();
>> + if (node.state == lb_state_done)
>> + return;
>> + WARN_ON(node.state != lb_state_batch);
>> + }
>> +
>> + /*
>> + * We are now the queue head; we should now acquire the lock and
>> + * process a batch of qnodes.
>> + */
>> + loop = LB_BATCH_SIZE;
> Have you tried different sizes?
I have tried 64 and 128. Using 128 seems to give slightly better performance
numbers.
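That would just be a change to the constant, e.g. (value as tested above):

#define LB_BATCH_SIZE	128	/* batch size that looked best so far */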
>> + next = &node;
>> + spin_lock(lock);
>> +
>> +do_list_again:
>> + do {
>> + nptr = next;
>> + _list_batch_cmd(nptr->cmd, batch->list, nptr->entry);
>> + next = READ_ONCE(nptr->next);
>> + /*
>> + * As soon as the state is marked lb_state_done, we
>> + * can no longer assume the content of *nptr is valid.
>> + * So we have to hold off marking it done until we no
>> + * longer need its content.
>> + *
>> + * The release barrier here makes sure that all our prior
>> + * accesses to *nptr complete before it is marked done.
>> + */
>> + if (next)
>> + smp_store_release(&nptr->state, lb_state_done);
>> + } while (--loop && next);
>> + if (!next) {
>> + /*
>> + * The queue tail should be equal to nptr, so clear it to
>> + * mark the queue as empty.
>> + */
>> + if (cmpxchg(&batch->tail, nptr, NULL) != nptr) {
>> + /*
>> + * Queue not empty, wait until the next pointer is
>> + * initialized.
>> + */
>> + while (!(next = READ_ONCE(nptr->next)))
>> + cpu_relax();
>> + }
>> + /* The above cmpxchg acts as a memory barrier */
> for what? :-)
>
> Also, if that cmpxchg() fails, it very much does _not_ act as one.
>
> I suspect you want smp_store_release() setting the state_done, just as
> above, and then use cmpxchg_relaxed().
You are right. I forgot that there is no memory barrier guarantee when
cmpxchg() fails. However, in that case the READ_ONCE() and WRITE_ONCE()
macros should still provide the necessary ordering, IMO. I can certainly
change it to use cmpxchg_relaxed() and smp_store_release() instead.
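The tail-clearing path would then look something like this (untested sketch):

	if (cmpxchg_relaxed(&batch->tail, nptr, NULL) != nptr) {
		/*
		 * Queue not empty, wait until the next pointer is
		 * initialized by the new waiter.
		 */
		while (!(next = READ_ONCE(nptr->next)))
			cpu_relax();
	}
	/*
	 * Make sure we are done with *nptr before its owner can
	 * reuse it, same as in the batch processing loop above.
	 */
	smp_store_release(&nptr->state, lb_state_done);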
>
>> + WRITE_ONCE(nptr->state, lb_state_done);
>> + }
>> + if (next) {
>> + if (loop)
>> + goto do_list_again; /* More qnodes to process */
>> + /*
>> + * Mark the next qnode as head to process the next batch
>> + * of qnodes. The new queue head cannot proceed until we
>> + * release the lock.
>> + */
>> + WRITE_ONCE(next->state, lb_state_batch);
>> + }
>> + spin_unlock(lock);
>> +}
>> +EXPORT_SYMBOL_GPL(do_list_batch_slowpath);
Cheers,
Longman