lists.openwall.net   lists  /  announce  owl-users  owl-dev  john-users  john-dev  passwdqc-users  yescrypt  popa3d-users  /  oss-security  kernel-hardening  musl  sabotage  tlsify  passwords  /  crypt-dev  xvendor  /  Bugtraq  Full-Disclosure  linux-kernel  linux-netdev  linux-ext4  linux-hardening  linux-cve-announce  PHC 
Open Source and information security mailing list archives
 
Hash Suite: Windows password security audit tool. GUI, reports in PDF.
[<prev] [next>] [<thread-prev] [thread-next>] [day] [month] [year] [list]
Message-ID: <20250526042819.2526-11-ouster@cs.stanford.edu>
Date: Sun, 25 May 2025 21:28:12 -0700
From: John Ousterhout <ouster@...stanford.edu>
To: netdev@...r.kernel.org
Cc: pabeni@...hat.com,
	edumazet@...gle.com,
	horms@...nel.org,
	kuba@...nel.org,
	John Ousterhout <ouster@...stanford.edu>
Subject: [PATCH net-next v9 10/15] net: homa: create homa_outgoing.c

This file does most of the work of transmitting outgoing messages.
It is also responsible for copying data from user space into skbs.

Signed-off-by: John Ousterhout <ouster@...stanford.edu>

---
Changes for v9:
* Use new homa_clock abstraction layer
* Various name improvements (e.g. use "alloc" instead of "new" for functions
  that allocate memory)
* Eliminate sizeof32 define: use sizeof instead

Changes for v7:
* Implement accounting for bytes in tx skbs
* Rename UNKNOWN packet type to RPC_UNKNOWN
* Use new RPC reference counts; eliminates need for RCU
* Remove locker argument from locking functions
* Use u64 and __u64 properly
* Fix incorrect skb check in homa_message_out_fill
---
 net/homa/homa_impl.h     |  13 +
 net/homa/homa_outgoing.c | 570 +++++++++++++++++++++++++++++++++++++++
 2 files changed, 583 insertions(+)
 create mode 100644 net/homa/homa_outgoing.c

diff --git a/net/homa/homa_impl.h b/net/homa/homa_impl.h
index 47255c764c69..caf914f08c49 100644
--- a/net/homa/homa_impl.h
+++ b/net/homa/homa_impl.h
@@ -385,12 +385,25 @@ static inline bool homa_make_header_avl(struct sk_buff *skb)

 extern unsigned int homa_net_id;

+int      homa_fill_data_interleaved(struct homa_rpc *rpc,
+				    struct sk_buff *skb, struct iov_iter *iter);
+int      homa_message_out_fill(struct homa_rpc *rpc,
+			       struct iov_iter *iter, int xmit);
+void     homa_message_out_init(struct homa_rpc *rpc, int length);
 void     homa_rpc_handoff(struct homa_rpc *rpc);
+struct sk_buff *homa_tx_data_pkt_alloc(struct homa_rpc *rpc,
+				       struct iov_iter *iter, int offset,
+				       int length, int max_seg_data);
 int      homa_xmit_control(enum homa_packet_type type, void *contents,
 			   size_t length, struct homa_rpc *rpc);
+int      __homa_xmit_control(void *contents, size_t length,
+			     struct homa_peer *peer, struct homa_sock *hsk);
 void     homa_xmit_data(struct homa_rpc *rpc, bool force);
+void     homa_xmit_unknown(struct sk_buff *skb, struct homa_sock *hsk);

 int      homa_message_in_init(struct homa_rpc *rpc, int unsched);
