lists.openwall.net   lists  /  announce  owl-users  owl-dev  john-users  john-dev  passwdqc-users  yescrypt  popa3d-users  /  oss-security  kernel-hardening  musl  sabotage  tlsify  passwords  /  crypt-dev  xvendor  /  Bugtraq  Full-Disclosure  linux-kernel  linux-netdev  linux-ext4  linux-hardening  linux-cve-announce  PHC 
Open Source and information security mailing list archives
 
Hash Suite: Windows password security audit tool. GUI, reports in PDF.
[<prev] [next>] [<thread-prev] [thread-next>] [day] [month] [year] [list]
Date:	Tue, 21 Jul 2015 21:22:47 +0200
From:	Oleg Nesterov <oleg@...hat.com>
To:	Ingo Molnar <mingo@...nel.org>
Cc:	Peter Zijlstra <peterz@...radead.org>,
	Rik van Riel <riel@...hat.com>, Tejun Heo <tj@...nel.org>,
	linux-kernel@...r.kernel.org
Subject: [PATCH v2 6/6] stop_machine: kill stop_cpus_lock and
	lg_double_lock/unlock()

stop_two_cpus() and stop_cpus() use stop_cpus_lock to avoid the
deadlock: we need to ensure that the stopper functions can't be
queued "backwards" from one another.

Instead, we can change stop_two_cpus() to take 2 stopper->lock's
and queue both works "atomically"; we just need to check that the
->stop_work's on these CPUs are either both free or both already
queued.

Note: this patch preserves the cpu_active() checks, but I think
we need to shift them into migrate_swap_stop(). However, we can't
do this without another cleanup: currently stopper->enabled does
not guarantee that work->fn() will actually be executed if we race
with cpu_down().

Signed-off-by: Oleg Nesterov <oleg@...hat.com>
---
 include/linux/lglock.h  |    5 --
 kernel/locking/lglock.c |   22 ---------
 kernel/stop_machine.c   |  120 ++++++++++++++++++++++++++++-------------------
 3 files changed, 71 insertions(+), 76 deletions(-)

