[<prev] [next>] [<thread-prev] [thread-next>] [day] [month] [year] [list]
Message-ID: <d66a3abd-18cf-02be-cd99-9dda1b3fd85e@kernel.dk>
Date: Tue, 7 Feb 2023 15:50:31 -0700
From: Jens Axboe <axboe@...nel.dk>
To: Stefan Roesch <shr@...kernel.io>, kernel-team@...com
Cc: olivier@...llion01.com, netdev@...r.kernel.org,
io-uring@...r.kernel.org, kuba@...nel.org, ammarfaizi2@...weeb.org
Subject: Re: [PATCH v7 1/3] io_uring: add napi busy polling support
> diff --git a/include/linux/io_uring_types.h b/include/linux/io_uring_types.h
> index 128a67a40065..d9551790356e 100644
> --- a/include/linux/io_uring_types.h
> +++ b/include/linux/io_uring_types.h
> @@ -2,6 +2,7 @@
> #define IO_URING_TYPES_H
>
> #include <linux/blkdev.h>
> +#include <linux/hashtable.h>
> #include <linux/task_work.h>
> #include <linux/bitmap.h>
> #include <linux/llist.h>
> @@ -274,6 +275,15 @@ struct io_ring_ctx {
> struct xarray personalities;
> u32 pers_next;
>
> +#ifdef CONFIG_NET_RX_BUSY_POLL
> + struct list_head napi_list; /* track busy poll napi_id */
> + DECLARE_HASHTABLE(napi_ht, 8);
> + spinlock_t napi_lock; /* napi_list lock */
> +
> + unsigned int napi_busy_poll_to; /* napi busy poll default timeout */
> + bool napi_prefer_busy_poll;
> +#endif
Minor thing, but I wonder if we should put this in a struct and allocate
it if NAPI gets used rather than bloat the whole thing here. This
doubles the size of io_ring_ctx; the hash above is 2k in size!
> diff --git a/io_uring/io_uring.c b/io_uring/io_uring.c
> index db623b3185c8..96062036db41 100644
> --- a/io_uring/io_uring.c
> +++ b/io_uring/io_uring.c
> @@ -90,6 +90,7 @@
> #include "rsrc.h"
> #include "cancel.h"
> #include "net.h"
> +#include "napi.h"
> #include "notif.h"
>
> #include "timeout.h"
> @@ -335,6 +336,14 @@ static __cold struct io_ring_ctx *io_ring_ctx_alloc(struct io_uring_params *p)
> INIT_WQ_LIST(&ctx->locked_free_list);
> INIT_DELAYED_WORK(&ctx->fallback_work, io_fallback_req_func);
> INIT_WQ_LIST(&ctx->submit_state.compl_reqs);
> +
> +#ifdef CONFIG_NET_RX_BUSY_POLL
> + INIT_LIST_HEAD(&ctx->napi_list);
> + spin_lock_init(&ctx->napi_lock);
> + ctx->napi_prefer_busy_poll = false;
> + ctx->napi_busy_poll_to = READ_ONCE(sysctl_net_busy_poll);
> +#endif
I think that should go in an io_napi_init() function, so we can get rid
of these ifdefs in the main code.
> static inline bool io_has_work(struct io_ring_ctx *ctx)
> @@ -2498,6 +2512,196 @@ static inline int io_cqring_wait_schedule(struct io_ring_ctx *ctx,
> return ret < 0 ? ret : 1;
> }
>
> +#ifdef CONFIG_NET_RX_BUSY_POLL
> +#define NAPI_TIMEOUT (60 * SEC_CONVERSION)
> +
> +struct io_napi_ht_entry {
> + unsigned int napi_id;
> + struct list_head list;
> +
> + /* Covered by napi lock spinlock. */
> + unsigned long timeout;
> + struct hlist_node node;
> +};
Not strictly related to just this, but I think it'd be a good idea to
add a napi.c file and put it all in there rather than in the core
io_uring code. It really doesn't belong there.
> +/*
> + * io_napi_add() - Add napi id to the busy poll list
> + * @file: file pointer for socket
> + * @ctx: io-uring context
> + *
> + * Add the napi id of the socket to the napi busy poll list and hash table.
> + */
> +void io_napi_add(struct file *file, struct io_ring_ctx *ctx)
> +{
> + unsigned int napi_id;
> + struct socket *sock;
> + struct sock *sk;
> + struct io_napi_ht_entry *he;
> +
> + if (!io_napi_busy_loop_on(ctx))
> + return;
I think io_napi_add() belongs in napi.h and should look like this:
static inline void io_napi_add(struct io_kiocb *req)
{
struct io_ring_ctx *ctx = req->ctx;
if (!io_napi_busy_loop_on(ctx))
return;
__io_napi_add(ctx, req->file);
}
and put __io_napi_add() in napi.c
> +static void io_napi_free_list(struct io_ring_ctx *ctx)
> +{
> + unsigned int i;
> + struct io_napi_ht_entry *he;
> + LIST_HEAD(napi_list);
> +
> + spin_lock(&ctx->napi_lock);
> + hash_for_each(ctx->napi_ht, i, he, node) {
> + hash_del(&he->node);
> + }
> + spin_unlock(&ctx->napi_lock);
> +}
No need for the braces here for the loop.
> +static void io_napi_blocking_busy_loop(struct list_head *napi_list,
> + struct io_wait_queue *iowq)
> +{
> + unsigned long start_time = list_is_singular(napi_list)
> + ? 0
> + : busy_loop_current_time();
No ternaries please. This is so much easier to read as:
unsigned long start_time = 0;
if (!list_is_singular(napi_list))
start_time = busy_loop_current_time();
> + do {
> + if (list_is_singular(napi_list)) {
> + struct io_napi_ht_entry *ne =
> + list_first_entry(napi_list,
> + struct io_napi_ht_entry, list);
> +
> + napi_busy_loop(ne->napi_id, io_napi_busy_loop_end, iowq,
> + iowq->napi_prefer_busy_poll, BUSY_POLL_BUDGET);
> + break;
> + }
> + } while (io_napi_busy_loop(napi_list, iowq->napi_prefer_busy_poll) &&
> + !io_napi_busy_loop_end(iowq, start_time));
> +}
This is almost impossible to read, please rewrite that in a way so
that it's straightforward to understand what is going on.
> +void io_napi_merge_lists(struct io_ring_ctx *ctx, struct list_head *napi_list)
> +{
> + spin_lock(&ctx->napi_lock);
> + list_splice(napi_list, &ctx->napi_list);
> + io_napi_remove_stale(ctx);
> + spin_unlock(&ctx->napi_lock);
> +}
Question on the locking - the separate lock is obviously functionally
correct, but at least for the arming part, we generally already have the
ctx uring_lock at that point. Did you look into whether it's feasible to
take advantage of that?
> @@ -2510,6 +2714,9 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
> struct io_rings *rings = ctx->rings;
> ktime_t timeout = KTIME_MAX;
> int ret;
> +#ifdef CONFIG_NET_RX_BUSY_POLL
> + LIST_HEAD(local_napi_list);
> +#endif
>
> if (!io_allowed_run_tw(ctx))
> return -EEXIST;
> @@ -2539,12 +2746,34 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
> return ret;
> }
>
> +#ifdef CONFIG_NET_RX_BUSY_POLL
> + iowq.napi_busy_poll_to = 0;
> + iowq.napi_prefer_busy_poll = READ_ONCE(ctx->napi_prefer_busy_poll);
> +
> + if (!(ctx->flags & IORING_SETUP_SQPOLL)) {
> + spin_lock(&ctx->napi_lock);
> + list_splice_init(&ctx->napi_list, &local_napi_list);
> + spin_unlock(&ctx->napi_lock);
> + }
> +#endif
> +
> if (uts) {
> struct timespec64 ts;
>
> if (get_timespec64(&ts, uts))
> return -EFAULT;
> +
> +#ifdef CONFIG_NET_RX_BUSY_POLL
> + if (!list_empty(&local_napi_list)) {
> + io_napi_adjust_busy_loop_timeout(READ_ONCE(ctx->napi_busy_poll_to),
> + &ts, &iowq.napi_busy_poll_to);
> + }
> +#endif
> timeout = ktime_add_ns(timespec64_to_ktime(ts), ktime_get_ns());
> +#ifdef CONFIG_NET_RX_BUSY_POLL
> + } else if (!list_empty(&local_napi_list)) {
> + iowq.napi_busy_poll_to = READ_ONCE(ctx->napi_busy_poll_to);
> +#endif
> }
This is again a lot of ifdefs, please consider ways of getting rid of
them.
> @@ -2555,6 +2784,15 @@ static int io_cqring_wait(struct io_ring_ctx *ctx, int min_events,
> iowq.cq_tail = READ_ONCE(ctx->rings->cq.head) + min_events;
>
> trace_io_uring_cqring_wait(ctx, min_events);
> +
> +#ifdef CONFIG_NET_RX_BUSY_POLL
> + if (iowq.napi_busy_poll_to)
> + io_napi_blocking_busy_loop(&local_napi_list, &iowq);
> +
> + if (!list_empty(&local_napi_list))
> + io_napi_merge_lists(ctx, &local_napi_list);
> +#endif
> +
And here.
> do {
> if (test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq)) {
> finish_wait(&ctx->cq_wait, &iowq.wq);
> @@ -2754,6 +2992,9 @@ static __cold void io_ring_ctx_free(struct io_ring_ctx *ctx)
> io_req_caches_free(ctx);
> if (ctx->hash_map)
> io_wq_put_hash(ctx->hash_map);
> +#ifdef CONFIG_NET_RX_BUSY_POLL
> + io_napi_free_list(ctx);
> +#endif
Put an empty io_napi_free_list() stub in napi.h for when
CONFIG_NET_RX_BUSY_POLL is not defined, with the actual thing in
napi.c if it is defined.
> diff --git a/io_uring/sqpoll.c b/io_uring/sqpoll.c
> index 559652380672..b9fb077de15b 100644
> --- a/io_uring/sqpoll.c
> +++ b/io_uring/sqpoll.c
> @@ -15,6 +15,7 @@
> #include <uapi/linux/io_uring.h>
>
> #include "io_uring.h"
> +#include "napi.h"
> #include "sqpoll.h"
>
> #define IORING_SQPOLL_CAP_ENTRIES_VALUE 8
> @@ -168,6 +169,9 @@ static int __io_sq_thread(struct io_ring_ctx *ctx, bool cap_entries)
> {
> unsigned int to_submit;
> int ret = 0;
> +#ifdef CONFIG_NET_RX_BUSY_POLL
> + LIST_HEAD(local_napi_list);
> +#endif
>
> to_submit = io_sqring_entries(ctx);
> /* if we're handling multiple rings, cap submit size for fairness */
> @@ -193,6 +197,19 @@ static int __io_sq_thread(struct io_ring_ctx *ctx, bool cap_entries)
> ret = io_submit_sqes(ctx, to_submit);
> mutex_unlock(&ctx->uring_lock);
>
> +#ifdef CONFIG_NET_RX_BUSY_POLL
> + spin_lock(&ctx->napi_lock);
> + list_splice_init(&ctx->napi_list, &local_napi_list);
> + spin_unlock(&ctx->napi_lock);
> +
> + if (!list_empty(&local_napi_list) &&
> + READ_ONCE(ctx->napi_busy_poll_to) > 0 &&
> + io_napi_busy_loop(&local_napi_list, ctx->napi_prefer_busy_poll)) {
> + io_napi_merge_lists(ctx, &local_napi_list);
> + ++ret;
> + }
> +#endif
> +
> if (to_submit && wq_has_sleeper(&ctx->sqo_sq_wait))
> wake_up(&ctx->sqo_sq_wait);
> if (creds)
Add a helper here too please.
--
Jens Axboe
Powered by blists - more mailing lists