Message-ID: <20251004020049.918665-4-kent.overstreet@linux.dev>
Date: Fri, 3 Oct 2025 22:00:45 -0400
From: Kent Overstreet <kent.overstreet@...ux.dev>
To: linux-kernel@...r.kernel.org
Cc: akpm@...ux-foundation.org,
Kent Overstreet <kent.overstreet@...ux.dev>
Subject: [PATCH 3/7] closures: CLOSURE_SLEEPING

Part two of fixing __closure_sync_timeout().

Add a bit in cl->remaining (protected by cmpxchg()) to indicate whether
a closure has a sleeper that should be woken up after the final put;
this lets us read the pointer to the waiting task_struct before the
final put.

This eliminates accesses to another thread's closure after the final
put, which are racy because the other thread may already be unwinding
after its timeout elapses.

Signed-off-by: Kent Overstreet <kent.overstreet@...ux.dev>
---
include/linux/closure.h | 9 +--
lib/closure.c | 118 +++++++++++++++++++---------------------
2 files changed, 62 insertions(+), 65 deletions(-)
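
As a note for anyone following the refcount trick outside kernel
context: the idea is to pack a "waiter parked" flag into the same
atomic word as the refcount, and have whoever drops the last ref read
the sleeper pointer before the flag-clearing cmpxchg. Below is a
minimal userspace sketch of that, with C11 atomics and a POSIX
semaphore standing in for wake_up_process() - obj, obj_sub() etc. are
made-up names for illustration, not the closure API:

#include <pthread.h>
#include <semaphore.h>
#include <stdatomic.h>
#include <stdio.h>
#include <unistd.h>

#define REF_MASK        0x00ffffffu     /* low bits: the refcount proper */
#define SLEEPING        0x01000000u     /* a waiter parked itself */

struct obj {
        atomic_uint     remaining;      /* refcount | flag bits */
        sem_t           *sleeper;       /* only valid while SLEEPING is set */
};

/* closure_sub() analogue: v may carry flag bits as well as a ref */
static void obj_sub(struct obj *o, unsigned v)
{
        unsigned old = atomic_load_explicit(&o->remaining, memory_order_acquire);
        unsigned new;
        sem_t *sleeper;

        do {
                new = old - v;
                sleeper = NULL;
                if (!(new & REF_MASK)) {
                        /*
                         * Final put: read the sleeper pointer *before* the
                         * flag-clearing cmpxchg - once SLEEPING is clear the
                         * waiter may return and o->sleeper goes stale.  (The
                         * kernel version additionally needs RCU so the
                         * task_struct can't die between cmpxchg and wakeup.)
                         */
                        if (new & SLEEPING)
                                sleeper = o->sleeper;
                        new &= ~SLEEPING;
                }
        } while (!atomic_compare_exchange_weak(&o->remaining, &old, new));

        if (sleeper)
                sem_post(sleeper);      /* kernel: smp_mb(); wake_up_process() */
}

static void obj_put(struct obj *o)
{
        obj_sub(o, 1);
}

/* drop our ref and raise SLEEPING in one atomic op, then park */
static void obj_wait_release(struct obj *o)
{
        sem_t done;

        sem_init(&done, 0, 0);
        o->sleeper = &done;
        obj_sub(o, 1 - SLEEPING);       /* cf. closure_sub(cl, INITIALIZER - SLEEPING) */
        sem_wait(&done);                /* posted by whoever drops the last ref */
        sem_destroy(&done);
}

static void *worker(void *p)
{
        sleep(1);                       /* pretend to do some I/O */
        obj_put(p);                     /* final put -> wakes the waiter */
        return NULL;
}

int main(void)
{
        struct obj o = { .remaining = 2 };      /* one ref ours, one the worker's */
        pthread_t t;

        pthread_create(&t, NULL, worker, &o);
        obj_wait_release(&o);           /* returns once the last ref drops */
        printf("all refs dropped\n");
        pthread_join(&t, NULL);
        return 0;
}
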
diff --git a/include/linux/closure.h b/include/linux/closure.h
index f626044d6ca2..d0195570514a 100644
--- a/include/linux/closure.h
+++ b/include/linux/closure.h
@@ -128,14 +128,15 @@ enum closure_state {
* annotate where references are being transferred.
*/
 
- CLOSURE_BITS_START = (1U << 26),
- CLOSURE_DESTRUCTOR = (1U << 26),
+ CLOSURE_BITS_START = (1U << 24),
+ CLOSURE_DESTRUCTOR = (1U << 24),
+ CLOSURE_SLEEPING = (1U << 26),
CLOSURE_WAITING = (1U << 28),
CLOSURE_RUNNING = (1U << 30),
};
 
#define CLOSURE_GUARD_MASK \
- (((CLOSURE_DESTRUCTOR|CLOSURE_WAITING|CLOSURE_RUNNING) << 1)|(CLOSURE_BITS_START >> 1))
+ (((CLOSURE_DESTRUCTOR|CLOSURE_SLEEPING|CLOSURE_WAITING|CLOSURE_RUNNING) << 1)|(CLOSURE_BITS_START >> 1))
 
#define CLOSURE_REMAINING_MASK (CLOSURE_BITS_START - 1)
#define CLOSURE_REMAINING_INITIALIZER (1|CLOSURE_RUNNING)
@@ -144,7 +145,7 @@ struct closure {
union {
struct {
struct workqueue_struct *wq;
- struct closure_syncer *s;
+ struct task_struct *sleeper;
struct llist_node list;
closure_fn *fn;
};
diff --git a/lib/closure.c b/lib/closure.c
index 21fadd12093c..c49d49916788 100644
--- a/lib/closure.c
+++ b/lib/closure.c
@@ -23,7 +23,7 @@ static void closure_val_checks(struct closure *cl, unsigned new, int d)
new, (unsigned) __fls(new & CLOSURE_GUARD_MASK), d))
new &= ~CLOSURE_GUARD_MASK;
 
- WARN(!count && (new & ~CLOSURE_DESTRUCTOR),
+ WARN(!count && (new & ~(CLOSURE_DESTRUCTOR|CLOSURE_SLEEPING)),
"closure %ps ref hit 0 with incorrect flags set: %x (%u)",
cl->fn,
new, (unsigned) __fls(new));
@@ -39,19 +39,27 @@ enum new_closure_state {
void closure_sub(struct closure *cl, int v)
{
enum new_closure_state s;
+ struct task_struct *sleeper;
 
- int old = atomic_read(&cl->remaining), new;
+ /* rcu_read_lock, atomic_read_acquire() are both for cl->sleeper: */
+ guard(rcu)();
+
+ int old = atomic_read_acquire(&cl->remaining), new;
do {
new = old - v;
 
if (new & CLOSURE_REMAINING_MASK) {
s = CLOSURE_normal_put;
} else {
- if (cl->fn && !(new & CLOSURE_DESTRUCTOR)) {
+ if ((cl->fn || (new & CLOSURE_SLEEPING)) &&
+ !(new & CLOSURE_DESTRUCTOR)) {
s = CLOSURE_requeue;
new += CLOSURE_REMAINING_INITIALIZER;
} else
s = CLOSURE_done;
+
+ sleeper = new & CLOSURE_SLEEPING ? cl->sleeper : NULL;
+ new &= ~CLOSURE_SLEEPING;
}
 
closure_val_checks(cl, new, -v);
@@ -60,6 +68,12 @@ void closure_sub(struct closure *cl, int v)
if (s == CLOSURE_normal_put)
return;
 
+ if (sleeper) {
+ smp_mb();
+ wake_up_process(sleeper);
+ return;
+ }
+
if (s == CLOSURE_requeue) {
cl->closure_get_happened = false;
closure_queue(cl);
@@ -114,41 +128,25 @@ bool closure_wait(struct closure_waitlist *waitlist, struct closure *cl)
cl->closure_get_happened = true;
closure_set_waiting(cl, _RET_IP_);
- atomic_add(CLOSURE_WAITING + 1, &cl->remaining);
+ unsigned r = atomic_add_return(CLOSURE_WAITING + 1, &cl->remaining);
+ closure_val_checks(cl, r, CLOSURE_WAITING + 1);
+
llist_add(&cl->list, &waitlist->list);
 
return true;
}
EXPORT_SYMBOL(closure_wait);
 
-struct closure_syncer {
- struct task_struct *task;
- int done;
-};
-
-static CLOSURE_CALLBACK(closure_sync_fn)
-{
- struct closure *cl = container_of(ws, struct closure, work);
- struct closure_syncer *s = cl->s;
- struct task_struct *p;
-
- rcu_read_lock();
- p = READ_ONCE(s->task);
- s->done = 1;
- wake_up_process(p);
- rcu_read_unlock();
-}
-
void __sched __closure_sync(struct closure *cl)
{
- struct closure_syncer s = { .task = current };
-
- cl->s = &s;
- continue_at(cl, closure_sync_fn, NULL);
+ cl->sleeper = current;
+ closure_sub(cl,
+ CLOSURE_REMAINING_INITIALIZER -
+ CLOSURE_SLEEPING);
 
while (1) {
set_current_state(TASK_UNINTERRUPTIBLE);
- if (s.done)
+ if (!(atomic_read(&cl->remaining) & CLOSURE_SLEEPING))
break;
schedule();
}
@@ -162,31 +160,25 @@ EXPORT_SYMBOL(__closure_sync);
* for outstanding get()s to finish) and returning once closure refcount is 0.
*
* Unlike closure_sync() this doesn't reinit the ref to 1; subsequent
- * closure_get_not_zero() calls waill fail.
+ * closure_get_not_zero() calls will fail.
*/
void __sched closure_return_sync(struct closure *cl)
{
- struct closure_syncer s = { .task = current };
-
- cl->s = &s;
- set_closure_fn(cl, closure_sync_fn, NULL);
-
- unsigned flags = atomic_sub_return_release(1 + CLOSURE_RUNNING - CLOSURE_DESTRUCTOR,
- &cl->remaining);
+ cl->sleeper = current;
+ closure_sub(cl,
+ CLOSURE_REMAINING_INITIALIZER -
+ CLOSURE_DESTRUCTOR -
+ CLOSURE_SLEEPING);
- closure_val_checks(cl, flags, 1 + CLOSURE_RUNNING - CLOSURE_DESTRUCTOR);
-
- if (unlikely(flags & CLOSURE_REMAINING_MASK)) {
- while (1) {
- set_current_state(TASK_UNINTERRUPTIBLE);
- if (s.done)
- break;
- schedule();
- }
-
- __set_current_state(TASK_RUNNING);
+ while (1) {
+ set_current_state(TASK_UNINTERRUPTIBLE);
+ if (!(atomic_read(&cl->remaining) & CLOSURE_SLEEPING))
+ break;
+ schedule();
}
+ __set_current_state(TASK_RUNNING);
+
if (cl->parent)
closure_put(cl->parent);
}
@@ -194,31 +186,35 @@ EXPORT_SYMBOL(closure_return_sync);
int __sched __closure_sync_timeout(struct closure *cl, unsigned long timeout)
{
- struct closure_syncer s = { .task = current };
int ret = 0;
 
- cl->s = &s;
- continue_at(cl, closure_sync_fn, NULL);
+ cl->sleeper = current;
+ closure_sub(cl,
+ CLOSURE_REMAINING_INITIALIZER -
+ CLOSURE_SLEEPING);
 
while (1) {
set_current_state(TASK_UNINTERRUPTIBLE);
- if (s.done)
- break;
+ /*
+ * Carefully undo the continue_at() - but only if it
+ * hasn't completed, i.e. the final closure_put() hasn't
+ * happened yet:
+ */
+ unsigned old = atomic_read(&cl->remaining), new;
+ if (!(old & CLOSURE_SLEEPING))
+ goto success;
+
if (!timeout) {
- /*
- * Carefully undo the continue_at() - but only if it
- * hasn't completed, i.e. the final closure_put() hasn't
- * happened yet:
- */
- unsigned old, new, v = atomic_read(&cl->remaining);
do {
- old = v;
- if (!old || (old & CLOSURE_RUNNING))
+ if (!(old & CLOSURE_SLEEPING))
goto success;
- new = old + CLOSURE_REMAINING_INITIALIZER;
- } while ((v = atomic_cmpxchg(&cl->remaining, old, new)) != old);
+ new = old + CLOSURE_REMAINING_INITIALIZER - CLOSURE_SLEEPING;
+ closure_val_checks(cl, new, CLOSURE_REMAINING_INITIALIZER - CLOSURE_SLEEPING);
+ } while (!atomic_try_cmpxchg(&cl->remaining, &old, new));
+
ret = -ETIME;
+ break;
}
 
timeout = schedule_timeout(timeout);
--
2.51.0
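
The timeout half is the same flag run in reverse: on expiry the waiter
tries to take SLEEPING back with a cmpxchg, so exactly one of {final
put, timed-out waiter} ends up owning the wakeup, and the other side
never touches the waiter's stack. Sketched in the same made-up
userspace terms as above (reusing struct obj, REF_MASK, SLEEPING and
obj_sub() from the earlier sketch; sem_timedwait() stands in for
schedule_timeout()):

#include <errno.h>
#include <semaphore.h>
#include <stdatomic.h>
#include <time.h>
/* reuses struct obj, REF_MASK, SLEEPING, obj_sub() from the sketch above */

/* returns 0 once the last ref drops, -1 on timeout (our ref reclaimed) */
static int obj_wait_release_timeout(struct obj *o, const struct timespec *abs)
{
        sem_t done;

        sem_init(&done, 0, 0);
        o->sleeper = &done;
        obj_sub(o, 1 - SLEEPING);       /* drop our ref, raise the flag */

        while (sem_timedwait(&done, abs) != 0) {
                if (errno == EINTR)
                        continue;       /* interrupted; keep waiting */

                /*
                 * Timed out: carefully undo the release - but only if the
                 * final put hasn't happened.  Exactly one side clears
                 * SLEEPING: either we reclaim our ref here, or the final
                 * put owns the flag and its sem_post() is guaranteed.
                 */
                unsigned old = atomic_load(&o->remaining), new;
                do {
                        if (!(old & SLEEPING)) {
                                sem_wait(&done);        /* sync with that post */
                                goto woken;
                        }
                        new = old + 1 - SLEEPING;       /* take the ref back */
                } while (!atomic_compare_exchange_weak(&o->remaining, &old, new));

                sem_destroy(&done);
                return -1;              /* cf. -ETIME from __closure_sync_timeout() */
        }
woken:
        sem_destroy(&done);
        return 0;
}

On the -1 path the caller is left holding a ref again, mirroring how
the patch re-adds CLOSURE_REMAINING_INITIALIZER - CLOSURE_SLEEPING so
the closure stays pinned after -ETIME.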