[<prev] [next>] [<thread-prev] [thread-next>] [day] [month] [year] [list]
Message-ID: <af4bd23b0a4b417b9ce26d77c1d153cfaf78ec64.1766433800.git.fmaurer@redhat.com>
Date: Mon, 22 Dec 2025 21:57:35 +0100
From: Felix Maurer <fmaurer@...hat.com>
To: netdev@...r.kernel.org
Cc: davem@...emloft.net,
edumazet@...gle.com,
kuba@...nel.org,
pabeni@...hat.com,
horms@...nel.org,
jkarrenpalo@...il.com,
tglx@...utronix.de,
mingo@...nel.org,
allison.henderson@...cle.com,
matttbe@...nel.org,
petrm@...dia.com,
bigeasy@...utronix.de
Subject: [RFC net 5/6] hsr: Implement more robust duplicate discard for PRP
The PRP duplicate discard algorithm does not work reliably with certain
link faults. Especially with packet loss on one link, the duplicate discard
algorithm drops valid packets which leads to packet loss on the PRP
interface where the link fault should in theory be perfectly recoverable by
PRP. This happens because the algorithm opens the drop window on the lossy
link, covering received and lost sequence numbers. If the other, non-lossy
link receives the duplicate for a lost frame, it is within the drop window
of the lossy link and therefore dropped.
Since IEC 62439-3:2012, a node has one sequence number counter for frames
it sends, instead of one sequence number counter for each destination.
Therefore, a node cannot expect to receive contiguous sequence numbers
from a sender. A missing sequence number can be totally normal (if the
sender intermittently communicates with another node) or mean a frame was
lost.
The algorithm, as previously implemented in commit 05fd00e5e7b1 ("net: hsr:
Fix PRP duplicate detection"), was part of IEC 62439-3:2010 (HSRv0/PRPv0)
but was removed with IEC 62439-3:2012 (HSRv1/PRPv1). Since then, no
algorithm is specified; the choice is left to implementers. The algorithm
should be "designed such that it never rejects a legitimate frame, while
occasional acceptance of a duplicate can be tolerated" (IEC 62439-3:2021).
For the duplicate discard algorithm, this means that 1) we need to track
the sequence numbers individually to account for non-contiguous sequence
numbers, and 2) we should always err on the side of accepting a duplicate
rather than dropping a valid frame.
The idea of the new algorithm is to store the seen sequence numbers in a
bitmap. To keep the size of the bitmap in control, we store it as a "sparse
bitmap" where the bitmap is split into blocks and not all blocks exist at
the same time. The sparse bitmap is implemented using an xarray that keeps
the references to the individual blocks and a backing ring buffer that
stores the actual blocks. New blocks are initialized in the buffer and
added to the xarray as needed when new frames arrive. Existing blocks are
removed under two conditions:
1. The block found for an arriving sequence number is old and therefore not
relevant to the duplicate discard algorithm anymore, i.e., it has been
added more than the entry forget time ago. In this case, the block
is removed from the xarray and marked as forgotten (by setting its
timestamp to 0).
2. Space is needed in the ring buffer for a new block. In this case, the
block is removed from the xarray, if it hasn't already been forgotten
(by 1.). Afterwards, the new block is initialized in its place.
All of this still happens under seq_out_lock, to prevent concurrent
access to the blocks.
Fixes: 05fd00e5e7b1 ("net: hsr: Fix PRP duplicate detection")
Signed-off-by: Felix Maurer <fmaurer@...hat.com>
---
net/hsr/hsr_framereg.c | 181 ++++++++++++++++++++++++++---------------
net/hsr/hsr_framereg.h | 24 +++++-
2 files changed, 138 insertions(+), 67 deletions(-)
diff --git a/net/hsr/hsr_framereg.c b/net/hsr/hsr_framereg.c
index 3a2a2fa7a0a3..c6e9d3a2648a 100644
--- a/net/hsr/hsr_framereg.c
+++ b/net/hsr/hsr_framereg.c
@@ -126,13 +126,28 @@ void hsr_del_self_node(struct hsr_priv *hsr)
kfree_rcu(old, rcu_head);
}
+static void hsr_free_node(struct hsr_node *node)
+{
+ xa_destroy(&node->seq_blocks);
+ kfree(node->block_buf);
+ kfree(node);
+}
+
+static void hsr_free_node_rcu(struct rcu_head *rn)
+{
+ struct hsr_node *node = container_of(rn, struct hsr_node, rcu_head);
+ hsr_free_node(node);
+}
+
void hsr_del_nodes(struct list_head *node_db)
{
struct hsr_node *node;
struct hsr_node *tmp;
- list_for_each_entry_safe(node, tmp, node_db, mac_list)
- kfree(node);
+ list_for_each_entry_safe(node, tmp, node_db, mac_list) {
+ list_del(&node->mac_list);
+ hsr_free_node(node);
+ }
}
void prp_handle_san_frame(bool san, enum hsr_port_type port,
@@ -176,11 +191,7 @@ static struct hsr_node *hsr_add_node(struct hsr_priv *hsr,
for (i = 0; i < HSR_PT_PORTS; i++) {
new_node->time_in[i] = now;
new_node->time_out[i] = now;
- }
- for (i = 0; i < HSR_PT_PORTS; i++) {
new_node->seq_out[i] = seq_out;
- new_node->seq_expected[i] = seq_out + 1;
- new_node->seq_start[i] = seq_out + 1;
}
if (san && hsr->proto_ops->handle_san_frame)
@@ -194,11 +205,20 @@ static struct hsr_node *hsr_add_node(struct hsr_priv *hsr,
if (ether_addr_equal(node->macaddress_B, addr))
goto out;
}
+
+ new_node->block_buf = kcalloc(HSR_MAX_SEQ_BLOCKS,
+ sizeof(struct hsr_seq_block),
+ GFP_ATOMIC);
+ if (!new_node->block_buf)
+ goto out;
+
+ xa_init(&new_node->seq_blocks);
list_add_tail_rcu(&new_node->mac_list, node_db);
spin_unlock_bh(&hsr->list_lock);
return new_node;
out:
spin_unlock_bh(&hsr->list_lock);
+ kfree(new_node->block_buf);
kfree(new_node);
return node;
}
@@ -283,6 +303,9 @@ struct hsr_node *hsr_get_node(struct hsr_port *port, struct list_head *node_db,
/* Use the Supervision frame's info about an eventual macaddress_B for merging
* nodes that has previously had their macaddress_B registered as a separate
* node.
+ * TODO Merging PRP nodes currently loses the info on received sequence numbers
+ * from one node. This may lead to some duplicates being received but does no
+ * harm otherwise.
*/
void hsr_handle_sup_frame(struct hsr_frame_info *frame)
{
@@ -398,7 +421,7 @@ void hsr_handle_sup_frame(struct hsr_frame_info *frame)
if (!node_curr->removed) {
list_del_rcu(&node_curr->mac_list);
node_curr->removed = true;
- kfree_rcu(node_curr, rcu_head);
+ call_rcu(&node_curr->rcu_head, hsr_free_node_rcu);
}
spin_unlock_bh(&hsr->list_lock);
@@ -505,17 +528,66 @@ int hsr_register_frame_out(struct hsr_port *port, struct hsr_frame_info *frame)
return 0;
}
-/* Adaptation of the PRP duplicate discard algorithm described in wireshark
- * wiki (https://wiki.wireshark.org/PRP)
- *
- * A drop window is maintained for both LANs with start sequence set to the
- * first sequence accepted on the LAN that has not been seen on the other LAN,
- * and expected sequence set to the latest received sequence number plus one.
- *
- * When a frame is received on either LAN it is compared against the received
- * frames on the other LAN. If it is outside the drop window of the other LAN
- * the frame is accepted and the drop window is updated.
- * The drop window for the other LAN is reset.
+static bool hsr_seq_block_is_old(struct hsr_seq_block *block)
+{
+ unsigned long expiry = msecs_to_jiffies(HSR_ENTRY_FORGET_TIME);
+
+ return time_is_before_jiffies(block->time + expiry);
+}
+
+static void hsr_forget_seq_block(struct hsr_node *node,
+ struct hsr_seq_block *block)
+{
+ if (block->time)
+ xa_erase(&node->seq_blocks, block->block_idx);
+ block->time = 0;
+}
+
+/* Get the currently active sequence number block. If there is no block yet, or
+ * the existing one is expired, a new block is created. The idea is to maintain
+ * a "sparse bitmap" where a bitmap for the whole sequence number space is
+ * split into blocks and not all blocks exist all the time. The blocks can
+ * expire after time (in low traffic situations) or when they are replaced in
+ * the backing fixed size buffer (in high traffic situations).
+ */
+static struct hsr_seq_block *hsr_get_active_seq_block(struct hsr_node *node, u16 seq_nr)
+{
+ struct hsr_seq_block *block, *res;
+ u16 block_idx;
+
+ block_idx = seq_nr >> HSR_SEQ_BLOCK_SHIFT;
+ block = xa_load(&node->seq_blocks, block_idx);
+
+ if (block && hsr_seq_block_is_old(block)) {
+ hsr_forget_seq_block(node, block);
+ block = NULL;
+ }
+
+ if (!block) {
+ block = &node->block_buf[node->next_block];
+ hsr_forget_seq_block(node, block);
+
+ memset(block, 0, sizeof(*block));
+ block->time = jiffies;
+ block->block_idx = block_idx;
+
+ res = xa_store(&node->seq_blocks, block_idx, block, GFP_ATOMIC);
+ if (xa_is_err(res))
+ return NULL;
+
+ node->next_block = (node->next_block + 1) & (HSR_MAX_SEQ_BLOCKS - 1);
+ }
+
+ return block;
+}
+
+/* PRP duplicate discard algorithm: we maintain a bitmap where we set a bit for
+ * every seen sequence number. The bitmap is split into blocks and the block
+ * management is detailed in hsr_get_active_seq_block(). In any case, we err on
+ * the side of accepting a packet, as the specification requires the algorithm
+ * to be "designed such that it never rejects a legitimate frame, while
+ * occasional acceptance of a duplicate can be tolerated." (IEC 62439-3:2021,
+ * 4.1.10.3).
*
* 'port' is the outgoing interface
* 'frame' is the frame to be sent
@@ -526,18 +598,21 @@ int hsr_register_frame_out(struct hsr_port *port, struct hsr_frame_info *frame)
*/
int prp_register_frame_out(struct hsr_port *port, struct hsr_frame_info *frame)
{
- enum hsr_port_type other_port;
- enum hsr_port_type rcv_port;
+ struct hsr_seq_block *block;
+ u16 sequence_nr, seq_bit;
struct hsr_node *node;
- u16 sequence_diff;
- u16 sequence_exp;
- u16 sequence_nr;
- /* out-going frames are always in order
- * and can be checked the same way as for HSR
- */
- if (frame->port_rcv->type == HSR_PT_MASTER)
- return hsr_register_frame_out(port, frame);
+ node = frame->node_src;
+ sequence_nr = frame->sequence_nr;
+
+ // out-going frames are always in order
+ if (frame->port_rcv->type == HSR_PT_MASTER) {
+ spin_lock_bh(&node->seq_out_lock);
+ node->time_out[port->type] = jiffies;
+ node->seq_out[port->type] = sequence_nr;
+ spin_unlock_bh(&node->seq_out_lock);
+ return 0;
+ }
/* for PRP we should only forward frames from the slave ports
* to the master port
@@ -545,47 +620,25 @@ int prp_register_frame_out(struct hsr_port *port, struct hsr_frame_info *frame)
if (port->type != HSR_PT_MASTER)
return 1;
- node = frame->node_src;
- sequence_nr = frame->sequence_nr;
- sequence_exp = sequence_nr + 1;
- rcv_port = frame->port_rcv->type;
- other_port = rcv_port == HSR_PT_SLAVE_A ? HSR_PT_SLAVE_B :
- HSR_PT_SLAVE_A;
-
spin_lock_bh(&node->seq_out_lock);
- if (time_is_before_jiffies(node->time_out[port->type] +
- msecs_to_jiffies(HSR_ENTRY_FORGET_TIME)) ||
- (node->seq_start[rcv_port] == node->seq_expected[rcv_port] &&
- node->seq_start[other_port] == node->seq_expected[other_port])) {
- /* the node hasn't been sending for a while
- * or both drop windows are empty, forward the frame
- */
- node->seq_start[rcv_port] = sequence_nr;
- } else if (seq_nr_before(sequence_nr, node->seq_expected[other_port]) &&
- seq_nr_before_or_eq(node->seq_start[other_port], sequence_nr)) {
- /* drop the frame, update the drop window for the other port
- * and reset our drop window
- */
- node->seq_start[other_port] = sequence_exp;
- node->seq_expected[rcv_port] = sequence_exp;
- node->seq_start[rcv_port] = node->seq_expected[rcv_port];
- spin_unlock_bh(&node->seq_out_lock);
- return 1;
- }
- /* update the drop window for the port where this frame was received
- * and clear the drop window for the other port
- */
- node->seq_start[other_port] = node->seq_expected[other_port];
- node->seq_expected[rcv_port] = sequence_exp;
- sequence_diff = sequence_exp - node->seq_start[rcv_port];
- if (sequence_diff > PRP_DROP_WINDOW_LEN)
- node->seq_start[rcv_port] = sequence_exp - PRP_DROP_WINDOW_LEN;
+ block = hsr_get_active_seq_block(node, sequence_nr);
+ if (WARN_ON_ONCE(!block))
+ goto out_new;
+
+ seq_bit = sequence_nr & HSR_SEQ_BLOCK_MASK;
+ if (test_and_set_bit(seq_bit, block->seq_nrs))
+ goto out_seen;
node->time_out[port->type] = jiffies;
node->seq_out[port->type] = sequence_nr;
+out_new:
spin_unlock_bh(&node->seq_out_lock);
return 0;
+
+out_seen:
+ spin_unlock_bh(&node->seq_out_lock);
+ return 1;
}
#if IS_MODULE(CONFIG_PRP_DUP_DISCARD_KUNIT_TEST)
@@ -672,7 +725,7 @@ void hsr_prune_nodes(struct timer_list *t)
list_del_rcu(&node->mac_list);
node->removed = true;
/* Note that we need to free this entry later: */
- kfree_rcu(node, rcu_head);
+ call_rcu(&node->rcu_head, hsr_free_node_rcu);
}
}
}
@@ -706,7 +759,7 @@ void hsr_prune_proxy_nodes(struct timer_list *t)
list_del_rcu(&node->mac_list);
node->removed = true;
/* Note that we need to free this entry later: */
- kfree_rcu(node, rcu_head);
+ call_rcu(&node->rcu_head, hsr_free_node_rcu);
}
}
}
diff --git a/net/hsr/hsr_framereg.h b/net/hsr/hsr_framereg.h
index b04948659d84..804543d5191c 100644
--- a/net/hsr/hsr_framereg.h
+++ b/net/hsr/hsr_framereg.h
@@ -74,9 +74,26 @@ bool hsr_is_node_in_db(struct list_head *node_db,
int prp_register_frame_out(struct hsr_port *port, struct hsr_frame_info *frame);
+
+#define HSR_SEQ_BLOCK_SHIFT 7 /* 128 bits */
+#define HSR_SEQ_BLOCK_SIZE (1 << HSR_SEQ_BLOCK_SHIFT)
+#define HSR_SEQ_BLOCK_MASK (HSR_SEQ_BLOCK_SIZE - 1)
+#define HSR_MAX_SEQ_BLOCKS 64
+
+struct hsr_seq_block {
+ unsigned long time;
+ u16 block_idx;
+ DECLARE_BITMAP(seq_nrs, HSR_SEQ_BLOCK_SIZE);
+};
+
+struct hsr_block_buf {
+ struct hsr_seq_block *buf;
+ int next;
+};
+
struct hsr_node {
struct list_head mac_list;
- /* Protect R/W access to seq_out */
+ /* Protect R/W access to seq_out and seq_blocks */
spinlock_t seq_out_lock;
unsigned char macaddress_A[ETH_ALEN];
unsigned char macaddress_B[ETH_ALEN];
@@ -91,8 +108,9 @@ struct hsr_node {
u16 seq_out[HSR_PT_PORTS];
bool removed;
/* PRP specific duplicate handling */
- u16 seq_expected[HSR_PT_PORTS];
- u16 seq_start[HSR_PT_PORTS];
+ struct xarray seq_blocks;
+ struct hsr_seq_block *block_buf;
+ int next_block;
struct rcu_head rcu_head;
};
--
2.52.0
Powered by blists - more mailing lists