[<prev] [next>] [<thread-prev] [thread-next>] [day] [month] [year] [list]
Message-ID: <a6838fb8ea49162097ab16ef1946d83d370c05d1.1726480607.git.lorenzo@kernel.org>
Date: Mon, 16 Sep 2024 12:13:45 +0200
From: Lorenzo Bianconi <lorenzo@...nel.org>
To: bpf@...r.kernel.org
Cc: kuba@...nel.org,
	aleksander.lobakin@...el.com,
	ast@...nel.org,
	daniel@...earbox.net,
	andrii@...nel.org,
	dxu@...uu.xyz,
	john.fastabend@...il.com,
	hawk@...nel.org,
	martin.lau@...ux.dev,
	davem@...emloft.net,
	edumazet@...gle.com,
	pabeni@...hat.com,
	netdev@...r.kernel.org,
	lorenzo.bianconi@...hat.com
Subject: [RFC/RFT v2 3/3] bpf: cpumap: Add gro support
Introduce GRO support to cpumap codebase moving the cpu_map_entry
kthread to a NAPI-kthread pinned on the selected cpu.
Signed-off-by: Lorenzo Bianconi <lorenzo@...nel.org>
---
 kernel/bpf/cpumap.c | 123 +++++++++++++++++++-------------------------
 1 file changed, 52 insertions(+), 71 deletions(-)
