Message-ID: <20080608151844.GA18751@linux.vnet.ibm.com>
Date: Sun, 8 Jun 2008 08:18:44 -0700
From: "Paul E. McKenney" <paulmck@...ux.vnet.ibm.com>
To: Lai Jiangshan <laijs@...fujitsu.com>
Cc: Andrew Morton <akpm@...ux-foundation.org>,
torvalds@...ux-foundation.org,
Linux Kernel Mailing List <linux-kernel@...r.kernel.org>
Subject: Re: [RFC][PATCH] rcu classic: new algorithm for callbacks-processing
On Tue, Jun 03, 2008 at 11:46:11AM +0800, Lai Jiangshan wrote:
>
> The code/algorithm of the current callbacks-processing
> implementation is very efficient and clever, but while studying it
> I found a disadvantage:
>
> In multi-CPU systems, when a new RCU callback is queued
> (call_rcu[_bh]), the current implementation will very likely invoke
> it only after the grace period for the batch with batch number
> rcp->cur+2 has completed. In fact, the callback could be invoked
> as soon as the grace period for the batch with batch number
> rcp->cur+1 has completed. The delayed invocation extends the
> latency of synchronize_rcu(). More importantly, callbacks usually
> free memory, so that work is delayed too! The reclaimer needs to
> be able to free memory as soon as possible when little memory
> is left.
Speeding up the RCU grace periods would indeed be a very good thing!
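For anyone who wants to sanity-check the claimed improvement, here is a
quick back-of-the-envelope sketch (a hypothetical userspace program, not
kernel code, assuming callbacks arrive uniformly within a grace period):

#include <stdio.h>

/*
 * Hypothetical check of the "about 2.5 GP -> 1.5 GP" figure below:
 * a callback arriving a uniform fraction f of the way through grace
 * period N waits (1 - f) GPs for N to finish, plus k further GPs
 * before it is invoked -- k = 2 in the old scheme (batch rcp->cur+2),
 * k = 1 in the new one (batch rcp->cur+1).
 */
int main(void)
{
	double f, old_sum = 0.0, new_sum = 0.0;
	int n = 1000, i;

	for (i = 0; i < n; i++) {
		f = (i + 0.5) / n;	/* uniform arrival fraction */
		old_sum += (1.0 - f) + 2.0;
		new_sum += (1.0 - f) + 1.0;
	}
	printf("old: %.2f GPs, new: %.2f GPs\n", old_sum / n, new_sum / n);
	return 0;
}

This prints "old: 2.50 GPs, new: 1.50 GPs", matching the averages
claimed below.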
> A very simple approach can solve this problem: add a field
> (struct rcu_head::batch) to record the batch number for each RCU
> callback. When a new RCU callback is queued, we determine the batch
> number for this callback (head->batch = rcp->cur+1), and when we
> process callbacks we move the callback to rdp->donelist once we
> find that head->batch <= rcp->completed.
> This simple approach greatly reduces the wait before invocation
> (from about 2.5 grace periods to about 1.5 grace periods on average
> in multi-CPU systems).
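For concreteness, a tiny compile-and-run sketch of that per-callback
variant (all names are invented for illustration; the patch below
avoids the extra field entirely):

#include <stdio.h>

/* Hypothetical sketch of the "add a field" variant described above:
 * each callback records the batch whose grace period must complete
 * before it may run.
 */
struct rcu_head_batched {
	struct rcu_head_batched *next;
	void (*func)(struct rcu_head_batched *head);
	long batch;	/* grace period that must complete first */
};

/* At queue time: the callback only needs to wait for batch cur + 1. */
static void sketch_enqueue(struct rcu_head_batched *head, long cur)
{
	head->batch = cur + 1;
}

/* At processing time: ready for donelist once its batch completes.
 * Signed subtraction keeps the test safe across counter wrap, in the
 * style of rcu_batch_after() in rcuclassic.h.
 */
static int sketch_done(struct rcu_head_batched *head, long completed)
{
	return (head->batch - completed) <= 0;
}

int main(void)
{
	struct rcu_head_batched h = { 0 };

	sketch_enqueue(&h, 41);		/* queued while rcp->cur == 41 */
	printf("done after GP 41? %d\n", sketch_done(&h, 41));	/* 0 */
	printf("done after GP 42? %d\n", sketch_done(&h, 42));	/* 1 */
	return 0;
}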
>
> This is my algorithm, but my implementation does not add any field
> to struct rcu_head. We only need to remember the last two batches
> and their batch numbers, because these two batches contain all of
> the entries whose grace period has not yet completed. So we use a
> specially segmented linked list rather than adding a field.
> Please see the comment for struct rcu_data.
Maintaining the single list with multiple pointers into it certainly
does seem to simplify the list processing, as does extracting the common
code from call_rcu() and call_rcu_bh(). Just out of curiosity, why
did you keep donelist as a separate list instead of an additional pointer
into the nxtlist?
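To make the question concrete, here is a minimal userspace sketch
(hypothetical code, not the patch itself) of the segmented single
list; a donelist segment would just be one more slot in the tail
array:

#include <stdio.h>

/* One callback list segmented by an array of tail pointers, in the
 * style of rdp->nxttail[3] in the patch below.
 */
struct cb {
	struct cb *next;
	int id;
};

static struct cb *head;
static struct cb **tail[3];	/* tail[i] marks the end of segment i */

static void seg_init(void)
{
	head = NULL;
	tail[0] = tail[1] = tail[2] = &head;
}

/* Queue onto the newest segment, like *rdp->nxttail[2] = head. */
static void seg_enqueue(struct cb *cb)
{
	cb->next = NULL;
	*tail[2] = cb;
	tail[2] = &cb->next;
}

/* A grace period completed: every segment ages by one. */
static void seg_advance(void)
{
	tail[0] = tail[1];
	tail[1] = tail[2];
}

int main(void)
{
	struct cb a = { .id = 1 }, b = { .id = 2 };

	seg_init();
	seg_enqueue(&a);
	seg_advance();
	seg_enqueue(&b);	/* a is now one segment older than b */

	/* Entries in [head, *tail[0]) would be ready for invocation. */
	printf("oldest segment %s\n",
	       tail[0] == &head ? "is empty" : "has entries");
	return 0;
}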
> rcutorture ran successfully (x86_64/4cpu, i386/2cpu, i386/1cpu).
Of course, RCU implementations need careful inspection, testing and
validation. Running rcutorture is a good first step, but unfortunately
only a first step. So I need to ask you the following questions:
1. How long did you run rcutorture?
2. Do you have access to weak-memory-order machines on which
to do rcutorture testing? (If not, I expect that we can
motivate testing elsewhere.)
3. Did you run CPU hotplug while running rcutorture? Doing so
is extremely important, as RCU interacts with CPU hotplug.
4. Did you use the rcutorture test_no_idle_hz and shuffle_interval
arguments to test out RCU's interaction with CONFIG_NO_HZ?
(This requires running a CONFIG_NO_HZ kernel.)
5. One concern I have is the removal of a few memory barriers.
Could you please tell me why it is safe to remove these?
Could you please run any additional combinations of tests that you
are able to, given the hardware you have access to?
And thank you very much for all your work in simplifying and speeding
up RCU grace-period detection! There may be some additional work
required, but this patch does look promising!
Thanx, Paul
> Signed-off-by: Lai Jiangshan <laijs@...fujitsu.com>
> ---
> diff --git a/include/linux/rcuclassic.h b/include/linux/rcuclassic.h
> index b3aa05b..fce7106 100644
> --- a/include/linux/rcuclassic.h
> +++ b/include/linux/rcuclassic.h
> @@ -66,11 +66,7 @@ static inline int rcu_batch_after(long a, long b)
> return (a - b) > 0;
> }
>
> -/*
> - * Per-CPU data for Read-Copy UPdate.
> - * nxtlist - new callbacks are added here
> - * curlist - current batch for which quiescent cycle started if any
> - */
> +/* Per-CPU data for Read-Copy Update. */
> struct rcu_data {
> /* 1) quiescent state handling : */
> long quiescbatch; /* Batch # for grace period */
> @@ -78,12 +74,24 @@ struct rcu_data {
> int qs_pending; /* core waits for quiesc state */
>
> /* 2) batch handling */
> - long batch; /* Batch # for current RCU batch */
> + /*
> + * If nxtlist is not NULL, then:
> + * batch:
> + *     The batch # for the last entry of nxtlist
> + * [*nxttail[1], NULL = *nxttail[2]):
> + *     Entries whose batch # <= batch
> + * [*nxttail[0], *nxttail[1]):
> + *     Entries whose batch # <= batch - 1
> + * [nxtlist, *nxttail[0]):
> + *     Entries whose batch # <= batch - 2
> + *     The grace period for these entries has completed, and
> + *     the other grace-period-completed entries may be moved
> + *     here temporarily in rcu_process_callbacks().
> + */
> + long batch;
> struct rcu_head *nxtlist;
> - struct rcu_head **nxttail;
> + struct rcu_head **nxttail[3];
> long qlen; /* # of queued callbacks */
> - struct rcu_head *curlist;
> - struct rcu_head **curtail;
> struct rcu_head *donelist;
> struct rcu_head **donetail;
> long blimit; /* Upper limit on a processed batch */
> diff --git a/kernel/rcuclassic.c b/kernel/rcuclassic.c
> index f4ffbd0..36ceb9f 100644
> --- a/kernel/rcuclassic.c
> +++ b/kernel/rcuclassic.c
> @@ -104,6 +104,31 @@ static inline void force_quiescent_state(struct rcu_data *rdp,
> }
> #endif
>
> +static void __call_rcu(struct rcu_head *head, struct rcu_ctrlblk *rcp,
> + struct rcu_data *rdp)
> +{
> + long batch;
> + smp_mb(); /* reads the most recently updated value of rcp->cur. */
> + batch = rcp->cur + 1;
> +
> + if (rdp->nxtlist && rdp->batch <= batch - 1) {
> + /* process callbacks */
> + rdp->nxttail[0] = rdp->nxttail[1];
> + rdp->nxttail[1] = rdp->nxttail[2];
> + if (rdp->batch <= batch - 2)
> + rdp->nxttail[0] = rdp->nxttail[2];
> + }
> +
> + rdp->batch = batch;
> + *rdp->nxttail[2] = head;
> + rdp->nxttail[2] = &head->next;
> +
> + if (unlikely(++rdp->qlen > qhimark)) {
> + rdp->blimit = INT_MAX;
> + force_quiescent_state(rdp, &rcu_ctrlblk);
> + }
> +}
> +
> /**
> * call_rcu - Queue an RCU callback for invocation after a grace period.
> * @head: structure to be used for queueing the RCU updates.
> @@ -119,18 +144,11 @@ void call_rcu(struct rcu_head *head,
> void (*func)(struct rcu_head *rcu))
> {
> unsigned long flags;
> - struct rcu_data *rdp;
>
> head->func = func;
> head->next = NULL;
> local_irq_save(flags);
> - rdp = &__get_cpu_var(rcu_data);
> - *rdp->nxttail = head;
> - rdp->nxttail = &head->next;
> - if (unlikely(++rdp->qlen > qhimark)) {
> - rdp->blimit = INT_MAX;
> - force_quiescent_state(rdp, &rcu_ctrlblk);
> - }
> + __call_rcu(head, &rcu_ctrlblk, &__get_cpu_var(rcu_data));
> local_irq_restore(flags);
> }
> EXPORT_SYMBOL_GPL(call_rcu);
> @@ -155,20 +173,11 @@ void call_rcu_bh(struct rcu_head *head,
> void (*func)(struct rcu_head *rcu))
> {
> unsigned long flags;
> - struct rcu_data *rdp;
>
> head->func = func;
> head->next = NULL;
> local_irq_save(flags);
> - rdp = &__get_cpu_var(rcu_bh_data);
> - *rdp->nxttail = head;
> - rdp->nxttail = &head->next;
> -
> - if (unlikely(++rdp->qlen > qhimark)) {
> - rdp->blimit = INT_MAX;
> - force_quiescent_state(rdp, &rcu_bh_ctrlblk);
> - }
> -
> + __call_rcu(head, &rcu_bh_ctrlblk, &__get_cpu_var(rcu_bh_data));
> local_irq_restore(flags);
> }
> EXPORT_SYMBOL_GPL(call_rcu_bh);
> @@ -197,12 +206,6 @@ EXPORT_SYMBOL_GPL(rcu_batches_completed_bh);
> static inline void raise_rcu_softirq(void)
> {
> raise_softirq(RCU_SOFTIRQ);
> - /*
> - * The smp_mb() here is required to ensure that this cpu's
> - * __rcu_process_callbacks() reads the most recently updated
> - * value of rcu->cur.
> - */
> - smp_mb();
> }
>
> /*
> @@ -265,11 +268,6 @@ static void rcu_start_batch(struct rcu_ctrlblk *rcp)
> if (rcp->next_pending &&
> rcp->completed == rcp->cur) {
> rcp->next_pending = 0;
> - /*
> - * next_pending == 0 must be visible in
> - * __rcu_process_callbacks() before it can see new value of cur.
> - */
> - smp_wmb();
> rcp->cur++;
>
> /*
> @@ -350,13 +348,15 @@ static void rcu_check_quiescent_state(struct rcu_ctrlblk *rcp,
> * which is dead and hence not processing interrupts.
> */
> static void rcu_move_batch(struct rcu_data *this_rdp, struct rcu_head *list,
> - struct rcu_head **tail)
> + struct rcu_head **tail, long batch)
> {
> - local_irq_disable();
> - *this_rdp->nxttail = list;
> - if (list)
> - this_rdp->nxttail = tail;
> - local_irq_enable();
> + if (list) {
> + local_irq_disable();
> + this_rdp->batch = batch;
> + *this_rdp->nxttail[2] = list;
> + this_rdp->nxttail[2] = tail;
> + local_irq_enable();
> + }
> }
>
> static void __rcu_offline_cpu(struct rcu_data *this_rdp,
> @@ -370,9 +370,8 @@ static void __rcu_offline_cpu(struct rcu_data *this_rdp,
> if (rcp->cur != rcp->completed)
> cpu_quiet(rdp->cpu, rcp);
> spin_unlock_bh(&rcp->lock);
> - rcu_move_batch(this_rdp, rdp->donelist, rdp->donetail);
> - rcu_move_batch(this_rdp, rdp->curlist, rdp->curtail);
> - rcu_move_batch(this_rdp, rdp->nxtlist, rdp->nxttail);
> + rcu_move_batch(this_rdp, rdp->donelist, rdp->donetail, rcp->cur + 1);
> + rcu_move_batch(this_rdp, rdp->nxtlist, rdp->nxttail[2], rcp->cur + 1);
> }
>
> static void rcu_offline_cpu(int cpu)
> @@ -402,33 +401,37 @@ static void rcu_offline_cpu(int cpu)
> static void __rcu_process_callbacks(struct rcu_ctrlblk *rcp,
> struct rcu_data *rdp)
> {
> - if (rdp->curlist && !rcu_batch_before(rcp->completed, rdp->batch)) {
> - *rdp->donetail = rdp->curlist;
> - rdp->donetail = rdp->curtail;
> - rdp->curlist = NULL;
> - rdp->curtail = &rdp->curlist;
> - }
> -
> - if (rdp->nxtlist && !rdp->curlist) {
> + if (rdp->nxtlist) {
> local_irq_disable();
> - rdp->curlist = rdp->nxtlist;
> - rdp->curtail = rdp->nxttail;
> - rdp->nxtlist = NULL;
> - rdp->nxttail = &rdp->nxtlist;
> - local_irq_enable();
>
> - /*
> - * start the next batch of callbacks
> + /* Move the other grace-period-completed entries to
> + * [rdp->nxtlist, *rdp->nxttail[0]) temporarily.
> + */
> -
> - /* determine batch number */
> - rdp->batch = rcp->cur + 1;
> - /* see the comment and corresponding wmb() in
> - * the rcu_start_batch()
> + if (rdp->batch <= rcp->completed)
> + rdp->nxttail[0] = rdp->nxttail[1] = rdp->nxttail[2];
> + else if (rdp->batch - 1 <= rcp->completed)
> + rdp->nxttail[0] = rdp->nxttail[1];
> +
> + /* The grace period for entries in
> + * [rdp->nxtlist, *rdp->nxttail[0]) has completed;
> + * move these entries to donelist.
> + */
> - smp_rmb();
> + if (rdp->nxttail[0] != &rdp->nxtlist) {
> + *rdp->donetail = rdp->nxtlist;
> + rdp->donetail = rdp->nxttail[0];
> + rdp->nxtlist = *rdp->nxttail[0];
> + *rdp->donetail = NULL;
> +
> + if (rdp->nxttail[1] == rdp->nxttail[0])
> + rdp->nxttail[1] = &rdp->nxtlist;
> + if (rdp->nxttail[2] == rdp->nxttail[0])
> + rdp->nxttail[2] = &rdp->nxtlist;
> + rdp->nxttail[0] = &rdp->nxtlist;
> + }
> +
> + local_irq_enable();
>
> - if (!rcp->next_pending) {
> + if (rcp->cur < rdp->batch && !rcp->next_pending) {
> /* and start it/schedule start if it's a new batch */
> spin_lock(&rcp->lock);
> rcp->next_pending = 1;
> @@ -450,15 +453,24 @@ static void rcu_process_callbacks(struct softirq_action *unused)
>
> static int __rcu_pending(struct rcu_ctrlblk *rcp, struct rcu_data *rdp)
> {
> - /* This cpu has pending rcu entries and the grace period
> - * for them has completed.
> - */
> - if (rdp->curlist && !rcu_batch_before(rcp->completed, rdp->batch))
> - return 1;
> -
> - /* This cpu has no pending entries, but there are new entries */
> - if (!rdp->curlist && rdp->nxtlist)
> - return 1;
> + if (rdp->nxtlist) {
> + /* This cpu has pending rcu entries and the grace period
> + * for them has completed.
> + */
> + if (rdp->batch <= rcp->completed)
> + return 1;
> + if (rdp->batch - 1 <= rcp->completed &&
> + rdp->nxttail[0] != rdp->nxttail[1])
> + return 1;
> + if (rdp->nxttail[0] != &rdp->nxtlist)
> + return 1;
> +
> + /* This cpu has pending rcu entries and the new batch
> + * for them hasn't been started nor scheduled to start.
> + */
> + if (rcp->cur < rdp->batch && !rcp->next_pending)
> + return 1;
> + }
>
> /* This cpu has finished callbacks to invoke */
> if (rdp->donelist)
> @@ -494,7 +506,7 @@ int rcu_needs_cpu(int cpu)
> struct rcu_data *rdp = &per_cpu(rcu_data, cpu);
> struct rcu_data *rdp_bh = &per_cpu(rcu_bh_data, cpu);
>
> - return (!!rdp->curlist || !!rdp_bh->curlist || rcu_pending(cpu));
> + return (!!rdp->nxtlist || !!rdp_bh->nxtlist || rcu_pending(cpu));
> }
>
> void rcu_check_callbacks(int cpu, int user)
> @@ -513,8 +525,7 @@ static void rcu_init_percpu_data(int cpu, struct rcu_ctrlblk *rcp,
> struct rcu_data *rdp)
> {
> memset(rdp, 0, sizeof(*rdp));
> - rdp->curtail = &rdp->curlist;
> - rdp->nxttail = &rdp->nxtlist;
> + rdp->nxttail[0] = rdp->nxttail[1] = rdp->nxttail[2] = &rdp->nxtlist;
> rdp->donetail = &rdp->donelist;
> rdp->quiescbatch = rcp->completed;
> rdp->qs_pending = 0;
>