Message-ID: <4C7C70EF.5020507@cn.fujitsu.com>
Date:	Tue, 31 Aug 2010 11:03:11 +0800
From:	Lai Jiangshan <laijs@...fujitsu.com>
To:	paulmck@...ux.vnet.ibm.com
CC:	Linus Torvalds <torvalds@...ux-foundation.org>,
	Ingo Molnar <mingo@...e.hu>, Andrew Morton <akpm@...l.org>,
	David Miller <davem@...emloft.net>,
	Manfred Spraul <manfred@...orfullife.com>,
	Mathieu Desnoyers <mathieu.desnoyers@...ymtl.ca>,
	Steven Rostedt <rostedt@...dmis.org>,
	Thomas Gleixner <tglx@...utronix.de>,
	Peter Zijlstra <a.p.zijlstra@...llo.nl>,
	LKML <linux-kernel@...r.kernel.org>, dipankar@...ibm.com
Subject: Re: [PATCH, RFC] RCU: New scalable, multi-GP and preemptible RCU
 implementation

On 08/31/2010 07:57 AM, Paul E. McKenney wrote:
> On Mon, Aug 30, 2010 at 05:31:44PM +0800, Lai Jiangshan wrote:
>>
>> 0) contents
>> 1) overview
>> 2) scalable
>> 3) multi-GP
>> 4) integrated expedited synchronize_rcu
>> 5) preemptible
>> 6) top-down implies
>> 7) speech
>> 8) core rcuring
> 
> Interesting!  I can't claim to have dug through every detail, but figured
> I should ask a few questions first.
> 
> 							Thanx, Paul
> 
> Of course, the first question is whether it passes rcutorture, and if
> so on what hardware configuration.

Only on x86 (32-bit and 64-bit) systems so far.
Could you apply it on Power and test it there? It needs more testing on
different architectures, but I have no other hardware.

What do you mean by "hardware configuration"?

> 
> At first glance, it looks to live somewhere between Manfred Spraul's
> counter-based hierarchical RCU and classic RCU.  There are also some
> resemblances to K42's "generations" RCU-like mechanism.
> 
>> 1) overview
>> This is a new RCU implementation: RCURING. It uses a ring of atomic
>> counters to control the completion of GPs and a ring of callback batches
>> to process the callbacks.
> 
> The ring is a set of grace periods, correct?

Correct. A GP is also controlled by a set of ring counters, namely those in
(domain's done_complete, this GP's complete]; the GP completes only when all
of these counters have become zero.

Let me pull one of your later questions up here:
> For a given grace period, all quiescent states hit the same counter.
> Or am I missing something?

No.
Every RCU read-side critical section holds a locked_complete, and while a
CPU holds a locked_complete, the counter for that locked_complete cannot
drop to zero.

A quiescent state releases the locked_complete of the previous read-side
critical section, so different quiescent states may hit different counters.
A QS always locks the newest GP (the domain's curr_complete, which has not
started waiting yet) for the next read-side critical section and obtains a
new locked_complete.

This probably still sounds vague, so more questions are welcome; they will
also help with the documentation. Thank you very much.
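
To make this concrete, the per-CPU quiescent-state path is roughly the
following (a simplified sketch of __rcu_qsctr_inc()/rcuring_advance_lock()
from further down in the patch, not new code):

	/* called with irqs disabled when this CPU passes a QS */
	static void qs_sketch(struct rcuring *rr, struct rcu_data *rdp)
	{
		long old = rdp->locked_complete;

		if (old != rr->curr_complete) {
			/* lock the newest GP first ... */
			rdp->locked_complete = rcuring_lock(rr);
			/* ... then release the previous read site's GP so
			 * its counter can eventually reach zero. */
			rcuring_unlock(rr, old);
		}
	}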

> 
>> I came up with the idea in 2008 and wrote the first 300 lines of
>> rcuring.c, but I was too lazy to finish it then.
>>
>> 2) scalable
>> There are only 2 locks in every RCU domain (rcu_bh, rcu_sched,
>> rcu_preempt). One is for misc things and is taken infrequently; the other
>> is for advancing complete and is taken frequently. But whoever already
>> holds that lock is very probably doing the same work for us, so we always
>> use spin_trylock() on it, which keeps it scalable.
>>
>> We don't need any lock for reporting quiescent state, we just need 2
>> atomic operations for every RCU domain.
> 
> If you have a single GP in flight, all CPUs need to do an atomic op on the
> same element of the RCU ring, correct?  

Yes, but there is seldom only a single GP in flight. We start new GPs as
needed.

> My first thought was that each CPU
> was writing the current grace-period number to its own variable, but that
> would not require an atomic operation.  Of course, the downside of this
> latter approach is that some CPU would need to scan all CPUs' counters.
> 
>> 3) multi-GP
>> All old rcu implementations have only one waiting GP.
>>
>> time  -----ta-tb--------------------tc----------------td------------->
>> GP    -----|------GP1---------------|--------GP2------|------------->
>> cpu#x ........|---------------------------------------|...............>
>>               |->insert a callback                    |->handle callback
>>
>> the callback has to wait for part of GP1 and all of GP2; it may wait
>> nearly 2 GPs in the worst case, because it also has to wait for the RCU
>> read sites which start in (tb, tc] even though waiting for them is
>> unnecessary. On average, a callback has to wait (1 + 1/(1+1)) = 1.5 GPs
>>
>> if we can have three simultaneous waiting GPs:
>>
>> GPs   .....|------GP1---------------|..........................
>>       ...........|--------GP2-----------|....................
>>       ....................|---------GP3---------------|........
>> cpu#x ........|-------------------------|..........................>
>>               |->insert a callback      |->handle callback
>> The callback is handled more quickly; on average, a callback has to wait
>> (1 + 1/(3+1)) = 1.25 GPs
>>
>> If we can have X simultaneous waiting GPs, a callback has to wait
>> (1 + 1/(X+1)) GPs on average. The default X in this patch is 15
>> (including the reserved GPs).
> 
> Assuming that the start times of the GPs are evenly spaced, correct?

No, we start GPs as needed.
If a CPU has queued some callbacks whose complete equals the RCU domain's
curr_complete, that CPU will start a new GP for those callbacks; see the
sketch below.
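
In code terms the check is roughly the following (condensed from
__rcu_check_callbacks() later in the patch, which additionally respects the
reserved-GP limit; this is not new logic):

	/* this CPU has callbacks queued on the domain's newest complete,
	 * so ask for one more GP to be started */
	if (rdp->curr_complete == curr_complete &&
	    rdp->batch[RCURING_IDX(curr_complete)].list)
		rcuring_advance_complete(rr, curr_complete + 1);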

> 
>> 4) integrated expedited synchronize_rcu
>> expedited synchronize_rcu is implemented in rcuring.c, not in separate
>> files, and the whole of it is less than 50 lines of code. We just reserve
>> several GPs for expedited synchronize_rcu; when an expedited callback is
>> inserted, we start a new GP immediately (we have reserved GPs, so this
>> step will very probably succeed) and then force this new GP to complete
>> quickly (thanks to multi-GP).
> 
> I would be more convinced had you implemented force_quiescent_state()
> for preemptible RCU.  ;-)
> 
>> 5) preemptible
>> New, simple preemptible code: rcu_read_lock/unlock() are simple inline
>> functions. A new kernel-offsets.c is added to avoid recursive header-file
>> inclusion.
>>
>> 6) top-down implies
>> irq and local_irq_disable() implies preempt_disable()/rcu_read_lock_sched(),
>> irq and local_irq_disable() implies rcu_read_lock(). (*)
>> preempt_disable()/rcu_read_lock_sched() implies rcu_read_lock(). (*)
>> *: In preemptible rcutree/rcutiny, it is false.
>>
>> 7) speech
>> I will give a talk about RCU and this RCU implementation at LinuxCon
>> Japan 2010. You are welcome to attend.
> 
> Congratulations!
> 
>> 8) core rcuring
>> (Comments and documentation are still being written.)
>>
>> All rcu_read_lock/unlock_domain() calls are nested in a corresponding
>> rcuring_lock/unlock(), and the algorithm treats a rcuring_lock/unlock()
>> critical region as a full RCU read-side critical region.
>>
>> read site:
>>   rcuring_lock()
>>     lock a complete, and ensure this locked_complete - done_complete(1) > 0
>>       curr_complete(1) - locked_complete >= 0
>>     mb(1)
> 
> This is really a compiler barrier?

A real memory barrier.
These memory barriers are not in rcu_read_lock(); they are in the code that
passes the quiescent state.

> 
>>
>>   rcu_derefrence(ptr)
>>
>>   rcuring_unlock()
>>     mb(2)
> 
> Ditto?
> 
>>     release its locked_complete.
>>
>> update site:
>>   rcu_assign_pointer(ptr, new)
>>
>>   (handle callback:call_rcu()+rcu_do_batch())
>>   (call_rcu():)
>>   mb(3)
>>   get the curr_complete(2)
>>   set callback's complete as this curr_complete(2).
>>     (we don't have any field to record the callback's complete, we just
>>      emulate it by rdp->done_complete and the position of this callback
>>      in rdp->batch.)
>>
>>   time pass... until done_complete(2) - callback's complete > 0
> 
> I have to ask about counter overflow on 32-bit systems.

The "complete" is "long" not "unsigned long"

if callback's complete - rdp->curr_complete > 0, we also queue
this callback to done_batch.
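
A small worked example of why the signed type matters (illustration only,
not from the patch; it assumes the kernel's usual -fno-strict-overflow build
flag so that signed wrap-around behaves as two's complement):

	long older = LONG_MAX - 1;	/* a complete just before wrap	*/
	long newer = older + 3;		/* wraps to a large negative	*/

	/* newer < older numerically, but newer - older == 3 > 0, so the
	 * "is it later?" comparison still gives the right answer. */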

> 
>>   (rcu_do_batch():)
>>   mb(4)
>>   call the callback: (free old ptr, etc.)
>>
>> Signed-off-by: Lai Jiangshan <laijs@...fujitsu.com>
>> ---
>>  Kbuild                   |   75 ++-
>>  include/linux/rcupdate.h |    6 
>>  include/linux/rcuring.h  |  195 +++++++++
>>  include/linux/sched.h    |    6 
>>  init/Kconfig             |   32 +
>>  kernel/Makefile          |    1 
>>  kernel/kernel-offsets.c  |   20 
>>  kernel/rcuring.c         | 1002 +++++++++++++++++++++++++++++++++++++++++++++++
>>  kernel/sched.c           |    2 
>>  kernel/time/tick-sched.c |    4 
>>  10 files changed, 1324 insertions(+), 19 deletions(-)
>> diff --git a/Kbuild b/Kbuild
>> index e3737ad..bce37cb 100644
>> --- a/Kbuild
>> +++ b/Kbuild
>> @@ -3,7 +3,19 @@
>>  # This file takes care of the following:
>>  # 1) Generate bounds.h
>>  # 2) Generate asm-offsets.h (may need bounds.h)
>> -# 3) Check for missing system calls
>> +# 3) Generate kernel-offsets.h
>> +# 4) Check for missing system calls
>> +
>> +# Default sed regexp - multiline due to syntax constraints
>> +define sed-y
>> +	"/^->/{s:->#\(.*\):/* \1 */:; \
>> +	s:^->\([^ ]*\) [\$$#]*\([^ ]*\) \(.*\):#define \1 \2 /* \3 */:; \
>> +	s:->::; p;}"
>> +endef
>> +
>> +quiet_cmd_offsets_cc_s_c = CC $(quiet_modtag)  $@
>> +cmd_offsets_cc_s_c       = $(CC) -D__GENARATING_OFFSETS__               \
> 
> s/GENARATING/GENERATING/  just to be pedantic.  ;-)
> 
>> +                           $(c_flags) -fverbose-asm -S -o $@ $<
> 
> Pulling out the offsets might have the advantage of allowing both
> rcu_read_lock() and rcu_read_unlock() to be inlined, but without pulling
> in include/linux/sched.h everywhere.  I had been thinking in terms of
> moving the state from the task struct to the taskinfo struct, but
> this approach might be worth considering.
> 
>>  #####
>>  # 1) Generate bounds.h
>> @@ -43,22 +55,14 @@ $(obj)/$(bounds-file): kernel/bounds.s Kbuild
>>  # 2) Generate asm-offsets.h
>>  #
>>
>> -offsets-file := include/generated/asm-offsets.h
>> +asm-offsets-file := include/generated/asm-offsets.h
>>
>> -always  += $(offsets-file)
>> -targets += $(offsets-file)
>> +always  += $(asm-offsets-file)
>> +targets += $(asm-offsets-file)
>>  targets += arch/$(SRCARCH)/kernel/asm-offsets.s
>>
>> -
>> -# Default sed regexp - multiline due to syntax constraints
>> -define sed-y
>> -	"/^->/{s:->#\(.*\):/* \1 */:; \
>> -	s:^->\([^ ]*\) [\$$#]*\([^ ]*\) \(.*\):#define \1 \2 /* \3 */:; \
>> -	s:->::; p;}"
>> -endef
>> -
>> -quiet_cmd_offsets = GEN     $@
>> -define cmd_offsets
>> +quiet_cmd_asm_offsets = GEN     $@
>> +define cmd_asm_offsets
>>  	(set -e; \
>>  	 echo "#ifndef __ASM_OFFSETS_H__"; \
>>  	 echo "#define __ASM_OFFSETS_H__"; \
>> @@ -78,13 +82,48 @@ endef
>>  arch/$(SRCARCH)/kernel/asm-offsets.s: arch/$(SRCARCH)/kernel/asm-offsets.c \
>>                                        $(obj)/$(bounds-file) FORCE
>>  	$(Q)mkdir -p $(dir $@)
>> -	$(call if_changed_dep,cc_s_c)
>> +	$(call if_changed_dep,offsets_cc_s_c)
>> +
>> +$(obj)/$(asm-offsets-file): arch/$(SRCARCH)/kernel/asm-offsets.s Kbuild
>> +	$(call cmd,asm_offsets)
>> +
>> +#####
>> +# 3) Generate kernel-offsets.h
>> +#
>> +
>> +kernel-offsets-file := include/generated/kernel-offsets.h
>> +
>> +always  += $(kernel-offsets-file)
>> +targets += $(kernel-offsets-file)
>> +targets += kernel/kernel-offsets.s
>> +
>> +quiet_cmd_kernel_offsets = GEN     $@
>> +define cmd_kernel_offsets
>> +	(set -e; \
>> +	 echo "#ifndef __LINUX_KERNEL_OFFSETS_H__"; \
>> +	 echo "#define __LINUX_KERNEL_OFFSETS_H__"; \
>> +	 echo "/*"; \
>> +	 echo " * DO NOT MODIFY."; \
>> +	 echo " *"; \
>> +	 echo " * This file was generated by Kbuild"; \
>> +	 echo " *"; \
>> +	 echo " */"; \
>> +	 echo ""; \
>> +	 sed -ne $(sed-y) $<; \
>> +	 echo ""; \
>> +	 echo "#endif" ) > $@
>> +endef
>> +
>> +# We use internal kbuild rules to avoid the "is up to date" message from make
>> +kernel/kernel-offsets.s: kernel/kernel-offsets.c $(obj)/$(bounds-file) \
>> +                         $(obj)/$(asm-offsets-file) FORCE
>> +	$(call if_changed_dep,offsets_cc_s_c)
>>
>> -$(obj)/$(offsets-file): arch/$(SRCARCH)/kernel/asm-offsets.s Kbuild
>> -	$(call cmd,offsets)
>> +$(obj)/$(kernel-offsets-file): kernel/kernel-offsets.s Kbuild
>> +	$(call cmd,kernel_offsets)
>>
>>  #####
>> -# 3) Check for missing system calls
>> +# 4) Check for missing system calls
>>  #
>>
>>  quiet_cmd_syscalls = CALL    $<
>> diff --git a/include/linux/rcupdate.h b/include/linux/rcupdate.h
>> index 89414d6..42cd6bc 100644
>> --- a/include/linux/rcupdate.h
>> +++ b/include/linux/rcupdate.h
>> @@ -124,6 +124,7 @@ extern void rcu_bh_qs(int cpu);
>>  extern void rcu_check_callbacks(int cpu, int user);
>>  struct notifier_block;
>>
>> +#ifndef CONFIG_RCURING
>>  #ifdef CONFIG_NO_HZ
>>
>>  extern void rcu_enter_nohz(void);
>> @@ -140,11 +141,14 @@ static inline void rcu_exit_nohz(void)
>>  }
>>
>>  #endif /* #else #ifdef CONFIG_NO_HZ */
>> +#endif
>>
>>  #if defined(CONFIG_TREE_RCU) || defined(CONFIG_TREE_PREEMPT_RCU)
>>  #include <linux/rcutree.h>
>>  #elif defined(CONFIG_TINY_RCU) || defined(CONFIG_TINY_PREEMPT_RCU)
>>  #include <linux/rcutiny.h>
>> +#elif defined(CONFIG_RCURING)
>> +#include <linux/rcuring.h>
>>  #else
>>  #error "Unknown RCU implementation specified to kernel configuration"
>>  #endif
>> @@ -686,6 +690,7 @@ struct rcu_synchronize {
>>
>>  extern void wakeme_after_rcu(struct rcu_head  *head);
>>
>> +#ifndef CONFIG_RCURING
>>  #ifdef CONFIG_PREEMPT_RCU
>>
>>  /**
>> @@ -710,6 +715,7 @@ extern void call_rcu(struct rcu_head *head,
>>  #define	call_rcu	call_rcu_sched
>>
>>  #endif /* #else #ifdef CONFIG_PREEMPT_RCU */
>> +#endif
>>
>>  /**
>>   * call_rcu_bh() - Queue an RCU for invocation after a quicker grace period.
>> diff --git a/include/linux/rcuring.h b/include/linux/rcuring.h
>> new file mode 100644
>> index 0000000..343b932
>> --- /dev/null
>> +++ b/include/linux/rcuring.h
>> @@ -0,0 +1,195 @@
>> +/*
>> + * Read-Copy Update mechanism for mutual exclusion
>> + *
>> + * This program is free software; you can redistribute it and/or modify
>> + * it under the terms of the GNU General Public License as published by
>> + * the Free Software Foundation; either version 2 of the License, or
>> + * (at your option) any later version.
>> + *
>> + * This program is distributed in the hope that it will be useful,
>> + * but WITHOUT ANY WARRANTY; without even the implied warranty of
>> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
>> + * GNU General Public License for more details.
>> + *
>> + * You should have received a copy of the GNU General Public License
>> + * along with this program; if not, write to the Free Software
>> + * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
>> + *
>> + * Copyright IBM Corporation, 2008
>> + * Copyright Fujitsu 2008-2010 Lai Jiangshan
>> + *
>> + * Authors: Dipankar Sarma <dipankar@...ibm.com>
>> + *	    Manfred Spraul <manfred@...orfullife.com>
>> + *	    Paul E. McKenney <paulmck@...ux.vnet.ibm.com> Hierarchical version
>> + *	    Lai Jiangshan <laijs@...fujitsu.com> RCURING version
>> + */
>> +
>> +#ifndef __LINUX_RCURING_H
>> +#define __LINUX_RCURING_H
>> +
>> +#define __rcu_read_lock_bh() local_bh_disable()
>> +#define __rcu_read_unlock_bh() local_bh_enable()
>> +#define __rcu_read_lock_sched() preempt_disable()
>> +#define __rcu_read_unlock_sched() preempt_enable()
>> +
>> +#ifdef CONFIG_RCURING_BH
>> +extern void call_rcu_bh(struct rcu_head *head, void (*func)(struct rcu_head *));
>> +extern void synchronize_rcu_bh(void);
>> +extern void synchronize_rcu_bh_expedited(void);
>> +extern void rcu_bh_qs(int cpu);
>> +extern long rcu_batches_completed_bh(void);
>> +extern void rcu_barrier_bh(void);
>> +extern void rcu_bh_force_quiescent_state(void);
>> +#else /* CONFIG_RCURING_BH */
>> +#define call_rcu_bh call_rcu_sched
>> +#define synchronize_rcu_bh synchronize_rcu_sched
>> +#define synchronize_rcu_bh_expedited synchronize_rcu_sched_expedited
>> +#define rcu_bh_qs(cpu) do { (void)(cpu); } while (0)
>> +#define rcu_batches_completed_bh rcu_batches_completed_sched
>> +#define rcu_barrier_bh rcu_barrier_sched
>> +#define rcu_bh_force_quiescent_state rcu_sched_force_quiescent_state
>> +#endif /* CONFIG_RCURING_BH */
>> +
>> +extern
>> +void call_rcu_sched(struct rcu_head *head, void (*func)(struct rcu_head *));
>> +extern void synchronize_rcu_sched(void);
>> +#define synchronize_sched synchronize_rcu_sched
>> +extern void synchronize_rcu_sched_expedited(void);
>> +#define synchronize_sched_expedited synchronize_rcu_sched_expedited
>> +extern void rcu_note_context_switch(int cpu);
>> +extern long rcu_batches_completed_sched(void);
>> +extern void rcu_barrier_sched(void);
>> +extern void rcu_sched_force_quiescent_state(void);
>> +
>> +/*
>> + * The @flags is valid only when RCURING_PREEMPT_SAVED is set.
>> + *
>> + * When preemptible rcu read site is preempted, we save RCURING_PREEMPT_SAVED,
>> + * RCURING_PREEMPT_QS and this read site's locked_complete(only the last
>> + * RCURING_COUNTERS_SHIFT bits) in the @flags.
>> + *
>> + * When the outmost rcu read site is closed, we will clear
>> + * RCURING_PREEMPT_QS bit if it's saved, and let rcu system knows
>> + * this task has passed a quiescent state and release the old read site's
>> + * locked_complete.
>> + */
>> +#define RCURING_PREEMPT_SAVED	(1 << 31)
>> +#define RCURING_PREEMPT_QS	(1 << 30)
>> +#define RCURING_PREEMPT_FLAGS	(RCURING_PREEMPT_SAVED | RCURING_PREEMPT_QS)
>> +
>> +struct rcu_task_preempt {
>> +	unsigned int nesting;
>> +	unsigned int flags;
>> +};
>> +
>> +static inline void rcu_read_lock_preempt(struct rcu_task_preempt *rcu_task)
>> +{
>> +	rcu_task->nesting++;
>> +	barrier();
>> +}
>> +
>> +static inline void rcu_read_unlock_preempt(struct rcu_task_preempt *rcu_task)
>> +{
>> +	int nesting = rcu_task->nesting;
>> +
>> +	if (nesting == 1) {
>> +		unsigned int flags;
>> +
>> +		barrier();
>> +		flags = ACCESS_ONCE(rcu_task->flags);
>> +
>> +		if ((flags & RCURING_PREEMPT_FLAGS) == RCURING_PREEMPT_FLAGS) {
>> +			flags &= ~RCURING_PREEMPT_QS;
>> +			ACCESS_ONCE(rcu_task->flags) = flags;
>> +		}
>> +	}
>> +
>> +	barrier();
>> +	rcu_task->nesting = nesting - 1;
>> +}
>> +
>> +#ifdef CONFIG_RCURING_PREEMPT
>> +
>> +#ifdef __GENARATING_OFFSETS__
>> +#define task_rcu_preempt(p) ((struct rcu_task_preempt *)NULL)
> 
> The above looks a bit strange.  The idea is that if you were generating
> offsets for the fields within rcu_task_preempt, you would select the
> fields?  Something like the following?

When __GENARATING_OFFSETS__ is defined, nothing actually uses
task_rcu_preempt(); the dummy definition is only there to avoid compiler
warnings.

> 
> #define example_offset(p) &(((struct rcu_task_preempt *)NULL)->field_a)
> 
>> +#else
>> +#include <generated/kernel-offsets.h>
>> +#define task_rcu_preempt(p)						\
>> +	((struct rcu_task_preempt *)(((void *)p) + TASK_RCU_PREEMPT))
>> +#endif
>> +
>> +#define current_rcu_preempt() task_rcu_preempt(current)
>> +#define __rcu_read_lock() rcu_read_lock_preempt(current_rcu_preempt())
>> +#define __rcu_read_unlock() rcu_read_unlock_preempt(current_rcu_preempt())
>> +extern
>> +void call_rcu_preempt(struct rcu_head *head, void (*func)(struct rcu_head *));
>> +#define call_rcu call_rcu_preempt
>> +extern void synchronize_rcu_preempt(void);
>> +#define synchronize_rcu synchronize_rcu_preempt
>> +extern void synchronize_rcu_preempt_expedited(void);
>> +#define synchronize_rcu_expedited synchronize_rcu_preempt_expedited
>> +extern long rcu_batches_completed_preempt(void);
>> +#define rcu_batches_completed rcu_batches_completed_preempt
>> +extern void rcu_barrier_preempt(void);
>> +#define rcu_barrier rcu_barrier_preempt
>> +extern void rcu_preempt_force_quiescent_state(void);
>> +#define rcu_force_quiescent_state rcu_preempt_force_quiescent_state
>> +
>> +struct task_struct;
>> +static inline void rcu_copy_process(struct task_struct *p)
>> +{
>> +	struct rcu_task_preempt *rcu_task = task_rcu_preempt(p);
>> +
>> +	rcu_task->nesting = 0;
>> +	rcu_task->flags = 0;
>> +}
>> +
>> +static inline void exit_rcu(void)
>> +{
>> +	if (current_rcu_preempt()->nesting) {
>> +		WARN_ON(1);
>> +		current_rcu_preempt()->nesting = 0;
> 
> If the RCU core was waiting on this task, how does it learn that this
> task is no longer blocking the current grace period(s)?

A schedule() will be called after exit_rcu(), and the RCU core collects
this information when the task schedules.

> 
>> +	}
>> +}
>> +
>> +#else /*CONFIG_RCURING_PREEMPT */
>> +
>> +#define __rcu_read_lock() __rcu_read_lock_sched();
>> +#define __rcu_read_unlock() __rcu_read_unlock_sched();
>> +#define call_rcu call_rcu_sched
>> +#define synchronize_rcu synchronize_rcu_sched
>> +#define synchronize_rcu_expedited synchronize_rcu_sched_expedited
>> +#define rcu_batches_completed rcu_batches_completed_sched
>> +#define rcu_barrier rcu_barrier_sched
>> +#define rcu_force_quiescent_state rcu_sched_force_quiescent_state
>> +
>> +static inline void rcu_copy_process(struct task_struct *p) {}
>> +static inline void exit_rcu(void) {}
>> +#endif /* CONFIG_RCURING_PREEMPT */
>> +
>> +#ifdef CONFIG_NO_HZ
>> +extern void rcu_kernel_enter(void);
>> +extern void rcu_kernel_exit(void);
>> +
>> +static inline void rcu_nmi_enter(void) { rcu_kernel_enter(); }
>> +static inline void rcu_nmi_exit(void) { rcu_kernel_exit(); }
>> +static inline void rcu_irq_enter(void) { rcu_kernel_enter(); }
>> +static inline void rcu_irq_exit(void) { rcu_kernel_exit(); }
>> +extern void rcu_enter_nohz(void);
>> +extern void rcu_exit_nohz(void);
>> +#else
>> +static inline void rcu_nmi_enter(void) {}
>> +static inline void rcu_nmi_exit(void) {}
>> +static inline void rcu_irq_enter(void) {}
>> +static inline void rcu_irq_exit(void) {}
>> +static inline void rcu_enter_nohz(void) {}
>> +static inline void rcu_exit_nohz(void) {}
>> +#endif
>> +
>> +extern int rcu_needs_cpu(int cpu);
>> +extern void rcu_check_callbacks(int cpu, int user);
>> +
>> +extern void rcu_scheduler_starting(void);
>> +extern int rcu_scheduler_active __read_mostly;
>> +
>> +#endif /* __LINUX_RCURING_H */
>> diff --git a/include/linux/sched.h b/include/linux/sched.h
>> index e18473f..15b332d 100644
>> --- a/include/linux/sched.h
>> +++ b/include/linux/sched.h
>> @@ -1211,6 +1211,10 @@ struct task_struct {
>>  	struct rcu_node *rcu_blocked_node;
>>  #endif /* #ifdef CONFIG_TREE_PREEMPT_RCU */
>>
>> +#ifdef CONFIG_RCURING_PREEMPT
>> +	struct rcu_task_preempt rcu_task_preempt;
>> +#endif
>> +
>>  #if defined(CONFIG_SCHEDSTATS) || defined(CONFIG_TASK_DELAY_ACCT)
>>  	struct sched_info sched_info;
>>  #endif
>> @@ -1757,7 +1761,7 @@ static inline void rcu_copy_process(struct task_struct *p)
>>  	INIT_LIST_HEAD(&p->rcu_node_entry);
>>  }
>>
>> -#else
>> +#elif !defined(CONFIG_RCURING)
>>
>>  static inline void rcu_copy_process(struct task_struct *p)
>>  {
>> diff --git a/init/Kconfig b/init/Kconfig
>> index a619a1a..48e58d9 100644
>> --- a/init/Kconfig
>> +++ b/init/Kconfig
>> @@ -374,6 +374,9 @@ config TINY_PREEMPT_RCU
>>  	  for real-time UP systems.  This option greatly reduces the
>>  	  memory footprint of RCU.
>>
>> +config RCURING
>> +	bool "Multi-GP ring based RCU"
>> +
>>  endchoice
>>
>>  config PREEMPT_RCU
>> @@ -450,6 +453,35 @@ config TREE_RCU_TRACE
>>  	  TREE_PREEMPT_RCU implementations, permitting Makefile to
>>  	  trivially select kernel/rcutree_trace.c.
>>
>> +config RCURING_BH
>> +	bool "RCU for bh"
>> +	default y
>> +	depends on RCURING && !PREEMPT_RT
>> +	help
>> +	  If Y, use a independent rcu domain for rcu_bh.
>> +	  If N, rcu_bh will map to rcu_sched(except rcu_read_lock_bh,
>> +	  rcu_read_unlock_bh).
> 
> If I understand correctly, "N" defeats the purpose of bh, which is to
> have faster grace periods -- not having to wait for a context switch.

As I recall, rcu_bh exists to defend against DoS attacks, so somebody who
does not care about that kind of DoS does not need a separate RCU domain
for BH.

> 
>> +
>> +config RCURING_BH_PREEMPT
>> +	bool
>> +	depends on PREEMPT_RT
>> +	help
>> +	  rcu_bh will map to rcu_preempt.
> 
> If I understand what you are doing, this config option will break
> networking.

In PREEMPT_RT, BH will be preemptible, but RCURING_BH_PREEMPT is not
actually used here and will be removed.

> 
>> +
>> +config RCURING_PREEMPT
>> +	bool "RCURING based reemptible RCU"
>> +	default n
>> +	depends on RCURING && PREEMPT
>> +	help
>> +	  If Y, normal rcu functionality will map to rcu_preempt.
>> +	  If N, normal rcu functionality will map to rcu_sched.
>> +
>> +config RCURING_COUNTERS_SHIFT
>> +	int "RCURING's counter shift"
>> +	range 1 10
>> +	depends on RCURING
>> +	default 4
>> +
>>  endmenu # "RCU Subsystem"
>>
>>  config IKCONFIG
>> diff --git a/kernel/Makefile b/kernel/Makefile
>> index 92b7420..66eab8c 100644
>> --- a/kernel/Makefile
>> +++ b/kernel/Makefile
>> @@ -86,6 +86,7 @@ obj-$(CONFIG_TREE_PREEMPT_RCU) += rcutree.o
>>  obj-$(CONFIG_TREE_RCU_TRACE) += rcutree_trace.o
>>  obj-$(CONFIG_TINY_RCU) += rcutiny.o
>>  obj-$(CONFIG_TINY_PREEMPT_RCU) += rcutiny.o
>> +obj-$(CONFIG_RCURING) += rcuring.o
>>  obj-$(CONFIG_RELAY) += relay.o
>>  obj-$(CONFIG_SYSCTL) += utsname_sysctl.o
>>  obj-$(CONFIG_TASK_DELAY_ACCT) += delayacct.o
>> diff --git a/kernel/kernel-offsets.c b/kernel/kernel-offsets.c
>> new file mode 100644
>> index 0000000..0d30817
>> --- /dev/null
>> +++ b/kernel/kernel-offsets.c
>> @@ -0,0 +1,20 @@
>> +/*
>> + * Generate definitions needed by assembly language modules.
>> + *
>> + * Copyright (C) 2008-2010 Lai Jiangshan
>> + */
>> +
>> +#define __GENERATING_KERNEL_OFFSETS__
>> +#include <linux/rcupdate.h>
>> +#include <linux/sched.h>
>> +#include <linux/kbuild.h>
>> +
>> +void foo(void);
>> +
>> +void foo(void)
>> +{
>> +#ifdef CONFIG_RCURING_PREEMPT
>> +	OFFSET(TASK_RCU_PREEMPT, task_struct, rcu_task_preempt);
>> +#endif
>> +}
>> +
>> diff --git a/kernel/rcuring.c b/kernel/rcuring.c
>> new file mode 100644
>> index 0000000..e7cedb5
>> --- /dev/null
>> +++ b/kernel/rcuring.c
>> @@ -0,0 +1,1002 @@
>> +/*
>> + * Read-Copy Update mechanism for mutual exclusion
>> + *
>> + * This program is free software; you can redistribute it and/or modify
>> + * it under the terms of the GNU General Public License as published by
>> + * the Free Software Foundation; either version 2 of the License, or
>> + * (at your option) any later version.
>> + *
>> + * This program is distributed in the hope that it will be useful,
>> + * but WITHOUT ANY WARRANTY; without even the implied warranty of
>> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
>> + * GNU General Public License for more details.
>> + *
>> + * You should have received a copy of the GNU General Public License
>> + * along with this program; if not, write to the Free Software
>> + * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
>> + *
>> + * Copyright IBM Corporation, 2008
>> + * Copyright Fujitsu 2008-2010 Lai Jiangshan
>> + *
>> + * Authors: Dipankar Sarma <dipankar@...ibm.com>
>> + *	    Manfred Spraul <manfred@...orfullife.com>
>> + *	    Paul E. McKenney <paulmck@...ux.vnet.ibm.com> Hierarchical version
>> + *	    Lai Jiangshan <laijs@...fujitsu.com> RCURING version
>> + */
>> +
>> +#include <linux/rcupdate.h>
>> +#include <linux/percpu.h>
>> +#include <asm/atomic.h>
>> +#include <linux/spinlock.h>
>> +#include <linux/module.h>
>> +#include <linux/interrupt.h>
>> +#include <linux/cpu.h>
>> +
>> +#define RCURING_IDX_SHIFT	CONFIG_RCURING_COUNTERS_SHIFT
>> +#define RCURING_IDX_MAX		(1L << RCURING_IDX_SHIFT)
>> +#define RCURING_IDX_MASK	(RCURING_IDX_MAX - 1)
>> +#define RCURING_IDX(complete)	((complete) & RCURING_IDX_MASK)
>> +
>> +struct rcuring {
>> +	atomic_t	ctr[RCURING_IDX_MAX];
>> +	long		curr_complete;
>> +	long		done_complete;
>> +
>> +	raw_spinlock_t	lock;
>> +};
>> +
>> +/* It is long history that we use -300 as an initial complete */
>> +#define RCURING_INIT_COMPLETE	-300L
>> +#define RCURING_INIT(name)						\
>> +{									\
>> +	{[RCURING_IDX(RCURING_INIT_COMPLETE)] = ATOMIC_INIT(1),},	\
>> +	RCURING_INIT_COMPLETE,						\
>> +	RCURING_INIT_COMPLETE - 1,					\
>> +	__RAW_SPIN_LOCK_UNLOCKED(name.lock),				\
> 
> Please use the gcc-style initializers here.

I will use gcc-style initializers in the next version.
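
For reference, the designated-initializer form would look roughly like this
(field names taken from struct rcuring above; just a sketch of the planned
change, not tested):

	#define RCURING_INIT(name)					\
	{								\
		.ctr	= { [RCURING_IDX(RCURING_INIT_COMPLETE)] =	\
				ATOMIC_INIT(1) },			\
		.curr_complete	= RCURING_INIT_COMPLETE,		\
		.done_complete	= RCURING_INIT_COMPLETE - 1,		\
		.lock	= __RAW_SPIN_LOCK_UNLOCKED(name.lock),		\
	}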

> 
>> +}
>> +
>> +static inline
>> +int rcuring_ctr_read(struct rcuring *rr, unsigned long complete)
>> +{
>> +	int res;
>> +
>> +	/* atomic_read() does not ensure to imply barrier(). */
>> +	barrier();
>> +	res = atomic_read(rr->ctr + RCURING_IDX(complete));
>> +	barrier();
>> +
>> +	return res;
>> +}
>> +
>> +void __rcuring_advance_done_complete(struct rcuring *rr)
>> +{
>> +	long wait_complete = rr->done_complete + 1;
>> +
>> +	if (!rcuring_ctr_read(rr, wait_complete)) {
>> +		do {
>> +			wait_complete++;
>> +		} while (!rcuring_ctr_read(rr, wait_complete));
>> +
>> +		ACCESS_ONCE(rr->done_complete) = wait_complete - 1;
>> +	}
>> +}
> 
> For a given grace period, all quiescent states hit the same counter.
> Or am I missing something?

No; see the top of this email.

> 
>> +
>> +static inline
>> +int rcuring_could_advance_done_complete(struct rcuring *rr)
>> +{
>> +	return !rcuring_ctr_read(rr, rr->done_complete + 1);
>> +}
>> +
>> +static inline
>> +void rcuring_advance_done_complete(struct rcuring *rr)
>> +{
>> +	if (rcuring_could_advance_done_complete(rr)) {
>> +		if (__raw_spin_trylock(&rr->lock)) {
>> +			__rcuring_advance_done_complete(rr);
>> +			__raw_spin_unlock(&rr->lock);
>> +		}
>> +	}
>> +}
>> +
>> +void __rcuring_advance_curr_complete(struct rcuring *rr, long next_complete)
>> +{
>> +	long curr_complete = rr->curr_complete;
>> +
>> +	if (curr_complete + 1 == next_complete) {
>> +		if (!rcuring_ctr_read(rr, next_complete)) {
>> +			ACCESS_ONCE(rr->curr_complete) = next_complete;
>> +			smp_mb__before_atomic_inc();
>> +			atomic_inc(rr->ctr + RCURING_IDX(next_complete));
>> +			smp_mb__after_atomic_inc();
>> +			atomic_dec(rr->ctr + RCURING_IDX(curr_complete));
>> +		}
>> +	}
>> +}
>> +
>> +static inline
>> +int rcuring_could_advance_complete(struct rcuring *rr, long next_complete)
>> +{
>> +	if (rcuring_could_advance_done_complete(rr))
>> +		return 1;
>> +
>> +	if (rr->curr_complete + 1 == next_complete) {
>> +		if (!rcuring_ctr_read(rr, next_complete))
>> +			return 1;
>> +	}
>> +
>> +	return 0;
>> +}
>> +
>> +static inline
>> +void rcuring_advance_complete(struct rcuring *rr, long next_complete)
>> +{
>> +	if (rcuring_could_advance_complete(rr, next_complete)) {
>> +		if (__raw_spin_trylock(&rr->lock)) {
>> +			__rcuring_advance_done_complete(rr);
>> +			__rcuring_advance_curr_complete(rr, next_complete);
>> +			__raw_spin_unlock(&rr->lock);
>> +		}
>> +	}
>> +}
>> +
>> +static inline
>> +void rcuring_advance_complete_force(struct rcuring *rr, long next_complete)
>> +{
>> +	if (rcuring_could_advance_complete(rr, next_complete)) {
>> +		__raw_spin_lock(&rr->lock);
>> +		__rcuring_advance_done_complete(rr);
>> +		__rcuring_advance_curr_complete(rr, next_complete);
>> +		__raw_spin_unlock(&rr->lock);
>> +	}
>> +}
>> +
>> +static inline long __locked_complete_adjust(int locked_idx, long complete)
>> +{
>> +	long locked_complete;
>> +
>> +	locked_complete = (complete & ~RCURING_IDX_MASK) | (long)locked_idx;
>> +	if (locked_complete - complete <= 0)
>> +		return locked_complete;
>> +	else
>> +		return locked_complete - RCURING_IDX_MAX;
>> +}
>> +
>> +static long rcuring_lock(struct rcuring *rr)
>> +{
>> +	long locked_complete;
>> +	int idx;
>> +
>> +	for (;;) {
>> +		idx = RCURING_IDX(ACCESS_ONCE(rr->curr_complete));
>> +		if (likely(atomic_inc_not_zero(rr->ctr + idx)))
>> +			break;
>> +	}
>> +	smp_mb__after_atomic_inc();
>> +
>> +	locked_complete = ACCESS_ONCE(rr->curr_complete);
>> +	if (likely(RCURING_IDX(locked_complete) == idx))
>> +		return locked_complete;
>> +	else
>> +		return __locked_complete_adjust(idx, locked_complete);
>> +}
>> +
>> +static void rcuring_unlock(struct rcuring *rr, long locked_complete)
>> +{
>> +	smp_mb__before_atomic_dec();
>> +
>> +	atomic_dec(rr->ctr + RCURING_IDX(locked_complete));
>> +}
>> +
>> +static inline
>> +void rcuring_dup_lock(struct rcuring *rr, long locked_complete)
>> +{
>> +	atomic_inc(rr->ctr + RCURING_IDX(locked_complete));
>> +}
>> +
>> +static inline
>> +long rcuring_advance_lock(struct rcuring *rr, long locked_complete)
>> +{
>> +	long new_locked_complete = locked_complete;
>> +
>> +	if (locked_complete != rr->curr_complete) {
>> +		/*
>> +		 * Lock the new complete at first, and then release
>> +		 * the old one. It prevents errors when NMI/SMI occurs
>> +		 * after rcuring_unlock() - we still lock it.
>> +		 */
>> +		new_locked_complete = rcuring_lock(rr);
>> +		rcuring_unlock(rr, locked_complete);
>> +	}
>> +
>> +	return new_locked_complete;
>> +}
>> +
>> +static inline long rcuring_get_done_complete(struct rcuring *rr)
>> +{
>> +	return ACCESS_ONCE(rr->done_complete);
>> +}
>> +
>> +static inline long rcuring_get_curr_complete(struct rcuring *rr)
>> +{
>> +	return ACCESS_ONCE(rr->curr_complete);
>> +}
>> +
>> +struct rcu_batch {
>> +	struct rcu_head *list, **tail;
>> +};
>> +
>> +static void rcu_batch_merge(struct rcu_batch *to, struct rcu_batch *from)
>> +{
>> +	if (from->list != NULL) {
>> +		*to->tail = from->list;
>> +		to->tail = from->tail;
>> +
>> +		from->list = NULL;
>> +		from->tail = &from->list;
>> +	}
>> +}
>> +
>> +static void rcu_batch_add(struct rcu_batch *batch, struct rcu_head *new)
>> +{
>> +	*batch->tail = new;
>> +	batch->tail = &new->next;
>> +}
>> +
>> +struct rcu_data {
>> +	long locked_complete;
>> +
>> +	/*
>> +	 * callbacks are in batches (done_complete, curr_complete]
>> +	 * curr_complete - done_complete >= 0
>> +	 * curr_complete - done_complete <= RCURING_IDX_MAX
>> +	 * domain's curr_complete - curr_complete >= 0
>> +	 * domain's done_complete - done_complete >= 0
>> +	 *
>> +	 * curr_complete == done_complete just means @batch is empty.
>> +	 * we have at least one callback in batch[RCURING_IDX(done_complete)]
>> +	 * if curr_complete != done_complete
>> +	 */
>> +	long curr_complete;
>> +	long done_complete;
>> +
>> +	struct rcu_batch batch[RCURING_IDX_MAX];
>> +	struct rcu_batch done_batch;
>> +
>> +	unsigned int qlen;
>> +	unsigned long jiffies;
>> +	unsigned long stall_jiffies;
>> +};
>> +
>> +struct rcu_domain {
>> +	struct rcuring rcuring;
>> +	const char *domain_name;
>> +	struct rcu_data __percpu *rcu_data;
>> +
>> +	spinlock_t lock; /* this lock is for fqs or other misc things */
>> +	long fqs_complete;
>> +	void (*force_quiescent_state)(struct rcu_domain *domain,
>> +			long fqs_complete);
>> +};
>> +
>> +#define EXPEDITED_GP_RESERVED_RECOMMEND (RCURING_IDX_SHIFT - 1)
>> +
>> +static inline long gp_reserved(struct rcu_domain *domain)
>> +{
>> +	return EXPEDITED_GP_RESERVED_RECOMMEND;
>> +}
>> +
>> +static inline void __init rcu_data_init(struct rcu_data *rdp)
>> +{
>> +	int idx;
>> +
>> +	for (idx = 0; idx < RCURING_IDX_MAX; idx++)
>> +		rdp->batch[idx].tail = &rdp->batch[idx].list;
>> +
>> +	rdp->done_batch.tail = &rdp->done_batch.list;
>> +}
>> +
>> +static void do_advance_callbacks(struct rcu_data *rdp, long done_complete)
>> +{
>> +	int idx;
>> +
>> +	while (rdp->done_complete != done_complete) {
>> +		rdp->done_complete++;
>> +		idx = RCURING_IDX(rdp->done_complete);
>> +		rcu_batch_merge(&rdp->done_batch, rdp->batch + idx);
>> +	}
>> +}
>> +
>> +/* Is value in the range (@left_open, @right_close] */
>> +static inline bool in_range(long left_open, long right_close, long value)
>> +{
>> +	return left_open - value < 0 && value - right_close <= 0;
>> +}
>> +
>> +static void advance_callbacks(struct rcu_data *rdp, long done_complete,
>> +		long curr_complete)
>> +{
>> +	if (in_range(done_complete, curr_complete, rdp->curr_complete)) {
>> +		do_advance_callbacks(rdp, done_complete);
>> +	} else {
>> +		do_advance_callbacks(rdp, rdp->curr_complete);
>> +		rdp->curr_complete = done_complete;
>> +		rdp->done_complete = done_complete;
>> +	}
>> +}
>> +
>> +static void __force_quiescent_state(struct rcu_domain *domain,
>> +		long fqs_complete)
>> +{
>> +	int cpu;
>> +	long fqs_complete_old;
>> +	long curr_complete = rcuring_get_curr_complete(&domain->rcuring);
>> +	long done_complete = rcuring_get_done_complete(&domain->rcuring);
>> +
>> +	spin_lock(&domain->lock);
>> +
>> +	fqs_complete_old = domain->fqs_complete;
>> +	if (!in_range(done_complete, curr_complete, fqs_complete_old))
>> +		fqs_complete_old = done_complete;
>> +
>> +	if (fqs_complete_old - fqs_complete >= 0) {
>> +		spin_unlock(&domain->lock);
>> +		return;
>> +	}
>> +
>> +	domain->fqs_complete = fqs_complete;
>> +	spin_unlock(&domain->lock);
>> +
>> +	for_each_online_cpu(cpu) {
>> +		struct rcu_data *rdp = per_cpu_ptr(domain->rcu_data, cpu);
>> +		long locked_complete = ACCESS_ONCE(rdp->locked_complete);
>> +
>> +		if (!in_range(fqs_complete_old, fqs_complete, locked_complete))
>> +			continue;
> 
> Is preemption disabled here?

Yes, it is called with local irqs disabled.

> 
>> +
>> +		if (cpu == smp_processor_id())
>> +			set_tsk_need_resched(current);
>> +		else
>> +			smp_send_reschedule(cpu);
>> +	}
>> +}
>> +
>> +static void force_quiescent_state(struct rcu_domain *domain, long fqs_complete)
>> +{
>> +	domain->force_quiescent_state(domain, fqs_complete);
>> +}
>> +
>> +static
>> +long prepare_for_new_callback(struct rcu_domain *domain, struct rcu_data *rdp)
>> +{
>> +	long curr_complete, done_complete;
>> +	struct rcuring *rr = &domain->rcuring;
>> +
>> +	smp_mb();
>> +	curr_complete = rcuring_get_curr_complete(rr);
>> +	done_complete = rcuring_get_done_complete(rr);
>> +
>> +	advance_callbacks(rdp, done_complete, curr_complete);
>> +	rdp->curr_complete = curr_complete;
>> +
>> +	if (!rdp->qlen) {
>> +		rdp->jiffies = jiffies;
>> +		rdp->stall_jiffies = jiffies;
>> +	}
>> +
>> +	return curr_complete;
>> +}
>> +
>> +static long __call_rcu(struct rcu_domain *domain, struct rcu_head *head,
>> +		void (*func)(struct rcu_head *))
>> +{
>> +	struct rcu_data *rdp;
>> +	long curr_complete;
>> +
>> +	head->next = NULL;
>> +	head->func = func;
>> +
>> +	rdp = this_cpu_ptr(domain->rcu_data);
>> +	curr_complete = prepare_for_new_callback(domain, rdp);
>> +	rcu_batch_add(rdp->batch + RCURING_IDX(curr_complete), head);
>> +	rdp->qlen++;
>> +
>> +	return curr_complete;
>> +}
>> +
>> +static void print_rcuring_stall(struct rcu_domain *domain)
>> +{
>> +	struct rcuring *rr = &domain->rcuring;
>> +	unsigned long curr_complete = rcuring_get_curr_complete(rr);
>> +	unsigned long done_complete = rcuring_get_done_complete(rr);
>> +	int cpu;
>> +
>> +	printk(KERN_ERR "RCU is stall, cpu=%d, domain_name=%s\n", smp_processor_id(),
>> +			domain->domain_name);
>> +
>> +	printk(KERN_ERR "domain's complete(done/curr)=%ld/%ld\n",
>> +			curr_complete, done_complete);
>> +	printk(KERN_ERR "curr_ctr=%d, wait_ctr=%d\n",
>> +			rcuring_ctr_read(rr, curr_complete),
>> +			rcuring_ctr_read(rr, done_complete + 1));
>> +
>> +	for_each_online_cpu(cpu) {
>> +		struct rcu_data *rdp = per_cpu_ptr(domain->rcu_data, cpu);
>> +		printk(KERN_ERR "cpu=%d, qlen=%d, complete(locked/dome/curr/)="
>> +				"%ld/%ld/%ld\n", cpu, rdp->qlen,
>> +				rdp->locked_complete, rdp->done_complete,
>> +				rdp->curr_complete);
>> +	}
>> +}
>> +
>> +static void __rcu_check_callbacks(struct rcu_domain *domain,
>> +		struct rcu_data *rdp, int in_rcu_softirq)
>> +{
>> +	long curr_complete, done_complete;
>> +	struct rcuring *rr = &domain->rcuring;
>> +
>> +	if (!rdp->qlen)
>> +		return;
>> +
>> +	rcuring_advance_done_complete(rr);
>> +
>> +	curr_complete = rcuring_get_curr_complete(rr);
>> +	done_complete = ACCESS_ONCE(rr->done_complete);
>> +	advance_callbacks(rdp, done_complete, curr_complete);
>> +
>> +	if (rdp->curr_complete == curr_complete
>> +			&& rdp->batch[RCURING_IDX(rdp->curr_complete)].list) {
>> +		long max_gp_allow = RCURING_IDX_MAX - gp_reserved(domain);
>> +
>> +		if (curr_complete - done_complete <= max_gp_allow)
>> +			rcuring_advance_complete(rr, curr_complete + 1);
>> +	}
>> +
>> +	if (rdp->done_batch.list) {
>> +		if (!in_rcu_softirq)
>> +			raise_softirq(RCU_SOFTIRQ);
>> +	} else {
>> +		if (jiffies - rdp->jiffies > 10) {
>> +			force_quiescent_state(domain, rdp->curr_complete);
>> +			rdp->jiffies = jiffies;
>> +		}
>> +		if (jiffies - rdp->stall_jiffies > 2 * HZ) {
>> +			print_rcuring_stall(domain);
>> +			rdp->stall_jiffies = jiffies;
>> +		}
>> +	}
>> +}
>> +
>> +static void rcu_do_batch(struct rcu_domain *domain, struct rcu_data *rdp)
>> +{
>> +	int count = 0;
>> +	struct rcu_head *list, *next;
>> +
>> +	list = rdp->done_batch.list;
>> +
>> +	if (list) {
>> +		rdp->done_batch.list = NULL;
>> +		rdp->done_batch.tail = &rdp->done_batch.list;
>> +
>> +		local_irq_enable();
>> +
>> +		smp_mb();
>> +
>> +		while (list) {
>> +			next = list->next;
>> +			prefetch(next);
>> +			list->func(list);
>> +			count++;
>> +			list = next;
>> +		}
>> +		local_irq_disable();
>> +
>> +		rdp->qlen -= count;
>> +		rdp->jiffies = jiffies;
>> +	}
>> +}
>> +
>> +static int __rcu_needs_cpu(struct rcu_data *rdp)
>> +{
>> +	return !!rdp->qlen;
>> +}
>> +
>> +static inline
>> +void __rcu_qsctr_inc(struct rcu_domain *domain, struct rcu_data *rdp)
>> +{
>> +	rdp->locked_complete = rcuring_advance_lock(&domain->rcuring,
>> +			rdp->locked_complete);
>> +}
>> +
>> +static inline void rcu_preempt_save_complete(struct rcu_task_preempt *rcu_task,
>> +		long locked_complete)
>> +{
>> +	unsigned int new_flags;
>> +
>> +	new_flags = RCURING_PREEMPT_FLAGS | RCURING_IDX(locked_complete);
>> +	ACCESS_ONCE(rcu_task->flags) = new_flags;
>> +}
>> +
>> +/*
>> + * Every cpu hold a lock_complete. But for preempt rcu, any task may also
>> + * hold a lock_complete. This function increases lock_complete for the current
>> + * cpu and check(dup_lock_and_save or release or advance) the lock_complete of
>> + * the current task.
>> + */
>> +static inline
>> +void __rcu_preempt_qsctr_inc(struct rcu_domain *domain, struct rcu_data *rdp,
>> +		struct rcu_task_preempt *rcu_task)
>> +{
>> +	long locked_complete = rdp->locked_complete;
>> +	unsigned int flags = ACCESS_ONCE(rcu_task->flags);
>> +
>> +	if (!(flags & RCURING_PREEMPT_SAVED)) {
>> +		BUG_ON(flags & RCURING_PREEMPT_QS);
>> +		if (rcu_task->nesting) {
>> +			/* dup_lock_and_save current task's lock_complete */
>> +			rcuring_dup_lock(&domain->rcuring, locked_complete);
>> +			rcu_preempt_save_complete(rcu_task, locked_complete);
>> +		}
>> +	} else if (!(rcu_task->nesting)) {
>> +		/* release current task's lock_complete */
>> +		rcuring_unlock(&domain->rcuring, (long)flags);
>> +		ACCESS_ONCE(rcu_task->flags) = 0;
>> +	} else if (!(flags & RCURING_PREEMPT_QS)) {
>> +		/* advance_and_save current task's lock_complete */
>> +		if (RCURING_IDX(locked_complete) != RCURING_IDX((long)flags)) {
>> +			rcuring_unlock(&domain->rcuring, (long)flags);
>> +			rcuring_dup_lock(&domain->rcuring, locked_complete);
>> +		}
>> +		rcu_preempt_save_complete(rcu_task, locked_complete);
>> +	}
>> +
>> +	/* increases lock_complete for the current cpu */
>> +	rdp->locked_complete = rcuring_advance_lock(&domain->rcuring,
>> +			locked_complete);
>> +}
> 
> When we need to boost the priority of preempted readers, how do we tell
> who they are?

Add a struct list_head to struct rcu_task_preempt and a ring of struct
list_head to the rdp, and protect all of these lists with per-cpu locks or
the scheduler's runqueue locks; a rough sketch follows.
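
(The field names below are made up for illustration; none of this is in the
posted patch.)

	struct rcu_task_preempt {
		unsigned int nesting;
		unsigned int flags;
		struct list_head blocked_entry;	/* preempted reader */
	};

	struct rcu_data {
		/* ... existing fields ... */
		/* One list per ring slot: a preempted reader is queued on
		 * blocked_readers[RCURING_IDX(its locked_complete)] under a
		 * per-cpu lock or the runqueue lock, so the boosting code
		 * can find every task blocking a given complete. */
		struct list_head blocked_readers[RCURING_IDX_MAX];
	};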

> 
>> +
>> +static void __synchronize_rcu(struct rcu_domain *domain, int expedited)
>> +{
>> +	struct rcu_synchronize rcu;
>> +	long complete;
>> +
>> +	init_completion(&rcu.completion);
>> +
>> +	local_irq_disable();
>> +	complete = __call_rcu(domain, &rcu.head, wakeme_after_rcu);
>> +
>> +	if (expedited) {
>> +		/*
>> +		 * Fore a new gp to be started immediately(can use reserved gp).
>> +		 * But if all gp are used, it will fail to start new gp,
>> +		 * and we have to wait until some new gps available.
>> +		 */
>> +		rcuring_advance_complete_force(&domain->rcuring,
>> +				complete + 1);
> 
> It looks to me like rcuring_advance_complete_force() can just fail
> silently.  How does this work?

If it fails, we have no GPs left, and synchronize_rcu_expedited() degrades
to synchronize_rcu(); the next fqs will expedite it only a little.

That is acceptable: we have 3 reserved GPs by default, which means we allow
at least 3 concurrent synchronize_rcu_expedited() calls, and that is enough.
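
(The 3 comes from EXPEDITED_GP_RESERVED_RECOMMEND = RCURING_IDX_SHIFT - 1 in
the patch: with the default CONFIG_RCURING_COUNTERS_SHIFT=4 that is 4 - 1 = 3
reserved GPs out of the 16 ring slots.)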

> 
>> +
>> +		/* Fore qs and expedite the new gp */
>> +		force_quiescent_state(domain, complete);
>> +	}
>> +	local_irq_enable();
>> +
>> +	wait_for_completion(&rcu.completion);
>> +}
>> +
>> +static inline long __rcu_batches_completed(struct rcu_domain *domain)
>> +{
>> +	return rcuring_get_done_complete(&domain->rcuring);
>> +}
>> +
>> +#ifdef CONFIG_RCURING_BH
>> +#define gen_code_for_bh(gen_code, args...) gen_code(bh, ##args)
> 
> Where is gen_code() defined?
> 
>> +#else
>> +#define gen_code_for_bh(gen_code, args...)
>> +#endif
>> +#define gen_code_for_sched(gen_code, args...) gen_code(sched, ##args)
>> +#ifdef CONFIG_RCURING_PREEMPT
>> +#define gen_code_for_preempt(gen_code, args...) gen_code(preempt, ##args)
>> +#else
>> +#define gen_code_for_preempt(gen_code, args...)
>> +#endif
>> +
>> +#define GEN_CODES(gen_code, args...)			\
>> +	gen_code_for_bh(gen_code, ##args)		\
>> +	gen_code_for_sched(gen_code, ##args)		\
>> +	gen_code_for_preempt(gen_code, ##args)		\
>> +
>> +#define gen_basic_code(domain)						\
>> +									\
>> +static void force_quiescent_state_##domain(struct rcu_domain *domain,	\
>> +		long fqs_complete);					\
>> +									\
>> +static DEFINE_PER_CPU(struct rcu_data, rcu_data_##domain);		\
>> +									\
>> +struct rcu_domain rcu_domain_##domain =					\
>> +{									\
>> +	.rcuring = RCURING_INIT(rcu_domain_##domain.rcuring),		\
>> +	.domain_name = #domain,						\
>> +	.rcu_data = &rcu_data_##domain,					\
>> +	.lock = __SPIN_LOCK_UNLOCKED(rcu_domain_##domain.lock),		\
>> +	.force_quiescent_state = force_quiescent_state_##domain,	\
>> +};									\
>> +									\
>> +void call_rcu_##domain(struct rcu_head *head,				\
>> +		void (*func)(struct rcu_head *))			\
>> +{									\
>> +	unsigned long flags;						\
>> +									\
>> +	local_irq_save(flags);						\
>> +	__call_rcu(&rcu_domain_##domain, head, func);			\
>> +	local_irq_restore(flags);					\
>> +}									\
>> +EXPORT_SYMBOL_GPL(call_rcu_##domain);					\
>> +									\
>> +void synchronize_rcu_##domain(void)					\
>> +{									\
>> +	__synchronize_rcu(&rcu_domain_##domain, 0);			\
>> +}									\
>> +EXPORT_SYMBOL_GPL(synchronize_rcu_##domain);				\
>> +									\
>> +void synchronize_rcu_##domain##_expedited(void)				\
>> +{									\
>> +	__synchronize_rcu(&rcu_domain_##domain, 1);			\
>> +}									\
>> +EXPORT_SYMBOL_GPL(synchronize_rcu_##domain##_expedited);		\
>> +									\
>> +long rcu_batches_completed_##domain(void)				\
>> +{									\
>> +	return __rcu_batches_completed(&rcu_domain_##domain);		\
>> +}									\
>> +EXPORT_SYMBOL_GPL(rcu_batches_completed_##domain);			\
>> +									\
>> +void rcu_##domain##_force_quiescent_state(void)				\
>> +{									\
>> +	force_quiescent_state(&rcu_domain_##domain, 0);			\
>> +}									\
>> +EXPORT_SYMBOL_GPL(rcu_##domain##_force_quiescent_state);
> 
> I used to use a single macro for the synchronize_rcu*() APIs, but was
> talked into separate functions.  Maybe some of the more ornate recent
> macros have shifted people away from duplicate code in functions, so
> might be time to try again.  ;-)

There would be too much duplicated code otherwise, so I had to do this :-)

> 
>> +GEN_CODES(gen_basic_code)
>> +
>> +static DEFINE_PER_CPU(int, kernel_count);
>> +
>> +#define LOCK_COMPLETE(domain)						\
>> +	long complete_##domain = rcuring_lock(&rcu_domain_##domain.rcuring);
>> +#define SAVE_COMPLETE(domain)						\
>> +	per_cpu(rcu_data_##domain, cpu).locked_complete = complete_##domain;
>> +#define LOAD_COMPLETE(domain)						\
>> +	int complete_##domain = per_cpu(rcu_data_##domain, cpu).locked_complete;
>> +#define RELEASE_COMPLETE(domain)					\
>> +	rcuring_unlock(&rcu_domain_##domain.rcuring, complete_##domain);
>> +
>> +static void __rcu_kernel_enter_outmost(int cpu)
>> +{
>> +	GEN_CODES(LOCK_COMPLETE)
>> +
>> +	barrier();
>> +	per_cpu(kernel_count, cpu) = 1;
>> +	barrier();
>> +
>> +	GEN_CODES(SAVE_COMPLETE)
>> +}
>> +
>> +static void __rcu_kernel_exit_outmost(int cpu)
>> +{
>> +	GEN_CODES(LOAD_COMPLETE)
>> +
>> +	barrier();
>> +	per_cpu(kernel_count, cpu) = 0;
>> +	barrier();
>> +
>> +	GEN_CODES(RELEASE_COMPLETE)
>> +}
>> +
>> +#ifdef CONFIG_RCURING_BH
>> +static void force_quiescent_state_bh(struct rcu_domain *domain,
>> +		long fqs_complete)
>> +{
>> +	__force_quiescent_state(domain, fqs_complete);
>> +}
>> +
>> +static inline void rcu_bh_qsctr_inc_irqoff(int cpu)
>> +{
>> +	__rcu_qsctr_inc(&rcu_domain_bh, &per_cpu(rcu_data_bh, cpu));
>> +}
>> +
>> +void rcu_bh_qs(int cpu)
>> +{
>> +	unsigned long flags;
>> +
>> +	local_irq_save(flags);
>> +	rcu_bh_qsctr_inc_irqoff(cpu);
>> +	local_irq_restore(flags);
>> +}
>> +
>> +#else /* CONFIG_RCURING_BH */
>> +static inline void rcu_bh_qsctr_inc_irqoff(int cpu) { (void)(cpu); }
>> +#endif /* CONFIG_RCURING_BH */
>> +
>> +static void force_quiescent_state_sched(struct rcu_domain *domain,
>> +		long fqs_complete)
>> +{
>> +	__force_quiescent_state(domain, fqs_complete);
>> +}
>> +
>> +static inline void rcu_sched_qsctr_inc_irqoff(int cpu)
>> +{
>> +	__rcu_qsctr_inc(&rcu_domain_sched, &per_cpu(rcu_data_sched, cpu));
>> +}
>> +
>> +#ifdef CONFIG_RCURING_PREEMPT
>> +static void force_quiescent_state_preempt(struct rcu_domain *domain,
>> +		long fqs_complete)
>> +{
>> +	/* To Be Implemented */
>> +}
>> +
>> +static inline void rcu_preempt_qsctr_inc_irqoff(int cpu)
>> +{
>> +	__rcu_preempt_qsctr_inc(&rcu_domain_preempt,
>> +			&per_cpu(rcu_data_preempt, cpu),
>> +			&current->rcu_task_preempt);
>> +}
>> +
>> +#else /* CONFIG_RCURING_PREEMPT */
>> +static inline void rcu_preempt_qsctr_inc_irqoff(int cpu)
>> +{
>> +	(void)(cpu);
>> +	(void)(__rcu_preempt_qsctr_inc);
>> +}
>> +#endif /* CONFIG_RCURING_PREEMPT */
>> +
>> +#define gen_needs_cpu(domain, cpu)					\
>> +	if (__rcu_needs_cpu(&per_cpu(rcu_data_##domain, cpu)))		\
>> +		return 1;
>> +int rcu_needs_cpu(int cpu)
>> +{
>> +	GEN_CODES(gen_needs_cpu, cpu)
>> +	return 0;
>> +}
>> +
>> +#define call_func(domain, func, args...)				\
>> +	func(&rcu_domain_##domain, &__get_cpu_var(rcu_data_##domain), ##args);
>> +
>> +/*
>> + * Note a context switch. This is a quiescent state for RCU-sched,
>> + * and requires special handling for preemptible RCU.
>> + */
>> +void rcu_note_context_switch(int cpu)
>> +{
>> +	unsigned long flags;
>> +
>> +#ifdef CONFIG_HOTPLUG_CPU
>> +	/*
>> +	 * The stoper thread need to schedule() to the idle thread
>> +	 * after CPU_DYING.
>> +	 */
>> +	if (unlikely(!per_cpu(kernel_count, cpu))) {
>> +		BUG_ON(cpu_online(cpu));
>> +		return;
>> +	}
>> +#endif
>> +
>> +	local_irq_save(flags);
>> +	rcu_bh_qsctr_inc_irqoff(cpu);
>> +	rcu_sched_qsctr_inc_irqoff(cpu);
>> +	rcu_preempt_qsctr_inc_irqoff(cpu);
>> +	local_irq_restore(flags);
>> +}
>> +
>> +static int rcu_cpu_idle(int cpu)
>> +{
>> +	return idle_cpu(cpu) && rcu_scheduler_active &&
>> +		!in_softirq() && hardirq_count() <= (1 << HARDIRQ_SHIFT);
>> +}
>> +
>> +void rcu_check_callbacks(int cpu, int user)
>> +{
>> +	unsigned long flags;
>> +
>> +	local_irq_save(flags);
>> +
>> +	if (user || rcu_cpu_idle(cpu)) {
>> +		rcu_bh_qsctr_inc_irqoff(cpu);
>> +		rcu_sched_qsctr_inc_irqoff(cpu);
>> +		rcu_preempt_qsctr_inc_irqoff(cpu);
>> +	} else if (!in_softirq())
>> +		rcu_bh_qsctr_inc_irqoff(cpu);
>> +
>> +	GEN_CODES(call_func, __rcu_check_callbacks, 0)
>> +	local_irq_restore(flags);
>> +}
>> +
>> +static void rcu_process_callbacks(struct softirq_action *unused)
>> +{
>> +	local_irq_disable();
>> +	GEN_CODES(call_func, __rcu_check_callbacks, 1)
>> +	GEN_CODES(call_func, rcu_do_batch)
>> +	local_irq_enable();
>> +}
>> +
>> +static void __cpuinit __rcu_offline_cpu(struct rcu_domain *domain,
>> +		struct rcu_data *off_rdp, struct rcu_data *rdp)
>> +{
>> +	if (off_rdp->qlen > 0) {
>> +		unsigned long flags;
>> +		long curr_complete;
>> +
>> +		/* move all callbacks in @off_rdp to @off_rdp->done_batch */
>> +		do_advance_callbacks(off_rdp, off_rdp->curr_complete);
>> +
>> +		/* move all callbacks in @off_rdp to this cpu(@rdp) */
>> +		local_irq_save(flags);
>> +		curr_complete = prepare_for_new_callback(domain, rdp);
>> +		rcu_batch_merge(rdp->batch + RCURING_IDX(curr_complete),
>> +				&off_rdp->done_batch);
>> +		rdp->qlen += off_rdp->qlen;
>> +		local_irq_restore(flags);
>> +
>> +		off_rdp->qlen = 0;
>> +	}
>> +}
>> +
>> +#define gen_offline(domain, cpu, recieve_cpu)				\
>> +	__rcu_offline_cpu(&rcu_domain_##domain,				\
>> +			&per_cpu(rcu_data_##domain, cpu),		\
>> +			&per_cpu(rcu_data_##domain, recieve_cpu));
>> +
>> +static int __cpuinit rcu_cpu_notify(struct notifier_block *self,
>> +				unsigned long action, void *hcpu)
>> +{
>> +	int cpu = (long)hcpu;
>> +	int recieve_cpu;
>> +
>> +	(void)(recieve_cpu);
>> +	(void)(__rcu_offline_cpu);
>> +	(void)(__rcu_kernel_exit_outmost);
>> +
>> +	switch (action) {
>> +#ifdef CONFIG_HOTPLUG_CPU
>> +	case CPU_DYING:
>> +	case CPU_DYING_FROZEN:
>> +		/*
>> +		 * the machine is stopped now, we can access to
>> +		 * any other cpu data.
>> +		 * */
>> +		recieve_cpu = cpumask_any_but(cpu_online_mask, cpu);
>> +		GEN_CODES(gen_offline, cpu, recieve_cpu)
>> +		__rcu_kernel_exit_outmost(cpu);
>> +		break;
>> +#endif
>> +	case CPU_STARTING:
>> +	case CPU_STARTING_FROZEN:
>> +		__rcu_kernel_enter_outmost(cpu);
>> +	default:
>> +		break;
>> +	}
>> +	return NOTIFY_OK;
>> +}
>> +
>> +#define rcu_init_domain_data(domain, cpu)			\
>> +	for_each_possible_cpu(cpu)				\
>> +		rcu_data_init(&per_cpu(rcu_data_##domain, cpu));
>> +
>> +void __init rcu_init(void)
>> +{
>> +	int cpu;
>> +
>> +	GEN_CODES(rcu_init_domain_data, cpu)
> 
> This one actually consumes more lines than would writing out the
> calls explicitly.  ;-)

Like the following? That is more lines and more "#ifdef"s; GEN_CODES
reduces both the duplicated code and the "#ifdef"s.

#ifdef CONFIG_RCURING_BH
	for_each_possible_cpu(cpu)
		rcu_data_init(&per_cpu(rcu_data_bh, cpu));
#endif
	for_each_possible_cpu(cpu)
		rcu_data_init(&per_cpu(rcu_data_sched, cpu));
#ifdef CONFIG_RCURING_PREEMPT
	for_each_possible_cpu(cpu)
		rcu_data_init(&per_cpu(rcu_data_preempt, cpu));
#endif

> 
>> +
>> +	__rcu_kernel_enter_outmost(raw_smp_processor_id());
>> +	open_softirq(RCU_SOFTIRQ, rcu_process_callbacks);
>> +	cpu_notifier(rcu_cpu_notify, 0);
>> +}
>> +
>> +int rcu_scheduler_active __read_mostly;
>> +EXPORT_SYMBOL_GPL(rcu_scheduler_active);
>> +
>> +/*
>> + * This function is invoked towards the end of the scheduler's initialization
>> + * process. Before this is called, the idle task might contain
>> + * RCU read-side critical sections (during which time, this idle
>> + * task is booting the system). After this function is called, the
>> + * idle tasks are prohibited from containing RCU read-side critical
>> + * sections. This function also enables RCU lockdep checking.
>> + */
>> +void rcu_scheduler_starting(void)
>> +{
>> +	rcu_scheduler_active = 1;
>> +}
>> +
>> +#ifdef CONFIG_NO_HZ
>> +
>> +void rcu_kernel_enter(void)
>> +{
>> +	unsigned long flags;
>> +
>> +	raw_local_irq_save(flags);
>> +	if (__get_cpu_var(kernel_count) == 0)
>> +		__rcu_kernel_enter_outmost(raw_smp_processor_id());
>> +	else
>> +		__get_cpu_var(kernel_count)++;
>> +	raw_local_irq_restore(flags);
>> +}
>> +
>> +void rcu_kernel_exit(void)
>> +{
>> +	unsigned long flags;
>> +
>> +	raw_local_irq_save(flags);
>> +	if (__get_cpu_var(kernel_count) == 1)
>> +		__rcu_kernel_exit_outmost(raw_smp_processor_id());
>> +	else
>> +		__get_cpu_var(kernel_count)--;
>> +	raw_local_irq_restore(flags);
>> +}
>> +
>> +void rcu_enter_nohz(void)
>> +{
>> +	unsigned long flags;
>> +
>> +	raw_local_irq_save(flags);
>> +	__rcu_kernel_exit_outmost(raw_smp_processor_id());
>> +	raw_local_irq_restore(flags);
>> +}
>> +
>> +void rcu_exit_nohz(void)
>> +{
>> +	unsigned long flags;
>> +
>> +	raw_local_irq_save(flags);
>> +	__rcu_kernel_enter_outmost(raw_smp_processor_id());
>> +	raw_local_irq_restore(flags);
>> +}
>> +#endif /* CONFIG_NO_HZ */
>> +
>> +static DEFINE_PER_CPU(struct rcu_head, rcu_barrier_head);
>> +static struct completion rcu_barrier_completion;
>> +static struct kref rcu_barrier_wait_ref;
>> +static DEFINE_MUTEX(rcu_barrier_mutex);
>> +
>> +static void rcu_barrier_release_ref(struct kref *notused)
>> +{
>> +	complete(&rcu_barrier_completion);
>> +}
>> +
>> +static void rcu_barrier_callback(struct rcu_head *notused)
>> +{
>> +	kref_put(&rcu_barrier_wait_ref, rcu_barrier_release_ref);
>> +}
>> +
>> +static void rcu_barrier_func(void *data)
>> +{
>> +	struct rcu_domain *rcu_domain = data;
>> +
>> +	kref_get(&rcu_barrier_wait_ref);
>> +	__call_rcu(rcu_domain, &__get_cpu_var(rcu_barrier_head),
>> +			rcu_barrier_callback);
>> +}
>> +
>> +static void __rcu_barrier(struct rcu_domain *rcu_domain)
>> +{
>> +	mutex_lock(&rcu_barrier_mutex);
>> +
>> +	init_completion(&rcu_barrier_completion);
>> +	kref_init(&rcu_barrier_wait_ref);
>> +
>> +	/* queue barrier rcu_heads for every cpu */
>> +	on_each_cpu(rcu_barrier_func, rcu_domain, 1);
>> +
>> +	kref_put(&rcu_barrier_wait_ref, rcu_barrier_release_ref);
>> +	wait_for_completion(&rcu_barrier_completion);
>> +
>> +	mutex_unlock(&rcu_barrier_mutex);
>> +}
>> +
>> +#define gen_rcu_barrier(domain)						\
>> +void rcu_barrier_##domain(void)						\
>> +{									\
>> +	__rcu_barrier(&rcu_domain_##domain);				\
>> +}									\
>> +EXPORT_SYMBOL_GPL(rcu_barrier_##domain);
>> +
>> +GEN_CODES(gen_rcu_barrier)
>> +
>> diff --git a/kernel/sched.c b/kernel/sched.c
>> index 09b574e..967f25a 100644
>> --- a/kernel/sched.c
>> +++ b/kernel/sched.c
>> @@ -9113,6 +9113,7 @@ struct cgroup_subsys cpuacct_subsys = {
>>  };
>>  #endif	/* CONFIG_CGROUP_CPUACCT */
>>
>> +#ifndef CONFIG_RCURING
>>  #ifndef CONFIG_SMP
>>
>>  void synchronize_sched_expedited(void)
>> @@ -9182,3 +9183,4 @@ void synchronize_sched_expedited(void)
>>  EXPORT_SYMBOL_GPL(synchronize_sched_expedited);
>>
>>  #endif /* #else #ifndef CONFIG_SMP */
>> +#endif /* CONFIG_RCURING */
>> diff --git a/kernel/time/tick-sched.c b/kernel/time/tick-sched.c
>> index 3e216e0..0eba561 100644
>> --- a/kernel/time/tick-sched.c
>> +++ b/kernel/time/tick-sched.c
>> @@ -269,6 +269,10 @@ void tick_nohz_stop_sched_tick(int inidle)
>>  	cpu = smp_processor_id();
>>  	ts = &per_cpu(tick_cpu_sched, cpu);
>>
>> +	/* Don't enter nohz when cpu is offlining, it is going to die */
>> +	if (cpu_is_offline(cpu))
>> +		goto end;
>> +
>>  	/*
>>  	 * Call to tick_nohz_start_idle stops the last_update_time from being
>>  	 * updated. Thus, it must not be called in the event we are called from
>>
> 
> 

