[<prev] [next>] [<thread-prev] [thread-next>] [day] [month] [year] [list]
Message-Id: <20210329224316.17793-10-maciej.fijalkowski@intel.com>
Date: Tue, 30 Mar 2021 00:43:08 +0200
From: Maciej Fijalkowski <maciej.fijalkowski@...el.com>
To: bpf@...r.kernel.org, netdev@...r.kernel.org, daniel@...earbox.net,
ast@...nel.org, andrii@...nel.org
Cc: bjorn.topel@...el.com, magnus.karlsson@...el.com,
ciara.loftus@...el.com, john.fastabend@...il.com, toke@...hat.com,
Maciej Fijalkowski <maciej.fijalkowski@...el.com>
Subject: [PATCH v5 bpf-next 09/17] selftests: xsk: split worker thread
Let's a have a separate Tx/Rx worker threads instead of a one common
thread packed with Tx/Rx specific checks.
Move mmap for umem buffer space and a switch_namespace() call to
thread_common_ops.
This also allows for a bunch of simplifactions that are the subject of
the next commits. The final result will be a code base that is much
easier to follow.
Signed-off-by: Maciej Fijalkowski <maciej.fijalkowski@...el.com>
---
tools/testing/selftests/bpf/xdpxceiver.c | 156 +++++++++++------------
1 file changed, 77 insertions(+), 79 deletions(-)
diff --git a/tools/testing/selftests/bpf/xdpxceiver.c b/tools/testing/selftests/bpf/xdpxceiver.c
index 2e5f536563e4..b6fd5eb1d620 100644
--- a/tools/testing/selftests/bpf/xdpxceiver.c
+++ b/tools/testing/selftests/bpf/xdpxceiver.c
@@ -760,6 +760,15 @@ static void thread_common_ops(struct ifobject *ifobject, void *bufs, pthread_mut
int ctr = 0;
int ret;
+ pthread_attr_setstacksize(&attr, THREAD_STACK);
+
+ bufs = mmap(NULL, num_frames * XSK_UMEM__DEFAULT_FRAME_SIZE,
+ PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
+ if (bufs == MAP_FAILED)
+ exit_with_error(errno);
+
+ ifobject->ns_fd = switch_namespace(ifobject->nsname);
+
xsk_configure_umem(ifobject, bufs, num_frames * XSK_UMEM__DEFAULT_FRAME_SIZE);
ret = xsk_configure_socket(ifobject);
@@ -782,9 +791,12 @@ static void thread_common_ops(struct ifobject *ifobject, void *bufs, pthread_mut
if (ctr >= SOCK_RECONF_CTR)
exit_with_error(ret);
+
+ print_verbose("Interface [%s] vector [%s]\n",
+ ifobject->ifname, ifobject->fv.vector == tx ? "Tx" : "Rx");
}
-static void *worker_testapp_validate(void *arg)
+static void *worker_testapp_validate_tx(void *arg)
{
struct udphdr *udp_hdr =
(struct udphdr *)(pkt_data + sizeof(struct ethhdr) + sizeof(struct iphdr));
@@ -792,97 +804,83 @@ static void *worker_testapp_validate(void *arg)
struct ethhdr *eth_hdr = (struct ethhdr *)pkt_data;
struct ifobject *ifobject = (struct ifobject *)arg;
struct generic_data data;
+ int spinningrxctr = 0;
void *bufs = NULL;
- pthread_attr_setstacksize(&attr, THREAD_STACK);
-
- if (!bidi_pass) {
- bufs = mmap(NULL, num_frames * XSK_UMEM__DEFAULT_FRAME_SIZE,
- PROT_READ | PROT_WRITE, MAP_PRIVATE | MAP_ANONYMOUS, -1, 0);
- if (bufs == MAP_FAILED)
- exit_with_error(errno);
+ if (!bidi_pass)
+ thread_common_ops(ifobject, bufs, &sync_mutex_tx, &spinning_tx);
- ifobject->ns_fd = switch_namespace(ifobject->nsname);
+ while (atomic_load(&spinning_rx) && spinningrxctr < SOCK_RECONF_CTR) {
+ spinningrxctr++;
+ usleep(USLEEP_MAX);
}
- if (ifobject->fv.vector == tx) {
- int spinningrxctr = 0;
+ for (int i = 0; i < num_frames; i++) {
+ /*send EOT frame */
+ if (i == (num_frames - 1))
+ data.seqnum = -1;
+ else
+ data.seqnum = i;
+ gen_udp_hdr(&data, ifobject, udp_hdr);
+ gen_ip_hdr(ifobject, ip_hdr);
+ gen_udp_csum(udp_hdr, ip_hdr);
+ gen_eth_hdr(ifobject, eth_hdr);
+ gen_eth_frame(ifobject->umem, i * XSK_UMEM__DEFAULT_FRAME_SIZE);
+ }
- if (!bidi_pass)
- thread_common_ops(ifobject, bufs, &sync_mutex_tx, &spinning_tx);
+ print_verbose("Sending %d packets on interface %s\n",
+ (opt_pkt_count - 1), ifobject->ifname);
+ tx_only_all(ifobject);
- while (atomic_load(&spinning_rx) && spinningrxctr < SOCK_RECONF_CTR) {
- spinningrxctr++;
- usleep(USLEEP_MAX);
- }
+ if (test_type != TEST_TYPE_BIDI || bidi_pass) {
+ xsk_socket__delete(ifobject->xsk->xsk);
+ (void)xsk_umem__delete(ifobject->umem->umem);
+ }
+ pthread_exit(NULL);
+}
- print_verbose("Interface [%s] vector [Tx]\n", ifobject->ifname);
- for (int i = 0; i < num_frames; i++) {
- /*send EOT frame */
- if (i == (num_frames - 1))
- data.seqnum = -1;
- else
- data.seqnum = i;
- gen_udp_hdr(&data, ifobject, udp_hdr);
- gen_ip_hdr(ifobject, ip_hdr);
- gen_udp_csum(udp_hdr, ip_hdr);
- gen_eth_hdr(ifobject, eth_hdr);
- gen_eth_frame(ifobject->umem, i * XSK_UMEM__DEFAULT_FRAME_SIZE);
- }
+static void *worker_testapp_validate_rx(void *arg)
+{
+ struct ifobject *ifobject = (struct ifobject *)arg;
+ struct pollfd fds[MAX_SOCKS] = { };
+ void *bufs = NULL;
- print_verbose("Sending %d packets on interface %s\n",
- (opt_pkt_count - 1), ifobject->ifname);
- tx_only_all(ifobject);
- } else if (ifobject->fv.vector == rx) {
- struct pollfd fds[MAX_SOCKS] = { };
- int ret;
-
- if (!bidi_pass)
- thread_common_ops(ifobject, bufs, &sync_mutex_tx, &spinning_rx);
-
- print_verbose("Interface [%s] vector [Rx]\n", ifobject->ifname);
- if (stat_test_type != STAT_TEST_RX_FILL_EMPTY)
- xsk_populate_fill_ring(ifobject->umem);
-
- TAILQ_INIT(&head);
- if (debug_pkt_dump) {
- pkt_buf = calloc(num_frames, sizeof(*pkt_buf));
- if (!pkt_buf)
- exit_with_error(errno);
- }
+ if (!bidi_pass)
+ thread_common_ops(ifobject, bufs, &sync_mutex_tx, &spinning_rx);
- fds[0].fd = xsk_socket__fd(ifobject->xsk->xsk);
- fds[0].events = POLLIN;
+ if (stat_test_type != STAT_TEST_RX_FILL_EMPTY)
+ xsk_populate_fill_ring(ifobject->umem);
- pthread_mutex_lock(&sync_mutex);
- pthread_cond_signal(&signal_rx_condition);
- pthread_mutex_unlock(&sync_mutex);
+ TAILQ_INIT(&head);
+ if (debug_pkt_dump) {
+ pkt_buf = calloc(num_frames, sizeof(*pkt_buf));
+ if (!pkt_buf)
+ exit_with_error(errno);
+ }
- while (1) {
- if (test_type == TEST_TYPE_POLL) {
- ret = poll(fds, 1, POLL_TMOUT);
- if (ret <= 0)
- continue;
- }
+ fds[0].fd = xsk_socket__fd(ifobject->xsk->xsk);
+ fds[0].events = POLLIN;
- if (test_type != TEST_TYPE_STATS) {
- rx_pkt(ifobject->xsk, fds);
- worker_pkt_validate();
- } else {
- worker_stats_validate(ifobject);
- }
+ pthread_mutex_lock(&sync_mutex);
+ pthread_cond_signal(&signal_rx_condition);
+ pthread_mutex_unlock(&sync_mutex);
- if (sigvar)
- break;
+ while (1) {
+ if (test_type != TEST_TYPE_STATS) {
+ rx_pkt(ifobject->xsk, fds);
+ worker_pkt_validate();
+ } else {
+ worker_stats_validate(ifobject);
}
+ if (sigvar)
+ break;
+ }
- if (test_type != TEST_TYPE_STATS)
- print_verbose("Received %d packets on interface %s\n",
- pkt_counter, ifobject->ifname);
+ print_verbose("Received %d packets on interface %s\n",
+ pkt_counter, ifobject->ifname);
- if (test_type == TEST_TYPE_TEARDOWN)
- print_verbose("Destroying socket\n");
- }
+ if (test_type == TEST_TYPE_TEARDOWN)
+ print_verbose("Destroying socket\n");
if ((test_type != TEST_TYPE_BIDI) || bidi_pass) {
xsk_socket__delete(ifobject->xsk->xsk);
@@ -911,12 +909,12 @@ static void testapp_validate(void)
/*Spawn RX thread */
if (!bidi || !bidi_pass) {
- if (pthread_create(&t0, &attr, worker_testapp_validate, ifdict[1]))
+ if (pthread_create(&t0, &attr, worker_testapp_validate_rx, ifdict[1]))
exit_with_error(errno);
} else if (bidi && bidi_pass) {
/*switch Tx/Rx vectors */
ifdict[0]->fv.vector = rx;
- if (pthread_create(&t0, &attr, worker_testapp_validate, ifdict[0]))
+ if (pthread_create(&t0, &attr, worker_testapp_validate_rx, ifdict[0]))
exit_with_error(errno);
}
@@ -931,12 +929,12 @@ static void testapp_validate(void)
/*Spawn TX thread */
if (!bidi || !bidi_pass) {
- if (pthread_create(&t1, &attr, worker_testapp_validate, ifdict[0]))
+ if (pthread_create(&t1, &attr, worker_testapp_validate_tx, ifdict[0]))
exit_with_error(errno);
} else if (bidi && bidi_pass) {
/*switch Tx/Rx vectors */
ifdict[1]->fv.vector = tx;
- if (pthread_create(&t1, &attr, worker_testapp_validate, ifdict[1]))
+ if (pthread_create(&t1, &attr, worker_testapp_validate_tx, ifdict[1]))
exit_with_error(errno);
}
--
2.20.1
Powered by blists - more mailing lists