[<prev] [next>] [<thread-prev] [thread-next>] [day] [month] [year] [list]
Message-ID: <aS0FtmO5LhltDG5-@fedora>
Date: Mon, 1 Dec 2025 11:04:22 +0800
From: Ming Lei <ming.lei@...hat.com>
To: Caleb Sander Mateos <csander@...estorage.com>
Cc: Jens Axboe <axboe@...nel.dk>, linux-block@...r.kernel.org,
Uday Shankar <ushankar@...estorage.com>,
Stefani Seibold <stefani@...bold.net>,
Andrew Morton <akpm@...ux-foundation.org>,
linux-kernel@...r.kernel.org
Subject: Re: [PATCH V4 12/27] ublk: add io events fifo structure
On Sun, Nov 30, 2025 at 08:53:03AM -0800, Caleb Sander Mateos wrote:
> On Thu, Nov 20, 2025 at 5:59 PM Ming Lei <ming.lei@...hat.com> wrote:
> >
> > Add ublk io events fifo structure and prepare for supporting command
> > batch, which will use io_uring multishot uring_cmd for fetching one
> > batch of io commands each time.
> >
> > One nice feature of kfifo is that it allows multiple producers vs. a single
> > consumer. We just need to lock the producer side, while the single
> > consumer can be lockless.
> >
> > The producer is actually from ublk_queue_rq() or ublk_queue_rqs(), so
> > lock contention can be eased by setting proper blk-mq nr_queues.
> >
> > Signed-off-by: Ming Lei <ming.lei@...hat.com>
> > ---
> > drivers/block/ublk_drv.c | 65 ++++++++++++++++++++++++++++++++++++----
> > 1 file changed, 60 insertions(+), 5 deletions(-)
> >
> > diff --git a/drivers/block/ublk_drv.c b/drivers/block/ublk_drv.c
> > index ea992366af5b..6ff284243630 100644
> > --- a/drivers/block/ublk_drv.c
> > +++ b/drivers/block/ublk_drv.c
> > @@ -44,6 +44,7 @@
> > #include <linux/task_work.h>
> > #include <linux/namei.h>
> > #include <linux/kref.h>
> > +#include <linux/kfifo.h>
> > #include <uapi/linux/ublk_cmd.h>
> >
> > #define UBLK_MINORS (1U << MINORBITS)
> > @@ -217,6 +218,22 @@ struct ublk_queue {
> > bool fail_io; /* copy of dev->state == UBLK_S_DEV_FAIL_IO */
> > spinlock_t cancel_lock;
> > struct ublk_device *dev;
> > +
> > + /*
> > + * Inflight ublk request tag is saved in this fifo
> > + *
> > + * There are multiple writers from ublk_queue_rq() or ublk_queue_rqs(),
> > + * so a lock is required for storing request tags to the fifo
> > + *
> > + * Make sure there is just one reader, fetching requests from the task
> > + * work function to the ublk server, so there is no need to grab the
> > + * lock on the reader side.
>
> Can you clarify that this is only used for batch mode?
Yes.
>
> > + */
> > + struct {
> > + DECLARE_KFIFO_PTR(evts_fifo, unsigned short);
> > + spinlock_t evts_lock;
> > + }____cacheline_aligned_in_smp;
> > +
> > struct ublk_io ios[] __counted_by(q_depth);
> > };
> >
> > @@ -282,6 +299,32 @@ static inline void ublk_io_unlock(struct ublk_io *io)
> > spin_unlock(&io->lock);
> > }
> >
> > +/* Initialize the queue */
>
> "queue" -> "events queue"? Otherwise, it sounds like it's referring to
> struct ublk_queue.
OK.
>
> > +static inline int ublk_io_evts_init(struct ublk_queue *q, unsigned int size,
> > + int numa_node)
> > +{
> > + spin_lock_init(&q->evts_lock);
> > + return kfifo_alloc_node(&q->evts_fifo, size, GFP_KERNEL, numa_node);
> > +}
> > +
> > +/* Check if queue is empty */
> > +static inline bool ublk_io_evts_empty(const struct ublk_queue *q)
> > +{
> > + return kfifo_is_empty(&q->evts_fifo);
> > +}
> > +
> > +/* Check if queue is full */
> > +static inline bool ublk_io_evts_full(const struct ublk_queue *q)
>
> Function is unused?
Yes, will remove it.
>
> > +{
> > + return kfifo_is_full(&q->evts_fifo);
> > +}
> > +
> > +static inline void ublk_io_evts_deinit(struct ublk_queue *q)
> > +{
> > + WARN_ON_ONCE(!kfifo_is_empty(&q->evts_fifo));
> > + kfifo_free(&q->evts_fifo);
> > +}
> > +
> > static inline struct ublksrv_io_desc *
> > ublk_get_iod(const struct ublk_queue *ubq, unsigned tag)
> > {
> > @@ -3038,6 +3081,9 @@ static void ublk_deinit_queue(struct ublk_device *ub, int q_id)
> > if (ubq->io_cmd_buf)
> > free_pages((unsigned long)ubq->io_cmd_buf, get_order(size));
> >
> > + if (ublk_dev_support_batch_io(ub))
> > + ublk_io_evts_deinit(ubq);
> > +
> > kvfree(ubq);
> > ub->queues[q_id] = NULL;
> > }
> > @@ -3062,7 +3108,7 @@ static int ublk_init_queue(struct ublk_device *ub, int q_id)
> > struct ublk_queue *ubq;
> > struct page *page;
> > int numa_node;
> > - int size, i;
> > + int size, i, ret = -ENOMEM;
> >
> > /* Determine NUMA node based on queue's CPU affinity */
> > numa_node = ublk_get_queue_numa_node(ub, q_id);
> > @@ -3081,18 +3127,27 @@ static int ublk_init_queue(struct ublk_device *ub, int q_id)
> >
> > /* Allocate I/O command buffer on local NUMA node */
> > page = alloc_pages_node(numa_node, gfp_flags, get_order(size));
> > - if (!page) {
> > - kvfree(ubq);
> > - return -ENOMEM;
> > - }
> > + if (!page)
> > + goto fail_nomem;
> > ubq->io_cmd_buf = page_address(page);
> >
> > for (i = 0; i < ubq->q_depth; i++)
> > spin_lock_init(&ubq->ios[i].lock);
> >
> > + if (ublk_dev_support_batch_io(ub)) {
> > + ret = ublk_io_evts_init(ubq, ubq->q_depth, numa_node);
> > + if (ret)
> > + goto fail;
> > + }
> > ub->queues[q_id] = ubq;
> > ubq->dev = ub;
> > +
> > return 0;
> > +fail:
> > + ublk_deinit_queue(ub, q_id);
>
> This is a no-op since ub->queues[q_id] hasn't been assigned yet?
Good catch — __ublk_deinit_queue(ub, ubq) can be added to handle the
failure path correctly.
Thanks,
Ming
Powered by blists - more mailing lists