+void     homa_resend_data(struct homa_rpc *rpc, int start, int end);
+void     __homa_xmit_data(struct sk_buff *skb, struct homa_rpc *rpc);

 /**
  * homa_net_from_net() - Return the struct homa_net associated with a particular
diff --git a/net/homa/homa_outgoing.c b/net/homa/homa_outgoing.c
new file mode 100644
index 000000000000..1ad415971cf2
--- /dev/null
+++ b/net/homa/homa_outgoing.c
@@ -0,0 +1,570 @@
+// SPDX-License-Identifier: BSD-2-Clause
+
+/* This file contains functions related to the sender side of message
+ * transmission. It also contains utility functions for sending packets.
+ */
+
+#include "homa_impl.h"
+#include "homa_pacer.h"
+#include "homa_peer.h"
+#include "homa_rpc.h"
+#include "homa_wire.h"
+#include "homa_stub.h"
+
+/**
+ * homa_message_out_init() - Reset rpc->msgout so that it describes a new,
+ * still empty, outgoing message of a given size.
+ * @rpc:       RPC whose output message should be initialized. Must be
+ *             locked by caller.
+ * @length:    Total number of bytes the message will hold once all of its
+ *             data has been added to rpc->msgout.
+ */
+void homa_message_out_init(struct homa_rpc *rpc, int length)
+	__must_hold(rpc_bucket_lock)
+{
+	/* Clear the whole structure first; the fields assigned individually
+	 * below are the only ones that do not start out as zero.
+	 */
+	memset(&rpc->msgout, 0, sizeof(rpc->msgout));
+
+	rpc->msgout.init_time = homa_clock();
+	rpc->msgout.next_xmit = &rpc->msgout.packets;
+	rpc->msgout.length = length;
+}
+
+/**
+ * homa_fill_data_interleaved() - Append the payload of a multi-segment
+ * (GSO) data packet to an skb. A homa_seg_hdr is placed in front of the
+ * data for every segment after the first, so that each segment carries
+ * its own offset within the message.
+ * @rpc:            RPC whose output message is being created. Must be
+ *                  locked by caller.
+ * @skb:            The packet being filled. The caller has already added
+ *                  and initialized the leading homa_data_hdr, and has
+ *                  stored the packet geometry (offset, data_bytes and
+ *                  seg_length) in the skb's homa_skb_info.
+ * @iter:           Describes location(s) of (remaining) message data in user
+ *                  space.
+ * Return:          0 for success; otherwise the (negative errno) error
+ *                  reported by the skb helper that failed.
+ */
+int homa_fill_data_interleaved(struct homa_rpc *rpc, struct sk_buff *skb,
+			       struct iov_iter *iter)
+	__must_hold(rpc_bucket_lock)
+{
+	struct homa_skb_info *info = homa_get_skb_info(skb);
+	struct homa *homa = rpc->hsk->homa;
+	int remaining = info->data_bytes;
+	int msg_offset = info->offset;
+	int chunk = info->seg_length;
+	int status;
+
+	/* The homa_seg_hdr for the first segment is part of the
+	 * homa_data_hdr that the caller already added. Each pass through
+	 * this loop therefore appends the data for one segment and then,
+	 * if more data remains, the homa_seg_hdr for the following segment.
+	 */
+	for (;;) {
+		struct homa_seg_hdr next_seg;
+
+		/* The final segment may be shorter than the others. */
+		if (remaining < chunk)
+			chunk = remaining;
+		status = homa_skb_append_from_iter(homa, skb, iter, chunk);
+		if (status != 0)
+			return status;
+		msg_offset += chunk;
+		remaining -= chunk;
+		if (remaining == 0)
+			return 0;
+
+		next_seg.offset = htonl(msg_offset);
+		status = homa_skb_append_to_frag(homa, skb, &next_seg,
+						 sizeof(next_seg));
+		if (status != 0)
+			return status;
+	}
+}
+
+/**
+ * homa_tx_data_pkt_alloc() - Allocate a new sk_buff and fill it with an
+ * outgoing Homa data packet. If the data needs more than one segment the
+ * result is a GSO packet, to be split into individual network packets
+ * later on.
+ * @rpc:          RPC that packet will belong to (msgout must have been
+ *                initialized). Must be locked by caller.
+ * @iter:         Describes location(s) of (remaining) message data in user
+ *                space.
+ * @offset:       Offset in the message of the first byte of data in this
+ *                packet.
+ * @length:       How many bytes of data to include in the skb. Caller must
+ *                ensure that this amount of data isn't too much for a
+ *                well-formed GSO packet, and that iter has at least this
+ *                much data.
+ * @max_seg_data: Maximum number of bytes of message data that can go in
+ *                a single segment of the GSO packet.
+ * Return: A pointer to the new packet, or an ERR_PTR-encoded negative
+ *         errno (-ENOMEM if the skb could not be allocated, otherwise the
+ *         error from copying the data). The skb is freed before an error
+ *         is returned.
+ */
+struct sk_buff *homa_tx_data_pkt_alloc(struct homa_rpc *rpc,
+				       struct iov_iter *iter, int offset,
+				       int length, int max_seg_data)
+	__must_hold(rpc_bucket_lock)
+{
+	struct homa_sock *hsk = rpc->hsk;
+	struct homa_skb_info *info;
+	struct homa_data_hdr *hdr;
+	struct sk_buff *skb;
+	u64 num_segs;
+	int status;
+
+	/* Number of segments: length / max_seg_data, rounded up. */
+	num_segs = length + max_seg_data - 1;
+	do_div(num_segs, max_seg_data);
+
+	/* Ask for room for the full header, the message data, and one
+	 * additional homa_seg_hdr for each segment after the first.
+	 */
+	skb = homa_skb_alloc_tx(sizeof(struct homa_data_hdr) + length +
+			      (num_segs - 1) * sizeof(struct homa_seg_hdr));
+	if (!skb)
+		return ERR_PTR(-ENOMEM);
+
+	/* Fill in the Homa header (which will be replicated in every
+	 * network packet by GSO).
+	 */
+	hdr = (struct homa_data_hdr *)skb_put(skb, sizeof(*hdr));
+	hdr->common.sport = htons(hsk->port);
+	hdr->common.dport = htons(rpc->dport);
+	hdr->common.sequence = htonl(offset);
+	hdr->common.type = DATA;
+	homa_set_doff(hdr, sizeof(*hdr));
+	hdr->common.checksum = 0;
+	hdr->common.sender_id = cpu_to_be64(rpc->id);
+	hdr->message_length = htonl(rpc->msgout.length);
+	hdr->ack.client_id = 0;
+	homa_peer_get_acks(rpc->peer, 1, &hdr->ack);
+	hdr->retransmit = 0;
+	hdr->seg.offset = htonl(offset);
+
+	/* Record the packet geometry for later use. */
+	info = homa_get_skb_info(skb);
+	info->next_skb = NULL;
+	info->wire_bytes = length + num_segs * (sizeof(struct homa_data_hdr)
+			+  hsk->ip_header_length + HOMA_ETH_OVERHEAD);
+	info->data_bytes = length;
+	info->seg_length = max_seg_data;
+	info->offset = offset;
+
+	/* Copy the message data into the packet. */
+	if (num_segs > 1) {
+		homa_set_doff(hdr, sizeof(struct homa_data_hdr)  -
+				sizeof(struct homa_seg_hdr));
+		status = homa_fill_data_interleaved(rpc, skb, iter);
+	} else {
+		status = homa_skb_append_from_iter(hsk->homa, skb, iter,
+						   length);
+	}
+	if (status) {
+		homa_skb_free_tx(hsk->homa, skb);
+		return ERR_PTR(status);
+	}
+
+	/* GSO metadata is only needed when there are several segments. */
+	if (num_segs > 1) {
+		skb_shinfo(skb)->gso_segs = num_segs;
+		skb_shinfo(skb)->gso_size = max_seg_data +
+				sizeof(struct homa_seg_hdr);
+
+		/* It's unclear what gso_type should be used to force software
+		 * GSO; the value below seems to work...
+		 */
+		skb_shinfo(skb)->gso_type =
+		    hsk->homa->gso_force_software ? 0xd : SKB_GSO_TCPV6;
+	}
+	return skb;
+}
+
+/**
+ * homa_message_out_fill() - Initializes information for sending a message
+ * for an RPC (either request or response); copies the message data from
+ * user space into a chain of skbs linked from rpc->msgout.packets and
+ * (possibly) begins transmitting the message.
+ * @rpc:     RPC for which to send message; this function must not
+ *           previously have been called for the RPC. Must be locked. The RPC
+ *           will be unlocked while copying data, but will be locked again
+ *           before returning. This function holds its own reference on
+ *           the RPC from entry until just before it transmits or returns.
+ * @iter:    Describes location(s) of message data in user space. Its
+ *           count gives the message length, which must be nonzero and no
+ *           larger than HOMA_MAX_MESSAGE_LENGTH.
+ * @xmit:    Nonzero means this method should start transmitting packets.
+ *           For messages longer than two full GSO packets, transmission
+ *           is overlapped with copying from user space: after each packet
+ *           is added, the RPC is passed to homa_pacer_manage_rpc() if its
+ *           throttled_links list is empty. Shorter messages are passed to
+ *           homa_xmit_data() once all of the data has been copied.
+ *           Zero means the caller will initiate transmission after this
+ *           function returns.
+ *
+ * Return:   0 for success, or a negative errno for failure (-EINVAL for
+ *           an invalid message length, otherwise the error returned by
+ *           homa_tx_data_pkt_alloc()). It is possible
+ *           for the RPC to be freed while this function is active. If that
+ *           happens, copying will cease, -EINVAL will be returned, and
+ *           rpc->state will be RPC_DEAD. On both the success and error
+ *           paths rpc->msgout.skb_memory (the total truesize of the skbs
+ *           linked into the message) is added to the socket's
+ *           sk_wmem_alloc.
+ */
+int homa_message_out_fill(struct homa_rpc *rpc, struct iov_iter *iter, int xmit)
+	__must_hold(rpc_bucket_lock)
+{
+	/* Geometry information for packets:
+	 * mtu:              largest size for an on-the-wire packet (including
+	 *                   all headers through IP header, but not Ethernet
+	 *                   header).
+	 * max_seg_data:     largest amount of Homa message data that fits
+	 *                   in an on-the-wire packet (after segmentation).
+	 * max_gso_data:     largest amount of Homa message data that fits
+	 *                   in a GSO packet (before segmentation).
+	 */
+	int mtu, max_seg_data, max_gso_data;
+
+	struct sk_buff **last_link;
+	struct dst_entry *dst;
+	u64 segs_per_gso;
+	int overlap_xmit;
+
+	/* Bytes of the message that haven't yet been copied into skbs. */
+	int bytes_left;
+
+	int gso_size;
+	int err;
+
+	homa_rpc_hold(rpc);
+	if (unlikely(iter->count > HOMA_MAX_MESSAGE_LENGTH ||
+		     iter->count == 0)) {
+		err = -EINVAL;
+		goto error;
+	}
+	homa_message_out_init(rpc, iter->count);
+
+	/* Compute the geometry of packets. The GSO size is the smaller of
+	 * the device's gso_max_size and Homa's configured max_gso_size.
+	 */
+	dst = homa_get_dst(rpc->peer, rpc->hsk);
+	mtu = dst_mtu(dst);
+	max_seg_data = mtu - rpc->hsk->ip_header_length
+			- sizeof(struct homa_data_hdr);
+	gso_size = dst->dev->gso_max_size;
+	if (gso_size > rpc->hsk->homa->max_gso_size)
+		gso_size = rpc->hsk->homa->max_gso_size;
+	dst_release(dst);
+
+	/* Round gso_size down to a whole number of segments: compute how
+	 * many segments (each consisting of a homa_seg_hdr plus max_seg_data
+	 * bytes of data) fit in a GSO packet after the IP header and the
+	 * part of the homa_data_hdr that precedes its homa_seg_hdr. Always
+	 * allow at least one segment.
+	 */
+	segs_per_gso = gso_size - rpc->hsk->ip_header_length -
+			sizeof(struct homa_data_hdr) +
+			sizeof(struct homa_seg_hdr);
+	do_div(segs_per_gso, max_seg_data +
+			sizeof(struct homa_seg_hdr));
+	if (segs_per_gso == 0)
+		segs_per_gso = 1;
+	max_gso_data = segs_per_gso * max_seg_data;
+
+	overlap_xmit = rpc->msgout.length > 2 * max_gso_data;
+	homa_skb_stash_pages(rpc->hsk->homa, rpc->msgout.length);
+
+	/* Each iteration of the loop below creates one GSO packet. The RPC
+	 * lock is released while the packet is allocated and filled from
+	 * user space, then reacquired before the packet is linked into the
+	 * message.
+	 */
+	last_link = &rpc->msgout.packets;
+	for (bytes_left = rpc->msgout.length; bytes_left > 0; ) {
+		int skb_data_bytes, offset;
+		struct sk_buff *skb;
+
+		homa_rpc_unlock(rpc);
+		skb_data_bytes = max_gso_data;
+		offset = rpc->msgout.length - bytes_left;
+		if (skb_data_bytes > bytes_left)
+			skb_data_bytes = bytes_left;
+		skb = homa_tx_data_pkt_alloc(rpc, iter, offset, skb_data_bytes,
+					     max_seg_data);
+		if (IS_ERR(skb)) {
+			err = PTR_ERR(skb);
+			homa_rpc_lock(rpc);
+			goto error;
+		}
+		bytes_left -= skb_data_bytes;
+
+		homa_rpc_lock(rpc);
+		if (rpc->state == RPC_DEAD) {
+			/* RPC was freed while we were copying. The new skb
+			 * has not been linked into the message, so it must
+			 * be freed here.
+			 */
+			err = -EINVAL;
+			homa_skb_free_tx(rpc->hsk->homa, skb);
+			goto error;
+		}
+		*last_link = skb;
+		last_link = &(homa_get_skb_info(skb)->next_skb);
+		*last_link = NULL;
+		rpc->msgout.num_skbs++;
+		rpc->msgout.skb_memory += skb->truesize;
+		rpc->msgout.copied_from_user = rpc->msgout.length - bytes_left;
+		if (overlap_xmit && list_empty(&rpc->throttled_links) &&
+		    xmit)
+			homa_pacer_manage_rpc(rpc);
+	}
+	refcount_add(rpc->msgout.skb_memory, &rpc->hsk->sock.sk_wmem_alloc);
+	homa_rpc_put(rpc);
+	if (!overlap_xmit && xmit)
+		homa_xmit_data(rpc, false);
+	return 0;
+
+error:
+	refcount_add(rpc->msgout.skb_memory, &rpc->hsk->sock.sk_wmem_alloc);
+	homa_rpc_put(rpc);
+	return err;
+}
+
+/**
+ * homa_xmit_control() - Send a control packet to the other end of an RPC.
+ * @type:      Packet type to store in the common header.
+ * @contents:  Address of buffer containing the contents of the packet,
+ *             which must start with a homa_common_hdr. This function
+ *             overwrites the type, sport, dport and sender_id fields of
+ *             that header; everything else in the buffer is transmitted
+ *             as supplied by the caller.
+ * @length:    Length of @contents (including the common header).
+ * @rpc:       The packet will go to the socket that handles the other end
+ *             of this RPC. The ports, sender id, peer and local socket
+ *             used for the packet all come from this RPC.
+ *             Caller must hold either the lock or a reference.
+ *
+ * Return:     Either zero (for success), or a negative errno value if there
+ *             was a problem.
+ */
+int homa_xmit_control(enum homa_packet_type type, void *contents,
+		      size_t length, struct homa_rpc *rpc)
+{
+	struct homa_common_hdr *common = contents;
+	struct homa_sock *hsk = rpc->hsk;
+
+	common->sender_id = cpu_to_be64(rpc->id);
+	common->dport = htons(rpc->dport);
+	common->sport = htons(hsk->port);
+	common->type = type;
+	return __homa_xmit_control(contents, length, rpc->peer, hsk);
+}
+
+/**
+ * __homa_xmit_control() - Lower-level version of homa_xmit_control: sends
+ * a control packet.
+ * @contents:  Address of buffer containing the contents of the packet.
+ *             The caller must have filled in all of the information,
+ *             including the common header.
+ * @length:    Length of @contents, in bytes. If this is less than
+ *             HOMA_MIN_PKT_LENGTH, the packet is padded with zeroes up
+ *             to that length.
+ *             NOTE(review): the skb is allocated with HOMA_MAX_HEADER and
+ *             @length is not checked against it here; presumably callers
+ *             never pass anything larger -- confirm.
+ * @peer:      Destination to which the packet will be sent.
+ * @hsk:       Socket via which the packet will be sent. Its address family
+ *             selects between ip6_xmit and ip_queue_xmit.
+ *
+ * Return:     Either zero (for success), or a negative errno value if there
+ *             was a problem: -ENOBUFS if no skb could be allocated,
+ *             otherwise the result of ip6_xmit or ip_queue_xmit.
+ */
+int __homa_xmit_control(void *contents, size_t length, struct homa_peer *peer,
+			struct homa_sock *hsk)
+{
+	struct homa_common_hdr *h;
+	struct sk_buff *skb;
+	int extra_bytes;
+	int result;
+
+	skb = homa_skb_alloc_tx(HOMA_MAX_HEADER);
+	if (unlikely(!skb))
+		return -ENOBUFS;
+	skb_dst_set(skb, homa_get_dst(peer, hsk));
+
+	h = skb_put(skb, length);
+	memcpy(h, contents, length);
+	extra_bytes = HOMA_MIN_PKT_LENGTH - length;
+	if (extra_bytes > 0)
+		memset(skb_put(skb, extra_bytes), 0, extra_bytes);
+	skb->ooo_okay = 1;
+	skb_get(skb);	/* Extra reference; see the comment below. */
+	if (hsk->inet.sk.sk_family == AF_INET6)
+		result = ip6_xmit(&hsk->inet.sk, skb, &peer->flow.u.ip6, 0,
+				  NULL, 0, 0);
+	else
+		result = ip_queue_xmit(&hsk->inet.sk, skb, &peer->flow);
+	if (unlikely(result != 0)) {
+		/* It appears that ip*_xmit frees skbuffs after
+		 * errors; the following code is to raise an alert if
+		 * this isn't actually the case. The extra skb_get above
+		 * and kfree_skb call below are needed to do the check
+		 * accurately (otherwise the buffer could be freed and
+		 * its memory used for some other purpose, resulting in
+		 * a bogus "reference count"). Because of the extra
+		 * reference, a count above 1 here means that ip*_xmit
+		 * did not drop its reference.
+		 */
+		if (refcount_read(&skb->users) > 1) {
+			if (hsk->inet.sk.sk_family == AF_INET6)
+				pr_notice("ip6_xmit didn't free Homa control packet (type %d) after error %d\n",
+					  h->type, result);
+			else
+				pr_notice("ip_queue_xmit didn't free Homa control packet (type %d) after error %d\n",
+					  h->type, result);
+		}
+	}
+	kfree_skb(skb);	/* Drop the extra reference taken above. */
+	return result;
+}
+
+/**
+ * homa_xmit_unknown() - Send an RPC_UNKNOWN packet to a peer.
+ * @skb:         Buffer containing an incoming packet; identifies the peer to
+ *               which the RPC_UNKNOWN packet should be sent. The ports of
+ *               the reply are the incoming packet's ports, swapped, and
+ *               its sender_id is derived from the incoming sender_id with
+ *               homa_local_id().
+ * @hsk:         Socket that should be used to send the RPC_UNKNOWN packet.
+ *
+ * Nothing is sent if no peer can be obtained for the sender's address.
+ * Errors from transmitting the packet are ignored.
+ */
+void homa_xmit_unknown(struct sk_buff *skb, struct homa_sock *hsk)
+{
+	struct homa_common_hdr *h = (struct homa_common_hdr *)skb->data;
+	struct in6_addr saddr = skb_canonical_ipv6_saddr(skb);
+	struct homa_rpc_unknown_hdr unknown;
+	struct homa_peer *peer;
+
+	/* The header is copied verbatim into the outgoing packet, so clear
+	 * it first: any field not assigned below would otherwise put
+	 * leftover kernel stack contents on the wire.
+	 */
+	memset(&unknown, 0, sizeof(unknown));
+	unknown.common.sport = h->dport;
+	unknown.common.dport = h->sport;
+	unknown.common.type = RPC_UNKNOWN;
+	unknown.common.sender_id = cpu_to_be64(homa_local_id(h->sender_id));
+
+	/* Only a valid peer may be used or released; an ERR_PTR value must
+	 * never be passed to homa_peer_release().
+	 */
+	peer = homa_peer_get(hsk, &saddr);
+	if (IS_ERR(peer))
+		return;
+	__homa_xmit_control(&unknown, sizeof(unknown), peer, hsk);
+	homa_peer_release(peer);
+}
+
+/**
+ * homa_xmit_data() - If an RPC has outbound data packets that are permitted
+ * to be transmitted according to the scheduling mechanism, arrange for
+ * them to be sent (some may be sent immediately; others may be sent
+ * later by the pacer thread).
+ * @rpc:       RPC to check for transmittable packets. Must be locked by
+ *             caller. Note: this function will release the RPC lock while
+ *             passing packets through the IP stack, then reacquire it
+ *             before returning. It is possible that the RPC gets freed
+ *             when the lock isn't held, in which case the state will
+ *             be RPC_DEAD on return. This function holds references on
+ *             the RPC while it runs.
+ * @force:     True means send at least one packet, even if the NIC queue
+ *             is too long. False means that zero packets may be sent, if
+ *             the NIC queue is sufficiently long. Only the first packet
+ *             sent is affected: @force is cleared after each transmission.
+ *
+ * Packets are taken in order starting at rpc->msgout.next_xmit. While at
+ * least pacer->throttle_min_bytes of the message remain untransmitted,
+ * each packet must first pass homa_pacer_check_nic_q(); if that check
+ * fails, the RPC is passed to homa_pacer_manage_rpc() and this function
+ * stops sending. When fewer bytes remain, packets are sent without
+ * consulting the NIC queue check.
+ */
+void homa_xmit_data(struct homa_rpc *rpc, bool force)
+	__must_hold(rpc_bucket_lock)
+{
+	struct homa *homa = rpc->hsk->homa;
+
+	homa_rpc_hold(rpc);
+	while (*rpc->msgout.next_xmit) {
+		struct sk_buff *skb = *rpc->msgout.next_xmit;
+
+		if ((rpc->msgout.length - rpc->msgout.next_xmit_offset)
+				>= homa->pacer->throttle_min_bytes) {
+			if (!homa_pacer_check_nic_q(homa->pacer, skb, force)) {
+				homa_pacer_manage_rpc(rpc);
+				break;
+			}
+		}
+
+		rpc->msgout.next_xmit = &(homa_get_skb_info(skb)->next_skb);
+		rpc->msgout.next_xmit_offset +=
+				homa_get_skb_info(skb)->data_bytes;
+
+		homa_rpc_hold(rpc);
+		homa_rpc_unlock(rpc);
+		skb_get(skb);	/* The message keeps its own reference. */
+		__homa_xmit_data(skb, rpc);
+		force = false;
+		homa_rpc_lock(rpc);
+		homa_rpc_put(rpc);
+		if (rpc->state == RPC_DEAD)	/* Freed while unlocked. */
+			break;
+	}
+	homa_rpc_put(rpc);
+}
+
+/**
+ * __homa_xmit_data() - Final step of transmitting a data packet, shared by
+ * homa_xmit_data and homa_resend_data: attaches a route to the packet,
+ * requests checksum offload, and passes the packet to IPv4 or IPv6 for
+ * output.
+ * @skb:      Packet to be sent. The packet will be freed after transmission
+ *            (and also if errors prevented transmission).
+ * @rpc:      Information about the RPC that the packet belongs to; supplies
+ *            the socket and the peer used for transmission.
+ *
+ * The result returned by ip6_xmit or ip_queue_xmit is not examined.
+ */
+void __homa_xmit_data(struct sk_buff *skb, struct homa_rpc *rpc)
+{
+	struct homa_peer *peer = rpc->peer;
+	struct homa_sock *hsk = rpc->hsk;
+
+	skb_dst_set(skb, homa_get_dst(peer, hsk));
+
+	/* Leave the checksum to be filled in later: it covers the packet
+	 * starting at the transport header and is stored in the checksum
+	 * field of the homa_common_hdr.
+	 */
+	skb->csum_offset = offsetof(struct homa_common_hdr, checksum);
+	skb->csum_start = skb_transport_header(skb) - skb->head;
+	skb->ip_summed = CHECKSUM_PARTIAL;
+	skb->ooo_okay = 1;
+
+	if (hsk->inet.sk.sk_family != AF_INET6) {
+		ip_queue_xmit(&hsk->inet.sk, skb, &peer->flow);
+		return;
+	}
+	ip6_xmit(&hsk->inet.sk, skb, &peer->flow.u.ip6, 0, NULL, 0, 0);
+}
+
+/**
+ * homa_resend_data() - This function is invoked as part of handling RESEND
+ * requests. It retransmits every segment of a message that overlaps a
+ * given range of bytes. Each retransmitted segment is copied into a new
+ * single-segment packet with the retransmit flag set in its header, and
+ * is sent right away.
+ * @rpc:      RPC for which data should be resent. Must be locked by caller.
+ * @start:    Offset within @rpc->msgout of the first byte to retransmit.
+ * @end:      Offset within @rpc->msgout of the byte just after the last one
+ *            to retransmit. Nothing happens if @end <= @start.
+ *
+ * Retransmission stops early, without reporting anything to the caller,
+ * if a new packet can't be allocated or data can't be copied into it.
+ */
+void homa_resend_data(struct homa_rpc *rpc, int start, int end)
+	__must_hold(rpc_bucket_lock)
+{
+	struct homa_sock *hsk = rpc->hsk;
+	struct homa_skb_info *info;
+	struct sk_buff *skb;
+
+	if (end <= start)
+		return;
+
+	/* Outer loop: examine each packet of the message in order, skipping
+	 * those that lie entirely before the range and stopping at the first
+	 * one that starts at or beyond its end.
+	 */
+	for (skb = rpc->msgout.packets; skb; skb = info->next_skb) {
+		int frag_offset, msg_offset, chunk, remaining;
+
+		info = homa_get_skb_info(skb);
+		msg_offset = info->offset;
+		if (msg_offset >= end)
+			return;
+		remaining = info->data_bytes;
+		if (start >= (msg_offset + remaining))
+			continue;
+
+		/* A packet with at most one segment holds all of its data
+		 * in that segment.
+		 */
+		chunk = info->seg_length;
+		if (skb_shinfo(skb)->gso_segs <= 1)
+			chunk = remaining;
+
+		/* Inner loop: one iteration per segment of the packet.
+		 * frag_offset is the position of the segment's data within
+		 * the packet and msg_offset is its position in the message.
+		 */
+		for (frag_offset = sizeof(struct homa_data_hdr);
+		     remaining > 0;
+		     remaining -= chunk, msg_offset += chunk,
+		     frag_offset += skb_shinfo(skb)->gso_size) {
+			struct homa_skb_info *new_info;
+			struct homa_data_hdr *hdr;
+			struct sk_buff *new_skb;
+			int status;
+
+			/* The last segment may be short. */
+			if (chunk > remaining)
+				chunk = remaining;
+
+			if (msg_offset >= end)
+				return;
+			if ((msg_offset + chunk) <= start)
+				continue;
+
+			/* This segment overlaps the range: build a new
+			 * packet for it, starting from a copy of the
+			 * original packet's header.
+			 */
+			new_skb = homa_skb_alloc_tx(sizeof(*hdr) + chunk);
+			if (unlikely(!new_skb))
+				return;
+			hdr = __skb_put_data(new_skb,
+					     skb_transport_header(skb),
+					     sizeof(*hdr));
+			hdr->common.sequence = htonl(msg_offset);
+			hdr->seg.offset = htonl(msg_offset);
+			hdr->retransmit = 1;
+			status = homa_skb_append_from_skb(hsk->homa, new_skb,
+							  skb, frag_offset,
+							  chunk);
+			if (status != 0) {
+				pr_err("%s got error %d from homa_skb_append_from_skb\n",
+				       __func__, status);
+				kfree_skb(new_skb);
+				return;
+			}
+
+			new_info = homa_get_skb_info(new_skb);
+			new_info->wire_bytes = hsk->ip_header_length
+					+ sizeof(struct homa_data_hdr)
+					+ chunk + HOMA_ETH_OVERHEAD;
+			new_info->data_bytes = chunk;
+			new_info->seg_length = chunk;
+			new_info->offset = msg_offset;
+			homa_pacer_check_nic_q(hsk->homa->pacer, new_skb,
+					       true);
+			__homa_xmit_data(new_skb, rpc);
+		}
+	}
+}
--
2.43.0


Powered by blists - more mailing lists

Powered by Openwall GNU/*/Linux Powered by OpenVZ