lists.openwall.net   lists  /  announce  owl-users  owl-dev  john-users  john-dev  passwdqc-users  yescrypt  popa3d-users  /  oss-security  kernel-hardening  musl  sabotage  tlsify  passwords  /  crypt-dev  xvendor  /  Bugtraq  Full-Disclosure  linux-kernel  linux-netdev  linux-ext4  linux-hardening  linux-cve-announce  PHC 
Open Source and information security mailing list archives
 
Hash Suite: Windows password security audit tool. GUI, reports in PDF.
[<prev] [next>] [<thread-prev] [thread-next>] [day] [month] [year] [list]
Message-ID: <20100830235705.GQ2420@linux.vnet.ibm.com>
Date:	Mon, 30 Aug 2010 16:57:05 -0700
From:	"Paul E. McKenney" <paulmck@...ux.vnet.ibm.com>
To:	Lai Jiangshan <laijs@...fujitsu.com>
Cc:	Linus Torvalds <torvalds@...ux-foundation.org>,
	Ingo Molnar <mingo@...e.hu>, Andrew Morton <akpm@...l.org>,
	David Miller <davem@...emloft.net>,
	Manfred Spraul <manfred@...orfullife.com>,
	Mathieu Desnoyers <mathieu.desnoyers@...ymtl.ca>,
	Steven Rostedt <rostedt@...dmis.org>,
	Thomas Gleixner <tglx@...utronix.de>,
	Peter Zijlstra <a.p.zijlstra@...llo.nl>,
	LKML <linux-kernel@...r.kernel.org>, dipankar@...ibm.com
Subject: Re: [PATCH, RFC] RCU: New scalable, multi-GP and preemptible RCU
 implementation

On Mon, Aug 30, 2010 at 05:31:44PM +0800, Lai Jiangshan wrote:
> 
> 0) contents
> 1) overview
> 2) scalable
> 3) multi-GP
> 4) integrated expedited synchronize_rcu
> 5) preemptible
> 6) top-down implies
> 7) speech
> 8) core rcuring

Interesting!  I can't claim to have dug through every detail, but figured
I should ask a few questions first.

							Thanx, Paul

Of course, the first question is whether it passes rcutorture, and if
so on what hardware configuration.

At first glance, it looks to live somewhere between Manfred Spraul's
counter-based hierarchical RCU and classic RCU.  There are also some
resemblances to K42's "generations" RCU-like mechanism.

> 1) overview
> This is a new RCU implementation: RCURING. It uses a ring atomic counters
> to control the completion of GPs and a ring callbacks batches to process
> the callbacks.

The ring is a set of grace periods, correct?

> In 2008, I think out the idea, and wrote the first 300lines code of rcuring.c,
> but I'm too lazy to finish it
> 
> 2) scalable
> There is only 2 locks in every RCU domain(rcu_bh, rcu_sched, rcu_preempt),
> one of them is for misc things, it is a infrequency lock, the other is
> for advancing complete, it is frequency lock. But somebody else would
> very probably do the same work for me if somebody else is holding this lock,
> so we alway use spin_trylock() for this lock, so it is scalable.
> 
> We don't need any lock for reporting quiescent state, we just need 2
> atomic operations for every RCU domain.

If you have a single GP in flight, all CPUs need to do an atomic op on the
same element of the RCU ring, correct?  My first thought was that each CPU
was writing the current grace-period number to its own variable, but that
would not require an atomic operation.  Of course, the downside of this
latter approach is that some CPU would need to scan all CPUs' counters.

> 3) multi-GP
> All old rcu implementations have only one waiting GP.
> 
> time  -----ta-tb--------------------tc----------------td------------->
> GP    -----|------GP1---------------|--------GP2------|------------->
> cpu#x ........|---------------------------------------|...............>
>               |->insert a callback                    |->handle callback
> 
> the callback have to wait a partial GP1, and a full GP2, it may wait
> near 2 GPs in worse case because it have to wait the rcu read sites witch
> start at (tb, tc] which are needless to wait. In average, a callback
> have to wait (1 + 1/(1+1))= 1.5GPs
> 
> if we can have three simultaneous waiting GPs:
> 
> GPs   .....|------GP1---------------|..........................
>       ...........|--------GP2-----------|....................
>       ....................|---------GP3---------------|........
> cpu#x ........|-------------------------|..........................>
>               |->insert a callback      |->handle callback
> The callback is handled more quickly, In average, a callback have to wait
> (1 + 1/(3+1))=1.25GPs
> 
> if we can have X simultaneous waiting GPs, a callback have to wait
> (1 + 1/(X+1)) GPs in average. The default X in this patch is 15
> (including the reserved GPs).

Assuming that the start times of the GPs are evenly spaced, correct?

> 4) integrated expedited synchronize_rcu
> expedited synchronize_rcu is implemented in rcuring.c, not in other files.
> The whole of it is less than 50 lines of code. we just reserve several
> GPs for expedited synchronize_rcu, when a expedited callback is inserted,
> we start a new GP immediately(we have reserved GPs, this step will
> very probably success), and then force this new GP to be completed quickly.
> (thanks to multi-GP)

I would be more convinced had you implemented force_quiescent_state()
for preemptible RCU.  ;-)

> 5) preemptible
> new simple preemptible code, rcu_read_lock/unlock() is simple and is inline function.
> add new kernel-offset.c to avoid recursively header files inclusion.
> 
> 6) top-down implies
> irq and local_irq_disable() implies preempt_disable()/rcu_read_lock_sched(),
> irq and local_irq_disable() implies rcu_read_lock(). (*)
> preempt_disable()/rcu_read_lock_sched() implies rcu_read_lock(). (*)
> *: In preemptible rcutree/rcutiny, it is false.
> 
> 7) speech
> I will give a speech about RCU and this RCU implementation
> in Japan linuxcon 2010. Welcome to Japan Linuxcon 2010.

Congratulations!

> 8) core rcuring
> (Comments and document is still been writing)
> 
> all read_read_lock/unlock_domain() is nested in a coresponding
> rcuring_lock/unlock(), and the algorithm handles a rcuring_lock/unlock()
> ciritcal region as a full rcu read site ciritcal region.
> 
> read site:
>   rcuring_lock()
>     lock a complete, and ensure this locked_complete - done_complete(1) > 0
>       curr_complete(1) - locked_complete >= 0
>     mb(1)

This is really a compiler barrier?

> 
>   rcu_derefrence(ptr)
> 
>   rcuring_unlock()
>     mb(2)

Ditto?

>     release its locked_complete.
> 
> update site:
>   rcu_assign_pointer(ptr, new)
> 
>   (handle callback:call_rcu()+rcu_do_batch())
>   (call_rcu():)
>   mb(3)
>   get the curr_complete(2)
>   set callback's complete as this curr_complete(2).
>     (we don't have any field to record the callback's complete, we just
>      emulate it by rdp->done_complete and the position of this callback
>      in rdp->batch.)
> 
>   time pass... until done_complete(2) - callback's complete > 0

I have to ask about counter overflow on 32-bit systems.

>   (rcu_do_batch():)
>   mb(4)
>   call the callback: (free old ptr, etc.)
> 
> Signed-off-by: Lai Jiangshan <laijs@...fujitsu.com>
> ---
>  Kbuild                   |   75 ++-
>  include/linux/rcupdate.h |    6 
>  include/linux/rcuring.h  |  195 +++++++++
>  include/linux/sched.h    |    6 
>  init/Kconfig             |   32 +
>  kernel/Makefile          |    1 
>  kernel/kernel-offsets.c  |   20 
>  kernel/rcuring.c         | 1002 +++++++++++++++++++++++++++++++++++++++++++++++
>  kernel/sched.c           |    2 
>  kernel/time/tick-sched.c |    4 
>  10 files changed, 1324 insertions(+), 19 deletions(-)
> diff --git a/Kbuild b/Kbuild
> index e3737ad..bce37cb 100644
> --- a/Kbuild
> +++ b/Kbuild
> @@ -3,7 +3,19 @@
>  # This file takes care of the following:
>  # 1) Generate bounds.h
>  # 2) Generate asm-offsets.h (may need bounds.h)
> -# 3) Check for missing system calls
> +# 3) Generate kernel-offsets.h
> +# 4) Check for missing system calls
> +
> +# Default sed regexp - multiline due to syntax constraints
> +define sed-y
> +	"/^->/{s:->#\(.*\):/* \1 */:; \
> +	s:^->\([^ ]*\) [\$$#]*\([^ ]*\) \(.*\):#define \1 \2 /* \3 */:; \
> +	s:->::; p;}"
> +endef
> +
> +quiet_cmd_offsets_cc_s_c = CC $(quiet_modtag)  $@
> +cmd_offsets_cc_s_c       = $(CC) -D__GENARATING_OFFSETS__               \

