[<prev] [next>] [<thread-prev] [thread-next>] [day] [month] [year] [list]
Message-ID: <63329cd7-4d3a-9497-e5ed-6995f05cd81f@mellanox.com>
Date: Fri, 13 Dec 2019 18:04:08 +0000
From: Maxim Mikityanskiy <maximmi@...lanox.com>
To: Magnus Karlsson <magnus.karlsson@...el.com>,
"bjorn.topel@...el.com" <bjorn.topel@...el.com>,
"ast@...nel.org" <ast@...nel.org>,
"daniel@...earbox.net" <daniel@...earbox.net>,
"netdev@...r.kernel.org" <netdev@...r.kernel.org>,
"jonathan.lemon@...il.com" <jonathan.lemon@...il.com>
CC: "bpf@...r.kernel.org" <bpf@...r.kernel.org>,
Saeed Mahameed <saeedm@...lanox.com>,
"jeffrey.t.kirsher@...el.com" <jeffrey.t.kirsher@...el.com>,
"maciej.fijalkowski@...el.com" <maciej.fijalkowski@...el.com>,
"maciejromanfijalkowski@...il.com" <maciejromanfijalkowski@...il.com>,
Maxim Mikityanskiy <maxtram95@...il.com>,
Maxim Mikityanskiy <maximmi@...lanox.com>
Subject: Re: [PATCH bpf-next 02/12] xsk: consolidate to one single cached
producer pointer
On 2019-12-09 09:56, Magnus Karlsson wrote:
> Currently, the xsk ring code has two cached producer pointers:
> prod_head and prod_tail. This patch consolidates these two into a
> single one called cached_prod to make the code simpler and easier to
> maintain. This will be in line with the user space part of the
> code found in libbpf, that only uses a single cached pointer.
>
> The Rx path only uses the two top level functions
> xskq_produce_batch_desc and xskq_produce_flush_desc and they both use
> prod_head and never prod_tail. So just move them over to
> cached_prod.
>
> The Tx XDP_DRV path uses xskq_produce_addr_lazy and
> xskq_produce_flush_addr_n and unnecessarily operates on both prod_tail
> and prod_head, so move them over to just use cached_prod by skipping
> the intermediate step of updating prod_tail.
>
> The Tx path in XDP_SKB mode uses xskq_reserve_addr and
> xskq_produce_addr. They currently use both cached pointers, but we can
> operate on the global producer pointer in xskq_produce_addr since it
> has to be updated anyway, thus eliminating the use of both cached
> pointers. We can also remove the xskq_nb_free in xskq_produce_addr
> since it is already called in xskq_reserve_addr. No need to do it
> twice.
>
> When there is only one cached producer pointer, we can also simplify
> xskq_nb_free by removing one argument.
>
> Signed-off-by: Magnus Karlsson <magnus.karlsson@...el.com>
> ---
> net/xdp/xsk_queue.h | 49 ++++++++++++++++++++++---------------------------
> 1 file changed, 22 insertions(+), 27 deletions(-)
>
> diff --git a/net/xdp/xsk_queue.h b/net/xdp/xsk_queue.h
> index a2f0ba6..d88e1a0 100644
> --- a/net/xdp/xsk_queue.h
> +++ b/net/xdp/xsk_queue.h
> @@ -35,8 +35,7 @@ struct xsk_queue {
> u64 size;
> u32 ring_mask;
> u32 nentries;
> - u32 prod_head;
> - u32 prod_tail;
> + u32 cached_prod;
> u32 cons_head;
> u32 cons_tail;
> struct xdp_ring *ring;
> @@ -94,39 +93,39 @@ static inline u64 xskq_nb_invalid_descs(struct xsk_queue *q)
>
> static inline u32 xskq_nb_avail(struct xsk_queue *q, u32 dcnt)
> {
> - u32 entries = q->prod_tail - q->cons_tail;
> + u32 entries = q->cached_prod - q->cons_tail;
>
> if (entries == 0) {
> /* Refresh the local pointer */
> - q->prod_tail = READ_ONCE(q->ring->producer);
> - entries = q->prod_tail - q->cons_tail;
> + q->cached_prod = READ_ONCE(q->ring->producer);
> + entries = q->cached_prod - q->cons_tail;
> }
>
> return (entries > dcnt) ? dcnt : entries;
> }
>
> -static inline u32 xskq_nb_free(struct xsk_queue *q, u32 producer, u32 dcnt)
> +static inline u32 xskq_nb_free(struct xsk_queue *q, u32 dcnt)
> {
> - u32 free_entries = q->nentries - (producer - q->cons_tail);
> + u32 free_entries = q->nentries - (q->cached_prod - q->cons_tail);
>
> if (free_entries >= dcnt)
> return free_entries;
>
> /* Refresh the local tail pointer */
> q->cons_tail = READ_ONCE(q->ring->consumer);
> - return q->nentries - (producer - q->cons_tail);
> + return q->nentries - (q->cached_prod - q->cons_tail);
> }
>
> static inline bool xskq_has_addrs(struct xsk_queue *q, u32 cnt)
> {
> - u32 entries = q->prod_tail - q->cons_tail;
> + u32 entries = q->cached_prod - q->cons_tail;
>
> if (entries >= cnt)
> return true;
>
> /* Refresh the local pointer. */
> - q->prod_tail = READ_ONCE(q->ring->producer);
> - entries = q->prod_tail - q->cons_tail;
> + q->cached_prod = READ_ONCE(q->ring->producer);
> + entries = q->cached_prod - q->cons_tail;
>
> return entries >= cnt;
> }
> @@ -220,17 +219,15 @@ static inline void xskq_discard_addr(struct xsk_queue *q)
> static inline int xskq_produce_addr(struct xsk_queue *q, u64 addr)
> {
> struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;
> -
> - if (xskq_nb_free(q, q->prod_tail, 1) == 0)
> - return -ENOSPC;
> + unsigned int idx = q->ring->producer;
>
> /* A, matches D */
> - ring->desc[q->prod_tail++ & q->ring_mask] = addr;
> + ring->desc[idx++ & q->ring_mask] = addr;
>
> /* Order producer and data */
> smp_wmb(); /* B, matches C */
>
> - WRITE_ONCE(q->ring->producer, q->prod_tail);
> + WRITE_ONCE(q->ring->producer, idx);
> return 0;
> }
>
> @@ -238,11 +235,11 @@ static inline int xskq_produce_addr_lazy(struct xsk_queue *q, u64 addr)
> {
> struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;
>
> - if (xskq_nb_free(q, q->prod_head, 1) == 0)
> + if (xskq_nb_free(q, 1) == 0)
> return -ENOSPC;
>
> /* A, matches D */
> - ring->desc[q->prod_head++ & q->ring_mask] = addr;
> + ring->desc[q->cached_prod++ & q->ring_mask] = addr;
> return 0;
> }
>
> @@ -252,17 +249,16 @@ static inline void xskq_produce_flush_addr_n(struct xsk_queue *q,
> /* Order producer and data */
> smp_wmb(); /* B, matches C */
>
> - q->prod_tail += nb_entries;
> - WRITE_ONCE(q->ring->producer, q->prod_tail);
> + WRITE_ONCE(q->ring->producer, q->ring->producer + nb_entries);
> }
>
> static inline int xskq_reserve_addr(struct xsk_queue *q)
> {
> - if (xskq_nb_free(q, q->prod_head, 1) == 0)
> + if (xskq_nb_free(q, 1) == 0)
> return -ENOSPC;
>
> /* A, matches D */
> - q->prod_head++;
> + q->cached_prod++;
> return 0;
> }
>
> @@ -340,11 +336,11 @@ static inline int xskq_produce_batch_desc(struct xsk_queue *q,
> struct xdp_rxtx_ring *ring = (struct xdp_rxtx_ring *)q->ring;
> unsigned int idx;
>
> - if (xskq_nb_free(q, q->prod_head, 1) == 0)
> + if (xskq_nb_free(q, 1) == 0)
> return -ENOSPC;
>
> /* A, matches D */
> - idx = (q->prod_head++) & q->ring_mask;
> + idx = q->cached_prod++ & q->ring_mask;
> ring->desc[idx].addr = addr;
> ring->desc[idx].len = len;
>
> @@ -356,8 +352,7 @@ static inline void xskq_produce_flush_desc(struct xsk_queue *q)
> /* Order producer and data */
> smp_wmb(); /* B, matches C */
>
> - q->prod_tail = q->prod_head;
> - WRITE_ONCE(q->ring->producer, q->prod_tail);
> + WRITE_ONCE(q->ring->producer, q->cached_prod);
> }
>
> static inline bool xskq_full_desc(struct xsk_queue *q)
> @@ -367,7 +362,7 @@ static inline bool xskq_full_desc(struct xsk_queue *q)
>
> static inline bool xskq_empty_desc(struct xsk_queue *q)
> {
> - return xskq_nb_free(q, q->prod_tail, q->nentries) == q->nentries;
> + return xskq_nb_free(q, q->nentries) == q->nentries;
I don't think this change is correct. The old code checked the number of
free items against prod_tail (== producer). The new code changes it to
prod_head (which is now cached_prod). xskq_nb_free is used in xsk_poll
to set EPOLLIN. After this change EPOLLIN will be set right after
xskq_produce_batch_desc, but it should only be set after
xskq_produce_flush_desc, just as before, otherwise the application will
wake up before the data is available, and it will just waste CPU cycles.
> }
>
> void xskq_set_umem(struct xsk_queue *q, u64 size, u64 chunk_mask);
>
Powered by blists - more mailing lists