[<prev] [next>] [<thread-prev] [thread-next>] [day] [month] [year] [list]
Message-Id: <20180131135356.19134-20-bjorn.topel@gmail.com>
Date: Wed, 31 Jan 2018 14:53:51 +0100
From: Björn Töpel <bjorn.topel@...il.com>
To: bjorn.topel@...il.com, magnus.karlsson@...el.com,
alexander.h.duyck@...el.com, alexander.duyck@...il.com,
john.fastabend@...il.com, ast@...com, brouer@...hat.com,
willemdebruijn.kernel@...il.com, daniel@...earbox.net,
netdev@...r.kernel.org
Cc: Björn Töpel <bjorn.topel@...el.com>,
michael.lundkvist@...csson.com, jesse.brandeburg@...el.com,
anjali.singhai@...el.com, jeffrey.b.shaw@...el.com,
ferruh.yigit@...el.com, qi.z.zhang@...el.com
Subject: [RFC PATCH 19/24] xsk: add support for zero copy Rx
From: Björn Töpel <bjorn.topel@...el.com>
In this commit we start making use of the new ndo_bpf sub-commands,
and try to enable zero copy, if available.
Signed-off-by: Björn Töpel <bjorn.topel@...el.com>
---
net/xdp/xsk.c | 185 +++++++++++++++++++++++++++++++++++++++++++++-------------
1 file changed, 145 insertions(+), 40 deletions(-)
diff --git a/net/xdp/xsk.c b/net/xdp/xsk.c
index f372c3288301..f05ab825d157 100644
--- a/net/xdp/xsk.c
+++ b/net/xdp/xsk.c
@@ -29,15 +29,21 @@
#include <linux/netdevice.h>
#include <net/sock.h>
+#include <net/xdp_sock.h>
+#include <linux/buff_pool.h>
+
#include "xsk.h"
#include "xsk_buff.h"
#include "xsk_ring.h"
+#include "xsk_buff_pool.h"
+#include "xsk_packet_array.h"
#define XSK_UMEM_MIN_FRAME_SIZE 2048
#define XSK_ARRAY_SIZE 512
struct xsk_info {
struct xsk_packet_array *pa;
+ struct buff_pool *bp;
spinlock_t pa_lock;
struct xsk_queue *q;
struct xsk_umem *umem;
@@ -56,8 +62,24 @@ struct xdp_sock {
struct mutex tx_mutex;
u32 ifindex;
u16 queue_id;
+ bool zc_mode;
};
+static inline bool xsk_is_zc_cap(struct xdp_sock *xs)
+{
+ return xs->zc_mode;
+}
+
+static void xsk_set_zc_cap(struct xdp_sock *xs)
+{
+ xs->zc_mode = true;
+}
+
+static void xsk_clear_zc_cap(struct xdp_sock *xs)
+{
+ xs->zc_mode = false;
+}
+
static struct xdp_sock *xdp_sk(struct sock *sk)
{
return (struct xdp_sock *)sk;
@@ -323,6 +345,22 @@ static int xsk_init_tx_ring(struct sock *sk, int mr_fd, u32 desc_nr)
return xsk_init_ring(sk, mr_fd, desc_nr, &xs->tx);
}
+static void xsk_disable_zc(struct xdp_sock *xs)
+{
+ struct netdev_bpf bpf = {};
+
+ if (!xsk_is_zc_cap(xs))
+ return;
+
+ bpf.command = XDP_UNREGISTER_XSK;
+ bpf.xsk.queue_id = xs->queue_id;
+
+ rtnl_lock();
+ (void)xs->dev->netdev_ops->ndo_bpf(xs->dev, &bpf);
+ rtnl_unlock();
+ xsk_clear_zc_cap(xs);
+}
+
static int xsk_release(struct socket *sock)
{
struct sock *sk = sock->sk;
@@ -344,14 +382,22 @@ static int xsk_release(struct socket *sock)
xs_prev = xs->dev->_rx[xs->queue_id].xs;
rcu_assign_pointer(xs->dev->_rx[xs->queue_id].xs, NULL);
+ xsk_disable_zc(xs);
+
/* Wait for driver to stop using the xdp socket. */
synchronize_net();
xskpa_destroy(xs->rx.pa);
- xskpa_destroy(xs->tx.pa);
- xsk_umem_destroy(xs_prev->umem);
+ bpool_destroy(xs->rx.bp);
xskq_destroy(xs_prev->rx.q);
+ xsk_buff_info_destroy(xs->rx.buff_info);
+
+ xskpa_destroy(xs->tx.pa);
xskq_destroy(xs_prev->tx.q);
+ xsk_buff_info_destroy(xs->tx.buff_info);
+
+ xsk_umem_destroy(xs_prev->umem);
+
kobject_put(&xs_prev->dev->_rx[xs->queue_id].kobj);
dev_put(xs_prev->dev);
}
@@ -365,6 +411,45 @@ static int xsk_release(struct socket *sock)
return 0;
}
+static int xsk_dma_map_pool_cb(struct buff_pool *pool, struct device *dev,
+ enum dma_data_direction dir,
+ unsigned long attrs)
+{
+ struct xsk_buff_pool *bp = (struct xsk_buff_pool *)pool->pool;
+
+ return xsk_buff_dma_map(bp->bi, dev, dir, attrs);
+}
+
+static void xsk_error_report(void *ctx, int err)
+{
+ struct xsk_sock *xs = (struct xsk_sock *)ctx;
+}
+
+static void xsk_try_enable_zc(struct xdp_sock *xs)
+{
+ struct xsk_rx_parms rx_parms = {};
+ struct netdev_bpf bpf = {};
+ int err;
+
+ if (!xs->dev->netdev_ops->ndo_bpf)
+ return;
+
+ rx_parms.buff_pool = xs->rx.bp;
+ rx_parms.dma_map = xsk_dma_map_pool_cb;
+ rx_parms.error_report_ctx = xs;
+ rx_parms.error_report = xsk_error_report;
+
+ bpf.command = XDP_REGISTER_XSK;
+ bpf.xsk.rx_parms = &rx_parms;
+ bpf.xsk.queue_id = xs->queue_id;
+
+ rtnl_lock();
+ err = xs->dev->netdev_ops->ndo_bpf(xs->dev, &bpf);
+ rtnl_unlock();
+ if (!err)
+ xsk_set_zc_cap(xs);
+}
+
static int xsk_bind(struct socket *sock, struct sockaddr *addr, int addr_len)
{
struct sockaddr_xdp *sxdp = (struct sockaddr_xdp *)addr;
@@ -429,6 +514,13 @@ static int xsk_bind(struct socket *sock, struct sockaddr *addr, int addr_len)
goto out_rx_pa;
}
+ /* ...and Rx buffer pool is used for zerocopy. */
+ xs->rx.bp = xsk_buff_pool_create(xs->rx.buff_info, xs->rx.q);
+ if (!xs->rx.bp) {
+ err = -ENOMEM;
+ goto out_rx_bp;
+ }
+
/* Tx */
xs->tx.buff_info = xsk_buff_info_create(xs->tx.umem);
if (!xs->tx.buff_info) {
@@ -446,12 +538,17 @@ static int xsk_bind(struct socket *sock, struct sockaddr *addr, int addr_len)
rcu_assign_pointer(dev->_rx[sxdp->sxdp_queue_id].xs, xs);
+ xsk_try_enable_zc(xs);
+
goto out_unlock;
out_tx_pa:
xsk_buff_info_destroy(xs->tx.buff_info);
xs->tx.buff_info = NULL;
out_tx_bi:
+ bpool_destroy(xs->rx.bp);
+ xs->rx.bp = NULL;
+out_rx_bp:
xskpa_destroy(xs->rx.pa);
xs->rx.pa = NULL;
out_rx_pa:
@@ -509,27 +606,16 @@ int xsk_generic_rcv(struct xdp_buff *xdp)
}
EXPORT_SYMBOL_GPL(xsk_generic_rcv);
-struct xdp_sock *xsk_rcv(struct xdp_sock *xsk, struct xdp_buff *xdp)
+static int __xsk_rcv(struct xdp_sock *xs, struct xdp_buff *xdp)
{
u32 len = xdp->data_end - xdp->data;
struct xsk_frame_set p;
- rcu_read_lock();
- if (!xsk)
- xsk = lookup_xsk(xdp->rxq->dev, xdp->rxq->queue_index);
- if (unlikely(!xsk)) {
- rcu_read_unlock();
- return ERR_PTR(-EINVAL);
- }
-
- if (!xskpa_next_frame_populate(xsk->rx.pa, &p)) {
- rcu_read_unlock();
- return ERR_PTR(-ENOSPC);
- }
+ if (!xskpa_next_frame_populate(xs->rx.pa, &p))
+ return -ENOSPC;
memcpy(xskf_get_data(&p), xdp->data, len);
xskf_set_frame_no_offset(&p, len, true);
- rcu_read_unlock();
/* We assume that the semantic of xdp_do_redirect is such that
* ndo_xdp_xmit will decrease the refcount of the page when it
@@ -540,41 +626,60 @@ struct xdp_sock *xsk_rcv(struct xdp_sock *xsk, struct xdp_buff *xdp)
*/
page_frag_free(xdp->data);
- return xsk;
+ return 0;
}
-EXPORT_SYMBOL_GPL(xsk_rcv);
-int xsk_zc_rcv(struct xdp_sock *xsk, struct xdp_buff *xdp)
+static void __xsk_rcv_zc(struct xdp_sock *xs, struct xdp_buff *xdp)
{
- u32 offset = xdp->data - xdp->data_hard_start;
- u32 len = xdp->data_end - xdp->data;
- struct xsk_frame_set p;
+ struct xsk_buff *b = (struct xsk_buff *)xdp->bp_handle;
- /* We do not need any locking here since we are guaranteed
- * a single producer and a single consumer.
- */
- if (xskpa_next_frame_populate(xsk->rx.pa, &p)) {
- xskf_set_frame(&p, len, offset, true);
- return 0;
- }
-
- /* No user-space buffer to put the packet in. */
- return -ENOSPC;
+ xskq_enq_lazy(xs->rx.q, b->id, xdp->data_end - xdp->data,
+ b->offset + (xdp->data - xdp->data_hard_start));
}
-EXPORT_SYMBOL_GPL(xsk_zc_rcv);
-void xsk_flush(struct xdp_sock *xsk)
+struct xdp_sock *xsk_rcv(struct xdp_sock *xsk, struct xdp_buff *xdp)
{
+ int err = 0;
+
rcu_read_lock();
- if (!xsk)
- xsk = lookup_xsk(xsk->dev, xsk->queue_id);
- if (unlikely(!xsk)) {
- rcu_read_unlock();
- return;
+
+ if (!xsk) {
+ xsk = lookup_xsk(xdp->rxq->dev, xdp->rxq->queue_index);
+ if (!xsk) {
+ err = -EINVAL;
+ goto out;
+ }
}
- WARN_ON_ONCE(xskpa_flush(xsk->rx.pa));
+ /* XXX Ick, this is very hacky. Need a better solution */
+ if (xdp->rxq->bpool)
+ __xsk_rcv_zc(xsk, xdp);
+ else
+ err = __xsk_rcv(xsk, xdp);
+
+out:
rcu_read_unlock();
+
+ return err ? ERR_PTR(err) : xsk;
+}
+EXPORT_SYMBOL_GPL(xsk_rcv);
+
+static void __xsk_flush(struct xdp_sock *xs)
+{
+ WARN_ON_ONCE(xskpa_flush(xs->rx.pa));
+}
+
+static void __xsk_flush_zc(struct xdp_sock *xs)
+{
+ xskq_enq_flush(xs->rx.q);
+}
+
+void xsk_flush(struct xdp_sock *xsk)
+{
+ if (xsk_is_zc_cap(xsk))
+ __xsk_flush_zc(xsk);
+ else
+ __xsk_flush(xsk);
}
EXPORT_SYMBOL_GPL(xsk_flush);
--
2.14.1
Powered by blists - more mailing lists