Signed-off-by: David Gibson <david(a)gibson.dropbear.id.au> --- test/Makefile | 3 +- test/tasst/__main__.py | 19 +++- test/tasst/cmdsite.py | 193 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 210 insertions(+), 5 deletions(-) create mode 100644 test/tasst/cmdsite.py diff --git a/test/Makefile b/test/Makefile index 1daf1999..f1632f4d 100644 --- a/test/Makefile +++ b/test/Makefile @@ -70,7 +70,8 @@ EXETER_JOBS = $(EXETER_SH:%.sh=%.json) AVOCADO_JOBS = $(EXETER_JOBS) avocado/static_checkers.json -TASST_SRCS = __init__.py __main__.py +TASST_MODS = $(shell python3 -m tasst --modules) +TASST_SRCS = __init__.py __main__.py $(TASST_MODS) EXETER_META = meta/lint.json meta/tasst.json META_JOBS = $(EXETER_META) diff --git a/test/tasst/__main__.py b/test/tasst/__main__.py index 310e31d7..4ea4c593 100644 --- a/test/tasst/__main__.py +++ b/test/tasst/__main__.py @@ -10,13 +10,24 @@ Test A Simple Socket Transport library of test helpers for passt & pasta """ -import exeter +import importlib +import sys +import exeter -(a)exeter.test -def placeholder() -> None: - pass +MODULES = [ + 'cmdsite', +] if __name__ == '__main__': + if sys.argv[1:] == ["--modules"]: + for m in MODULES: + print(m.replace('.', '/') + '.py') + sys.exit(0) + + for m in MODULES: + mod = importlib.import_module('.' + m, __package__) + mod.selftests() + exeter.main() diff --git a/test/tasst/cmdsite.py b/test/tasst/cmdsite.py new file mode 100644 index 00000000..ea2bdaa3 --- /dev/null +++ b/test/tasst/cmdsite.py @@ -0,0 +1,193 @@ +#! /usr/bin/env python3 + +# SPDX-License-Identifier: GPL-2.0-or-later +# +# Copyright Red Hat +# Author: David Gibson <david(a)gibson.dropbear.id.au> + +""" +Test A Simple Socket Transport + +tasst/snh.py - Simulated network hosts for testing +""" + +from __future__ import annotations + +import contextlib +import enum +import subprocess +import sys +from typing import Any, Iterator, Optional + +import exeter + + +class Capture(enum.Enum): + STDOUT = 1 + + +# We might need our own versions of these eventually, but for now we +# can just alias the ones in subprocess +CompletedCmd = subprocess.CompletedProcess[bytes] +TimeoutExpired = subprocess.TimeoutExpired +CmdError = subprocess.CalledProcessError + + +class RunningCmd: + """ + A background process running on a CmdSite + """ + site: CmdSite + cmd: tuple[str, ...] + check: bool + popen: subprocess.Popen[bytes] + + def __init__(self, site: CmdSite, popen: subprocess.Popen[bytes], + *cmd: str, check: bool = True) -> None: + self.site = site + self.popen = popen + self.cmd = cmd + self.check = check + + def run(self, **kwargs: Any) -> CompletedCmd: + stdout, stderr = self.popen.communicate(**kwargs) + cp = CompletedCmd(self.popen.args, self.popen.returncode, + stdout, stderr) + if self.check: + cp.check_returncode() + return cp + + def terminate(self) -> None: + self.popen.terminate() + + def kill(self) -> None: + self.popen.kill() + + +class CmdSite(exeter.Scenario): + """ + A (usually virtual or simulated) location where we can execute + commands and configure networks. + + """ + name: str + + def __init__(self, name: str) -> None: + self.name = name # For debugging + + def output(self, *cmd: str, **kwargs: Any) -> bytes: + proc = self.fg(*cmd, capture=Capture.STDOUT, **kwargs) + return proc.stdout + + def fg(self, *cmd: str, timeout: Optional[float] = None, **kwargs: Any) \ + -> CompletedCmd: + # We don't use subprocess.run() because it kills without + # attempting to terminate on timeout + with self.bg(*cmd, **kwargs) as proc: + res = proc.run(timeout=timeout) + return res + + def sh(self, script: str, **kwargs: Any) -> None: + for cmd in script.splitlines(): + self.fg(cmd, shell=True, **kwargs) + + @contextlib.contextmanager + def bg(self, *cmd: str, capture: Optional[Capture] = None, + check: bool = True, context_timeout: float = 1.0, **kwargs: Any) \ + -> Iterator[RunningCmd]: + if capture == Capture.STDOUT: + kwargs['stdout'] = subprocess.PIPE + print(f"Site {self.name}: {cmd}", file=sys.stderr) + with self.popen(*cmd, **kwargs) as popen: + proc = RunningCmd(self, popen, *cmd, check=check) + try: + yield proc + finally: + try: + popen.wait(timeout=context_timeout) + except subprocess.TimeoutExpired as e: + popen.terminate() + try: + popen.wait(timeout=context_timeout) + except subprocess.TimeoutExpired: + popen.kill() + raise e + + def popen(self, *cmd: str, **kwargs: Any) -> subprocess.Popen[bytes]: + raise NotImplementedError + + @exeter.scenariotest + def test_true(self) -> None: + self.fg('true') + + @exeter.scenariotest + def test_false(self) -> None: + exeter.assert_raises(CmdError, self.fg, 'false') + + @exeter.scenariotest + def test_echo(self) -> None: + msg = 'Hello tasst' + out = self.output('echo', f'{msg}') + exeter.assert_eq(out, msg.encode('utf-8') + b'\n') + + @exeter.scenariotest + def test_timeout(self) -> None: + exeter.assert_raises(TimeoutExpired, self.fg, + 'sleep', 'infinity', timeout=0.1, check=False) + + @exeter.scenariotest + def test_bg_true(self) -> None: + with self.bg('true') as proc: + proc.run() + + @exeter.scenariotest + def test_bg_false(self) -> None: + with self.bg('false') as proc: + exeter.assert_raises(CmdError, proc.run) + + @exeter.scenariotest + def test_bg_echo(self) -> None: + msg = 'Hello tasst' + with self.bg('echo', f'{msg}', capture=Capture.STDOUT) as proc: + res = proc.run() + exeter.assert_eq(res.stdout, msg.encode('utf-8') + b'\n') + + @exeter.scenariotest + def test_bg_timeout(self) -> None: + with self.bg('sleep', 'infinity') as proc: + exeter.assert_raises(TimeoutExpired, proc.run, timeout=0.1) + proc.terminate() + + @exeter.scenariotest + def test_bg_context_timeout(self) -> None: + def run_timeout() -> None: + with self.bg('sleep', 'infinity', context_timeout=0.1): + pass + exeter.assert_raises(TimeoutExpired, run_timeout) + + +class BuildHost(CmdSite): + """ + Represents the host on which the tests are running (as opposed + to some simulated host created by the tests) + """ + + def __init__(self) -> None: + super().__init__('BUILD_HOST') + + def popen(self, *cmd: str, privilege: bool = False, **kwargs: Any) \ + -> subprocess.Popen[bytes]: + assert not privilege, \ + "BUG: Shouldn't run commands with privilege on host" + return subprocess.Popen(cmd, **kwargs) + + +BUILD_HOST = BuildHost() + + +def build_host() -> Iterator[BuildHost]: + yield BUILD_HOST + + +def selftests() -> None: + CmdSite.test(build_host) -- 2.46.0