[<prev] [next>] [<thread-prev] [thread-next>] [day] [month] [year] [list]
Message-ID: <20250115185937.1324-5-ouster@cs.stanford.edu>
Date: Wed, 15 Jan 2025 10:59:28 -0800
From: John Ousterhout <ouster@...stanford.edu>
To: netdev@...r.kernel.org
Cc: pabeni@...hat.com,
edumazet@...gle.com,
horms@...nel.org,
kuba@...nel.org,
John Ousterhout <ouster@...stanford.edu>
Subject: [PATCH net-next v6 04/12] net: homa: create homa_pool.h and homa_pool.c
These files implement Homa's mechanism for managing application-level
buffer space for incoming messages This mechanism is needed to allow
Homa to copy data out to user space in parallel with receiving packets;
it was discussed in a talk at NetDev 0x17.
Signed-off-by: John Ousterhout <ouster@...stanford.edu>
---
net/homa/homa_pool.c | 453 +++++++++++++++++++++++++++++++++++++++++++
net/homa/homa_pool.h | 154 +++++++++++++++
2 files changed, 607 insertions(+)
create mode 100644 net/homa/homa_pool.c
create mode 100644 net/homa/homa_pool.h
diff --git a/net/homa/homa_pool.c b/net/homa/homa_pool.c
new file mode 100644
index 000000000000..0b2ec83b6174
--- /dev/null
+++ b/net/homa/homa_pool.c
@@ -0,0 +1,453 @@
+// SPDX-License-Identifier: BSD-2-Clause
+
+#include "homa_impl.h"
+#include "homa_pool.h"
+
+/* This file contains functions that manage user-space buffer pools. */
+
+/* Pools must always have at least this many bpages (no particular
+ * reasoning behind this value).
+ */
+#define MIN_POOL_SIZE 2
+
+/* Used when determining how many bpages to consider for allocation. */
+#define MIN_EXTRA 4
+
+/**
+ * set_bpages_needed() - Set the bpages_needed field of @pool based
+ * on the length of the first RPC that's waiting for buffer space.
+ * The caller must own the lock for @pool->hsk.
+ * @pool: Pool to update.
+ */
+static void set_bpages_needed(struct homa_pool *pool)
+{
+ struct homa_rpc *rpc = list_first_entry(&pool->hsk->waiting_for_bufs,
+ struct homa_rpc, buf_links);
+ pool->bpages_needed = (rpc->msgin.length + HOMA_BPAGE_SIZE - 1)
+ >> HOMA_BPAGE_SHIFT;
+}
+
+/**
+ * homa_pool_init() - Initialize a homa_pool; any previous contents are
+ * destroyed.
+ * @hsk: Socket containing the pool to initialize.
+ * @region: First byte of the memory region for the pool, allocated
+ * by the application; must be page-aligned.
+ * @region_size: Total number of bytes available at @buf_region.
+ * Return: Either zero (for success) or a negative errno for failure.
+ */
+int homa_pool_init(struct homa_sock *hsk, void __user *region,
+ __u64 region_size)
+{
+ struct homa_pool *pool = hsk->buffer_pool;
+ int i, result;
+
+ homa_pool_destroy(hsk->buffer_pool);
+
+ if (((uintptr_t)region) & ~PAGE_MASK)
+ return -EINVAL;
+ pool->hsk = hsk;
+ pool->region = (char __user *)region;
+ pool->num_bpages = region_size >> HOMA_BPAGE_SHIFT;
+ pool->descriptors = NULL;
+ pool->cores = NULL;
+ if (pool->num_bpages < MIN_POOL_SIZE) {
+ result = -EINVAL;
+ goto error;
+ }
+ pool->descriptors = kmalloc_array(pool->num_bpages,
+ sizeof(struct homa_bpage),
+ GFP_ATOMIC);
+ if (!pool->descriptors) {
+ result = -ENOMEM;
+ goto error;
+ }
+ for (i = 0; i < pool->num_bpages; i++) {
+ struct homa_bpage *bp = &pool->descriptors[i];
+
+ spin_lock_init(&bp->lock);
+ atomic_set(&bp->refs, 0);
+ bp->owner = -1;
+ bp->expiration = 0;
+ }
+ atomic_set(&pool->free_bpages, pool->num_bpages);
+ pool->bpages_needed = INT_MAX;
+
+ /* Allocate and initialize core-specific data. */
+ pool->cores = kmalloc_array(nr_cpu_ids, sizeof(struct homa_pool_core),
+ GFP_ATOMIC);
+ if (!pool->cores) {
+ result = -ENOMEM;
+ goto error;
+ }
+ pool->num_cores = nr_cpu_ids;
+ for (i = 0; i < pool->num_cores; i++) {
+ pool->cores[i].page_hint = 0;
+ pool->cores[i].allocated = 0;
+ pool->cores[i].next_candidate = 0;
+ }
+ pool->check_waiting_invoked = 0;
+
+ return 0;
+
+error:
+ kfree(pool->descriptors);
+ kfree(pool->cores);
+ pool->region = NULL;
+ return result;
+}
+
+/**
+ * homa_pool_destroy() - Destructor for homa_pool. After this method
+ * returns, the object should not be used unless it has been reinitialized.
+ * @pool: Pool to destroy.
+ */
+void homa_pool_destroy(struct homa_pool *pool)
+{
+ if (!pool->region)
+ return;
+ kfree(pool->descriptors);
+ kfree(pool->cores);
+ pool->region = NULL;
+}
+
+/**
+ * homa_pool_get_rcvbuf() - Return information needed to handle getsockopt
+ * for HOMA_SO_RCVBUF.
+ * @hsk: Socket on which getsockopt request was made.
+ * @args: Store info here.
+ */
+void homa_pool_get_rcvbuf(struct homa_sock *hsk,
+ struct homa_rcvbuf_args *args)
+{
+ homa_sock_lock(hsk, "homa_pool_get_rcvbuf");
+ args->start = (uintptr_t)hsk->buffer_pool->region;
+ args->length = hsk->buffer_pool->num_bpages << HOMA_BPAGE_SHIFT;
+ homa_sock_unlock(hsk);
+}
+
+/**
+ * homa_pool_get_pages() - Allocate one or more full pages from the pool.
+ * @pool: Pool from which to allocate pages
+ * @num_pages: Number of pages needed
+ * @pages: The indices of the allocated pages are stored here; caller
+ * must ensure this array is big enough. Reference counts have
+ * been set to 1 on all of these pages (or 2 if set_owner
+ * was specified).
+ * @set_owner: If nonzero, the current core is marked as owner of all
+ * of the allocated pages (and the expiration time is also
+ * set). Otherwise the pages are left unowned.
+ * Return: 0 for success, -1 if there wasn't enough free space in the pool.
+ */
+int homa_pool_get_pages(struct homa_pool *pool, int num_pages, __u32 *pages,
+ int set_owner)
+{
+ int core_num = raw_smp_processor_id();
+ struct homa_pool_core *core;
+ __u64 now = sched_clock();
+ int alloced = 0;
+ int limit = 0;
+
+ core = &pool->cores[core_num];
+ if (atomic_sub_return(num_pages, &pool->free_bpages) < 0) {
+ atomic_add(num_pages, &pool->free_bpages);
+ return -1;
+ }
+
+ /* Once we get to this point we know we will be able to find
+ * enough free pages; now we just have to find them.
+ */
+ while (alloced != num_pages) {
+ struct homa_bpage *bpage;
+ int cur, ref_count;
+
+ /* If we don't need to use all of the bpages in the pool,
+ * then try to use only the ones with low indexes. This
+ * will reduce the cache footprint for the pool by reusing
+ * a few bpages over and over. Specifically this code will
+ * not consider any candidate page whose index is >= limit.
+ * Limit is chosen to make sure there are a reasonable
+ * number of free pages in the range, so we won't have to
+ * check a huge number of pages.
+ */
+ if (limit == 0) {
+ int extra;
+
+ limit = pool->num_bpages
+ - atomic_read(&pool->free_bpages);
+ extra = limit >> 2;
+ limit += (extra < MIN_EXTRA) ? MIN_EXTRA : extra;
+ if (limit > pool->num_bpages)
+ limit = pool->num_bpages;
+ }
+
+ cur = core->next_candidate;
+ core->next_candidate++;
+ if (cur >= limit) {
+ core->next_candidate = 0;
+
+ /* Must recompute the limit for each new loop through
+ * the bpage array: we may need to consider a larger
+ * range of pages because of concurrent allocations.
+ */
+ limit = 0;
+ continue;
+ }
+ bpage = &pool->descriptors[cur];
+
+ /* Figure out whether this candidate is free (or can be
+ * stolen). Do a quick check without locking the page, and
+ * if the page looks promising, then lock it and check again
+ * (must check again in case someone else snuck in and
+ * grabbed the page).
+ */
+ ref_count = atomic_read(&bpage->refs);
+ if (ref_count >= 2 || (ref_count == 1 && (bpage->owner < 0 ||
+ bpage->expiration > now)))
+ continue;
+ if (!spin_trylock_bh(&bpage->lock))
+ continue;
+ ref_count = atomic_read(&bpage->refs);
+ if (ref_count >= 2 || (ref_count == 1 && (bpage->owner < 0 ||
+ bpage->expiration > now))) {
+ spin_unlock_bh(&bpage->lock);
+ continue;
+ }
+ if (bpage->owner >= 0)
+ atomic_inc(&pool->free_bpages);
+ if (set_owner) {
+ atomic_set(&bpage->refs, 2);
+ bpage->owner = core_num;
+ bpage->expiration = now + 1000 *
+ pool->hsk->homa->bpage_lease_usecs;
+ } else {
+ atomic_set(&bpage->refs, 1);
+ bpage->owner = -1;
+ }
+ spin_unlock_bh(&bpage->lock);
+ pages[alloced] = cur;
+ alloced++;
+ }
+ return 0;
+}
+
+/**
+ * homa_pool_allocate() - Allocate buffer space for an RPC.
+ * @rpc: RPC that needs space allocated for its incoming message (space must
+ * not already have been allocated). The fields @msgin->num_buffers
+ * and @msgin->buffers are filled in. Must be locked by caller.
+ * Return: The return value is normally 0, which means either buffer space
+ * was allocated or the @rpc was queued on @hsk->waiting. If a fatal error
+ * occurred, such as no buffer pool present, then a negative errno is
+ * returned.
+ */
+int homa_pool_allocate(struct homa_rpc *rpc)
+{
+ struct homa_pool *pool = rpc->hsk->buffer_pool;
+ int full_pages, partial, i, core_id;
+ __u32 pages[HOMA_MAX_BPAGES];
+ struct homa_pool_core *core;
+ struct homa_bpage *bpage;
+ struct homa_rpc *other;
+
+ if (!pool->region)
+ return -ENOMEM;
+
+ /* First allocate any full bpages that are needed. */
+ full_pages = rpc->msgin.length >> HOMA_BPAGE_SHIFT;
+ if (unlikely(full_pages)) {
+ if (homa_pool_get_pages(pool, full_pages, pages, 0) != 0)
+ goto out_of_space;
+ for (i = 0; i < full_pages; i++)
+ rpc->msgin.bpage_offsets[i] = pages[i] <<
+ HOMA_BPAGE_SHIFT;
+ }
+ rpc->msgin.num_bpages = full_pages;
+
+ /* The last chunk may be less than a full bpage; for this we use
+ * the bpage that we own (and reuse it for multiple messages).
+ */
+ partial = rpc->msgin.length & (HOMA_BPAGE_SIZE - 1);
+ if (unlikely(partial == 0))
+ goto success;
+ core_id = raw_smp_processor_id();
+ core = &pool->cores[core_id];
+ bpage = &pool->descriptors[core->page_hint];
+ if (!spin_trylock_bh(&bpage->lock))
+ spin_lock_bh(&bpage->lock);
+ if (bpage->owner != core_id) {
+ spin_unlock_bh(&bpage->lock);
+ goto new_page;
+ }
+ if ((core->allocated + partial) > HOMA_BPAGE_SIZE) {
+ if (atomic_read(&bpage->refs) == 1) {
+ /* Bpage is totally free, so we can reuse it. */
+ core->allocated = 0;
+ } else {
+ bpage->owner = -1;
+
+ /* We know the reference count can't reach zero here
+ * because of check above, so we won't have to decrement
+ * pool->free_bpages.
+ */
+ atomic_dec_return(&bpage->refs);
+ spin_unlock_bh(&bpage->lock);
+ goto new_page;
+ }
+ }
+ bpage->expiration = sched_clock() +
+ 1000 * pool->hsk->homa->bpage_lease_usecs;
+ atomic_inc(&bpage->refs);
+ spin_unlock_bh(&bpage->lock);
+ goto allocate_partial;
+
+ /* Can't use the current page; get another one. */
+new_page:
+ if (homa_pool_get_pages(pool, 1, pages, 1) != 0) {
+ homa_pool_release_buffers(pool, rpc->msgin.num_bpages,
+ rpc->msgin.bpage_offsets);
+ rpc->msgin.num_bpages = 0;
+ goto out_of_space;
+ }
+ core->page_hint = pages[0];
+ core->allocated = 0;
+
+allocate_partial:
+ rpc->msgin.bpage_offsets[rpc->msgin.num_bpages] = core->allocated
+ + (core->page_hint << HOMA_BPAGE_SHIFT);
+ rpc->msgin.num_bpages++;
+ core->allocated += partial;
+
+success:
+ return 0;
+
+ /* We get here if there wasn't enough buffer space for this
+ * message; add the RPC to hsk->waiting_for_bufs.
+ */
+out_of_space:
+ homa_sock_lock(pool->hsk, "homa_pool_allocate");
+ list_for_each_entry(other, &pool->hsk->waiting_for_bufs, buf_links) {
+ if (other->msgin.length > rpc->msgin.length) {
+ list_add_tail(&rpc->buf_links, &other->buf_links);
+ goto queued;
+ }
+ }
+ list_add_tail_rcu(&rpc->buf_links, &pool->hsk->waiting_for_bufs);
+
+queued:
+ set_bpages_needed(pool);
+ homa_sock_unlock(pool->hsk);
+ return 0;
+}
+
+/**
+ * homa_pool_get_buffer() - Given an RPC, figure out where to store incoming
+ * message data.
+ * @rpc: RPC for which incoming message data is being processed; its
+ * msgin must be properly initialized and buffer space must have
+ * been allocated for the message.
+ * @offset: Offset within @rpc's incoming message.
+ * @available: Will be filled in with the number of bytes of space available
+ * at the returned address (could be zero if offset is
+ * (erroneously) past the end of the message).
+ * Return: The application's virtual address for buffer space corresponding
+ * to @offset in the incoming message for @rpc.
+ */
+void __user *homa_pool_get_buffer(struct homa_rpc *rpc, int offset,
+ int *available)
+{
+ int bpage_index, bpage_offset;
+
+ bpage_index = offset >> HOMA_BPAGE_SHIFT;
+ if (offset >= rpc->msgin.length) {
+ WARN_ONCE(true, "%s got offset %d >= message length %d\n",
+ __func__, offset, rpc->msgin.length);
+ *available = 0;
+ return NULL;
+ }
+ bpage_offset = offset & (HOMA_BPAGE_SIZE - 1);
+ *available = (bpage_index < (rpc->msgin.num_bpages - 1))
+ ? HOMA_BPAGE_SIZE - bpage_offset
+ : rpc->msgin.length - offset;
+ return rpc->hsk->buffer_pool->region +
+ rpc->msgin.bpage_offsets[bpage_index] + bpage_offset;
+}
+
+/**
+ * homa_pool_release_buffers() - Release buffer space so that it can be
+ * reused.
+ * @pool: Pool that the buffer space belongs to. Doesn't need to
+ * be locked.
+ * @num_buffers: How many buffers to release.
+ * @buffers: Points to @num_buffers values, each of which is an offset
+ * from the start of the pool to the buffer to be released.
+ * Return: 0 for success, otherwise a negative errno.
+ */
+int homa_pool_release_buffers(struct homa_pool *pool, int num_buffers,
+ __u32 *buffers)
+{
+ int result = 0;
+ int i;
+
+ if (!pool->region)
+ return result;
+ for (i = 0; i < num_buffers; i++) {
+ __u32 bpage_index = buffers[i] >> HOMA_BPAGE_SHIFT;
+ struct homa_bpage *bpage = &pool->descriptors[bpage_index];
+
+ if (bpage_index < pool->num_bpages) {
+ if (atomic_dec_return(&bpage->refs) == 0)
+ atomic_inc(&pool->free_bpages);
+ } else {
+ result = -EINVAL;
+ }
+ }
+ return result;
+}
+
+/**
+ * homa_pool_check_waiting() - Checks to see if there are enough free
+ * bpages to wake up any RPCs that were blocked. Whenever
+ * homa_pool_release_buffers is invoked, this function must be invoked later,
+ * at a point when the caller holds no locks (homa_pool_release_buffers may
+ * be invoked with locks held, so it can't safely invoke this function).
+ * This is regrettably tricky, but I can't think of a better solution.
+ * @pool: Information about the buffer pool.
+ */
+void homa_pool_check_waiting(struct homa_pool *pool)
+{
+ if (!pool->region)
+ return;
+ while (atomic_read(&pool->free_bpages) >= pool->bpages_needed) {
+ struct homa_rpc *rpc;
+
+ homa_sock_lock(pool->hsk, "buffer pool");
+ if (list_empty(&pool->hsk->waiting_for_bufs)) {
+ pool->bpages_needed = INT_MAX;
+ homa_sock_unlock(pool->hsk);
+ break;
+ }
+ rpc = list_first_entry(&pool->hsk->waiting_for_bufs,
+ struct homa_rpc, buf_links);
+ if (!homa_rpc_try_lock(rpc, "homa_pool_check_waiting")) {
+ /* Can't just spin on the RPC lock because we're
+ * holding the socket lock (see sync.txt). Instead,
+ * release the socket lock and try the entire
+ * operation again.
+ */
+ homa_sock_unlock(pool->hsk);
+ continue;
+ }
+ list_del_init(&rpc->buf_links);
+ if (list_empty(&pool->hsk->waiting_for_bufs))
+ pool->bpages_needed = INT_MAX;
+ else
+ set_bpages_needed(pool);
+ homa_sock_unlock(pool->hsk);
+ homa_pool_allocate(rpc);
+ if (rpc->msgin.num_bpages > 0)
+ /* Allocation succeeded; "wake up" the RPC. */
+ rpc->msgin.resend_all = 1;
+ homa_rpc_unlock(rpc);
+ }
+}
diff --git a/net/homa/homa_pool.h b/net/homa/homa_pool.h
new file mode 100644
index 000000000000..6dbe7d77dd07
--- /dev/null
+++ b/net/homa/homa_pool.h
@@ -0,0 +1,154 @@
+/* SPDX-License-Identifier: BSD-2-Clause */
+
+/* This file contains definitions used to manage user-space buffer pools.
+ */
+
+#ifndef _HOMA_POOL_H
+#define _HOMA_POOL_H
+
+#include "homa_rpc.h"
+
+/**
+ * struct homa_bpage - Contains information about a single page in
+ * a buffer pool.
+ */
+struct homa_bpage {
+ union {
+ /**
+ * @cache_line: Ensures that each homa_bpage object
+ * is exactly one cache line long.
+ */
+ char cache_line[L1_CACHE_BYTES];
+ struct {
+ /** @lock: to synchronize shared access. */
+ spinlock_t lock;
+
+ /**
+ * @refs: Counts number of distinct uses of this
+ * bpage (1 tick for each message that is using
+ * this page, plus an additional tick if the @owner
+ * field is set).
+ */
+ atomic_t refs;
+
+ /**
+ * @owner: kernel core that currently owns this page
+ * (< 0 if none).
+ */
+ int owner;
+
+ /**
+ * @expiration: time (in sched_clock() units) after
+ * which it's OK to steal this page from its current
+ * owner (if @refs is 1).
+ */
+ __u64 expiration;
+ };
+ };
+};
+
+/**
+ * struct homa_pool_core - Holds core-specific data for a homa_pool (a bpage
+ * out of which that core is allocating small chunks).
+ */
+struct homa_pool_core {
+ union {
+ /**
+ * @cache_line: Ensures that each object is exactly one
+ * cache line long.
+ */
+ char cache_line[L1_CACHE_BYTES];
+ struct {
+ /**
+ * @page_hint: Index of bpage in pool->descriptors,
+ * which may be owned by this core. If so, we'll use it
+ * for allocating partial pages.
+ */
+ int page_hint;
+
+ /**
+ * @allocated: if the page given by @page_hint is
+ * owned by this core, this variable gives the number of
+ * (initial) bytes that have already been allocated
+ * from the page.
+ */
+ int allocated;
+
+ /**
+ * @next_candidate: when searching for free bpages,
+ * check this index next.
+ */
+ int next_candidate;
+ };
+ };
+};
+
+/**
+ * struct homa_pool - Describes a pool of buffer space for incoming
+ * messages for a particular socket; managed by homa_pool.c. The pool is
+ * divided up into "bpages", which are a multiple of the hardware page size.
+ * A bpage may be owned by a particular core so that it can more efficiently
+ * allocate space for small messages.
+ */
+struct homa_pool {
+ /**
+ * @hsk: the socket that this pool belongs to.
+ */
+ struct homa_sock *hsk;
+
+ /**
+ * @region: beginning of the pool's region (in the app's virtual
+ * memory). Divided into bpages. 0 means the pool hasn't yet been
+ * initialized.
+ */
+ char __user *region;
+
+ /** @num_bpages: total number of bpages in the pool. */
+ int num_bpages;
+
+ /** @descriptors: kmalloced area containing one entry for each bpage. */
+ struct homa_bpage *descriptors;
+
+ /**
+ * @free_bpages: the number of pages still available for allocation
+ * by homa_pool_get pages. This equals the number of pages with zero
+ * reference counts, minus the number of pages that have been claimed
+ * by homa_get_pool_pages but not yet allocated.
+ */
+ atomic_t free_bpages;
+
+ /**
+ * @bpages_needed: the number of free bpages required to satisfy the
+ * needs of the first RPC on @hsk->waiting_for_bufs, or INT_MAX if
+ * that queue is empty.
+ */
+ int bpages_needed;
+
+ /** @cores: core-specific info; dynamically allocated. */
+ struct homa_pool_core *cores;
+
+ /** @num_cores: number of elements in @cores. */
+ int num_cores;
+
+ /**
+ * @check_waiting_invoked: incremented during unit tests when
+ * homa_pool_check_waiting is invoked.
+ */
+ int check_waiting_invoked;
+};
+
+int homa_pool_allocate(struct homa_rpc *rpc);
+void homa_pool_check_waiting(struct homa_pool *pool);
+void homa_pool_destroy(struct homa_pool *pool);
+void __user *homa_pool_get_buffer(struct homa_rpc *rpc, int offset,
+ int *available);
+int homa_pool_get_pages(struct homa_pool *pool, int num_pages,
+ __u32 *pages, int leave_locked);
+void homa_pool_get_rcvbuf(struct homa_sock *hsk,
+ struct homa_rcvbuf_args *args);
+int homa_pool_init(struct homa_sock *hsk, void *buf_region,
+ __u64 region_size);
+int homa_pool_release_buffers(struct homa_pool *pool,
+ int num_buffers, __u32 *buffers);
+
+#endif /* _HOMA_POOL_H */
--
2.34.1
Powered by blists - more mailing lists