[<prev] [next>] [<thread-prev] [thread-next>] [day] [month] [year] [list]
Message-Id: <1492018825-25634-35-git-send-email-paulmck@linux.vnet.ibm.com>
Date: Wed, 12 Apr 2017 10:40:20 -0700
From: "Paul E. McKenney" <paulmck@...ux.vnet.ibm.com>
To: linux-kernel@...r.kernel.org
Cc: mingo@...nel.org, jiangshanlai@...il.com, dipankar@...ibm.com,
akpm@...ux-foundation.org, mathieu.desnoyers@...icios.com,
josh@...htriplett.org, tglx@...utronix.de, peterz@...radead.org,
rostedt@...dmis.org, dhowells@...hat.com, edumazet@...gle.com,
fweisbec@...il.com, oleg@...hat.com, bobby.prani@...il.com,
"Paul E. McKenney" <paulmck@...ux.vnet.ibm.com>
Subject: [PATCH tip/core/rcu 35/40] srcu: Merge ->srcu_state into ->srcu_gp_seq
Updating ->srcu_state and ->srcu_gp_seq will lead to extremely complex
race conditions given multiple callback queues, so this commit takes
advantage of the two-bit state now available in rcu_seq counters to
store the state in the bottom two bits of ->srcu_gp_seq.
Signed-off-by: Paul E. McKenney <paulmck@...ux.vnet.ibm.com>
---
include/linux/srcu.h | 5 +----
kernel/rcu/rcu.h | 10 ++++++++++
kernel/rcu/srcu.c | 55 +++++++++++++++++++++++++++++++++-------------------
3 files changed, 46 insertions(+), 24 deletions(-)
diff --git a/include/linux/srcu.h b/include/linux/srcu.h
index ad154a7bc114..e7dbc01b61a1 100644
--- a/include/linux/srcu.h
+++ b/include/linux/srcu.h
@@ -43,8 +43,7 @@ struct srcu_struct {
unsigned long completed;
unsigned long srcu_gp_seq;
struct srcu_array __percpu *per_cpu_ref;
- spinlock_t queue_lock; /* protect ->srcu_cblist, ->srcu_state */
- int srcu_state;
+ spinlock_t queue_lock; /* protect ->srcu_cblist */
struct rcu_segcblist srcu_cblist;
struct delayed_work work;
#ifdef CONFIG_DEBUG_LOCK_ALLOC
@@ -56,7 +55,6 @@ struct srcu_struct {
#define SRCU_STATE_IDLE 0
#define SRCU_STATE_SCAN1 1
#define SRCU_STATE_SCAN2 2
-#define SRCU_STATE_DONE 3
#ifdef CONFIG_DEBUG_LOCK_ALLOC
@@ -85,7 +83,6 @@ void process_srcu(struct work_struct *work);
.completed = -300, \
.per_cpu_ref = &name##_srcu_array, \
.queue_lock = __SPIN_LOCK_UNLOCKED(name.queue_lock), \
- .srcu_state = SRCU_STATE_IDLE, \
.srcu_cblist = RCU_SEGCBLIST_INITIALIZER(name.srcu_cblist),\
.work = __DELAYED_WORK_INITIALIZER(name.work, process_srcu, 0),\
__SRCU_DEP_MAP_INIT(name) \
diff --git a/kernel/rcu/rcu.h b/kernel/rcu/rcu.h
index 87a0ac95b551..73e16ec4054b 100644
--- a/kernel/rcu/rcu.h
+++ b/kernel/rcu/rcu.h
@@ -82,6 +82,16 @@ static inline int rcu_seq_state(unsigned long s)
return s & RCU_SEQ_STATE_MASK;
}
+/*
+ * Set the state portion of the pointed-to sequence number.
+ * The caller is responsible for preventing conflicting updates.
+ */
+static inline void rcu_seq_set_state(unsigned long *sp, int newstate)
+{
+ WARN_ON_ONCE(newstate & ~RCU_SEQ_STATE_MASK);
+ WRITE_ONCE(*sp, (*sp & ~RCU_SEQ_STATE_MASK) + newstate);
+}
+
/* Adjust sequence number for start of update-side operation. */
static inline void rcu_seq_start(unsigned long *sp)
{
diff --git a/kernel/rcu/srcu.c b/kernel/rcu/srcu.c
index 5aeeaecfb673..97aec5d7b316 100644
--- a/kernel/rcu/srcu.c
+++ b/kernel/rcu/srcu.c
@@ -44,7 +44,6 @@ static int init_srcu_struct_fields(struct srcu_struct *sp)
sp->completed = 0;
sp->srcu_gp_seq = 0;
spin_lock_init(&sp->queue_lock);
- sp->srcu_state = SRCU_STATE_IDLE;
rcu_segcblist_init(&sp->srcu_cblist);
INIT_DELAYED_WORK(&sp->work, process_srcu);
sp->per_cpu_ref = alloc_percpu(struct srcu_array);
@@ -180,6 +179,9 @@ static bool srcu_readers_active(struct srcu_struct *sp)
return sum;
}
+#define SRCU_CALLBACK_BATCH 10
+#define SRCU_INTERVAL 1
+
/**
* cleanup_srcu_struct - deconstruct a sleep-RCU structure
* @sp: structure to clean up.
@@ -194,8 +196,10 @@ void cleanup_srcu_struct(struct srcu_struct *sp)
if (WARN_ON(!rcu_segcblist_empty(&sp->srcu_cblist)))
return; /* Leakage unless caller handles error. */
flush_delayed_work(&sp->work);
- if (WARN_ON(READ_ONCE(sp->srcu_state) != SRCU_STATE_IDLE))
+ if (WARN_ON(rcu_seq_state(READ_ONCE(sp->srcu_gp_seq)) != SRCU_STATE_IDLE)) {
+ pr_info("cleanup_srcu_struct: Active srcu_struct %lu CBs %c state: %d\n", rcu_segcblist_n_cbs(&sp->srcu_cblist), ".E"[rcu_segcblist_empty(&sp->srcu_cblist)], rcu_seq_state(READ_ONCE(sp->srcu_gp_seq)));
return; /* Caller forgot to stop doing call_srcu()? */
+ }
free_percpu(sp->per_cpu_ref);
sp->per_cpu_ref = NULL;
}
@@ -247,10 +251,13 @@ EXPORT_SYMBOL_GPL(__srcu_read_unlock);
*/
static void srcu_gp_start(struct srcu_struct *sp)
{
+ int state;
+
rcu_segcblist_accelerate(&sp->srcu_cblist,
rcu_seq_snap(&sp->srcu_gp_seq));
- WRITE_ONCE(sp->srcu_state, SRCU_STATE_SCAN1);
rcu_seq_start(&sp->srcu_gp_seq);
+ state = rcu_seq_state(READ_ONCE(sp->srcu_gp_seq));
+ WARN_ON_ONCE(state != SRCU_STATE_SCAN1);
}
/*
@@ -294,7 +301,6 @@ static void srcu_flip(struct srcu_struct *sp)
static void srcu_gp_end(struct srcu_struct *sp)
{
rcu_seq_end(&sp->srcu_gp_seq);
- WRITE_ONCE(sp->srcu_state, SRCU_STATE_DONE);
spin_lock_irq(&sp->queue_lock);
rcu_segcblist_advance(&sp->srcu_cblist,
@@ -339,7 +345,7 @@ void call_srcu(struct srcu_struct *sp, struct rcu_head *head,
spin_lock_irqsave(&sp->queue_lock, flags);
smp_mb__after_unlock_lock(); /* Caller's prior accesses before GP. */
rcu_segcblist_enqueue(&sp->srcu_cblist, head, false);
- if (READ_ONCE(sp->srcu_state) == SRCU_STATE_IDLE) {
+ if (rcu_seq_state(READ_ONCE(sp->srcu_gp_seq)) == SRCU_STATE_IDLE) {
srcu_gp_start(sp);
queue_delayed_work(system_power_efficient_wq, &sp->work, 0);
}
@@ -372,7 +378,7 @@ static void __synchronize_srcu(struct srcu_struct *sp, int trycount)
head->func = wakeme_after_rcu;
spin_lock_irq(&sp->queue_lock);
smp_mb__after_unlock_lock(); /* Caller's prior accesses before GP. */
- if (READ_ONCE(sp->srcu_state) == SRCU_STATE_IDLE) {
+ if (rcu_seq_state(READ_ONCE(sp->srcu_gp_seq)) == SRCU_STATE_IDLE) {
/* steal the processing owner */
rcu_segcblist_enqueue(&sp->srcu_cblist, head, false);
srcu_gp_start(sp);
@@ -474,9 +480,6 @@ unsigned long srcu_batches_completed(struct srcu_struct *sp)
}
EXPORT_SYMBOL_GPL(srcu_batches_completed);
-#define SRCU_CALLBACK_BATCH 10
-#define SRCU_INTERVAL 1
-
/*
* Core SRCU state machine. Advance callbacks from ->batch_check0 to
* ->batch_check1 and then to ->batch_done as readers drain.
@@ -485,28 +488,40 @@ static void srcu_advance_batches(struct srcu_struct *sp, int trycount)
{
int idx;
- WARN_ON_ONCE(sp->srcu_state == SRCU_STATE_IDLE);
-
/*
* Because readers might be delayed for an extended period after
* fetching ->completed for their index, at any point in time there
* might well be readers using both idx=0 and idx=1. We therefore
* need to wait for readers to clear from both index values before
* invoking a callback.
+ *
+ * The load-acquire ensures that we see the accesses performed
+ * by the prior grace period.
*/
+ idx = rcu_seq_state(smp_load_acquire(&sp->srcu_gp_seq)); /* ^^^ */
+ if (idx == SRCU_STATE_IDLE) {
+ spin_lock_irq(&sp->queue_lock);
+ if (rcu_segcblist_empty(&sp->srcu_cblist)) {
+ spin_unlock_irq(&sp->queue_lock);
+ return;
+ }
+ idx = rcu_seq_state(READ_ONCE(sp->srcu_gp_seq));
+ if (idx == SRCU_STATE_IDLE)
+ srcu_gp_start(sp);
+ spin_unlock_irq(&sp->queue_lock);
+ if (idx != SRCU_STATE_IDLE)
+ return; /* Someone else started the grace period. */
+ }
- if (sp->srcu_state == SRCU_STATE_DONE)
- srcu_gp_start(sp);
-
- if (sp->srcu_state == SRCU_STATE_SCAN1) {
+ if (rcu_seq_state(READ_ONCE(sp->srcu_gp_seq)) == SRCU_STATE_SCAN1) {
idx = 1 ^ (sp->completed & 1);
if (!try_check_zero(sp, idx, trycount))
return; /* readers present, retry after SRCU_INTERVAL */
srcu_flip(sp);
- WRITE_ONCE(sp->srcu_state, SRCU_STATE_SCAN2);
+ rcu_seq_set_state(&sp->srcu_gp_seq, SRCU_STATE_SCAN2);
}
- if (sp->srcu_state == SRCU_STATE_SCAN2) {
+ if (rcu_seq_state(READ_ONCE(sp->srcu_gp_seq)) == SRCU_STATE_SCAN2) {
/*
* SRCU read-side critical sections are normally short,
@@ -557,14 +572,14 @@ static void srcu_invoke_callbacks(struct srcu_struct *sp)
static void srcu_reschedule(struct srcu_struct *sp, unsigned long delay)
{
bool pending = true;
+ int state;
if (rcu_segcblist_empty(&sp->srcu_cblist)) {
spin_lock_irq(&sp->queue_lock);
+ state = rcu_seq_state(READ_ONCE(sp->srcu_gp_seq));
if (rcu_segcblist_empty(&sp->srcu_cblist) &&
- READ_ONCE(sp->srcu_state) == SRCU_STATE_DONE) {
- WRITE_ONCE(sp->srcu_state, SRCU_STATE_IDLE);
+ state == SRCU_STATE_IDLE)
pending = false;
- }
spin_unlock_irq(&sp->queue_lock);
}
--
2.5.2
Powered by blists - more mailing lists