lists.openwall.net   lists  /  announce  owl-users  owl-dev  john-users  john-dev  passwdqc-users  yescrypt  popa3d-users  /  oss-security  kernel-hardening  musl  sabotage  tlsify  passwords  /  crypt-dev  xvendor  /  Bugtraq  Full-Disclosure  linux-kernel  linux-netdev  linux-ext4  linux-hardening  linux-cve-announce  PHC 
Open Source and information security mailing list archives
 
Hash Suite: Windows password security audit tool. GUI, reports in PDF.
[<prev] [next>] [<thread-prev] [thread-next>] [day] [month] [year] [list]
Message-Id: <20230327202413.1955856-4-longman@redhat.com>
Date:   Mon, 27 Mar 2023 16:24:08 -0400
From:   Waiman Long <longman@...hat.com>
To:     Peter Zijlstra <peterz@...radead.org>,
        Ingo Molnar <mingo@...hat.com>, Will Deacon <will@...nel.org>,
        Boqun Feng <boqun.feng@...il.com>
Cc:     linux-kernel@...r.kernel.org
Subject: [PATCH v2 3/8] locking/rwsem: Rework writer wakeup

From: Peter Zijlstra <peterz@...radead.org>

Currently readers and writers have distinctly different wait/wake
methods. For readers the ->count adjustment happens on the wakeup
side, while for writers the ->count adjustment happens on the wait
side.

This asymmetry is unfortunate since the wake side has an additional
guarantee -- specifically, the wake side has observed the unlocked
state, and thus it can know that speculative READER_BIAS perturbations
on ->count are just that: they will be undone.

Additionally, unifying the wait/wake methods allows sharing code.

As such, do a straight-forward transform of the writer wakeup into the
wake side.

Signed-off-by: Peter Zijlstra (Intel) <peterz@...radead.org>
---
 kernel/locking/rwsem.c | 259 +++++++++++++++++++----------------------
 1 file changed, 123 insertions(+), 136 deletions(-)

diff --git a/kernel/locking/rwsem.c b/kernel/locking/rwsem.c
index 4b9e492abd59..0cc0aa566a6b 100644
--- a/kernel/locking/rwsem.c
+++ b/kernel/locking/rwsem.c
@@ -394,6 +394,108 @@ rwsem_del_waiter(struct rw_semaphore *sem, struct rwsem_waiter *waiter)
 	return false;
 }
 
+static inline void
+rwsem_waiter_wake(struct rwsem_waiter *waiter, struct wake_q_head *wake_q)
+{
+	struct task_struct *tsk;
+
+	tsk = waiter->task;
+	get_task_struct(tsk);
+
+	/*
+	 * Ensure calling get_task_struct() before setting waiter->task
+	 * to nil such that rwsem_down_{read,write}_slowpath() cannot
+	 * race with do_exit() by always holding a reference count
+	 * to the task to wake up.
+	 */
+	smp_store_release(&waiter->task, NULL);
+	/*
+	 * Ensure issuing the wakeup (either by us or someone else)
+	 * after setting waiter->task to nil.
+	 */
+	wake_q_add_safe(wake_q, tsk);
+}
+
+/*
+ * This function must be called with the sem->wait_lock held to prevent
+ * race conditions between checking the rwsem wait list and setting the
+ * sem->count accordingly.
+ *
+ * Implies rwsem_del_waiter() on success.
+ */
+static inline bool rwsem_try_write_lock(struct rw_semaphore *sem,
+					struct rwsem_waiter *waiter)
+{
+
+	struct rwsem_waiter *first = rwsem_first_waiter(sem);
+	long count, new;
+
+	lockdep_assert_held(&sem->wait_lock);
+
+	count = atomic_long_read(&sem->count);
+	do {
+		bool has_handoff = !!(count & RWSEM_FLAG_HANDOFF);
+
+		if (has_handoff) {
+			/*
+			 * Honor handoff bit and yield only when the first
+			 * waiter is the one that set it. Otherwise, we
+			 * still try to acquire the rwsem.
+			 */
+			if (first->handoff_set && (waiter != first))
+				return false;
+		}
+
+		new = count;
+
+		if (count & RWSEM_LOCK_MASK) {
+			/*
+			 * A waiter (first or not) can set the handoff bit
+			 * if it is an RT task or wait in the wait queue
+			 * for too long.
+			 */
+			if (has_handoff || (!rt_task(waiter->task) &&
+					    !time_after(jiffies, waiter->timeout)))
+				return false;
+
+			new |= RWSEM_FLAG_HANDOFF;
+		} else {
+			new |= RWSEM_WRITER_LOCKED;
+			new &= ~RWSEM_FLAG_HANDOFF;
+
+			if (list_is_singular(&sem->wait_list))
+				new &= ~RWSEM_FLAG_WAITERS;
+		}
+	} while (!atomic_long_try_cmpxchg_acquire(&sem->count, &count, new));
+
+	/*
+	 * We have either acquired the lock with handoff bit cleared or set
+	 * the handoff bit. Only the first waiter can have its handoff_set
+	 * set here to enable optimistic spinning in slowpath loop.
+	 */
+	if (new & RWSEM_FLAG_HANDOFF) {
+		first->handoff_set = true;
+		lockevent_inc(rwsem_wlock_handoff);
+		return false;
+	}
+
+	/*
+	 * Have rwsem_try_write_lock() fully imply rwsem_del_waiter() on
+	 * success.
+	 */
+	list_del(&waiter->list);
+	atomic_long_set(&sem->owner, (long)waiter->task);
+	return true;
+}
+
+static void rwsem_writer_wake(struct rw_semaphore *sem,
+			      struct rwsem_waiter *waiter,
+			      struct wake_q_head *wake_q)
+{
+	if (rwsem_try_write_lock(sem, waiter))
+		rwsem_waiter_wake(waiter, wake_q);
+}
+
 /*
  * handle the lock release when processes blocked on it that can now run
  * - if we come here from up_xxxx(), then the RWSEM_FLAG_WAITERS bit must
@@ -424,23 +526,12 @@ static void rwsem_mark_wake(struct rw_semaphore *sem,
 	 */
 	waiter = rwsem_first_waiter(sem);
 
