lists.openwall.net   lists  /  announce  owl-users  owl-dev  john-users  john-dev  passwdqc-users  yescrypt  popa3d-users  /  oss-security  kernel-hardening  musl  sabotage  tlsify  passwords  /  crypt-dev  xvendor  /  Bugtraq  Full-Disclosure  linux-kernel  linux-netdev  linux-ext4  linux-hardening  linux-cve-announce  PHC 
Open Source and information security mailing list archives
 
Hash Suite: Windows password security audit tool. GUI, reports in PDF.
[<prev] [next>] [<thread-prev] [thread-next>] [day] [month] [year] [list]
Message-Id: <1492621117-13939-13-git-send-email-paulmck@linux.vnet.ibm.com>
Date:   Wed, 19 Apr 2017 09:58:10 -0700
From:   "Paul E. McKenney" <paulmck@...ux.vnet.ibm.com>
To:     linux-kernel@...r.kernel.org
Cc:     mingo@...nel.org, jiangshanlai@...il.com, dipankar@...ibm.com,
        akpm@...ux-foundation.org, mathieu.desnoyers@...icios.com,
        josh@...htriplett.org, tglx@...utronix.de, peterz@...radead.org,
        rostedt@...dmis.org, dhowells@...hat.com, edumazet@...gle.com,
        fweisbec@...il.com, oleg@...hat.com, bobby.prani@...il.com,
        "Paul E. McKenney" <paulmck@...ux.vnet.ibm.com>
Subject: [PATCH v3 tip/core/rcu 13/40] srcu: Abstract multi-tail callback list handling

RCU has only one multi-tail callback list, which is implemented via
the nxtlist, nxttail, nxtcompleted, qlen_lazy, and qlen fields in the
rcu_data structure, and whose operations are open-coded throughout the
Tree RCU implementation.  This has been more or less OK in the past,
but upcoming callback-list optimizations in SRCU could really use
a multi-tail callback list there as well.

This commit therefore abstracts the multi-tail callback list handling
into a new kernel/rcu/rcu_segcblist.h file, and uses this new API.
The simple head-and-tail pointer callback list is also abstracted and
applied everywhere except for the NOCB callback-offload lists.  (Yes,
the plan is to apply them there as well, but this commit is already
bigger than would be good.)

Signed-off-by: Paul E. McKenney <paulmck@...ux.vnet.ibm.com>
---
 kernel/rcu/rcu_segcblist.h | 625 +++++++++++++++++++++++++++++++++++++++++++++
 kernel/rcu/tree.c          | 348 ++++++++-----------------
 kernel/rcu/tree.h          |  41 +--
 kernel/rcu/tree_plugin.h   |  54 ++--
 kernel/rcu/tree_trace.c    |  21 +-
 5 files changed, 780 insertions(+), 309 deletions(-)
 create mode 100644 kernel/rcu/rcu_segcblist.h

