[<prev] [next>] [day] [month] [year] [list]
Message-ID: <20260207012151.3972723-1-kuba@kernel.org>
Date: Fri, 6 Feb 2026 17:21:51 -0800
From: Jakub Kicinski <kuba@...nel.org>
To: davem@...emloft.net
Cc: netdev@...r.kernel.org,
edumazet@...gle.com,
pabeni@...hat.com,
andrew+netdev@...n.ch,
horms@...nel.org,
shuah@...nel.org,
linux-kselftest@...r.kernel.org,
Jakub Kicinski <kuba@...nel.org>,
danieller@...dia.com,
petrm@...dia.com
Subject: [PATCH net-next] selftests: drv-net: port_split: convert to ksft format and mark disruptive
The devlink port split test is included in the HW tests,
but it does not obey our KSFT_DISRUPTIVE marking.
If someone tries to run driver tests over SSH on a device
that supports port splitting, the outcome will be loss of
connectivity.
Convert the test to conform to our "driver test environment".
Mark it as disruptive.
This is completely untested, I'd appreciate it if someone with
the right setup could give this a go (and probably fix up
the bugs I introduced).
CC: danieller@...dia.com
CC: petrm@...dia.com
Signed-off-by: Jakub Kicinski <kuba@...nel.org>
---
.../drivers/net/hw/devlink_port_split.py | 243 ++++++------------
1 file changed, 77 insertions(+), 166 deletions(-)
diff --git a/tools/testing/selftests/drivers/net/hw/devlink_port_split.py b/tools/testing/selftests/drivers/net/hw/devlink_port_split.py
index 2d84c7a0be6b..c7b38f5e94fd 100755
--- a/tools/testing/selftests/drivers/net/hw/devlink_port_split.py
+++ b/tools/testing/selftests/drivers/net/hw/devlink_port_split.py
@@ -1,75 +1,48 @@
#!/usr/bin/env python3
# SPDX-License-Identifier: GPL-2.0
-from subprocess import PIPE, Popen
-import json
-import time
-import argparse
+"""
+Test port split configuration using devlink-port lanes attribute.
+The test is skipped in case the attribute is not available.
+
+First, check that all the ports with 1 lane fail to split.
+Second, check that all the ports with more than 1 lane can be split
+to all valid configurations (e.g., split to 2, split to 4 etc.)
+"""
+
import collections
-import sys
+import json
+import os
-#
-# Test port split configuration using devlink-port lanes attribute.
-# The test is skipped in case the attribute is not available.
-#
-# First, check that all the ports with 1 lane fail to split.
-# Second, check that all the ports with more than 1 lane can be split
-# to all valid configurations (e.g., split to 2, split to 4 etc.)
-#
+from lib.py import ksft_run, ksft_exit, ksft_pr
+from lib.py import ksft_eq, ksft_disruptive
+from lib.py import NetDrvEnv
+from lib.py import KsftSkipEx
+from lib.py import cmd
-# Kselftest framework requirement - SKIP code is 4
-KSFT_SKIP=4
Port = collections.namedtuple('Port', 'bus_info name')
-def run_command(cmd, should_fail=False):
+def get_devlink_ports(dev):
"""
- Run a command in subprocess.
- Return: Tuple of (stdout, stderr).
+ Get a list of physical devlink ports.
+ Return: Array of tuples (bus_info, name).
"""
- p = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True)
- stdout, stderr = p.communicate()
- stdout, stderr = stdout.decode(), stderr.decode()
+ arr = []
- if stderr != "" and not should_fail:
- print("Error sending command: %s" % cmd)
- print(stdout)
- print(stderr)
- return stdout, stderr
+ result = cmd("devlink -j port show")
+ ports = json.loads(result.stdout)['port']
+ validate_devlink_output(ports, 'flavour')
-class devlink_ports(object):
- """
- Class that holds information on the devlink ports, required to the tests;
- if_names: A list of interfaces in the devlink ports.
- """
+ for port in ports:
+ if dev in port:
+ if ports[port]['flavour'] == 'physical':
+ arr.append(Port(bus_info=port, name=ports[port]['netdev']))
- def get_if_names(dev):
- """
- Get a list of physical devlink ports.
- Return: Array of tuples (bus_info/port, if_name).
- """
-
- arr = []
-
- cmd = "devlink -j port show"
- stdout, stderr = run_command(cmd)
- assert stderr == ""
- ports = json.loads(stdout)['port']
-
- validate_devlink_output(ports, 'flavour')
-
- for port in ports:
- if dev in port:
- if ports[port]['flavour'] == 'physical':
- arr.append(Port(bus_info=port, name=ports[port]['netdev']))
-
- return arr
-
- def __init__(self, dev):
- self.if_names = devlink_ports.get_if_names(dev)
+ return arr
def get_max_lanes(port):
@@ -78,10 +51,8 @@ Port = collections.namedtuple('Port', 'bus_info name')
Return: number of lanes, e.g. 1, 2, 4 and 8.
"""
- cmd = "devlink -j port show %s" % port
- stdout, stderr = run_command(cmd)
- assert stderr == ""
- values = list(json.loads(stdout)['port'].values())[0]
+ result = cmd(f"devlink -j port show {port}")
+ values = list(json.loads(result.stdout)['port'].values())[0]
if 'lanes' in values:
lanes = values['lanes']
@@ -96,47 +67,12 @@ Port = collections.namedtuple('Port', 'bus_info name')
Return: split ability, true or false.
"""
- cmd = "devlink -j port show %s" % port.name
- stdout, stderr = run_command(cmd)
- assert stderr == ""
- values = list(json.loads(stdout)['port'].values())[0]
+ result = cmd(f"devlink -j port show {port.name}")
+ values = list(json.loads(result.stdout)['port'].values())[0]
return values['splittable']
-def split(k, port, should_fail=False):
- """
- Split $port into $k ports.
- If should_fail == True, the split should fail. Otherwise, should pass.
- Return: Array of sub ports after splitting.
- If the $port wasn't split, the array will be empty.
- """
-
- cmd = "devlink port split %s count %s" % (port.bus_info, k)
- stdout, stderr = run_command(cmd, should_fail=should_fail)
-
- if should_fail:
- if not test(stderr != "", "%s is unsplittable" % port.name):
- print("split an unsplittable port %s" % port.name)
- return create_split_group(port, k)
- else:
- if stderr == "":
- return create_split_group(port, k)
- print("didn't split a splittable port %s" % port.name)
-
- return []
-
-
-def unsplit(port):
- """
- Unsplit $port.
- """
-
- cmd = "devlink port unsplit %s" % port
- stdout, stderr = run_command(cmd)
- test(stderr == "", "Unsplit port %s" % port)
-
-
def exists(port, dev):
"""
Check if $port exists in the devlink ports.
@@ -144,7 +80,7 @@ Port = collections.namedtuple('Port', 'bus_info name')
"""
return any(dev_port.name == port
- for dev_port in devlink_ports.get_if_names(dev))
+ for dev_port in get_devlink_ports(dev))
def exists_and_lanes(ports, lanes, dev):
@@ -157,29 +93,15 @@ Port = collections.namedtuple('Port', 'bus_info name')
for port in ports:
max_lanes = get_max_lanes(port)
if not exists(port, dev):
- print("port %s doesn't exist in devlink ports" % port)
+ ksft_pr(f"port {port} doesn't exist in devlink ports")
return False
if max_lanes != lanes:
- print("port %s has %d lanes, but %s were expected"
- % (port, lanes, max_lanes))
+ ksft_pr(f"port {port} has {max_lanes} lanes, "
+ f"but {lanes} were expected")
return False
return True
-def test(cond, msg):
- """
- Check $cond and print a message accordingly.
- Return: True is pass, False otherwise.
- """
-
- if cond:
- print("TEST: %-60s [ OK ]" % msg)
- else:
- print("TEST: %-60s [FAIL]" % msg)
-
- return cond
-
-
def create_split_group(port, k):
"""
Create the split group for $port.
@@ -194,11 +116,11 @@ Port = collections.namedtuple('Port', 'bus_info name')
Test that splitting of unsplittable port fails.
"""
- # split to max
- new_split_group = split(k, port, should_fail=True)
-
- if new_split_group != []:
- unsplit(port.bus_info)
+ result = cmd(f"devlink port split {port.bus_info} count {k}", fail=False)
+ if result.ret == 0:
+ ksft_pr(f"split an unsplittable port {port.name}")
+ cmd(f"devlink port unsplit {port.bus_info}")
+ ksft_eq(result.ret != 0, True, f"{port.name} is unsplittable")
def split_splittable_port(port, k, lanes, dev):
@@ -206,20 +128,22 @@ Port = collections.namedtuple('Port', 'bus_info name')
Test that splitting of splittable port passes correctly.
"""
- new_split_group = split(k, port)
+ result = cmd(f"devlink port split {port.bus_info} count {k}", fail=False)
+
+ if result.ret != 0:
+ ksft_pr(f"didn't split a splittable port {port.name}")
+ return
# Once the split command ends, it takes some time to the sub ifaces'
# to get their names. Use udevadm to continue only when all current udev
# events are handled.
- cmd = "udevadm settle"
- stdout, stderr = run_command(cmd)
- assert stderr == ""
+ cmd("udevadm settle")
- if new_split_group != []:
- test(exists_and_lanes(new_split_group, lanes/k, dev),
- "split port %s into %s" % (port.name, k))
+ new_split_group = create_split_group(port, k)
+ ksft_eq(exists_and_lanes(new_split_group, lanes / k, dev), True,
+ f"split port {port.name} into {k}")
- unsplit(port.bus_info)
+ cmd(f"devlink port unsplit {port.bus_info}")
def validate_devlink_output(devlink_data, target_property=None):
@@ -231,7 +155,7 @@ Port = collections.namedtuple('Port', 'bus_info name')
skip_reason = None
if any(devlink_data.values()):
if target_property:
- skip_reason = "{} not found in devlink output, test skipped".format(target_property)
+ skip_reason = f"{target_property} not found in devlink output, test skipped"
for key in devlink_data:
if target_property in devlink_data[key]:
skip_reason = None
@@ -239,44 +163,22 @@ Port = collections.namedtuple('Port', 'bus_info name')
skip_reason = 'devlink output is empty, test skipped'
if skip_reason:
- print(skip_reason)
- sys.exit(KSFT_SKIP)
+ raise KsftSkipEx(skip_reason)
-def make_parser():
- parser = argparse.ArgumentParser(description='A test for port splitting.')
- parser.add_argument('--dev',
- help='The devlink handle of the device under test. ' +
- 'The default is the first registered devlink ' +
- 'handle.')
+@ksft_disruptive
+def test_port_split(cfg):
+ """Test port split configuration using devlink-port lanes attribute."""
+ dev = f"pci/{cfg.pci}"
- return parser
+ result = cmd(f"devlink dev show {dev}", fail=False)
+ if result.ret != 0:
+ raise KsftSkipEx(f"devlink device {dev} can not be found")
-
-def main(cmdline=None):
- parser = make_parser()
- args = parser.parse_args(cmdline)
-
- dev = args.dev
- if not dev:
- cmd = "devlink -j dev show"
- stdout, stderr = run_command(cmd)
- assert stderr == ""
-
- validate_devlink_output(json.loads(stdout))
- devs = json.loads(stdout)['dev']
- dev = list(devs.keys())[0]
-
- cmd = "devlink dev show %s" % dev
- stdout, stderr = run_command(cmd)
- if stderr != "":
- print("devlink device %s can not be found" % dev)
- sys.exit(1)
-
- ports = devlink_ports(dev)
+ ports = get_devlink_ports(dev)
found_max_lanes = False
- for port in ports.if_names:
+ for port in ports:
max_lanes = get_max_lanes(port.name)
# If max lanes is 0, do not test port splitting at all
@@ -285,15 +187,15 @@ Port = collections.namedtuple('Port', 'bus_info name')
# If 1 lane, shouldn't be able to split
elif max_lanes == 1:
- test(not get_split_ability(port),
- "%s should not be able to split" % port.name)
+ ksft_eq(get_split_ability(port), False,
+ f"{port.name} should not be able to split")
split_unsplittable_port(port, max_lanes)
# Else, splitting should pass and all the split ports should exist.
else:
lane = max_lanes
- test(get_split_ability(port),
- "%s should be able to split" % port.name)
+ ksft_eq(get_split_ability(port), True,
+ f"{port.name} should be able to split")
while lane > 1:
split_splittable_port(port, lane, max_lanes, dev)
@@ -301,8 +203,17 @@ Port = collections.namedtuple('Port', 'bus_info name')
found_max_lanes = True
if not found_max_lanes:
- print(f"Test not started, no port of device {dev} reports max_lanes")
- sys.exit(KSFT_SKIP)
+ raise KsftSkipEx(f"No port of device {dev} reports max_lanes")
+
+
+def main() -> None:
+ """Ksft boiler plate main"""
+ with NetDrvEnv(__file__, nsim_test=False) as cfg:
+ cfg.pci = os.path.basename(
+ os.path.realpath(f"/sys/class/net/{cfg.ifname}/device")
+ )
+ ksft_run([test_port_split], args=(cfg,))
+ ksft_exit()
if __name__ == "__main__":
--
2.53.0
Powered by blists - more mailing lists