lists.openwall.net   lists  /  announce  owl-users  owl-dev  john-users  john-dev  passwdqc-users  yescrypt  popa3d-users  /  oss-security  kernel-hardening  musl  sabotage  tlsify  passwords  /  crypt-dev  xvendor  /  Bugtraq  Full-Disclosure  linux-kernel  linux-netdev  linux-ext4  linux-hardening  linux-cve-announce  PHC 
Open Source and information security mailing list archives
 
Hash Suite: Windows password security audit tool. GUI, reports in PDF.
[<prev] [next>] [<thread-prev] [thread-next>] [day] [month] [year] [list]
Message-Id: <1492621117-13939-37-git-send-email-paulmck@linux.vnet.ibm.com>
Date:   Wed, 19 Apr 2017 09:58:34 -0700
From:   "Paul E. McKenney" <paulmck@...ux.vnet.ibm.com>
To:     linux-kernel@...r.kernel.org
Cc:     mingo@...nel.org, jiangshanlai@...il.com, dipankar@...ibm.com,
        akpm@...ux-foundation.org, mathieu.desnoyers@...icios.com,
        josh@...htriplett.org, tglx@...utronix.de, peterz@...radead.org,
        rostedt@...dmis.org, dhowells@...hat.com, edumazet@...gle.com,
        fweisbec@...il.com, oleg@...hat.com, bobby.prani@...il.com,
        "Paul E. McKenney" <paulmck@...ux.vnet.ibm.com>
Subject: [PATCH v3 tip/core/rcu 37/40] srcu: Introduce CLASSIC_SRCU Kconfig option

The TREE_SRCU rewrite is large and a bit on the non-simple side, so
this commit helps reduce risk by allowing the old v4.11 SRCU algorithm
to be selected using a new CLASSIC_SRCU Kconfig option that depends
on RCU_EXPERT.  The default is to use the new TREE_SRCU and TINY_SRCU
algorithms, in order to help get these the testing that they need.
However, if your users do not require the update-side scalability that
is to be provided by TREE_SRCU, select RCU_EXPERT and then CLASSIC_SRCU
to revert back to the old classic SRCU algorithm.

Signed-off-by: Paul E. McKenney <paulmck@...ux.vnet.ibm.com>
---
 include/linux/srcu.h        |   2 +
 include/linux/srcuclassic.h | 101 ++++++++
 init/Kconfig                |  21 +-
 kernel/rcu/Makefile         |   3 +-
 kernel/rcu/rcutorture.c     |   2 +-
 kernel/rcu/srcu.c           | 347 ++++++++++++++-----------
 kernel/rcu/srcutree.c       | 613 ++++++++++++++++++++++++++++++++++++++++++++
 7 files changed, 934 insertions(+), 155 deletions(-)
 create mode 100644 include/linux/srcuclassic.h
 create mode 100644 kernel/rcu/srcutree.c

diff --git a/include/linux/srcu.h b/include/linux/srcu.h
index 907f09b14eda..167ad8831aaf 100644
--- a/include/linux/srcu.h
+++ b/include/linux/srcu.h
@@ -60,6 +60,8 @@ int init_srcu_struct(struct srcu_struct *sp);
 #include <linux/srcutiny.h>
 #elif defined(CONFIG_TREE_SRCU)
 #include <linux/srcutree.h>
+#elif defined(CONFIG_CLASSIC_SRCU)
+#include <linux/srcuclassic.h>
 #else
 #error "Unknown SRCU implementation specified to kernel configuration"
 #endif