-	if (waiter->type != RWSEM_WAITING_FOR_WRITE)
-		goto wake_readers;
-
-	if (wake_type == RWSEM_WAKE_ANY) {
-		/*
-		 * Mark writer at the front of the queue for wakeup.
-		 * Until the task is actually later awoken later by
-		 * the caller, other writers are able to steal it.
-		 * Readers, on the other hand, will block as they
-		 * will notice the queued writer.
-		 */
-		wake_q_add(wake_q, waiter->task);
-		lockevent_inc(rwsem_wake_writer);
+	if (waiter->type == RWSEM_WAITING_FOR_WRITE) {
+		if (wake_type == RWSEM_WAKE_ANY)
+			rwsem_writer_wake(sem, waiter, wake_q);
+		return;
 	}
-	return;
 
-wake_readers:
 	/*
 	 * No reader wakeup if there are too many of them already.
 	 */
@@ -552,25 +643,8 @@ static void rwsem_mark_wake(struct rw_semaphore *sem,
 		atomic_long_add(adjustment, &sem->count);
 
 	/* 2nd pass */
-	list_for_each_entry_safe(waiter, tmp, &wlist, list) {
-		struct task_struct *tsk;
-
-		tsk = waiter->task;
-		get_task_struct(tsk);
-
-		/*
-		 * Ensure calling get_task_struct() before setting the reader
-		 * waiter to nil such that rwsem_down_read_slowpath() cannot
-		 * race with do_exit() by always holding a reference count
-		 * to the task to wakeup.
-		 */
-		smp_store_release(&waiter->task, NULL);
-		/*
-		 * Ensure issuing the wakeup (either by us or someone else)
-		 * after setting the reader waiter to nil.
-		 */
-		wake_q_add_safe(wake_q, tsk);
-	}
+	list_for_each_entry_safe(waiter, tmp, &wlist, list)
+		rwsem_waiter_wake(waiter, wake_q);
 }
 
 /*
@@ -600,77 +674,6 @@ rwsem_del_wake_waiter(struct rw_semaphore *sem, struct rwsem_waiter *waiter,
 		wake_up_q(wake_q);
 }
 
-/*
- * This function must be called with the sem->wait_lock held to prevent
- * race conditions between checking the rwsem wait list and setting the
- * sem->count accordingly.
- *
- * Implies rwsem_del_waiter() on success.
- */
-static inline bool rwsem_try_write_lock(struct rw_semaphore *sem,
-					struct rwsem_waiter *waiter)
-{
-	struct rwsem_waiter *first = rwsem_first_waiter(sem);
-	long count, new;
-
-	lockdep_assert_held(&sem->wait_lock);
-
-	count = atomic_long_read(&sem->count);
-	do {
-		bool has_handoff = !!(count & RWSEM_FLAG_HANDOFF);
-
-		if (has_handoff) {
-			/*
-			 * Honor handoff bit and yield only when the first
-			 * waiter is the one that set it. Otherwisee, we
-			 * still try to acquire the rwsem.
-			 */
-			if (first->handoff_set && (waiter != first))
-				return false;
-		}
-
-		new = count;
-
-		if (count & RWSEM_LOCK_MASK) {
-			/*
-			 * A waiter (first or not) can set the handoff bit
-			 * if it is an RT task or wait in the wait queue
-			 * for too long.
-			 */
-			if (has_handoff || (!rt_task(waiter->task) &&
-					    !time_after(jiffies, waiter->timeout)))
-				return false;
-
-			new |= RWSEM_FLAG_HANDOFF;
-		} else {
-			new |= RWSEM_WRITER_LOCKED;
-			new &= ~RWSEM_FLAG_HANDOFF;
-
-			if (list_is_singular(&sem->wait_list))
-				new &= ~RWSEM_FLAG_WAITERS;
-		}
-	} while (!atomic_long_try_cmpxchg_acquire(&sem->count, &count, new));
-
-	/*
-	 * We have either acquired the lock with handoff bit cleared or set
-	 * the handoff bit. Only the first waiter can have its handoff_set
-	 * set here to enable optimistic spinning in slowpath loop.
-	 */
-	if (new & RWSEM_FLAG_HANDOFF) {
-		first->handoff_set = true;
-		lockevent_inc(rwsem_wlock_handoff);
-		return false;
-	}
-
-	/*
-	 * Have rwsem_try_write_lock() fully imply rwsem_del_waiter() on
-	 * success.
-	 */
-	list_del(&waiter->list);
-	rwsem_set_owner(sem);
-	return true;
-}
-
 /*
  * The rwsem_spin_on_owner() function returns the following 4 values
  * depending on the lock owner state.
@@ -1081,7 +1084,7 @@ rwsem_down_read_slowpath(struct rw_semaphore *sem, long count, unsigned int stat
 	for (;;) {
 		set_current_state(state);
 		if (!smp_load_acquire(&waiter.task)) {
-			/* Matches rwsem_mark_wake()'s smp_store_release(). */
+			/* Matches rwsem_waiter_wake()'s smp_store_release(). */
 			break;
 		}
 		if (signal_pending_state(state, current)) {
@@ -1151,55 +1154,39 @@ rwsem_down_write_slowpath(struct rw_semaphore *sem, int state)
 		}
 	} else {
 		atomic_long_or(RWSEM_FLAG_WAITERS, &sem->count);
+		if (rwsem_try_write_lock(sem, &waiter))
+			waiter.task = NULL;
 	}