diff --git a/include/linux/lglock.h b/include/linux/lglock.h
index c92ebd1..0081f00 100644
--- a/include/linux/lglock.h
+++ b/include/linux/lglock.h
@@ -52,15 +52,10 @@ struct lglock {
 	static struct lglock name = { .lock = &name ## _lock }
 
 void lg_lock_init(struct lglock *lg, char *name);
-
 void lg_local_lock(struct lglock *lg);
 void lg_local_unlock(struct lglock *lg);
 void lg_local_lock_cpu(struct lglock *lg, int cpu);
 void lg_local_unlock_cpu(struct lglock *lg, int cpu);
-
-void lg_double_lock(struct lglock *lg, int cpu1, int cpu2);
-void lg_double_unlock(struct lglock *lg, int cpu1, int cpu2);
-
 void lg_global_lock(struct lglock *lg);
 void lg_global_unlock(struct lglock *lg);
 
diff --git a/kernel/locking/lglock.c b/kernel/locking/lglock.c
index 951cfcd..86ae2ae 100644
--- a/kernel/locking/lglock.c
+++ b/kernel/locking/lglock.c
@@ -60,28 +60,6 @@ void lg_local_unlock_cpu(struct lglock *lg, int cpu)
 }
 EXPORT_SYMBOL(lg_local_unlock_cpu);
 
-void lg_double_lock(struct lglock *lg, int cpu1, int cpu2)
-{
-	BUG_ON(cpu1 == cpu2);
-
-	/* lock in cpu order, just like lg_global_lock */
-	if (cpu2 < cpu1)
-		swap(cpu1, cpu2);
-
-	preempt_disable();
-	lock_acquire_shared(&lg->lock_dep_map, 0, 0, NULL, _RET_IP_);
-	arch_spin_lock(per_cpu_ptr(lg->lock, cpu1));
-	arch_spin_lock(per_cpu_ptr(lg->lock, cpu2));
-}
-
-void lg_double_unlock(struct lglock *lg, int cpu1, int cpu2)
-{
-	lock_release(&lg->lock_dep_map, 1, _RET_IP_);
-	arch_spin_unlock(per_cpu_ptr(lg->lock, cpu1));
-	arch_spin_unlock(per_cpu_ptr(lg->lock, cpu2));
-	preempt_enable();
-}
-
 void lg_global_lock(struct lglock *lg)
 {
 	int i;
diff --git a/kernel/stop_machine.c b/kernel/stop_machine.c
index 12484e5..20fb291 100644
--- a/kernel/stop_machine.c
+++ b/kernel/stop_machine.c
@@ -20,7 +20,6 @@
 #include <linux/kallsyms.h>
 #include <linux/smpboot.h>
 #include <linux/atomic.h>
-#include <linux/lglock.h>
 
 /*
  * Structure to determine completion condition and record errors.  May
@@ -47,14 +46,6 @@ struct cpu_stopper {
 static DEFINE_PER_CPU(struct cpu_stopper, cpu_stopper);
 static bool stop_machine_initialized = false;
 
-/*
- * Avoids a race between stop_two_cpus and global stop_cpus, where
- * the stoppers could get queued up in reverse order, leading to
- * system deadlock. Using an lglock means stop_two_cpus remains
- * relatively cheap.
- */
-DEFINE_STATIC_LGLOCK(stop_cpus_lock);
-
 static void cpu_stop_init_done(struct cpu_stop_done *done, unsigned int nr_todo)
 {
 	memset(done, 0, sizeof(*done));
@@ -73,21 +64,29 @@ static void cpu_stop_signal_done(struct cpu_stop_done *done, bool executed)
 	}
 }
 
+static inline bool stop_work_pending(struct cpu_stopper *stopper)
+{
+	return !list_empty(&stopper->stop_work.list);
+}
+
+static void __cpu_stop_queue_work(struct cpu_stopper *stopper,
+					struct cpu_stop_work *work)
+{
+	list_add_tail(&work->list, &stopper->works);
+	wake_up_process(stopper->thread);
+}
+
 /* queue @work to @stopper.  if offline, @work is completed immediately */
 static void cpu_stop_queue_work(unsigned int cpu, struct cpu_stop_work *work)
 {
 	struct cpu_stopper *stopper = &per_cpu(cpu_stopper, cpu);
-
 	unsigned long flags;
 
 	spin_lock_irqsave(&stopper->lock, flags);
-
-	if (stopper->enabled) {
-		list_add_tail(&work->list, &stopper->works);
-		wake_up_process(stopper->thread);
-	} else
+	if (stopper->enabled)
+		__cpu_stop_queue_work(stopper, work);
+	else
 		cpu_stop_signal_done(work->done, false);
-
 	spin_unlock_irqrestore(&stopper->lock, flags);
 }
 
@@ -213,6 +212,48 @@ static int multi_cpu_stop(void *data)
 	return err;
 }
 