diff --git a/kernel/rcu/rcu_segcblist.h b/kernel/rcu/rcu_segcblist.h
new file mode 100644
index 000000000000..24078f3c0218
--- /dev/null
+++ b/kernel/rcu/rcu_segcblist.h
@@ -0,0 +1,625 @@
+/*
+ * RCU segmented callback lists
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, you can access it online at
+ * http://www.gnu.org/licenses/gpl-2.0.html.
+ *
+ * Copyright IBM Corporation, 2017
+ *
+ * Authors: Paul E. McKenney <paulmck@...ux.vnet.ibm.com>
+ */
+
+#ifndef __KERNEL_RCU_SEGCBLIST_H
+#define __KERNEL_RCU_SEGCBLIST_H
+
+/* Simple unsegmented callback lists. */
+struct rcu_cblist {
+	struct rcu_head *head;
+	struct rcu_head **tail;
+	long len;
+	long len_lazy;
+};
+
+#define RCU_CBLIST_INITIALIZER(n) { .head = NULL, .tail = &n.head }
+
+/* Initialize simple callback list. */
+static inline void rcu_cblist_init(struct rcu_cblist *rclp)
+{
+	rclp->head = NULL;
+	rclp->tail = &rclp->head;
+	rclp->len = 0;
+	rclp->len_lazy = 0;
+}
+
+/* Is simple callback list empty? */
+static inline bool rcu_cblist_empty(struct rcu_cblist *rclp)
+{
+	return !rclp->head;
+}
+
+/* Return number of callbacks in simple callback list. */
+static inline long rcu_cblist_n_cbs(struct rcu_cblist *rclp)
+{
+	return rclp->len;
+}
+
+/* Return number of lazy callbacks in simple callback list. */
+static inline long rcu_cblist_n_lazy_cbs(struct rcu_cblist *rclp)
+{
+	return rclp->len_lazy;
+}
+
+/*
+ * Debug function to actually count the number of callbacks.
+ * If the number exceeds the limit specified, return -1.
+ */
+static inline long rcu_cblist_count_cbs(struct rcu_cblist *rclp, long lim)
+{
+	int cnt = 0;
+	struct rcu_head **rhpp = &rclp->head;
+
+	for (;;) {
+		if (!*rhpp)
+			return cnt;
+		if (++cnt > lim)
+			return -1;
+		rhpp = &(*rhpp)->next;
+	}
+}
+
+/*
+ * Dequeue the oldest rcu_head structure from the specified callback
+ * list.  This function assumes that the callback is non-lazy, but
+ * the caller can later invoke rcu_cblist_dequeued_lazy() if it
+ * finds otherwise (and if it cares about laziness).  This allows
+ * different users to have different ways of determining laziness.
+ */
+static inline struct rcu_head *rcu_cblist_dequeue(struct rcu_cblist *rclp)
+{
+	struct rcu_head *rhp;
+
+	rhp = rclp->head;
+	if (!rhp)
+		return NULL;
+	prefetch(rhp);
+	rclp->len--;
+	rclp->head = rhp->next;
+	if (!rclp->head)
+		rclp->tail = &rclp->head;
+	return rhp;
+}
+
+/*
+ * Account for the fact that a previously dequeued callback turned out
+ * to be marked as lazy.
+ */
+static inline void rcu_cblist_dequeued_lazy(struct rcu_cblist *rclp)
+{
+	rclp->len_lazy--;
+}
+
+/*
+ * Interim function to return rcu_cblist head pointer.  Longer term, the
+ * rcu_cblist will be used more pervasively, removing the need for this
+ * function.
+ */
+static inline struct rcu_head *rcu_cblist_head(struct rcu_cblist *rclp)
+{
+	return rclp->head;
+}
+
+/*
+ * Interim function to return rcu_cblist tail pointer.  Longer term, the
+ * rcu_cblist will be used more pervasively, removing the need for this
+ * function.
+ */
+static inline struct rcu_head **rcu_cblist_tail(struct rcu_cblist *rclp)
+{
+	WARN_ON_ONCE(rcu_cblist_empty(rclp));
+	return rclp->tail;
+}
+
+/* Complicated segmented callback lists.  ;-) */
+
+/*
+ * Index values for segments in rcu_segcblist structure.
+ *
+ * The segments are as follows:
+ *
+ * [head, *tails[RCU_DONE_TAIL]):
+ *	Callbacks whose grace period has elapsed, and thus can be invoked.
+ * [*tails[RCU_DONE_TAIL], *tails[RCU_WAIT_TAIL]):
+ *	Callbacks waiting for the current GP from the current CPU's viewpoint.
+ * [*tails[RCU_WAIT_TAIL], *tails[RCU_NEXT_READY_TAIL]):
+ *	Callbacks that arrived before the next GP started, again from
+ *	the current CPU's viewpoint.  These can be handled by the next GP.
+ * [*tails[RCU_NEXT_READY_TAIL], *tails[RCU_NEXT_TAIL]):
+ *	Callbacks that might have arrived after the next GP started.
+ *	There is some uncertainty as to when a given GP starts and
+ *	ends, but a CPU knows the exact times if it is the one starting
+ *	or ending the GP.  Other CPUs know that the previous GP ends
+ *	before the next one starts.
+ *
+ * Note that RCU_WAIT_TAIL cannot be empty unless RCU_NEXT_READY_TAIL is also
+ * empty.
+ *
+ * The ->gp_seq[] array contains the grace-period number at which the
+ * corresponding segment of callbacks will be ready to invoke.  A given
+ * element of this array is meaningful only when the corresponding segment
+ * is non-empty, and it is never valid for RCU_DONE_TAIL (whose callbacks
+ * are already ready to invoke) or for RCU_NEXT_TAIL (whose callbacks have
+ * not yet been assigned a grace-period number).
+ */
+#define RCU_DONE_TAIL		0	/* Also RCU_WAIT head. */
+#define RCU_WAIT_TAIL		1	/* Also RCU_NEXT_READY head. */
+#define RCU_NEXT_READY_TAIL	2	/* Also RCU_NEXT head. */
+#define RCU_NEXT_TAIL		3
+#define RCU_CBLIST_NSEGS	4
+
+struct rcu_segcblist {
+	struct rcu_head *head;
+	struct rcu_head **tails[RCU_CBLIST_NSEGS];
+	unsigned long gp_seq[RCU_CBLIST_NSEGS];
+	long len;
+	long len_lazy;
+};
+
+/*
+ * Initialize an rcu_segcblist structure.
+ */
+static inline void rcu_segcblist_init(struct rcu_segcblist *rsclp)
+{
+	int i;
+
+	BUILD_BUG_ON(RCU_NEXT_TAIL + 1 != ARRAY_SIZE(rsclp->gp_seq));
+	BUILD_BUG_ON(ARRAY_SIZE(rsclp->tails) != ARRAY_SIZE(rsclp->gp_seq));
+	rsclp->head = NULL;
+	for (i = 0; i < RCU_CBLIST_NSEGS; i++)
+		rsclp->tails[i] = &rsclp->head;
+	rsclp->len = 0;
+	rsclp->len_lazy = 0;
+}
+
+/*
+ * Is the specified rcu_segcblist structure empty?
+ *
+ * But careful!  The fact that the ->head field is NULL does not
+ * necessarily imply that there are no callbacks associated with
+ * this structure.  When callbacks are being invoked, they are
+ * removed as a group.  If callback invocation must be preempted,
+ * the remaining callbacks will be added back to the list.  Either
+ * way, the counts are updated later.
+ *
+ * So it is often the case that rcu_segcblist_n_cbs() should be used
+ * instead.
+ */
+static inline bool rcu_segcblist_empty(struct rcu_segcblist *rsclp)
+{
+	return !rsclp->head;
+}
+
+/* Return number of callbacks in segmented callback list. */
+static inline long rcu_segcblist_n_cbs(struct rcu_segcblist *rsclp)
+{
+	return READ_ONCE(rsclp->len);
+}
+
+/* Return number of lazy callbacks in segmented callback list. */
+static inline long rcu_segcblist_n_lazy_cbs(struct rcu_segcblist *rsclp)
+{
+	return rsclp->len_lazy;
+}
+
+/* Return number of non-lazy callbacks in segmented callback list. */
+static inline long rcu_segcblist_n_nonlazy_cbs(struct rcu_segcblist *rsclp)
+{
+	return rsclp->len - rsclp->len_lazy;
+}
+
+/*
+ * Is the specified rcu_segcblist enabled, for example, not corresponding
+ * to an offline or callback-offloaded CPU?
+ */
+static inline bool rcu_segcblist_is_enabled(struct rcu_segcblist *rsclp)
+{
+	return !!rsclp->tails[RCU_NEXT_TAIL];
+}
+
+/*
+ * Disable the specified rcu_segcblist structure, so that callbacks can
+ * no longer be posted to it.  This structure must be empty.
+ */
+static inline void rcu_segcblist_disable(struct rcu_segcblist *rsclp)
+{
+	WARN_ON_ONCE(!rcu_segcblist_empty(rsclp));
+	WARN_ON_ONCE(rcu_segcblist_n_cbs(rsclp));
+	WARN_ON_ONCE(rcu_segcblist_n_lazy_cbs(rsclp));
+	rsclp->tails[RCU_NEXT_TAIL] = NULL;
+}
+
+/*
+ * Is the specified segment of the specified rcu_segcblist structure
+ * empty of callbacks?
+ */
+static inline bool rcu_segcblist_segempty(struct rcu_segcblist *rsclp, int seg)
+{
+	if (seg == RCU_DONE_TAIL)
+		return &rsclp->head == rsclp->tails[RCU_DONE_TAIL];
+	return rsclp->tails[seg - 1] == rsclp->tails[seg];
+}
+
+/*
+ * Are all segments following the specified segment of the specified
+ * rcu_segcblist structure empty of callbacks?  (The specified
+ * segment might well contain callbacks.)
+ */
+static inline bool rcu_segcblist_restempty(struct rcu_segcblist *rsclp, int seg)
+{
+	return !*rsclp->tails[seg];
+}
+
+/*
+ * Does the specified rcu_segcblist structure contain callbacks that
+ * are ready to be invoked?
+ */
+static inline bool rcu_segcblist_ready_cbs(struct rcu_segcblist *rsclp)
+{
+	return rcu_segcblist_is_enabled(rsclp) &&
+	       &rsclp->head != rsclp->tails[RCU_DONE_TAIL];
+}
+
+/*
+ * Does the specified rcu_segcblist structure contain callbacks that
+ * are still pending, that is, not yet ready to be invoked?
+ */
+static inline bool rcu_segcblist_pend_cbs(struct rcu_segcblist *rsclp)
+{
+	return rcu_segcblist_is_enabled(rsclp) &&
+	       !rcu_segcblist_restempty(rsclp, RCU_DONE_TAIL);
+}
+
+/*
+ * Return a pointer to the first callback in the specified rcu_segcblist
+ * structure.  This is useful for diagnostics.
+ */
+static inline struct rcu_head *
+rcu_segcblist_first_cb(struct rcu_segcblist *rsclp)
+{
+	if (rcu_segcblist_is_enabled(rsclp))
+		return rsclp->head;
+	return NULL;
+}
+
+/*
+ * Return a pointer to the first pending callback in the specified
+ * rcu_segcblist structure.  This is useful just after posting a given
+ * callback -- if that callback is the first pending callback, then
+ * you cannot rely on someone else having already started up the required
+ * grace period.
+ */
+static inline struct rcu_head *
+rcu_segcblist_first_pend_cb(struct rcu_segcblist *rsclp)
+{
+	if (rcu_segcblist_is_enabled(rsclp))
+		return *rsclp->tails[RCU_DONE_TAIL];
+	return NULL;
+}
+
+/*
+ * Does the specified rcu_segcblist structure contain callbacks that
+ * have not yet been processed beyond having been posted, that is,
+ * does it contain callbacks in its last segment?
+ */
+static inline bool rcu_segcblist_new_cbs(struct rcu_segcblist *rsclp)
+{
+	return rcu_segcblist_is_enabled(rsclp) &&
+	       !rcu_segcblist_restempty(rsclp, RCU_NEXT_READY_TAIL);
+}
+
+/*
+ * Enqueue the specified callback onto the specified rcu_segcblist
+ * structure, updating accounting as needed.  Note that the ->len
+ * field may be accessed locklessly, hence the WRITE_ONCE().
+ * The ->len field is used by rcu_barrier() and friends to determine
+ * if they must post a callback on this structure, and it is OK
+ * for rcu_barrier() to sometimes post callbacks needlessly, but
+ * absolutely not OK for it to ever miss posting a callback.
+ */
+static inline void rcu_segcblist_enqueue(struct rcu_segcblist *rsclp,
+					 struct rcu_head *rhp, bool lazy)
+{
+	WRITE_ONCE(rsclp->len, rsclp->len + 1); /* ->len sampled locklessly. */
+	if (lazy)
+		rsclp->len_lazy++;
+	smp_mb(); /* Ensure counts are updated before callback is enqueued. */
+	rhp->next = NULL;
+	*rsclp->tails[RCU_NEXT_TAIL] = rhp;
+	rsclp->tails[RCU_NEXT_TAIL] = &rhp->next;
+}
+
+/*
+ * Extract only the counts from the specified rcu_segcblist structure,
+ * and place them in the specified rcu_cblist structure.  This function
+ * supports both callback orphaning and invocation, hence the separation
+ * of counts and callbacks.  (Callbacks ready for invocation must be
+ * orphaned and adopted separately from pending callbacks, but counts
+ * apply to all callbacks.  Locking must be used to make sure that
+ * both orphaned-callbacks lists are consistent.)
+ */
+static inline void rcu_segcblist_extract_count(struct rcu_segcblist *rsclp,
+					       struct rcu_cblist *rclp)
+{
+	rclp->len_lazy += rsclp->len_lazy;
+	rclp->len += rsclp->len;
+	rsclp->len_lazy = 0;
+	WRITE_ONCE(rsclp->len, 0); /* ->len sampled locklessly. */
+}
+
+/*
+ * Extract only those callbacks ready to be invoked from the specified
+ * rcu_segcblist structure and place them in the specified rcu_cblist
+ * structure.
+ */
+static inline void rcu_segcblist_extract_done_cbs(struct rcu_segcblist *rsclp,
+						  struct rcu_cblist *rclp)
+{
+	int i;
+
+	if (!rcu_segcblist_ready_cbs(rsclp))
+		return; /* Nothing to do. */
+	*rclp->tail = rsclp->head;
+	rsclp->head = *rsclp->tails[RCU_DONE_TAIL];
+	*rsclp->tails[RCU_DONE_TAIL] = NULL;
+	rclp->tail = rsclp->tails[RCU_DONE_TAIL];
+	for (i = RCU_CBLIST_NSEGS - 1; i >= RCU_DONE_TAIL; i--)
+		if (rsclp->tails[i] == rsclp->tails[RCU_DONE_TAIL])
+			rsclp->tails[i] = &rsclp->head;
+}
+
+/*
+ * Extract only those callbacks still pending (not yet ready to be
+ * invoked) from the specified rcu_segcblist structure and place them in
+ * the specified rcu_cblist structure.  Note that this loses information
+ * about any callbacks that might have been partway done waiting for
+ * their grace period.  Too bad!  They will have to start over.
+ */
+static inline void
+rcu_segcblist_extract_pend_cbs(struct rcu_segcblist *rsclp,
+			       struct rcu_cblist *rclp)
+{
+	int i;
+
+	if (!rcu_segcblist_pend_cbs(rsclp))
+		return; /* Nothing to do. */
+	*rclp->tail = *rsclp->tails[RCU_DONE_TAIL];
+	rclp->tail = rsclp->tails[RCU_NEXT_TAIL];
+	*rsclp->tails[RCU_DONE_TAIL] = NULL;
+	for (i = RCU_DONE_TAIL + 1; i < RCU_CBLIST_NSEGS; i++)
+		rsclp->tails[i] = rsclp->tails[RCU_DONE_TAIL];
+}
+
+/*
+ * Move the entire contents of the specified rcu_segcblist structure,
+ * counts, callbacks, and all, to the specified rcu_cblist structure.
+ * @@@ Why do we need this???  Moving early-boot CBs to NOCB lists?
+ * @@@ Memory barrier needed?  (Not if only used at boot time...)
+ */
+static inline void rcu_segcblist_extract_all(struct rcu_segcblist *rsclp,
+					     struct rcu_cblist *rclp)
+{
+	rcu_segcblist_extract_done_cbs(rsclp, rclp);
+	rcu_segcblist_extract_pend_cbs(rsclp, rclp);
+	rcu_segcblist_extract_count(rsclp, rclp);
+}
+
+/*
+ * Insert counts from the specified rcu_cblist structure in the
+ * specified rcu_segcblist structure.
+ */
+static inline void rcu_segcblist_insert_count(struct rcu_segcblist *rsclp,
+					      struct rcu_cblist *rclp)
+{
+	rsclp->len_lazy += rclp->len_lazy;
+	/* ->len sampled locklessly. */
+	WRITE_ONCE(rsclp->len, rsclp->len + rclp->len);
+	rclp->len_lazy = 0;
+	rclp->len = 0;
+}
+
+/*
+ * Move callbacks from the specified rcu_cblist to the beginning of the
+ * done-callbacks segment of the specified rcu_segcblist.
+ */
+static inline void rcu_segcblist_insert_done_cbs(struct rcu_segcblist *rsclp,
+						 struct rcu_cblist *rclp)
+{
+	int i;
+
+	if (!rclp->head)
+		return; /* No callbacks to move. */
+	*rclp->tail = rsclp->head;
+	rsclp->head = rclp->head;
+	for (i = RCU_DONE_TAIL; i < RCU_CBLIST_NSEGS; i++)
+		if (&rsclp->head == rsclp->tails[i])
+			rsclp->tails[i] = rclp->tail;
+		else
+			break;
+	rclp->head = NULL;
+	rclp->tail = &rclp->head;
+}
+
+/*
+ * Move callbacks from the specified rcu_cblist to the end of the
+ * new-callbacks segment of the specified rcu_segcblist.
+ */
+static inline void rcu_segcblist_insert_pend_cbs(struct rcu_segcblist *rsclp,
+						 struct rcu_cblist *rclp)
+{
+	if (!rclp->head)
+		return; /* Nothing to do. */
+	*rsclp->tails[RCU_NEXT_TAIL] = rclp->head;
+	rsclp->tails[RCU_NEXT_TAIL] = rclp->tail;
+	rclp->head = NULL;
+	rclp->tail = &rclp->head;
+}
+
+/*
+ * Advance the callbacks in the specified rcu_segcblist structure based
+ * on the current value passed in for the grace-period counter.
+ */
+static inline void rcu_segcblist_advance(struct rcu_segcblist *rsclp,
+					 unsigned long seq)
+{
+	int i, j;
+
+	WARN_ON_ONCE(!rcu_segcblist_is_enabled(rsclp));
+	WARN_ON_ONCE(rcu_segcblist_restempty(rsclp, RCU_DONE_TAIL));
+
+	/*
+	 * Find all callbacks whose ->gp_seq numbers indicate that they
+	 * are ready to invoke, and put them into the RCU_DONE_TAIL segment.
+	 */
+	for (i = RCU_WAIT_TAIL; i < RCU_NEXT_TAIL; i++) {
+		if (ULONG_CMP_LT(seq, rsclp->gp_seq[i]))
+			break;
+		rsclp->tails[RCU_DONE_TAIL] = rsclp->tails[i];
+	}
+
+	/* If no callbacks moved, nothing more need be done. */
+	if (i == RCU_WAIT_TAIL)
+		return;
+
+	/* Clean up tail pointers that might have been misordered above. */
+	for (j = RCU_WAIT_TAIL; j < i; j++)
+		rsclp->tails[j] = rsclp->tails[RCU_DONE_TAIL];
+
+	/*
+	 * Callbacks moved, so clean up the misordered ->tails[] pointers
+	 * that now point into the middle of the list of ready-to-invoke
+	 * callbacks.  The overall effect is to copy down the later pointers
+	 * into the gap that was created by the now-ready segments.
+	 */
+	for (j = RCU_WAIT_TAIL; i < RCU_NEXT_TAIL; i++, j++) {
+		if (rsclp->tails[j] == rsclp->tails[RCU_NEXT_TAIL])
+			break;  /* No more callbacks. */
+		rsclp->tails[j] = rsclp->tails[i];
+		rsclp->gp_seq[j] = rsclp->gp_seq[i];
+	}
+}
+
+/*
+ * "Accelerate" callbacks based on more-accurate grace-period information.
+ * The reason for this is that RCU does not synchronize the beginnings and
+ * ends of grace periods, and that callbacks are posted locally.  This in
+ * turn means that the callbacks must be labelled conservatively early
+ * on, as getting exact information would degrade both performance and
+ * scalability.  When more accurate grace-period information becomes
+ * available, previously posted callbacks can be "accelerated", marking
+ * them to complete at the end of the earlier grace period.
+ *
+ * This function operates on an rcu_segcblist structure, and also the
+ * grace-period sequence number at which new callbacks would become
+ * ready to invoke.
+ */
+static inline bool rcu_segcblist_accelerate(struct rcu_segcblist *rsclp,
+					    unsigned long seq)
+{
+	int i;
+
+	WARN_ON_ONCE(!rcu_segcblist_is_enabled(rsclp));
+	WARN_ON_ONCE(rcu_segcblist_restempty(rsclp, RCU_DONE_TAIL));
+
+	/*
+	 * Find the segment preceding the oldest segment of callbacks
+	 * whose ->gp_seq[] completion is at or after that passed in via
+	 * "seq", skipping any empty segments.  This oldest segment, along
+	 * with any later segments, can be merged in with any newly arrived
+	 * callbacks in the RCU_NEXT_TAIL segment, and assigned "seq"
+	 * as their ->gp_seq[] grace-period completion sequence number.
+	 */
+	for (i = RCU_NEXT_READY_TAIL; i > RCU_DONE_TAIL; i--)
+		if (rsclp->tails[i] != rsclp->tails[i - 1] &&
+		    ULONG_CMP_LT(rsclp->gp_seq[i], seq))
+			break;
+
+	/*
+	 * If all the segments contain callbacks that correspond to
+	 * earlier grace-period sequence numbers than "seq", leave.
+	 * Assuming that the rcu_segcblist structure has enough
+	 * segments in its arrays, this can only happen if some of
+	 * the non-done segments contain callbacks that really are
+	 * ready to invoke.  This situation will get straightened
+	 * out by the next call to rcu_segcblist_advance().
+	 *
+	 * Also advance to the oldest segment of callbacks whose
+	 * ->gp_seq[] completion is at or after that passed in via "seq",
+	 * skipping any empty segments.
+	 */
+	if (++i >= RCU_NEXT_TAIL)
+		return false;
+
+	/*
+	 * Merge all later callbacks, including newly arrived callbacks,
+	 * into the segment located by the for-loop above.  Assign "seq"
+	 * as the ->gp_seq[] value in order to correctly handle the case
+	 * where there were no pending callbacks in the rcu_segcblist
+	 * structure other than in the RCU_NEXT_TAIL segment.
+	 */
+	for (; i < RCU_NEXT_TAIL; i++) {
+		rsclp->tails[i] = rsclp->tails[RCU_NEXT_TAIL];
+		rsclp->gp_seq[i] = seq;
+	}
+	return true;
+}
+
+/*
+ * Scan the specified rcu_segcblist structure for callbacks that need
+ * a grace period later than the one specified by "seq".  We don't look
+ * at the RCU_DONE_TAIL or RCU_NEXT_TAIL segments because they don't
+ * have a grace-period sequence number.
+ */
+static inline bool rcu_segcblist_future_gp_needed(struct rcu_segcblist *rsclp,
+						  unsigned long seq)
+{
+	int i;
+
+	for (i = RCU_WAIT_TAIL; i < RCU_NEXT_TAIL; i++)
+		if (rsclp->tails[i - 1] != rsclp->tails[i] &&
+		    ULONG_CMP_LT(seq, rsclp->gp_seq[i]))
+			return true;
+	return false;
+}
+
+/*
+ * Interim function to return rcu_segcblist head pointer.  Longer term, the
+ * rcu_segcblist will be used more pervasively, removing the need for this
+ * function.
+ */
+static inline struct rcu_head *rcu_segcblist_head(struct rcu_segcblist *rsclp)
+{
+	return rsclp->head;
+}
+
+/*
+ * Interim function to return rcu_segcblist tail pointer.  Longer term, the
+ * rcu_segcblist will be used more pervasively, removing the need for this
+ * function.
+ */
+static inline struct rcu_head **rcu_segcblist_tail(struct rcu_segcblist *rsclp)
+{
+	WARN_ON_ONCE(rcu_segcblist_empty(rsclp));
+	return rsclp->tails[RCU_NEXT_TAIL];
+}
+
+#endif /* __KERNEL_RCU_SEGCBLIST_H */
diff --git a/kernel/rcu/tree.c b/kernel/rcu/tree.c
index 530ab6cf7a0b..8cc9d40b41ea 100644
--- a/kernel/rcu/tree.c
+++ b/kernel/rcu/tree.c
@@ -97,8 +97,8 @@ struct rcu_state sname##_state = { \
 	.gpnum = 0UL - 300UL, \
 	.completed = 0UL - 300UL, \
 	.orphan_lock = __RAW_SPIN_LOCK_UNLOCKED(&sname##_state.orphan_lock), \
-	.orphan_nxttail = &sname##_state.orphan_nxtlist, \
-	.orphan_donetail = &sname##_state.orphan_donelist, \
+	.orphan_pend = RCU_CBLIST_INITIALIZER(sname##_state.orphan_pend), \
+	.orphan_done = RCU_CBLIST_INITIALIZER(sname##_state.orphan_done), \
 	.barrier_mutex = __MUTEX_INITIALIZER(sname##_state.barrier_mutex), \
 	.name = RCU_STATE_NAME(sname), \
 	.abbr = sabbr, \
@@ -726,16 +726,6 @@ void rcutorture_record_progress(unsigned long vernum)
 EXPORT_SYMBOL_GPL(rcutorture_record_progress);
 
 /*
- * Does the CPU have callbacks ready to be invoked?
- */
-static int
-cpu_has_callbacks_ready_to_invoke(struct rcu_data *rdp)
-{
-	return &rdp->nxtlist != rdp->nxttail[RCU_DONE_TAIL] &&
-	       rdp->nxttail[RCU_NEXT_TAIL] != NULL;
-}
-
-/*
  * Return the root node of the specified rcu_state structure.
  */
 static struct rcu_node *rcu_get_root(struct rcu_state *rsp)
@@ -765,21 +755,17 @@ static int rcu_future_needs_gp(struct rcu_state *rsp)
 static bool
 cpu_needs_another_gp(struct rcu_state *rsp, struct rcu_data *rdp)
 {
-	int i;
-
 	if (rcu_gp_in_progress(rsp))
 		return false;  /* No, a grace period is already in progress. */
 	if (rcu_future_needs_gp(rsp))
 		return true;  /* Yes, a no-CBs CPU needs one. */
-	if (!rdp->nxttail[RCU_NEXT_TAIL])
+	if (!rcu_segcblist_is_enabled(&rdp->cblist))
 		return false;  /* No, this is a no-CBs (or offline) CPU. */
-	if (*rdp->nxttail[RCU_NEXT_READY_TAIL])
+	if (!rcu_segcblist_restempty(&rdp->cblist, RCU_NEXT_READY_TAIL))
 		return true;  /* Yes, CPU has newly registered callbacks. */
-	for (i = RCU_WAIT_TAIL; i < RCU_NEXT_TAIL; i++)
-		if (rdp->nxttail[i - 1] != rdp->nxttail[i] &&
-		    ULONG_CMP_LT(READ_ONCE(rsp->completed),
-				 rdp->nxtcompleted[i]))
-			return true;  /* Yes, CBs for future grace period. */
+	if (rcu_segcblist_future_gp_needed(&rdp->cblist,
+					   READ_ONCE(rsp->completed)))
+		return true;  /* Yes, CBs for future grace period. */
 	return false; /* No grace period needed. */
 }
 
@@ -1490,7 +1476,8 @@ static void print_other_cpu_stall(struct rcu_state *rsp, unsigned long gpnum)
 
 	print_cpu_stall_info_end();
 	for_each_possible_cpu(cpu)
-		totqlen += per_cpu_ptr(rsp->rda, cpu)->qlen;
+		totqlen += rcu_segcblist_n_cbs(&per_cpu_ptr(rsp->rda,
+							    cpu)->cblist);
 	pr_cont("(detected by %d, t=%ld jiffies, g=%ld, c=%ld, q=%lu)\n",
 	       smp_processor_id(), (long)(jiffies - rsp->gp_start),
 	       (long)rsp->gpnum, (long)rsp->completed, totqlen);
@@ -1544,7 +1531,8 @@ static void print_cpu_stall(struct rcu_state *rsp)
 	print_cpu_stall_info(rsp, smp_processor_id());
 	print_cpu_stall_info_end();
 	for_each_possible_cpu(cpu)
-		totqlen += per_cpu_ptr(rsp->rda, cpu)->qlen;
+		totqlen += rcu_segcblist_n_cbs(&per_cpu_ptr(rsp->rda,
+							    cpu)->cblist);
 	pr_cont(" (t=%lu jiffies g=%ld c=%ld q=%lu)\n",
 		jiffies - rsp->gp_start,
 		(long)rsp->gpnum, (long)rsp->completed, totqlen);
@@ -1647,30 +1635,6 @@ void rcu_cpu_stall_reset(void)
 }
 
 /*
- * Initialize the specified rcu_data structure's default callback list
- * to empty.  The default callback list is the one that is not used by
- * no-callbacks CPUs.
- */
-static void init_default_callback_list(struct rcu_data *rdp)
-{
-	int i;
-
-	rdp->nxtlist = NULL;
-	for (i = 0; i < RCU_NEXT_SIZE; i++)
-		rdp->nxttail[i] = &rdp->nxtlist;
-}
-
-/*
- * Initialize the specified rcu_data structure's callback list to empty.
- */
-static void init_callback_list(struct rcu_data *rdp)
-{
-	if (init_nocb_callback_list(rdp))
-		return;
-	init_default_callback_list(rdp);
-}
-
-/*
  * Determine the value that ->completed will have at the end of the
  * next subsequent grace period.  This is used to tag callbacks so that
  * a CPU can invoke callbacks in a timely fashion even if that CPU has
@@ -1724,7 +1688,6 @@ rcu_start_future_gp(struct rcu_node *rnp, struct rcu_data *rdp,
 		    unsigned long *c_out)
 {
 	unsigned long c;
-	int i;
 	bool ret = false;
 	struct rcu_node *rnp_root = rcu_get_root(rdp->rsp);
 
@@ -1770,13 +1733,11 @@ rcu_start_future_gp(struct rcu_node *rnp, struct rcu_data *rdp,
 	/*
 	 * Get a new grace-period number.  If there really is no grace
 	 * period in progress, it will be smaller than the one we obtained
-	 * earlier.  Adjust callbacks as needed.  Note that even no-CBs
-	 * CPUs have a ->nxtcompleted[] array, so no no-CBs checks needed.
+	 * earlier.  Adjust callbacks as needed.
 	 */
 	c = rcu_cbs_completed(rdp->rsp, rnp_root);
-	for (i = RCU_DONE_TAIL; i < RCU_NEXT_TAIL; i++)
-		if (ULONG_CMP_LT(c, rdp->nxtcompleted[i]))
-			rdp->nxtcompleted[i] = c;
+	if (!rcu_is_nocb_cpu(rdp->cpu))
+		(void)rcu_segcblist_accelerate(&rdp->cblist, c);
 
 	/*
 	 * If the needed for the required grace period is already
@@ -1856,57 +1817,27 @@ static void rcu_gp_kthread_wake(struct rcu_state *rsp)
 static bool rcu_accelerate_cbs(struct rcu_state *rsp, struct rcu_node *rnp,
 			       struct rcu_data *rdp)
 {
-	unsigned long c;
-	int i;
-	bool ret;
-
-	/* If the CPU has no callbacks, nothing to do. */
-	if (!rdp->nxttail[RCU_NEXT_TAIL] || !*rdp->nxttail[RCU_DONE_TAIL])
-		return false;
-
-	/*
-	 * Starting from the sublist containing the callbacks most
-	 * recently assigned a ->completed number and working down, find the
-	 * first sublist that is not assignable to an upcoming grace period.
-	 * Such a sublist has something in it (first two tests) and has
-	 * a ->completed number assigned that will complete sooner than
-	 * the ->completed number for newly arrived callbacks (last test).
-	 *
-	 * The key point is that any later sublist can be assigned the
-	 * same ->completed number as the newly arrived callbacks, which
-	 * means that the callbacks in any of these later sublist can be
-	 * grouped into a single sublist, whether or not they have already
-	 * been assigned a ->completed number.
-	 */
-	c = rcu_cbs_completed(rsp, rnp);
-	for (i = RCU_NEXT_TAIL - 1; i > RCU_DONE_TAIL; i--)
-		if (rdp->nxttail[i] != rdp->nxttail[i - 1] &&
-		    !ULONG_CMP_GE(rdp->nxtcompleted[i], c))
-			break;
+	bool ret = false;
 
-	/*
-	 * If there are no sublist for unassigned callbacks, leave.
-	 * At the same time, advance "i" one sublist, so that "i" will
-	 * index into the sublist where all the remaining callbacks should
-	 * be grouped into.
-	 */
-	if (++i >= RCU_NEXT_TAIL)
+	/* If no pending (not yet ready to invoke) callbacks, nothing to do. */
+	if (!rcu_segcblist_pend_cbs(&rdp->cblist))
 		return false;
 
 	/*
-	 * Assign all subsequent callbacks' ->completed number to the next
-	 * full grace period and group them all in the sublist initially
-	 * indexed by "i".
+	 * Callbacks are often registered with incomplete grace-period
+	 * information.  Something about the fact that getting exact
+	 * information requires acquiring a global lock...  RCU therefore
+	 * makes a conservative estimate of the grace period number at which
+	 * a given callback will become ready to invoke.  The following
+	 * code checks this estimate and improves it when possible, thus
+	 * accelerating callback invocation to an earlier grace-period
+	 * number.
 	 */
-	for (; i <= RCU_NEXT_TAIL; i++) {
-		rdp->nxttail[i] = rdp->nxttail[RCU_NEXT_TAIL];
-		rdp->nxtcompleted[i] = c;
-	}
-	/* Record any needed additional grace periods. */
-	ret = rcu_start_future_gp(rnp, rdp, NULL);
+	if (rcu_segcblist_accelerate(&rdp->cblist, rcu_cbs_completed(rsp, rnp)))
+		ret = rcu_start_future_gp(rnp, rdp, NULL);
 
 	/* Trace depending on how much we were able to accelerate. */
-	if (!*rdp->nxttail[RCU_WAIT_TAIL])
+	if (rcu_segcblist_restempty(&rdp->cblist, RCU_WAIT_TAIL))
 		trace_rcu_grace_period(rsp->name, rdp->gpnum, TPS("AccWaitCB"));
 	else
 		trace_rcu_grace_period(rsp->name, rdp->gpnum, TPS("AccReadyCB"));
@@ -1926,32 +1857,15 @@ static bool rcu_accelerate_cbs(struct rcu_state *rsp, struct rcu_node *rnp,
 static bool rcu_advance_cbs(struct rcu_state *rsp, struct rcu_node *rnp,
 			    struct rcu_data *rdp)
 {
-	int i, j;
-
-	/* If the CPU has no callbacks, nothing to do. */
-	if (!rdp->nxttail[RCU_NEXT_TAIL] || !*rdp->nxttail[RCU_DONE_TAIL])
+	/* If no pending (not yet ready to invoke) callbacks, nothing to do. */
+	if (!rcu_segcblist_pend_cbs(&rdp->cblist))
 		return false;
 
 	/*
 	 * Find all callbacks whose ->completed numbers indicate that they
 	 * are ready to invoke, and put them into the RCU_DONE_TAIL sublist.
 	 */
-	for (i = RCU_WAIT_TAIL; i < RCU_NEXT_TAIL; i++) {
-		if (ULONG_CMP_LT(rnp->completed, rdp->nxtcompleted[i]))
-			break;
-		rdp->nxttail[RCU_DONE_TAIL] = rdp->nxttail[i];
-	}
-	/* Clean up any sublist tail pointers that were misordered above. */
-	for (j = RCU_WAIT_TAIL; j < i; j++)
-		rdp->nxttail[j] = rdp->nxttail[RCU_DONE_TAIL];
-
-	/* Copy down callbacks to fill in empty sublists. */
-	for (j = RCU_WAIT_TAIL; i < RCU_NEXT_TAIL; i++, j++) {
-		if (rdp->nxttail[j] == rdp->nxttail[RCU_NEXT_TAIL])
-			break;
-		rdp->nxttail[j] = rdp->nxttail[i];
-		rdp->nxtcompleted[j] = rdp->nxtcompleted[i];
-	}
+	rcu_segcblist_advance(&rdp->cblist, rnp->completed);
 
 	/* Classify any remaining callbacks. */
 	return rcu_accelerate_cbs(rsp, rnp, rdp);
@@ -2668,13 +2582,8 @@ rcu_send_cbs_to_orphanage(int cpu, struct rcu_state *rsp,
 	 * because _rcu_barrier() excludes CPU-hotplug operations, so it
 	 * cannot be running now.  Thus no memory barrier is required.
 	 */
-	if (rdp->nxtlist != NULL) {
-		rsp->qlen_lazy += rdp->qlen_lazy;
-		rsp->qlen += rdp->qlen;
-		rdp->n_cbs_orphaned += rdp->qlen;
-		rdp->qlen_lazy = 0;
-		WRITE_ONCE(rdp->qlen, 0);
-	}
+	rdp->n_cbs_orphaned += rcu_segcblist_n_cbs(&rdp->cblist);
+	rcu_segcblist_extract_count(&rdp->cblist, &rsp->orphan_done);
 
 	/*
 	 * Next, move those callbacks still needing a grace period to
@@ -2682,31 +2591,18 @@ rcu_send_cbs_to_orphanage(int cpu, struct rcu_state *rsp,
 	 * Some of the callbacks might have gone partway through a grace
 	 * period, but that is too bad.  They get to start over because we
 	 * cannot assume that grace periods are synchronized across CPUs.
-	 * We don't bother updating the ->nxttail[] array yet, instead
-	 * we just reset the whole thing later on.
 	 */
-	if (*rdp->nxttail[RCU_DONE_TAIL] != NULL) {
-		*rsp->orphan_nxttail = *rdp->nxttail[RCU_DONE_TAIL];
-		rsp->orphan_nxttail = rdp->nxttail[RCU_NEXT_TAIL];
-		*rdp->nxttail[RCU_DONE_TAIL] = NULL;
-	}
+	rcu_segcblist_extract_pend_cbs(&rdp->cblist, &rsp->orphan_pend);
 
 	/*
 	 * Then move the ready-to-invoke callbacks to the orphanage,
 	 * where some other CPU will pick them up.  These will not be
 	 * required to pass though another grace period: They are done.
 	 */
-	if (rdp->nxtlist != NULL) {
-		*rsp->orphan_donetail = rdp->nxtlist;
-		rsp->orphan_donetail = rdp->nxttail[RCU_DONE_TAIL];
-	}
+	rcu_segcblist_extract_done_cbs(&rdp->cblist, &rsp->orphan_done);
 
-	/*
-	 * Finally, initialize the rcu_data structure's list to empty and
-	 * disallow further callbacks on this CPU.
-	 */
-	init_callback_list(rdp);
-	rdp->nxttail[RCU_NEXT_TAIL] = NULL;
+	/* Finally, disallow further callbacks on this CPU.  */
+	rcu_segcblist_disable(&rdp->cblist);
 }
 
 /*
@@ -2715,7 +2611,6 @@ rcu_send_cbs_to_orphanage(int cpu, struct rcu_state *rsp,
  */
 static void rcu_adopt_orphan_cbs(struct rcu_state *rsp, unsigned long flags)
 {
-	int i;
 	struct rcu_data *rdp = raw_cpu_ptr(rsp->rda);
 
 	/* No-CBs CPUs are handled specially. */
@@ -2724,13 +2619,11 @@ static void rcu_adopt_orphan_cbs(struct rcu_state *rsp, unsigned long flags)
 		return;
 
 	/* Do the accounting first. */
-	rdp->qlen_lazy += rsp->qlen_lazy;
-	rdp->qlen += rsp->qlen;
-	rdp->n_cbs_adopted += rsp->qlen;
-	if (rsp->qlen_lazy != rsp->qlen)
+	rdp->n_cbs_adopted += rcu_cblist_n_cbs(&rsp->orphan_done);
+	if (rcu_cblist_n_lazy_cbs(&rsp->orphan_done) !=
+	    rcu_cblist_n_cbs(&rsp->orphan_done))
 		rcu_idle_count_callbacks_posted();
-	rsp->qlen_lazy = 0;
-	rsp->qlen = 0;
+	rcu_segcblist_insert_count(&rdp->cblist, &rsp->orphan_done);
 
 	/*
 	 * We do not need a memory barrier here because the only way we
@@ -2738,24 +2631,13 @@ static void rcu_adopt_orphan_cbs(struct rcu_state *rsp, unsigned long flags)
 	 * we are the task doing the rcu_barrier().
 	 */
 
-	/* First adopt the ready-to-invoke callbacks. */
-	if (rsp->orphan_donelist != NULL) {
-		*rsp->orphan_donetail = *rdp->nxttail[RCU_DONE_TAIL];
-		*rdp->nxttail[RCU_DONE_TAIL] = rsp->orphan_donelist;
-		for (i = RCU_NEXT_SIZE - 1; i >= RCU_DONE_TAIL; i--)
-			if (rdp->nxttail[i] == rdp->nxttail[RCU_DONE_TAIL])
-				rdp->nxttail[i] = rsp->orphan_donetail;
-		rsp->orphan_donelist = NULL;
-		rsp->orphan_donetail = &rsp->orphan_donelist;
-	}
-
-	/* And then adopt the callbacks that still need a grace period. */
-	if (rsp->orphan_nxtlist != NULL) {
-		*rdp->nxttail[RCU_NEXT_TAIL] = rsp->orphan_nxtlist;
-		rdp->nxttail[RCU_NEXT_TAIL] = rsp->orphan_nxttail;
-		rsp->orphan_nxtlist = NULL;
-		rsp->orphan_nxttail = &rsp->orphan_nxtlist;
-	}
+	/* First adopt the ready-to-invoke callbacks, then the pending ones. */
+	rcu_segcblist_insert_done_cbs(&rdp->cblist, &rsp->orphan_done);
+	WARN_ON_ONCE(!rcu_cblist_empty(&rsp->orphan_done));
+	rcu_segcblist_insert_pend_cbs(&rdp->cblist, &rsp->orphan_pend);
+	WARN_ON_ONCE(!rcu_cblist_empty(&rsp->orphan_pend));
+	WARN_ON_ONCE(rcu_segcblist_empty(&rdp->cblist) !=
+		     !rcu_segcblist_n_cbs(&rdp->cblist));
 }
 
 /*
@@ -2843,9 +2725,11 @@ static void rcu_cleanup_dead_cpu(int cpu, struct rcu_state *rsp)
 	rcu_adopt_orphan_cbs(rsp, flags);
 	raw_spin_unlock_irqrestore(&rsp->orphan_lock, flags);
 
-	WARN_ONCE(rdp->qlen != 0 || rdp->nxtlist != NULL,
-		  "rcu_cleanup_dead_cpu: Callbacks on offline CPU %d: qlen=%lu, nxtlist=%p\n",
-		  cpu, rdp->qlen, rdp->nxtlist);
+	WARN_ONCE(rcu_segcblist_n_cbs(&rdp->cblist) != 0 ||
+		  !rcu_segcblist_empty(&rdp->cblist),
+		  "rcu_cleanup_dead_cpu: Callbacks on offline CPU %d: qlen=%lu, 1stCB=%p\n",
+		  cpu, rcu_segcblist_n_cbs(&rdp->cblist),
+		  rcu_segcblist_first_cb(&rdp->cblist));
 }
 
 /*
@@ -2855,14 +2739,17 @@ static void rcu_cleanup_dead_cpu(int cpu, struct rcu_state *rsp)
 static void rcu_do_batch(struct rcu_state *rsp, struct rcu_data *rdp)
 {
 	unsigned long flags;
-	struct rcu_head *next, *list, **tail;
-	long bl, count, count_lazy;
-	int i;
+	struct rcu_head *rhp;
+	struct rcu_cblist rcl = RCU_CBLIST_INITIALIZER(rcl);
+	long bl, count;
 
 	/* If no callbacks are ready, just return. */
-	if (!cpu_has_callbacks_ready_to_invoke(rdp)) {
-		trace_rcu_batch_start(rsp->name, rdp->qlen_lazy, rdp->qlen, 0);
-		trace_rcu_batch_end(rsp->name, 0, !!READ_ONCE(rdp->nxtlist),
+	if (!rcu_segcblist_ready_cbs(&rdp->cblist)) {
+		trace_rcu_batch_start(rsp->name,
+				      rcu_segcblist_n_lazy_cbs(&rdp->cblist),
+				      rcu_segcblist_n_cbs(&rdp->cblist), 0);
+		trace_rcu_batch_end(rsp->name, 0,
+				    !rcu_segcblist_empty(&rdp->cblist),
 				    need_resched(), is_idle_task(current),
 				    rcu_is_callbacks_kthread());
 		return;
@@ -2870,73 +2757,62 @@ static void rcu_do_batch(struct rcu_state *rsp, struct rcu_data *rdp)
 
 	/*
 	 * Extract the list of ready callbacks, disabling to prevent
-	 * races with call_rcu() from interrupt handlers.
+	 * races with call_rcu() from interrupt handlers.  Leave the
+	 * callback counts, as rcu_barrier() needs to be conservative.
 	 */
 	local_irq_save(flags);
 	WARN_ON_ONCE(cpu_is_offline(smp_processor_id()));
 	bl = rdp->blimit;
-	trace_rcu_batch_start(rsp->name, rdp->qlen_lazy, rdp->qlen, bl);
-	list = rdp->nxtlist;
-	rdp->nxtlist = *rdp->nxttail[RCU_DONE_TAIL];
-	*rdp->nxttail[RCU_DONE_TAIL] = NULL;
-	tail = rdp->nxttail[RCU_DONE_TAIL];
-	for (i = RCU_NEXT_SIZE - 1; i >= 0; i--)
-		if (rdp->nxttail[i] == rdp->nxttail[RCU_DONE_TAIL])
-			rdp->nxttail[i] = &rdp->nxtlist;
+	trace_rcu_batch_start(rsp->name, rcu_segcblist_n_lazy_cbs(&rdp->cblist),
+			      rcu_segcblist_n_cbs(&rdp->cblist), bl);
+	rcu_segcblist_extract_done_cbs(&rdp->cblist, &rcl);
 	local_irq_restore(flags);
 
 	/* Invoke callbacks. */
-	count = count_lazy = 0;
-	while (list) {
-		next = list->next;
-		prefetch(next);
-		debug_rcu_head_unqueue(list);
-		if (__rcu_reclaim(rsp->name, list))
-			count_lazy++;
-		list = next;
-		/* Stop only if limit reached and CPU has something to do. */
-		if (++count >= bl &&
+	rhp = rcu_cblist_dequeue(&rcl);
+	for (; rhp; rhp = rcu_cblist_dequeue(&rcl)) {
+		debug_rcu_head_unqueue(rhp);
+		if (__rcu_reclaim(rsp->name, rhp))
+			rcu_cblist_dequeued_lazy(&rcl);
+		/*
+		 * Stop only if limit reached and CPU has something to do.
+		 * Note: The rcl structure counts down from zero.
+		 */
+		if (-rcu_cblist_n_cbs(&rcl) >= bl &&
 		    (need_resched() ||
 		     (!is_idle_task(current) && !rcu_is_callbacks_kthread())))
 			break;
 	}
 
 	local_irq_save(flags);
-	trace_rcu_batch_end(rsp->name, count, !!list, need_resched(),
-			    is_idle_task(current),
+	count = -rcu_cblist_n_cbs(&rcl);
+	trace_rcu_batch_end(rsp->name, count, !rcu_cblist_empty(&rcl),
+			    need_resched(), is_idle_task(current),
 			    rcu_is_callbacks_kthread());
 
-	/* Update count, and requeue any remaining callbacks. */
-	if (list != NULL) {
-		*tail = rdp->nxtlist;
-		rdp->nxtlist = list;
-		for (i = 0; i < RCU_NEXT_SIZE; i++)
-			if (&rdp->nxtlist == rdp->nxttail[i])
-				rdp->nxttail[i] = tail;
-			else
-				break;
-	}
+	/* Update counts and requeue any remaining callbacks. */
+	rcu_segcblist_insert_done_cbs(&rdp->cblist, &rcl);
 	smp_mb(); /* List handling before counting for rcu_barrier(). */
-	rdp->qlen_lazy -= count_lazy;
-	WRITE_ONCE(rdp->qlen, rdp->qlen - count);
 	rdp->n_cbs_invoked += count;
+	rcu_segcblist_insert_count(&rdp->cblist, &rcl);
 
 	/* Reinstate batch limit if we have worked down the excess. */
-	if (rdp->blimit == LONG_MAX && rdp->qlen <= qlowmark)
+	count = rcu_segcblist_n_cbs(&rdp->cblist);
+	if (rdp->blimit == LONG_MAX && count <= qlowmark)
 		rdp->blimit = blimit;
 
 	/* Reset ->qlen_last_fqs_check trigger if enough CBs have drained. */
-	if (rdp->qlen == 0 && rdp->qlen_last_fqs_check != 0) {
+	if (count == 0 && rdp->qlen_last_fqs_check != 0) {
 		rdp->qlen_last_fqs_check = 0;
 		rdp->n_force_qs_snap = rsp->n_force_qs;
-	} else if (rdp->qlen < rdp->qlen_last_fqs_check - qhimark)
-		rdp->qlen_last_fqs_check = rdp->qlen;
-	WARN_ON_ONCE((rdp->nxtlist == NULL) != (rdp->qlen == 0));
+	} else if (count < rdp->qlen_last_fqs_check - qhimark)
+		rdp->qlen_last_fqs_check = count;
+	WARN_ON_ONCE(rcu_segcblist_empty(&rdp->cblist) != (count == 0));
 
 	local_irq_restore(flags);
 
 	/* Re-invoke RCU core processing if there are callbacks remaining. */
-	if (cpu_has_callbacks_ready_to_invoke(rdp))
+	if (rcu_segcblist_ready_cbs(&rdp->cblist))
 		invoke_rcu_core();
 }
 
@@ -3120,7 +2996,7 @@ __rcu_process_callbacks(struct rcu_state *rsp)
 	}
 
 	/* If there are callbacks ready, invoke them. */
-	if (cpu_has_callbacks_ready_to_invoke(rdp))
+	if (rcu_segcblist_ready_cbs(&rdp->cblist))
 		invoke_rcu_callbacks(rsp, rdp);
 
 	/* Do any needed deferred wakeups of rcuo kthreads. */
@@ -3192,7 +3068,8 @@ static void __call_rcu_core(struct rcu_state *rsp, struct rcu_data *rdp,
 	 * invoking force_quiescent_state() if the newly enqueued callback
 	 * is the only one waiting for a grace period to complete.
 	 */
-	if (unlikely(rdp->qlen > rdp->qlen_last_fqs_check + qhimark)) {
+	if (unlikely(rcu_segcblist_n_cbs(&rdp->cblist) >
+		     rdp->qlen_last_fqs_check + qhimark)) {
 
 		/* Are we ignoring a completed grace period? */
 		note_gp_changes(rsp, rdp);
@@ -3210,10 +3087,10 @@ static void __call_rcu_core(struct rcu_state *rsp, struct rcu_data *rdp,
 			/* Give the grace period a kick. */
 			rdp->blimit = LONG_MAX;
 			if (rsp->n_force_qs == rdp->n_force_qs_snap &&
-			    *rdp->nxttail[RCU_DONE_TAIL] != head)
+			    rcu_segcblist_first_pend_cb(&rdp->cblist) != head)
 				force_quiescent_state(rsp);
 			rdp->n_force_qs_snap = rsp->n_force_qs;
-			rdp->qlen_last_fqs_check = rdp->qlen;
+			rdp->qlen_last_fqs_check = rcu_segcblist_n_cbs(&rdp->cblist);
 		}
 	}
 }
@@ -3253,7 +3130,7 @@ __call_rcu(struct rcu_head *head, rcu_callback_t func,
 	rdp = this_cpu_ptr(rsp->rda);
 
 	/* Add the callback to our list. */
-	if (unlikely(rdp->nxttail[RCU_NEXT_TAIL] == NULL) || cpu != -1) {
+	if (unlikely(!rcu_segcblist_is_enabled(&rdp->cblist)) || cpu != -1) {
 		int offline;
 
 		if (cpu != -1)
@@ -3272,23 +3149,21 @@ __call_rcu(struct rcu_head *head, rcu_callback_t func,
 		 */
 		BUG_ON(cpu != -1);
 		WARN_ON_ONCE(!rcu_is_watching());
-		if (!likely(rdp->nxtlist))
-			init_default_callback_list(rdp);
+		if (rcu_segcblist_empty(&rdp->cblist))
+			rcu_segcblist_init(&rdp->cblist);
 	}
-	WRITE_ONCE(rdp->qlen, rdp->qlen + 1);
-	if (lazy)
-		rdp->qlen_lazy++;
-	else
+	rcu_segcblist_enqueue(&rdp->cblist, head, lazy);
+	if (!lazy)
 		rcu_idle_count_callbacks_posted();
-	smp_mb();  /* Count before adding callback for rcu_barrier(). */
-	*rdp->nxttail[RCU_NEXT_TAIL] = head;
-	rdp->nxttail[RCU_NEXT_TAIL] = &head->next;
 
 	if (__is_kfree_rcu_offset((unsigned long)func))
 		trace_rcu_kfree_callback(rsp->name, head, (unsigned long)func,
-					 rdp->qlen_lazy, rdp->qlen);
+					 rcu_segcblist_n_lazy_cbs(&rdp->cblist),
+					 rcu_segcblist_n_cbs(&rdp->cblist));
 	else
-		trace_rcu_callback(rsp->name, head, rdp->qlen_lazy, rdp->qlen);
+		trace_rcu_callback(rsp->name, head,
+				   rcu_segcblist_n_lazy_cbs(&rdp->cblist),
+				   rcu_segcblist_n_cbs(&rdp->cblist));
 
 	/* Go handle any RCU core processing required. */
 	__call_rcu_core(rsp, rdp, head, flags);
@@ -3600,7 +3475,7 @@ static int __rcu_pending(struct rcu_state *rsp, struct rcu_data *rdp)
 	}
 
 	/* Does this CPU have callbacks ready to invoke? */
-	if (cpu_has_callbacks_ready_to_invoke(rdp)) {
+	if (rcu_segcblist_ready_cbs(&rdp->cblist)) {
 		rdp->n_rp_cb_ready++;
 		return 1;
 	}
@@ -3664,10 +3539,10 @@ static bool __maybe_unused rcu_cpu_has_callbacks(bool *all_lazy)
 
 	for_each_rcu_flavor(rsp) {
 		rdp = this_cpu_ptr(rsp->rda);
-		if (!rdp->nxtlist)
+		if (rcu_segcblist_empty(&rdp->cblist))
 			continue;
 		hc = true;
-		if (rdp->qlen != rdp->qlen_lazy || !all_lazy) {
+		if (rcu_segcblist_n_nonlazy_cbs(&rdp->cblist) || !all_lazy) {
 			al = false;
 			break;
 		}
@@ -3776,7 +3651,7 @@ static void _rcu_barrier(struct rcu_state *rsp)
 				__call_rcu(&rdp->barrier_head,
 					   rcu_barrier_callback, rsp, cpu, 0);
 			}
-		} else if (READ_ONCE(rdp->qlen)) {
+		} else if (rcu_segcblist_n_cbs(&rdp->cblist)) {
 			_rcu_barrier_trace(rsp, "OnlineQ", cpu,
 					   rsp->barrier_sequence);
 			smp_call_function_single(cpu, rcu_barrier_func, rsp, 1);
@@ -3885,8 +3760,9 @@ rcu_init_percpu_data(int cpu, struct rcu_state *rsp)
 	rdp->qlen_last_fqs_check = 0;
 	rdp->n_force_qs_snap = rsp->n_force_qs;
 	rdp->blimit = blimit;
-	if (!rdp->nxtlist)
-		init_callback_list(rdp);  /* Re-enable callbacks on this CPU. */
+	if (rcu_segcblist_empty(&rdp->cblist) && /* No early-boot CBs? */
+	    !init_nocb_callback_list(rdp))
+		rcu_segcblist_init(&rdp->cblist);  /* Re-enable callbacks. */
 	rdp->dynticks->dynticks_nesting = DYNTICK_TASK_EXIT_IDLE;
 	rcu_sysidle_init_percpu_data(rdp->dynticks);
 	rcu_dynticks_eqs_online();
diff --git a/kernel/rcu/tree.h b/kernel/rcu/tree.h
index 376c01e539c7..93889ff21dbb 100644
--- a/kernel/rcu/tree.h
+++ b/kernel/rcu/tree.h
@@ -30,6 +30,7 @@
 #include <linux/seqlock.h>
 #include <linux/swait.h>
 #include <linux/stop_machine.h>
+#include "rcu_segcblist.h"
 
 /*
  * Define shape of hierarchy based on NR_CPUS, CONFIG_RCU_FANOUT, and
@@ -335,34 +336,9 @@ struct rcu_data {
 					/* period it is aware of. */
 
 	/* 2) batch handling */
-	/*
-	 * If nxtlist is not NULL, it is partitioned as follows.
-	 * Any of the partitions might be empty, in which case the
-	 * pointer to that partition will be equal to the pointer for
-	 * the following partition.  When the list is empty, all of
-	 * the nxttail elements point to the ->nxtlist pointer itself,
-	 * which in that case is NULL.
-	 *
-	 * [nxtlist, *nxttail[RCU_DONE_TAIL]):
-	 *	Entries that batch # <= ->completed
-	 *	The grace period for these entries has completed, and
-	 *	the other grace-period-completed entries may be moved
-	 *	here temporarily in rcu_process_callbacks().
-	 * [*nxttail[RCU_DONE_TAIL], *nxttail[RCU_WAIT_TAIL]):
-	 *	Entries that batch # <= ->completed - 1: waiting for current GP
-	 * [*nxttail[RCU_WAIT_TAIL], *nxttail[RCU_NEXT_READY_TAIL]):
-	 *	Entries known to have arrived before current GP ended
-	 * [*nxttail[RCU_NEXT_READY_TAIL], *nxttail[RCU_NEXT_TAIL]):
-	 *	Entries that might have arrived after current GP ended
-	 *	Note that the value of *nxttail[RCU_NEXT_TAIL] will
-	 *	always be NULL, as this is the end of the list.
-	 */
-	struct rcu_head *nxtlist;
-	struct rcu_head **nxttail[RCU_NEXT_SIZE];
-	unsigned long	nxtcompleted[RCU_NEXT_SIZE];
-					/* grace periods for sublists. */
-	long		qlen_lazy;	/* # of lazy queued callbacks */
-	long		qlen;		/* # of queued callbacks, incl lazy */
+	struct rcu_segcblist cblist;	/* Segmented callback list, with */
+					/* different callbacks waiting for */
+					/* different grace periods. */
 	long		qlen_last_fqs_check;
 					/* qlen at last check for QS forcing */
 	unsigned long	n_cbs_invoked;	/* count of RCU cbs invoked. */
@@ -500,14 +476,11 @@ struct rcu_state {
 
 	raw_spinlock_t orphan_lock ____cacheline_internodealigned_in_smp;
 						/* Protect following fields. */
-	struct rcu_head *orphan_nxtlist;	/* Orphaned callbacks that */
+	struct rcu_cblist orphan_pend;		/* Orphaned callbacks that */
 						/*  need a grace period. */
-	struct rcu_head **orphan_nxttail;	/* Tail of above. */
-	struct rcu_head *orphan_donelist;	/* Orphaned callbacks that */
+	struct rcu_cblist orphan_done;		/* Orphaned callbacks that */
 						/*  are ready to invoke. */
-	struct rcu_head **orphan_donetail;	/* Tail of above. */
-	long qlen_lazy;				/* Number of lazy callbacks. */
-	long qlen;				/* Total number of callbacks. */
+						/* (Contains counts.) */
 	/* End of fields guarded by orphan_lock. */
 
 	struct mutex barrier_mutex;		/* Guards barrier fields. */
diff --git a/kernel/rcu/tree_plugin.h b/kernel/rcu/tree_plugin.h
index 621296a6694b..f88356652dcf 100644
--- a/kernel/rcu/tree_plugin.h
+++ b/kernel/rcu/tree_plugin.h
@@ -1350,10 +1350,10 @@ static bool __maybe_unused rcu_try_advance_all_cbs(void)
 		 */
 		if ((rdp->completed != rnp->completed ||
 		     unlikely(READ_ONCE(rdp->gpwrap))) &&
-		    rdp->nxttail[RCU_DONE_TAIL] != rdp->nxttail[RCU_NEXT_TAIL])
+		    rcu_segcblist_pend_cbs(&rdp->cblist))
 			note_gp_changes(rsp, rdp);
 
-		if (cpu_has_callbacks_ready_to_invoke(rdp))
+		if (rcu_segcblist_ready_cbs(&rdp->cblist))
 			cbs_ready = true;
 	}
 	return cbs_ready;
@@ -1461,7 +1461,7 @@ static void rcu_prepare_for_idle(void)
 	rdtp->last_accelerate = jiffies;
 	for_each_rcu_flavor(rsp) {
 		rdp = this_cpu_ptr(rsp->rda);
-		if (!*rdp->nxttail[RCU_DONE_TAIL])
+		if (!rcu_segcblist_pend_cbs(&rdp->cblist)) /* None pending? */
 			continue;
 		rnp = rdp->mynode;
 		raw_spin_lock_rcu_node(rnp); /* irqs already disabled. */
@@ -1529,7 +1529,7 @@ static void rcu_oom_notify_cpu(void *unused)
 
 	for_each_rcu_flavor(rsp) {
 		rdp = raw_cpu_ptr(rsp->rda);
-		if (rdp->qlen_lazy != 0) {
+		if (rcu_segcblist_n_lazy_cbs(&rdp->cblist)) {
 			atomic_inc(&oom_callback_count);
 			rsp->call(&rdp->oom_head, rcu_oom_callback);
 		}
@@ -1934,30 +1934,27 @@ static bool __maybe_unused rcu_nocb_adopt_orphan_cbs(struct rcu_state *rsp,
 						     struct rcu_data *rdp,
 						     unsigned long flags)
 {
-	long ql = rsp->qlen;
-	long qll = rsp->qlen_lazy;
+	long ql = rcu_cblist_n_cbs(&rsp->orphan_done);
+	long qll = rcu_cblist_n_lazy_cbs(&rsp->orphan_done);
 
 	/* If this is not a no-CBs CPU, tell the caller to do it the old way. */
 	if (!rcu_is_nocb_cpu(smp_processor_id()))
 		return false;
-	rsp->qlen = 0;
-	rsp->qlen_lazy = 0;
 
 	/* First, enqueue the donelist, if any.  This preserves CB ordering. */
-	if (rsp->orphan_donelist != NULL) {
-		__call_rcu_nocb_enqueue(rdp, rsp->orphan_donelist,
-					rsp->orphan_donetail, ql, qll, flags);
-		ql = qll = 0;
-		rsp->orphan_donelist = NULL;
-		rsp->orphan_donetail = &rsp->orphan_donelist;
+	if (!rcu_cblist_empty(&rsp->orphan_done)) {
+		__call_rcu_nocb_enqueue(rdp, rcu_cblist_head(&rsp->orphan_done),
+					rcu_cblist_tail(&rsp->orphan_done),
+					ql, qll, flags);
+		ql = qll = 0; /* Counts cover both lists: add them once. */
 	}
-	if (rsp->orphan_nxtlist != NULL) {
-		__call_rcu_nocb_enqueue(rdp, rsp->orphan_nxtlist,
-					rsp->orphan_nxttail, ql, qll, flags);
-		ql = qll = 0;
-		rsp->orphan_nxtlist = NULL;
-		rsp->orphan_nxttail = &rsp->orphan_nxtlist;
+	if (!rcu_cblist_empty(&rsp->orphan_pend)) {
+		__call_rcu_nocb_enqueue(rdp, rcu_cblist_head(&rsp->orphan_pend),
+					rcu_cblist_tail(&rsp->orphan_pend),
+					ql, qll, flags);
 	}
+	rcu_cblist_init(&rsp->orphan_done);
+	rcu_cblist_init(&rsp->orphan_pend);
 	return true;
 }
 
@@ -2399,16 +2396,16 @@ static bool init_nocb_callback_list(struct rcu_data *rdp)
 		return false;
 
 	/* If there are early-boot callbacks, move them to nocb lists. */
-	if (rdp->nxtlist) {
-		rdp->nocb_head = rdp->nxtlist;
-		rdp->nocb_tail = rdp->nxttail[RCU_NEXT_TAIL];
-		atomic_long_set(&rdp->nocb_q_count, rdp->qlen);
-		atomic_long_set(&rdp->nocb_q_count_lazy, rdp->qlen_lazy);
-		rdp->nxtlist = NULL;
-		rdp->qlen = 0;
-		rdp->qlen_lazy = 0;
+	if (!rcu_segcblist_empty(&rdp->cblist)) {
+		rdp->nocb_head = rcu_segcblist_head(&rdp->cblist);
+		rdp->nocb_tail = rcu_segcblist_tail(&rdp->cblist);
+		atomic_long_set(&rdp->nocb_q_count,
+				rcu_segcblist_n_cbs(&rdp->cblist));
+		atomic_long_set(&rdp->nocb_q_count_lazy,
+				rcu_segcblist_n_lazy_cbs(&rdp->cblist));
+		rcu_segcblist_init(&rdp->cblist);
 	}
-	rdp->nxttail[RCU_NEXT_TAIL] = NULL;
+	rcu_segcblist_disable(&rdp->cblist);
 	return true;
 }
 
diff --git a/kernel/rcu/tree_trace.c b/kernel/rcu/tree_trace.c
index 65b43be38e68..066c64071a7b 100644
--- a/kernel/rcu/tree_trace.c
+++ b/kernel/rcu/tree_trace.c
@@ -41,6 +41,7 @@
 #include <linux/mutex.h>
 #include <linux/debugfs.h>
 #include <linux/seq_file.h>
+#include <linux/prefetch.h>
 
 #define RCU_TREE_NONCORE
 #include "tree.h"
@@ -128,17 +129,15 @@ static void print_one_rcu_data(struct seq_file *m, struct rcu_data *rdp)
 		   rdp->dynticks_fqs);
 	seq_printf(m, " of=%lu", rdp->offline_fqs);
 	rcu_nocb_q_lengths(rdp, &ql, &qll);
-	qll += rdp->qlen_lazy;
-	ql += rdp->qlen;
+	qll += rcu_segcblist_n_lazy_cbs(&rdp->cblist);
+	ql += rcu_segcblist_n_cbs(&rdp->cblist);
 	seq_printf(m, " ql=%ld/%ld qs=%c%c%c%c",
 		   qll, ql,
-		   ".N"[rdp->nxttail[RCU_NEXT_READY_TAIL] !=
-			rdp->nxttail[RCU_NEXT_TAIL]],
-		   ".R"[rdp->nxttail[RCU_WAIT_TAIL] !=
-			rdp->nxttail[RCU_NEXT_READY_TAIL]],
-		   ".W"[rdp->nxttail[RCU_DONE_TAIL] !=
-			rdp->nxttail[RCU_WAIT_TAIL]],
-		   ".D"[&rdp->nxtlist != rdp->nxttail[RCU_DONE_TAIL]]);
+		   ".N"[!rcu_segcblist_segempty(&rdp->cblist, RCU_NEXT_TAIL)],
+		   ".R"[!rcu_segcblist_segempty(&rdp->cblist,
+						RCU_NEXT_READY_TAIL)],
+		   ".W"[!rcu_segcblist_segempty(&rdp->cblist, RCU_WAIT_TAIL)],
+		   ".D"[!rcu_segcblist_segempty(&rdp->cblist, RCU_DONE_TAIL)]);
 #ifdef CONFIG_RCU_BOOST
 	seq_printf(m, " kt=%d/%c ktl=%x",
 		   per_cpu(rcu_cpu_has_work, rdp->cpu),
@@ -276,7 +275,9 @@ static void print_one_rcu_state(struct seq_file *m, struct rcu_state *rsp)
 	seq_printf(m, "nfqs=%lu/nfqsng=%lu(%lu) fqlh=%lu oqlen=%ld/%ld\n",
 		   rsp->n_force_qs, rsp->n_force_qs_ngp,
 		   rsp->n_force_qs - rsp->n_force_qs_ngp,
-		   READ_ONCE(rsp->n_force_qs_lh), rsp->qlen_lazy, rsp->qlen);
+		   READ_ONCE(rsp->n_force_qs_lh),
+		   rcu_cblist_n_lazy_cbs(&rsp->orphan_done),
+		   rcu_cblist_n_cbs(&rsp->orphan_done));
 	for (rnp = &rsp->node[0]; rnp - &rsp->node[0] < rcu_num_nodes; rnp++) {
 		if (rnp->level != level) {
 			seq_puts(m, "\n");
-- 
2.5.2

Powered by blists - more mailing lists

Powered by Openwall GNU/*/Linux Powered by OpenVZ