lists.openwall.net   lists  /  announce  owl-users  owl-dev  john-users  john-dev  passwdqc-users  yescrypt  popa3d-users  /  oss-security  kernel-hardening  musl  sabotage  tlsify  passwords  /  crypt-dev  xvendor  /  Bugtraq  Full-Disclosure  linux-kernel  linux-netdev  linux-ext4  linux-hardening  linux-cve-announce  PHC 
Open Source and information security mailing list archives
 
Hash Suite: Windows password security audit tool. GUI, reports in PDF.
[<prev] [next>] [<thread-prev] [thread-next>] [day] [month] [year] [list]
Message-Id: <20170426143145.GA5683@linux.vnet.ibm.com>
Date:   Wed, 26 Apr 2017 07:31:45 -0700
From:   "Paul E. McKenney" <paulmck@...ux.vnet.ibm.com>
To:     Mike Galbraith <efault@....de>
Cc:     LKML <linux-kernel@...r.kernel.org>
Subject: Re: TREE_SRCU slows hotplug by factor ~16

On Tue, Apr 25, 2017 at 03:36:42PM -0700, Paul E. McKenney wrote:
> On Mon, Apr 24, 2017 at 09:24:42AM -0700, Paul E. McKenney wrote:
> > On Mon, Apr 24, 2017 at 09:35:03AM +0200, Mike Galbraith wrote:
> > > On Sun, 2017-04-23 at 23:22 -0700, Paul E. McKenney wrote:
> > > 
> > > > Could you please collect an ftrace (or whatever) showing the timestamp
> > > > sequence of calls to synchronize_srcu(), synchronize_srcu_expedited(),
> > > > and call_srcu() during the execution of the stress script?  If it is easy
> > > > to do, also the timestamp sequence of returns from synchronize_srcu()
> > > > and synchronize_srcu_expedited()?
> > > 
> > > The first two minutes below config bits, trace_printk([enter/exit]) in
> > > each function.  If you (unlikely, but..) want the whole trace, holler.
> > > 
> > > # RCU Subsystem
> > > CONFIG_TREE_RCU=y
> > > CONFIG_RCU_EXPERT=y
> > > CONFIG_SRCU=y
> > > # CONFIG_CLASSIC_SRCU is not set
> > > CONFIG_TREE_SRCU=y
> > > # CONFIG_TASKS_RCU is not set
> > > CONFIG_RCU_STALL_COMMON=y
> > > CONFIG_RCU_FANOUT=64
> > > CONFIG_RCU_FANOUT_LEAF=16
> > > # CONFIG_RCU_FAST_NO_HZ is not set
> > > # CONFIG_TREE_RCU_TRACE is not set
> > > CONFIG_RCU_KTHREAD_PRIO=0
> > > CONFIG_RCU_NOCB_CPU=y
> > > CONFIG_RCU_NOCB_CPU_NONE=y
> > > # CONFIG_RCU_NOCB_CPU_ZERO is not set
> > > # CONFIG_RCU_NOCB_CPU_ALL is not set
> > > # RCU Debugging
> > > # CONFIG_PROVE_RCU is not set
> > > # CONFIG_SPARSE_RCU_POINTER is not set
> > > # CONFIG_RCU_PERF_TEST is not set
> > > # CONFIG_RCU_TORTURE_TEST is not set
> > > CONFIG_RCU_CPU_STALL_TIMEOUT=60
> > > # CONFIG_RCU_TRACE is not set
> > > # CONFIG_RCU_EQS_DEBUG is not set
> > 
> > Thank you for the data, extremely helpful!  I now know that I must
> > immediately fix this the hard way rather than trying several simpler
> > possibilities that, according to your trace data, would be complete
> > wastes of time.
> > 
> > My initial calculation was actually optimistic.  There was one
> > exit-to-enter interval at about 80 milliseconds, but the second
> > longest is 342 microseconds, and the vast majority are under 200
> > microseconds.
> > 
> > Back to the drawing board!
> > 
> > This will take some time, but I should have a patch for you in a few days,
> > hopefully sooner.
> 
> Making progress, might have something late tomorrow (Wednesday) Pacific
> Time, will definitely have something Thursday Pacific Time.

And a sneak preview, semi-tested.  If you get a chance to run this, please
let me know how it goes.

This should apply on -tip or on -rcu.  Probably on recent -next as well.

							Thanx, Paul

------------------------------------------------------------------------