s/GENARATING/GENERATING/  just to be pedantic.  ;-)

> +                           $(c_flags) -fverbose-asm -S -o $@ $<

Pulling out the offsets might have the advantage of allowing both
rcu_read_lock() and rcu_read_unlock() to be inlined, but without pulling
in include/linux/sched.h everywhere.  I had been thinking in terms of
moving the state from the task struct to the taskinfo struct, but
this approach might be worth considering.

>  #####
>  # 1) Generate bounds.h
> @@ -43,22 +55,14 @@ $(obj)/$(bounds-file): kernel/bounds.s Kbuild
>  # 2) Generate asm-offsets.h
>  #
> 
> -offsets-file := include/generated/asm-offsets.h
> +asm-offsets-file := include/generated/asm-offsets.h
> 
> -always  += $(offsets-file)
> -targets += $(offsets-file)
> +always  += $(asm-offsets-file)
> +targets += $(asm-offsets-file)
>  targets += arch/$(SRCARCH)/kernel/asm-offsets.s
> 
> -
> -# Default sed regexp - multiline due to syntax constraints
> -define sed-y
> -	"/^->/{s:->#\(.*\):/* \1 */:; \
> -	s:^->\([^ ]*\) [\$$#]*\([^ ]*\) \(.*\):#define \1 \2 /* \3 */:; \
> -	s:->::; p;}"
> -endef
> -
> -quiet_cmd_offsets = GEN     $@
> -define cmd_offsets
> +quiet_cmd_asm_offsets = GEN     $@
> +define cmd_asm_offsets
>  	(set -e; \
>  	 echo "#ifndef __ASM_OFFSETS_H__"; \
>  	 echo "#define __ASM_OFFSETS_H__"; \
> @@ -78,13 +82,48 @@ endef
>  arch/$(SRCARCH)/kernel/asm-offsets.s: arch/$(SRCARCH)/kernel/asm-offsets.c \
>                                        $(obj)/$(bounds-file) FORCE
>  	$(Q)mkdir -p $(dir $@)
> -	$(call if_changed_dep,cc_s_c)
> +	$(call if_changed_dep,offsets_cc_s_c)
> +
> +$(obj)/$(asm-offsets-file): arch/$(SRCARCH)/kernel/asm-offsets.s Kbuild
> +	$(call cmd,asm_offsets)
> +
> +#####
> +# 3) Generate kernel-offsets.h
> +#
> +
> +kernel-offsets-file := include/generated/kernel-offsets.h
> +
> +always  += $(kernel-offsets-file)
> +targets += $(kernel-offsets-file)
> +targets += kernel/kernel-offsets.s
> +
> +quiet_cmd_kernel_offsets = GEN     $@
> +define cmd_kernel_offsets
> +	(set -e; \
> +	 echo "#ifndef __LINUX_KERNEL_OFFSETS_H__"; \
> +	 echo "#define __LINUX_KERNEL_OFFSETS_H__"; \
> +	 echo "/*"; \
> +	 echo " * DO NOT MODIFY."; \
> +	 echo " *"; \
> +	 echo " * This file was generated by Kbuild"; \
> +	 echo " *"; \
> +	 echo " */"; \
> +	 echo ""; \
> +	 sed -ne $(sed-y) $<; \
> +	 echo ""; \
> +	 echo "#endif" ) > $@
> +endef
> +
> +# We use internal kbuild rules to avoid the "is up to date" message from make
> +kernel/kernel-offsets.s: kernel/kernel-offsets.c $(obj)/$(bounds-file) \
> +                         $(obj)/$(asm-offsets-file) FORCE
> +	$(call if_changed_dep,offsets_cc_s_c)
> 
> -$(obj)/$(offsets-file): arch/$(SRCARCH)/kernel/asm-offsets.s Kbuild
> -	$(call cmd,offsets)
> +$(obj)/$(kernel-offsets-file): kernel/kernel-offsets.s Kbuild
> +	$(call cmd,kernel_offsets)
> 
>  #####
> -# 3) Check for missing system calls
> +# 4) Check for missing system calls
>  #
> 
>  quiet_cmd_syscalls = CALL    $<
> diff --git a/include/linux/rcupdate.h b/include/linux/rcupdate.h
> index 89414d6..42cd6bc 100644
> --- a/include/linux/rcupdate.h
> +++ b/include/linux/rcupdate.h
> @@ -124,6 +124,7 @@ extern void rcu_bh_qs(int cpu);
>  extern void rcu_check_callbacks(int cpu, int user);
>  struct notifier_block;
> 
> +#ifndef CONFIG_RCURING
>  #ifdef CONFIG_NO_HZ
> 
>  extern void rcu_enter_nohz(void);
> @@ -140,11 +141,14 @@ static inline void rcu_exit_nohz(void)
>  }
> 
>  #endif /* #else #ifdef CONFIG_NO_HZ */
> +#endif
> 
>  #if defined(CONFIG_TREE_RCU) || defined(CONFIG_TREE_PREEMPT_RCU)
>  #include <linux/rcutree.h>
>  #elif defined(CONFIG_TINY_RCU) || defined(CONFIG_TINY_PREEMPT_RCU)
>  #include <linux/rcutiny.h>
> +#elif defined(CONFIG_RCURING)
> +#include <linux/rcuring.h>
>  #else
>  #error "Unknown RCU implementation specified to kernel configuration"
>  #endif
> @@ -686,6 +690,7 @@ struct rcu_synchronize {
> 
>  extern void wakeme_after_rcu(struct rcu_head  *head);
> 
> +#ifndef CONFIG_RCURING
>  #ifdef CONFIG_PREEMPT_RCU
> 
>  /**
> @@ -710,6 +715,7 @@ extern void call_rcu(struct rcu_head *head,
>  #define	call_rcu	call_rcu_sched
> 
>  #endif /* #else #ifdef CONFIG_PREEMPT_RCU */
> +#endif
> 
>  /**
>   * call_rcu_bh() - Queue an RCU for invocation after a quicker grace period.
> diff --git a/include/linux/rcuring.h b/include/linux/rcuring.h
> new file mode 100644
> index 0000000..343b932
> --- /dev/null
> +++ b/include/linux/rcuring.h
> @@ -0,0 +1,195 @@
> +/*
> + * Read-Copy Update mechanism for mutual exclusion
> + *
> + * This program is free software; you can redistribute it and/or modify
> + * it under the terms of the GNU General Public License as published by
> + * the Free Software Foundation; either version 2 of the License, or
> + * (at your option) any later version.
> + *
> + * This program is distributed in the hope that it will be useful,
> + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
> + * GNU General Public License for more details.
> + *
> + * You should have received a copy of the GNU General Public License
> + * along with this program; if not, write to the Free Software
> + * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
> + *
> + * Copyright IBM Corporation, 2008
> + * Copyright Fujitsu 2008-2010 Lai Jiangshan
> + *
> + * Authors: Dipankar Sarma <dipankar@...ibm.com>
> + *	    Manfred Spraul <manfred@...orfullife.com>
> + *	    Paul E. McKenney <paulmck@...ux.vnet.ibm.com> Hierarchical version
> + *	    Lai Jiangshan <laijs@...fujitsu.com> RCURING version
> + */
> +
> +#ifndef __LINUX_RCURING_H
> +#define __LINUX_RCURING_H
> +
> +#define __rcu_read_lock_bh() local_bh_disable()
> +#define __rcu_read_unlock_bh() local_bh_enable()
> +#define __rcu_read_lock_sched() preempt_disable()
> +#define __rcu_read_unlock_sched() preempt_enable()
> +
> +#ifdef CONFIG_RCURING_BH
> +extern void call_rcu_bh(struct rcu_head *head, void (*func)(struct rcu_head *));
> +extern void synchronize_rcu_bh(void);
> +extern void synchronize_rcu_bh_expedited(void);
> +extern void rcu_bh_qs(int cpu);
> +extern long rcu_batches_completed_bh(void);
> +extern void rcu_barrier_bh(void);
> +extern void rcu_bh_force_quiescent_state(void);
> +#else /* CONFIG_RCURING_BH */
> +#define call_rcu_bh call_rcu_sched
> +#define synchronize_rcu_bh synchronize_rcu_sched
> +#define synchronize_rcu_bh_expedited synchronize_rcu_sched_expedited
> +#define rcu_bh_qs(cpu) do { (void)(cpu); } while (0)
> +#define rcu_batches_completed_bh rcu_batches_completed_sched
> +#define rcu_barrier_bh rcu_barrier_sched
> +#define rcu_bh_force_quiescent_state rcu_sched_force_quiescent_state
> +#endif /* CONFIG_RCURING_BH */
> +
> +extern
> +void call_rcu_sched(struct rcu_head *head, void (*func)(struct rcu_head *));
> +extern void synchronize_rcu_sched(void);
> +#define synchronize_sched synchronize_rcu_sched
> +extern void synchronize_rcu_sched_expedited(void);
> +#define synchronize_sched_expedited synchronize_rcu_sched_expedited
> +extern void rcu_note_context_switch(int cpu);
> +extern long rcu_batches_completed_sched(void);
> +extern void rcu_barrier_sched(void);
> +extern void rcu_sched_force_quiescent_state(void);
> +
> +/*
> + * The @flags is valid only when RCURING_PREEMPT_SAVED is set.
> + *
> + * When preemptible rcu read site is preempted, we save RCURING_PREEMPT_SAVED,
> + * RCURING_PREEMPT_QS and this read site's locked_complete(only the last
> + * RCURING_COUNTERS_SHIFT bits) in the @flags.
> + *
> + * When the outmost rcu read site is closed, we will clear
> + * RCURING_PREEMPT_QS bit if it's saved, and let rcu system knows
> + * this task has passed a quiescent state and release the old read site's
> + * locked_complete.
> + */
> +#define RCURING_PREEMPT_SAVED	(1 << 31)
> +#define RCURING_PREEMPT_QS	(1 << 30)
> +#define RCURING_PREEMPT_FLAGS	(RCURING_PREEMPT_SAVED | RCURING_PREEMPT_QS)
> +
> +struct rcu_task_preempt {
> +	unsigned int nesting;
> +	unsigned int flags;
> +};
> +
> +static inline void rcu_read_lock_preempt(struct rcu_task_preempt *rcu_task)
> +{
> +	rcu_task->nesting++;
> +	barrier();
> +}
> +
> +static inline void rcu_read_unlock_preempt(struct rcu_task_preempt *rcu_task)
> +{
> +	int nesting = rcu_task->nesting;
> +
> +	if (nesting == 1) {
> +		unsigned int flags;
> +
> +		barrier();
> +		flags = ACCESS_ONCE(rcu_task->flags);
> +
> +		if ((flags & RCURING_PREEMPT_FLAGS) == RCURING_PREEMPT_FLAGS) {
> +			flags &= ~RCURING_PREEMPT_QS;
> +			ACCESS_ONCE(rcu_task->flags) = flags;
> +		}
> +	}
> +
> +	barrier();
> +	rcu_task->nesting = nesting - 1;
> +}
> +
> +#ifdef CONFIG_RCURING_PREEMPT
> +
> +#ifdef __GENARATING_OFFSETS__
> +#define task_rcu_preempt(p) ((struct rcu_task_preempt *)NULL)

The above looks a bit strange.  The idea is that if you were generating
offsets for the fields within rcu_task_preempt, you would select the
fields?  Something like the following?

#define example_offset(p) &(((struct rcu_task_preempt *)NULL)->field_a)

> +#else
> +#include <generated/kernel-offsets.h>
> +#define task_rcu_preempt(p)						\
> +	((struct rcu_task_preempt *)(((void *)p) + TASK_RCU_PREEMPT))
> +#endif
> +
> +#define current_rcu_preempt() task_rcu_preempt(current)
> +#define __rcu_read_lock() rcu_read_lock_preempt(current_rcu_preempt())
> +#define __rcu_read_unlock() rcu_read_unlock_preempt(current_rcu_preempt())
> +extern
> +void call_rcu_preempt(struct rcu_head *head, void (*func)(struct rcu_head *));
> +#define call_rcu call_rcu_preempt
> +extern void synchronize_rcu_preempt(void);
> +#define synchronize_rcu synchronize_rcu_preempt
> +extern void synchronize_rcu_preempt_expedited(void);
> +#define synchronize_rcu_expedited synchronize_rcu_preempt_expedited
> +extern long rcu_batches_completed_preempt(void);
> +#define rcu_batches_completed rcu_batches_completed_preempt
> +extern void rcu_barrier_preempt(void);
> +#define rcu_barrier rcu_barrier_preempt
> +extern void rcu_preempt_force_quiescent_state(void);
> +#define rcu_force_quiescent_state rcu_preempt_force_quiescent_state
> +
> +struct task_struct;
> +static inline void rcu_copy_process(struct task_struct *p)
> +{
> +	struct rcu_task_preempt *rcu_task = task_rcu_preempt(p);
> +
> +	rcu_task->nesting = 0;
> +	rcu_task->flags = 0;
> +}
> +
> +static inline void exit_rcu(void)
> +{
> +	if (current_rcu_preempt()->nesting) {
> +		WARN_ON(1);
> +		current_rcu_preempt()->nesting = 0;

If the RCU core was waiting on this task, how does it learn that this
task is no longer blocking the current grace period(s)?

> +	}
> +}
> +
> +#else /*CONFIG_RCURING_PREEMPT */
> +
> +#define __rcu_read_lock() __rcu_read_lock_sched();
> +#define __rcu_read_unlock() __rcu_read_unlock_sched();
> +#define call_rcu call_rcu_sched
> +#define synchronize_rcu synchronize_rcu_sched
> +#define synchronize_rcu_expedited synchronize_rcu_sched_expedited
> +#define rcu_batches_completed rcu_batches_completed_sched
> +#define rcu_barrier rcu_barrier_sched
> +#define rcu_force_quiescent_state rcu_sched_force_quiescent_state
> +
> +static inline void rcu_copy_process(struct task_struct *p) {}
> +static inline void exit_rcu(void) {}
> +#endif /* CONFIG_RCURING_PREEMPT */
> +
> +#ifdef CONFIG_NO_HZ
> +extern void rcu_kernel_enter(void);
> +extern void rcu_kernel_exit(void);
> +
> +static inline void rcu_nmi_enter(void) { rcu_kernel_enter(); }
> +static inline void rcu_nmi_exit(void) { rcu_kernel_exit(); }
> +static inline void rcu_irq_enter(void) { rcu_kernel_enter(); }
> +static inline void rcu_irq_exit(void) { rcu_kernel_exit(); }
> +extern void rcu_enter_nohz(void);
> +extern void rcu_exit_nohz(void);
> +#else
> +static inline void rcu_nmi_enter(void) {}
> +static inline void rcu_nmi_exit(void) {}
> +static inline void rcu_irq_enter(void) {}
> +static inline void rcu_irq_exit(void) {}
> +static inline void rcu_enter_nohz(void) {}
> +static inline void rcu_exit_nohz(void) {}
> +#endif
> +
> +extern int rcu_needs_cpu(int cpu);
> +extern void rcu_check_callbacks(int cpu, int user);
> +
> +extern void rcu_scheduler_starting(void);
> +extern int rcu_scheduler_active __read_mostly;
> +
> +#endif /* __LINUX_RCURING_H */
> diff --git a/include/linux/sched.h b/include/linux/sched.h
> index e18473f..15b332d 100644
> --- a/include/linux/sched.h
> +++ b/include/linux/sched.h
> @@ -1211,6 +1211,10 @@ struct task_struct {
>  	struct rcu_node *rcu_blocked_node;
>  #endif /* #ifdef CONFIG_TREE_PREEMPT_RCU */
> 
> +#ifdef CONFIG_RCURING_PREEMPT
> +	struct rcu_task_preempt rcu_task_preempt;
> +#endif
> +
>  #if defined(CONFIG_SCHEDSTATS) || defined(CONFIG_TASK_DELAY_ACCT)
>  	struct sched_info sched_info;
>  #endif
> @@ -1757,7 +1761,7 @@ static inline void rcu_copy_process(struct task_struct *p)
>  	INIT_LIST_HEAD(&p->rcu_node_entry);
>  }
> 
> -#else
> +#elif !defined(CONFIG_RCURING)
> 
>  static inline void rcu_copy_process(struct task_struct *p)
>  {
> diff --git a/init/Kconfig b/init/Kconfig
> index a619a1a..48e58d9 100644
> --- a/init/Kconfig
> +++ b/init/Kconfig
> @@ -374,6 +374,9 @@ config TINY_PREEMPT_RCU
>  	  for real-time UP systems.  This option greatly reduces the
>  	  memory footprint of RCU.
> 
> +config RCURING
> +	bool "Multi-GP ring based RCU"
> +
>  endchoice
> 
>  config PREEMPT_RCU
> @@ -450,6 +453,35 @@ config TREE_RCU_TRACE
>  	  TREE_PREEMPT_RCU implementations, permitting Makefile to
>  	  trivially select kernel/rcutree_trace.c.
> 
> +config RCURING_BH
> +	bool "RCU for bh"
> +	default y
> +	depends on RCURING && !PREEMPT_RT
> +	help
> +	  If Y, use a independent rcu domain for rcu_bh.
> +	  If N, rcu_bh will map to rcu_sched(except rcu_read_lock_bh,
> +	  rcu_read_unlock_bh).

If I understand correctly, "N" defeats the purpose of bh, which is to
have faster grace periods -- not having to wait for a context switch.

> +
> +config RCURING_BH_PREEMPT
> +	bool
> +	depends on PREEMPT_RT
> +	help
> +	  rcu_bh will map to rcu_preempt.

If I understand what you are doing, this config option will break
networking.

> +
> +config RCURING_PREEMPT
> +	bool "RCURING based reemptible RCU"
> +	default n
> +	depends on RCURING && PREEMPT
> +	help
> +	  If Y, normal rcu functionality will map to rcu_preempt.
> +	  If N, normal rcu functionality will map to rcu_sched.
> +
> +config RCURING_COUNTERS_SHIFT
> +	int "RCURING's counter shift"
> +	range 1 10
> +	depends on RCURING
> +	default 4
> +
>  endmenu # "RCU Subsystem"
> 
>  config IKCONFIG
> diff --git a/kernel/Makefile b/kernel/Makefile
> index 92b7420..66eab8c 100644
> --- a/kernel/Makefile
> +++ b/kernel/Makefile
> @@ -86,6 +86,7 @@ obj-$(CONFIG_TREE_PREEMPT_RCU) += rcutree.o
>  obj-$(CONFIG_TREE_RCU_TRACE) += rcutree_trace.o
>  obj-$(CONFIG_TINY_RCU) += rcutiny.o
>  obj-$(CONFIG_TINY_PREEMPT_RCU) += rcutiny.o
> +obj-$(CONFIG_RCURING) += rcuring.o
>  obj-$(CONFIG_RELAY) += relay.o
>  obj-$(CONFIG_SYSCTL) += utsname_sysctl.o
>  obj-$(CONFIG_TASK_DELAY_ACCT) += delayacct.o
> diff --git a/kernel/kernel-offsets.c b/kernel/kernel-offsets.c
> new file mode 100644
> index 0000000..0d30817
> --- /dev/null
> +++ b/kernel/kernel-offsets.c
> @@ -0,0 +1,20 @@
> +/*
> + * Generate definitions needed by assembly language modules.
> + *
> + * Copyright (C) 2008-2010 Lai Jiangshan
> + */
> +
> +#define __GENERATING_KERNEL_OFFSETS__
> +#include <linux/rcupdate.h>
> +#include <linux/sched.h>
> +#include <linux/kbuild.h>
> +
> +void foo(void);
> +
> +void foo(void)
> +{
> +#ifdef CONFIG_RCURING_PREEMPT
> +	OFFSET(TASK_RCU_PREEMPT, task_struct, rcu_task_preempt);
> +#endif
> +}
> +
> diff --git a/kernel/rcuring.c b/kernel/rcuring.c
> new file mode 100644
> index 0000000..e7cedb5
> --- /dev/null
> +++ b/kernel/rcuring.c
> @@ -0,0 +1,1002 @@
> +/*
> + * Read-Copy Update mechanism for mutual exclusion
> + *
> + * This program is free software; you can redistribute it and/or modify
> + * it under the terms of the GNU General Public License as published by
> + * the Free Software Foundation; either version 2 of the License, or
> + * (at your option) any later version.
> + *
> + * This program is distributed in the hope that it will be useful,
> + * but WITHOUT ANY WARRANTY; without even the implied warranty of
> + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
> + * GNU General Public License for more details.
> + *
> + * You should have received a copy of the GNU General Public License
> + * along with this program; if not, write to the Free Software
> + * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
> + *
> + * Copyright IBM Corporation, 2008
> + * Copyright Fujitsu 2008-2010 Lai Jiangshan
> + *
> + * Authors: Dipankar Sarma <dipankar@...ibm.com>
> + *	    Manfred Spraul <manfred@...orfullife.com>
> + *	    Paul E. McKenney <paulmck@...ux.vnet.ibm.com> Hierarchical version
> + *	    Lai Jiangshan <laijs@...fujitsu.com> RCURING version
> + */
> +
> +#include <linux/rcupdate.h>
> +#include <linux/percpu.h>
> +#include <asm/atomic.h>
> +#include <linux/spinlock.h>
> +#include <linux/module.h>
> +#include <linux/interrupt.h>
> +#include <linux/cpu.h>
> +
> +#define RCURING_IDX_SHIFT	CONFIG_RCURING_COUNTERS_SHIFT
> +#define RCURING_IDX_MAX		(1L << RCURING_IDX_SHIFT)
> +#define RCURING_IDX_MASK	(RCURING_IDX_MAX - 1)
> +#define RCURING_IDX(complete)	((complete) & RCURING_IDX_MASK)
> +
> +struct rcuring {
> +	atomic_t	ctr[RCURING_IDX_MAX];
> +	long		curr_complete;
> +	long		done_complete;
> +
> +	raw_spinlock_t	lock;
> +};
> +
> +/* It is long history that we use -300 as an initial complete */
> +#define RCURING_INIT_COMPLETE	-300L
> +#define RCURING_INIT(name)						\
> +{									\
> +	{[RCURING_IDX(RCURING_INIT_COMPLETE)] = ATOMIC_INIT(1),},	\
> +	RCURING_INIT_COMPLETE,						\
> +	RCURING_INIT_COMPLETE - 1,					\
> +	__RAW_SPIN_LOCK_UNLOCKED(name.lock),				\

Please use the gcc-style initializers here.

> +}
> +
> +static inline
> +int rcuring_ctr_read(struct rcuring *rr, unsigned long complete)
> +{
> +	int res;
> +
> +	/* atomic_read() does not ensure to imply barrier(). */
> +	barrier();
> +	res = atomic_read(rr->ctr + RCURING_IDX(complete));
> +	barrier();
> +
> +	return res;
> +}
> +
> +void __rcuring_advance_done_complete(struct rcuring *rr)
> +{
> +	long wait_complete = rr->done_complete + 1;
> +
> +	if (!rcuring_ctr_read(rr, wait_complete)) {
> +		do {
> +			wait_complete++;
> +		} while (!rcuring_ctr_read(rr, wait_complete));
> +
> +		ACCESS_ONCE(rr->done_complete) = wait_complete - 1;
> +	}
> +}

For a given grace period, all quiescent states hit the same counter.
Or am I missing something?

> +
> +static inline
> +int rcuring_could_advance_done_complete(struct rcuring *rr)
> +{
> +	return !rcuring_ctr_read(rr, rr->done_complete + 1);
> +}
> +
> +static inline
> +void rcuring_advance_done_complete(struct rcuring *rr)
> +{
> +	if (rcuring_could_advance_done_complete(rr)) {
> +		if (__raw_spin_trylock(&rr->lock)) {
> +			__rcuring_advance_done_complete(rr);
> +			__raw_spin_unlock(&rr->lock);
> +		}
> +	}
> +}
> +
> +void __rcuring_advance_curr_complete(struct rcuring *rr, long next_complete)
> +{
> +	long curr_complete = rr->curr_complete;
> +
> +	if (curr_complete + 1 == next_complete) {
> +		if (!rcuring_ctr_read(rr, next_complete)) {
> +			ACCESS_ONCE(rr->curr_complete) = next_complete;
> +			smp_mb__before_atomic_inc();
> +			atomic_inc(rr->ctr + RCURING_IDX(next_complete));
> +			smp_mb__after_atomic_inc();
> +			atomic_dec(rr->ctr + RCURING_IDX(curr_complete));
> +		}
> +	}
> +}
> +
> +static inline
> +int rcuring_could_advance_complete(struct rcuring *rr, long next_complete)
> +{
> +	if (rcuring_could_advance_done_complete(rr))
> +		return 1;
> +
> +	if (rr->curr_complete + 1 == next_complete) {
> +		if (!rcuring_ctr_read(rr, next_complete))
> +			return 1;
> +	}
> +
> +	return 0;
> +}
> +
> +static inline
> +void rcuring_advance_complete(struct rcuring *rr, long next_complete)
> +{
> +	if (rcuring_could_advance_complete(rr, next_complete)) {
> +		if (__raw_spin_trylock(&rr->lock)) {
> +			__rcuring_advance_done_complete(rr);
> +			__rcuring_advance_curr_complete(rr, next_complete);
> +			__raw_spin_unlock(&rr->lock);
> +		}
> +	}
> +}
> +
> +static inline
> +void rcuring_advance_complete_force(struct rcuring *rr, long next_complete)
> +{
> +	if (rcuring_could_advance_complete(rr, next_complete)) {
> +		__raw_spin_lock(&rr->lock);
> +		__rcuring_advance_done_complete(rr);
> +		__rcuring_advance_curr_complete(rr, next_complete);
> +		__raw_spin_unlock(&rr->lock);
> +	}
> +}
> +
> +static inline long __locked_complete_adjust(int locked_idx, long complete)
> +{
> +	long locked_complete;
> +
> +	locked_complete = (complete & ~RCURING_IDX_MASK) | (long)locked_idx;
> +	if (locked_complete - complete <= 0)
> +		return locked_complete;
> +	else
> +		return locked_complete - RCURING_IDX_MAX;
> +}
> +
> +static long rcuring_lock(struct rcuring *rr)
> +{
> +	long locked_complete;
> +	int idx;
> +
> +	for (;;) {
> +		idx = RCURING_IDX(ACCESS_ONCE(rr->curr_complete));
> +		if (likely(atomic_inc_not_zero(rr->ctr + idx)))
> +			break;
> +	}
> +	smp_mb__after_atomic_inc();
> +
> +	locked_complete = ACCESS_ONCE(rr->curr_complete);
> +	if (likely(RCURING_IDX(locked_complete) == idx))
> +		return locked_complete;
> +	else
> +		return __locked_complete_adjust(idx, locked_complete);
> +}
> +
> +static void rcuring_unlock(struct rcuring *rr, long locked_complete)
> +{
> +	smp_mb__before_atomic_dec();
> +
> +	atomic_dec(rr->ctr + RCURING_IDX(locked_complete));
> +}
> +
> +static inline
> +void rcuring_dup_lock(struct rcuring *rr, long locked_complete)
> +{
> +	atomic_inc(rr->ctr + RCURING_IDX(locked_complete));
> +}
> +
> +static inline
> +long rcuring_advance_lock(struct rcuring *rr, long locked_complete)
> +{
> +	long new_locked_complete = locked_complete;
> +
> +	if (locked_complete != rr->curr_complete) {
> +		/*
> +		 * Lock the new complete at first, and then release
> +		 * the old one. It prevents errors when NMI/SMI occurs
> +		 * after rcuring_unlock() - we still lock it.
> +		 */
> +		new_locked_complete = rcuring_lock(rr);
> +		rcuring_unlock(rr, locked_complete);
> +	}
> +
> +	return new_locked_complete;
> +}
> +
> +static inline long rcuring_get_done_complete(struct rcuring *rr)
> +{
> +	return ACCESS_ONCE(rr->done_complete);
> +}
> +
> +static inline long rcuring_get_curr_complete(struct rcuring *rr)
> +{
> +	return ACCESS_ONCE(rr->curr_complete);
> +}
> +
> +struct rcu_batch {
> +	struct rcu_head *list, **tail;
> +};
> +
> +static void rcu_batch_merge(struct rcu_batch *to, struct rcu_batch *from)
> +{
> +	if (from->list != NULL) {
> +		*to->tail = from->list;
> +		to->tail = from->tail;
> +
> +		from->list = NULL;
> +		from->tail = &from->list;
> +	}
> +}
> +
> +static void rcu_batch_add(struct rcu_batch *batch, struct rcu_head *new)
> +{
> +	*batch->tail = new;
> +	batch->tail = &new->next;
> +}
> +
> +struct rcu_data {
> +	long locked_complete;
> +
> +	/*
> +	 * callbacks are in batches (done_complete, curr_complete]
> +	 * curr_complete - done_complete >= 0
> +	 * curr_complete - done_complete <= RCURING_IDX_MAX
> +	 * domain's curr_complete - curr_complete >= 0
> +	 * domain's done_complete - done_complete >= 0
> +	 *
> +	 * curr_complete == done_complete just means @batch is empty.
> +	 * we have at least one callback in batch[RCURING_IDX(done_complete)]
> +	 * if curr_complete != done_complete
> +	 */
> +	long curr_complete;
> +	long done_complete;
> +
> +	struct rcu_batch batch[RCURING_IDX_MAX];
> +	struct rcu_batch done_batch;
> +
> +	unsigned int qlen;
> +	unsigned long jiffies;
> +	unsigned long stall_jiffies;
> +};
> +
> +struct rcu_domain {
> +	struct rcuring rcuring;
> +	const char *domain_name;
> +	struct rcu_data __percpu *rcu_data;
> +
> +	spinlock_t lock; /* this lock is for fqs or other misc things */
> +	long fqs_complete;
> +	void (*force_quiescent_state)(struct rcu_domain *domain,
> +			long fqs_complete);
> +};
> +
> +#define EXPEDITED_GP_RESERVED_RECOMMEND (RCURING_IDX_SHIFT - 1)
> +
> +static inline long gp_reserved(struct rcu_domain *domain)
> +{
> +	return EXPEDITED_GP_RESERVED_RECOMMEND;
> +}
> +
> +static inline void __init rcu_data_init(struct rcu_data *rdp)
> +{
> +	int idx;
> +
> +	for (idx = 0; idx < RCURING_IDX_MAX; idx++)
> +		rdp->batch[idx].tail = &rdp->batch[idx].list;
> +
> +	rdp->done_batch.tail = &rdp->done_batch.list;
> +}
> +
> +static void do_advance_callbacks(struct rcu_data *rdp, long done_complete)
> +{
> +	int idx;
> +
> +	while (rdp->done_complete != done_complete) {
> +		rdp->done_complete++;
> +		idx = RCURING_IDX(rdp->done_complete);
> +		rcu_batch_merge(&rdp->done_batch, rdp->batch + idx);
> +	}
> +}
> +
> +/* Is value in the range (@left_open, @right_close] */
> +static inline bool in_range(long left_open, long right_close, long value)
> +{
> +	return left_open - value < 0 && value - right_close <= 0;
> +}
> +
> +static void advance_callbacks(struct rcu_data *rdp, long done_complete,
> +		long curr_complete)
> +{
> +	if (in_range(done_complete, curr_complete, rdp->curr_complete)) {
> +		do_advance_callbacks(rdp, done_complete);
> +	} else {
> +		do_advance_callbacks(rdp, rdp->curr_complete);
> +		rdp->curr_complete = done_complete;
> +		rdp->done_complete = done_complete;
> +	}
> +}
> +
> +static void __force_quiescent_state(struct rcu_domain *domain,
> +		long fqs_complete)
> +{
> +	int cpu;
> +	long fqs_complete_old;
> +	long curr_complete = rcuring_get_curr_complete(&domain->rcuring);
> +	long done_complete = rcuring_get_done_complete(&domain->rcuring);
> +
> +	spin_lock(&domain->lock);
> +
> +	fqs_complete_old = domain->fqs_complete;
> +	if (!in_range(done_complete, curr_complete, fqs_complete_old))
> +		fqs_complete_old = done_complete;
> +
> +	if (fqs_complete_old - fqs_complete >= 0) {
> +		spin_unlock(&domain->lock);
> +		return;
> +	}
> +
> +	domain->fqs_complete = fqs_complete;
> +	spin_unlock(&domain->lock);
> +
> +	for_each_online_cpu(cpu) {
> +		struct rcu_data *rdp = per_cpu_ptr(domain->rcu_data, cpu);
> +		long locked_complete = ACCESS_ONCE(rdp->locked_complete);
> +
> +		if (!in_range(fqs_complete_old, fqs_complete, locked_complete))
> +			continue;

Is preemption disabled here?

> +
> +		if (cpu == smp_processor_id())
> +			set_tsk_need_resched(current);
> +		else
> +			smp_send_reschedule(cpu);
> +	}
> +}
> +
> +static void force_quiescent_state(struct rcu_domain *domain, long fqs_complete)
> +{
> +	domain->force_quiescent_state(domain, fqs_complete);
> +}
> +
> +static
> +long prepare_for_new_callback(struct rcu_domain *domain, struct rcu_data *rdp)
> +{
> +	long curr_complete, done_complete;
> +	struct rcuring *rr = &domain->rcuring;
> +
> +	smp_mb();
> +	curr_complete = rcuring_get_curr_complete(rr);
> +	done_complete = rcuring_get_done_complete(rr);
> +
> +	advance_callbacks(rdp, done_complete, curr_complete);
> +	rdp->curr_complete = curr_complete;
> +
> +	if (!rdp->qlen) {
> +		rdp->jiffies = jiffies;
> +		rdp->stall_jiffies = jiffies;
> +	}
> +
> +	return curr_complete;
> +}
> +
> +static long __call_rcu(struct rcu_domain *domain, struct rcu_head *head,
> +		void (*func)(struct rcu_head *))
> +{
> +	struct rcu_data *rdp;
> +	long curr_complete;
> +
> +	head->next = NULL;
> +	head->func = func;
> +
> +	rdp = this_cpu_ptr(domain->rcu_data);
> +	curr_complete = prepare_for_new_callback(domain, rdp);
> +	rcu_batch_add(rdp->batch + RCURING_IDX(curr_complete), head);
> +	rdp->qlen++;
> +
> +	return curr_complete;
> +}
> +
> +static void print_rcuring_stall(struct rcu_domain *domain)
> +{
> +	struct rcuring *rr = &domain->rcuring;
> +	unsigned long curr_complete = rcuring_get_curr_complete(rr);
> +	unsigned long done_complete = rcuring_get_done_complete(rr);
> +	int cpu;
> +
> +	printk(KERN_ERR "RCU is stall, cpu=%d, domain_name=%s\n", smp_processor_id(),
> +			domain->domain_name);
> +
> +	printk(KERN_ERR "domain's complete(done/curr)=%ld/%ld\n",
> +			curr_complete, done_complete);
> +	printk(KERN_ERR "curr_ctr=%d, wait_ctr=%d\n",
> +			rcuring_ctr_read(rr, curr_complete),
> +			rcuring_ctr_read(rr, done_complete + 1));
> +
> +	for_each_online_cpu(cpu) {
> +		struct rcu_data *rdp = per_cpu_ptr(domain->rcu_data, cpu);
> +		printk(KERN_ERR "cpu=%d, qlen=%d, complete(locked/dome/curr/)="
> +				"%ld/%ld/%ld\n", cpu, rdp->qlen,
> +				rdp->locked_complete, rdp->done_complete,
> +				rdp->curr_complete);
> +	}
> +}
> +
> +static void __rcu_check_callbacks(struct rcu_domain *domain,
> +		struct rcu_data *rdp, int in_rcu_softirq)
> +{
> +	long curr_complete, done_complete;
> +	struct rcuring *rr = &domain->rcuring;
> +
> +	if (!rdp->qlen)
> +		return;
> +
> +	rcuring_advance_done_complete(rr);
> +
> +	curr_complete = rcuring_get_curr_complete(rr);
> +	done_complete = ACCESS_ONCE(rr->done_complete);
> +	advance_callbacks(rdp, done_complete, curr_complete);
> +
> +	if (rdp->curr_complete == curr_complete
> +			&& rdp->batch[RCURING_IDX(rdp->curr_complete)].list) {
> +		long max_gp_allow = RCURING_IDX_MAX - gp_reserved(domain);
> +
> +		if (curr_complete - done_complete <= max_gp_allow)
> +			rcuring_advance_complete(rr, curr_complete + 1);
> +	}
> +
> +	if (rdp->done_batch.list) {
> +		if (!in_rcu_softirq)
> +			raise_softirq(RCU_SOFTIRQ);
> +	} else {
> +		if (jiffies - rdp->jiffies > 10) {
> +			force_quiescent_state(domain, rdp->curr_complete);
> +			rdp->jiffies = jiffies;
> +		}
> +		if (jiffies - rdp->stall_jiffies > 2 * HZ) {
> +			print_rcuring_stall(domain);
> +			rdp->stall_jiffies = jiffies;
> +		}
> +	}
> +}
> +
> +static void rcu_do_batch(struct rcu_domain *domain, struct rcu_data *rdp)
> +{
> +	int count = 0;
> +	struct rcu_head *list, *next;
> +
> +	list = rdp->done_batch.list;
> +
> +	if (list) {
> +		rdp->done_batch.list = NULL;
> +		rdp->done_batch.tail = &rdp->done_batch.list;
> +
> +		local_irq_enable();
> +
> +		smp_mb();
> +
> +		while (list) {
> +			next = list->next;
> +			prefetch(next);
> +			list->func(list);
> +			count++;
> +			list = next;
> +		}
> +		local_irq_disable();
> +
> +		rdp->qlen -= count;
> +		rdp->jiffies = jiffies;
> +	}
> +}
> +
> +static int __rcu_needs_cpu(struct rcu_data *rdp)
> +{
> +	return !!rdp->qlen;
> +}
> +
> +static inline
> +void __rcu_qsctr_inc(struct rcu_domain *domain, struct rcu_data *rdp)
> +{
> +	rdp->locked_complete = rcuring_advance_lock(&domain->rcuring,
> +			rdp->locked_complete);
> +}
> +
> +static inline void rcu_preempt_save_complete(struct rcu_task_preempt *rcu_task,
> +		long locked_complete)
> +{
> +	unsigned int new_flags;
> +
> +	new_flags = RCURING_PREEMPT_FLAGS | RCURING_IDX(locked_complete);
> +	ACCESS_ONCE(rcu_task->flags) = new_flags;
> +}
> +
> +/*
> + * Every cpu hold a lock_complete. But for preempt rcu, any task may also
> + * hold a lock_complete. This function increases lock_complete for the current
> + * cpu and check(dup_lock_and_save or release or advance) the lock_complete of
> + * the current task.
> + */
> +static inline
> +void __rcu_preempt_qsctr_inc(struct rcu_domain *domain, struct rcu_data *rdp,
> +		struct rcu_task_preempt *rcu_task)
> +{
> +	long locked_complete = rdp->locked_complete;
> +	unsigned int flags = ACCESS_ONCE(rcu_task->flags);
> +
> +	if (!(flags & RCURING_PREEMPT_SAVED)) {
> +		BUG_ON(flags & RCURING_PREEMPT_QS);
> +		if (rcu_task->nesting) {
> +			/* dup_lock_and_save current task's lock_complete */
> +			rcuring_dup_lock(&domain->rcuring, locked_complete);
> +			rcu_preempt_save_complete(rcu_task, locked_complete);
> +		}
> +	} else if (!(rcu_task->nesting)) {
> +		/* release current task's lock_complete */
> +		rcuring_unlock(&domain->rcuring, (long)flags);
> +		ACCESS_ONCE(rcu_task->flags) = 0;
> +	} else if (!(flags & RCURING_PREEMPT_QS)) {
> +		/* advance_and_save current task's lock_complete */
> +		if (RCURING_IDX(locked_complete) != RCURING_IDX((long)flags)) {
> +			rcuring_unlock(&domain->rcuring, (long)flags);
> +			rcuring_dup_lock(&domain->rcuring, locked_complete);
> +		}
> +		rcu_preempt_save_complete(rcu_task, locked_complete);
> +	}
> +
> +	/* increases lock_complete for the current cpu */
> +	rdp->locked_complete = rcuring_advance_lock(&domain->rcuring,
> +			locked_complete);
> +}

When we need to boost the priority of preempted readers, how do we tell
who they are?

> +
> +static void __synchronize_rcu(struct rcu_domain *domain, int expedited)
> +{
> +	struct rcu_synchronize rcu;
> +	long complete;
> +
> +	init_completion(&rcu.completion);
> +
> +	local_irq_disable();
> +	complete = __call_rcu(domain, &rcu.head, wakeme_after_rcu);
> +
> +	if (expedited) {
> +		/*
> +		 * Fore a new gp to be started immediately(can use reserved gp).
> +		 * But if all gp are used, it will fail to start new gp,
> +		 * and we have to wait until some new gps available.
> +		 */
> +		rcuring_advance_complete_force(&domain->rcuring,
> +				complete + 1);

It looks to me like rcuring_advance_complete_force() can just fail
silently.  How does this work?

> +
> +		/* Fore qs and expedite the new gp */
> +		force_quiescent_state(domain, complete);
> +	}
> +	local_irq_enable();
> +
> +	wait_for_completion(&rcu.completion);
> +}
> +
> +static inline long __rcu_batches_completed(struct rcu_domain *domain)
> +{
> +	return rcuring_get_done_complete(&domain->rcuring);
> +}
> +
> +#ifdef CONFIG_RCURING_BH
> +#define gen_code_for_bh(gen_code, args...) gen_code(bh, ##args)

Where is gen_code() defined?

> +#else
> +#define gen_code_for_bh(gen_code, args...)
> +#endif
> +#define gen_code_for_sched(gen_code, args...) gen_code(sched, ##args)
> +#ifdef CONFIG_RCURING_PREEMPT
> +#define gen_code_for_preempt(gen_code, args...) gen_code(preempt, ##args)
> +#else
> +#define gen_code_for_preempt(gen_code, args...)
> +#endif
> +
> +#define GEN_CODES(gen_code, args...)			\
> +	gen_code_for_bh(gen_code, ##args)		\
> +	gen_code_for_sched(gen_code, ##args)		\
> +	gen_code_for_preempt(gen_code, ##args)		\
> +
> +#define gen_basic_code(domain)						\
> +									\
> +static void force_quiescent_state_##domain(struct rcu_domain *domain,	\
> +		long fqs_complete);					\
> +									\
> +static DEFINE_PER_CPU(struct rcu_data, rcu_data_##domain);		\
> +									\
> +struct rcu_domain rcu_domain_##domain =					\
> +{									\
> +	.rcuring = RCURING_INIT(rcu_domain_##domain.rcuring),		\
> +	.domain_name = #domain,						\
> +	.rcu_data = &rcu_data_##domain,					\
> +	.lock = __SPIN_LOCK_UNLOCKED(rcu_domain_##domain.lock),		\
> +	.force_quiescent_state = force_quiescent_state_##domain,	\
> +};									\
> +									\
> +void call_rcu_##domain(struct rcu_head *head,				\
> +		void (*func)(struct rcu_head *))			\
> +{									\
> +	unsigned long flags;						\
> +									\
> +	local_irq_save(flags);						\
> +	__call_rcu(&rcu_domain_##domain, head, func);			\
> +	local_irq_restore(flags);					\
> +}									\
> +EXPORT_SYMBOL_GPL(call_rcu_##domain);					\
> +									\
> +void synchronize_rcu_##domain(void)					\
> +{									\
> +	__synchronize_rcu(&rcu_domain_##domain, 0);			\
> +}									\
> +EXPORT_SYMBOL_GPL(synchronize_rcu_##domain);				\
> +									\
> +void synchronize_rcu_##domain##_expedited(void)				\
> +{									\
> +	__synchronize_rcu(&rcu_domain_##domain, 1);			\
> +}									\
> +EXPORT_SYMBOL_GPL(synchronize_rcu_##domain##_expedited);		\
> +									\
> +long rcu_batches_completed_##domain(void)				\
> +{									\
> +	return __rcu_batches_completed(&rcu_domain_##domain);		\
> +}									\
> +EXPORT_SYMBOL_GPL(rcu_batches_completed_##domain);			\
> +									\
> +void rcu_##domain##_force_quiescent_state(void)				\
> +{									\
> +	force_quiescent_state(&rcu_domain_##domain, 0);			\
> +}									\
> +EXPORT_SYMBOL_GPL(rcu_##domain##_force_quiescent_state);

I used to use a single macro for the synchronize_rcu*() APIs, but was
talked into separate functions.  Maybe some of the more ornate recent
macros have shifted people away from duplicate code in functions, so
might be time to try again.  ;-)

> +GEN_CODES(gen_basic_code)
> +
> +static DEFINE_PER_CPU(int, kernel_count);
> +
> +#define LOCK_COMPLETE(domain)						\
> +	long complete_##domain = rcuring_lock(&rcu_domain_##domain.rcuring);
> +#define SAVE_COMPLETE(domain)						\
> +	per_cpu(rcu_data_##domain, cpu).locked_complete = complete_##domain;
> +#define LOAD_COMPLETE(domain)						\
> +	int complete_##domain = per_cpu(rcu_data_##domain, cpu).locked_complete;
> +#define RELEASE_COMPLETE(domain)					\
> +	rcuring_unlock(&rcu_domain_##domain.rcuring, complete_##domain);
> +
> +static void __rcu_kernel_enter_outmost(int cpu)
> +{
> +	GEN_CODES(LOCK_COMPLETE)
> +
> +	barrier();
> +	per_cpu(kernel_count, cpu) = 1;
> +	barrier();
> +
> +	GEN_CODES(SAVE_COMPLETE)
> +}
> +
> +static void __rcu_kernel_exit_outmost(int cpu)
> +{
> +	GEN_CODES(LOAD_COMPLETE)
> +
> +	barrier();
> +	per_cpu(kernel_count, cpu) = 0;
> +	barrier();
> +
> +	GEN_CODES(RELEASE_COMPLETE)
> +}
> +
> +#ifdef CONFIG_RCURING_BH
> +static void force_quiescent_state_bh(struct rcu_domain *domain,
> +		long fqs_complete)
> +{
> +	__force_quiescent_state(domain, fqs_complete);
> +}
> +
> +static inline void rcu_bh_qsctr_inc_irqoff(int cpu)
> +{
> +	__rcu_qsctr_inc(&rcu_domain_bh, &per_cpu(rcu_data_bh, cpu));
> +}
> +
> +void rcu_bh_qs(int cpu)
> +{
> +	unsigned long flags;
> +
> +	local_irq_save(flags);
> +	rcu_bh_qsctr_inc_irqoff(cpu);
> +	local_irq_restore(flags);
> +}
> +
> +#else /* CONFIG_RCURING_BH */
> +static inline void rcu_bh_qsctr_inc_irqoff(int cpu) { (void)(cpu); }
> +#endif /* CONFIG_RCURING_BH */
> +
> +static void force_quiescent_state_sched(struct rcu_domain *domain,
> +		long fqs_complete)
> +{
> +	__force_quiescent_state(domain, fqs_complete);
> +}
> +
> +static inline void rcu_sched_qsctr_inc_irqoff(int cpu)
> +{
> +	__rcu_qsctr_inc(&rcu_domain_sched, &per_cpu(rcu_data_sched, cpu));
> +}
> +
> +#ifdef CONFIG_RCURING_PREEMPT
> +static void force_quiescent_state_preempt(struct rcu_domain *domain,
> +		long fqs_complete)
> +{
> +	/* To Be Implemented */
> +}
> +
> +static inline void rcu_preempt_qsctr_inc_irqoff(int cpu)
> +{
> +	__rcu_preempt_qsctr_inc(&rcu_domain_preempt,
> +			&per_cpu(rcu_data_preempt, cpu),
> +			&current->rcu_task_preempt);
> +}
> +
> +#else /* CONFIG_RCURING_PREEMPT */
> +static inline void rcu_preempt_qsctr_inc_irqoff(int cpu)
> +{
> +	(void)(cpu);
> +	(void)(__rcu_preempt_qsctr_inc);
> +}
> +#endif /* CONFIG_RCURING_PREEMPT */
> +
> +#define gen_needs_cpu(domain, cpu)					\
> +	if (__rcu_needs_cpu(&per_cpu(rcu_data_##domain, cpu)))		\
> +		return 1;
> +int rcu_needs_cpu(int cpu)
> +{
> +	GEN_CODES(gen_needs_cpu, cpu)
> +	return 0;
> +}
> +
> +#define call_func(domain, func, args...)				\
> +	func(&rcu_domain_##domain, &__get_cpu_var(rcu_data_##domain), ##args);
> +
> +/*
> + * Note a context switch. This is a quiescent state for RCU-sched,
> + * and requires special handling for preemptible RCU.
> + */
> +void rcu_note_context_switch(int cpu)
> +{
> +	unsigned long flags;
> +
> +#ifdef CONFIG_HOTPLUG_CPU
> +	/*
> +	 * The stoper thread need to schedule() to the idle thread
> +	 * after CPU_DYING.
> +	 */
> +	if (unlikely(!per_cpu(kernel_count, cpu))) {
> +		BUG_ON(cpu_online(cpu));
> +		return;
> +	}
> +#endif
> +
> +	local_irq_save(flags);
> +	rcu_bh_qsctr_inc_irqoff(cpu);
> +	rcu_sched_qsctr_inc_irqoff(cpu);
> +	rcu_preempt_qsctr_inc_irqoff(cpu);
> +	local_irq_restore(flags);
> +}
> +
> +static int rcu_cpu_idle(int cpu)
> +{
> +	return idle_cpu(cpu) && rcu_scheduler_active &&
> +		!in_softirq() && hardirq_count() <= (1 << HARDIRQ_SHIFT);
> +}
> +
> +void rcu_check_callbacks(int cpu, int user)
> +{
> +	unsigned long flags;
> +
> +	local_irq_save(flags);
> +
> +	if (user || rcu_cpu_idle(cpu)) {
> +		rcu_bh_qsctr_inc_irqoff(cpu);
> +		rcu_sched_qsctr_inc_irqoff(cpu);
> +		rcu_preempt_qsctr_inc_irqoff(cpu);
> +	} else if (!in_softirq())
> +		rcu_bh_qsctr_inc_irqoff(cpu);
> +
> +	GEN_CODES(call_func, __rcu_check_callbacks, 0)
> +	local_irq_restore(flags);
> +}
> +
> +static void rcu_process_callbacks(struct softirq_action *unused)
> +{
> +	local_irq_disable();
> +	GEN_CODES(call_func, __rcu_check_callbacks, 1)
> +	GEN_CODES(call_func, rcu_do_batch)
> +	local_irq_enable();
> +}
> +
> +static void __cpuinit __rcu_offline_cpu(struct rcu_domain *domain,
> +		struct rcu_data *off_rdp, struct rcu_data *rdp)
> +{
> +	if (off_rdp->qlen > 0) {
> +		unsigned long flags;
> +		long curr_complete;
> +
> +		/* move all callbacks in @off_rdp to @off_rdp->done_batch */
> +		do_advance_callbacks(off_rdp, off_rdp->curr_complete);
> +
> +		/* move all callbacks in @off_rdp to this cpu(@rdp) */
> +		local_irq_save(flags);
> +		curr_complete = prepare_for_new_callback(domain, rdp);
> +		rcu_batch_merge(rdp->batch + RCURING_IDX(curr_complete),
> +				&off_rdp->done_batch);
> +		rdp->qlen += off_rdp->qlen;
> +		local_irq_restore(flags);
> +
> +		off_rdp->qlen = 0;
> +	}
> +}
> +
> +#define gen_offline(domain, cpu, recieve_cpu)				\
> +	__rcu_offline_cpu(&rcu_domain_##domain,				\
> +			&per_cpu(rcu_data_##domain, cpu),		\
> +			&per_cpu(rcu_data_##domain, recieve_cpu));
> +
> +static int __cpuinit rcu_cpu_notify(struct notifier_block *self,
> +				unsigned long action, void *hcpu)
> +{
> +	int cpu = (long)hcpu;
> +	int recieve_cpu;
> +
> +	(void)(recieve_cpu);
> +	(void)(__rcu_offline_cpu);
> +	(void)(__rcu_kernel_exit_outmost);
> +
> +	switch (action) {
> +#ifdef CONFIG_HOTPLUG_CPU
> +	case CPU_DYING:
> +	case CPU_DYING_FROZEN:
> +		/*
> +		 * the machine is stopped now, we can access to
> +		 * any other cpu data.
> +		 * */
> +		recieve_cpu = cpumask_any_but(cpu_online_mask, cpu);
> +		GEN_CODES(gen_offline, cpu, recieve_cpu)
> +		__rcu_kernel_exit_outmost(cpu);
> +		break;
> +#endif
> +	case CPU_STARTING:
> +	case CPU_STARTING_FROZEN:
> +		__rcu_kernel_enter_outmost(cpu);
> +	default:
> +		break;
> +	}
> +	return NOTIFY_OK;
> +}
> +
> +#define rcu_init_domain_data(domain, cpu)			\
> +	for_each_possible_cpu(cpu)				\
> +		rcu_data_init(&per_cpu(rcu_data_##domain, cpu));
> +
> +void __init rcu_init(void)
> +{
> +	int cpu;
> +
> +	GEN_CODES(rcu_init_domain_data, cpu)

This one actually consumes more lines than would writing out the
calls explicitly.  ;-)

> +
> +	__rcu_kernel_enter_outmost(raw_smp_processor_id());
> +	open_softirq(RCU_SOFTIRQ, rcu_process_callbacks);
> +	cpu_notifier(rcu_cpu_notify, 0);
> +}
> +
> +int rcu_scheduler_active __read_mostly;
> +EXPORT_SYMBOL_GPL(rcu_scheduler_active);
> +
> +/*
> + * This function is invoked towards the end of the scheduler's initialization
> + * process. Before this is called, the idle task might contain
> + * RCU read-side critical sections (during which time, this idle
> + * task is booting the system). After this function is called, the
> + * idle tasks are prohibited from containing RCU read-side critical
> + * sections. This function also enables RCU lockdep checking.
> + */
> +void rcu_scheduler_starting(void)
> +{
> +	rcu_scheduler_active = 1;
> +}
> +
> +#ifdef CONFIG_NO_HZ
> +
> +void rcu_kernel_enter(void)
> +{
> +	unsigned long flags;
> +
> +	raw_local_irq_save(flags);
> +	if (__get_cpu_var(kernel_count) == 0)
> +		__rcu_kernel_enter_outmost(raw_smp_processor_id());
> +	else
> +		__get_cpu_var(kernel_count)++;
> +	raw_local_irq_restore(flags);
> +}
> +
> +void rcu_kernel_exit(void)
> +{
> +	unsigned long flags;
> +
> +	raw_local_irq_save(flags);
> +	if (__get_cpu_var(kernel_count) == 1)
> +		__rcu_kernel_exit_outmost(raw_smp_processor_id());
> +	else
> +		__get_cpu_var(kernel_count)--;
> +	raw_local_irq_restore(flags);
> +}
> +
> +void rcu_enter_nohz(void)
> +{
> +	unsigned long flags;
> +
> +	raw_local_irq_save(flags);
> +	__rcu_kernel_exit_outmost(raw_smp_processor_id());
> +	raw_local_irq_restore(flags);
> +}
> +
> +void rcu_exit_nohz(void)
> +{
> +	unsigned long flags;
> +
> +	raw_local_irq_save(flags);
> +	__rcu_kernel_enter_outmost(raw_smp_processor_id());
> +	raw_local_irq_restore(flags);
> +}
> +#endif /* CONFIG_NO_HZ */
> +
> +static DEFINE_PER_CPU(struct rcu_head, rcu_barrier_head);
> +static struct completion rcu_barrier_completion;
> +static struct kref rcu_barrier_wait_ref;
> +static DEFINE_MUTEX(rcu_barrier_mutex);
> +
> +static void rcu_barrier_release_ref(struct kref *notused)
> +{
> +	complete(&rcu_barrier_completion);
> +}
> +
> +static void rcu_barrier_callback(struct rcu_head *notused)
> +{
> +	kref_put(&rcu_barrier_wait_ref, rcu_barrier_release_ref);
> +}
> +
> +static void rcu_barrier_func(void *data)
> +{
> +	struct rcu_domain *rcu_domain = data;
> +
> +	kref_get(&rcu_barrier_wait_ref);
> +	__call_rcu(rcu_domain, &__get_cpu_var(rcu_barrier_head),
> +			rcu_barrier_callback);
> +}
> +
> +static void __rcu_barrier(struct rcu_domain *rcu_domain)
> +{
> +	mutex_lock(&rcu_barrier_mutex);
> +
> +	init_completion(&rcu_barrier_completion);
> +	kref_init(&rcu_barrier_wait_ref);
> +
> +	/* queue barrier rcu_heads for every cpu */
> +	on_each_cpu(rcu_barrier_func, rcu_domain, 1);
> +
> +	kref_put(&rcu_barrier_wait_ref, rcu_barrier_release_ref);
> +	wait_for_completion(&rcu_barrier_completion);
> +
> +	mutex_unlock(&rcu_barrier_mutex);
> +}
> +
> +#define gen_rcu_barrier(domain)						\
> +void rcu_barrier_##domain(void)						\
> +{									\
> +	__rcu_barrier(&rcu_domain_##domain);				\
> +}									\
> +EXPORT_SYMBOL_GPL(rcu_barrier_##domain);
> +
> +GEN_CODES(gen_rcu_barrier)
> +
> diff --git a/kernel/sched.c b/kernel/sched.c
> index 09b574e..967f25a 100644
> --- a/kernel/sched.c
> +++ b/kernel/sched.c
> @@ -9113,6 +9113,7 @@ struct cgroup_subsys cpuacct_subsys = {
>  };
>  #endif	/* CONFIG_CGROUP_CPUACCT */
> 
> +#ifndef CONFIG_RCURING
>  #ifndef CONFIG_SMP
> 
>  void synchronize_sched_expedited(void)
> @@ -9182,3 +9183,4 @@ void synchronize_sched_expedited(void)
>  EXPORT_SYMBOL_GPL(synchronize_sched_expedited);
> 
>  #endif /* #else #ifndef CONFIG_SMP */
> +#endif /* CONFIG_RCURING */
> diff --git a/kernel/time/tick-sched.c b/kernel/time/tick-sched.c
> index 3e216e0..0eba561 100644
> --- a/kernel/time/tick-sched.c
> +++ b/kernel/time/tick-sched.c
> @@ -269,6 +269,10 @@ void tick_nohz_stop_sched_tick(int inidle)
>  	cpu = smp_processor_id();
>  	ts = &per_cpu(tick_cpu_sched, cpu);
> 
> +	/* Don't enter nohz when cpu is offlining, it is going to die */
> +	if (cpu_is_offline(cpu))
> +		goto end;
> +
>  	/*
>  	 * Call to tick_nohz_start_idle stops the last_update_time from being
>  	 * updated. Thus, it must not be called in the event we are called from
> 
--
To unsubscribe from this list: send the line "unsubscribe linux-kernel" in
the body of a message to majordomo@...r.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html
Please read the FAQ at  http://www.tux.org/lkml/

Powered by blists - more mailing lists

Powered by Openwall GNU/*/Linux Powered by OpenVZ