[<prev] [next>] [<thread-prev] [thread-next>] [day] [month] [year] [list]
Message-ID: <ed37058b-ee96-7d44-1dc7-d2c48e2ac23f@kernel.dk>
Date: Tue, 24 Sep 2019 10:36:28 +0200
From: Jens Axboe <axboe@...nel.dk>
To: Pavel Begunkov <asml.silence@...il.com>,
Ingo Molnar <mingo@...nel.org>
Cc: Ingo Molnar <mingo@...hat.com>,
Peter Zijlstra <peterz@...radead.org>,
linux-block@...r.kernel.org, linux-kernel@...r.kernel.org
Subject: Re: [PATCH v2 0/2] Optimise io_uring completion waiting
On 9/24/19 2:27 AM, Jens Axboe wrote:
> On 9/24/19 2:02 AM, Jens Axboe wrote:
>> On 9/24/19 1:06 AM, Pavel Begunkov wrote:
>>> On 24/09/2019 02:00, Jens Axboe wrote:
>>>>> I think we can do the same thing, just wrapping the waitqueue in a
>>>>> structure with a count in it, on the stack. Got some flight time
>>>>> coming up later today, let me try and cook up a patch.
>>>>
>>>> Totally untested, and sent out 5 min before departure... But something
>>>> like this.
>>> Hmm, reminds me my first version. Basically that's the same thing but
>>> with macroses inlined. I wanted to make it reusable and self-contained,
>>> though.
>>>
>>> If you don't think it could be useful in other places, sure, we could do
>>> something like that. Is that so?
>>
>> I totally agree it could be useful in other places. Maybe formalized and
>> used with wake_up_nr() instead of adding a new primitive? Haven't looked
>> into that, I may be talking nonsense.
>>
>> In any case, I did get a chance to test it and it works for me. Here's
>> the "finished" version, slightly cleaned up and with a comment added
>> for good measure.
>
> Notes:
>
> This version gets the ordering right, you need exclusive waits to get
> fifo ordering on the waitqueue.
>
> Both versions (yours and mine) suffer from the problem of potentially
> waking too many. I don't think this is a real issue, as generally we
> don't do threaded access to the io_urings. But if you had the following
> tasks wait on the cqring:
>
> [min_events = 32], [min_events = 8], [min_events = 8]
>
> and we reach the io_cqring_events() == threshold, we'll wake all three.
> I don't see a good solution to this, so I suspect we just live with
> until proven an issue. Both versions are much better than what we have
> now.
Forgot an issue around signal handling, version below adds the
right check for that too.
Curious what your test case was for this?
diff --git a/fs/io_uring.c b/fs/io_uring.c
index ca7570aca430..3fbab5692f14 100644
--- a/fs/io_uring.c
+++ b/fs/io_uring.c
@@ -2768,6 +2768,42 @@ static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit,
return submit;
}
+struct io_wait_queue {
+ struct wait_queue_entry wq;
+ struct io_ring_ctx *ctx;
+ struct task_struct *task;
+ unsigned to_wait;
+ unsigned nr_timeouts;
+};
+
+static inline bool io_should_wake(struct io_wait_queue *iowq)
+{
+ struct io_ring_ctx *ctx = iowq->ctx;
+
+ /*
+ * Wake up if we have enough events, or if a timeout occured since we
+ * started waiting. For timeouts, we always want to return to userspace,
+ * regardless of event count.
+ */
+ return io_cqring_events(ctx->rings) >= iowq->to_wait ||
+ atomic_read(&ctx->cq_timeouts) != iowq->nr_timeouts;
+}
+
+static int io_wake_function(struct wait_queue_entry *curr, unsigned int mode,
+ int wake_flags, void *key)
+{
+ struct io_wait_queue *iowq = container_of(curr, struct io_wait_queue,
+ wq);
+
+ if (io_should_wake(iowq)) {
+ list_del_init(&curr->entry);
+ wake_up_process(iowq->task);
+ return 1;
+ }
+
+ return -1;
+}
+
/*
* Wait until events become available, if we don't already have some. The
* application must reap them itself, as they reside on the shared cq ring.
@@ -2775,8 +2811,16 @@ static int io_ring_submit(struct io_ring_ctx *ctx, unsigned int to_submit,
static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
const sigset_t __user *sig, size_t sigsz)
{
+ struct io_wait_queue iowq = {
+ .wq = {
+ .func = io_wake_function,
+ .entry = LIST_HEAD_INIT(iowq.wq.entry),
+ },
+ .task = current,
+ .ctx = ctx,
+ .to_wait = min_events,
+ };
struct io_rings *rings = ctx->rings;
- unsigned nr_timeouts;
int ret;
if (io_cqring_events(rings) >= min_events)
@@ -2795,15 +2839,18 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
return ret;
}
- nr_timeouts = atomic_read(&ctx->cq_timeouts);
- /*
- * Return if we have enough events, or if a timeout occured since
- * we started waiting. For timeouts, we always want to return to
- * userspace.
- */
- ret = wait_event_interruptible(ctx->wait,
- io_cqring_events(rings) >= min_events ||
- atomic_read(&ctx->cq_timeouts) != nr_timeouts);
+ iowq.nr_timeouts = atomic_read(&ctx->cq_timeouts);
+ prepare_to_wait_exclusive(&ctx->wait, &iowq.wq, TASK_INTERRUPTIBLE);
+ do {
+ if (io_should_wake(&iowq))
+ break;
+ schedule();
+ if (signal_pending(current))
+ break;
+ set_current_state(TASK_INTERRUPTIBLE);
+ } while (1);
+ finish_wait(&ctx->wait, &iowq.wq);
+
restore_saved_sigmask_unless(ret == -ERESTARTSYS);
if (ret == -ERESTARTSYS)
ret = -EINTR;
--
Jens Axboe
Powered by blists - more mailing lists