+static int cpu_stop_queue_two_works(int cpu1, struct cpu_stop_work *work1,
+				    int cpu2, struct cpu_stop_work *work2)
+{
+	struct cpu_stopper *stopper1 = per_cpu_ptr(&cpu_stopper, cpu1);
+	struct cpu_stopper *stopper2 = per_cpu_ptr(&cpu_stopper, cpu2);
+	int err;
+retry:
+	spin_lock_irq(&stopper1->lock);
+	spin_lock_nested(&stopper2->lock, SINGLE_DEPTH_NESTING);
+	/*
+	 * If we observe both CPUs active we know _cpu_down() cannot yet have
+	 * queued its stop_machine works and therefore ours will get executed
+	 * first. Or its not either one of our CPUs that's getting unplugged,
+	 * in which case we don't care.
+	 */
+	err = -ENOENT;
+	if (!cpu_active(cpu1) || !cpu_active(cpu2))
+		goto unlock;
+
+	WARN_ON(!stopper1->enabled || !stopper2->enabled);
+	/*
+	 * Ensure that if we race with stop_cpus() the stoppers won't
+	 * get queued up in reverse order, leading to system deadlock.
+	 */
+	err = -EDEADLK;
+	if (stop_work_pending(stopper1) != stop_work_pending(stopper2))
+		goto unlock;
+
+	err = 0;
+	__cpu_stop_queue_work(stopper1, work1);
+	__cpu_stop_queue_work(stopper2, work2);
+unlock:
+	spin_unlock(&stopper2->lock);
+	spin_unlock_irq(&stopper1->lock);
+
+	if (unlikely(err == -EDEADLK)) {
+		cond_resched();
+		goto retry;
+	}
+	return err;
+}
+
 /**
  * stop_two_cpus - stops two cpus
  * @cpu1: the cpu to stop
@@ -228,48 +269,28 @@ int stop_two_cpus(unsigned int cpu1, unsigned int cpu2, cpu_stop_fn_t fn, void *
 {
 	struct cpu_stop_done done;
 	struct cpu_stop_work work1, work2;
-	struct multi_stop_data msdata;
-
-	preempt_disable();
-	msdata = (struct multi_stop_data){
+	struct multi_stop_data msdata = {
 		.fn = fn,
 		.data = arg,
 		.num_threads = 2,
 		.active_cpus = cpumask_of(cpu1),
 	};
 
-	work1 = work2 = (struct cpu_stop_work){
-		.fn = multi_cpu_stop,
-		.arg = &msdata,
-		.done = &done
-	};
-
-	cpu_stop_init_done(&done, 2);
 	set_state(&msdata, MULTI_STOP_PREPARE);
+	cpu_stop_init_done(&done, 2);
 
-	/*
-	 * If we observe both CPUs active we know _cpu_down() cannot yet have
-	 * queued its stop_machine works and therefore ours will get executed
-	 * first. Or its not either one of our CPUs that's getting unplugged,
-	 * in which case we don't care.
-	 *
-	 * This relies on the stopper workqueues to be FIFO.
-	 */
-	if (!cpu_active(cpu1) || !cpu_active(cpu2)) {
-		preempt_enable();
-		return -ENOENT;
-	}
-
-	lg_double_lock(&stop_cpus_lock, cpu1, cpu2);
-	cpu_stop_queue_work(cpu1, &work1);
-	cpu_stop_queue_work(cpu2, &work2);
-	lg_double_unlock(&stop_cpus_lock, cpu1, cpu2);
+	work1.fn   = work2.fn   = multi_cpu_stop;
+	work1.arg  = work2.arg  = &msdata;
+	work1.done = work2.done = &done;
 
-	preempt_enable();
+	if (cpu1 > cpu2)
+		swap(cpu1, cpu2);
+	if (cpu_stop_queue_two_works(cpu1, &work1, cpu2, &work2))
+		return -ENOENT;
 
 	wait_for_completion(&done.completion);
-
-	return done.executed ? done.ret : -ENOENT;
+	WARN_ON(!done.executed);
+	return done.ret;
 }
 
 /**
@@ -308,7 +329,7 @@ static void queue_stop_cpus_work(const struct cpumask *cpumask,
 	 * preempted by a stopper which might wait for other stoppers
 	 * to enter @fn which can lead to deadlock.
 	 */
-	lg_global_lock(&stop_cpus_lock);
+	preempt_disable();
 	for_each_cpu(cpu, cpumask) {
 		work = &per_cpu(cpu_stopper.stop_work, cpu);
 		work->fn = fn;
@@ -316,7 +337,7 @@ static void queue_stop_cpus_work(const struct cpumask *cpumask,
 		work->done = done;
 		cpu_stop_queue_work(cpu, work);
 	}
-	lg_global_unlock(&stop_cpus_lock);
+	preempt_enable();
 }
 
 static int __stop_cpus(const struct cpumask *cpumask,
@@ -505,6 +526,7 @@ static int __init cpu_stop_init(void)
 
 		spin_lock_init(&stopper->lock);
 		INIT_LIST_HEAD(&stopper->works);
+		INIT_LIST_HEAD(&stopper->stop_work.list);
 	}
 
 	BUG_ON(smpboot_register_percpu_thread(&cpu_stop_threads));
-- 
1.7.1

--
To unsubscribe from this list: send the line "unsubscribe linux-kernel" in
the body of a message to majordomo@...r.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html
Please read the FAQ at  http://www.tux.org/lkml/

Powered by blists - more mailing lists

Powered by Openwall GNU/*/Linux Powered by OpenVZ