[<prev] [next>] [<thread-prev] [thread-next>] [day] [month] [year] [list]
Message-ID: <20250502233729.64220-11-ouster@cs.stanford.edu>
Date: Fri, 2 May 2025 16:37:23 -0700
From: John Ousterhout <ouster@...stanford.edu>
To: netdev@...r.kernel.org
Cc: pabeni@...hat.com,
edumazet@...gle.com,
horms@...nel.org,
kuba@...nel.org,
John Ousterhout <ouster@...stanford.edu>
Subject: [PATCH net-next v8 10/15] net: homa: create homa_outgoing.c
This file does most of the work of transmitting outgoing messages.
It is responsible for copying data from user space into skbs and
it also implements the "pacer", which throttles output if necessary
to prevent queue buildup in the NIC. Note: the pacer eventually
needs to be replaced with a Homa-specific qdisc, which can better
manage simultaneous transmissions by Homa and TCP. The current
implementation can coexist with TCP and doesn't harm TCP, but
Homa's latency suffers when TCP runs concurrently.
Signed-off-by: John Ousterhout <ouster@...stanford.edu>
---
Changes for v7:
* Implement accounting for bytes in tx skbs
* Rename UNKNOWN packet type to RPC_UNKNOWN
* Use new RPC reference counts; eliminates need for RCU
* Remove locker argument from locking functions
* Use u64 and __u64 properly
* Fix incorrect skb check in homa_message_out_fill
---
net/homa/homa_outgoing.c | 575 +++++++++++++++++++++++++++++++++++++++
1 file changed, 575 insertions(+)
create mode 100644 net/homa/homa_outgoing.c
diff --git a/net/homa/homa_outgoing.c b/net/homa/homa_outgoing.c
new file mode 100644
index 000000000000..c2d32af56d8a
--- /dev/null
+++ b/net/homa/homa_outgoing.c
@@ -0,0 +1,575 @@
+// SPDX-License-Identifier: BSD-2-Clause
+
+/* This file contains functions related to the sender side of message
+ * transmission. It also contains utility functions for sending packets.
+ */
+
+#include "homa_impl.h"
+#include "homa_pacer.h"
+#include "homa_peer.h"
+#include "homa_rpc.h"
+#include "homa_wire.h"
+#include "homa_stub.h"
+
+/**
+ * homa_message_out_init() - Reset rpc->msgout so that it describes a new,
+ * still-empty outgoing message.
+ * @rpc:       RPC whose output message should be initialized. Must be
+ *             locked by caller.
+ * @length:    Total number of bytes the message will eventually contain.
+ */
+void homa_message_out_init(struct homa_rpc *rpc, int length)
+	__must_hold(rpc_bucket_lock)
+{
+	/* Start from all zeroes; only nonzero fields are filled in below. */
+	memset(&rpc->msgout, 0, sizeof(rpc->msgout));
+	rpc->msgout.init_ns = sched_clock();
+	/* No packets exist yet, so the next one to transmit is the list head. */
+	rpc->msgout.next_xmit = &rpc->msgout.packets;
+	rpc->msgout.length = length;
+}
+
+/**
+ * homa_fill_data_interleaved() - Append the message data for a GSO packet
+ * to an skb, placing a homa_seg_hdr in front of every segment after the
+ * first so that each segment carries its own offset.
+ * @rpc:     RPC whose output message is being created. Must be
+ *           locked by caller.
+ * @skb:     The packet being filled. The caller has already created the
+ *           initial homa_data_hdr (which includes the first homa_seg_hdr)
+ *           and has stored the packet geometry in the skb's homa_skb_info.
+ * @iter:    Describes location(s) of (remaining) message data in user
+ *           space.
+ * Return:   0 for success, otherwise the nonzero error value returned by
+ *           homa_skb_append_from_iter() or homa_skb_append_to_frag().
+ */
+int homa_fill_data_interleaved(struct homa_rpc *rpc, struct sk_buff *skb,
+			       struct iov_iter *iter)
+	__must_hold(rpc_bucket_lock)
+{
+	struct homa_skb_info *info = homa_get_skb_info(skb);
+	int remaining = info->data_bytes;
+	int next_offset = info->offset;
+	int chunk = info->seg_length;
+	struct homa_seg_hdr seg;
+	int status;
+
+	/* Each pass copies the data for one segment and then, unless that
+	 * was the last segment, emits the homa_seg_hdr for the next one.
+	 */
+	for (;;) {
+		/* The final segment may be shorter than the others. */
+		if (chunk > remaining)
+			chunk = remaining;
+		status = homa_skb_append_from_iter(rpc->hsk->homa, skb, iter,
+						   chunk);
+		if (status != 0)
+			return status;
+		remaining -= chunk;
+		next_offset += chunk;
+		if (remaining == 0)
+			return 0;
+
+		seg.offset = htonl(next_offset);
+		status = homa_skb_append_to_frag(rpc->hsk->homa, skb, &seg,
+						 sizeof(seg));
+		if (status != 0)
+			return status;
+	}
+}
+
+/**
+ * homa_new_data_packet() - Allocate a new sk_buff and fill it with a Homa
+ * data packet. The resulting packet will be a GSO packet that will eventually
+ * be segmented by the NIC.
+ *
+ * The skb is sized to hold one homa_data_hdr, @length bytes of message data,
+ * and one additional homa_seg_hdr for every segment after the first. GSO
+ * fields in the skb's shared info are only filled in when the data needs
+ * more than one segment. If copying the data fails, the partially built skb
+ * is released with homa_skb_free_tx() before returning.
+ * @rpc: RPC that packet will belong to (msgout must have been
+ * initialized). Must be locked by caller.
+ * @iter: Describes location(s) of (remaining) message data in user
+ * space.
+ * @offset: Offset in the message of the first byte of data in this
+ * packet.
+ * @length: How many bytes of data to include in the skb. Caller must
+ * ensure that this amount of data isn't too much for a
+ * well-formed GSO packet, and that iter has at least this
+ * much data.
+ * @max_seg_data: Maximum number of bytes of message data that can go in
+ * a single segment of the GSO packet. Used as a divisor, so
+ * it must be nonzero.
+ * Return: A pointer to the new packet, or an ERR_PTR-encoded error:
+ * -ENOMEM if the skb could not be allocated, otherwise the
+ * error reported while copying the message data.
+ */
+struct sk_buff *homa_new_data_packet(struct homa_rpc *rpc,
+ struct iov_iter *iter, int offset,
+ int length, int max_seg_data)
+ __must_hold(rpc_bucket_lock)
+{
+ struct homa_skb_info *homa_info;
+ struct homa_data_hdr *h;
+ struct sk_buff *skb;
+ int err, gso_size;
+ u64 segs;
+
+ segs = length + max_seg_data - 1; /* Round up: segs = ceil(length / max_seg_data). */
+ do_div(segs, max_seg_data);
+
+ /* Initialize the overall skb. */
+ skb = homa_skb_new_tx(sizeof32(struct homa_data_hdr) + length +
+ (segs - 1) * sizeof32(struct homa_seg_hdr));
+ if (!skb)
+ return ERR_PTR(-ENOMEM);
+
+ /* Fill in the Homa header (which will be replicated in every
+ * network packet by GSO).
+ * NOTE(review): skb_put() does not clear the new space, so any
+ * header field not assigned below keeps whatever the buffer held;
+ * confirm against the homa_data_hdr definition that none are missed.
+ */
+ h = (struct homa_data_hdr *)skb_put(skb, sizeof(struct homa_data_hdr));
+ h->common.sport = htons(rpc->hsk->port);
+ h->common.dport = htons(rpc->dport);
+ h->common.sequence = htonl(offset);
+ h->common.type = DATA;
+ homa_set_doff(h, sizeof(struct homa_data_hdr));
+ h->common.checksum = 0;
+ h->common.sender_id = cpu_to_be64(rpc->id);
+ h->message_length = htonl(rpc->msgout.length);
+ h->ack.client_id = 0;
+ homa_peer_get_acks(rpc->peer, 1, &h->ack);
+ h->retransmit = 0;
+ h->seg.offset = htonl(offset);
+
+ homa_info = homa_get_skb_info(skb);
+ homa_info->next_skb = NULL;
+ homa_info->wire_bytes = length + segs * (sizeof(struct homa_data_hdr)
+ + rpc->hsk->ip_header_length + HOMA_ETH_OVERHEAD);
+ homa_info->data_bytes = length;
+ homa_info->seg_length = max_seg_data;
+ homa_info->offset = offset;
+
+ if (segs > 1) {
+ homa_set_doff(h, sizeof(struct homa_data_hdr) -
+ sizeof32(struct homa_seg_hdr)); /* Overrides the doff set above for the multi-segment case. */
+ gso_size = max_seg_data + sizeof(struct homa_seg_hdr);
+ err = homa_fill_data_interleaved(rpc, skb, iter);
+ } else {
+ gso_size = max_seg_data;
+ err = homa_skb_append_from_iter(rpc->hsk->homa, skb, iter,
+ length);
+ }
+ if (err)
+ goto error;
+
+ if (segs > 1) {
+ skb_shinfo(skb)->gso_segs = segs;
+ skb_shinfo(skb)->gso_size = gso_size;
+
+ /* It's unclear what gso_type should be used to force software
+ * GSO; the value below seems to work...
+ * NOTE(review): 0xd is a raw bit mask; consider spelling it
+ * out with the named SKB_GSO_* flags it is meant to represent.
+ */
+ skb_shinfo(skb)->gso_type =
+ rpc->hsk->homa->gso_force_software ? 0xd : SKB_GSO_TCPV6;
+ }
+ return skb;
+
+error:
+ homa_skb_free_tx(rpc->hsk->homa, skb);
+ return ERR_PTR(err);
+}
+
+/**
+ * homa_message_out_fill() - Initializes information for sending a message
+ * for an RPC (either request or response); copies the message data from
+ * user space and (possibly) begins transmitting the message.
+ *
+ * The function holds its own reference on the RPC (homa_rpc_hold) for as
+ * long as it runs. The message is copied one GSO packet at a time; the RPC
+ * lock is dropped while each packet is built and retaken before the packet
+ * is linked into rpc->msgout. On every exit path rpc->msgout.skb_memory is
+ * added to the socket's sk_wmem_alloc.
+ * @rpc: RPC for which to send message; this function must not
+ * previously have been called for the RPC. Must be locked. The RPC
+ * will be unlocked while copying data, but will be locked again
+ * before returning.
+ * @iter: Describes location(s) of message data in user space.
+ * @xmit: Nonzero means this method should start transmitting packets;
+ * transmission will be overlapped with copying from user space
+ * when the message is longer than two full GSO packets, otherwise
+ * it starts once all of the data has been copied.
+ * Zero means the caller will initiate transmission after this
+ * function returns.
+ *
+ * Return: 0 for success, or a negative errno for failure: -EINVAL if the
+ * message is empty or longer than HOMA_MAX_MESSAGE_LENGTH, or the
+ * error from homa_new_data_packet(). It is possible
+ * for the RPC to be freed while this function is active. If that
+ * happens, copying will cease, -EINVAL will be returned, and
+ * rpc->state will be RPC_DEAD.
+ */
+int homa_message_out_fill(struct homa_rpc *rpc, struct iov_iter *iter, int xmit)
+ __must_hold(rpc_bucket_lock)
+{
+ /* Geometry information for packets:
+ * mtu: largest size for an on-the-wire packet (including
+ * all headers through IP header, but not Ethernet
+ * header).
+ * max_seg_data: largest amount of Homa message data that fits
+ * in an on-the-wire packet (after segmentation).
+ * max_gso_data: largest amount of Homa message data that fits
+ * in a GSO packet (before segmentation).
+ */
+ int mtu, max_seg_data, max_gso_data;
+
+ struct sk_buff **last_link;
+ struct dst_entry *dst;
+ u64 segs_per_gso;
+ int overlap_xmit;
+
+ /* Bytes of the message that haven't yet been copied into skbs. */
+ int bytes_left;
+
+ int gso_size;
+ int err;
+
+ homa_rpc_hold(rpc);
+ if (unlikely(iter->count > HOMA_MAX_MESSAGE_LENGTH ||
+ iter->count == 0)) {
+ err = -EINVAL;
+ goto error; /* NOTE(review): msgout has not been initialized by this function yet; the error path still reads msgout.skb_memory. */
+ }
+ homa_message_out_init(rpc, iter->count);
+
+ /* Compute the geometry of packets. */
+ dst = homa_get_dst(rpc->peer, rpc->hsk);
+ mtu = dst_mtu(dst);
+ max_seg_data = mtu - rpc->hsk->ip_header_length
+ - sizeof(struct homa_data_hdr);
+ gso_size = dst->dev->gso_max_size;
+ if (gso_size > rpc->hsk->homa->max_gso_size)
+ gso_size = rpc->hsk->homa->max_gso_size;
+
+ /* Round gso_size down to an even # of mtus. Every segment after the
+ * first costs max_seg_data plus one homa_seg_hdr; the first segment's
+ * homa_seg_hdr is already part of homa_data_hdr, hence the extra
+ * sizeof(struct homa_seg_hdr) added to the dividend.
+ */
+ segs_per_gso = gso_size - rpc->hsk->ip_header_length -
+ sizeof(struct homa_data_hdr) +
+ sizeof(struct homa_seg_hdr);
+ do_div(segs_per_gso, max_seg_data +
+ sizeof(struct homa_seg_hdr));
+ if (segs_per_gso == 0)
+ segs_per_gso = 1;
+ max_gso_data = segs_per_gso * max_seg_data;
+
+ overlap_xmit = rpc->msgout.length > 2 * max_gso_data;
+ homa_skb_stash_pages(rpc->hsk->homa, rpc->msgout.length);
+
+ /* Each iteration of the loop below creates one GSO packet. */
+ last_link = &rpc->msgout.packets;
+ for (bytes_left = rpc->msgout.length; bytes_left > 0; ) {
+ int skb_data_bytes, offset;
+ struct sk_buff *skb;
+
+ homa_rpc_unlock(rpc); /* Build the packet without holding the RPC lock. */
+ skb_data_bytes = max_gso_data;
+ offset = rpc->msgout.length - bytes_left;
+ if (skb_data_bytes > bytes_left)
+ skb_data_bytes = bytes_left;
+ skb = homa_new_data_packet(rpc, iter, offset, skb_data_bytes,
+ max_seg_data);
+ if (IS_ERR(skb)) {
+ err = PTR_ERR(skb);
+ homa_rpc_lock(rpc); /* Honor the contract: RPC is locked on return. */
+ goto error;
+ }
+ bytes_left -= skb_data_bytes;
+
+ homa_rpc_lock(rpc);
+ if (rpc->state == RPC_DEAD) {
+ /* RPC was freed while we were copying. */
+ err = -EINVAL;
+ homa_skb_free_tx(rpc->hsk->homa, skb);
+ goto error;
+ }
+ *last_link = skb;
+ last_link = &(homa_get_skb_info(skb)->next_skb);
+ *last_link = NULL;
+ rpc->msgout.num_skbs++;
+ rpc->msgout.skb_memory += skb->truesize;
+ rpc->msgout.copied_from_user = rpc->msgout.length - bytes_left;
+ if (overlap_xmit && list_empty(&rpc->throttled_links) &&
+ xmit)
+ homa_pacer_manage_rpc(rpc);
+ }
+ refcount_add(rpc->msgout.skb_memory, &rpc->hsk->sock.sk_wmem_alloc);
+ homa_rpc_put(rpc);
+ if (!overlap_xmit && xmit)
+ homa_xmit_data(rpc, false);
+ return 0;
+
+error:
+ refcount_add(rpc->msgout.skb_memory, &rpc->hsk->sock.sk_wmem_alloc); /* Account for any skbs that were already linked into msgout. */
+ homa_rpc_put(rpc);
+ return err;
+}
+
+/**
+ * homa_xmit_control() - Fill in the common header of a control packet from
+ * an RPC and send the packet to the other end of that RPC.
+ * @type:      Value to store in the packet's type field.
+ * @contents:  Buffer holding the packet, which starts with a
+ *             homa_common_hdr. The caller supplies everything after the
+ *             common header; this function sets the type, sport, dport
+ *             and sender_id fields of the common header.
+ * @length:    Length of @contents (including the common header).
+ * @rpc:       Supplies the ports, id, peer and socket used to address and
+ *             send the packet. Caller must hold either the lock or a
+ *             reference.
+ *
+ * Return:     The result of __homa_xmit_control(): zero for success, or a
+ *             negative errno value if there was a problem.
+ */
+int homa_xmit_control(enum homa_packet_type type, void *contents,
+		      size_t length, struct homa_rpc *rpc)
+{
+	struct homa_common_hdr *common = contents;
+	struct homa_sock *hsk = rpc->hsk;
+
+	common->sender_id = cpu_to_be64(rpc->id);
+	common->dport = htons(rpc->dport);
+	common->sport = htons(hsk->port);
+	common->type = type;
+
+	return __homa_xmit_control(contents, length, rpc->peer, hsk);
+}
+
+/**
+ * __homa_xmit_control() - Lower-level version of homa_xmit_control: sends
+ * a control packet.
+ *
+ * The contents are copied into a freshly allocated skb; if they are shorter
+ * than HOMA_MIN_PKT_LENGTH the packet is padded with zero bytes up to that
+ * length. The skb is then handed to ip6_xmit() or ip_queue_xmit(), depending
+ * on the socket's address family.
+ * @contents: Address of buffer containing the contents of the packet.
+ * The caller must have filled in all of the information,
+ * including the common header.
+ * @length: Length of @contents.
+ * @peer: Destination to which the packet will be sent.
+ * @hsk: Socket via which the packet will be sent.
+ *
+ * Return: Either zero (for success), or a negative errno value if there
+ * was a problem: -ENOBUFS if no skb could be allocated, otherwise
+ * the value returned by the IP transmit function.
+ */
+int __homa_xmit_control(void *contents, size_t length, struct homa_peer *peer,
+ struct homa_sock *hsk)
+{
+ struct homa_common_hdr *h;
+ struct dst_entry *dst;
+ struct sk_buff *skb;
+ int extra_bytes;
+ int result;
+
+ dst = homa_get_dst(peer, hsk);
+ skb = homa_skb_new_tx(HOMA_MAX_HEADER);
+ if (unlikely(!skb))
+ return -ENOBUFS;
+ dst_hold(dst);
+ skb_dst_set(skb, dst);
+
+ h = skb_put(skb, length);
+ memcpy(h, contents, length);
+ extra_bytes = HOMA_MIN_PKT_LENGTH - length;
+ if (extra_bytes > 0)
+ memset(skb_put(skb, extra_bytes), 0, extra_bytes);
+ skb->ooo_okay = 1;
+ skb_get(skb); /* Extra reference so skb->users can be examined after the xmit call. */
+ if (hsk->inet.sk.sk_family == AF_INET6)
+ result = ip6_xmit(&hsk->inet.sk, skb, &peer->flow.u.ip6, 0,
+ NULL, 0, 0);
+ else
+ result = ip_queue_xmit(&hsk->inet.sk, skb, &peer->flow);
+ if (unlikely(result != 0)) {
+ /* It appears that ip*_xmit frees skbuffs after
+ * errors; the following code is to raise an alert if
+ * this isn't actually the case. The extra skb_get above
+ * and kfree_skb call below are needed to do the check
+ * accurately (otherwise the buffer could be freed and
+ * its memory used for some other purpose, resulting in
+ * a bogus "reference count").
+ */
+ if (refcount_read(&skb->users) > 1) {
+ if (hsk->inet.sk.sk_family == AF_INET6)
+ pr_notice("ip6_xmit didn't free Homa control packet (type %d) after error %d\n",
+ h->type, result);
+ else
+ pr_notice("ip_queue_xmit didn't free Homa control packet (type %d) after error %d\n",
+ h->type, result);
+ }
+ }
+ kfree_skb(skb); /* Drops the extra reference taken by skb_get() above. */
+ return result;
+}
+
+/**
+ * homa_xmit_unknown() - Send an RPC_UNKNOWN packet to a peer.
+ *
+ * The reply is addressed by swapping the source and destination ports of
+ * the incoming packet and converting its sender_id with homa_local_id().
+ * Transmission is best-effort: if the peer cannot be found, or if
+ * __homa_xmit_control() fails, nothing is reported to the caller.
+ * @skb:     Buffer containing an incoming packet; identifies the peer to
+ *           which the RPC_UNKNOWN packet should be sent.
+ * @hsk:     Socket that should be used to send the RPC_UNKNOWN packet.
+ */
+void homa_xmit_unknown(struct sk_buff *skb, struct homa_sock *hsk)
+{
+	struct homa_common_hdr *h = (struct homa_common_hdr *)skb->data;
+	struct in6_addr saddr = skb_canonical_ipv6_saddr(skb);
+	struct homa_rpc_unknown_hdr unknown;
+	struct homa_peer *peer;
+
+	/* The entire struct is copied into the outgoing packet, so clear it
+	 * first: otherwise every field and padding byte not assigned below
+	 * would carry uninitialized kernel stack contents onto the wire.
+	 */
+	memset(&unknown, 0, sizeof(unknown));
+	unknown.common.sport = h->dport;
+	unknown.common.dport = h->sport;
+	unknown.common.type = RPC_UNKNOWN;
+	unknown.common.sender_id = cpu_to_be64(homa_local_id(h->sender_id));
+	peer = homa_peer_find(hsk->homa->peers, &saddr, &hsk->inet);
+	if (!IS_ERR(peer))
+		__homa_xmit_control(&unknown, sizeof(unknown), peer, hsk);
+}
+
+/**
+ * homa_xmit_data() - If an RPC has outbound data packets that are permitted
+ * to be transmitted according to the scheduling mechanism, arrange for
+ * them to be sent (some may be sent immediately; others may be sent
+ * later by the pacer thread).
+ *
+ * Packets are taken in order starting at rpc->msgout.next_xmit. The NIC
+ * queue is only consulted (homa_pacer_check_nic_q) while the number of
+ * untransmitted bytes in the message is at least the pacer's
+ * throttle_min_bytes; if that check refuses a packet, the RPC is handed
+ * to the pacer (homa_pacer_manage_rpc) and the loop stops.
+ * @rpc: RPC to check for transmittable packets. Must be locked by
+ * caller. Note: this function will release the RPC lock while
+ * passing packets through the IP stack, then reacquire it
+ * before returning. It is possible that the RPC gets freed
+ * when the lock isn't held, in which case the state will
+ * be RPC_DEAD on return.
+ * @force: True means send at least one packet, even if the NIC queue
+ * is too long. False means that zero packets may be sent, if
+ * the NIC queue is sufficiently long.
+ */
+void homa_xmit_data(struct homa_rpc *rpc, bool force)
+ __must_hold(rpc_bucket_lock)
+{
+ struct homa *homa = rpc->hsk->homa;
+
+ homa_rpc_hold(rpc);
+ while (*rpc->msgout.next_xmit) {
+ struct sk_buff *skb = *rpc->msgout.next_xmit;
+
+ if ((rpc->msgout.length - rpc->msgout.next_xmit_offset)
+ >= homa->pacer->throttle_min_bytes) {
+ if (!homa_pacer_check_nic_q(homa->pacer, skb, force)) {
+ homa_pacer_manage_rpc(rpc);
+ break;
+ }
+ }
+
+ rpc->msgout.next_xmit = &(homa_get_skb_info(skb)->next_skb); /* Advance before unlocking. */
+ rpc->msgout.next_xmit_offset +=
+ homa_get_skb_info(skb)->data_bytes;
+
+ homa_rpc_hold(rpc);
+ homa_rpc_unlock(rpc);
+ skb_get(skb); /* Extra skb reference taken before handing the skb to __homa_xmit_data(). */
+ __homa_xmit_data(skb, rpc);
+ force = false; /* @force applies to the first packet only. */
+ homa_rpc_lock(rpc);
+ homa_rpc_put(rpc);
+ if (rpc->state == RPC_DEAD)
+ break;
+ }
+ homa_rpc_put(rpc);
+}
+
+/**
+ * __homa_xmit_data() - Final transmission step shared by homa_xmit_data
+ * and homa_resend_data: attaches a route to a data packet, requests
+ * checksum offload, and passes the packet to the IPv4 or IPv6 output path.
+ * @skb:     Packet to be sent.  The packet will be freed after transmission
+ *           (and also if errors prevented transmission).
+ * @rpc:     Information about the RPC that the packet belongs to.
+ */
+void __homa_xmit_data(struct sk_buff *skb, struct homa_rpc *rpc)
+{
+	struct homa_sock *hsk = rpc->hsk;
+	struct homa_peer *peer = rpc->peer;
+	struct dst_entry *dst = homa_get_dst(peer, hsk);
+
+	/* The skb gets its own reference on the route. */
+	dst_hold(dst);
+	skb_dst_set(skb, dst);
+
+	/* The checksum is to be stored in the Homa common header. */
+	skb->csum_offset = offsetof(struct homa_common_hdr, checksum);
+	skb->csum_start = skb_transport_header(skb) - skb->head;
+	skb->ip_summed = CHECKSUM_PARTIAL;
+	skb->ooo_okay = 1;
+
+	if (hsk->inet.sk.sk_family != AF_INET6) {
+		ip_queue_xmit(&hsk->inet.sk, skb, &peer->flow);
+		return;
+	}
+	ip6_xmit(&hsk->inet.sk, skb, &peer->flow.u.ip6, 0, NULL, 0, 0);
+}
+
+/**
+ * homa_resend_data() - This function is invoked as part of handling RESEND
+ * requests. It retransmits the packet(s) containing a given range of bytes
+ * from a message.
+ *
+ * Retransmission works at segment granularity: every segment that overlaps
+ * [@start, @end) is copied into a new single-segment skb whose header is
+ * cloned from the original packet (with the sequence and segment offset
+ * updated and the retransmit flag set) and is then sent with
+ * __homa_xmit_data(). If an skb cannot be allocated or the data cannot be
+ * copied, the function stops early; segments already sent are unaffected.
+ * @rpc:     RPC for which data should be resent.
+ * @start:   Offset within @rpc->msgout of the first byte to retransmit.
+ * @end:     Offset within @rpc->msgout of the byte just after the last one
+ *           to retransmit.
+ */
+void homa_resend_data(struct homa_rpc *rpc, int start, int end)
+	__must_hold(rpc_bucket_lock)
+{
+	struct homa_skb_info *homa_info;
+	struct sk_buff *skb;
+
+	if (end <= start)
+		return;
+
+	/* Each iteration of this loop checks one packet in the message
+	 * to see if it contains segments that need to be retransmitted.
+	 */
+	for (skb = rpc->msgout.packets; skb; skb = homa_info->next_skb) {
+		int seg_offset, offset, seg_length, data_left;
+
+		homa_info = homa_get_skb_info(skb);
+		offset = homa_info->offset;
+		if (offset >= end)
+			break;
+		if (start >= (offset + homa_info->data_bytes))
+			continue;
+
+		/* seg_offset tracks where the current segment's data starts
+		 * within skb; the first segment's data follows the header.
+		 */
+		seg_offset = sizeof32(struct homa_data_hdr);
+		data_left = homa_info->data_bytes;
+		if (skb_shinfo(skb)->gso_segs <= 1)
+			seg_length = data_left;
+		else
+			seg_length = homa_info->seg_length;
+
+		/* Each iteration of this loop considers one segment. */
+		for ( ; data_left > 0; data_left -= seg_length,
+		     offset += seg_length,
+		     seg_offset += skb_shinfo(skb)->gso_size) {
+			struct homa_skb_info *new_homa_info;
+			struct homa_data_hdr *h;
+			struct sk_buff *new_skb;
+			int err;
+
+			/* The last segment may be short. */
+			if (seg_length > data_left)
+				seg_length = data_left;
+
+			if (end <= offset)
+				return;
+			if ((offset + seg_length) <= start)
+				continue;
+
+			/* This segment must be retransmitted. */
+			new_skb = homa_skb_new_tx(sizeof(struct homa_data_hdr)
+						  + seg_length);
+			if (unlikely(!new_skb))
+				return;
+			h = __skb_put_data(new_skb, skb_transport_header(skb),
+					   sizeof32(struct homa_data_hdr));
+			h->common.sequence = htonl(offset);
+			h->seg.offset = htonl(offset);
+			h->retransmit = 1;
+			err = homa_skb_append_from_skb(rpc->hsk->homa, new_skb,
+						       skb, seg_offset,
+						       seg_length);
+			if (err != 0) {
+				pr_err("%s got error %d from homa_skb_append_from_skb\n",
+				       __func__, err);
+				kfree_skb(new_skb);
+				return;
+			}
+
+			new_homa_info = homa_get_skb_info(new_skb);
+			new_homa_info->wire_bytes = rpc->hsk->ip_header_length
+					+ sizeof(struct homa_data_hdr)
+					+ seg_length + HOMA_ETH_OVERHEAD;
+			new_homa_info->data_bytes = seg_length;
+			new_homa_info->seg_length = seg_length;
+			new_homa_info->offset = offset;
+
+			/* Called with force set to true and the result
+			 * ignored: the retransmission is sent regardless.
+			 */
+			homa_pacer_check_nic_q(rpc->hsk->homa->pacer,
+					       new_skb, true);
+			__homa_xmit_data(new_skb, rpc);
+		}
+	}
+}
--
2.43.0
Powered by blists - more mailing lists