diff --git a/include/linux/srcuclassic.h b/include/linux/srcuclassic.h
new file mode 100644
index 000000000000..41cf99930f34
--- /dev/null
+++ b/include/linux/srcuclassic.h
@@ -0,0 +1,101 @@
+/*
+ * Sleepable Read-Copy Update mechanism for mutual exclusion,
+ *	classic v4.11 variant.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, you can access it online at
+ * http://www.gnu.org/licenses/gpl-2.0.html.
+ *
+ * Copyright (C) IBM Corporation, 2017
+ *
+ * Author: Paul McKenney <paulmck@...ibm.com>
+ */
+
+#ifndef _LINUX_SRCU_CLASSIC_H
+#define _LINUX_SRCU_CLASSIC_H
+
+struct srcu_array {
+	unsigned long lock_count[2];
+	unsigned long unlock_count[2];
+};
+
+struct rcu_batch {
+	struct rcu_head *head, **tail;
+};
+
+#define RCU_BATCH_INIT(name) { NULL, &(name.head) }
+
+struct srcu_struct {
+	unsigned long completed;
+	struct srcu_array __percpu *per_cpu_ref;
+	spinlock_t queue_lock; /* protect ->batch_queue, ->running */
+	bool running;
+	/* callbacks just queued */
+	struct rcu_batch batch_queue;
+	/* callbacks try to do the first check_zero */
+	struct rcu_batch batch_check0;
+	/* callbacks done with the first check_zero and the flip */
+	struct rcu_batch batch_check1;
+	struct rcu_batch batch_done;
+	struct delayed_work work;
+#ifdef CONFIG_DEBUG_LOCK_ALLOC
+	struct lockdep_map dep_map;
+#endif /* #ifdef CONFIG_DEBUG_LOCK_ALLOC */
+};
+
+void process_srcu(struct work_struct *work);
+
+#define __SRCU_STRUCT_INIT(name)					\
+	{								\
+		.completed = -300,					\
+		.per_cpu_ref = &name##_srcu_array,			\
+		.queue_lock = __SPIN_LOCK_UNLOCKED(name.queue_lock),	\
+		.running = false,					\
+		.batch_queue = RCU_BATCH_INIT(name.batch_queue),	\
+		.batch_check0 = RCU_BATCH_INIT(name.batch_check0),	\
+		.batch_check1 = RCU_BATCH_INIT(name.batch_check1),	\
+		.batch_done = RCU_BATCH_INIT(name.batch_done),		\
+		.work = __DELAYED_WORK_INITIALIZER(name.work, process_srcu, 0),\
+		__SRCU_DEP_MAP_INIT(name)				\
+	}
+
+/*
+ * Define and initialize a srcu struct at build time.
+ * Do -not- call init_srcu_struct() nor cleanup_srcu_struct() on it.
+ *
+ * Note that although DEFINE_STATIC_SRCU() hides the name from other
+ * files, the per-CPU variable rules nevertheless require that the
+ * chosen name be globally unique.  These rules also prohibit use of
+ * DEFINE_STATIC_SRCU() within a function.  If these rules are too
+ * restrictive, declare the srcu_struct manually.  For example, in
+ * each file:
+ *
+ *	static struct srcu_struct my_srcu;
+ *
+ * Then, before the first use of each my_srcu, manually initialize it:
+ *
+ *	init_srcu_struct(&my_srcu);
+ *
+ * See include/linux/percpu-defs.h for the rules on per-CPU variables.
+ */
+#define __DEFINE_SRCU(name, is_static)					\
+	static DEFINE_PER_CPU(struct srcu_array, name##_srcu_array);\
+	is_static struct srcu_struct name = __SRCU_STRUCT_INIT(name)
+#define DEFINE_SRCU(name)		__DEFINE_SRCU(name, /* not static */)
+#define DEFINE_STATIC_SRCU(name)	__DEFINE_SRCU(name, static)
+
+void synchronize_srcu_expedited(struct srcu_struct *sp);
+void srcu_barrier(struct srcu_struct *sp);
+unsigned long srcu_batches_completed(struct srcu_struct *sp);
+
+#endif
diff --git a/init/Kconfig b/init/Kconfig
index d269f2ca17b8..558cc3638ab9 100644
--- a/init/Kconfig
+++ b/init/Kconfig
@@ -526,15 +526,32 @@ config SRCU
 	  permits arbitrary sleeping or blocking within RCU read-side critical
 	  sections.
 
+config CLASSIC_SRCU
+	bool "Use v4.11 classic SRCU implementation"
+	default n
+	depends on RCU_EXPERT && SRCU
+	help
+	  This option selects the traditional well-tested classic SRCU
+	  implementation from v4.11, as might be desired for enterprise
+	  Linux distributions.  Without this option, the shiny new
+	  Tiny SRCU and Tree SRCU implementations are used instead.
+	  At some point, it is hoped that Tiny SRCU and Tree SRCU
+	  will accumulate enough test time and confidence to allow
+	  Classic SRCU to be dropped entirely.
+
+	  Say Y if you need a rock-solid SRCU.
+
+	  Say N if you would like help test Tree SRCU.
+
 config TINY_SRCU
 	bool
-	default y if TINY_RCU
+	default y if TINY_RCU && !CLASSIC_SRCU
 	help
 	  This option selects the single-CPU non-preemptible version of SRCU.
 
 config TREE_SRCU
 	bool
-	default y if !TINY_RCU
+	default y if !TINY_RCU && !CLASSIC_SRCU
 	help
 	  This option selects the full-fledged version of SRCU.
 
diff --git a/kernel/rcu/Makefile b/kernel/rcu/Makefile
index b853214a2b99..158e6593d58c 100644
--- a/kernel/rcu/Makefile
+++ b/kernel/rcu/Makefile
@@ -3,7 +3,8 @@
 KCOV_INSTRUMENT := n
 
 obj-y += update.o sync.o
-obj-$(CONFIG_TREE_SRCU) += srcu.o
+obj-$(CONFIG_CLASSIC_SRCU) += srcu.o
+obj-$(CONFIG_TREE_SRCU) += srcutree.o
 obj-$(CONFIG_TINY_SRCU) += srcutiny.o
 obj-$(CONFIG_RCU_TORTURE_TEST) += rcutorture.o
 obj-$(CONFIG_RCU_PERF_TEST) += rcuperf.o
diff --git a/kernel/rcu/rcutorture.c b/kernel/rcu/rcutorture.c
index 9cbb8a7b909d..6f344b6748a8 100644
--- a/kernel/rcu/rcutorture.c
+++ b/kernel/rcu/rcutorture.c
@@ -562,7 +562,7 @@ static void srcu_torture_stats(void)
 	int __maybe_unused cpu;
 	int idx;
 
-#ifdef CONFIG_TREE_SRCU
+#if defined(CONFIG_TREE_SRCU) || defined(CONFIG_CLASSIC_SRCU)
 	idx = srcu_ctlp->completed & 0x1;
 	pr_alert("%s%s Tree SRCU per-CPU(idx=%d):",
 		 torture_type, TORTURE_FLAG, idx);
diff --git a/kernel/rcu/srcu.c b/kernel/rcu/srcu.c
index 3cfcc59bddf3..584d8a983883 100644
--- a/kernel/rcu/srcu.c
+++ b/kernel/rcu/srcu.c
@@ -36,16 +36,75 @@
 #include <linux/delay.h>
 #include <linux/srcu.h>
 
-#include <linux/rcu_node_tree.h>
 #include "rcu.h"
 
+/*
+ * Initialize an rcu_batch structure to empty.
+ */
+static inline void rcu_batch_init(struct rcu_batch *b)
+{
+	b->head = NULL;
+	b->tail = &b->head;
+}
+
+/*
+ * Enqueue a callback onto the tail of the specified rcu_batch structure.
+ */
+static inline void rcu_batch_queue(struct rcu_batch *b, struct rcu_head *head)
+{
+	*b->tail = head;
+	b->tail = &head->next;
+}
+
+/*
+ * Is the specified rcu_batch structure empty?
+ */
+static inline bool rcu_batch_empty(struct rcu_batch *b)
+{
+	return b->tail == &b->head;
+}
+
+/*
+ * Remove the callback at the head of the specified rcu_batch structure
+ * and return a pointer to it, or return NULL if the structure is empty.
+ */
+static inline struct rcu_head *rcu_batch_dequeue(struct rcu_batch *b)
+{
+	struct rcu_head *head;
+
+	if (rcu_batch_empty(b))
+		return NULL;
+
+	head = b->head;
+	b->head = head->next;
+	if (b->tail == &head->next)
+		rcu_batch_init(b);
+
+	return head;
+}
+
+/*
+ * Move all callbacks from the rcu_batch structure specified by "from" to
+ * the structure specified by "to".
+ */
+static inline void rcu_batch_move(struct rcu_batch *to, struct rcu_batch *from)
+{
+	if (!rcu_batch_empty(from)) {
+		*to->tail = from->head;
+		to->tail = from->tail;
+		rcu_batch_init(from);
+	}
+}
+
 static int init_srcu_struct_fields(struct srcu_struct *sp)
 {
 	sp->completed = 0;
-	sp->srcu_gp_seq = 0;
-	atomic_set(&sp->srcu_exp_cnt, 0);
 	spin_lock_init(&sp->queue_lock);
-	rcu_segcblist_init(&sp->srcu_cblist);
+	sp->running = false;
+	rcu_batch_init(&sp->batch_queue);
+	rcu_batch_init(&sp->batch_check0);
+	rcu_batch_init(&sp->batch_check1);
+	rcu_batch_init(&sp->batch_done);
 	INIT_DELAYED_WORK(&sp->work, process_srcu);
 	sp->per_cpu_ref = alloc_percpu(struct srcu_array);
 	return sp->per_cpu_ref ? 0 : -ENOMEM;
@@ -180,8 +239,6 @@ static bool srcu_readers_active(struct srcu_struct *sp)
 	return sum;
 }
 
-#define SRCU_INTERVAL		1
-
 /**
  * cleanup_srcu_struct - deconstruct a sleep-RCU structure
  * @sp: structure to clean up.
@@ -197,16 +254,8 @@ static bool srcu_readers_active(struct srcu_struct *sp)
  */
 void cleanup_srcu_struct(struct srcu_struct *sp)
 {
-	WARN_ON_ONCE(atomic_read(&sp->srcu_exp_cnt));
 	if (WARN_ON(srcu_readers_active(sp)))
 		return; /* Leakage unless caller handles error. */
-	if (WARN_ON(!rcu_segcblist_empty(&sp->srcu_cblist)))
-		return; /* Leakage unless caller handles error. */
-	flush_delayed_work(&sp->work);
-	if (WARN_ON(rcu_seq_state(READ_ONCE(sp->srcu_gp_seq)) != SRCU_STATE_IDLE)) {
-		pr_info("cleanup_srcu_struct: Active srcu_struct %lu CBs %c state: %d\n", rcu_segcblist_n_cbs(&sp->srcu_cblist), ".E"[rcu_segcblist_empty(&sp->srcu_cblist)], rcu_seq_state(READ_ONCE(sp->srcu_gp_seq)));
-		return; /* Caller forgot to stop doing call_srcu()? */
-	}
 	free_percpu(sp->per_cpu_ref);
 	sp->per_cpu_ref = NULL;
 }
@@ -245,36 +294,26 @@ EXPORT_SYMBOL_GPL(__srcu_read_unlock);
  * We use an adaptive strategy for synchronize_srcu() and especially for
  * synchronize_srcu_expedited().  We spin for a fixed time period
  * (defined below) to allow SRCU readers to exit their read-side critical
- * sections.  If there are still some readers after a few microseconds,
- * we repeatedly block for 1-millisecond time periods.
+ * sections.  If there are still some readers after 10 microseconds,
+ * we repeatedly block for 1-millisecond time periods.  This approach
+ * has done well in testing, so there is no need for a config parameter.
  */
 #define SRCU_RETRY_CHECK_DELAY		5
+#define SYNCHRONIZE_SRCU_TRYCOUNT	2
+#define SYNCHRONIZE_SRCU_EXP_TRYCOUNT	12
 
 /*
- * Start an SRCU grace period.
- */
-static void srcu_gp_start(struct srcu_struct *sp)
-{
-	int state;
-
-	rcu_segcblist_accelerate(&sp->srcu_cblist,
-				 rcu_seq_snap(&sp->srcu_gp_seq));
-	rcu_seq_start(&sp->srcu_gp_seq);
-	state = rcu_seq_state(READ_ONCE(sp->srcu_gp_seq));
-	WARN_ON_ONCE(state != SRCU_STATE_SCAN1);
-}
-
-/*
- * Wait until all readers counted by array index idx complete, but
- * loop an additional time if there is an expedited grace period pending.
- * The caller must ensure that ->completed is not changed while checking.
+ * @@@ Wait until all pre-existing readers complete.  Such readers
+ * will have used the index specified by "idx".
+ * the caller should ensures the ->completed is not changed while checking
+ * and idx = (->completed & 1) ^ 1
  */
 static bool try_check_zero(struct srcu_struct *sp, int idx, int trycount)
 {
 	for (;;) {
 		if (srcu_readers_active_idx_check(sp, idx))
 			return true;
-		if (--trycount + !!atomic_read(&sp->srcu_exp_cnt) <= 0)
+		if (--trycount <= 0)
 			return false;
 		udelay(SRCU_RETRY_CHECK_DELAY);
 	}
@@ -300,19 +339,6 @@ static void srcu_flip(struct srcu_struct *sp)
 }
 
 /*
- * End an SRCU grace period.
- */
-static void srcu_gp_end(struct srcu_struct *sp)
-{
-	rcu_seq_end(&sp->srcu_gp_seq);
-
-	spin_lock_irq(&sp->queue_lock);
-	rcu_segcblist_advance(&sp->srcu_cblist,
-			      rcu_seq_current(&sp->srcu_gp_seq));
-	spin_unlock_irq(&sp->queue_lock);
-}
-
-/*
  * Enqueue an SRCU callback on the specified srcu_struct structure,
  * initiating grace-period processing if it is not already running.
  *
@@ -348,24 +374,26 @@ void call_srcu(struct srcu_struct *sp, struct rcu_head *head,
 	head->func = func;
 	spin_lock_irqsave(&sp->queue_lock, flags);
 	smp_mb__after_unlock_lock(); /* Caller's prior accesses before GP. */
-	rcu_segcblist_enqueue(&sp->srcu_cblist, head, false);
-	if (rcu_seq_state(READ_ONCE(sp->srcu_gp_seq)) == SRCU_STATE_IDLE) {
-		srcu_gp_start(sp);
+	rcu_batch_queue(&sp->batch_queue, head);
+	if (!sp->running) {
+		sp->running = true;
 		queue_delayed_work(system_power_efficient_wq, &sp->work, 0);
 	}
 	spin_unlock_irqrestore(&sp->queue_lock, flags);
 }
 EXPORT_SYMBOL_GPL(call_srcu);
 
-static void srcu_reschedule(struct srcu_struct *sp, unsigned long delay);
+static void srcu_advance_batches(struct srcu_struct *sp, int trycount);
+static void srcu_reschedule(struct srcu_struct *sp);
 
 /*
  * Helper function for synchronize_srcu() and synchronize_srcu_expedited().
  */
-static void __synchronize_srcu(struct srcu_struct *sp)
+static void __synchronize_srcu(struct srcu_struct *sp, int trycount)
 {
 	struct rcu_synchronize rcu;
 	struct rcu_head *head = &rcu.head;
+	bool done = false;
 
 	RCU_LOCKDEP_WARN(lock_is_held(&sp->dep_map) ||
 			 lock_is_held(&rcu_bh_lock_map) ||
@@ -373,8 +401,6 @@ static void __synchronize_srcu(struct srcu_struct *sp)
 			 lock_is_held(&rcu_sched_lock_map),
 			 "Illegal synchronize_srcu() in same-type SRCU (or in RCU) read-side critical section");
 
-	if (rcu_scheduler_active == RCU_SCHEDULER_INACTIVE)
-		return;
 	might_sleep();
 	init_completion(&rcu.completion);
 
@@ -382,47 +408,31 @@ static void __synchronize_srcu(struct srcu_struct *sp)
 	head->func = wakeme_after_rcu;
 	spin_lock_irq(&sp->queue_lock);
 	smp_mb__after_unlock_lock(); /* Caller's prior accesses before GP. */
-	if (rcu_seq_state(READ_ONCE(sp->srcu_gp_seq)) == SRCU_STATE_IDLE) {
+	if (!sp->running) {
 		/* steal the processing owner */
-		rcu_segcblist_enqueue(&sp->srcu_cblist, head, false);
-		srcu_gp_start(sp);
+		sp->running = true;
+		rcu_batch_queue(&sp->batch_check0, head);
 		spin_unlock_irq(&sp->queue_lock);
+
+		srcu_advance_batches(sp, trycount);
+		if (!rcu_batch_empty(&sp->batch_done)) {
+			BUG_ON(sp->batch_done.head != head);
+			rcu_batch_dequeue(&sp->batch_done);
+			done = true;
+		}
 		/* give the processing owner to work_struct */
-		srcu_reschedule(sp, 0);
+		srcu_reschedule(sp);
 	} else {
-		rcu_segcblist_enqueue(&sp->srcu_cblist, head, false);
+		rcu_batch_queue(&sp->batch_queue, head);
 		spin_unlock_irq(&sp->queue_lock);
 	}
 
-	wait_for_completion(&rcu.completion);
-	smp_mb(); /* Caller's later accesses after GP. */
-}
-
-/**
- * synchronize_srcu_expedited - Brute-force SRCU grace period
- * @sp: srcu_struct with which to synchronize.
- *
- * Wait for an SRCU grace period to elapse, but be more aggressive about
- * spinning rather than blocking when waiting.
- *
- * Note that synchronize_srcu_expedited() has the same deadlock and
- * memory-ordering properties as does synchronize_srcu().
- */
-void synchronize_srcu_expedited(struct srcu_struct *sp)
-{
-	bool do_norm = rcu_gp_is_normal();
-
-	if (!do_norm) {
-		atomic_inc(&sp->srcu_exp_cnt);
-		smp_mb__after_atomic(); /* increment before GP. */
-	}
-	__synchronize_srcu(sp);
-	if (!do_norm) {
-		smp_mb__before_atomic(); /* GP before decrement. */
-		atomic_dec(&sp->srcu_exp_cnt);
+	if (!done) {
+		wait_for_completion(&rcu.completion);
+		smp_mb(); /* Caller's later accesses after GP. */
 	}
+
 }
-EXPORT_SYMBOL_GPL(synchronize_srcu_expedited);
 
 /**
  * synchronize_srcu - wait for prior SRCU read-side critical-section completion
@@ -465,14 +475,29 @@ EXPORT_SYMBOL_GPL(synchronize_srcu_expedited);
  */
 void synchronize_srcu(struct srcu_struct *sp)
 {
-	if (rcu_gp_is_expedited())
-		synchronize_srcu_expedited(sp);
-	else
-		__synchronize_srcu(sp);
+	__synchronize_srcu(sp, (rcu_gp_is_expedited() && !rcu_gp_is_normal())
+			   ? SYNCHRONIZE_SRCU_EXP_TRYCOUNT
+			   : SYNCHRONIZE_SRCU_TRYCOUNT);
 }
 EXPORT_SYMBOL_GPL(synchronize_srcu);
 
 /**
+ * synchronize_srcu_expedited - Brute-force SRCU grace period
+ * @sp: srcu_struct with which to synchronize.
+ *
+ * Wait for an SRCU grace period to elapse, but be more aggressive about
+ * spinning rather than blocking when waiting.
+ *
+ * Note that synchronize_srcu_expedited() has the same deadlock and
+ * memory-ordering properties as does synchronize_srcu().
+ */
+void synchronize_srcu_expedited(struct srcu_struct *sp)
+{
+	__synchronize_srcu(sp, SYNCHRONIZE_SRCU_EXP_TRYCOUNT);
+}
+EXPORT_SYMBOL_GPL(synchronize_srcu_expedited);
+
+/**
  * srcu_barrier - Wait until all in-flight call_srcu() callbacks complete.
  * @sp: srcu_struct on which to wait for in-flight callbacks.
  */
@@ -495,13 +520,29 @@ unsigned long srcu_batches_completed(struct srcu_struct *sp)
 }
 EXPORT_SYMBOL_GPL(srcu_batches_completed);
 
+#define SRCU_CALLBACK_BATCH	10
+#define SRCU_INTERVAL		1
+
+/*
+ * Move any new SRCU callbacks to the first stage of the SRCU grace
+ * period pipeline.
+ */
+static void srcu_collect_new(struct srcu_struct *sp)
+{
+	if (!rcu_batch_empty(&sp->batch_queue)) {
+		spin_lock_irq(&sp->queue_lock);
+		rcu_batch_move(&sp->batch_check0, &sp->batch_queue);
+		spin_unlock_irq(&sp->queue_lock);
+	}
+}
+
 /*
  * Core SRCU state machine.  Advance callbacks from ->batch_check0 to
  * ->batch_check1 and then to ->batch_done as readers drain.
  */
-static void srcu_advance_batches(struct srcu_struct *sp)
+static void srcu_advance_batches(struct srcu_struct *sp, int trycount)
 {
-	int idx;
+	int idx = 1 ^ (sp->completed & 1);
 
 	/*
 	 * Because readers might be delayed for an extended period after
@@ -509,44 +550,50 @@ static void srcu_advance_batches(struct srcu_struct *sp)
 	 * might well be readers using both idx=0 and idx=1.  We therefore
 	 * need to wait for readers to clear from both index values before
 	 * invoking a callback.
-	 *
-	 * The load-acquire ensures that we see the accesses performed
-	 * by the prior grace period.
 	 */
-	idx = rcu_seq_state(smp_load_acquire(&sp->srcu_gp_seq)); /* ^^^ */
-	if (idx == SRCU_STATE_IDLE) {
-		spin_lock_irq(&sp->queue_lock);
-		if (rcu_segcblist_empty(&sp->srcu_cblist)) {
-			spin_unlock_irq(&sp->queue_lock);
-			return;
-		}
-		idx = rcu_seq_state(READ_ONCE(sp->srcu_gp_seq));
-		if (idx == SRCU_STATE_IDLE)
-			srcu_gp_start(sp);
-		spin_unlock_irq(&sp->queue_lock);
-		if (idx != SRCU_STATE_IDLE)
-			return; /* Someone else started the grace period. */
-	}
 
-	if (rcu_seq_state(READ_ONCE(sp->srcu_gp_seq)) == SRCU_STATE_SCAN1) {
-		idx = 1 ^ (sp->completed & 1);
-		if (!try_check_zero(sp, idx, 1))
-			return; /* readers present, retry later. */
-		srcu_flip(sp);
-		rcu_seq_set_state(&sp->srcu_gp_seq, SRCU_STATE_SCAN2);
-	}
+	if (rcu_batch_empty(&sp->batch_check0) &&
+	    rcu_batch_empty(&sp->batch_check1))
+		return; /* no callbacks need to be advanced */
 
-	if (rcu_seq_state(READ_ONCE(sp->srcu_gp_seq)) == SRCU_STATE_SCAN2) {
+	if (!try_check_zero(sp, idx, trycount))
+		return; /* failed to advance, will try after SRCU_INTERVAL */
 
-		/*
-		 * SRCU read-side critical sections are normally short,
-		 * so check at least twice in quick succession after a flip.
-		 */
-		idx = 1 ^ (sp->completed & 1);
-		if (!try_check_zero(sp, idx, 2))
-			return; /* readers present, retry after later. */
-		srcu_gp_end(sp);
-	}
+	/*
+	 * The callbacks in ->batch_check1 have already done with their
+	 * first zero check and flip back when they were enqueued on
+	 * ->batch_check0 in a previous invocation of srcu_advance_batches().
+	 * (Presumably try_check_zero() returned false during that
+	 * invocation, leaving the callbacks stranded on ->batch_check1.)
+	 * They are therefore ready to invoke, so move them to ->batch_done.
+	 */
+	rcu_batch_move(&sp->batch_done, &sp->batch_check1);
+
+	if (rcu_batch_empty(&sp->batch_check0))
+		return; /* no callbacks need to be advanced */
+	srcu_flip(sp);
+
+	/*
+	 * The callbacks in ->batch_check0 just finished their
+	 * first check zero and flip, so move them to ->batch_check1
+	 * for future checking on the other idx.
+	 */
+	rcu_batch_move(&sp->batch_check1, &sp->batch_check0);
+
+	/*
+	 * SRCU read-side critical sections are normally short, so check
+	 * at least twice in quick succession after a flip.
+	 */
+	trycount = trycount < 2 ? 2 : trycount;
+	if (!try_check_zero(sp, idx^1, trycount))
+		return; /* failed to advance, will try after SRCU_INTERVAL */
+
+	/*
+	 * The callbacks in ->batch_check1 have now waited for all
+	 * pre-existing readers using both idx values.  They are therefore
+	 * ready to invoke, so move them to ->batch_done.
+	 */
+	rcu_batch_move(&sp->batch_done, &sp->batch_check1);
 }
 
 /*
@@ -557,48 +604,45 @@ static void srcu_advance_batches(struct srcu_struct *sp)
  */
 static void srcu_invoke_callbacks(struct srcu_struct *sp)
 {
-	struct rcu_cblist ready_cbs;
-	struct rcu_head *rhp;
+	int i;
+	struct rcu_head *head;
 
-	spin_lock_irq(&sp->queue_lock);
-	if (!rcu_segcblist_ready_cbs(&sp->srcu_cblist)) {
-		spin_unlock_irq(&sp->queue_lock);
-		return;
-	}
-	rcu_cblist_init(&ready_cbs);
-	rcu_segcblist_extract_done_cbs(&sp->srcu_cblist, &ready_cbs);
-	spin_unlock_irq(&sp->queue_lock);
-	rhp = rcu_cblist_dequeue(&ready_cbs);
-	for (; rhp != NULL; rhp = rcu_cblist_dequeue(&ready_cbs)) {
+	for (i = 0; i < SRCU_CALLBACK_BATCH; i++) {
+		head = rcu_batch_dequeue(&sp->batch_done);
+		if (!head)
+			break;
 		local_bh_disable();
-		rhp->func(rhp);
+		head->func(head);
 		local_bh_enable();
 	}
-	spin_lock_irq(&sp->queue_lock);
-	rcu_segcblist_insert_count(&sp->srcu_cblist, &ready_cbs);
-	spin_unlock_irq(&sp->queue_lock);
 }
 
 /*
  * Finished one round of SRCU grace period.  Start another if there are
  * more SRCU callbacks queued, otherwise put SRCU into not-running state.
  */
-static void srcu_reschedule(struct srcu_struct *sp, unsigned long delay)
+static void srcu_reschedule(struct srcu_struct *sp)
 {
 	bool pending = true;
-	int state;
 
-	if (rcu_segcblist_empty(&sp->srcu_cblist)) {
+	if (rcu_batch_empty(&sp->batch_done) &&
+	    rcu_batch_empty(&sp->batch_check1) &&
+	    rcu_batch_empty(&sp->batch_check0) &&
+	    rcu_batch_empty(&sp->batch_queue)) {
 		spin_lock_irq(&sp->queue_lock);
-		state = rcu_seq_state(READ_ONCE(sp->srcu_gp_seq));
-		if (rcu_segcblist_empty(&sp->srcu_cblist) &&
-		    state == SRCU_STATE_IDLE)
+		if (rcu_batch_empty(&sp->batch_done) &&
+		    rcu_batch_empty(&sp->batch_check1) &&
+		    rcu_batch_empty(&sp->batch_check0) &&
+		    rcu_batch_empty(&sp->batch_queue)) {
+			sp->running = false;
 			pending = false;
+		}
 		spin_unlock_irq(&sp->queue_lock);
 	}
 
 	if (pending)
-		queue_delayed_work(system_power_efficient_wq, &sp->work, delay);
+		queue_delayed_work(system_power_efficient_wq,
+				   &sp->work, SRCU_INTERVAL);
 }
 
 /*
@@ -610,8 +654,9 @@ void process_srcu(struct work_struct *work)
 
 	sp = container_of(work, struct srcu_struct, work.work);
 
-	srcu_advance_batches(sp);
+	srcu_collect_new(sp);
+	srcu_advance_batches(sp, 1);
 	srcu_invoke_callbacks(sp);
-	srcu_reschedule(sp, atomic_read(&sp->srcu_exp_cnt) ? 0 : SRCU_INTERVAL);
+	srcu_reschedule(sp);
 }
 EXPORT_SYMBOL_GPL(process_srcu);
diff --git a/kernel/rcu/srcutree.c b/kernel/rcu/srcutree.c
new file mode 100644
index 000000000000..da676b0d016b
--- /dev/null
+++ b/kernel/rcu/srcutree.c
@@ -0,0 +1,613 @@
+/*
+ * Sleepable Read-Copy Update mechanism for mutual exclusion.
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, you can access it online at
+ * http://www.gnu.org/licenses/gpl-2.0.html.
+ *
+ * Copyright (C) IBM Corporation, 2006
+ * Copyright (C) Fujitsu, 2012
+ *
+ * Author: Paul McKenney <paulmck@...ibm.com>
+ *	   Lai Jiangshan <laijs@...fujitsu.com>
+ *
+ * For detailed explanation of Read-Copy Update mechanism see -
+ *		Documentation/RCU/ *.txt
+ *
+ */
+
+#include <linux/export.h>
+#include <linux/mutex.h>
+#include <linux/percpu.h>
+#include <linux/preempt.h>
+#include <linux/rcupdate_wait.h>
+#include <linux/sched.h>
+#include <linux/smp.h>
+#include <linux/delay.h>
+#include <linux/srcu.h>
+
+#include <linux/rcu_node_tree.h>
+#include "rcu.h"
+
+static int init_srcu_struct_fields(struct srcu_struct *sp)
+{
+	sp->completed = 0;
+	sp->srcu_gp_seq = 0;
+	atomic_set(&sp->srcu_exp_cnt, 0);
+	spin_lock_init(&sp->queue_lock);
+	rcu_segcblist_init(&sp->srcu_cblist);
+	INIT_DELAYED_WORK(&sp->work, process_srcu);
+	sp->per_cpu_ref = alloc_percpu(struct srcu_array);
+	return sp->per_cpu_ref ? 0 : -ENOMEM;
+}
+
+#ifdef CONFIG_DEBUG_LOCK_ALLOC
+
+int __init_srcu_struct(struct srcu_struct *sp, const char *name,
+		       struct lock_class_key *key)
+{
+	/* Don't re-initialize a lock while it is held. */
+	debug_check_no_locks_freed((void *)sp, sizeof(*sp));
+	lockdep_init_map(&sp->dep_map, name, key, 0);
+	return init_srcu_struct_fields(sp);
+}
+EXPORT_SYMBOL_GPL(__init_srcu_struct);
+
+#else /* #ifdef CONFIG_DEBUG_LOCK_ALLOC */
+
+/**
+ * init_srcu_struct - initialize a sleep-RCU structure
+ * @sp: structure to initialize.
+ *
+ * Must invoke this on a given srcu_struct before passing that srcu_struct
+ * to any other function.  Each srcu_struct represents a separate domain
+ * of SRCU protection.
+ */
+int init_srcu_struct(struct srcu_struct *sp)
+{
+	return init_srcu_struct_fields(sp);
+}
+EXPORT_SYMBOL_GPL(init_srcu_struct);
+
+#endif /* #else #ifdef CONFIG_DEBUG_LOCK_ALLOC */
+
+/*
+ * Returns approximate total of the readers' ->lock_count[] values for the
+ * rank of per-CPU counters specified by idx.
+ */
+static unsigned long srcu_readers_lock_idx(struct srcu_struct *sp, int idx)
+{
+	int cpu;
+	unsigned long sum = 0;
+
+	for_each_possible_cpu(cpu) {
+		struct srcu_array *cpuc = per_cpu_ptr(sp->per_cpu_ref, cpu);
+
+		sum += READ_ONCE(cpuc->lock_count[idx]);
+	}
+	return sum;
+}
+
+/*
+ * Returns approximate total of the readers' ->unlock_count[] values for the
+ * rank of per-CPU counters specified by idx.
+ */
+static unsigned long srcu_readers_unlock_idx(struct srcu_struct *sp, int idx)
+{
+	int cpu;
+	unsigned long sum = 0;
+
+	for_each_possible_cpu(cpu) {
+		struct srcu_array *cpuc = per_cpu_ptr(sp->per_cpu_ref, cpu);
+
+		sum += READ_ONCE(cpuc->unlock_count[idx]);
+	}
+	return sum;
+}
+
+/*
+ * Return true if the number of pre-existing readers is determined to
+ * be zero.
+ */
+static bool srcu_readers_active_idx_check(struct srcu_struct *sp, int idx)
+{
+	unsigned long unlocks;
+
+	unlocks = srcu_readers_unlock_idx(sp, idx);
+
+	/*
+	 * Make sure that a lock is always counted if the corresponding
+	 * unlock is counted. Needs to be a smp_mb() as the read side may
+	 * contain a read from a variable that is written to before the
+	 * synchronize_srcu() in the write side. In this case smp_mb()s
+	 * A and B act like the store buffering pattern.
+	 *
+	 * This smp_mb() also pairs with smp_mb() C to prevent accesses
+	 * after the synchronize_srcu() from being executed before the
+	 * grace period ends.
+	 */
+	smp_mb(); /* A */
+
+	/*
+	 * If the locks are the same as the unlocks, then there must have
+	 * been no readers on this index at some time in between. This does
+	 * not mean that there are no more readers, as one could have read
+	 * the current index but not have incremented the lock counter yet.
+	 *
+	 * Possible bug: There is no guarantee that there haven't been
+	 * ULONG_MAX increments of ->lock_count[] since the unlocks were
+	 * counted, meaning that this could return true even if there are
+	 * still active readers.  Since there are no memory barriers around
+	 * srcu_flip(), the CPU is not required to increment ->completed
+	 * before running srcu_readers_unlock_idx(), which means that there
+	 * could be an arbitrarily large number of critical sections that
+	 * execute after srcu_readers_unlock_idx() but use the old value
+	 * of ->completed.
+	 */
+	return srcu_readers_lock_idx(sp, idx) == unlocks;
+}
+
+/**
+ * srcu_readers_active - returns true if there are readers. and false
+ *                       otherwise
+ * @sp: which srcu_struct to count active readers (holding srcu_read_lock).
+ *
+ * Note that this is not an atomic primitive, and can therefore suffer
+ * severe errors when invoked on an active srcu_struct.  That said, it
+ * can be useful as an error check at cleanup time.
+ */
+static bool srcu_readers_active(struct srcu_struct *sp)
+{
+	int cpu;
+	unsigned long sum = 0;
+
+	for_each_possible_cpu(cpu) {
+		struct srcu_array *cpuc = per_cpu_ptr(sp->per_cpu_ref, cpu);
+
+		sum += READ_ONCE(cpuc->lock_count[0]);
+		sum += READ_ONCE(cpuc->lock_count[1]);
+		sum -= READ_ONCE(cpuc->unlock_count[0]);
+		sum -= READ_ONCE(cpuc->unlock_count[1]);
+	}
+	return sum;
+}
+
+#define SRCU_INTERVAL		1
+
+/**
+ * cleanup_srcu_struct - deconstruct a sleep-RCU structure
+ * @sp: structure to clean up.
+ *
+ * Must invoke this after you are finished using a given srcu_struct that
+ * was initialized via init_srcu_struct(), else you leak memory.
+ */
+void cleanup_srcu_struct(struct srcu_struct *sp)
+{
+	WARN_ON_ONCE(atomic_read(&sp->srcu_exp_cnt));
+	if (WARN_ON(srcu_readers_active(sp)))
+		return; /* Leakage unless caller handles error. */
+	if (WARN_ON(!rcu_segcblist_empty(&sp->srcu_cblist)))
+		return; /* Leakage unless caller handles error. */
+	flush_delayed_work(&sp->work);
+	if (WARN_ON(rcu_seq_state(READ_ONCE(sp->srcu_gp_seq)) != SRCU_STATE_IDLE)) {
+		pr_info("cleanup_srcu_struct: Active srcu_struct %lu CBs %c state: %d\n", rcu_segcblist_n_cbs(&sp->srcu_cblist), ".E"[rcu_segcblist_empty(&sp->srcu_cblist)], rcu_seq_state(READ_ONCE(sp->srcu_gp_seq)));
+		return; /* Caller forgot to stop doing call_srcu()? */
+	}
+	free_percpu(sp->per_cpu_ref);
+	sp->per_cpu_ref = NULL;
+}
+EXPORT_SYMBOL_GPL(cleanup_srcu_struct);
+
+/*
+ * Counts the new reader in the appropriate per-CPU element of the
+ * srcu_struct.  Must be called from process context.
+ * Returns an index that must be passed to the matching srcu_read_unlock().
+ */
+int __srcu_read_lock(struct srcu_struct *sp)
+{
+	int idx;
+
+	idx = READ_ONCE(sp->completed) & 0x1;
+	__this_cpu_inc(sp->per_cpu_ref->lock_count[idx]);
+	smp_mb(); /* B */  /* Avoid leaking the critical section. */
+	return idx;
+}
+EXPORT_SYMBOL_GPL(__srcu_read_lock);
+
+/*
+ * Removes the count for the old reader from the appropriate per-CPU
+ * element of the srcu_struct.  Note that this may well be a different
+ * CPU than that which was incremented by the corresponding srcu_read_lock().
+ * Must be called from process context.
+ */
+void __srcu_read_unlock(struct srcu_struct *sp, int idx)
+{
+	smp_mb(); /* C */  /* Avoid leaking the critical section. */
+	this_cpu_inc(sp->per_cpu_ref->unlock_count[idx]);
+}
+EXPORT_SYMBOL_GPL(__srcu_read_unlock);
+
+/*
+ * We use an adaptive strategy for synchronize_srcu() and especially for
+ * synchronize_srcu_expedited().  We spin for a fixed time period
+ * (defined below) to allow SRCU readers to exit their read-side critical
+ * sections.  If there are still some readers after a few microseconds,
+ * we repeatedly block for 1-millisecond time periods.
+ */
+#define SRCU_RETRY_CHECK_DELAY		5
+
+/*
+ * Start an SRCU grace period.
+ */
+static void srcu_gp_start(struct srcu_struct *sp)
+{
+	int state;
+
+	rcu_segcblist_accelerate(&sp->srcu_cblist,
+				 rcu_seq_snap(&sp->srcu_gp_seq));
+	rcu_seq_start(&sp->srcu_gp_seq);
+	state = rcu_seq_state(READ_ONCE(sp->srcu_gp_seq));
+	WARN_ON_ONCE(state != SRCU_STATE_SCAN1);
+}
+
+/*
+ * Wait until all readers counted by array index idx complete, but
+ * loop an additional time if there is an expedited grace period pending.
+ * The caller must ensure that ->completed is not changed while checking.
+ */
+static bool try_check_zero(struct srcu_struct *sp, int idx, int trycount)
+{
+	for (;;) {
+		if (srcu_readers_active_idx_check(sp, idx))
+			return true;
+		if (--trycount + !!atomic_read(&sp->srcu_exp_cnt) <= 0)
+			return false;
+		udelay(SRCU_RETRY_CHECK_DELAY);
+	}
+}
+
+/*
+ * Increment the ->completed counter so that future SRCU readers will
+ * use the other rank of the ->(un)lock_count[] arrays.  This allows
+ * us to wait for pre-existing readers in a starvation-free manner.
+ */
+static void srcu_flip(struct srcu_struct *sp)
+{
+	WRITE_ONCE(sp->completed, sp->completed + 1);
+
+	/*
+	 * Ensure that if the updater misses an __srcu_read_unlock()
+	 * increment, that task's next __srcu_read_lock() will see the
+	 * above counter update.  Note that both this memory barrier
+	 * and the one in srcu_readers_active_idx_check() provide the
+	 * guarantee for __srcu_read_lock().
+	 */
+	smp_mb(); /* D */  /* Pairs with C. */
+}
+
+/*
+ * End an SRCU grace period.
+ */
+static void srcu_gp_end(struct srcu_struct *sp)
+{
+	rcu_seq_end(&sp->srcu_gp_seq);
+
+	spin_lock_irq(&sp->queue_lock);
+	rcu_segcblist_advance(&sp->srcu_cblist,
+			      rcu_seq_current(&sp->srcu_gp_seq));
+	spin_unlock_irq(&sp->queue_lock);
+}
+
+/*
+ * Enqueue an SRCU callback on the specified srcu_struct structure,
+ * initiating grace-period processing if it is not already running.
+ *
+ * Note that all CPUs must agree that the grace period extended beyond
+ * all pre-existing SRCU read-side critical section.  On systems with
+ * more than one CPU, this means that when "func()" is invoked, each CPU
+ * is guaranteed to have executed a full memory barrier since the end of
+ * its last corresponding SRCU read-side critical section whose beginning
+ * preceded the call to call_rcu().  It also means that each CPU executing
+ * an SRCU read-side critical section that continues beyond the start of
+ * "func()" must have executed a memory barrier after the call_rcu()
+ * but before the beginning of that SRCU read-side critical section.
+ * Note that these guarantees include CPUs that are offline, idle, or
+ * executing in user mode, as well as CPUs that are executing in the kernel.
+ *
+ * Furthermore, if CPU A invoked call_rcu() and CPU B invoked the
+ * resulting SRCU callback function "func()", then both CPU A and CPU
+ * B are guaranteed to execute a full memory barrier during the time
+ * interval between the call to call_rcu() and the invocation of "func()".
+ * This guarantee applies even if CPU A and CPU B are the same CPU (but
+ * again only if the system has more than one CPU).
+ *
+ * Of course, these guarantees apply only for invocations of call_srcu(),
+ * srcu_read_lock(), and srcu_read_unlock() that are all passed the same
+ * srcu_struct structure.
+ */
+void call_srcu(struct srcu_struct *sp, struct rcu_head *head,
+	       rcu_callback_t func)
+{
+	unsigned long flags;
+
+	head->next = NULL;
+	head->func = func;
+	spin_lock_irqsave(&sp->queue_lock, flags);
+	smp_mb__after_unlock_lock(); /* Caller's prior accesses before GP. */
+	rcu_segcblist_enqueue(&sp->srcu_cblist, head, false);
+	if (rcu_seq_state(READ_ONCE(sp->srcu_gp_seq)) == SRCU_STATE_IDLE) {
+		srcu_gp_start(sp);
+		queue_delayed_work(system_power_efficient_wq, &sp->work, 0);
+	}
+	spin_unlock_irqrestore(&sp->queue_lock, flags);
+}
+EXPORT_SYMBOL_GPL(call_srcu);
+
+static void srcu_reschedule(struct srcu_struct *sp, unsigned long delay);
+
+/*
+ * Helper function for synchronize_srcu() and synchronize_srcu_expedited().
+ */
+static void __synchronize_srcu(struct srcu_struct *sp)
+{
+	struct rcu_synchronize rcu;
+	struct rcu_head *head = &rcu.head;
+
+	RCU_LOCKDEP_WARN(lock_is_held(&sp->dep_map) ||
+			 lock_is_held(&rcu_bh_lock_map) ||
+			 lock_is_held(&rcu_lock_map) ||
+			 lock_is_held(&rcu_sched_lock_map),
+			 "Illegal synchronize_srcu() in same-type SRCU (or in RCU) read-side critical section");
+
+	if (rcu_scheduler_active == RCU_SCHEDULER_INACTIVE)
+		return;
+	might_sleep();
+	init_completion(&rcu.completion);
+
+	head->next = NULL;
+	head->func = wakeme_after_rcu;
+	spin_lock_irq(&sp->queue_lock);
+	smp_mb__after_unlock_lock(); /* Caller's prior accesses before GP. */
+	if (rcu_seq_state(READ_ONCE(sp->srcu_gp_seq)) == SRCU_STATE_IDLE) {
+		/* steal the processing owner */
+		rcu_segcblist_enqueue(&sp->srcu_cblist, head, false);
+		srcu_gp_start(sp);
+		spin_unlock_irq(&sp->queue_lock);
+		/* give the processing owner to work_struct */
+		srcu_reschedule(sp, 0);
+	} else {
+		rcu_segcblist_enqueue(&sp->srcu_cblist, head, false);
+		spin_unlock_irq(&sp->queue_lock);
+	}
+
+	wait_for_completion(&rcu.completion);
+	smp_mb(); /* Caller's later accesses after GP. */
+}
+
+/**
+ * synchronize_srcu_expedited - Brute-force SRCU grace period
+ * @sp: srcu_struct with which to synchronize.
+ *
+ * Wait for an SRCU grace period to elapse, but be more aggressive about
+ * spinning rather than blocking when waiting.
+ *
+ * Note that synchronize_srcu_expedited() has the same deadlock and
+ * memory-ordering properties as does synchronize_srcu().
+ */
+void synchronize_srcu_expedited(struct srcu_struct *sp)
+{
+	bool do_norm = rcu_gp_is_normal();
+
+	if (!do_norm) {
+		atomic_inc(&sp->srcu_exp_cnt);
+		smp_mb__after_atomic(); /* increment before GP. */
+	}
+	__synchronize_srcu(sp);
+	if (!do_norm) {
+		smp_mb__before_atomic(); /* GP before decrement. */
+		atomic_dec(&sp->srcu_exp_cnt);
+	}
+}
+EXPORT_SYMBOL_GPL(synchronize_srcu_expedited);
+
+/**
+ * synchronize_srcu - wait for prior SRCU read-side critical-section completion
+ * @sp: srcu_struct with which to synchronize.
+ *
+ * Wait for the count to drain to zero of both indexes. To avoid the
+ * possible starvation of synchronize_srcu(), it waits for the count of
+ * the index=((->completed & 1) ^ 1) to drain to zero at first,
+ * and then flip the completed and wait for the count of the other index.
+ *
+ * Can block; must be called from process context.
+ *
+ * Note that it is illegal to call synchronize_srcu() from the corresponding
+ * SRCU read-side critical section; doing so will result in deadlock.
+ * However, it is perfectly legal to call synchronize_srcu() on one
+ * srcu_struct from some other srcu_struct's read-side critical section,
+ * as long as the resulting graph of srcu_structs is acyclic.
+ *
+ * There are memory-ordering constraints implied by synchronize_srcu().
+ * On systems with more than one CPU, when synchronize_srcu() returns,
+ * each CPU is guaranteed to have executed a full memory barrier since
+ * the end of its last corresponding SRCU-sched read-side critical section
+ * whose beginning preceded the call to synchronize_srcu().  In addition,
+ * each CPU having an SRCU read-side critical section that extends beyond
+ * the return from synchronize_srcu() is guaranteed to have executed a
+ * full memory barrier after the beginning of synchronize_srcu() and before
+ * the beginning of that SRCU read-side critical section.  Note that these
+ * guarantees include CPUs that are offline, idle, or executing in user mode,
+ * as well as CPUs that are executing in the kernel.
+ *
+ * Furthermore, if CPU A invoked synchronize_srcu(), which returned
+ * to its caller on CPU B, then both CPU A and CPU B are guaranteed
+ * to have executed a full memory barrier during the execution of
+ * synchronize_srcu().  This guarantee applies even if CPU A and CPU B
+ * are the same CPU, but again only if the system has more than one CPU.
+ *
+ * Of course, these memory-ordering guarantees apply only when
+ * synchronize_srcu(), srcu_read_lock(), and srcu_read_unlock() are
+ * passed the same srcu_struct structure.
+ */
+void synchronize_srcu(struct srcu_struct *sp)
+{
+	if (rcu_gp_is_expedited())
+		synchronize_srcu_expedited(sp);
+	else
+		__synchronize_srcu(sp);
+}
+EXPORT_SYMBOL_GPL(synchronize_srcu);
+
+/**
+ * srcu_barrier - Wait until all in-flight call_srcu() callbacks complete.
+ * @sp: srcu_struct on which to wait for in-flight callbacks.
+ */
+void srcu_barrier(struct srcu_struct *sp)
+{
+	synchronize_srcu(sp);
+}
+EXPORT_SYMBOL_GPL(srcu_barrier);
+
+/**
+ * srcu_batches_completed - return batches completed.
+ * @sp: srcu_struct on which to report batch completion.
+ *
+ * Report the number of batches, correlated with, but not necessarily
+ * precisely the same as, the number of grace periods that have elapsed.
+ */
+unsigned long srcu_batches_completed(struct srcu_struct *sp)
+{
+	return sp->completed;
+}
+EXPORT_SYMBOL_GPL(srcu_batches_completed);
+
+/*
+ * Core SRCU state machine.  Advance callbacks from ->batch_check0 to
+ * ->batch_check1 and then to ->batch_done as readers drain.
+ */
+static void srcu_advance_batches(struct srcu_struct *sp)
+{
+	int idx;
+
+	/*
+	 * Because readers might be delayed for an extended period after
+	 * fetching ->completed for their index, at any point in time there
+	 * might well be readers using both idx=0 and idx=1.  We therefore
+	 * need to wait for readers to clear from both index values before
+	 * invoking a callback.
+	 *
+	 * The load-acquire ensures that we see the accesses performed
+	 * by the prior grace period.
+	 */
+	idx = rcu_seq_state(smp_load_acquire(&sp->srcu_gp_seq)); /* ^^^ */
+	if (idx == SRCU_STATE_IDLE) {
+		spin_lock_irq(&sp->queue_lock);
+		if (rcu_segcblist_empty(&sp->srcu_cblist)) {
+			spin_unlock_irq(&sp->queue_lock);
+			return;
+		}
+		idx = rcu_seq_state(READ_ONCE(sp->srcu_gp_seq));
+		if (idx == SRCU_STATE_IDLE)
+			srcu_gp_start(sp);
+		spin_unlock_irq(&sp->queue_lock);
+		if (idx != SRCU_STATE_IDLE)
+			return; /* Someone else started the grace period. */
+	}
+
+	if (rcu_seq_state(READ_ONCE(sp->srcu_gp_seq)) == SRCU_STATE_SCAN1) {
+		idx = 1 ^ (sp->completed & 1);
+		if (!try_check_zero(sp, idx, 1))
+			return; /* readers present, retry later. */
+		srcu_flip(sp);
+		rcu_seq_set_state(&sp->srcu_gp_seq, SRCU_STATE_SCAN2);
+	}
+
+	if (rcu_seq_state(READ_ONCE(sp->srcu_gp_seq)) == SRCU_STATE_SCAN2) {
+
+		/*
+		 * SRCU read-side critical sections are normally short,
+		 * so check at least twice in quick succession after a flip.
+		 */
+		idx = 1 ^ (sp->completed & 1);
+		if (!try_check_zero(sp, idx, 2))
+			return; /* readers present, retry after later. */
+		srcu_gp_end(sp);
+	}
+}
+
+/*
+ * Invoke a limited number of SRCU callbacks that have passed through
+ * their grace period.  If there are more to do, SRCU will reschedule
+ * the workqueue.  Note that needed memory barriers have been executed
+ * in this task's context by srcu_readers_active_idx_check().
+ */
+static void srcu_invoke_callbacks(struct srcu_struct *sp)
+{
+	struct rcu_cblist ready_cbs;
+	struct rcu_head *rhp;
+
+	spin_lock_irq(&sp->queue_lock);
+	if (!rcu_segcblist_ready_cbs(&sp->srcu_cblist)) {
+		spin_unlock_irq(&sp->queue_lock);
+		return;
+	}
+	rcu_cblist_init(&ready_cbs);
+	rcu_segcblist_extract_done_cbs(&sp->srcu_cblist, &ready_cbs);
+	spin_unlock_irq(&sp->queue_lock);
+	rhp = rcu_cblist_dequeue(&ready_cbs);
+	for (; rhp != NULL; rhp = rcu_cblist_dequeue(&ready_cbs)) {
+		local_bh_disable();
+		rhp->func(rhp);
+		local_bh_enable();
+	}
+	spin_lock_irq(&sp->queue_lock);
+	rcu_segcblist_insert_count(&sp->srcu_cblist, &ready_cbs);
+	spin_unlock_irq(&sp->queue_lock);
+}
+
+/*
+ * Finished one round of SRCU grace period.  Start another if there are
+ * more SRCU callbacks queued, otherwise put SRCU into not-running state.
+ */
+static void srcu_reschedule(struct srcu_struct *sp, unsigned long delay)
+{
+	bool pending = true;
+	int state;
+
+	if (rcu_segcblist_empty(&sp->srcu_cblist)) {
+		spin_lock_irq(&sp->queue_lock);
+		state = rcu_seq_state(READ_ONCE(sp->srcu_gp_seq));
+		if (rcu_segcblist_empty(&sp->srcu_cblist) &&
+		    state == SRCU_STATE_IDLE)
+			pending = false;
+		spin_unlock_irq(&sp->queue_lock);
+	}
+
+	if (pending)
+		queue_delayed_work(system_power_efficient_wq, &sp->work, delay);
+}
+
+/*
+ * This is the work-queue function that handles SRCU grace periods.
+ */
+void process_srcu(struct work_struct *work)
+{
+	struct srcu_struct *sp;
+
+	sp = container_of(work, struct srcu_struct, work.work);
+
+	srcu_advance_batches(sp);
+	srcu_invoke_callbacks(sp);
+	srcu_reschedule(sp, atomic_read(&sp->srcu_exp_cnt) ? 0 : SRCU_INTERVAL);
+}
+EXPORT_SYMBOL_GPL(process_srcu);
-- 
2.5.2

Powered by blists - more mailing lists

Powered by Openwall GNU/*/Linux Powered by OpenVZ