Message-ID: <CANn89iJ26WjmTBrEKwMJbQCKWYFmz2h25T+kOgLASXPvsDR1BQ@mail.gmail.com>
Date: Tue, 2 Sep 2025 00:19:08 -0700
From: Eric Dumazet <edumazet@...gle.com>
To: John Ousterhout <ouster@...stanford.edu>
Cc: netdev@...r.kernel.org, pabeni@...hat.com, horms@...nel.org,
kuba@...nel.org
Subject: Re: [PATCH net-next v15 12/15] net: homa: create homa_incoming.c
On Mon, Aug 18, 2025 at 1:56 PM John Ousterhout <ouster@...stanford.edu> wrote:
>
> This file contains most of the code for handling incoming packets,
> including top-level dispatching code plus specific handlers for each
> packet type. It also contains code for dispatching fully-received
> messages to waiting application threads.
>
> Signed-off-by: John Ousterhout <ouster@...stanford.edu>
>
> ---
> Changes for v14:
> * Use new homa_rpc_tx_end function
> * Fix race in homa_wait_shared (an RPC could get lost if it became
> ready at the same time that homa_interest_wait returned with an error)
> * Handle nonblocking behavior here, rather than in homa_interest.c
> * Change API for homa_wait_private to distinguish errors in an RPC from
> errors that prevented the wait operation from completing.
>
> Changes for v11:
> * Cleanup and simplify use of RPC reference counts.
> * Cleanup sparse annotations.
> * Rework the mechanism for waking up RPCs that stalled waiting for
> buffer pool space.
>
> Changes for v10:
> * Revise sparse annotations to eliminate __context__ definition
> * Refactor resend mechanism (new function homa_request_retrans replaces
> homa_gap_retry)
> * Remove log messages after alloc errors
> * Fix socket cleanup race
>
> Changes for v9:
> * Add support for homa_net objects
> * Use new homa_clock abstraction layer
> * Various name improvements (e.g. use "alloc" instead of "new" for functions
> that allocate memory)
>
> Changes for v7:
> * API change for homa_rpc_handoff
> * Refactor waiting mechanism for incoming packets: simplify wait
> criteria and use standard Linux mechanisms for waiting, use
> new homa_interest struct
> * Reject unauthorized incoming request messages
> * Improve documentation for code that spins (and reduce spin length)
> * Use RPC reference counts, eliminate RPC_HANDING_OFF flag
> * Replace erroneous use of "safe" list iteration with "rcu" version
> * Remove locker argument from locking functions
> * Check incoming messages against HOMA_MAX_MESSAGE_LENGTH
> * Use u64 and __u64 properly
> ---
> net/homa/homa_impl.h | 15 +
> net/homa/homa_incoming.c | 886 +++++++++++++++++++++++++++++++++++++++
> 2 files changed, 901 insertions(+)
> create mode 100644 net/homa/homa_incoming.c
>
> diff --git a/net/homa/homa_impl.h b/net/homa/homa_impl.h
> index 49ca4abfb50b..3d91b7f44de9 100644
> --- a/net/homa/homa_impl.h
> +++ b/net/homa/homa_impl.h
> @@ -421,22 +421,37 @@ static inline bool homa_make_header_avl(struct sk_buff *skb)
>
> extern unsigned int homa_net_id;
>
> +void homa_ack_pkt(struct sk_buff *skb, struct homa_sock *hsk,
> + struct homa_rpc *rpc);
> +void homa_add_packet(struct homa_rpc *rpc, struct sk_buff *skb);
> +int homa_copy_to_user(struct homa_rpc *rpc);
> +void homa_data_pkt(struct sk_buff *skb, struct homa_rpc *rpc);
> void homa_destroy(struct homa *homa);
> +void homa_dispatch_pkts(struct sk_buff *skb);
> int homa_fill_data_interleaved(struct homa_rpc *rpc,
> struct sk_buff *skb, struct iov_iter *iter);
> +struct homa_gap *homa_gap_alloc(struct list_head *next, int start, int end);
> int homa_init(struct homa *homa);
> int homa_message_out_fill(struct homa_rpc *rpc,
> struct iov_iter *iter, int xmit);
> void homa_message_out_init(struct homa_rpc *rpc, int length);
> +void homa_need_ack_pkt(struct sk_buff *skb, struct homa_sock *hsk,
> + struct homa_rpc *rpc);
> void homa_net_destroy(struct homa_net *hnet);
> int homa_net_init(struct homa_net *hnet, struct net *net,
> struct homa *homa);
> +void homa_request_retrans(struct homa_rpc *rpc);
> +void homa_resend_pkt(struct sk_buff *skb, struct homa_rpc *rpc,
> + struct homa_sock *hsk);
> void homa_rpc_handoff(struct homa_rpc *rpc);
> int homa_rpc_tx_end(struct homa_rpc *rpc);
> void homa_spin(int ns);
> struct sk_buff *homa_tx_data_pkt_alloc(struct homa_rpc *rpc,
> struct iov_iter *iter, int offset,
> int length, int max_seg_data);
> +void homa_rpc_unknown_pkt(struct sk_buff *skb, struct homa_rpc *rpc);
> +int homa_wait_private(struct homa_rpc *rpc, int nonblocking);
> +struct homa_rpc *homa_wait_shared(struct homa_sock *hsk, int nonblocking);
> int homa_xmit_control(enum homa_packet_type type, void *contents,
> size_t length, struct homa_rpc *rpc);
> int __homa_xmit_control(void *contents, size_t length,
> diff --git a/net/homa/homa_incoming.c b/net/homa/homa_incoming.c
> new file mode 100644
> index 000000000000..c485dd98cba9
> --- /dev/null
> +++ b/net/homa/homa_incoming.c
> @@ -0,0 +1,886 @@
> +// SPDX-License-Identifier: BSD-2-Clause OR GPL-2.0+
> +
> +/* This file contains functions that handle incoming Homa messages. */
> +
> +#include "homa_impl.h"
> +#include "homa_interest.h"
> +#include "homa_peer.h"
> +#include "homa_pool.h"
> +
> +/**
> + * homa_message_in_init() - Constructor for homa_message_in.
> + * @rpc: RPC whose msgin structure should be initialized. The
> + * msgin struct is assumed to be zeroes.
> + * @length: Total number of bytes in message.
> + * Return: Zero for successful initialization, or a negative errno
> + * if rpc->msgin could not be initialized.
> + */
> +int homa_message_in_init(struct homa_rpc *rpc, int length)
> + __must_hold(rpc->bucket->lock)
> +{
> + int err;
> +
> + if (length > HOMA_MAX_MESSAGE_LENGTH)
> + return -EINVAL;
> +
> + rpc->msgin.length = length;
> + skb_queue_head_init(&rpc->msgin.packets);
Do you need the lock, or can you use __skb_queue_head_init() here for clarity?
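Something like this, if the RPC lock already serializes all access to
msgin.packets (sketch; assumes nothing relies on the queue's internal
spinlock):

	/* No spinlock init; callers serialize via the RPC lock. */
	__skb_queue_head_init(&rpc->msgin.packets);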
> + INIT_LIST_HEAD(&rpc->msgin.gaps);
> + rpc->msgin.bytes_remaining = length;
> + err = homa_pool_alloc_msg(rpc);
> + if (err != 0) {
> + rpc->msgin.length = -1;
> + return err;
> + }
> + return 0;
> +}
> +
> +/**
> + * homa_gap_alloc() - Allocate a new gap and add it to a gap list.
> + * @next: Add the new gap just before this list element.
> + * @start: Offset of first byte covered by the gap.
> + * @end: Offset of byte just after the last one covered by the gap.
> + * Return: Pointer to the new gap, or NULL if memory couldn't be allocated
> + * for the gap object.
> + */
> +struct homa_gap *homa_gap_alloc(struct list_head *next, int start, int end)
> +{
> + struct homa_gap *gap;
> +
> + gap = kmalloc(sizeof(*gap), GFP_ATOMIC);
> + if (!gap)
> + return NULL;
> + gap->start = start;
> + gap->end = end;
> + gap->time = homa_clock();
> + list_add_tail(&gap->links, next);
> + return gap;
> +}
> +
> +/**
> + * homa_request_retrans() - The function is invoked when it appears that
> + * data packets for a message have been lost. It issues RESEND requests
> + * as appropriate and may modify the state of the RPC.
> + * @rpc: RPC for which incoming data is delinquent; must be locked by
> + * caller.
> + */
> +void homa_request_retrans(struct homa_rpc *rpc)
> + __must_hold(rpc->bucket->lock)
> +{
> + struct homa_resend_hdr resend;
> + struct homa_gap *gap;
> + int offset, length;
> +
> + if (rpc->msgin.length >= 0) {
> + /* Issue RESENDS for any gaps in incoming data. */
> + list_for_each_entry(gap, &rpc->msgin.gaps, links) {
> + resend.offset = htonl(gap->start);
> + resend.length = htonl(gap->end - gap->start);
> + homa_xmit_control(RESEND, &resend, sizeof(resend), rpc);
> + }
> +
> + /* Issue a RESEND for any granted data after the last gap. */
> + offset = rpc->msgin.recv_end;
> + length = rpc->msgin.length - rpc->msgin.recv_end;
> + if (length <= 0)
> + return;
> + } else {
> + /* No data has been received for the RPC. Ask the sender to
> + * resend everything it has sent so far.
> + */
> + offset = 0;
> + length = -1;
> + }
> +
> + resend.offset = htonl(offset);
> + resend.length = htonl(length);
> + homa_xmit_control(RESEND, &resend, sizeof(resend), rpc);
> +}
> +
> +/**
> + * homa_add_packet() - Add an incoming packet to the contents of a
> + * partially received message.
> + * @rpc: Add the packet to the msgin for this RPC.
> + * @skb: The new packet. This function takes ownership of the packet
> + * (the packet will either be freed or added to rpc->msgin.packets).
> + */
> +void homa_add_packet(struct homa_rpc *rpc, struct sk_buff *skb)
> + __must_hold(rpc->bucket->lock)
> +{
> + struct homa_data_hdr *h = (struct homa_data_hdr *)skb->data;
> + struct homa_gap *gap, *dummy, *gap2;
> + int start = ntohl(h->seg.offset);
> + int length = homa_data_len(skb);
> + int end = start + length;
> +
> + if ((start + length) > rpc->msgin.length)
> + goto discard;
> +
> + if (start == rpc->msgin.recv_end) {
> + /* Common case: packet is sequential. */
> + rpc->msgin.recv_end += length;
> + goto keep;
> + }
> +
> + if (start > rpc->msgin.recv_end) {
> + /* Packet creates a new gap. */
> + if (!homa_gap_alloc(&rpc->msgin.gaps,
> + rpc->msgin.recv_end, start))
> + goto discard;
> + rpc->msgin.recv_end = end;
> + goto keep;
> + }
> +
> + /* Must now check to see if the packet fills in part or all of
> + * an existing gap.
> + */
> + list_for_each_entry_safe(gap, dummy, &rpc->msgin.gaps, links) {
> + /* Is packet at the start of this gap? */
> + if (start <= gap->start) {
> + if (end <= gap->start)
> + continue;
> + if (start < gap->start)
> + goto discard;
> + if (end > gap->end)
> + goto discard;
> + gap->start = end;
> + if (gap->start >= gap->end) {
> + list_del(&gap->links);
> + kfree(gap);
> + }
> + goto keep;
> + }
> +
> + /* Is packet at the end of this gap? BTW, at this point we know
> + * the packet can't cover the entire gap.
> + */
> + if (end >= gap->end) {
> + if (start >= gap->end)
> + continue;
> + if (end > gap->end)
> + goto discard;
> + gap->end = start;
> + goto keep;
> + }
> +
> + /* Packet is in the middle of the gap; must split the gap. */
> + gap2 = homa_gap_alloc(&gap->links, gap->start, start);
> + if (!gap2)
> + goto discard;
> + gap2->time = gap->time;
> + gap->start = end;
> + goto keep;
> + }
> +
> +discard:
> + kfree_skb(skb);
> + return;
> +
> +keep:
> + __skb_queue_tail(&rpc->msgin.packets, skb);
> + rpc->msgin.bytes_remaining -= length;
> +}
> +
> +/**
> + * homa_copy_to_user() - Copy as much data as possible from incoming
> + * packet buffers to buffers in user space.
> + * @rpc: RPC for which data should be copied. Must be locked by caller.
> + * Return: Zero for success or a negative errno if there is an error.
> + * It is possible for the RPC to be freed while this function
> + * executes (it releases and reacquires the RPC lock). If that
> + * happens, -EINVAL will be returned and the state of @rpc
> + * will be RPC_DEAD. Clears the RPC_PKTS_READY bit in @rpc->flags
> + * if all available packets have been copied out.
> + */
> +int homa_copy_to_user(struct homa_rpc *rpc)
> + __must_hold(rpc->bucket->lock)
> +{
> +#define MAX_SKBS 20
> + struct sk_buff *skbs[MAX_SKBS];
> + int error = 0;
> + int n = 0; /* Number of filled entries in skbs. */
> + int i;
> +
> + /* Tricky note: we can't hold the RPC lock while we're actually
> + * copying to user space, because (a) it's illegal to hold a spinlock
> + * while copying to user space and (b) we'd like for homa_softirq
> + * to add more packets to the RPC while we're copying these out.
> + * So, collect a bunch of packets to copy, then release the lock,
> + * copy them, and reacquire the lock.
> + */
> + while (true) {
> + struct sk_buff *skb;
> +
> + if (rpc->state == RPC_DEAD) {
> + error = -EINVAL;
> + break;
> + }
> +
> + skb = __skb_dequeue(&rpc->msgin.packets);
> + if (skb) {
> + skbs[n] = skb;
> + n++;
> + if (n < MAX_SKBS)
> + continue;
> + }
> + if (n == 0) {
> + atomic_andnot(RPC_PKTS_READY, &rpc->flags);
All networking uses clear_bit() instead...
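For instance (sketch only; assumes rpc->flags is converted from
atomic_t to unsigned long and RPC_PKTS_READY becomes a bit number
rather than a mask):

	clear_bit(RPC_PKTS_READY, &rpc->flags);

with set_bit()/test_bit() used the same way on the setting and
testing sides.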
> + break;
> + }
> +
> + /* At this point we've collected a batch of packets (or
> + * run out of packets); copy any available packets out to
> + * user space.
> + */
> + homa_rpc_unlock(rpc);
> +
> + /* Each iteration of this loop copies out one skb. */
> + for (i = 0; i < n; i++) {
> + struct homa_data_hdr *h = (struct homa_data_hdr *)
> + skbs[i]->data;
> + int pkt_length = homa_data_len(skbs[i]);
> + int offset = ntohl(h->seg.offset);
> + int buf_bytes, chunk_size;
> + struct iov_iter iter;
> + int copied = 0;
> + char __user *dst;
> +
> + /* Each iteration of this loop copies to one
> + * user buffer.
> + */
> + while (copied < pkt_length) {
> + chunk_size = pkt_length - copied;
> + dst = homa_pool_get_buffer(rpc, offset + copied,
> + &buf_bytes);
> + if (buf_bytes < chunk_size) {
> + if (buf_bytes == 0) {
> + /* skb has data beyond message
> + * end?
> + */
> + break;
> + }
> + chunk_size = buf_bytes;
> + }
> + error = import_ubuf(READ, dst, chunk_size,
> + &iter);
> + if (error)
> + goto free_skbs;
> + error = skb_copy_datagram_iter(skbs[i],
> + sizeof(*h) +
> + copied, &iter,
> + chunk_size);
> + if (error)
> + goto free_skbs;
> + copied += chunk_size;
> + }
> + }
> +
> +free_skbs:
> + for (i = 0; i < n; i++)
> + kfree_skb(skbs[i]);
There is a big difference between kfree_skb() and consume_skb()
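Packets that were copied out successfully are consumed, not dropped,
so a drop showing up in tracing should mean a real problem. A sketch
of how this loop could distinguish the two cases (assuming error
still holds the copy status at this point):

	for (i = 0; i < n; i++) {
		if (error)
			kfree_skb(skbs[i]);	/* copy failed: real drop */
		else
			consume_skb(skbs[i]);	/* delivered normally */
	}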
> + n = 0;
> + atomic_or(APP_NEEDS_LOCK, &rpc->flags);
> + homa_rpc_lock(rpc);
> + atomic_andnot(APP_NEEDS_LOCK, &rpc->flags);
This construct would probably need a helper.
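Something like this, as a sketch (the helper name is made up):

static inline void homa_rpc_lock_preempting(struct homa_rpc *rpc)
{
	/* Ask the current lock holder to yield as soon as possible,
	 * then take the lock ourselves.
	 */
	atomic_or(APP_NEEDS_LOCK, &rpc->flags);
	homa_rpc_lock(rpc);
	atomic_andnot(APP_NEEDS_LOCK, &rpc->flags);
}

The same three-line pattern repeats in homa_wait_private() and
homa_wait_shared() below.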
> + if (error)
> + break;
> + }
> + return error;
> +}
> +
> +/**
> + * homa_dispatch_pkts() - Top-level function that processes a batch of packets,
> + * all related to the same RPC.
> + * @skb: First packet in the batch, linked through skb->next.
> + */
> +void homa_dispatch_pkts(struct sk_buff *skb)
> +{
> +#define MAX_ACKS 10
> + const struct in6_addr saddr = skb_canonical_ipv6_saddr(skb);
> + struct homa_data_hdr *h = (struct homa_data_hdr *)skb->data;
> + u64 id = homa_local_id(h->common.sender_id);
> + int dport = ntohs(h->common.dport);
> +
> + /* Used to collect acks from data packets so we can process them
> + * all at the end (can't process them inline because that may
> + * require locking conflicting RPCs). If we run out of space just
> + * ignore the extra acks; they'll be regenerated later through the
> + * explicit mechanism.
> + */
> + struct homa_ack acks[MAX_ACKS];
> + struct homa_rpc *rpc = NULL;
> + struct homa_sock *hsk;
> + struct homa_net *hnet;
> + struct sk_buff *next;
> + int num_acks = 0;
> +
> +     /* Find the appropriate socket. */
> + hnet = homa_net_from_skb(skb);
> + hsk = homa_sock_find(hnet, dport);
> + if (!hsk || (!homa_is_client(id) && !hsk->is_server)) {
> + if (skb_is_ipv6(skb))
> + icmp6_send(skb, ICMPV6_DEST_UNREACH,
> + ICMPV6_PORT_UNREACH, 0, NULL, IP6CB(skb));
> + else
> + icmp_send(skb, ICMP_DEST_UNREACH,
> + ICMP_PORT_UNREACH, 0);
> + while (skb) {
> + next = skb->next;
> + kfree_skb(skb);
> + skb = next;
> + }
> + if (hsk)
> + sock_put(&hsk->sock);
> + return;
> + }
> +
> + /* Each iteration through the following loop processes one packet. */
> + for (; skb; skb = next) {
> + h = (struct homa_data_hdr *)skb->data;
> + next = skb->next;
> +
> + /* Relinquish the RPC lock temporarily if it's needed
> + * elsewhere.
> + */
> + if (rpc) {
> + int flags = atomic_read(&rpc->flags);
> +
> + if (flags & APP_NEEDS_LOCK) {
> + homa_rpc_unlock(rpc);
> +
> + /* This short spin is needed to ensure that the
> + * other thread gets the lock before this thread
> + * grabs it again below (the need for this
> + * was confirmed experimentally in 2/2025;
> + * without it, the handoff fails 20-25% of the
> + * time). Furthermore, the call to homa_spin
> + * seems to allow the other thread to acquire
> + * the lock more quickly.
> + */
> + homa_spin(100);
> + homa_rpc_lock(rpc);
> + }
> + }
> +
> + /* If we don't already have an RPC, find it, lock it,
> + * and create a reference on it.
> + */
> + if (!rpc) {
> + if (!homa_is_client(id)) {
> + /* We are the server for this RPC. */
> + if (h->common.type == DATA) {
> + int created;
> +
> + /* Create a new RPC if one doesn't
> + * already exist.
> + */
> + rpc = homa_rpc_alloc_server(hsk, &saddr,
> + h,
> + &created);
> + if (IS_ERR(rpc)) {
> + rpc = NULL;
> + goto discard;
> + }
> + } else {
> + rpc = homa_rpc_find_server(hsk, &saddr,
> + id);
> + }
> + } else {
> + rpc = homa_rpc_find_client(hsk, id);
> + }
> + if (rpc)
> + homa_rpc_hold(rpc);
> + }
> + if (unlikely(!rpc)) {
> + if (h->common.type != NEED_ACK &&
> + h->common.type != ACK &&
> + h->common.type != RESEND)
> + goto discard;
> + } else {
> + if (h->common.type == DATA ||
> + h->common.type == BUSY)
> + rpc->silent_ticks = 0;
> + rpc->peer->outstanding_resends = 0;
> + }
> +
> + switch (h->common.type) {
> + case DATA:
> + if (h->ack.client_id) {
> + /* Save the ack for processing later, when we
> + * have released the RPC lock.
> + */
> + if (num_acks < MAX_ACKS) {
> + acks[num_acks] = h->ack;
> + num_acks++;
> + }
> + }
> + homa_data_pkt(skb, rpc);
> + break;
> + case RESEND:
> + homa_resend_pkt(skb, rpc, hsk);
> + break;
> + case RPC_UNKNOWN:
> + homa_rpc_unknown_pkt(skb, rpc);
> + break;
> + case BUSY:
> + /* Nothing to do for these packets except reset
> + * silent_ticks, which happened above.
> + */
> + goto discard;
> + case NEED_ACK:
> + homa_need_ack_pkt(skb, hsk, rpc);
> + break;
> + case ACK:
> + homa_ack_pkt(skb, hsk, rpc);
> + break;
> +             default:
> +                     goto discard;
> + }
> + continue;
> +
> +discard:
> + kfree_skb(skb);
> + }
> + if (rpc) {
> + homa_rpc_put(rpc);
> + homa_rpc_unlock(rpc);
> + }
> +
> + while (num_acks > 0) {
> + num_acks--;
> + homa_rpc_acked(hsk, &saddr, &acks[num_acks]);
> + }
> +
> + if (hsk->dead_skbs >= 2 * hsk->homa->dead_buffs_limit)
> + /* We get here if other approaches are not keeping up with
> + * reaping dead RPCs. See "RPC Reaping Strategy" in
> + * homa_rpc_reap code for details.
> + */
> + homa_rpc_reap(hsk, false);
> + sock_put(&hsk->sock);
> +}
> +
> +/**
> + * homa_data_pkt() - Handler for incoming DATA packets
> + * @skb: Incoming packet; size known to be large enough for the header.
> + * This function now owns the packet.
> + * @rpc: Information about the RPC corresponding to this packet.
> + * Must be locked by the caller.
> + */
> +void homa_data_pkt(struct sk_buff *skb, struct homa_rpc *rpc)
> + __must_hold(rpc->bucket->lock)
> +{
> + struct homa_data_hdr *h = (struct homa_data_hdr *)skb->data;
> +
> + if (rpc->state != RPC_INCOMING && homa_is_client(rpc->id)) {
> + if (unlikely(rpc->state != RPC_OUTGOING))
> + goto discard;
> + rpc->state = RPC_INCOMING;
> + if (homa_message_in_init(rpc, ntohl(h->message_length)) != 0)
> + goto discard;
> + } else if (rpc->state != RPC_INCOMING) {
> + /* Must be server; note that homa_rpc_alloc_server already
> + * initialized msgin and allocated buffers.
> + */
> + if (unlikely(rpc->msgin.length >= 0))
> + goto discard;
> + }
> +
> + if (rpc->msgin.num_bpages == 0)
> + /* Drop packets that arrive when we can't allocate buffer
> + * space. If we keep them around, packet buffer usage can
> + * exceed available cache space, resulting in poor
> + * performance.
> + */
> + goto discard;
> +
> + homa_add_packet(rpc, skb);
> +
> + if (skb_queue_len(&rpc->msgin.packets) != 0 &&
> + !(atomic_read(&rpc->flags) & RPC_PKTS_READY)) {
> + atomic_or(RPC_PKTS_READY, &rpc->flags);
> + homa_rpc_handoff(rpc);
> + }
> +
> + return;
> +
> +discard:
> + kfree_skb(skb);
> +}
> +
> +/**
> + * homa_resend_pkt() - Handler for incoming RESEND packets
> + * @skb: Incoming packet; size already verified large enough for header.
> + * This function now owns the packet.
> + * @rpc: Information about the RPC corresponding to this packet; must
> + * be locked by caller, but may be NULL if there is no RPC matching
> + * this packet
> + * @hsk: Socket on which the packet was received.
> + */
> +void homa_resend_pkt(struct sk_buff *skb, struct homa_rpc *rpc,
> + struct homa_sock *hsk)
> + __must_hold(rpc->bucket->lock)
> +{
> + struct homa_resend_hdr *h = (struct homa_resend_hdr *)skb->data;
> + int offset = ntohl(h->offset);
> + int length = ntohl(h->length);
> + int end = offset + length;
> + struct homa_busy_hdr busy;
> + int tx_end;
> +
> + if (!rpc) {
> + homa_xmit_unknown(skb, hsk);
> + goto done;
> + }
> +
> + tx_end = homa_rpc_tx_end(rpc);
> + if (!homa_is_client(rpc->id) && rpc->state != RPC_OUTGOING) {
> + /* We are the server for this RPC and don't yet have a
> + * response message, so send BUSY to keep the client
> + * waiting.
> + */
> + homa_xmit_control(BUSY, &busy, sizeof(busy), rpc);
> + goto done;
> + }
> +
> + if (length == -1)
> + end = tx_end;
> +
> + homa_resend_data(rpc, offset, (end > tx_end) ? tx_end : end);
> +
> + if (offset >= tx_end) {
> + /* We have chosen not to transmit any of the requested data;
> + * send BUSY so the receiver knows we are alive.
> + */
> + homa_xmit_control(BUSY, &busy, sizeof(busy), rpc);
> + goto done;
> + }
> +
> +done:
> + kfree_skb(skb);
> +}
> +
> +/**
> + * homa_rpc_unknown_pkt() - Handler for incoming RPC_UNKNOWN packets.
> + * @skb: Incoming packet; size known to be large enough for the header.
> + * This function now owns the packet.
> + * @rpc: Information about the RPC corresponding to this packet. Must
> + * be locked by caller.
> + */
> +void homa_rpc_unknown_pkt(struct sk_buff *skb, struct homa_rpc *rpc)
> + __must_hold(rpc->bucket->lock)
> +{
> + if (homa_is_client(rpc->id)) {
> + if (rpc->state == RPC_OUTGOING) {
> + int tx_end = homa_rpc_tx_end(rpc);
> +
> + /* It appears that everything we've already transmitted
> + * has been lost; retransmit it.
> + */
> + homa_resend_data(rpc, 0, tx_end);
> + goto done;
> + }
> + } else {
> + homa_rpc_end(rpc);
> + }
> +done:
> + kfree_skb(skb);
> +}
> +
> +/**
> + * homa_need_ack_pkt() - Handler for incoming NEED_ACK packets
> + * @skb: Incoming packet; size already verified large enough for header.
> + * This function now owns the packet.
> + * @hsk: Socket on which the packet was received.
> + * @rpc: The RPC named in the packet header, or NULL if no such
> + * RPC exists. The RPC has been locked by the caller.
> + */
> +void homa_need_ack_pkt(struct sk_buff *skb, struct homa_sock *hsk,
> + struct homa_rpc *rpc)
> + __must_hold(rpc->bucket->lock)
> +{
> + struct homa_common_hdr *h = (struct homa_common_hdr *)skb->data;
> + const struct in6_addr saddr = skb_canonical_ipv6_saddr(skb);
> + u64 id = homa_local_id(h->sender_id);
> + struct homa_ack_hdr ack;
> + struct homa_peer *peer;
> +
> + /* Don't ack if it's not safe for the peer to purge its state
> + * for this RPC (the RPC still exists and we haven't received
> + * the entire response), or if we can't find peer info.
> + */
> + if (rpc && (rpc->state != RPC_INCOMING ||
> + rpc->msgin.bytes_remaining)) {
> + homa_request_retrans(rpc);
> + goto done;
> + } else {
> + peer = homa_peer_get(hsk, &saddr);
> + if (IS_ERR(peer))
> + goto done;
> + }
> +
> + /* Send an ACK for this RPC. At the same time, include all of the
> + * other acks available for the peer. Note: can't use rpc below,
> + * since it may be NULL.
> + */
> + ack.common.type = ACK;
> + ack.common.sport = h->dport;
> + ack.common.dport = h->sport;
> + ack.common.sender_id = cpu_to_be64(id);
> + ack.num_acks = htons(homa_peer_get_acks(peer,
> + HOMA_MAX_ACKS_PER_PKT,
> + ack.acks));
> + __homa_xmit_control(&ack, sizeof(ack), peer, hsk);
> + homa_peer_release(peer);
> +
> +done:
> + kfree_skb(skb);
Please double-check all your kfree_skb() vs consume_skb() calls:

perf record -a -e skb:kfree_skb sleep 60
vs
perf record -a -e skb:consume_skb sleep 60

As a bonus, you can use kfree_skb_reason(skb, some_reason) for future
bug hunting.
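For example (sketch; the reasons below already exist in enum
skb_drop_reason, and a Homa-specific reason would need to be added
there first):

	kfree_skb_reason(skb, SKB_DROP_REASON_NO_SOCKET);	/* no matching socket */
	kfree_skb_reason(skb, SKB_DROP_REASON_NOT_SPECIFIED);	/* generic drop */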
> +}
> +
> +/**
> + * homa_ack_pkt() - Handler for incoming ACK packets
> + * @skb: Incoming packet; size already verified large enough for header.
> + * This function now owns the packet.
> + * @hsk: Socket on which the packet was received.
> + * @rpc: The RPC named in the packet header, or NULL if no such
> + * RPC exists. The RPC lock will be dead on return.
> + */
> +void homa_ack_pkt(struct sk_buff *skb, struct homa_sock *hsk,
> + struct homa_rpc *rpc)
> + __must_hold(rpc->bucket->lock)
> +{
> + const struct in6_addr saddr = skb_canonical_ipv6_saddr(skb);
> + struct homa_ack_hdr *h = (struct homa_ack_hdr *)skb->data;
> + int i, count;
> +
> + if (rpc)
> + homa_rpc_end(rpc);
> +
> + count = ntohs(h->num_acks);
> + if (count > 0) {
> + if (rpc) {
> + /* Must temporarily release rpc's lock because
> + * homa_rpc_acked needs to acquire RPC locks.
> + */
> + homa_rpc_unlock(rpc);
> + for (i = 0; i < count; i++)
> + homa_rpc_acked(hsk, &saddr, &h->acks[i]);
> + homa_rpc_lock(rpc);
> + } else {
> + for (i = 0; i < count; i++)
> + homa_rpc_acked(hsk, &saddr, &h->acks[i]);
> + }
> + }
> + kfree_skb(skb);
> +}
> +
> +/**
> + * homa_wait_private() - Waits until the response has been received for
> + * a specific RPC or the RPC has failed with an error.
> + * @rpc: RPC to wait for; an error will be returned if the RPC is
> + * not a client RPC or not private. Must be locked by caller.
> + * @nonblocking: Nonzero means return immediately if @rpc not ready.
> + * Return: 0 means that @rpc is ready for attention: either its response
> + * has been received or it has an unrecoverable error such as
> + * ETIMEDOUT (in rpc->error). Nonzero means some other error
> + * (such as EINTR or EINVAL) occurred before @rpc became ready
> + * for attention; in this case the return value is a negative
> + * errno.
> + */
> +int homa_wait_private(struct homa_rpc *rpc, int nonblocking)
> + __must_hold(rpc->bucket->lock)
> +{
> + struct homa_interest interest;
> + int result;
> +
> + if (!(atomic_read(&rpc->flags) & RPC_PRIVATE))
> + return -EINVAL;
> +
> + /* Each iteration through this loop waits until rpc needs attention
> + * in some way (e.g. packets have arrived), then deals with that need
> + * (e.g. copy to user space). It may take many iterations until the
> + * RPC is ready for the application.
> + */
> + while (1) {
> + result = 0;
> + if (!rpc->error)
> + rpc->error = homa_copy_to_user(rpc);
> + if (rpc->error)
> + break;
> + if (rpc->msgin.length >= 0 &&
> + rpc->msgin.bytes_remaining == 0 &&
> + skb_queue_len(&rpc->msgin.packets) == 0)
> + break;
> +
> + if (nonblocking) {
> + result = -EAGAIN;
> + break;
> + }
> +
> + result = homa_interest_init_private(&interest, rpc);
> + if (result != 0)
> + break;
> +
> + homa_rpc_unlock(rpc);
> + result = homa_interest_wait(&interest);
> +
> + atomic_or(APP_NEEDS_LOCK, &rpc->flags);
> + homa_rpc_lock(rpc);
> + atomic_andnot(APP_NEEDS_LOCK, &rpc->flags);
Reuse the helper.
> + homa_interest_unlink_private(&interest);
> +
> + /* Abort on error, but if the interest actually got ready
> +              * in the meantime, ignore the error (loop back around
> + * to process the RPC).
> + */
> + if (result != 0 && atomic_read(&interest.ready) == 0)
> + break;
> + }
> +
> + return result;
> +}
> +
> +/**
> + * homa_wait_shared() - Wait for the completion of any non-private
> + * incoming message on a socket.
> + * @hsk: Socket on which to wait. Must not be locked.
> + * @nonblocking: Nonzero means return immediately if no RPC is ready.
> + *
> + * Return: Pointer to an RPC with a complete incoming message or nonzero
> + * error field, or a negative errno (usually -EINTR). If an RPC
> + * is returned it will be locked and referenced; the caller
> + * must release the lock and the reference.
> + */
> +struct homa_rpc *homa_wait_shared(struct homa_sock *hsk, int nonblocking)
> + __cond_acquires(rpc->bucket->lock)
> +{
> + struct homa_interest interest;
> + struct homa_rpc *rpc;
> + int result;
> +
> + INIT_LIST_HEAD(&interest.links);
> + init_waitqueue_head(&interest.wait_queue);
> + /* Each iteration through this loop waits until an RPC needs attention
> + * in some way (e.g. packets have arrived), then deals with that need
> + * (e.g. copy to user space). It may take many iterations until an
> + * RPC is ready for the application.
> + */
> + while (1) {
> + homa_sock_lock(hsk);
> + if (hsk->shutdown) {
> + rpc = ERR_PTR(-ESHUTDOWN);
> + homa_sock_unlock(hsk);
> + goto done;
> + }
> + if (!list_empty(&hsk->ready_rpcs)) {
> + rpc = list_first_entry(&hsk->ready_rpcs,
> + struct homa_rpc,
> + ready_links);
> + homa_rpc_hold(rpc);
> + list_del_init(&rpc->ready_links);
> + if (!list_empty(&hsk->ready_rpcs)) {
> + /* There are still more RPCs available, so
> + * let Linux know.
> + */
> + hsk->sock.sk_data_ready(&hsk->sock);
> + }
> + homa_sock_unlock(hsk);
> + } else if (nonblocking) {
> + rpc = ERR_PTR(-EAGAIN);
> + homa_sock_unlock(hsk);
> +
> +                     /* This is a good time to clean up dead RPCs. */
> + homa_rpc_reap(hsk, false);
> + goto done;
> + } else {
> + homa_interest_init_shared(&interest, hsk);
> + homa_sock_unlock(hsk);
> + result = homa_interest_wait(&interest);
> +
> + if (result != 0) {
> + int ready;
> +
> + /* homa_interest_wait returned an error, so we
> + * have to do two things. First, unlink the
> + * interest from the socket. Second, check to
> + * see if in the meantime the interest received
> + * a handoff. If so, ignore the error. Very
> + * important to hold the socket lock while
> + * checking, in order to eliminate races with
> + * homa_rpc_handoff.
> + */
> + homa_sock_lock(hsk);
> + homa_interest_unlink_shared(&interest);
> + ready = atomic_read(&interest.ready);
> + homa_sock_unlock(hsk);
> + if (ready == 0) {
> + rpc = ERR_PTR(result);
> + goto done;
> + }
> + }
> +
> + rpc = interest.rpc;
> + if (!rpc) {
> + rpc = ERR_PTR(-ESHUTDOWN);
> + goto done;
> + }
> + }
> +
> + atomic_or(APP_NEEDS_LOCK, &rpc->flags);
> + homa_rpc_lock(rpc);
> + atomic_andnot(APP_NEEDS_LOCK, &rpc->flags);
Reuse the helper here.
> + if (!rpc->error)
> + rpc->error = homa_copy_to_user(rpc);
> + if (rpc->error) {
> + if (rpc->state != RPC_DEAD)
> + break;
> + } else if (rpc->msgin.bytes_remaining == 0 &&
> + skb_queue_len(&rpc->msgin.packets) == 0)
> + break;
> + homa_rpc_put(rpc);
> + homa_rpc_unlock(rpc);
> + }
> +
> +done:
> + return rpc;
> +}
> +
> +/**
> + * homa_rpc_handoff() - This function is called when the input message for
> + * an RPC is ready for attention from a user thread. It notifies a waiting
> + * reader and/or queues the RPC, as appropriate.
> + * @rpc: RPC to handoff; must be locked.
> + */
> +void homa_rpc_handoff(struct homa_rpc *rpc)
> + __must_hold(rpc->bucket->lock)
> +{
> + struct homa_sock *hsk = rpc->hsk;
> + struct homa_interest *interest;
> +
> + if (atomic_read(&rpc->flags) & RPC_PRIVATE) {
> + homa_interest_notify_private(rpc);
> + return;
> + }
> +
> + /* Shared RPC; if there is a waiting thread, hand off the RPC;
> + * otherwise enqueue it.
> + */
> + homa_sock_lock(hsk);
> + if (hsk->shutdown) {
> + homa_sock_unlock(hsk);
> + return;
> + }
> + if (!list_empty(&hsk->interests)) {
> + interest = list_first_entry(&hsk->interests,
> + struct homa_interest, links);
> + list_del_init(&interest->links);
> + interest->rpc = rpc;
> + homa_rpc_hold(rpc);
> + atomic_set_release(&interest->ready, 1);
> + wake_up(&interest->wait_queue);
> + } else if (list_empty(&rpc->ready_links)) {
> + list_add_tail(&rpc->ready_links, &hsk->ready_rpcs);
> + hsk->sock.sk_data_ready(&hsk->sock);
> + }
> + homa_sock_unlock(hsk);
> +}
> +
> --
> 2.43.0
>