diff --git a/Documentation/admin-guide/kernel-parameters.txt b/Documentation/admin-guide/kernel-parameters.txt
index facc20a3f962..4a4b9266c4de 100644
--- a/Documentation/admin-guide/kernel-parameters.txt
+++ b/Documentation/admin-guide/kernel-parameters.txt
@@ -3779,6 +3779,14 @@
 	spia_pedr=
 	spia_peddr=
 
+	srcutree.exp_holdoff [KNL]
+			Specifies how many nanoseconds must elapse
+			since the end of the last SRCU grace period for
+			a given srcu_struct until the next normal SRCU
+			grace period will be considered for automatic
+			expediting.  Set to zero to disable automatic
+			expediting.
+
 	stacktrace	[FTRACE]
 			Enabled the stack tracer on boot up.
 
diff --git a/include/linux/srcuclassic.h b/include/linux/srcuclassic.h
index 41cf99930f34..5753f7322262 100644
--- a/include/linux/srcuclassic.h
+++ b/include/linux/srcuclassic.h
@@ -98,4 +98,18 @@ void synchronize_srcu_expedited(struct srcu_struct *sp);
 void srcu_barrier(struct srcu_struct *sp);
 unsigned long srcu_batches_completed(struct srcu_struct *sp);
 
+static inline void srcutorture_get_gp_data(enum rcutorture_type test_type,
+					   struct srcu_struct *sp, int *flags,
+					   unsigned long *gpnum,
+					   unsigned long *completed)
+{
+	if (test_type != SRCU_FLAVOR)
+		return; /* Not SRCU: leave the caller's values untouched. */
+	*flags = 0;
+	*completed = sp->completed;
+	*gpnum = *completed; /* Bumped below if callbacks are still queued. */
+	if (sp->batch_queue.head || sp->batch_check0.head || sp->batch_check1.head)
+		(*gpnum)++;
+}
+
 #endif
diff --git a/include/linux/srcutiny.h b/include/linux/srcutiny.h
index 4f284e4f4d8c..42311ee0334f 100644
--- a/include/linux/srcutiny.h
+++ b/include/linux/srcutiny.h
@@ -78,4 +78,16 @@ static inline unsigned long srcu_batches_completed(struct srcu_struct *sp)
 	return 0;
 }
 
+static inline void srcutorture_get_gp_data(enum rcutorture_type test_type,
+					   struct srcu_struct *sp, int *flags,
+					   unsigned long *gpnum,
+					   unsigned long *completed)
+{
+	if (test_type != SRCU_FLAVOR)
+		return; /* Not SRCU: leave the caller's values untouched. */
+	*flags = 0;
+	*completed = sp->srcu_gp_seq;
+	*gpnum = *completed; /* Reported GP number mirrors ->srcu_gp_seq. */
+}
+
 #endif
diff --git a/include/linux/srcutree.h b/include/linux/srcutree.h
index 0400e211aa44..32e86d85fd11 100644
--- a/include/linux/srcutree.h
+++ b/include/linux/srcutree.h
@@ -43,10 +43,13 @@ struct srcu_data {
 	spinlock_t lock ____cacheline_internodealigned_in_smp;
 	struct rcu_segcblist srcu_cblist;	/* List of callbacks.*/
 	unsigned long srcu_gp_seq_needed;	/* Furthest future GP needed. */
+	unsigned long srcu_gp_seq_needed_exp;	/* Furthest future exp GP. */
 	bool srcu_cblist_invoking;		/* Invoking these CBs? */
 	struct delayed_work work;		/* Context for CB invoking. */
 	struct rcu_head srcu_barrier_head;	/* For srcu_barrier() use. */
 	struct srcu_node *mynode;		/* Leaf srcu_node. */
+	unsigned long grpmask;			/* Mask for leaf srcu_node */
+						/*  ->srcu_data_have_cbs[]. */
 	int cpu;
 	struct srcu_struct *sp;
 };