+	raw_spin_unlock_irq(&sem->wait_lock);
 
 	/* wait until we successfully acquire the lock */
-	set_current_state(state);
 	trace_contention_begin(sem, LCB_F_WRITE);
 
 	for (;;) {
-		if (rwsem_try_write_lock(sem, &waiter)) {
-			/* rwsem_try_write_lock() implies ACQUIRE on success */
+		set_current_state(state);
+		if (!smp_load_acquire(&waiter.task)) {
+			/* Matches rwsem_waiter_wake()'s smp_store_release(). */
 			break;
 		}
-
-		raw_spin_unlock_irq(&sem->wait_lock);
-
-		if (signal_pending_state(state, current))
-			goto out_nolock;
-
-		/*
-		 * After setting the handoff bit and failing to acquire
-		 * the lock, attempt to spin on owner to accelerate lock
-		 * transfer. If the previous owner is a on-cpu writer and it
-		 * has just released the lock, OWNER_NULL will be returned.
-		 * In this case, we attempt to acquire the lock again
-		 * without sleeping.
-		 */
-		if (waiter.handoff_set) {
-			enum owner_state owner_state;
-
-			owner_state = rwsem_spin_on_owner(sem);
-			if (owner_state == OWNER_NULL)
-				goto trylock_again;
+		if (signal_pending_state(state, current)) {
+			raw_spin_lock_irq(&sem->wait_lock);
+			if (waiter.task)
+				goto out_nolock;
+			raw_spin_unlock_irq(&sem->wait_lock);
+			/* Ordered by sem->wait_lock against rwsem_mark_wake(). */
+			break;
 		}
-
 		schedule_preempt_disabled();
 		lockevent_inc(rwsem_sleep_writer);
-		set_current_state(state);
-trylock_again:
-		raw_spin_lock_irq(&sem->wait_lock);
 	}
 	__set_current_state(TASK_RUNNING);
-	raw_spin_unlock_irq(&sem->wait_lock);
 	lockevent_inc(rwsem_wlock);
 	trace_contention_end(sem, 0);
 	return sem;
 
 out_nolock:
-	__set_current_state(TASK_RUNNING);
-	raw_spin_lock_irq(&sem->wait_lock);
 	rwsem_del_wake_waiter(sem, &waiter, &wake_q);
+	__set_current_state(TASK_RUNNING);
 	lockevent_inc(rwsem_wlock_fail);
 	trace_contention_end(sem, -EINTR);
 	return ERR_PTR(-EINTR);
-- 
2.31.1

Powered by blists - more mailing lists