[<prev] [next>] [<thread-prev] [thread-next>] [day] [month] [year] [list]
Message-ID: <20250804022101.2171981-3-xukuohai@huaweicloud.com>
Date: Mon, 4 Aug 2025 10:20:58 +0800
From: Xu Kuohai <xukuohai@...weicloud.com>
To: bpf@...r.kernel.org,
linux-kselftest@...r.kernel.org,
linux-kernel@...r.kernel.org
Cc: Alexei Starovoitov <ast@...nel.org>,
Daniel Borkmann <daniel@...earbox.net>,
Andrii Nakryiko <andrii@...nel.org>,
Martin KaFai Lau <martin.lau@...ux.dev>,
Eduard Zingerman <eddyz87@...il.com>,
Yonghong Song <yhs@...com>,
Song Liu <song@...nel.org>,
John Fastabend <john.fastabend@...il.com>,
KP Singh <kpsingh@...nel.org>,
Stanislav Fomichev <sdf@...gle.com>,
Hao Luo <haoluo@...gle.com>,
Jiri Olsa <jolsa@...nel.org>,
Mykola Lysenko <mykolal@...com>,
Shuah Khan <shuah@...nel.org>,
Stanislav Fomichev <sdf@...ichev.me>,
Willem de Bruijn <willemb@...gle.com>,
Jason Xing <kerneljasonxing@...il.com>,
Paul Chaignon <paul.chaignon@...il.com>,
Tao Chen <chen.dylane@...ux.dev>,
Kumar Kartikeya Dwivedi <memxor@...il.com>,
Martin Kelly <martin.kelly@...wdstrike.com>
Subject: [PATCH bpf-next 2/4] libbpf: ringbuf: Add overwrite ring buffer process
From: Xu Kuohai <xukuohai@...wei.com>
In overwrite mode, the producer does not wait for the consumer, so the
consumer is responsible for handling conflicts. An optimistic method
is used to resolve the conflicts: the consumer first reads consumer_pos,
producer_pos and overwrite_pos, then calculates a read window and copies
data in the window from the ring buffer. After copying, it checks the
positions to decide if the data in the copy window have been overwritten
by the producer. If so, it discards the copy and tries again. Once the
copy succeeds, the consumer processes the events in the copy.
Signed-off-by: Xu Kuohai <xukuohai@...wei.com>
---
tools/lib/bpf/ringbuf.c | 103 +++++++++++++++++++++++++++++++++++++++-
1 file changed, 102 insertions(+), 1 deletion(-)
diff --git a/tools/lib/bpf/ringbuf.c b/tools/lib/bpf/ringbuf.c
index 9702b70da444..9c072af675ff 100644
--- a/tools/lib/bpf/ringbuf.c
+++ b/tools/lib/bpf/ringbuf.c
@@ -27,10 +27,13 @@ struct ring {
ring_buffer_sample_fn sample_cb;
void *ctx;
void *data;
+ void *read_buffer;
unsigned long *consumer_pos;
unsigned long *producer_pos;
+ unsigned long *overwrite_pos;
unsigned long mask;
int map_fd;
+ bool overwrite_mode;
};
struct ring_buffer {
@@ -69,6 +72,9 @@ static void ringbuf_free_ring(struct ring_buffer *rb, struct ring *r)
r->producer_pos = NULL;
}
+ if (r->read_buffer)
+ free(r->read_buffer);
+
free(r);
}
@@ -119,6 +125,14 @@ int ring_buffer__add(struct ring_buffer *rb, int map_fd,
r->sample_cb = sample_cb;
r->ctx = ctx;
r->mask = info.max_entries - 1;
+ r->overwrite_mode = info.map_flags & BPF_F_OVERWRITE;
+ if (unlikely(r->overwrite_mode)) {
+ r->read_buffer = malloc(info.max_entries);
+ if (!r->read_buffer) {
+ err = -ENOMEM;
+ goto err_out;
+ }
+ }
/* Map writable consumer page */
tmp = mmap(NULL, rb->page_size, PROT_READ | PROT_WRITE, MAP_SHARED, map_fd, 0);
@@ -148,6 +162,7 @@ int ring_buffer__add(struct ring_buffer *rb, int map_fd,
goto err_out;
}
r->producer_pos = tmp;
+ r->overwrite_pos = r->producer_pos + 1; /* overwrite_pos is next to producer_pos */
r->data = tmp + rb->page_size;
e = &rb->events[rb->ring_cnt];
@@ -232,7 +247,7 @@ static inline int roundup_len(__u32 len)
return (len + 7) / 8 * 8;
}
-static int64_t ringbuf_process_ring(struct ring *r, size_t n)
+static int64_t ringbuf_process_normal_ring(struct ring *r, size_t n)
{
int *len_ptr, len, err;
/* 64-bit to avoid overflow in case of extreme application behavior */
@@ -278,6 +293,92 @@ static int64_t ringbuf_process_ring(struct ring *r, size_t n)
return cnt;
}
+static int64_t ringbuf_process_overwrite_ring(struct ring *r, size_t n)
+{
+ /* Consume records from a ring whose producer may overwrite unread
+ * data. Each pass copies the currently readable window into
+ * r->read_buffer, reloads producer_pos to check that the window was
+ * not overwritten during the copy, and only then runs r->sample_cb
+ * on the records held in the private copy.
+ *
+ * Processing stops when a busy record is met, when a pass consumes
+ * nothing, or once cnt reaches n (that check runs after each record).
+ *
+ * Returns the number of non-discarded records for which sample_cb
+ * succeeded, or the negative value returned by sample_cb on failure.
+ * consumer_pos is published on every exit path.
+ */
+ int err;
+ uint32_t *len_ptr, len;
+ /* 64-bit to avoid overflow in case of extreme application behavior */
+ int64_t cnt = 0;
+ size_t size, offset;
+ unsigned long cons_pos, prod_pos, over_pos, tmp_pos;
+ bool got_new_data;
+ void *sample;
+ bool copied;
+
+ size = r->mask + 1; /* NOTE(review): looks dead, size is reassigned before every read */
+
+ cons_pos = smp_load_acquire(r->consumer_pos);
+ do {
+ got_new_data = false;
+
+ /* grab a copy of data */
+ prod_pos = smp_load_acquire(r->producer_pos);
+ do {
+ over_pos = READ_ONCE(*r->overwrite_pos);
+ /* prod_pos may be outdated now: when over_pos has
+ * reached or passed the prod_pos snapshot, nothing is
+ * copied and the loop retries with a fresh producer_pos.
+ *
+ * NOTE(review): over_pos == prod_pos also takes the
+ * retry path. If both positions are equal on an empty
+ * ring, this loop would spin until the producer writes
+ * something - confirm against the kernel side.
+ */
+ if (over_pos < prod_pos) {
+ tmp_pos = max(cons_pos, over_pos);
+ /* tmp_pos is the start of the window to copy: the
+ * later of the consumer position and over_pos.
+ *
+ * smp_load_acquire(r->producer_pos) before
+ * READ_ONCE(*r->overwrite_pos) ensures that
+ * over_pos + r->mask < prod_pos never occurs,
+ * so size is never larger than r->mask
+ *
+ * NOTE(review): the memcpy below reads size bytes
+ * starting at (tmp_pos & r->mask), which can run past
+ * the end of the data area when the window wraps;
+ * presumably this relies on the data pages being
+ * mapped twice back to back - confirm against the
+ * mmap setup.
+ */
+ size = prod_pos - tmp_pos;
+ if (!size)
+ goto done;
+ memcpy(r->read_buffer,
+ r->data + (tmp_pos & r->mask), size);
+ copied = true;
+ } else {
+ copied = false;
+ }
+ prod_pos = smp_load_acquire(r->producer_pos);
+ /* retry if data is overwritten by producer: the copy is
+ * kept only if the freshly loaded producer_pos is still
+ * within r->mask bytes of the window start. When nothing
+ * was copied, !copied short-circuits and tmp_pos is not
+ * examined.
+ */
+ } while (!copied || prod_pos - tmp_pos > r->mask);
+
+ cons_pos = tmp_pos; /* consumer position jumps to the start of the copied window */
+
+ for (offset = 0; offset < size; offset += roundup_len(len)) { /* walk the private copy */
+ len_ptr = r->read_buffer + (offset & r->mask);
+ len = *len_ptr;
+
+ if (len & BPF_RINGBUF_BUSY_BIT) /* busy record: stop and publish progress so far */
+ goto done;
+
+ got_new_data = true;
+ cons_pos += roundup_len(len);
+
+ if ((len & BPF_RINGBUF_DISCARD_BIT) == 0) {
+ sample = (void *)len_ptr + BPF_RINGBUF_HDR_SZ;
+ err = r->sample_cb(r->ctx, sample, len);
+ if (err < 0) {
+ /* update consumer pos and bail out; cons_pos
+ * was already advanced past this record
+ * before the callback ran
+ */
+ smp_store_release(r->consumer_pos,
+ cons_pos);
+ return err;
+ }
+ cnt++;
+ }
+
+ if (cnt >= n)
+ goto done;
+ }
+ } while (got_new_data); /* take another pass only if this one consumed something */
+
+done:
+ smp_store_release(r->consumer_pos, cons_pos);
+ return cnt;
+}
+
+static int64_t ringbuf_process_ring(struct ring *r, size_t n)
+{
+	/* Overwrite-mode rings are the uncommon case and use the
+	 * copy-then-validate consumer; all other rings take the regular path.
+	 */
+	if (unlikely(r->overwrite_mode))
+		return ringbuf_process_overwrite_ring(r, n);
+
+	return ringbuf_process_normal_ring(r, n);
+}
+
/* Consume available ring buffer(s) data without event polling, up to n
* records.
*
--
2.43.0
Powered by blists - more mailing lists