Message-ID: <20080825000738.GA24339@linux.vnet.ibm.com>
Date:	Sun, 24 Aug 2008 17:07:38 -0700
From:	"Paul E. McKenney" <paulmck@...ux.vnet.ibm.com>
To:	linux-kernel@...r.kernel.org
Cc:	cl@...ux-foundation.org, mingo@...e.hu, akpm@...ux-foundation.org,
	manfred@...orfullife.com, dipankar@...ibm.com,
	josht@...ux.vnet.ibm.com, schamp@....com, niv@...ibm.com,
	dvhltc@...ibm.com, ego@...ibm.com, laijs@...fujitsu.com,
	rostedt@...dmis.org
Subject: [PATCH, RFC, tip/core/rcu] v2 scalable classic RCU implementation

Hello!

Still experimental, not for inclusion.

Updates from earlier version:

o	Handles dyntick-idle state, including interrupts and NMIs,
	while allowing dyntick-idle CPUs to remain idle (in
	contrast to the previous version, which simply whacked all
	non-responding CPUs with a resched IPI).

o	Made force_quiescent_state() more intelligent, so that it
	no longer whacks all CPUs whether they need it or not.

o	Cleaned up cpp code that determines size and shape of the
	rcu_node hierarchy.

o	Added debugfs tracing capability (was in previous patch,
	but forgot to mention it).
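
The dyntick-idle handling in the first item above relies on a per-CPU
counter that is even while the CPU is in dyntick-idle mode and odd
otherwise.  The following is only a rough single-threaded sketch of that
check, not the patch's code: struct dyn_state, save_progress() and
implicit_qs() are made-up names for illustration, and the memory
barriers that the real code needs are omitted.

```c
/* Illustrative per-CPU state; field names borrowed from struct rcu_data. */
struct dyn_state {
	int dynticks;		/* Even while the CPU is in dyntick-idle mode. */
	int dynticks_snap;	/* Counter value sampled for this grace period. */
};

/* Sample the counter.  Return 1 if the CPU is idle right now. */
static int save_progress(struct dyn_state *s)
{
	s->dynticks_snap = s->dynticks;
	return (s->dynticks_snap & 0x1) == 0;
}

/*
 * Return 1 if the CPU is idle now, or has been idle at some point
 * since the snapshot (the counter moved by at least two).
 */
static int implicit_qs(const struct dyn_state *s)
{
	int curr = s->dynticks;
	int snap = s->dynticks_snap;

	return (curr - snap) >= 2 || (curr & 0x1) == 0;
}
```

A CPU for which implicit_qs() returns 1 can be left alone.  Only the
others would need a resched IPI.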

Attached is an updated patch to Classic RCU that applies a hierarchy,
greatly reducing the contention on the top-level lock for large machines.
This passes mild rcutorture testing on x86 and ppc64, but is most
definitely not ready for inclusion.  It is OK for experimental work
assuming sufficiently brave experimenters.  See also Manfred Spraul's
patch at http://lkml.org/lkml/2008/8/21/336 (or his earlier work from
2004 at http://marc.info/?l=linux-kernel&m=108546384711797&w=2).  We will
converge onto a common patch in the fullness of time, but are currently
exploring different regions of the design space.

This patch provides CONFIG_RCU_FANOUT, which controls the bushiness
of the RCU hierarchy.  It defaults to 32 on 32-bit machines and 64 on
64-bit machines.  If CONFIG_NR_CPUS is no greater than CONFIG_RCU_FANOUT,
there is no hierarchy.  By default, the RCU initialization code will
adjust CONFIG_RCU_FANOUT to balance the hierarchy, so strongly NUMA
architectures may choose to set CONFIG_RCU_FANOUT_EXACT to disable
this balancing, allowing the hierarchy to be exactly aligned to the
underlying hardware.  Up to two levels of hierarchy are permitted
(in addition to the root node), allowing up to 32,768 CPUs on 32-bit
systems (32 cubed) and up to 262,144 CPUs on 64-bit systems (64 cubed).
I just know that I am going to regret saying this, but this seems more
than sufficient for the foreseeable future.  (Some architectures might
wish to set CONFIG_RCU_FANOUT=4, which would limit such architectures
to 64 CPUs.  If this becomes a real problem, additional levels can be
added, but I doubt that it will make a significant difference on real
hardware.)
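
For anyone wanting to check the arithmetic, here is a small userspace
sketch of how the fanout and the CPU count determine the shape of the
hierarchy.  It mirrors the cpp logic in rcuclassic.h but is not taken
from the patch: the function names are invented for illustration, and
the automatic balancing is ignored.

```c
#define MAX_RCU_LVLS 3	/* Same limit as the patch: root plus two levels. */

/* Largest CPU count a given fanout can cover: fanout^MAX_RCU_LVLS. */
static long rcu_max_cpus(long fanout)
{
	long n = 1;
	int i;

	for (i = 0; i < MAX_RCU_LVLS; i++)
		n *= fanout;
	return n;
}

/*
 * Number of rcu_node levels needed for nr_cpus at the given fanout,
 * or -1 if the fanout is too small (the patch uses #error for this).
 */
static int rcu_num_lvls(long nr_cpus, long fanout)
{
	long capacity = fanout;
	int lvls;

	for (lvls = 1; lvls <= MAX_RCU_LVLS; lvls++) {
		if (nr_cpus <= capacity)
			return lvls;
		capacity *= fanout;
	}
	return -1;
}

/* Total rcu_node structures: one per group at each level, rounding up. */
static long rcu_num_nodes(long nr_cpus, long fanout)
{
	int lvls = rcu_num_lvls(nr_cpus, fanout);
	long group = fanout;	/* CPUs covered by one node at the leaf level. */
	long nodes = 0;
	int i;

	if (lvls < 0)
		return -1;
	for (i = 0; i < lvls; i++) {
		nodes += (nr_cpus + group - 1) / group;
		group *= fanout;
	}
	return nodes;
}
```

With a fanout of 64, a 4096-CPU build needs two levels (64 leaf nodes
plus the root), and 4097 CPUs tips it over into three.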

In the common case, a given CPU will manipulate its private rcu_data
structure and the rcu_node structure that it shares with its immediate
neighbors.  This can reduce both lock and memory contention by multiple
orders of magnitude, which should eliminate the need for the strange
manipulations that are reported to be required when running Linux on
very large systems.
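
To illustrate why the contention drops, here is a rough sketch of how
a quiescent state might be reported up such a hierarchy.  It is an
assumption-laden simplification rather than the patch's code: the
struct keeps only three of the rcu_node fields, report_qs() is a
hypothetical name, and the per-node locking is left out entirely.

```c
#include <stddef.h>

/* Trimmed-down rcu_node: field names follow the patch, locking omitted. */
struct rcu_node {
	unsigned long qsmask;		/* Children still owing a quiescent state. */
	int grpnum;			/* This node's bit position in its parent. */
	struct rcu_node *parent;	/* NULL at the root. */
};

/*
 * Hypothetical helper: record that child number 'bit' of 'rnp' has
 * passed through a quiescent state.  Returns 1 if this report emptied
 * the root's mask, meaning the grace period can end, else 0.
 */
static int report_qs(struct rcu_node *rnp, int bit)
{
	for (;;) {
		unsigned long mask = 1UL << bit;

		if (!(rnp->qsmask & mask))
			return 0;	/* Already reported for this grace period. */
		rnp->qsmask &= ~mask;
		if (rnp->qsmask != 0)
			return 0;	/* Siblings still pending; stop here. */
		if (rnp->parent == NULL)
			return 1;	/* Root is clear. */
		bit = rnp->grpnum;
		rnp = rnp->parent;
	}
}
```

Only the last CPU to report within a group goes on to touch the
parent, so in this sketch the root is visited at most once per group
rather than once per CPU.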

Some shortcomings:

o	Entering and leaving dynticks idle mode is a quiescent state,
	but the current patch doesn't take advantage of this (noted
	by Manfred).  If there were an arch-independent in_nmi() or
	some such, it would also be possible to take advantage of
	interrupts from dyntick-idle mode -- and simplify the dynticks
	code interfacing to RCU.

o	CPU onlining and offlining is probably broken.  Testing in
	progress.

o	The check-CPU-stalls code is busted.  Will be fixed.

o	There are probably hangs, rcutorture failures, &c.

o	There is not yet a human-readable design document.  Will be fixed.

o	The largest machine I can get my hands on at the moment only
	has 8 CPUs, which really doesn't stress this algorithm much.

If you want to use this against a Linus kernel, the following will work:

Start with 2.6.27-rc3.

Apply http://www.rdrop.com/users/paulmck/patches/paulmck-rcu.2008.08.20a.patch
which catches you up to a recent linux-2.6-tip tip/core/rcu commit.

Apply http://www.rdrop.com/users/paulmck/patches/2.6.27-rc3-hierRCU-9.patch
which gets you the current hierarchical RCU implementation.

Thoughts?

Signed-off-by: Paul E. McKenney <paulmck@...ux.vnet.ibm.com>
---

 include/linux/hardirq.h    |    4 
 include/linux/rcuclassic.h |  221 +++++--
 kernel/Kconfig.preempt     |   32 +
 kernel/Makefile            |    5 
 kernel/rcuclassic.c        | 1374 ++++++++++++++++++++++++++++++++-------------
 kernel/rcuclassic_trace.c  |  218 +++++++
 6 files changed, 1421 insertions(+), 433 deletions(-)

diff --git a/include/linux/hardirq.h b/include/linux/hardirq.h
index 181006c..a776bf0 100644
--- a/include/linux/hardirq.h
+++ b/include/linux/hardirq.h
@@ -118,13 +118,13 @@ static inline void account_system_vtime(struct task_struct *tsk)
 }
 #endif
 
-#if defined(CONFIG_PREEMPT_RCU) && defined(CONFIG_NO_HZ)
+#if defined(CONFIG_NO_HZ)
 extern void rcu_irq_enter(void);
 extern void rcu_irq_exit(void);
 #else
 # define rcu_irq_enter() do { } while (0)
 # define rcu_irq_exit() do { } while (0)
-#endif /* CONFIG_PREEMPT_RCU */
+#endif /* #if defined(CONFIG_NO_HZ) */
 
 /*
  * It is safe to do non-atomic ops on ->hardirq_context,
diff --git a/include/linux/rcuclassic.h b/include/linux/rcuclassic.h
index 1658995..b21300f 100644
--- a/include/linux/rcuclassic.h
+++ b/include/linux/rcuclassic.h
@@ -15,19 +15,16 @@
  * along with this program; if not, write to the Free Software
  * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
  *
- * Copyright IBM Corporation, 2001
+ * Copyright IBM Corporation, 2008
  *
  * Author: Dipankar Sarma <dipankar@...ibm.com>
+ *	   Paul E. McKenney <paulmck@...ux.vnet.ibm.com> Hierarchical algorithm
  *
  * Based on the original work by Paul McKenney <paulmck@...ibm.com>
  * and inputs from Rusty Russell, Andrea Arcangeli and Andi Kleen.
- * Papers:
- * http://www.rdrop.com/users/paulmck/paper/rclockpdcsproof.pdf
- * http://lse.sourceforge.net/locking/rclock_OLS.2001.05.01c.sc.pdf (OLS2001)
  *
  * For detailed explanation of Read-Copy Update mechanism see -
- * 		Documentation/RCU
- *
+ * 	Documentation/RCU
  */
 
 #ifndef __LINUX_RCUCLASSIC_H
@@ -40,69 +37,159 @@
 #include <linux/cpumask.h>
 #include <linux/seqlock.h>
 
+/*
+ * Define shape of hierarchy based on NR_CPUS and CONFIG_RCU_FANOUT.
+ * In theory, it should be possible to add more levels straightforwardly.
+ * In practice, this has not been tested, so there is probably some
+ * bug somewhere.
+ */
+#define MAX_RCU_LVLS 3
+#define RCU_FANOUT	      (CONFIG_RCU_FANOUT)
+#define RCU_FANOUT_SQ	      (RCU_FANOUT * RCU_FANOUT)
+#define RCU_FANOUT_CUBE	      (RCU_FANOUT_SQ * RCU_FANOUT)
 
-/* Global control variables for rcupdate callback mechanism. */
-struct rcu_ctrlblk {
-	long	cur;		/* Current batch number.                      */
-	long	completed;	/* Number of the last completed batch         */
-	long	pending;	/* Number of the last pending batch           */
-#ifdef CONFIG_DEBUG_RCU_STALL
-	unsigned long gp_check;	/* Time grace period should end, in seconds.  */
-#endif /* #ifdef CONFIG_DEBUG_RCU_STALL */
+#if (NR_CPUS) <= RCU_FANOUT
+#  define NUM_RCU_LVLS	      1
+#  define NUM_RCU_LVL_0	      1
+#  define NUM_RCU_LVL_1	      (NR_CPUS)
+#  define NUM_RCU_LVL_2	      0
+#  define NUM_RCU_LVL_3	      0
+#elif (NR_CPUS) <= RCU_FANOUT_SQ
+#  define NUM_RCU_LVLS	      2
+#  define NUM_RCU_LVL_0	      1
+#  define NUM_RCU_LVL_1	      (((NR_CPUS) + RCU_FANOUT - 1) / RCU_FANOUT)
+#  define NUM_RCU_LVL_2	      (NR_CPUS)
+#  define NUM_RCU_LVL_3	      0
+#elif (NR_CPUS) <= RCU_FANOUT_CUBE
+#  define NUM_RCU_LVLS	      3
+#  define NUM_RCU_LVL_0	      1
+#  define NUM_RCU_LVL_1	      (((NR_CPUS) + RCU_FANOUT_SQ - 1) / RCU_FANOUT_SQ)
+#  define NUM_RCU_LVL_2	      (((NR_CPUS) + (RCU_FANOUT) - 1) / (RCU_FANOUT))
+#  define NUM_RCU_LVL_3	      NR_CPUS
+#else
+# error "CONFIG_RCU_FANOUT insufficient for NR_CPUS"
+#endif /* #if (NR_CPUS) <= RCU_FANOUT */
 
-	int	signaled;
+#define RCU_SUM (NUM_RCU_LVL_0 + NUM_RCU_LVL_1 + NUM_RCU_LVL_2 + NUM_RCU_LVL_3)
+#define NUM_RCU_NODES (RCU_SUM - NR_CPUS)
 
-	spinlock_t	lock	____cacheline_internodealigned_in_smp;
-	cpumask_t	cpumask; /* CPUs that need to switch in order    */
-				 /* for current batch to proceed.        */
+/*
+ * Definition for node within the RCU grace-period-detection hierarchy.
+ */
+struct rcu_node {
+	spinlock_t lock;
+	unsigned long	qsmask;	/* CPUs or groups that need to switch in      */
+				/*  order for current grace period to proceed.*/
+	unsigned long	qsmaskinit;
+				/* Per-GP initialization for qsmask.	      */
+	int	grplo;		/* lowest-numbered CPU or group here.	      */
+	int	grphi;		/* highest-numbered CPU or group here.	      */
+	u8	grpnum;		/* CPU/group number for next level up.	      */
+	u8	level;		/* root is at level 0.			      */
+	struct rcu_node *parent;
 } ____cacheline_internodealigned_in_smp;
 
-/* Is batch a before batch b ? */
-static inline int rcu_batch_before(long a, long b)
-{
-	return (a - b) < 0;
-}
+/* Values for signaled field in struct rcu_state. */
+#define RCU_SAVE_DYNTICK	0	/* Need to scan dyntick state. */
+#define RCU_FORCE_QS		1	/* Need to force quiescent state. */
+#define RCU_SIGNALED		2	/* We have done all that we can. */
+#ifdef CONFIG_NO_HZ
+#define RCU_SIGNAL_INIT		RCU_SAVE_DYNTICK
+#else /* #ifdef CONFIG_NO_HZ */
+#define RCU_SIGNAL_INIT		RCU_FORCE_QS
+#endif /* #else #ifdef CONFIG_NO_HZ */
 
-/* Is batch a after batch b ? */
-static inline int rcu_batch_after(long a, long b)
-{
-	return (a - b) > 0;
-}
+/*
+ * RCU global state, including node hierarchy.  This hierarchy is
+ * represented in "heap" form in a dense array.  The root (first level)
+ * of the hierarchy is in ->node[0] (referenced by ->level[0]), the second
+ * level in ->node[1] through ->node[m] (->node[1] referenced by ->level[1]),
+ * and the third level in ->node[m+1] and following (->node[m+1] referenced
+ * by ->level[2]).  The number of levels is determined by the number of
+ * CPUs and by CONFIG_RCU_FANOUT.  Small systems will have a "hierarchy"
+ * consisting of a single rcu_node.
+ */
+struct rcu_state {
+	struct rcu_node node[NUM_RCU_NODES];	/* Hierarchy. */
+	struct rcu_node *level[NUM_RCU_LVLS];	/* Hierarchy levels. */
+	u8 levelcnt[MAX_RCU_LVLS + 1];		/* # nodes in each level. */
+	u8 levelspread[NUM_RCU_LVLS];		/* kids/node in each level. */
 
-/* Per-CPU data for Read-Copy UPdate. */
+	/* The following fields are guarded by the root rcu_node's lock. */
+
+	u8	signaled ____cacheline_internodealigned_in_smp;
+						/* sent GP-kick IPIs? */
+	long	gpnum;				/* Current gp number. */
+	long	completed;			/* # of last completed gp. */
+	spinlock_t onofflock;			/* exclude on/offline and */
+						/*  starting new GP. */
+#ifdef CONFIG_NO_HZ
+	long dynticks_completed;		/* Value of completed @ snap. */
+#endif /* #ifdef CONFIG_NO_HZ */
+};
+
+/* Index values for nxttail array in struct rcu_data. */
+#define RCU_DONE_TAIL		0	/* Also RCU_WAIT head. */
+#define RCU_WAIT_TAIL		1	/* Also RCU_NEXT_READY head. */
+#define RCU_NEXT_READY_TAIL	2	/* Also RCU_NEXT head. */
+#define RCU_NEXT_TAIL		3
+#define RCU_NEXT_SIZE		4
+
+/* Per-CPU data for read-copy update. */
 struct rcu_data {
-	/* 1) quiescent state handling : */
-	long		quiescbatch;     /* Batch # for grace period */
-	int		passed_quiesc;	 /* User-mode/idle loop etc. */
-	int		qs_pending;	 /* core waits for quiesc state */
+	/* 1) quiescent-state and grace-period handling : */
+	long		completed;	/* Track rsp->completed gp number */
+					/*  in order to detect GP end. */
+	long		gpnum;		/* Highest gp number that this CPU */
+					/*  is aware of having started. */
+	bool		passed_quiesc;	/* User-mode/idle loop etc. */
+	long		passed_quiesc_completed;
+					/* Value of completed at time of qs. */
+	bool		qs_pending;	/* Core waits for quiesc state. */
+	struct rcu_node *mynode;	/* This CPU's leaf of hierarchy */
 
 	/* 2) batch handling */
 	/*
-	 * if nxtlist is not NULL, then:
-	 * batch:
-	 *	The batch # for the last entry of nxtlist
-	 * [*nxttail[1], NULL = *nxttail[2]):
-	 *	Entries that batch # <= batch
-	 * [*nxttail[0], *nxttail[1]):
-	 *	Entries that batch # <= batch - 1
-	 * [nxtlist, *nxttail[0]):
-	 *	Entries that batch # <= batch - 2
+	 * If nxtlist is not NULL, it is partitioned as follows.
+	 * Any of the partitions might be empty, in which case the
+	 * pointer to that partition will be equal to the pointer for
+	 * the following partition.  When the list is empty, all of
+	 * the nxttail elements point to nxtlist, which is NULL.
+	 *
+	 * [*nxttail[RCU_NEXT_READY_TAIL], NULL = *nxttail[RCU_NEXT_TAIL]):
+	 *	Entries that might have arrived after current GP ended
+	 * [*nxttail[RCU_WAIT_TAIL], *nxttail[RCU_NEXT_READY_TAIL]):
+	 *	Entries known to have arrived before current GP ended
+	 * [*nxttail[RCU_DONE_TAIL], *nxttail[RCU_WAIT_TAIL]):
+	 *	Entries that batch # <= ->completed - 1: waiting for current GP
+	 * [nxtlist, *nxttail[RCU_DONE_TAIL]):
+	 *	Entries that batch # <= ->completed
 	 *	The grace period for these entries has completed, and
 	 *	the other grace-period-completed entries may be moved
 	 *	here temporarily in rcu_process_callbacks().
 	 */
-	long  	       	batch;
 	struct rcu_head *nxtlist;
-	struct rcu_head **nxttail[3];
-	long            qlen; 	 	 /* # of queued callbacks */
-	struct rcu_head *donelist;
-	struct rcu_head **donetail;
-	long		blimit;		 /* Upper limit on a processed batch */
-	int cpu;
+	struct rcu_head **nxttail[RCU_NEXT_SIZE];
+	long		qlen; 	 	/* # of queued callbacks */
+	long		blimit;		/* Upper limit on a processed batch */
+
+	/* 3) rcu-barrier functions */
 	struct rcu_head barrier;
+
+#ifdef CONFIG_NO_HZ
+	/* 4) dynticks interface (see http://lwn.net/Articles/279077/) */
+	int dynticks_nesting;		/* Track nesting level, sort of. */
+	int dynticks;			/* Even for dynticks-idle mode. */
+	int dynticks_snap;		/* Per-GP tracking for dynticks. */
+#endif /* #ifdef CONFIG_NO_HZ */
+
+	int cpu;
 };
 
+extern struct rcu_state rcu_state;
 DECLARE_PER_CPU(struct rcu_data, rcu_data);
+
+extern struct rcu_state rcu_bh_state;
 DECLARE_PER_CPU(struct rcu_data, rcu_bh_data);
 
 /*
@@ -115,11 +202,13 @@ static inline void rcu_qsctr_inc(int cpu)
 {
 	struct rcu_data *rdp = &per_cpu(rcu_data, cpu);
 	rdp->passed_quiesc = 1;
+	rdp->passed_quiesc_completed = rdp->completed;
 }
 static inline void rcu_bh_qsctr_inc(int cpu)
 {
 	struct rcu_data *rdp = &per_cpu(rcu_bh_data, cpu);
 	rdp->passed_quiesc = 1;
+	rdp->passed_quiesc_completed = rdp->completed;
 }
 
 extern int rcu_pending(int cpu);
@@ -172,7 +261,41 @@ extern void rcu_restart_cpu(int cpu);
 extern long rcu_batches_completed(void);
 extern long rcu_batches_completed_bh(void);
 
+#ifdef CONFIG_NO_HZ
+
+/*
+ * Enter nohz mode, in other words, -leave- the mode in which RCU
+ * read-side critical sections can occur.  (Though RCU read-side
+ * critical sections can occur in irq handlers in nohz mode, a possibility
+ * handled by rcu_irq_enter() and rcu_irq_exit()).
+ *
+ * @@@ note quiescent state???
+ */
+static inline void rcu_enter_nohz(void)
+{
+	static DEFINE_RATELIMIT_STATE(rs, 10 * HZ, 1);
+
+	smp_mb(); /* CPUs seeing ++ must see prior RCU read-side crit sects */
+	__get_cpu_var(rcu_data).dynticks++;
+	WARN_ON_RATELIMIT(__get_cpu_var(rcu_data).dynticks & 0x1, &rs);
+}
+
+/*
+ * Exit nohz mode.
+ */
+static inline void rcu_exit_nohz(void)
+{
+	static DEFINE_RATELIMIT_STATE(rs, 10 * HZ, 1);
+
+	__get_cpu_var(rcu_data).dynticks++;
+	smp_mb(); /* CPUs seeing ++ must see later RCU read-side crit sects */
+	WARN_ON_RATELIMIT(!(__get_cpu_var(rcu_data).dynticks & 0x1),
+				&rs);
+}
+
+#else /* CONFIG_NO_HZ */
 #define rcu_enter_nohz()	do { } while (0)
 #define rcu_exit_nohz()		do { } while (0)
+#endif /* CONFIG_NO_HZ */
 
 #endif /* __LINUX_RCUCLASSIC_H */
diff --git a/kernel/Kconfig.preempt b/kernel/Kconfig.preempt
index 9fdba03..38a64ae 100644
--- a/kernel/Kconfig.preempt
+++ b/kernel/Kconfig.preempt
@@ -68,7 +68,6 @@ config PREEMPT_RCU
 
 config RCU_TRACE
 	bool "Enable tracing for RCU - currently stats in debugfs"
-	depends on PREEMPT_RCU
 	select DEBUG_FS
 	default y
 	help
@@ -77,3 +76,34 @@ config RCU_TRACE
 
 	  Say Y here if you want to enable RCU tracing
 	  Say N if you are unsure.
+
+config RCU_FANOUT
+	int "Hierarchical RCU fanout value"
+	range 2 64 if 64BIT
+	range 2 32 if !64BIT
+	depends on CLASSIC_RCU
+	default 64 if 64BIT
+	default 32 if !64BIT
+	help
+	  This option controls the fanout of hierarchical implementations
+	  of RCU, allowing RCU to work efficiently on machines with
+	  large numbers of CPUs.  This value must be at least the cube
+	  root of NR_CPUS, which allows NR_CPUS up to 32,768 for 32-bit
+	  systems and up to 262,144 for 64-bit systems.
+
+	  Select a specific number if testing RCU itself.
+	  Take the default if unsure.
+
+config RCU_FANOUT_EXACT
+	bool "Disable hierarchical RCU auto-balancing"
+	depends on CLASSIC_RCU
+	default n
+	help
+	  This option forces use of the exact RCU_FANOUT value specified,
+	  regardless of imbalances in the hierarchy.  This is useful for
+	  testing RCU itself, and might one day be useful on systems with
+	  strong NUMA behavior.
+
+	  Without RCU_FANOUT_EXACT, the code will balance the hierarchy.
+
+	  Say n if unsure.
diff --git a/kernel/Makefile b/kernel/Makefile
index 4e1d7df..b018f62 100644
--- a/kernel/Makefile
+++ b/kernel/Makefile
@@ -75,8 +75,9 @@ obj-$(CONFIG_SECCOMP) += seccomp.o
 obj-$(CONFIG_RCU_TORTURE_TEST) += rcutorture.o
 obj-$(CONFIG_CLASSIC_RCU) += rcuclassic.o
 obj-$(CONFIG_PREEMPT_RCU) += rcupreempt.o
-ifeq ($(CONFIG_PREEMPT_RCU),y)
-obj-$(CONFIG_RCU_TRACE) += rcupreempt_trace.o
+ifeq ($(CONFIG_RCU_TRACE),y)
+obj-$(CONFIG_CLASSIC_RCU) += rcuclassic_trace.o
+obj-$(CONFIG_PREEMPT_RCU) += rcupreempt_trace.o
 endif
 obj-$(CONFIG_RELAY) += relay.o
 obj-$(CONFIG_SYSCTL) += utsname_sysctl.o
diff --git a/kernel/rcuclassic.c b/kernel/rcuclassic.c
index 01e761a..905018c 100644
--- a/kernel/rcuclassic.c
+++ b/kernel/rcuclassic.c
@@ -15,20 +15,17 @@
  * along with this program; if not, write to the Free Software
  * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
  *
- * Copyright IBM Corporation, 2001
+ * Copyright IBM Corporation, 2008
  *
  * Authors: Dipankar Sarma <dipankar@...ibm.com>
  *	    Manfred Spraul <manfred@...orfullife.com>
+ *	    Paul E. McKenney <paulmck@...ux.vnet.ibm.com> Hierarchical version
  *
  * Based on the original work by Paul McKenney <paulmck@...ibm.com>
  * and inputs from Rusty Russell, Andrea Arcangeli and Andi Kleen.
- * Papers:
- * http://www.rdrop.com/users/paulmck/paper/rclockpdcsproof.pdf
- * http://lse.sourceforge.net/locking/rclock_OLS.2001.05.01c.sc.pdf (OLS2001)
  *
  * For detailed explanation of Read-Copy Update mechanism see -
- * 		Documentation/RCU
- *
+ * 	Documentation/RCU
  */
 #include <linux/types.h>
 #include <linux/kernel.h>
@@ -56,164 +53,227 @@ struct lockdep_map rcu_lock_map =
 EXPORT_SYMBOL_GPL(rcu_lock_map);
 #endif
 
+/* Data structures. */
+
+#define RCU_STATE_INITIALIZER(name) { \
+	.level = { &name.node[0] }, \
+	.levelcnt = { \
+		NUM_RCU_LVL_0,  /* root of hierarchy. */ \
+		NUM_RCU_LVL_1, \
+		NUM_RCU_LVL_2, \
+		NUM_RCU_LVL_3, /* == MAX_RCU_LVLS */ \
+	}, \
+	.signaled = RCU_SIGNAL_INIT, \
+	.gpnum = -300, \
+	.completed = -300, \
+	.onofflock = __SPIN_LOCK_UNLOCKED(&name.onofflock), \
+}
 
-/* Definition for rcupdate control block. */
-static struct rcu_ctrlblk rcu_ctrlblk = {
-	.cur = -300,
-	.completed = -300,
-	.pending = -300,
-	.lock = __SPIN_LOCK_UNLOCKED(&rcu_ctrlblk.lock),
-	.cpumask = CPU_MASK_NONE,
-};
-static struct rcu_ctrlblk rcu_bh_ctrlblk = {
-	.cur = -300,
-	.completed = -300,
-	.pending = -300,
-	.lock = __SPIN_LOCK_UNLOCKED(&rcu_bh_ctrlblk.lock),
-	.cpumask = CPU_MASK_NONE,
-};
-
+struct rcu_state rcu_state = RCU_STATE_INITIALIZER(rcu_state);
 DEFINE_PER_CPU(struct rcu_data, rcu_data) = { 0L };
+
+struct rcu_state rcu_bh_state = RCU_STATE_INITIALIZER(rcu_bh_state);
 DEFINE_PER_CPU(struct rcu_data, rcu_bh_data) = { 0L };
 
-static int blimit = 10;
-static int qhimark = 10000;
-static int qlowmark = 100;
+static int blimit = 10;		/* Maximum callbacks per softirq. */
+static int qhimark = 10000;	/* If this many pending, ignore blimit. */
+static int qlowmark = 100;	/* Once only this many pending, use blimit. */
 
-#ifdef CONFIG_SMP
-static void force_quiescent_state(struct rcu_data *rdp,
-			struct rcu_ctrlblk *rcp)
+#ifdef CONFIG_NO_HZ
+
+/**
+ * rcu_irq_enter - Called from Hard irq handlers and NMI/SMI.
+ *
+ * If the CPU was idle with dynamic ticks active, this updates the
+ * rdp->dynticks to let the RCU handling know that the CPU is active.
+ */
+void rcu_irq_enter(void)
 {
-	int cpu;
-	cpumask_t cpumask;
-	unsigned long flags;
+	struct rcu_data *rdp = &__get_cpu_var(rcu_data);
 
-	set_need_resched();
-	spin_lock_irqsave(&rcp->lock, flags);
-	if (unlikely(!rcp->signaled)) {
-		rcp->signaled = 1;
+	if (rdp->dynticks_nesting)
+		rdp->dynticks_nesting++;
+
+	/*
+	 * Only update if we are coming from a stopped ticks mode
+	 * (rdp->dynticks is even).
+	 */
+	if (!in_interrupt() &&
+	    (rdp->dynticks & 0x1) == 0) {
 		/*
-		 * Don't send IPI to itself. With irqs disabled,
-		 * rdp->cpu is the current cpu.
-		 *
-		 * cpu_online_map is updated by the _cpu_down()
-		 * using __stop_machine(). Since we're in irqs disabled
-		 * section, __stop_machine() is not exectuting, hence
-		 * the cpu_online_map is stable.
+		 * The following might seem like we could have a race
+		 * with NMI/SMIs. But this really isn't a problem.
+		 * Here we do a read/modify/write, and the race happens
+		 * when an NMI/SMI comes in after the read and before
+		 * the write. But NMI/SMIs will increment this counter
+		 * twice before returning, so the zero bit will not
+		 * be corrupted by the NMI/SMI which is the most important
+		 * part.
 		 *
-		 * However,  a cpu might have been offlined _just_ before
-		 * we disabled irqs while entering here.
-		 * And rcu subsystem might not yet have handled the CPU_DEAD
-		 * notification, leading to the offlined cpu's bit
-		 * being set in the rcp->cpumask.
+		 * The only thing is that we would bring back the counter
+		 * to a position that it was in during the NMI/SMI.
+		 * But the zero bit would be set, so the rest of the
+		 * counter would again be ignored.
 		 *
-		 * Hence cpumask = (rcp->cpumask & cpu_online_map) to prevent
-		 * sending smp_reschedule() to an offlined CPU.
+		 * On return from the IRQ, the counter may have the zero
+		 * bit be 0 and the counter the same as the return from
+		 * the NMI/SMI. If the state machine was so unlucky to
+		 * see that, it still doesn't matter, since all
+		 * RCU read-side critical sections on this CPU would
+		 * have already completed.
+		 */
+		rdp->dynticks++;
+		/*
+		 * The following memory barrier ensures that any RCU
+		 * read-side critical sections in the irq handler are
+		 * seen by other CPUs to follow the above increment to
+		 * rdp->dynticks. This is required in order for other CPUs
+		 * to correctly determine when it is safe to advance the
+		 * RCU grace-period state machine.
+		 */
+		smp_mb(); /* see above block comment. */
+		/*
+		 * Since we can't determine the dynamic tick mode from
+		 * the rdp->dynticks after this routine, we use a second
+		 * flag to acknowledge that we came from an idle state
+		 * with ticks stopped.
+		 */
+		rdp->dynticks_nesting++;
+		/*
+		 * If we take an NMI/SMI now, they will also increment
+		 * the dynticks_nesting counter, and will not update the
+		 * rdp->dynticks on exit. That is for this IRQ to do.
 		 */
-		cpus_and(cpumask, rcp->cpumask, cpu_online_map);
-		cpu_clear(rdp->cpu, cpumask);
-		for_each_cpu_mask_nr(cpu, cpumask)
-			smp_send_reschedule(cpu);
 	}
-	spin_unlock_irqrestore(&rcp->lock, flags);
 }
-#else
-static inline void force_quiescent_state(struct rcu_data *rdp,
-			struct rcu_ctrlblk *rcp)
-{
-	set_need_resched();
-}
-#endif
 
-static void __call_rcu(struct rcu_head *head, struct rcu_ctrlblk *rcp,
-		struct rcu_data *rdp)
+/**
+ * rcu_irq_exit - Called from exiting Hard irq context.
+ *
+ * If the CPU was idle with dynamic ticks active, update the rdp->dynticks
+ * to let the RCU handling be aware that the CPU is going back to idle
+ * with no ticks.
+ */
+void rcu_irq_exit(void)
 {
-	long batch;
-
-	head->next = NULL;
-	smp_mb(); /* Read of rcu->cur must happen after any change by caller. */
+	struct rcu_data *rdp = &__get_cpu_var(rcu_data);
 
 	/*
-	 * Determine the batch number of this callback.
-	 *
-	 * Using ACCESS_ONCE to avoid the following error when gcc eliminates
-	 * local variable "batch" and emits codes like this:
-	 *	1) rdp->batch = rcp->cur + 1 # gets old value
-	 *	......
-	 *	2)rcu_batch_after(rcp->cur + 1, rdp->batch) # gets new value
-	 * then [*nxttail[0], *nxttail[1]) may contain callbacks
-	 * that batch# = rdp->batch, see the comment of struct rcu_data.
+	 * rdp->dynticks_nesting is set if we interrupted the CPU
+	 * when it was idle with ticks stopped.
+	 * Once this occurs, we keep track of interrupt nesting
+	 * because a NMI/SMI could also come in, and we still
+	 * only want the IRQ that started the increment of the
+	 * rdp->dynticks to be the one that modifies it on exit.
 	 */
-	batch = ACCESS_ONCE(rcp->cur) + 1;
-
-	if (rdp->nxtlist && rcu_batch_after(batch, rdp->batch)) {
-		/* process callbacks */
-		rdp->nxttail[0] = rdp->nxttail[1];
-		rdp->nxttail[1] = rdp->nxttail[2];
-		if (rcu_batch_after(batch - 1, rdp->batch))
-			rdp->nxttail[0] = rdp->nxttail[2];
-	}
+	if (rdp->dynticks_nesting) {
+		if (--rdp->dynticks_nesting)
+			return;
 
-	rdp->batch = batch;
-	*rdp->nxttail[2] = head;
-	rdp->nxttail[2] = &head->next;
+		/* This must match the interrupt nesting */
+		WARN_ON(in_interrupt());
 
-	if (unlikely(++rdp->qlen > qhimark)) {
-		rdp->blimit = INT_MAX;
-		force_quiescent_state(rdp, &rcu_ctrlblk);
+		/*
+		 * If an NMI/SMI happens now we are still
+		 * protected by the rdp->dynticks being odd.
+		 */
+
+		/*
+		 * The following memory barrier ensures that any
+		 * rcu_read_unlock() primitives in the irq handler
+		 * are seen by other CPUs to precede the following
+		 * increment to rdp->dynticks. This is required in
+		 * order for other CPUs to determine when it is safe
+		 * to advance the RCU grace-period state machine.
+		 */
+		smp_mb(); /* see above block comment. */
+		rdp->dynticks++;
+		WARN_ON(rdp->dynticks & 0x1);
 	}
 }
 
-/**
- * call_rcu - Queue an RCU callback for invocation after a grace period.
- * @head: structure to be used for queueing the RCU updates.
- * @func: actual update function to be invoked after the grace period
- *
- * The update function will be invoked some time after a full grace
- * period elapses, in other words after all currently executing RCU
- * read-side critical sections have completed.  RCU read-side critical
- * sections are delimited by rcu_read_lock() and rcu_read_unlock(),
- * and may be nested.
+/*
+ * Snapshot the specified CPU's dynticks counter so that we can later
+ * credit them with an implicit quiescent state.  Return 1 if this CPU
+ * is already in a quiescent state courtesy of dynticks idle mode.
  */
-void call_rcu(struct rcu_head *head,
-				void (*func)(struct rcu_head *rcu))
+static int dyntick_save_progress_counter(int cpu)
 {
-	unsigned long flags;
+	struct rcu_data *rdp = &per_cpu(rcu_data, cpu);
+	int snap;
 
-	head->func = func;
-	local_irq_save(flags);
-	__call_rcu(head, &rcu_ctrlblk, &__get_cpu_var(rcu_data));
-	local_irq_restore(flags);
+	snap = rdp->dynticks;
+	smp_mb();	/* Order sampling of snap with end of grace period. */
+	rdp->dynticks_snap = snap;
+	return ((snap & 0x1) == 0);
 }
-EXPORT_SYMBOL_GPL(call_rcu);
 
-/**
- * call_rcu_bh - Queue an RCU for invocation after a quicker grace period.
- * @head: structure to be used for queueing the RCU updates.
- * @func: actual update function to be invoked after the grace period
- *
- * The update function will be invoked some time after a full grace
- * period elapses, in other words after all currently executing RCU
- * read-side critical sections have completed. call_rcu_bh() assumes
- * that the read-side critical sections end on completion of a softirq
- * handler. This means that read-side critical sections in process
- * context must not be interrupted by softirqs. This interface is to be
- * used when most of the read-side critical sections are in softirq context.
- * RCU read-side critical sections are delimited by rcu_read_lock() and
- * rcu_read_unlock(), * if in interrupt context or rcu_read_lock_bh()
- * and rcu_read_unlock_bh(), if in process context. These may be nested.
- */
-void call_rcu_bh(struct rcu_head *head,
-				void (*func)(struct rcu_head *rcu))
+/*
+ * Snapshot the global completed counter so that later on it will be
+ * possible to tell which grace period any detected dyntick-idle
+ * quiescent states belong to.  The caller must hold the root rcu_node
+ * lock.
+ */
+static void dyntick_save_completed(struct rcu_state *rsp, long completed)
 {
-	unsigned long flags;
+	rsp->dynticks_completed = completed;
+}
 
-	head->func = func;
-	local_irq_save(flags);
-	__call_rcu(head, &rcu_bh_ctrlblk, &__get_cpu_var(rcu_bh_data));
-	local_irq_restore(flags);
+/*
+ * Get the value previously saved by dyntick_save_completed().
+ */
+static long dyntick_get_completed(struct rcu_state *rsp)
+{
+	return rsp->dynticks_completed;
 }
-EXPORT_SYMBOL_GPL(call_rcu_bh);
+
+/*
+ * Return true if the specified CPU has passed through a quiescent
+ * state by virtue of being in or having passed through a dynticks
+ * idle state since the last call to dyntick_save_progress_counter()
+ * for this same CPU.
+ */
+static int rcu_implicit_dynticks_qs(int cpu)
+{
+	long curr;
+	long snap;
+	struct rcu_data *rdp = &per_cpu(rcu_data, cpu);
+
+	curr = rdp->dynticks;
+	snap = rdp->dynticks_snap;
+	smp_mb(); /* force ordering with cpu entering/leaving dynticks. */
+
+	/*
+	 * If the CPU passed through or entered a dynticks idle phase with
+	 * no active irq handlers, then we can safely pretend that the CPU
+	 * already acknowledged the request to pass through a quiescent
+	 * state.  Either way, that CPU cannot possibly be in an RCU
+	 * read-side critical section that started before the beginning
+	 * of the current RCU grace period.
+	 */
+	if ((curr - snap) >= 2 || (curr & 0x1) == 0)
+		return 1;
+
+	/*
+	 * We need this CPU to either enter dynticks idle mode or pass
+	 * through a quiescent state.  Send it a reschedule IPI.
+	 */
+
+	if (cpu != smp_processor_id())
+		smp_send_reschedule(cpu);
+	else
+		set_need_resched();
+	return 0;
+}
+
+#else /* #ifdef CONFIG_NO_HZ */
+
+static int dyntick_save_progress_counter(int cpu) { return 0; }
+static int rcu_implicit_dynticks_qs(int cpu) { return 0; }
+# define dyntick_save_completed(rsp, completed) do { } while (0)
+# define dyntick_get_completed(rsp)		((rsp)->completed - 1)
+
+#endif /* #else #ifdef CONFIG_NO_HZ */
 
 /*
  * Return the number of RCU batches processed thus far.  Useful
@@ -221,7 +281,7 @@ EXPORT_SYMBOL_GPL(call_rcu_bh);
  */
 long rcu_batches_completed(void)
 {
-	return rcu_ctrlblk.completed;
+	return rcu_state.completed;
 }
 EXPORT_SYMBOL_GPL(rcu_batches_completed);
 
@@ -231,70 +291,19 @@ EXPORT_SYMBOL_GPL(rcu_batches_completed);
  */
 long rcu_batches_completed_bh(void)
 {
-	return rcu_bh_ctrlblk.completed;
+	return rcu_bh_state.completed;
 }
 EXPORT_SYMBOL_GPL(rcu_batches_completed_bh);
 
 /* Raises the softirq for processing rcu_callbacks. */
-static inline void raise_rcu_softirq(void)
+static void raise_rcu_softirq(void)
 {
 	raise_softirq(RCU_SOFTIRQ);
 }
 
-/*
- * Invoke the completed RCU callbacks. They are expected to be in
- * a per-cpu list.
- */
-static void rcu_do_batch(struct rcu_data *rdp)
-{
-	struct rcu_head *next, *list;
-	int count = 0;
-
-	list = rdp->donelist;
-	while (list) {
-		next = list->next;
-		prefetch(next);
-		list->func(list);
-		list = next;
-		if (++count >= rdp->blimit)
-			break;
-	}
-	rdp->donelist = list;
-
-	local_irq_disable();
-	rdp->qlen -= count;
-	local_irq_enable();
-	if (rdp->blimit == INT_MAX && rdp->qlen <= qlowmark)
-		rdp->blimit = blimit;
-
-	if (!rdp->donelist)
-		rdp->donetail = &rdp->donelist;
-	else
-		raise_rcu_softirq();
-}
-
-/*
- * Grace period handling:
- * The grace period handling consists out of two steps:
- * - A new grace period is started.
- *   This is done by rcu_start_batch. The start is not broadcasted to
- *   all cpus, they must pick this up by comparing rcp->cur with
- *   rdp->quiescbatch. All cpus are recorded  in the
- *   rcu_ctrlblk.cpumask bitmap.
- * - All cpus must go through a quiescent state.
- *   Since the start of the grace period is not broadcasted, at least two
- *   calls to rcu_check_quiescent_state are required:
- *   The first call just notices that a new grace period is running. The
- *   following calls check if there was a quiescent state since the beginning
- *   of the grace period. If so, it updates rcu_ctrlblk.cpumask. If
- *   the bitmap is empty, then the grace period is completed.
- *   rcu_check_quiescent_state calls rcu_start_batch(0) to start the next grace
- *   period (if necessary).
- */
-
 #ifdef CONFIG_DEBUG_RCU_STALL
 
-static inline void record_gp_check_time(struct rcu_ctrlblk *rcp)
+static void record_gp_check_time(struct rcu_ctrlblk *rcp)
 {
 	rcp->gp_check = get_seconds() + 3;
 }
@@ -359,78 +368,354 @@ static void check_cpu_stall(struct rcu_ctrlblk *rcp, struct rcu_data *rdp)
 
 #else /* #ifdef CONFIG_DEBUG_RCU_STALL */
 
-static inline void record_gp_check_time(struct rcu_ctrlblk *rcp)
+static void record_gp_check_time(struct rcu_state *rsp)
 {
 }
 
-static inline void
-check_cpu_stall(struct rcu_ctrlblk *rcp, struct rcu_data *rdp)
+static void
+check_cpu_stall(struct rcu_state *rsp, struct rcu_data *rdp)
 {
 }
 
 #endif /* #else #ifdef CONFIG_DEBUG_RCU_STALL */
 
 /*
- * Register a new batch of callbacks, and start it up if there is currently no
- * active batch and the batch to be registered has not already occurred.
- * Caller must hold rcu_ctrlblk.lock.
+ * Does the CPU have callbacks ready to be invoked?
+ */
+static int
+cpu_has_callbacks_ready_to_invoke(struct rcu_data *rdp)
+{
+	return &rdp->nxtlist != rdp->nxttail[RCU_DONE_TAIL];
+}
+
+/*
+ * Does the current CPU require an as-yet-unscheduled grace period?
+ */
+static int
+cpu_needs_another_gp(struct rcu_state *rsp, struct rcu_data *rdp)
+{
+	/* ACCESS_ONCE() because we are accessing outside of lock. */
+	return *rdp->nxttail[RCU_DONE_TAIL] &&
+	       ACCESS_ONCE(rsp->completed) == ACCESS_ONCE(rsp->gpnum);
+}
+
+/*
+ * Return the root node of the specified rcu_state structure.
+ */
+static struct rcu_node *rcu_get_root(struct rcu_state *rsp)
+{
+	return &rsp->node[0];
+}
+
+/*
+ * Compute the per-level fanout, either using the exact fanout specified
+ * or balancing the tree, depending on CONFIG_RCU_FANOUT_EXACT.
+ */
+#ifdef CONFIG_RCU_FANOUT_EXACT
+void rcu_init_levelspread(struct rcu_state *rsp)
+{
+	int i;
+
+	for (i = NUM_RCU_LVLS - 1; i >= 0; i--) {
+		rsp->levelspread[i] = CONFIG_RCU_FANOUT;
+	}
+
+}
+#else /* #ifdef CONFIG_RCU_FANOUT_EXACT */
+void rcu_init_levelspread(struct rcu_state *rsp)
+{
+	int ccur;
+	int cprv;
+	int i;
+
+	cprv = NR_CPUS;
+	for (i = NUM_RCU_LVLS - 1; i >= 0; i--) {
+		ccur = rsp->levelcnt[i];
+		rsp->levelspread[i] = (cprv + ccur - 1) / ccur;
+		cprv = ccur;
+	}
+
+}
+#endif /* #else #ifdef CONFIG_RCU_FANOUT_EXACT */
+
+/*
+ * When a given CPU first becomes aware of a grace period, it knows
+ * that all of its pre-existing callbacks will be covered by the next
+ * grace period.
+ *
+ * Similarly, if a given CPU has not yet let RCU know that it passed
+ * through a quiescent state for the current grace period, then that
+ * CPU knows that all of its callbacks may safely be invoked at the
+ * end of the next grace period.
+ */
+static void
+rcu_next_callbacks_are_ready(struct rcu_data *rdp)
+{
+	rdp->nxttail[RCU_NEXT_READY_TAIL] = rdp->nxttail[RCU_NEXT_TAIL];
+}
+
+/*
+ * Update CPU-local rcu_data state to record the newly noticed grace period.
+ * This is used both when we started the grace period and when we notice
+ * that someone else started the grace period.
+ */
+static void note_new_gpnum(struct rcu_state *rsp, struct rcu_data *rdp)
+{
+	rdp->qs_pending = 1;
+	rdp->passed_quiesc = 0;
+	rdp->gpnum = rsp->gpnum;
+}
+
+/*
+ * Did someone else start a new RCU grace period since we last
+ * checked?  Update local state appropriately if so.
+ */
+static int
+check_for_new_grace_period(struct rcu_state *rsp, struct rcu_data *rdp)
+{
+	unsigned long flags;
+	int ret = 0;
+
+	local_irq_save(flags);
+	if (rdp->gpnum != rsp->gpnum) {
+		note_new_gpnum(rsp, rdp);
+		ret = 1;
+	}
+	local_irq_restore(flags);
+	return ret;
+}
+
+/*
+ * Start a new RCU grace period if warranted, re-initializing the hierarchy
+ * in preparation for detecting the next grace period.  The caller must hold
+ * the root node's ->lock, which is released before return.  Hard irqs must
+ * be disabled.
  */
-static void rcu_start_batch(struct rcu_ctrlblk *rcp)
+static void
+rcu_start_gp(struct rcu_state *rsp, struct rcu_data *rdp, unsigned long iflg)
 {
-	if (rcp->cur != rcp->pending &&
-			rcp->completed == rcp->cur) {
-		rcp->cur++;
-		record_gp_check_time(rcp);
+	unsigned long flags = iflg;
+	struct rcu_node *rnp = rcu_get_root(rsp);
+	struct rcu_node *rnp_cur;
+	struct rcu_node *rnp_end;
+
+	if (!cpu_needs_another_gp(rsp, rdp)) {
 
 		/*
-		 * Accessing nohz_cpu_mask before incrementing rcp->cur needs a
-		 * Barrier  Otherwise it can cause tickless idle CPUs to be
-		 * included in rcp->cpumask, which will extend graceperiods
-		 * unnecessarily.
+		 * Either there is no need to detect any more grace periods
+		 * at the moment, or we are already in the process of
+		 * detecting one.  Either way, we should not start a new
+		 * RCU grace period, so drop the lock and return.
 		 */
-		smp_mb();
-		cpus_andnot(rcp->cpumask, cpu_online_map, nohz_cpu_mask);
 
-		rcp->signaled = 0;
+		spin_unlock_irqrestore(&rnp->lock, flags);
+		return;
+	}
+
+	/* Advance to a new grace period and initialize state. */
+
+	rsp->gpnum++;
+	rsp->signaled = RCU_SIGNAL_INIT;
+	dyntick_save_completed(rsp, rsp->completed - 1);
+	note_new_gpnum(rsp, rdp);
+
+	/*
+	 * Because we are first, we know that all our callbacks will
+	 * be covered by this upcoming grace period, even the ones
+	 * that were registered arbitrarily recently.
+	 */
+
+	rcu_next_callbacks_are_ready(rdp);
+	rdp->nxttail[RCU_WAIT_TAIL] = rdp->nxttail[RCU_NEXT_TAIL];
+
+	/* Special-case the common single-level case. */
+
+	if (NUM_RCU_NODES == 1) {
+		rnp->qsmask = rnp->qsmaskinit;
+		spin_unlock_irqrestore(&rnp->lock, flags);
+		return;
+	}
+
+	spin_unlock_irqrestore(&rnp->lock, flags);
+
+
+	/* Exclude any concurrent CPU-hotplug operations. */
+	spin_lock_irqsave(&rsp->onofflock, flags);
+
+	/*
+	 * Set the quiescent-state-needed bits in all the non-leaf RCU
+	 * nodes for all currently online CPUs.  This operation relies
+	 * on the layout of the hierarchy within the rsp->node[] array.
+	 * Note that other CPUs will access only the leaves of the
+	 * hierarchy, which still indicate that no grace period is in
+	 * progress.  In addition, we have excluded CPU-hotplug operations.
+	 *
+	 * We therefore do not need to hold any locks.  Any required
+	 * memory barriers will be supplied by the locks guarding the
+	 * leaf rcu_nodes in the hierarchy.
+	 */
+
+	rnp_end = rsp->level[NUM_RCU_LVLS - 1];
+	for (rnp_cur = &rsp->node[0]; rnp_cur < rnp_end; rnp_cur++)
+		rnp_cur->qsmask = rnp_cur->qsmaskinit;
+
+	/*
+	 * Now set up the leaf nodes.  Here we must be careful.  First,
+	 * we need to hold the lock in order to exclude other CPUs, which
+	 * might be contending for the leaf nodes' locks.  Second, as
+	 * soon as we initialize a given leaf node, its CPUs might run
+	 * up the rest of the hierarchy.  We must therefore acquire locks
+	 * for each node that we touch during this stage.  (But we still
+	 * are excluding CPU-hotplug operations.)
+	 *
+	 * Note that the grace period cannot complete until we finish
+	 * the initialization process, as there will be at least one
+	 * qsmask bit set in the root node until that time, namely the
+	 * one corresponding to this CPU.
+	 */
+
+	rnp_end = &rsp->node[NUM_RCU_NODES];
+	rnp_cur = rsp->level[NUM_RCU_LVLS - 1];
+	for (; rnp_cur < rnp_end; rnp_cur++) {
+		spin_lock(&rnp_cur->lock);	/* irqs already disabled. */
+		rnp_cur->qsmask = rnp_cur->qsmaskinit;
+		spin_unlock(&rnp_cur->lock);	/* irqs already disabled. */
 	}
+
+	spin_unlock_irqrestore(&rsp->onofflock, flags);
 }
 
 /*
- * cpu went through a quiescent state since the beginning of the grace period.
- * Clear it from the cpu mask and complete the grace period if it was the last
- * cpu. Start another grace period if someone has further entries pending
+ * Advance this CPU's callbacks, but only if the current grace period
+ * has ended.
  */
-static void cpu_quiet(int cpu, struct rcu_ctrlblk *rcp)
+static void
+rcu_process_gp_end(struct rcu_state *rsp, struct rcu_data *rdp)
 {
-	cpu_clear(cpu, rcp->cpumask);
-	if (cpus_empty(rcp->cpumask)) {
-		/* batch completed ! */
-		rcp->completed = rcp->cur;
-		rcu_start_batch(rcp);
+	long completed_snap;
+	unsigned long flags;
+
+	local_irq_save(flags);
+	completed_snap = ACCESS_ONCE(rsp->completed);  /* outside of lock. */
+
+	/* Did another grace period end? */
+	if (rdp->completed != completed_snap) {
+
+		/* Advance callbacks.  No harm if list empty. */
+		rdp->nxttail[RCU_DONE_TAIL] = rdp->nxttail[RCU_WAIT_TAIL];
+		rdp->nxttail[RCU_WAIT_TAIL] = rdp->nxttail[RCU_NEXT_READY_TAIL];
+		rdp->nxttail[RCU_NEXT_READY_TAIL] = rdp->nxttail[RCU_NEXT_TAIL];
+
+		/* Remember that we saw this grace-period completion. */
+		rdp->completed = completed_snap;
 	}
+	local_irq_restore(flags);
 }
 
 /*
- * Check if the cpu has gone through a quiescent state (say context
- * switch). If so and if it already hasn't done so in this RCU
- * quiescent cycle, then indicate that it has done so.
+ * Similar to cpu_quiet(), for which it is a helper function.  Allows
+ * a group of CPUs to be quieted at one go, though all the CPUs in the
+ * group must be represented by the same leaf rcu_node structure.
+ * That structure's lock must be held upon entry, and it is released
+ * before return.
  */
-static void rcu_check_quiescent_state(struct rcu_ctrlblk *rcp,
-					struct rcu_data *rdp)
+static void
+cpu_quiet_msk(unsigned long mask,
+	      struct rcu_state *rsp, struct rcu_data *rdp, struct rcu_node *rnp,
+	      long *lastcomp, unsigned long flags)
 {
-	unsigned long flags;
+	if (lastcomp != NULL &&
+	    *lastcomp != ACCESS_ONCE(rsp->completed)) {
 
-	if (rdp->quiescbatch != rcp->cur) {
-		/* start new grace period: */
-		rdp->qs_pending = 1;
-		rdp->passed_quiesc = 0;
-		rdp->quiescbatch = rcp->cur;
+		/*
+		 * Someone beat us to it for this grace period, so leave.
+		 * The race with GP start is resolved by the fact that we
+		 * hold the leaf rcu_node lock, so that the per-CPU bits
+		 * cannot yet be initialized -- so we would simply find our
+		 * CPU's bit already cleared below if this race occurred.
+		 */
+		spin_unlock_irqrestore(&rnp->lock, flags);
 		return;
 	}
+	for (;;) {
+		if (!(rnp->qsmask & mask)) {
+
+			/* Our bit has already been cleared, so done. */
+
+			spin_unlock_irqrestore(&rnp->lock, flags);
+			return;
+		}
+		rnp->qsmask &= ~mask;
+		if (rnp->qsmask != 0) {
+
+			/* Other bits still set at this level, so done. */
+
+			spin_unlock_irqrestore(&rnp->lock, flags);
+			return;
+		}
+		mask = 1L << rnp->grpnum;
+		if (rnp->parent == NULL) {
+
+			/* No more levels.  Exit loop holding root lock. */
+
+			break;
+		}
+		spin_unlock_irqrestore(&rnp->lock, flags);
+		rnp = rnp->parent;
+		spin_lock_irqsave(&rnp->lock, flags);
+	}
+
+	/*
+	 * Get here if we are the last CPU to pass through a quiescent
+	 * state for this grace period.  Clean up and let rcu_start_gp()
+	 * start up the next grace period if one is needed.  Note that
+	 * we still hold rnp->lock, as required by rcu_start_gp(), which
+	 * will release it.
+	 */
+	rsp->completed = rsp->gpnum;
+	rcu_process_gp_end(rsp, rdp);
+	rcu_start_gp(rsp, rdp, flags);  /* releases rnp->lock. */
+}
+
+/*
+ * Record a quiescent state for the specified CPU.  Note that a CPU
+ * going offline counts as a quiescent state.  If invoking this on behalf
+ * of an online CPU (even if you are that CPU), lastcomp is used to make
+ * sure we are still in the grace period of interest.  We don't want to
+ * end the current grace period based on quiescent states detected in an
+ * earlier grace period!
+ *
+ * @@@@ make sure all callers are passing -their- rdp, not cpu's!!!
+ */
+static void
+cpu_quiet(int cpu, struct rcu_state *rsp, struct rcu_data *rdp, long *lastcomp)
+{
+	unsigned long flags;
+	long mask;
+	struct rcu_node *rnp;
+
+	rnp = rdp->mynode;
+	spin_lock_irqsave(&rnp->lock, flags);
+	mask = 1L << (cpu - rnp->grplo);
+	cpu_quiet_msk(mask, rsp, rdp, rnp, lastcomp, flags);
+}
+
+/*
+ * Check to see if there is a new grace period of which this CPU
+ * is not yet aware, and if so, set up local rcu_data state for it.
+ * Otherwise, see if this CPU has just passed through its first
+ * quiescent state for this grace period, and record that fact if so.
+ */
+static void
+rcu_check_quiescent_state(struct rcu_state *rsp, struct rcu_data *rdp)
+{
+	/* If there is now a new grace period, record and return. */
+	if (check_for_new_grace_period(rsp, rdp))
+		return;
 
-	/* Grace period already completed for this cpu?
-	 * qs_pending is checked instead of the actual bitmap to avoid
-	 * cacheline trashing.
+	/*
+	 * Does this CPU still need to do its part for the current grace period?
+	 * If no, return and let the other CPUs do their part as well.
 	 */
 	if (!rdp->qs_pending)
 		return;
@@ -441,195 +726,253 @@ static void rcu_check_quiescent_state(struct rcu_ctrlblk *rcp,
 	 */
 	if (!rdp->passed_quiesc)
 		return;
-	rdp->qs_pending = 0;
 
-	spin_lock_irqsave(&rcp->lock, flags);
 	/*
-	 * rdp->quiescbatch/rcp->cur and the cpu bitmap can come out of sync
-	 * during cpu startup. Ignore the quiescent state.
+	 * Set up to process all currently pending callbacks at the end
+	 * of the next grace period, as these pending callbacks are
+	 * guaranteed to have been registered before the beginning of
+	 * the next grace period.  Then record the fact that this CPU
+	 * has done its part for the current grace period.
 	 */
-	if (likely(rdp->quiescbatch == rcp->cur))
-		cpu_quiet(rdp->cpu, rcp);
-
-	spin_unlock_irqrestore(&rcp->lock, flags);
+	rcu_next_callbacks_are_ready(rdp);
+	rdp->qs_pending = 0;
+	cpu_quiet(rdp->cpu, rsp, rdp, &rdp->passed_quiesc_completed);
 }
 
-
 #ifdef CONFIG_HOTPLUG_CPU
 
-/* warning! helper for rcu_offline_cpu. do not use elsewhere without reviewing
- * locking requirements, the list it's pulling from has to belong to a cpu
- * which is dead and hence not processing interrupts.
+
+/*
+ * Remove the outgoing CPU from the bitmasks in the rcu_node hierarchy
+ * and move all callbacks from the outgoing CPU to the current one.
  */
-static void rcu_move_batch(struct rcu_data *this_rdp, struct rcu_head *list,
-				struct rcu_head **tail, long batch)
+static void __rcu_offline_cpu(int cpu, struct rcu_state *rsp,
+			      struct rcu_data *rdp, struct rcu_data *rdp_me)
 {
-	if (list) {
-		local_irq_disable();
-		this_rdp->batch = batch;
-		*this_rdp->nxttail[2] = list;
-		this_rdp->nxttail[2] = tail;
-		local_irq_enable();
+	int i;
+	unsigned long flags;
+	long mask;
+	struct rcu_node *rnp;
+
+	/* Exclude any attempts to start a new grace period. */
+	spin_lock_irqsave(&rsp->onofflock, flags);
+
+	/* Remove the outgoing CPU from the masks in the rcu_node hierarchy. */
+	rnp = rdp->mynode;
+	spin_lock(&rnp->lock);			/* irqs already disabled. */
+	mask = 1L << (cpu - rnp->grplo);
+	for (;;) {
+		rnp->qsmaskinit &= ~mask;
+		if (rnp->qsmaskinit != 0) {
+			spin_unlock(&rnp->lock); /* irqs already disabled. */
+			break;
+		}
+		mask = 1L << rnp->grpnum;
+		spin_unlock(&rnp->lock);	/* irqs already disabled. */
+		rnp = rnp->parent;
+		if (rnp == NULL)
+			break;
+		spin_lock(&rnp->lock);		/* irqs already disabled. */
 	}
-}
 
-static void __rcu_offline_cpu(struct rcu_data *this_rdp,
-				struct rcu_ctrlblk *rcp, struct rcu_data *rdp)
-{
-	unsigned long flags;
+	/* Being offline is a quiescent state, so go record it. */
+	cpu_quiet(cpu, rsp, rdp, NULL);
 
 	/*
-	 * if the cpu going offline owns the grace period
-	 * we can block indefinitely waiting for it, so flush
-	 * it here
+	 * Move callbacks from the outgoing CPU to the running CPU.
+	 * Note that the outgoing CPU is now quiescent, so it is now
+	 * (uncharacteristically) safe to access its rcu_data structure.
+	 * Note also that we must carefully retain the order of the
+	 * outgoing CPU's callbacks in order for rcu_barrier() to work
+	 * correctly.  Finally, note that we start all the callbacks
+	 * afresh, even those that have passed through a grace period
+	 * and are therefore ready to invoke.  The theory is that hotplug
+	 * events are rare, and that if they are frequent enough to
+	 * indefinitely delay callbacks, you have far worse things to
+	 * be worrying about.
 	 */
-	spin_lock_irqsave(&rcp->lock, flags);
-	if (rcp->cur != rcp->completed)
-		cpu_quiet(rdp->cpu, rcp);
-	rcu_move_batch(this_rdp, rdp->donelist, rdp->donetail, rcp->cur + 1);
-	rcu_move_batch(this_rdp, rdp->nxtlist, rdp->nxttail[2], rcp->cur + 1);
-	spin_unlock(&rcp->lock);
+	if (rdp->nxtlist != NULL) {
+		*rdp_me->nxttail[RCU_NEXT_TAIL] = rdp->nxtlist;
+		rdp_me->nxttail[RCU_NEXT_TAIL] = rdp->nxttail[RCU_NEXT_TAIL];
+		rdp->nxtlist = NULL;
+		for (i = 0; i < RCU_NEXT_SIZE; i++)
+			rdp->nxttail[i] = &rdp->nxtlist;
+	}
 
-	this_rdp->qlen += rdp->qlen;
-	local_irq_restore(flags);
+	spin_unlock_irqrestore(&rsp->onofflock, flags);
 }
 
+/*
+ * Remove the specified CPU from the RCU hierarchy and move any pending
+ * callbacks that it might have to the current CPU.  This code assumes
+ * that at least one CPU in the system will remain running at all times.
+ * Any attempt to offline -all- CPUs is likely to strand RCU callbacks.
+ */
 static void rcu_offline_cpu(int cpu)
 {
-	struct rcu_data *this_rdp = &get_cpu_var(rcu_data);
-	struct rcu_data *this_bh_rdp = &get_cpu_var(rcu_bh_data);
+	struct rcu_data *rdp = &per_cpu(rcu_data, cpu);
+	struct rcu_data *rdp_me = &__get_cpu_var(rcu_data);
+	struct rcu_data *bh_rdp = &per_cpu(rcu_bh_data, cpu);
+	struct rcu_data *bh_rdp_me = &__get_cpu_var(rcu_bh_data);
 
-	__rcu_offline_cpu(this_rdp, &rcu_ctrlblk,
-					&per_cpu(rcu_data, cpu));
-	__rcu_offline_cpu(this_bh_rdp, &rcu_bh_ctrlblk,
-					&per_cpu(rcu_bh_data, cpu));
-	put_cpu_var(rcu_data);
-	put_cpu_var(rcu_bh_data);
+	__rcu_offline_cpu(cpu, &rcu_state, rdp, rdp_me);
+	__rcu_offline_cpu(cpu, &rcu_bh_state, bh_rdp, bh_rdp_me);
 }
 
-#else
+#else /* #ifdef CONFIG_HOTPLUG_CPU */
 
-static void rcu_offline_cpu(int cpu)
+static void
+rcu_offline_cpu(int cpu)
 {
 }
 
-#endif
+#endif /* #else #ifdef CONFIG_HOTPLUG_CPU */
 
 /*
- * This does the RCU processing work from softirq context.
+ * Invoke any RCU callbacks that have made it to the end of their grace
+ * period.
  */
-static void __rcu_process_callbacks(struct rcu_ctrlblk *rcp,
-					struct rcu_data *rdp)
+static void rcu_do_batch(struct rcu_data *rdp)
 {
-	long completed_snap;
+	unsigned long flags;
+	struct rcu_head *next, *list, **tail;
+	int count;
 
-	if (rdp->nxtlist) {
-		local_irq_disable();
-		completed_snap = ACCESS_ONCE(rcp->completed);
+	/* If no callbacks are ready, just return. */
+	if (!cpu_has_callbacks_ready_to_invoke(rdp))
+		return;
 
-		/*
-		 * move the other grace-period-completed entries to
-		 * [rdp->nxtlist, *rdp->nxttail[0]) temporarily
-		 */
-		if (!rcu_batch_before(completed_snap, rdp->batch))
-			rdp->nxttail[0] = rdp->nxttail[1] = rdp->nxttail[2];
-		else if (!rcu_batch_before(completed_snap, rdp->batch - 1))
-			rdp->nxttail[0] = rdp->nxttail[1];
+	/*
+	 * Extract the list of ready callbacks, disabling irqs to prevent
+	 * races with call_rcu() from interrupt handlers.
+	 */
+	local_irq_save(flags);
+	list = rdp->nxtlist;
+	rdp->nxtlist = *rdp->nxttail[RCU_DONE_TAIL];
+	*rdp->nxttail[RCU_DONE_TAIL] = NULL;
+	tail = rdp->nxttail[RCU_DONE_TAIL];
+	for (count = RCU_NEXT_SIZE - 1; count >= 0; count--)
+		if (rdp->nxttail[count] == rdp->nxttail[RCU_DONE_TAIL])
+			rdp->nxttail[count] = &rdp->nxtlist;
+	local_irq_restore(flags);
 
-		/*
-		 * the grace period for entries in
-		 * [rdp->nxtlist, *rdp->nxttail[0]) has completed and
-		 * move these entries to donelist
-		 */
-		if (rdp->nxttail[0] != &rdp->nxtlist) {
-			*rdp->donetail = rdp->nxtlist;
-			rdp->donetail = rdp->nxttail[0];
-			rdp->nxtlist = *rdp->nxttail[0];
-			*rdp->donetail = NULL;
-
-			if (rdp->nxttail[1] == rdp->nxttail[0])
-				rdp->nxttail[1] = &rdp->nxtlist;
-			if (rdp->nxttail[2] == rdp->nxttail[0])
-				rdp->nxttail[2] = &rdp->nxtlist;
-			rdp->nxttail[0] = &rdp->nxtlist;
-		}
+	/* Invoke callbacks. */
+	count = 0;
+	while (list) {
+		next = list->next;
+		prefetch(next);
+		list->func(list);
+		list = next;
+		if (++count >= rdp->blimit)
+			break;
+	}
+
+	/* Update count, and requeue any remaining callbacks. */
+	local_irq_save(flags);
+	rdp->qlen -= count;
+	if (list != NULL) {
+		*tail = rdp->nxtlist;
+		rdp->nxtlist = list;
+		for (count = 0; count < RCU_NEXT_SIZE; count++)
+			if (&rdp->nxtlist == rdp->nxttail[count])
+				rdp->nxttail[count] = tail;
+			else
+				break;
+	}
+	local_irq_restore(flags);
 
-		local_irq_enable();
+	/* Reinstate batch limit if we have worked down the excess. */
+	if (rdp->blimit == INT_MAX && rdp->qlen <= qlowmark)
+		rdp->blimit = blimit;
 
-		if (rcu_batch_after(rdp->batch, rcp->pending)) {
-			unsigned long flags;
+	/* Re-raise the RCU softirq if there are callbacks remaining. */
+	if (cpu_has_callbacks_ready_to_invoke(rdp))
+		raise_rcu_softirq();
+}
 
-			/* and start it/schedule start if it's a new batch */
-			spin_lock_irqsave(&rcp->lock, flags);
-			if (rcu_batch_after(rdp->batch, rcp->pending)) {
-				rcp->pending = rdp->batch;
-				rcu_start_batch(rcp);
-			}
-			spin_unlock_irqrestore(&rcp->lock, flags);
-		}
+/*
+ * This does the RCU processing work from softirq context for the
+ * specified rcu_state and rcu_data structures.
+ */
+static void
+__rcu_process_callbacks(struct rcu_state *rsp, struct rcu_data *rdp)
+{
+	unsigned long flags;
+
+	/*
+	 * Advance callbacks in response to end of earlier grace
+	 * period that some other CPU ended.
+	 */
+	rcu_process_gp_end(rsp, rdp);
+
+	/* Update RCU state based on any recent quiescent states. */
+	rcu_check_quiescent_state(rsp, rdp);
+
+	/* Does this CPU require a not-yet-started grace period? */
+	if (cpu_needs_another_gp(rsp, rdp)) {
+		spin_lock_irqsave(&rcu_get_root(rsp)->lock, flags);
+		rcu_start_gp(rsp, rdp, flags);  /* releases root ->lock. */
 	}
 
-	rcu_check_quiescent_state(rcp, rdp);
-	if (rdp->donelist)
-		rcu_do_batch(rdp);
+	/* If there are callbacks ready, invoke them. */
+	rcu_do_batch(rdp);
 }
 
+/*
+ * Do softirq processing for the current CPU.
+ */
 static void rcu_process_callbacks(struct softirq_action *unused)
 {
 	/*
 	 * Memory references from any prior RCU read-side critical sections
-	 * executed by the interrupted code must be see before any RCU
+	 * executed by the interrupted code must be seen before any RCU
 	 * grace-period manupulations below.
 	 */
 
 	smp_mb(); /* See above block comment. */
 
-	__rcu_process_callbacks(&rcu_ctrlblk, &__get_cpu_var(rcu_data));
-	__rcu_process_callbacks(&rcu_bh_ctrlblk, &__get_cpu_var(rcu_bh_data));
+	__rcu_process_callbacks(&rcu_state, &__get_cpu_var(rcu_data));
+	__rcu_process_callbacks(&rcu_bh_state, &__get_cpu_var(rcu_bh_data));
 
 	/*
 	 * Memory references from any later RCU read-side critical sections
-	 * executed by the interrupted code must be see after any RCU
+	 * executed by the interrupted code must be seen after any RCU
 	 * grace-period manupulations above.
 	 */
 
 	smp_mb(); /* See above block comment. */
 }
 
-static int __rcu_pending(struct rcu_ctrlblk *rcp, struct rcu_data *rdp)
+/*
+ * Check to see if there is any immediate RCU-related work to be done
+ * by the current CPU, for the specified type of RCU, returning 1 if so.
+ * The checks are in order of increasing expense: checks that can be
+ * carried out against CPU-local state are performed first.  However,
+ * we must check for CPU stalls first, else we might not get a chance.
+ */
+static int __rcu_pending(struct rcu_state *rsp, struct rcu_data *rdp)
 {
 	/* Check for CPU stalls, if enabled. */
-	check_cpu_stall(rcp, rdp);
+	check_cpu_stall(rsp, rdp);
 
-	if (rdp->nxtlist) {
-		long completed_snap = ACCESS_ONCE(rcp->completed);
+	/* Is the RCU core waiting for a quiescent state from this CPU? */
+	if (rdp->qs_pending)
+		return 1;
 
-		/*
-		 * This cpu has pending rcu entries and the grace period
-		 * for them has completed.
-		 */
-		if (!rcu_batch_before(completed_snap, rdp->batch))
-			return 1;
-		if (!rcu_batch_before(completed_snap, rdp->batch - 1) &&
-				rdp->nxttail[0] != rdp->nxttail[1])
-			return 1;
-		if (rdp->nxttail[0] != &rdp->nxtlist)
-			return 1;
+	/* Does this CPU have finished callbacks to invoke? */
+	if (cpu_has_callbacks_ready_to_invoke(rdp))
+		return 1;
 
-		/*
-		 * This cpu has pending rcu entries and the new batch
-		 * for then hasn't been started nor scheduled start
-		 */
-		if (rcu_batch_after(rdp->batch, rcp->pending))
-			return 1;
-	}
+	/* Are there callbacks waiting for a GP that needs to be started? */
+	if (cpu_needs_another_gp(rsp, rdp))
+		return 1;
 
-	/* This cpu has finished callbacks to invoke */
-	if (rdp->donelist)
+	/* Has another RCU grace period completed? */
+	if (ACCESS_ONCE(rsp->completed) != rdp->completed) /* outside of lock */
 		return 1;
 
-	/* The rcu core waits for a quiescent state from the cpu */
-	if (rdp->quiescbatch != rcp->cur || rdp->qs_pending)
+	/* Has a new RCU grace period started? */
+	if (ACCESS_ONCE(rsp->gpnum) != rdp->gpnum) /* outside of lock */
 		return 1;
 
 	/* nothing to do */
@@ -643,8 +986,8 @@ static int __rcu_pending(struct rcu_ctrlblk *rcp, struct rcu_data *rdp)
  */
 int rcu_pending(int cpu)
 {
-	return __rcu_pending(&rcu_ctrlblk, &per_cpu(rcu_data, cpu)) ||
-		__rcu_pending(&rcu_bh_ctrlblk, &per_cpu(rcu_bh_data, cpu));
+	return __rcu_pending(&rcu_state, &per_cpu(rcu_data, cpu)) ||
+	       __rcu_pending(&rcu_bh_state, &per_cpu(rcu_bh_data, cpu));
 }
 
 /*
@@ -658,14 +1001,19 @@ int rcu_needs_cpu(int cpu)
 	struct rcu_data *rdp = &per_cpu(rcu_data, cpu);
 	struct rcu_data *rdp_bh = &per_cpu(rcu_bh_data, cpu);
 
-	return !!rdp->nxtlist || !!rdp_bh->nxtlist || rcu_pending(cpu);
+	return !!*rdp->nxttail[RCU_DONE_TAIL] ||
+	       !!*rdp_bh->nxttail[RCU_DONE_TAIL] ||
+	       rcu_pending(cpu);
 }
 
 /*
- * Top-level function driving RCU grace-period detection, normally
- * invoked from the scheduler-clock interrupt.  This function simply
- * increments counters that are read only from softirq by this same
- * CPU, so there are no memory barriers required.
+ * Check to see if this CPU is in a non-context-switch quiescent state
+ * (user mode or idle loop for rcu, non-softirq execution for rcu_bh).
+ * Also schedule the RCU softirq handler.
+ *
+ * This function must be called with hardirqs disabled.  It is normally
+ * invoked from the scheduling-clock interrupt.  If rcu_pending() returns
+ * false, there is no point in invoking rcu_check_callbacks().
  */
 void rcu_check_callbacks(int cpu, int user)
 {
@@ -707,20 +1055,224 @@ void rcu_check_callbacks(int cpu, int user)
 	raise_rcu_softirq();
 }
 
-static void rcu_init_percpu_data(int cpu, struct rcu_ctrlblk *rcp,
-						struct rcu_data *rdp)
+#ifdef CONFIG_SMP
+
+/*
+ * Scan the leaf rcu_node structures, processing dyntick state for any that
+ * have not yet encountered a quiescent state, using the function specified.
+ * Returns 1 if the current grace period ends while scanning (possibly
+ * because we made it end).
+ */
+static int
+rcu_process_dyntick(struct rcu_state *rsp, long lastcomp, int (*f)(int))
+{
+	unsigned long bit;
+	int cpu;
+	unsigned long flags;
+	unsigned long mask;
+	struct rcu_data *rdp = &__get_cpu_var(rcu_data);
+	struct rcu_node *rnp_cur = rsp->level[NUM_RCU_LVLS - 1];
+	struct rcu_node *rnp_end = &rsp->node[NUM_RCU_NODES];
+
+	for (; rnp_cur < rnp_end; rnp_cur++) {
+		mask = 0;
+		spin_lock_irqsave(&rnp_cur->lock, flags);
+		if (rsp->completed != lastcomp) {
+			spin_unlock_irqrestore(&rnp_cur->lock, flags);
+			return 1;
+		}
+		if (rnp_cur->qsmask == 0) {
+			spin_unlock_irqrestore(&rnp_cur->lock, flags);
+			continue;
+		}
+		cpu = rnp_cur->grplo;
+		bit = 1;
+		mask = 0;
+		for (; cpu <= rnp_cur->grphi; cpu++, bit <<= 1) {
+			if ((rnp_cur->qsmask & bit) != 0L && f(cpu))
+				mask |= bit;
+		}
+		if (mask != 0) {
+			cpu_quiet_msk(mask, rsp, rdp, rnp_cur,
+				      &lastcomp, flags);
+			continue;
+		}
+		spin_unlock_irqrestore(&rnp_cur->lock, flags);
+	}
+	return 0;
+}
+
+/*
+ * Force quiescent states on reluctant CPUs, and also detect which
+ * CPUs are in dyntick-idle mode.
+ */
+static void force_quiescent_state(struct rcu_state *rsp)
 {
-	long flags;
+	unsigned long flags;
+	long lastcomp;
+	struct rcu_node *rnp = rcu_get_root(rsp);
+	u8 signaled;
 
-	spin_lock_irqsave(&rcp->lock, flags);
-	memset(rdp, 0, sizeof(*rdp));
-	rdp->nxttail[0] = rdp->nxttail[1] = rdp->nxttail[2] = &rdp->nxtlist;
-	rdp->donetail = &rdp->donelist;
-	rdp->quiescbatch = rcp->completed;
+	if (!spin_trylock_irqsave(&rsp->onofflock, flags))
+		return;
+	spin_lock(&rnp->lock);
+	lastcomp = rsp->completed;
+	signaled = rsp->signaled;
+	spin_unlock(&rnp->lock);
+	switch (signaled) {
+	case RCU_SAVE_DYNTICK:
+
+		/* Record dyntick-idle state. */
+		if (rcu_process_dyntick(rsp, lastcomp,
+					dyntick_save_progress_counter))
+			goto unlock_ret;
+
+		/* Update state, record completion counter. */
+		spin_lock(&rnp->lock);
+		if (lastcomp == rsp->completed) {
+			rsp->signaled = RCU_FORCE_QS;
+			dyntick_save_completed(rsp, lastcomp);
+		}
+		spin_unlock(&rnp->lock);
+		break;
+
+	case RCU_FORCE_QS:
+
+		/* Check dyntick-idle state, send IPI to laggards. */
+		if (rcu_process_dyntick(rsp, dyntick_get_completed(rsp),
+					rcu_implicit_dynticks_qs))
+			goto unlock_ret;
+
+		/* Update state. */
+		spin_lock(&rnp->lock);
+		if (lastcomp == rsp->completed)
+			rsp->signaled = RCU_SIGNALED;
+		spin_unlock(&rnp->lock);
+		break;
+
+	case RCU_SIGNALED:
+		break;
+	}
+unlock_ret:
+	spin_unlock_irqrestore(&rsp->onofflock, flags);
+}
+
+#else /* #ifdef CONFIG_SMP */
+
+static void force_quiescent_state(struct rcu_state *rsp)
+{
+	set_need_resched();
+}
+
+#endif /* #else #ifdef CONFIG_SMP */
+
+static void
+__call_rcu(struct rcu_head *head, struct rcu_state *rsp, struct rcu_data *rdp)
+{
+	smp_mb(); /* Ensure RCU update seen before callback registry. */
+
+	/*
+	 * Opportunistically note grace-period endings and beginnings.
+	 * Note that we might see a beginning right after we see an
+	 * end, but never vice versa, since this CPU has to pass through
+	 * a quiescent state betweentimes.
+	 */
+	rcu_process_gp_end(rsp, rdp);
+	check_for_new_grace_period(rsp, rdp);
+
+	*rdp->nxttail[RCU_NEXT_TAIL] = head;
+	rdp->nxttail[RCU_NEXT_TAIL] = &head->next;
+
+	if (unlikely(++rdp->qlen > qhimark)) {
+		rdp->blimit = INT_MAX;
+		force_quiescent_state(rsp);
+	}
+}
+
+/*
+ * Queue an RCU callback for invocation after a grace period.
+ */
+void call_rcu(struct rcu_head *head, void (*func)(struct rcu_head *rcu))
+{
+	unsigned long flags;
+
+	head->func = func;
+	head->next = NULL;
+	local_irq_save(flags);
+	__call_rcu(head, &rcu_state, &__get_cpu_var(rcu_data));
+	local_irq_restore(flags);
+}
+EXPORT_SYMBOL_GPL(call_rcu);
+
+/*
+ * Queue an RCU callback for invocation after a quicker grace period.
+ */
+void call_rcu_bh(struct rcu_head *head, void (*func)(struct rcu_head *rcu))
+{
+	unsigned long flags;
+
+	head->func = func;
+	head->next = NULL;
+	local_irq_save(flags);
+	__call_rcu(head, &rcu_bh_state, &__get_cpu_var(rcu_bh_data));
+	local_irq_restore(flags);
+}
+EXPORT_SYMBOL_GPL(call_rcu_bh);
+
+/*
+ * Initialize a CPU's per-CPU RCU data.  We take this "scorched earth"
+ * approach so that we don't have to worry about how long the CPU has
+ * been gone, or whether it ever was online previously.  We do trust the
+ * ->mynode field, as it is constant for a given struct rcu_data and
+ * initialized during early boot.
+ *
+ * Note that only one online or offline event can be happening at a given
+ * time.  Note also that we can accept some slop in the rsp->completed
+ * access due to the fact that this CPU cannot possibly have any RCU
+ * callbacks in flight yet.
+ */
+static void
+rcu_init_percpu_data(int cpu, struct rcu_state *rsp, struct rcu_data *rdp)
+{
+	long completed_snap;
+	unsigned long flags;
+	int i;
+	long mask;
+	struct rcu_node *rnp = rdp->mynode;
+
+	/* Exclude any attempts to start a new grace period. */
+	spin_lock_irqsave(&rsp->onofflock, flags);
+
+	spin_lock(&rnp->lock);		/* irqs already disabled. */
+	completed_snap = ACCESS_ONCE(rsp->completed); /* outside of lock */
+	rdp->completed = completed_snap;
+	rdp->gpnum = completed_snap;
+	rdp->passed_quiesc = 1;
 	rdp->qs_pending = 0;
-	rdp->cpu = cpu;
+	rdp->nxtlist = NULL;
+	for (i = 0; i < RCU_NEXT_SIZE; i++)
+		rdp->nxttail[i] = &rdp->nxtlist;
 	rdp->blimit = blimit;
-	spin_unlock_irqrestore(&rcp->lock, flags);
+#ifdef CONFIG_NO_HZ
+	rdp->dynticks = 1;
+	rdp->dynticks_nesting = 0;
+#endif /* #ifdef CONFIG_NO_HZ */
+	rdp->cpu = cpu;
+
+	/* Add CPU to rcu_node bitmasks. */
+
+	mask = 1L << (cpu - rnp->grplo);
+	for (;;) {
+		rnp->qsmaskinit |= mask;
+		mask = 1L << rnp->grpnum;
+		spin_unlock(&rnp->lock); /* irqs already disabled. */
+		rnp = rnp->parent;
+		if ((rnp == NULL) || !!(rnp->qsmaskinit & mask))
+			break;
+		spin_lock(&rnp->lock);	/* irqs already disabled. */
+	}
+
+	spin_unlock_irqrestore(&rsp->onofflock, flags);
 }
 
 static void __cpuinit rcu_online_cpu(int cpu)
@@ -728,11 +1280,14 @@ static void __cpuinit rcu_online_cpu(int cpu)
 	struct rcu_data *rdp = &per_cpu(rcu_data, cpu);
 	struct rcu_data *bh_rdp = &per_cpu(rcu_bh_data, cpu);
 
-	rcu_init_percpu_data(cpu, &rcu_ctrlblk, rdp);
-	rcu_init_percpu_data(cpu, &rcu_bh_ctrlblk, bh_rdp);
+	rcu_init_percpu_data(cpu, &rcu_state, rdp);
+	rcu_init_percpu_data(cpu, &rcu_bh_state, bh_rdp);
 	open_softirq(RCU_SOFTIRQ, rcu_process_callbacks);
 }
 
+/*
+ * Handle CPU online/offline notification events.
+ */
 static int __cpuinit rcu_cpu_notify(struct notifier_block *self,
 				unsigned long action, void *hcpu)
 {
@@ -753,20 +1308,81 @@ static int __cpuinit rcu_cpu_notify(struct notifier_block *self,
 	return NOTIFY_OK;
 }
 
+/*
+ * Helper function for rcu_init() that initializes one rcu_state structure.
+ */
+static void __init rcu_init_one(struct rcu_state *rsp)
+{
+	int i;
+	int j;
+	struct rcu_node *rnp;
+
+	/* Initialize the level-tracking arrays. */
+
+	for (i = 1; i < NUM_RCU_LVLS; i++) {
+		rsp->level[i] = rsp->level[i - 1] + rsp->levelcnt[i - 1];
+	}
+	rcu_init_levelspread(rsp);
+
+	/* Initialize the elements themselves, starting from the leaves. */
+
+	for (i = NUM_RCU_LVLS - 1; i >= 0; i--) {
+		rnp = rsp->level[i];
+		for (j = 0; j < rsp->levelcnt[i]; j++, rnp++) {
+			spin_lock_init(&rnp->lock);
+			rnp->qsmask = 0;
+			rnp->grplo = j * rsp->levelspread[i];
+			rnp->grphi = (j + 1) * rsp->levelspread[i] - 1;
+			if (rnp->grphi >= rsp->levelcnt[i + 1])
+				rnp->grphi = rsp->levelcnt[i + 1] - 1;
+			rnp->qsmaskinit = 0;
+			if (i != NUM_RCU_LVLS - 1)
+				rnp->grplo = rnp->grphi = 0;
+			if (i == 0) {
+				rnp->grpnum = 0;
+				rnp->parent = NULL;
+			} else {
+				rnp->grpnum = j % rsp->levelspread[i - 1];
+				rnp->parent = rsp->level[i - 1] +
+					      j / rsp->levelspread[i - 1];
+			}
+			rnp->level = i;
+		}
+	}
+}
+
+/*
+ * Helper macro for rcu_init().  To be used nowhere else!
+ * Assigns leaf node pointers into each CPU's rcu_data structure.
+ */
+#define RCU_DATA_PTR_INIT(rsp, rcu_data) \
+do { \
+	rnp = (rsp)->level[NUM_RCU_LVLS - 1]; \
+	j = 0; \
+	for_each_possible_cpu(i) { \
+		if (i > rnp[j].grphi) \
+			j++; \
+		per_cpu(rcu_data, i).mynode = &rnp[j]; \
+	} \
+} while (0)
+
 static struct notifier_block __cpuinitdata rcu_nb = {
 	.notifier_call	= rcu_cpu_notify,
 };
 
-/*
- * Initializes rcu mechanism.  Assumed to be called early.
- * That is before local timer(SMP) or jiffie timer (uniproc) is setup.
- * Note that rcu_qsctr and friends are implicitly
- * initialized due to the choice of ``0'' for RCU_CTR_INVALID.
- */
 void __init __rcu_init(void)
 {
-	rcu_cpu_notify(&rcu_nb, CPU_UP_PREPARE,
-			(void *)(long)smp_processor_id());
+	int i;			/* All used by RCU_DATA_PTR_INIT(). */
+	int j;
+	struct rcu_node *rnp;
+
+	rcu_init_one(&rcu_state);
+	RCU_DATA_PTR_INIT(&rcu_state, rcu_data);
+	rcu_init_one(&rcu_bh_state);
+	RCU_DATA_PTR_INIT(&rcu_bh_state, rcu_bh_data);
+
+	for_each_online_cpu(i)
+		rcu_cpu_notify(&rcu_nb, CPU_UP_PREPARE, (void *)(long)i);
 	/* Register notifier for non-boot CPUs */
 	register_cpu_notifier(&rcu_nb);
 }
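
Not part of the patch: a minimal userspace sketch of the bitmask walk
that rcu_init_percpu_data() performs above when a CPU comes online.
The names struct node and online_cpu() are hypothetical stand-ins, all
locking is omitted, and the field names merely mirror those in the patch.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical, simplified stand-in for the patch's struct rcu_node. */
struct node {
	unsigned long qsmaskinit;	/* online children (CPUs at a leaf) */
	int grplo;			/* lowest CPU covered (leaves only) */
	int grpnum;			/* this node's bit position in its parent */
	struct node *parent;		/* NULL at the root */
};

/*
 * Mark a CPU online in its leaf, then walk toward the root, setting the
 * bit for this subtree in each ancestor.  Stop early once an ancestor
 * already has the bit set, since the nodes above it were updated when
 * that bit was first set.  Returns the number of nodes modified.
 */
static int online_cpu(struct node *leaf, int cpu)
{
	unsigned long mask;
	struct node *np = leaf;
	int touched = 0;

	assert(cpu >= leaf->grplo);
	mask = 1UL << (cpu - leaf->grplo);
	for (;;) {
		np->qsmaskinit |= mask;
		touched++;
		mask = 1UL << np->grpnum;
		np = np->parent;
		if (np == NULL || (np->qsmaskinit & mask))
			break;
	}
	return touched;
}
```

With two leaves under one root, bringing up the first CPU of a leaf
modifies both the leaf and the root, while a second CPU in the same leaf
stops at the leaf because the root already has that leaf's bit set.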
diff --git a/kernel/rcuclassic_trace.c b/kernel/rcuclassic_trace.c
new file mode 100644
index 0000000..100d757
--- /dev/null
+++ b/kernel/rcuclassic_trace.c
@@ -0,0 +1,218 @@
+/*
+ * Read-Copy Update tracing for classic implementation
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place - Suite 330, Boston, MA 02111-1307, USA.
+ *
+ * Copyright IBM Corporation, 2008
+ *
+ * Papers:  http://www.rdrop.com/users/paulmck/RCU
+ *
+ * For detailed explanation of Read-Copy Update mechanism see -
+ * 		Documentation/RCU
+ *
+ */
+#include <linux/types.h>
+#include <linux/kernel.h>
+#include <linux/init.h>
+#include <linux/spinlock.h>
+#include <linux/smp.h>
+#include <linux/rcupdate.h>
+#include <linux/interrupt.h>
+#include <linux/sched.h>
+#include <asm/atomic.h>
+#include <linux/bitops.h>
+#include <linux/module.h>
+#include <linux/completion.h>
+#include <linux/moduleparam.h>
+#include <linux/percpu.h>
+#include <linux/notifier.h>
+#include <linux/cpu.h>
+#include <linux/mutex.h>
+#include <linux/debugfs.h>
+
+static DEFINE_MUTEX(rcuclassic_trace_mutex);
+static char *rcuclassic_trace_buf;
+#define RCUPREEMPT_TRACE_BUF_SIZE 4096
+
+static int print_one_rcu_data(struct rcu_data *rdp, char *buf, char *ebuf)
+{
+	int cnt = 0;
+
+	cnt += snprintf(&buf[cnt], ebuf - &buf[cnt],
+		"%3d completed=%ld gpnum=%ld passed_q: %d qs_pending: %d",
+		rdp->cpu,
+		rdp->completed, rdp->gpnum,
+		rdp->passed_quiesc, rdp->qs_pending);
+	cnt += snprintf(&buf[cnt], ebuf - &buf[cnt],
+		" qlen: %ld blimit: %ld\n", rdp->qlen, rdp->blimit);
+	return cnt;
+}
+
+#define PRINT_RCU_DATA(name, buf, ebuf) \
+	do { \
+		int _p_r_d_i; \
+		\
+		for_each_online_cpu(_p_r_d_i) \
+			(buf) += print_one_rcu_data(&per_cpu(name, _p_r_d_i), \
+						    buf, ebuf); \
+	} while (0)
+
+static ssize_t rcudata_read(struct file *filp, char __user *buffer,
+				size_t count, loff_t *ppos)
+{
+	ssize_t bcount;
+	char *buf = rcuclassic_trace_buf;
+	char *ebuf = &rcuclassic_trace_buf[RCUPREEMPT_TRACE_BUF_SIZE];
+
+	mutex_lock(&rcuclassic_trace_mutex);
+	buf += snprintf(buf, ebuf - buf, "rcu:\n");
+	PRINT_RCU_DATA(rcu_data, buf, ebuf);
+	buf += snprintf(buf, ebuf - buf, "rcu_bh:\n");
+	PRINT_RCU_DATA(rcu_bh_data, buf, ebuf);
+	bcount = simple_read_from_buffer(buffer, count, ppos,
+			rcuclassic_trace_buf, strlen(rcuclassic_trace_buf));
+	mutex_unlock(&rcuclassic_trace_mutex);
+	return bcount;
+}
+
+static int print_one_rcu_state(struct rcu_state *rsp, char *buf, char *ebuf)
+{
+	int cnt = 0;
+	int level = 0;
+	struct rcu_node *rnp;
+
+	cnt += snprintf(&buf[cnt], ebuf - &buf[cnt],
+			"completed: %ld gpnum: %ld signaled: %d\n",
+			rsp->completed, rsp->gpnum, rsp->signaled);
+	for (rnp = &rsp->node[0]; rnp - &rsp->node[0] < NUM_RCU_NODES; rnp++) {
+		if (rnp->level != level) {
+			cnt += snprintf(&buf[cnt], ebuf - &buf[cnt], "\n");
+			level = rnp->level;
+		}
+		cnt += snprintf(&buf[cnt], ebuf - &buf[cnt],
+				"%lx/%lx %d:%d ^%d    ",
+				rnp->qsmask, rnp->qsmaskinit,
+				rnp->grplo, rnp->grphi, rnp->grpnum);
+	}
+	cnt += snprintf(&buf[cnt], ebuf - &buf[cnt], "\n");
+	return cnt;
+}
+
+static ssize_t rcuhier_read(struct file *filp, char __user *buffer,
+				size_t count, loff_t *ppos)
+{
+	ssize_t bcount;
+	char *buf = rcuclassic_trace_buf;
+	char *ebuf = &rcuclassic_trace_buf[RCUPREEMPT_TRACE_BUF_SIZE];
+
+	mutex_lock(&rcuclassic_trace_mutex);
+	buf += snprintf(buf, ebuf - buf, "rcu:\n");
+	buf += print_one_rcu_state(&rcu_state, buf, ebuf);
+	buf += snprintf(buf, ebuf - buf, "rcu_bh:\n");
+	buf += print_one_rcu_state(&rcu_bh_state, buf, ebuf);
+	bcount = simple_read_from_buffer(buffer, count, ppos,
+			rcuclassic_trace_buf, strlen(rcuclassic_trace_buf));
+	mutex_unlock(&rcuclassic_trace_mutex);
+	return bcount;
+}
+
+static ssize_t rcugp_read(struct file *filp, char __user *buffer,
+				size_t count, loff_t *ppos)
+{
+	ssize_t bcount;
+	char *buf = rcuclassic_trace_buf;
+	char *ebuf = &rcuclassic_trace_buf[RCUPREEMPT_TRACE_BUF_SIZE];
+
+	mutex_lock(&rcuclassic_trace_mutex);
+	buf += snprintf(buf, ebuf - buf, "rcu: completed=%ld  gpnum=%ld\n",
+			rcu_state.completed, rcu_state.gpnum);
+	buf += snprintf(buf, ebuf - buf, "rcu_bh: completed=%ld  gpnum=%ld\n",
+			rcu_bh_state.completed, rcu_bh_state.gpnum);
+	bcount = simple_read_from_buffer(buffer, count, ppos,
+			rcuclassic_trace_buf, strlen(rcuclassic_trace_buf));
+	mutex_unlock(&rcuclassic_trace_mutex);
+	return bcount;
+}
+
+static struct file_operations rcudata_fops = {
+	.owner = THIS_MODULE,
+	.read = rcudata_read,
+};
+
+static struct file_operations rcuhier_fops = {
+	.owner = THIS_MODULE,
+	.read = rcuhier_read,
+};
+
+static struct file_operations rcugp_fops = {
+	.owner = THIS_MODULE,
+	.read = rcugp_read,
+};
+
+static struct dentry *rcudir, *datadir, *hierdir, *gpdir;
+static int rcuclassic_debugfs_init(void)
+{
+	rcudir = debugfs_create_dir("rcu", NULL);
+	if (!rcudir)
+		goto out;
+	datadir = debugfs_create_file("rcudata", 0444, rcudir,
+						NULL, &rcudata_fops);
+	if (!datadir)
+		goto free_out;
+
+	gpdir = debugfs_create_file("rcugp", 0444, rcudir, NULL, &rcugp_fops);
+	if (!gpdir)
+		goto free_out;
+
+	hierdir = debugfs_create_file("rcuhier", 0444, rcudir,
+						NULL, &rcuhier_fops);
+	if (!hierdir)
+		goto free_out;
+	return 0;
+free_out:
+	if (datadir)
+		debugfs_remove(datadir);
+	if (gpdir)
+		debugfs_remove(gpdir);
+	debugfs_remove(rcudir);
+out:
+	return 1;
+}
+
+static int __init rcuclassic_trace_init(void)
+{
+	int ret;
+
+	rcuclassic_trace_buf = kmalloc(RCUPREEMPT_TRACE_BUF_SIZE, GFP_KERNEL);
+	if (!rcuclassic_trace_buf)
+		return -ENOMEM;
+	ret = rcuclassic_debugfs_init();
+	if (ret)
+		kfree(rcuclassic_trace_buf);
+	return ret;
+}
+
+static void __exit rcuclassic_trace_cleanup(void)
+{
+	debugfs_remove(datadir);
+	debugfs_remove(gpdir);
+	debugfs_remove(hierdir);
+	debugfs_remove(rcudir);
+	kfree(rcuclassic_trace_buf);
+}
+
+
+module_init(rcuclassic_trace_init);
+module_exit(rcuclassic_trace_cleanup);
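
Not part of the patch: the trace read functions above advance buf by the
return value of snprintf(), which is the length that would have been
written rather than the length actually stored, so output larger than the
buffer could push buf past ebuf.  The kernel's scnprintf() returns the
stored length instead.  A minimal userspace sketch of that behavior,
using a hypothetical helper name (append_bounded):

```c
#include <stdarg.h>
#include <stddef.h>
#include <stdio.h>

/*
 * Append formatted text at buf without running past ebuf.  Returns the
 * number of characters actually stored, excluding the trailing NUL, so
 * that "buf += append_bounded(...)" can never move buf beyond ebuf - 1.
 */
static size_t append_bounded(char *buf, char *ebuf, const char *fmt, ...)
{
	va_list ap;
	size_t room;
	int n;

	if (buf >= ebuf)
		return 0;		/* no space left at all */
	room = (size_t)(ebuf - buf);
	va_start(ap, fmt);
	n = vsnprintf(buf, room, fmt, ap);
	va_end(ap);
	if (n < 0)
		return 0;		/* formatting error: store nothing */
	if ((size_t)n >= room)
		return room - 1;	/* truncated: only room - 1 chars fit */
	return (size_t)n;
}
```

Once the buffer is full, further calls return 0 and leave the pointer
where it is, so a later strlen() on the buffer still sees a terminated
string.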
