Specifically these operate to configure IP using ip(8) running within any CmdSite. We also include things to allocate addresses in example networks. Signed-off-by: David Gibson <david(a)gibson.dropbear.id.au> --- test/tasst/__main__.py | 1 + test/tasst/ip.py | 234 ++++++++++++++++++++++++++++++++ test/tasst/selftest/__init__.py | 16 +++ 3 files changed, 251 insertions(+) create mode 100644 test/tasst/ip.py create mode 100644 test/tasst/selftest/__init__.py diff --git a/test/tasst/__main__.py b/test/tasst/__main__.py index 9cba8985..e7456e8b 100644 --- a/test/tasst/__main__.py +++ b/test/tasst/__main__.py @@ -17,6 +17,7 @@ import exeter MODULES = [ 'cmdsite', + 'ip', 'unshare', ] diff --git a/test/tasst/ip.py b/test/tasst/ip.py new file mode 100644 index 00000000..7d9b1c11 --- /dev/null +++ b/test/tasst/ip.py @@ -0,0 +1,234 @@ +#! /usr/bin/env python3 + +# SPDX-License-Identifier: GPL-2.0-or-later +# +# Copyright Red Hat +# Author: David Gibson <david(a)gibson.dropbear.id.au> + +""" +Test A Simple Socket Transport + +tasst/ip.py - Configure and read IP on simulated sites +""" + +from __future__ import annotations + +import contextlib +import dataclasses +import ipaddress +import json +from typing import Any, Iterator, Literal, Sequence, cast + +import exeter + +from . import cmdsite, unshare + +Addr = ipaddress.IPv4Address | ipaddress.IPv6Address +AddrMask = ipaddress.IPv4Interface | ipaddress.IPv6Interface +Net = ipaddress.IPv4Network | ipaddress.IPv6Network + + +# Loopback addresses, for convenience +LOOPBACK4 = ipaddress.ip_address('127.0.0.1') +LOOPBACK6 = ipaddress.ip_address('::1') + +# Documentation test networks defined by RFC 5737 +TEST_NET_1 = ipaddress.ip_network('192.0.2.0/24') +TEST_NET_2 = ipaddress.ip_network('198.51.100.0/24') +TEST_NET_3 = ipaddress.ip_network('203.0.113.0/24') + +# Documentation test network defined by RFC 3849 +TEST_NET6 = ipaddress.ip_network('2001:db8::/32') +# Some subnets of that for our usage +TEST_NET6_TASST_A = ipaddress.ip_network('2001:db8:9a55:aaaa::/64') +TEST_NET6_TASST_B = ipaddress.ip_network('2001:db8:9a55:bbbb::/64') +TEST_NET6_TASST_C = ipaddress.ip_network('2001:db8:9a55:cccc::/64') + + +class IpiAllocator: + """IP address allocator""" + + DEFAULT_NETS = (TEST_NET_1, TEST_NET6_TASST_A) + + def __init__(self, *nets: Net | str) -> None: + if not nets: + nets = self.DEFAULT_NETS + + self.nets = [ipaddress.ip_network(n) for n in nets] + self.hostses = [n.hosts() for n in self.nets] + + def next_ipis(self) \ + -> list[ipaddress.IPv4Interface | ipaddress.IPv6Interface]: + addrs = [next(h) for h in self.hostses] + return [ipaddress.ip_interface(f'{a}/{n.prefixlen}') + for a, n in zip(addrs, self.nets)] + + +def ifs(site: cmdsite.CmdSite) -> Sequence[str]: + info = json.loads(site.output('ip', '-j', 'link', 'show')) + return [i['ifname'] for i in info] + + +def ifup(site: cmdsite.CmdSite, ifname: str, *addrs: AddrMask, + dad: Literal['disable', 'optimistic', None] = None) -> None: + if dad == 'disable': + site.fg('sysctl', f'net.ipv6.conf.{ifname}.accept_dad=0', + privilege=True) + elif dad == 'optimistic': + site.fg('sysctl', f'net.ipv6.conf.{ifname}.optimistic_dad=1', + privilege=True) + elif dad is not None: + raise ValueError + + for a in addrs: + site.fg('ip', 'addr', 'add', f'{a.with_prefixlen}', + 'dev', ifname, privilege=True) + + site.fg('ip', 'link', 'set', ifname, 'up', privilege=True) + + +def addrs(site: cmdsite.CmdSite, ifname: str, **criteria: str) \ + -> Sequence[AddrMask]: + info = json.loads(site.output('ip', '-j', 'addr', 'show', f'{ifname}')) + assert len(info) == 1 # We specified a specific interface + + ais = [ai for ai in info[0]['addr_info']] + for key, value in criteria.items(): + ais = [ai for ai in ais if key in ai and ai[key] == value] + + # Return just the parsed, non-tentative addresses + return [ipaddress.ip_interface(f'{ai["local"]}/{ai["prefixlen"]}') + for ai in ais if 'tentative' not in ai] + + +def addr_wait(site: cmdsite.CmdSite, ifname: str, **criteria: str) \ + -> Sequence[AddrMask]: + while True: + a = addrs(site, ifname, **criteria) + if a: + return a + + +def mtu(site: cmdsite.CmdSite, ifname: str) -> int: + cmd = ['ip', '-j', 'link', 'show', ifname] + (info,) = json.loads(site.output(*cmd)) + return cast(int, info['mtu']) + + +def _routes(site: cmdsite.CmdSite, ipv: str, **criteria: str) -> Any: + routes = json.loads(site.output('ip', '-j', f'-{ipv}', 'route')) + for key, value in criteria.items(): + routes = [r for r in routes if key in r and r[key] == value] + + return routes + + +def routes4(site: cmdsite.CmdSite, **criteria: str) -> Any: + return _routes(site, '4', **criteria) + + +def routes6(site: cmdsite.CmdSite, **criteria: str) -> Any: + return _routes(site, '6', **criteria) + + +(a)dataclasses.dataclass +class BaseNetScenario(exeter.Scenario): + """Test that a site has sane looking basic networking""" + site: cmdsite.CmdSite + + @exeter.scenariotest + def has_lo(self) -> None: + assert 'lo' in ifs(self.site) + + @exeter.scenariotest + def lo_addrs(self) -> None: + expected = set(ipaddress.ip_interface(a) + for a in ['127.0.0.1/8', '::1/128']) + assert set(addrs(self.site, 'lo')) == expected + + @exeter.scenariotest + def lo_mtu(self) -> None: + exeter.assert_eq(mtu(self.site, 'lo'), 65536) + + +(a)dataclasses.dataclass +class IsolatedNetScenario(BaseNetScenario): + @exeter.scenariotest + def is_isolated(self) -> None: + exeter.assert_eq(list(ifs(self.site)), ['lo']) + + +def selftests() -> None: + @BaseNetScenario.test + def host() -> Iterator[BaseNetScenario]: + yield BaseNetScenario(cmdsite.BUILD_HOST) + + @IsolatedNetScenario.test + def netns() -> Iterator[IsolatedNetScenario]: + with unshare.unshare("netns", "-Ucn") as ns: + ifup(ns, 'lo') + yield IsolatedNetScenario(ns) + + ifname = 'dummy0' + dummy_ips = {ipaddress.ip_interface(a) for a in + ['192.0.2.1/24', '2001:db8:9a55::1/112', '10.1.2.3/8']} + dummy_routes4 = {i.network for i in dummy_ips + if isinstance(i, ipaddress.IPv4Interface)} + dummy_routes6 = {i.network for i in dummy_ips + if isinstance(i, ipaddress.IPv6Interface)} + dummy_routes6.add(ipaddress.IPv6Network('fe80::/64')) + + @contextlib.contextmanager + def dummy_setup() -> Iterator[cmdsite.CmdSite]: + with unshare.unshare('dummy', '-Un') as site: + site.fg('ip', 'link', 'add', 'type', 'dummy', privilege=True) + ifup(site, 'lo') + ifup(site, ifname, *dummy_ips, dad='disable') + yield site + + @exeter.test + def test_addr() -> None: + with dummy_setup() as site: + actual = set(addrs(site, ifname, scope='global')) + exeter.assert_eq(actual, dummy_ips) + + @exeter.test + def test_routes4() -> None: + with dummy_setup() as site: + actual = set(ipaddress.ip_interface(r['dst']).network + for r in routes4(site, dev=ifname)) + exeter.assert_eq(actual, dummy_routes4) + + @exeter.test + def test_routes6() -> None: + with dummy_setup() as site: + actual = set(ipaddress.ip_interface(r['dst']).network + for r in routes6(site, dev=ifname)) + exeter.assert_eq(actual, dummy_routes6) + + def ipa_test(nets: tuple[Net | str, ...], count: int = 12) -> None: + ipa = IpiAllocator(*nets) + + addrsets: list[set[ipaddress.IPv4Address | ipaddress.IPv6Address]] \ + = [set() for i in range(len(nets))] + for i in range(count): + addrs = ipa.next_ipis() + # Check we got as many addresses as expected + exeter.assert_eq(len(addrs), len(nets)) + for s, a, n in zip(addrsets, addrs, nets): + # Check the addresses belong to the right network + exeter.assert_eq(a.network, ipaddress.ip_network(n)) + s.add(a) + + # Check the addresses are unique + for s in addrsets: + exeter.assert_eq(len(s), count) + + @exeter.test + def ipa_test_default() -> None: + ipa_test(nets=IpiAllocator.DEFAULT_NETS) + + @exeter.test + def ipa_test_custom() -> None: + ipa_test(nets=('10.55.0.0/16', '192.168.55.0/24', + 'fd00:9a57:a000::/48')) diff --git a/test/tasst/selftest/__init__.py b/test/tasst/selftest/__init__.py new file mode 100644 index 00000000..d7742930 --- /dev/null +++ b/test/tasst/selftest/__init__.py @@ -0,0 +1,16 @@ +#! /usr/bin/python3 + +# SPDX-License-Identifier: GPL-2.0-or-later +# +# Copyright Red Hat +# Author: David Gibson <david(a)gibson.dropbear.id.au> + +"""Test A Simple Socket Transport + +selftest/ - Selftests for the tasst library + +Usually, tests for the tasst helper library itself should go next to +the implementation of the thing being tested. Sometimes that's +inconvenient or impossible (usually because it would cause a circular +module dependency). In that case those tests can go here. +""" -- 2.46.0