@@ -59,6 +62,9 @@ struct srcu_node {
 	unsigned long srcu_have_cbs[4];		/* GP seq for children */
 						/*  having CBs, but only */
 						/*  is > ->srcu_gq_seq. */
+	unsigned long srcu_data_have_cbs[4];	/* Which srcu_data structs */
+						/*  have CBs for given GP? */
+	unsigned long srcu_gp_seq_needed_exp;	/* Furthest future exp GP. */
 	struct srcu_node *srcu_parent;		/* Next up in tree. */
 	int grplo;				/* Least CPU for node. */
 	int grphi;				/* Biggest CPU for node. */
@@ -77,7 +83,8 @@ struct srcu_struct {
 	unsigned int srcu_idx;			/* Current rdr array element. */
 	unsigned long srcu_gp_seq;		/* Grace-period seq #. */
 	unsigned long srcu_gp_seq_needed;	/* Latest gp_seq needed. */
-	atomic_t srcu_exp_cnt;			/* # ongoing expedited GPs. */
+	unsigned long srcu_gp_seq_needed_exp;	/* Furthest future exp GP. */
+	unsigned long srcu_last_gp_end;		/* Last GP end timestamp (ns) */
 	struct srcu_data __percpu *sda;		/* Per-CPU srcu_data array. */
 	unsigned long srcu_barrier_seq;		/* srcu_barrier seq #. */
 	struct mutex srcu_barrier_mutex;	/* Serialize barrier ops. */
@@ -136,4 +143,8 @@ void synchronize_srcu_expedited(struct srcu_struct *sp);
 void srcu_barrier(struct srcu_struct *sp);
 unsigned long srcu_batches_completed(struct srcu_struct *sp);
 
+void srcutorture_get_gp_data(enum rcutorture_type test_type,
+			     struct srcu_struct *sp, int *flags,
+			     unsigned long *gpnum, unsigned long *completed);
+
 #endif
diff --git a/kernel/rcu/rcutorture.c b/kernel/rcu/rcutorture.c
index e9d4527cdd43..ae6e574d4cf5 100644
--- a/kernel/rcu/rcutorture.c
+++ b/kernel/rcu/rcutorture.c
@@ -1360,12 +1360,14 @@ rcu_torture_stats_print(void)
 		cur_ops->stats();
 	if (rtcv_snap == rcu_torture_current_version &&
 	    rcu_torture_current != NULL) {
-		int __maybe_unused flags;
-		unsigned long __maybe_unused gpnum;
-		unsigned long __maybe_unused completed;
+		int __maybe_unused flags = 0;
+		unsigned long __maybe_unused gpnum = 0;
+		unsigned long __maybe_unused completed = 0;
 
 		rcutorture_get_gp_data(cur_ops->ttype,
 				       &flags, &gpnum, &completed);
+		srcutorture_get_gp_data(cur_ops->ttype, srcu_ctlp,
+					&flags, &gpnum, &completed);
 		wtp = READ_ONCE(writer_task);
 		pr_alert("??? Writer stall state %s(%d) g%lu c%lu f%#x ->state %#lx\n",
 			 rcu_torture_writer_state_getname(),
diff --git a/kernel/rcu/srcutree.c b/kernel/rcu/srcutree.c
index 9ecf0acc18eb..b09221912f48 100644
--- a/kernel/rcu/srcutree.c
+++ b/kernel/rcu/srcutree.c
@@ -34,10 +34,14 @@
 #include <linux/sched.h>
 #include <linux/smp.h>
 #include <linux/delay.h>
+#include <linux/module.h>
 #include <linux/srcu.h>
 
 #include "rcu.h"
 
+ulong exp_holdoff = 50 * 1000; /* Holdoff (ns) for auto-expediting. */
+module_param(exp_holdoff, ulong, 0444);
+
 static void srcu_invoke_callbacks(struct work_struct *work);
 static void srcu_reschedule(struct srcu_struct *sp, unsigned long delay);
 
@@ -66,8 +70,13 @@ static void init_srcu_struct_nodes(struct srcu_struct *sp, bool is_static)
 	/* Each pass through this loop initializes one srcu_node structure. */
 	rcu_for_each_node_breadth_first(sp, snp) {
 		spin_lock_init(&snp->lock);
-		for (i = 0; i < ARRAY_SIZE(snp->srcu_have_cbs); i++)
+		WARN_ON_ONCE(ARRAY_SIZE(snp->srcu_have_cbs) !=
+			     ARRAY_SIZE(snp->srcu_data_have_cbs));
+		for (i = 0; i < ARRAY_SIZE(snp->srcu_have_cbs); i++) {
 			snp->srcu_have_cbs[i] = 0;
+			snp->srcu_data_have_cbs[i] = 0;
+		}
+		snp->srcu_gp_seq_needed_exp = 0;
 		snp->grplo = -1;
 		snp->grphi = -1;
 		if (snp == &sp->node[0]) {
@@ -98,6 +107,7 @@ static void init_srcu_struct_nodes(struct srcu_struct *sp, bool is_static)
 		rcu_segcblist_init(&sdp->srcu_cblist);
 		sdp->srcu_cblist_invoking = false;
 		sdp->srcu_gp_seq_needed = sp->srcu_gp_seq;
+		sdp->srcu_gp_seq_needed_exp = sp->srcu_gp_seq;
 		sdp->mynode = &snp_first[cpu / levelspread[level]];
 		for (snp = sdp->mynode; snp != NULL; snp = snp->srcu_parent) {
 			if (snp->grplo < 0)
@@ -107,6 +117,7 @@ static void init_srcu_struct_nodes(struct srcu_struct *sp, bool is_static)
 		sdp->cpu = cpu;
 		INIT_DELAYED_WORK(&sdp->work, srcu_invoke_callbacks);
 		sdp->sp = sp;
+		sdp->grpmask = 1 << (cpu - sdp->mynode->grplo);
 		if (is_static)
 			continue;
 
@@ -130,7 +141,6 @@ static int init_srcu_struct_fields(struct srcu_struct *sp, bool is_static)
 	mutex_init(&sp->srcu_gp_mutex);
 	sp->srcu_idx = 0;
 	sp->srcu_gp_seq = 0;
-	atomic_set(&sp->srcu_exp_cnt, 0);
 	sp->srcu_barrier_seq = 0;
 	mutex_init(&sp->srcu_barrier_mutex);
 	atomic_set(&sp->srcu_barrier_cpu_cnt, 0);
@@ -138,6 +148,8 @@ static int init_srcu_struct_fields(struct srcu_struct *sp, bool is_static)
 	if (!is_static)
 		sp->sda = alloc_percpu(struct srcu_data);
 	init_srcu_struct_nodes(sp, is_static);
+	sp->srcu_gp_seq_needed_exp = 0;
+	sp->srcu_last_gp_end = ktime_get_mono_fast_ns();
 	smp_store_release(&sp->srcu_gp_seq_needed, 0); /* Init done. */
 	return sp->sda ? 0 : -ENOMEM;
 }
@@ -302,6 +314,18 @@ static bool srcu_readers_active(struct srcu_struct *sp)
 
 #define SRCU_INTERVAL		1
 
+/*
+ * Return grace-period delay, zero if there are expedited grace
+ * periods pending, SRCU_INTERVAL otherwise.
+ */
+static unsigned long srcu_get_delay(struct srcu_struct *sp)
+{
+	if (ULONG_CMP_LT(READ_ONCE(sp->srcu_gp_seq),
+			 READ_ONCE(sp->srcu_gp_seq_needed_exp)))
+		return 0; /* ->srcu_gp_seq still trails the expedited target. */
+	return SRCU_INTERVAL;
+}
+
 /**
  * cleanup_srcu_struct - deconstruct a sleep-RCU structure
  * @sp: structure to clean up.
@@ -313,7 +337,8 @@ void cleanup_srcu_struct(struct srcu_struct *sp)
 {
 	int cpu;
 
-	WARN_ON_ONCE(atomic_read(&sp->srcu_exp_cnt));
+	if (WARN_ON(!srcu_get_delay(sp)))
+		return; /* Leakage unless caller handles error. */
 	if (WARN_ON(srcu_readers_active(sp)))
 		return; /* Leakage unless caller handles error. */
 	flush_delayed_work(&sp->work);
@@ -382,6 +407,7 @@ static void srcu_gp_start(struct srcu_struct *sp)
 			      rcu_seq_current(&sp->srcu_gp_seq));
 	(void)rcu_segcblist_accelerate(&sdp->srcu_cblist,
 				       rcu_seq_snap(&sp->srcu_gp_seq));
+	smp_mb(); /* Order prior store to ->srcu_gp_seq_needed vs. GP start. */
 	rcu_seq_start(&sp->srcu_gp_seq);
 	state = rcu_seq_state(READ_ONCE(sp->srcu_gp_seq));
 	WARN_ON_ONCE(state != SRCU_STATE_SCAN1);
@@ -434,16 +460,20 @@ static void srcu_schedule_cbs_sdp(struct srcu_data *sdp, unsigned long delay)
 
 /*
  * Schedule callback invocation for all srcu_data structures associated
- * with the specified srcu_node structure, if possible, on the corresponding
- * CPUs.
+ * with the specified srcu_node structure that have callbacks for the
+ * just-completed grace period, the one corresponding to idx.  If possible,
+ * schedule this invocation on the corresponding CPUs.
  */
-static void srcu_schedule_cbs_snp(struct srcu_struct *sp, struct srcu_node *snp)
+static void srcu_schedule_cbs_snp(struct srcu_struct *sp, struct srcu_node *snp,
+				  unsigned long mask, unsigned long delay)
 {
 	int cpu;
 
-	for (cpu = snp->grplo; cpu <= snp->grphi; cpu++)
-		srcu_schedule_cbs_sdp(per_cpu_ptr(sp->sda, cpu),
-				      atomic_read(&sp->srcu_exp_cnt) ? 0 : SRCU_INTERVAL);
+	for (cpu = snp->grplo; cpu <= snp->grphi; cpu++) {
+		if (!(mask & (1 << (cpu - snp->grplo))))
+			continue;
+		srcu_schedule_cbs_sdp(per_cpu_ptr(sp->sda, cpu), delay);
+	}
 }
 
 /*
@@ -457,10 +487,12 @@ static void srcu_schedule_cbs_snp(struct srcu_struct *sp, struct srcu_node *snp)
  */
 static void srcu_gp_end(struct srcu_struct *sp)
 {
+	unsigned long cbdelay;
 	bool cbs;
 	unsigned long gpseq;
 	int idx;
 	int idxnext;
+	unsigned long mask;
 	struct srcu_node *snp;
 
 	/* Prevent more than one additional grace period. */
@@ -470,8 +502,12 @@ static void srcu_gp_end(struct srcu_struct *sp)
 	spin_lock_irq(&sp->gp_lock);
 	idx = rcu_seq_state(sp->srcu_gp_seq);
 	WARN_ON_ONCE(idx != SRCU_STATE_SCAN2);
+	cbdelay = srcu_get_delay(sp);
+	sp->srcu_last_gp_end = ktime_get_mono_fast_ns();
 	rcu_seq_end(&sp->srcu_gp_seq);
 	gpseq = rcu_seq_current(&sp->srcu_gp_seq);
+	if (ULONG_CMP_LT(sp->srcu_gp_seq_needed_exp, gpseq))
+		sp->srcu_gp_seq_needed_exp = gpseq;
 	spin_unlock_irq(&sp->gp_lock);
 	mutex_unlock(&sp->srcu_gp_mutex);
 	/* A new grace period can start at this point.  But only one. */
@@ -486,10 +522,14 @@ static void srcu_gp_end(struct srcu_struct *sp)
 			cbs = snp->srcu_have_cbs[idx] == gpseq;
 		snp->srcu_have_cbs[idx] = gpseq;
 		rcu_seq_set_state(&snp->srcu_have_cbs[idx], 1);
+		if (ULONG_CMP_LT(snp->srcu_gp_seq_needed_exp, gpseq))
+			snp->srcu_gp_seq_needed_exp = gpseq;
+		mask = snp->srcu_data_have_cbs[idx];
+		snp->srcu_data_have_cbs[idx] = 0;
 		spin_unlock_irq(&snp->lock);
 		if (cbs) {
 			smp_mb(); /* GP end before CB invocation. */
-			srcu_schedule_cbs_snp(sp, snp);
+			srcu_schedule_cbs_snp(sp, snp, mask, cbdelay);
 		}
 	}
 
@@ -504,25 +544,52 @@ static void srcu_gp_end(struct srcu_struct *sp)
 		srcu_gp_start(sp);
 		spin_unlock_irq(&sp->gp_lock);
 		/* Throttle expedited grace periods: Should be rare! */
-		srcu_reschedule(sp, atomic_read(&sp->srcu_exp_cnt) &&
-				    rcu_seq_ctr(gpseq) & 0xf
-				    ? 0
-				    : SRCU_INTERVAL);
+		srcu_reschedule(sp, rcu_seq_ctr(gpseq) & 0x3ff
+				    ? 0 : SRCU_INTERVAL);
 	} else {
 		spin_unlock_irq(&sp->gp_lock);
 	}
 }
 
 /*
+ * Funnel-locking scheme to scalably mediate many concurrent expedited
+ * grace-period requests.  This function is invoked for the first known
+ * expedited request for a grace period that has already been requested,
+ * but without expediting.  To start a completely new grace period,
+ * whether expedited or not, use srcu_funnel_gp_start() instead.
+ */
+static void srcu_funnel_exp_start(struct srcu_struct *sp, struct srcu_node *snp,
+				  unsigned long s)
+{
+	unsigned long flags;
+
+	for (; snp != NULL; snp = snp->srcu_parent) {
+		if (rcu_seq_done(&sp->srcu_gp_seq, s) ||
+		    ULONG_CMP_GE(READ_ONCE(snp->srcu_gp_seq_needed_exp), s))
+			return; /* GP s done, or s already recorded at this node. */
+		spin_lock_irqsave(&snp->lock, flags);
+		if (ULONG_CMP_GE(snp->srcu_gp_seq_needed_exp, s)) {
+			spin_unlock_irqrestore(&snp->lock, flags);
+			return; /* Recheck under lock: s (or later) already set. */
+		}
+		WRITE_ONCE(snp->srcu_gp_seq_needed_exp, s);
+		spin_unlock_irqrestore(&snp->lock, flags);
+	}
+	spin_lock_irqsave(&sp->gp_lock, flags);
+	if (ULONG_CMP_LT(sp->srcu_gp_seq_needed_exp, s)) /* Only ever advance. */
+		sp->srcu_gp_seq_needed_exp = s;
+	spin_unlock_irqrestore(&sp->gp_lock, flags);
+}
+
+/*
  * Funnel-locking scheme to scalably mediate many concurrent grace-period
  * requests.  The winner has to do the work of actually starting grace
  * period s.  Losers must either ensure that their desired grace-period
  * number is recorded on at least their leaf srcu_node structure, or they
  * must take steps to invoke their own callbacks.
  */
-static void srcu_funnel_gp_start(struct srcu_struct *sp,
-				 struct srcu_data *sdp,
-				 unsigned long s)
+static void srcu_funnel_gp_start(struct srcu_struct *sp, struct srcu_data *sdp,
+				 unsigned long s, bool do_norm)
 {
 	unsigned long flags;
 	int idx = rcu_seq_ctr(s) % ARRAY_SIZE(sdp->mynode->srcu_have_cbs);
@@ -536,14 +603,25 @@ static void srcu_funnel_gp_start(struct srcu_struct *sp,
 		spin_lock_irqsave(&snp->lock, flags);
 		if (ULONG_CMP_GE(snp->srcu_have_cbs[idx], s)) {
 			snp_seq = snp->srcu_have_cbs[idx];
+			if (snp == sdp->mynode && snp_seq == s)
+				snp->srcu_data_have_cbs[idx] |= sdp->grpmask;
 			spin_unlock_irqrestore(&snp->lock, flags);
 			if (snp == sdp->mynode && snp_seq != s) {
 				smp_mb(); /* CBs after GP! */
-				srcu_schedule_cbs_sdp(sdp, 0);
+				srcu_schedule_cbs_sdp(sdp, do_norm
+							   ? SRCU_INTERVAL
+							   : 0);
+				return;
 			}
+			if (!do_norm)
+				srcu_funnel_exp_start(sp, snp, s);
 			return;
 		}
 		snp->srcu_have_cbs[idx] = s;
+		if (snp == sdp->mynode)
+			snp->srcu_data_have_cbs[idx] |= sdp->grpmask;
+		if (!do_norm && ULONG_CMP_LT(snp->srcu_gp_seq_needed_exp, s))
+			snp->srcu_gp_seq_needed_exp = s;
 		spin_unlock_irqrestore(&snp->lock, flags);
 	}
 
@@ -556,6 +634,8 @@ static void srcu_funnel_gp_start(struct srcu_struct *sp,
 		 */
 		smp_store_release(&sp->srcu_gp_seq_needed, s); /*^^^*/
 	}
+	if (!do_norm && ULONG_CMP_LT(sp->srcu_gp_seq_needed_exp, s))
+		sp->srcu_gp_seq_needed_exp = s;
 
 	/* If grace period not already done and none in progress, start it. */
 	if (!rcu_seq_done(&sp->srcu_gp_seq, s) &&
@@ -563,9 +643,7 @@ static void srcu_funnel_gp_start(struct srcu_struct *sp,
 		WARN_ON_ONCE(ULONG_CMP_GE(sp->srcu_gp_seq, sp->srcu_gp_seq_needed));
 		srcu_gp_start(sp);
 		queue_delayed_work(system_power_efficient_wq, &sp->work,
-				   atomic_read(&sp->srcu_exp_cnt)
-				   ? 0
-				   : SRCU_INTERVAL);
+				   srcu_get_delay(sp));
 	}
 	spin_unlock_irqrestore(&sp->gp_lock, flags);
 }
@@ -580,7 +658,7 @@ static bool try_check_zero(struct srcu_struct *sp, int idx, int trycount)
 	for (;;) {
 		if (srcu_readers_active_idx_check(sp, idx))
 			return true;
-		if (--trycount + !!atomic_read(&sp->srcu_exp_cnt) <= 0)
+		if (--trycount + !srcu_get_delay(sp) <= 0)
 			return false;
 		udelay(SRCU_RETRY_CHECK_DELAY);
 	}
@@ -606,6 +684,67 @@ static void srcu_flip(struct srcu_struct *sp)
 }
 
 /*
+ * If SRCU is likely idle, return true, otherwise return false.
+ *
+ * Note that it is OK for several current from-idle requests for a new
+ * grace period from idle to specify expediting because they will all end
+ * up requesting the same grace period anyhow.  So no loss.
+ *
+ * Note also that if any CPU (including the current one) is still invoking
+ * callbacks, this function will nevertheless say "idle".  This is not
+ * ideal, but the overhead of checking all CPUs' callback lists is even
+ * less ideal, especially on large systems.  Furthermore, the wakeup
+ * can happen before the callback is fully removed, so we have no choice
+ * but to accept this type of error.
+ *
+ * This function is also subject to counter-wrap errors, but let's face
+ * it, if this function was preempted for enough time for the counters
+ * to wrap, it really doesn't matter whether or not we expedite the grace
+ * period.  The extra overhead of a needlessly expedited grace period is
+ * negligible when amortized over that time period, and the extra latency
+ * of a needlessly non-expedited grace period is similarly negligible.
+ */
+static bool srcu_might_be_idle(struct srcu_struct *sp)
+{
+	unsigned long curseq;
+	unsigned long flags;
+	struct srcu_data *sdp;
+	unsigned long t;
+
+	/* If the local srcu_data structure has callbacks, not idle.  */
+	local_irq_save(flags);
+	sdp = this_cpu_ptr(sp->sda);
+	if (rcu_segcblist_pend_cbs(&sdp->srcu_cblist)) {
+		local_irq_restore(flags);
+		return false; /* Callbacks already present, so not idle. */
+	}
+	local_irq_restore(flags);
+
+	/*
+	 * No local callbacks, so probabilistically probe global state.
+	 * Exact information would require acquiring locks, which would
+	 * kill scalability, hence the probabilistic nature of the probe.
+	 */
+
+	/* First, see if enough time has passed since the last GP. */
+	t = ktime_get_mono_fast_ns();
+	if (exp_holdoff == 0 ||
+	    time_in_range_open(t, sp->srcu_last_gp_end,
+			       sp->srcu_last_gp_end + exp_holdoff))
+		return false; /* Too soon after last GP. */
+
+	/* Next, check for probable idleness. */
+	curseq = rcu_seq_current(&sp->srcu_gp_seq);
+	smp_mb(); /* Order ->srcu_gp_seq with ->srcu_gp_seq_needed. */
+	if (ULONG_CMP_LT(curseq, READ_ONCE(sp->srcu_gp_seq_needed)))
+		return false; /* Grace period in progress, so not idle. */
+	smp_mb(); /* Order ->srcu_gp_seq with prior access. */
+	if (curseq != rcu_seq_current(&sp->srcu_gp_seq))
+		return false; /* GP # changed, so not idle. */
+	return true; /* With reasonable probability, idle! */
+}
+
+/*
  * Enqueue an SRCU callback on the srcu_data structure associated with
  * the current CPU and the specified srcu_struct structure, initiating
  * grace-period processing if it is not already running.
@@ -633,10 +772,11 @@ static void srcu_flip(struct srcu_struct *sp)
  * srcu_read_lock(), and srcu_read_unlock() that are all passed the same
  * srcu_struct structure.
  */
-void call_srcu(struct srcu_struct *sp, struct rcu_head *rhp,
-	       rcu_callback_t func)
+void __call_srcu(struct srcu_struct *sp, struct rcu_head *rhp,
+		 rcu_callback_t func, bool do_norm)
 {
 	unsigned long flags;
+	bool needexp = false;
 	bool needgp = false;
 	unsigned long s;
 	struct srcu_data *sdp;
@@ -655,16 +795,28 @@ void call_srcu(struct srcu_struct *sp, struct rcu_head *rhp,
 		sdp->srcu_gp_seq_needed = s;
 		needgp = true;
 	}
+	if (ULONG_CMP_LT(sdp->srcu_gp_seq_needed_exp, s)) {
+		sdp->srcu_gp_seq_needed_exp = s;
+		needexp = true;
+	}
 	spin_unlock_irqrestore(&sdp->lock, flags);
 	if (needgp)
-		srcu_funnel_gp_start(sp, sdp, s);
+		srcu_funnel_gp_start(sp, sdp, s, do_norm);
+	else
+		srcu_funnel_exp_start(sp, sdp->mynode, s);
+}
+
+void call_srcu(struct srcu_struct *sp, struct rcu_head *rhp,
+	       rcu_callback_t func)
+{
+	__call_srcu(sp, rhp, func, true);
 }
 EXPORT_SYMBOL_GPL(call_srcu);
 
 /*
  * Helper function for synchronize_srcu() and synchronize_srcu_expedited().
  */
-static void __synchronize_srcu(struct srcu_struct *sp)
+static void __synchronize_srcu(struct srcu_struct *sp, bool do_norm)
 {
 	struct rcu_synchronize rcu;
 
@@ -680,7 +832,7 @@ static void __synchronize_srcu(struct srcu_struct *sp)
 	check_init_srcu_struct(sp);
 	init_completion(&rcu.completion);
 	init_rcu_head_on_stack(&rcu.head);
-	call_srcu(sp, &rcu.head, wakeme_after_rcu);
+	__call_srcu(sp, &rcu.head, wakeme_after_rcu, do_norm);
 	wait_for_completion(&rcu.completion);
 	destroy_rcu_head_on_stack(&rcu.head);
 }
@@ -697,18 +849,7 @@ static void __synchronize_srcu(struct srcu_struct *sp)
  */
 void synchronize_srcu_expedited(struct srcu_struct *sp)
 {
-	bool do_norm = rcu_gp_is_normal();
-
-	check_init_srcu_struct(sp);
-	if (!do_norm) {
-		atomic_inc(&sp->srcu_exp_cnt);
-		smp_mb__after_atomic(); /* increment before GP. */
-	}
-	__synchronize_srcu(sp);
-	if (!do_norm) {
-		smp_mb__before_atomic(); /* GP before decrement. */
-		WARN_ON_ONCE(atomic_dec_return(&sp->srcu_exp_cnt) < 0);
-	}
+	__synchronize_srcu(sp, rcu_gp_is_normal());
 }
 EXPORT_SYMBOL_GPL(synchronize_srcu_expedited);
 
@@ -750,13 +891,18 @@ EXPORT_SYMBOL_GPL(synchronize_srcu_expedited);
  * Of course, these memory-ordering guarantees apply only when
  * synchronize_srcu(), srcu_read_lock(), and srcu_read_unlock() are
  * passed the same srcu_struct structure.
+ *
+ * If SRCU is likely idle, expedite the first request.  This semantic
+ * was provided by Classic SRCU, and is relied upon by its users, so TREE
+ * SRCU must also provide it.  Note that detecting idleness is heuristic
+ * and subject to both false positives and negatives.
  */
 void synchronize_srcu(struct srcu_struct *sp)
 {
-	if (rcu_gp_is_expedited())
+	if (srcu_might_be_idle(sp) || rcu_gp_is_expedited())
 		synchronize_srcu_expedited(sp);
 	else
-		__synchronize_srcu(sp);
+		__synchronize_srcu(sp, true);
 }
 EXPORT_SYMBOL_GPL(synchronize_srcu);
 
@@ -991,6 +1137,18 @@ void process_srcu(struct work_struct *work)
 	sp = container_of(work, struct srcu_struct, work.work);
 
 	srcu_advance_state(sp);
-	srcu_reschedule(sp, atomic_read(&sp->srcu_exp_cnt) ? 0 : SRCU_INTERVAL);
+	srcu_reschedule(sp, srcu_get_delay(sp));
 }
 EXPORT_SYMBOL_GPL(process_srcu);
+
+void srcutorture_get_gp_data(enum rcutorture_type test_type,
+			     struct srcu_struct *sp, int *flags,
+			     unsigned long *gpnum, unsigned long *completed)
+{
+	if (test_type != SRCU_FLAVOR)
+		return; /* Not SRCU: leave the caller's values untouched. */
+	*flags = 0;
+	*completed = rcu_seq_ctr(sp->srcu_gp_seq);
+	*gpnum = rcu_seq_ctr(sp->srcu_gp_seq_needed);
+}
+EXPORT_SYMBOL_GPL(srcutorture_get_gp_data);
diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
index 23aa02587d0f..91fff49d5869 100644
--- a/kernel/rcu/tree.c
+++ b/kernel/rcu/tree.c
@@ -704,15 +704,11 @@ void rcutorture_get_gp_data(enum rcutorture_type test_type, int *flags,
 	default:
 		break;
 	}
-	if (rsp != NULL) {
-		*flags = READ_ONCE(rsp->gp_flags);
-		*gpnum = READ_ONCE(rsp->gpnum);
-		*completed = READ_ONCE(rsp->completed);
+	if (rsp == NULL)
 		return;
-	}
-	*flags = 0;
-	*gpnum = 0;
-	*completed = 0;
+	*flags = READ_ONCE(rsp->gp_flags);
+	*gpnum = READ_ONCE(rsp->gpnum);
+	*completed = READ_ONCE(rsp->completed);
 }
 EXPORT_SYMBOL_GPL(rcutorture_get_gp_data);
 

Powered by blists - more mailing lists

Powered by Openwall GNU/*/Linux Powered by OpenVZ