Message-ID: <CAJ8uoz35zcJT1_jZPs6ijvDrtQiGZvu3RnaeS9wx+Y_aU3m+fg@mail.gmail.com>
Date: Thu, 22 Aug 2019 12:52:08 +0200
From: Magnus Karlsson <magnus.karlsson@...il.com>
To: Eelco Chaudron <echaudro@...hat.com>
Cc: Network Development <netdev@...r.kernel.org>,
Alexei Starovoitov <ast@...nel.org>,
Daniel Borkmann <daniel@...earbox.net>,
Martin KaFai Lau <kafai@...com>,
Song Liu <songliubraving@...com>, Yonghong Song <yhs@...com>,
Andrii Nakryiko <andrii.nakryiko@...il.com>
Subject: Re: [PATCH bpf-next v5] libbpf: add xsk_ring_prod__nb_free() function
On Thu, Aug 22, 2019 at 11:54 AM Eelco Chaudron <echaudro@...hat.com> wrote:
>
> When an AF_XDP application receives X packets, it does not mean that
> X frames can be stuffed into the producer ring. To make this easier
> for AF_XDP applications, this API allows them to check how many
> frames can be added to the ring.
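>
> A minimal sketch of the intended usage (reserve_avail() and its
> arguments are illustrative only, they are not part of this patch):
>
>     #include <bpf/xsk.h>
>
>     /* Reserve up to `wanted` fill-queue slots, but never more than
>      * the ring can currently take. Returns the number of slots
>      * reserved; on success, *idx is the first reserved index.
>      */
>     static __u32 reserve_avail(struct xsk_ring_prod *fq, __u32 wanted,
>                                __u32 *idx)
>     {
>         __u32 free_slots = xsk_prod__nb_free(fq, wanted);
>
>         /* nb_free can report more slots than requested; clamp it so
>          * we only reserve what we actually intend to fill.
>          */
>         if (free_slots > wanted)
>             free_slots = wanted;
>
>         return xsk_ring_prod__reserve(fq, free_slots, idx);
>     }
>
> The caller then fills exactly the returned number of descriptors and
> stashes the rest, as the xdpsock changes below do.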
>
> The change below may look like a rename only, but the xsk_prod__
> prefix denotes that this API is exposed for use by applications.
>
> Besides, if you set the nb value to the size of the ring, you will
> get the exact number of slots available, at the cost of performance
> (you are guaranteed to touch shared state). The nb argument is there
> to limit how often the shared state is touched.
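>
> For example (a sketch; fq, FQ_RING_SIZE and needed are placeholders):
>
>     /* Exact count: with nb equal to the ring size, the cached view
>      * can only satisfy the request when the ring is completely free,
>      * so in practice every call reads the shared consumer pointer.
>      */
>     __u32 exact = xsk_prod__nb_free(fq, FQ_RING_SIZE);
>
>     /* Bounded check: if the cached view already shows at least
>      * `needed` free slots, no shared state is touched at all.
>      */
>     __u32 avail = xsk_prod__nb_free(fq, needed);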
>
> The example xdpsock application has also been modified to use this
> new API, so it is able to process flows at a 1pps rate on veth
> interfaces.
Still just 1 single packet per second with veth and this optimization ;-)?
Thanks Eelco for working on this.
Acked-by: Magnus Karlsson <magnus.karlsson@...el.com>
> Signed-off-by: Eelco Chaudron <echaudro@...hat.com>
>
> ---
> v4 -> v5
> - Rebased on latest bpf-next
>
> v3 -> v4
> - Cleaned up commit message
> - Updated AF_XDP sample application to use this new API
>
> v2 -> v3
> - Removed cache bypass option
>
> v1 -> v2
> - Renamed xsk_ring_prod__free() to xsk_ring_prod__nb_free()
> - Added caching so it only touches global state when needed
>
> samples/bpf/xdpsock_user.c | 119 ++++++++++++++++++++++++++++---------
> tools/lib/bpf/xsk.h | 4 +-
> 2 files changed, 93 insertions(+), 30 deletions(-)
>
> diff --git a/samples/bpf/xdpsock_user.c b/samples/bpf/xdpsock_user.c
> index da84c760c094..bec0ee463184 100644
> --- a/samples/bpf/xdpsock_user.c
> +++ b/samples/bpf/xdpsock_user.c
> @@ -470,9 +470,13 @@ static void kick_tx(struct xsk_socket_info *xsk)
> static inline void complete_tx_l2fwd(struct xsk_socket_info *xsk,
> struct pollfd *fds)
> {
> - u32 idx_cq = 0, idx_fq = 0;
> - unsigned int rcvd;
> + static u64 free_frames[NUM_FRAMES];
> + static size_t nr_free_frames;
> +
> + u32 idx_cq = 0, idx_fq = 0, free_slots;
> + unsigned int rcvd, i;
> size_t ndescs;
> + int ret;
>
> if (!xsk->outstanding_tx)
> return;
> @@ -485,29 +489,56 @@ static inline void complete_tx_l2fwd(struct xsk_socket_info *xsk,
>
> /* re-add completed Tx buffers */
> rcvd = xsk_ring_cons__peek(&xsk->umem->cq, ndescs, &idx_cq);
> - if (rcvd > 0) {
> - unsigned int i;
> - int ret;
> -
> - ret = xsk_ring_prod__reserve(&xsk->umem->fq, rcvd, &idx_fq);
> - while (ret != rcvd) {
> - if (ret < 0)
> - exit_with_error(-ret);
> - if (xsk_ring_prod__needs_wakeup(&xsk->umem->fq))
> - ret = poll(fds, num_socks, opt_timeout);
> - ret = xsk_ring_prod__reserve(&xsk->umem->fq, rcvd,
> - &idx_fq);
> - }
> - for (i = 0; i < rcvd; i++)
> + if (!rcvd)
> + return;
> +
> + /* When xsk_ring_cons__peek(), for example, returns that 5 packets
> + * have been received, it does not automatically mean that
> + * xsk_ring_prod__reserve() will have 5 slots available. You will
> + * see this, for example, when using a veth interface due to the
> + * RX_BATCH_SIZE used by the generic driver.
> + *
> + * In this example we store the unused buffers and try to restock
> + * them on the next iteration.
> + */
> +
> + free_slots = xsk_prod__nb_free(&xsk->umem->fq, rcvd + nr_free_frames);
> + if (free_slots > rcvd + nr_free_frames)
> + free_slots = rcvd + nr_free_frames;
> +
> + ret = xsk_ring_prod__reserve(&xsk->umem->fq, free_slots, &idx_fq);
> + while (ret != free_slots) {
> + if (ret < 0)
> + exit_with_error(-ret);
> +
> + if (xsk_ring_prod__needs_wakeup(&xsk->umem->fq))
> + ret = poll(fds, num_socks, opt_timeout);
> +
> + ret = xsk_ring_prod__reserve(&xsk->umem->fq, free_slots,
> + &idx_fq);
> + }
> + for (i = 0; i < rcvd; i++) {
> + u64 addr = *xsk_ring_cons__comp_addr(&xsk->umem->cq, idx_cq++);
> +
> + if (i < free_slots)
> *xsk_ring_prod__fill_addr(&xsk->umem->fq, idx_fq++) =
> - *xsk_ring_cons__comp_addr(&xsk->umem->cq,
> - idx_cq++);
> + addr;
> + else
> + free_frames[nr_free_frames++] = addr;
> + }
>
> - xsk_ring_prod__submit(&xsk->umem->fq, rcvd);
> - xsk_ring_cons__release(&xsk->umem->cq, rcvd);
> - xsk->outstanding_tx -= rcvd;
> - xsk->tx_npkts += rcvd;
> + if (free_slots > rcvd) {
> + for (i = 0; i < (free_slots - rcvd); i++) {
> + u64 addr = free_frames[--nr_free_frames];
> + *xsk_ring_prod__fill_addr(&xsk->umem->fq, idx_fq++) =
> + addr;
> + }
> }
> +
> + xsk_ring_prod__submit(&xsk->umem->fq, free_slots);
> + xsk_ring_cons__release(&xsk->umem->cq, rcvd);
> + xsk->outstanding_tx -= rcvd;
> + xsk->tx_npkts += rcvd;
> }
>
> static inline void complete_tx_only(struct xsk_socket_info *xsk)
> @@ -531,8 +562,11 @@ static inline void complete_tx_only(struct xsk_socket_info *xsk)
>
> static void rx_drop(struct xsk_socket_info *xsk, struct pollfd *fds)
> {
> + static u64 free_frames[NUM_FRAMES];
> + static size_t nr_free_frames;
> +
> unsigned int rcvd, i;
> - u32 idx_rx = 0, idx_fq = 0;
> + u32 idx_rx = 0, idx_fq = 0, free_slots;
> int ret;
>
> rcvd = xsk_ring_cons__peek(&xsk->rx, BATCH_SIZE, &idx_rx);
> @@ -542,13 +576,30 @@ static void rx_drop(struct xsk_socket_info *xsk, struct pollfd *fds)
> return;
> }
>
> - ret = xsk_ring_prod__reserve(&xsk->umem->fq, rcvd, &idx_fq);
> - while (ret != rcvd) {
> + /* When xsk_ring_cons__peek(), for example, returns that 5 packets
> + * have been received, it does not automatically mean that
> + * xsk_ring_prod__reserve() will have 5 slots available. You will
> + * see this, for example, when using a veth interface due to the
> + * RX_BATCH_SIZE used by the generic driver.
> + *
> + * In this example we store the unused buffers and try to restock
> + * them on the next iteration.
> + */
> +
> + free_slots = xsk_prod__nb_free(&xsk->umem->fq, rcvd + nr_free_frames);
> + if (free_slots > rcvd + nr_free_frames)
> + free_slots = rcvd + nr_free_frames;
> +
> + ret = xsk_ring_prod__reserve(&xsk->umem->fq, free_slots, &idx_fq);
> + while (ret != free_slots) {
> if (ret < 0)
> exit_with_error(-ret);
> +
> if (xsk_ring_prod__needs_wakeup(&xsk->umem->fq))
> ret = poll(fds, num_socks, opt_timeout);
> - ret = xsk_ring_prod__reserve(&xsk->umem->fq, rcvd, &idx_fq);
> +
> + ret = xsk_ring_prod__reserve(&xsk->umem->fq, free_slots,
> + &idx_fq);
> }
>
> for (i = 0; i < rcvd; i++) {
> @@ -557,10 +608,22 @@ static void rx_drop(struct xsk_socket_info *xsk, struct pollfd *fds)
> char *pkt = xsk_umem__get_data(xsk->umem->buffer, addr);
>
> hex_dump(pkt, len, addr);
> - *xsk_ring_prod__fill_addr(&xsk->umem->fq, idx_fq++) = addr;
> + if (i < free_slots)
> + *xsk_ring_prod__fill_addr(&xsk->umem->fq, idx_fq++) =
> + addr;
> + else
> + free_frames[nr_free_frames++] = addr;
> + }
> +
> + if (free_slots > rcvd) {
> + for (i = 0; i < (free_slots - rcvd); i++) {
> + u64 addr = free_frames[--nr_free_frames];
> + *xsk_ring_prod__fill_addr(&xsk->umem->fq, idx_fq++) =
> + addr;
> + }
> }
>
> - xsk_ring_prod__submit(&xsk->umem->fq, rcvd);
> + xsk_ring_prod__submit(&xsk->umem->fq, free_slots);
> xsk_ring_cons__release(&xsk->rx, rcvd);
> xsk->rx_npkts += rcvd;
> }
> diff --git a/tools/lib/bpf/xsk.h b/tools/lib/bpf/xsk.h
> index aa1d6122b7db..520a772c882c 100644
> --- a/tools/lib/bpf/xsk.h
> +++ b/tools/lib/bpf/xsk.h
> @@ -82,7 +82,7 @@ static inline int xsk_ring_prod__needs_wakeup(const struct xsk_ring_prod *r)
> return *r->flags & XDP_RING_NEED_WAKEUP;
> }
>
> -static inline __u32 xsk_prod_nb_free(struct xsk_ring_prod *r, __u32 nb)
> +static inline __u32 xsk_prod__nb_free(struct xsk_ring_prod *r, __u32 nb)
> {
> __u32 free_entries = r->cached_cons - r->cached_prod;
>
> @@ -116,7 +116,7 @@ static inline __u32 xsk_cons_nb_avail(struct xsk_ring_cons *r, __u32 nb)
> static inline size_t xsk_ring_prod__reserve(struct xsk_ring_prod *prod,
> size_t nb, __u32 *idx)
> {
> - if (xsk_prod_nb_free(prod, nb) < nb)
> + if (xsk_prod__nb_free(prod, nb) < nb)
> return 0;
>
> *idx = prod->cached_prod;
> --
> 2.18.1
>