diff --git a/kernel/bpf/cpumap.c b/kernel/bpf/cpumap.c
index fbdf5a1aabfe4..3ec6739aec5ae 100644
--- a/kernel/bpf/cpumap.c
+++ b/kernel/bpf/cpumap.c
@@ -62,9 +62,11 @@ struct bpf_cpu_map_entry {
 	/* XDP can run multiple RX-ring queues, need __percpu enqueue store */
 	struct xdp_bulk_queue __percpu *bulkq;
 
-	/* Queue with potential multi-producers, and single-consumer kthread */
+	/* Queue with potential multi-producers, and single-consumer
+	 * NAPI-kthread
+	 */
 	struct ptr_ring *queue;
-	struct task_struct *kthread;
+	struct napi_struct napi;
 
 	struct bpf_cpumap_val value;
 	struct bpf_prog *prog;
@@ -261,58 +263,42 @@ static int cpu_map_bpf_prog_run(struct bpf_cpu_map_entry *rcpu, void **frames,
 	return nframes;
 }
 
-static int cpu_map_kthread_run(void *data)
+static int cpu_map_poll(struct napi_struct *napi, int budget)
 {
-	struct bpf_cpu_map_entry *rcpu = data;
-	unsigned long last_qs = jiffies;
+	struct xdp_cpumap_stats stats = {}; /* zero stats */
+	unsigned int kmem_alloc_drops = 0;
+	struct bpf_cpu_map_entry *rcpu;
+	int done = 0;
 
+	rcu_read_lock();
+	rcpu = container_of(napi, struct bpf_cpu_map_entry, napi);
 	complete(&rcpu->kthread_running);
-	set_current_state(TASK_INTERRUPTIBLE);
 
-	/* When kthread gives stop order, then rcpu have been disconnected
-	 * from map, thus no new packets can enter. Remaining in-flight
-	 * per CPU stored packets are flushed to this queue.  Wait honoring
-	 * kthread_stop signal until queue is empty.
-	 */
-	while (!kthread_should_stop() || !__ptr_ring_empty(rcpu->queue)) {
-		struct xdp_cpumap_stats stats = {}; /* zero stats */
-		unsigned int kmem_alloc_drops = 0, sched = 0;
+	while (done < budget) {
 		gfp_t gfp = __GFP_ZERO | GFP_ATOMIC;
-		int i, n, m, nframes, xdp_n;
+		int n, i, m, xdp_n = 0, nframes;
 		void *frames[CPUMAP_BATCH];
+		struct sk_buff *skb, *tmp;
 		void *skbs[CPUMAP_BATCH];
 		LIST_HEAD(list);
 
-		/* Release CPU reschedule checks */
-		if (__ptr_ring_empty(rcpu->queue)) {
-			set_current_state(TASK_INTERRUPTIBLE);
-			/* Recheck to avoid lost wake-up */
-			if (__ptr_ring_empty(rcpu->queue)) {
-				schedule();
-				sched = 1;
-				last_qs = jiffies;
-			} else {
-				__set_current_state(TASK_RUNNING);
-			}
-		} else {
-			rcu_softirq_qs_periodic(last_qs);
-			sched = cond_resched();
-		}
-
+		if (__ptr_ring_empty(rcpu->queue))
+			break;
 		/*
 		 * The bpf_cpu_map_entry is single consumer, with this
 		 * kthread CPU pinned. Lockless access to ptr_ring
 		 * consume side valid as no-resize allowed of queue.
 		 */
-		n = __ptr_ring_consume_batched(rcpu->queue, frames,
-					       CPUMAP_BATCH);
-		for (i = 0, xdp_n = 0; i < n; i++) {
+		n = min(budget -  done, CPUMAP_BATCH);
+		n = __ptr_ring_consume_batched(rcpu->queue, frames, n);
+		done += n;
+
+		for (i = 0; i < n; i++) {
 			void *f = frames[i];
 			struct page *page;
 
 			if (unlikely(__ptr_test_bit(0, &f))) {
-				struct sk_buff *skb = f;
-
+				skb = f;
 				__ptr_clear_bit(0, &skb);
 				list_add_tail(&skb->list, &list);
 				continue;
@@ -340,12 +326,10 @@ static int cpu_map_kthread_run(void *data)
 			}
 		}
 
-		local_bh_disable();
 		for (i = 0; i < nframes; i++) {
 			struct xdp_frame *xdpf = frames[i];
-			struct sk_buff *skb = skbs[i];
 
-			skb = __xdp_build_skb_from_frame(xdpf, skb,
+			skb = __xdp_build_skb_from_frame(xdpf, skbs[i],
 							 xdpf->dev_rx);
 			if (!skb) {
 				xdp_return_frame(xdpf);
@@ -354,17 +338,21 @@ static int cpu_map_kthread_run(void *data)
 
 			list_add_tail(&skb->list, &list);
 		}
-		netif_receive_skb_list(&list);
-
-		/* Feedback loop via tracepoint */
-		trace_xdp_cpumap_kthread(rcpu->map_id, n, kmem_alloc_drops,
-					 sched, &stats);
 
-		local_bh_enable(); /* resched point, may call do_softirq() */
+		list_for_each_entry_safe(skb, tmp, &list, list) {
+			skb_list_del_init(skb);
+			napi_gro_receive(napi, skb);
+		}
 	}
-	__set_current_state(TASK_RUNNING);
 
-	return 0;
+	rcu_read_unlock();
+	/* Feedback loop via tracepoint */
+	trace_xdp_cpumap_kthread(rcpu->map_id, done, kmem_alloc_drops, 0,
+				 &stats);
+	if (done < budget)
+		napi_complete(napi);
+
+	return done;
 }
 
 static int __cpu_map_load_bpf_program(struct bpf_cpu_map_entry *rcpu,
@@ -432,18 +420,19 @@ __cpu_map_entry_alloc(struct bpf_map *map, struct bpf_cpumap_val *value,
 	if (fd > 0 && __cpu_map_load_bpf_program(rcpu, map, fd))
 		goto free_ptr_ring;
 
+	napi_init_for_gro(NULL, &rcpu->napi, cpu_map_poll,
+			  NAPI_POLL_WEIGHT);
+	set_bit(NAPI_STATE_THREADED, &rcpu->napi.state);
+
 	/* Setup kthread */
 	init_completion(&rcpu->kthread_running);
-	rcpu->kthread = kthread_create_on_node(cpu_map_kthread_run, rcpu, numa,
-					       "cpumap/%d/map:%d", cpu,
-					       map->id);
-	if (IS_ERR(rcpu->kthread))
+	rcpu->napi.thread = kthread_run_on_cpu(napi_threaded_poll,
+					       &rcpu->napi, cpu,
+					       "cpumap-napi/%d");
+	if (IS_ERR(rcpu->napi.thread))
 		goto free_prog;
 
-	/* Make sure kthread runs on a single CPU */
-	kthread_bind(rcpu->kthread, cpu);
-	wake_up_process(rcpu->kthread);
-
+	napi_schedule(&rcpu->napi);
 	/* Make sure kthread has been running, so kthread_stop() will not
 	 * stop the kthread prematurely and all pending frames or skbs
 	 * will be handled by the kthread before kthread_stop() returns.
@@ -477,12 +466,8 @@ static void __cpu_map_entry_free(struct work_struct *work)
 	 */
 	rcpu = container_of(to_rcu_work(work), struct bpf_cpu_map_entry, free_work);
 
-	/* kthread_stop will wake_up_process and wait for it to complete.
-	 * cpu_map_kthread_run() makes sure the pointer ring is empty
-	 * before exiting.
-	 */
-	kthread_stop(rcpu->kthread);
-
+	napi_disable(&rcpu->napi);
+	__netif_napi_del(&rcpu->napi);
 	if (rcpu->prog)
 		bpf_prog_put(rcpu->prog);
 	/* The queue should be empty at this point */
@@ -498,8 +483,8 @@ static void __cpu_map_entry_free(struct work_struct *work)
  * __cpu_map_entry_free() in a separate workqueue after waiting for an RCU grace
  * period. This means that (a) all pending enqueue and flush operations have
  * completed (because of the RCU callback), and (b) we are in a workqueue
- * context where we can stop the kthread and wait for it to exit before freeing
- * everything.
+ * context where we can stop the NAPI-kthread and wait for it to exit before
+ * freeing everything.
  */
 static void __cpu_map_entry_replace(struct bpf_cpu_map *cmap,
 				    u32 key_cpu, struct bpf_cpu_map_entry *rcpu)
@@ -579,9 +564,7 @@ static void cpu_map_free(struct bpf_map *map)
 	 */
 	synchronize_rcu();
 
-	/* The only possible user of bpf_cpu_map_entry is
-	 * cpu_map_kthread_run().
-	 */
+	/* The only possible user of bpf_cpu_map_entry is the NAPI-kthread. */
 	for (i = 0; i < cmap->map.max_entries; i++) {
 		struct bpf_cpu_map_entry *rcpu;
 
@@ -589,7 +572,7 @@ static void cpu_map_free(struct bpf_map *map)
 		if (!rcpu)
 			continue;
 
-		/* Stop kthread and cleanup entry directly */
+		/* Stop NAPI-kthread and cleanup entry directly */
 		__cpu_map_entry_free(&rcpu->free_work.work);
 	}
 	bpf_map_area_free(cmap->cpu_map);
@@ -753,7 +736,7 @@ int cpu_map_generic_redirect(struct bpf_cpu_map_entry *rcpu,
 	if (ret < 0)
 		goto trace;
 
-	wake_up_process(rcpu->kthread);
+	napi_schedule(&rcpu->napi);
 trace:
 	trace_xdp_cpumap_enqueue(rcpu->map_id, !ret, !!ret, rcpu->cpu);
 	return ret;
@@ -765,8 +748,6 @@ void __cpu_map_flush(struct list_head *flush_list)
 
 	list_for_each_entry_safe(bq, tmp, flush_list, flush_node) {
 		bq_flush_to_queue(bq);
-
-		/* If already running, costs spin_lock_irqsave + smb_mb */
-		wake_up_process(bq->obj->kthread);
+		napi_schedule(&bq->obj->napi);
 	}
 }
-- 
2.46.0
Powered by blists - more mailing lists
 
