Use our existing nstool C helper, add python wrappers to easily run commands in various namespaces. Signed-off-by: David Gibson <david(a)gibson.dropbear.id.au> --- test/tasst/__main__.py | 1 + test/tasst/unshare.py | 166 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 167 insertions(+) create mode 100644 test/tasst/unshare.py diff --git a/test/tasst/__main__.py b/test/tasst/__main__.py index 4ea4c593..9cba8985 100644 --- a/test/tasst/__main__.py +++ b/test/tasst/__main__.py @@ -17,6 +17,7 @@ import exeter MODULES = [ 'cmdsite', + 'unshare', ] diff --git a/test/tasst/unshare.py b/test/tasst/unshare.py new file mode 100644 index 00000000..15b760b5 --- /dev/null +++ b/test/tasst/unshare.py @@ -0,0 +1,166 @@ +#! /usr/bin/env python3 + +# SPDX-License-Identifier: GPL-2.0-or-later +# +# Copyright Red Hat +# Author: David Gibson <david(a)gibson.dropbear.id.au> + +""" +Test A Simple Socket Transport + +unshare.py - Create and run commands in Linux namespaces +""" + +import contextlib +import os +import subprocess +import tempfile +from typing import Any, Callable, Iterator + +import exeter + +from . import cmdsite + + +# FIXME: Can this be made more portable? 
# Size in bytes of the sun_path buffer of a Unix domain socket address on
# Linux; the buffer has to hold the path and its terminating NUL byte
UNIX_PATH_MAX = 108

# Path of the nstool helper binary, relative to the working directory
NSTOOL_BIN = './nstool'


class Unshare(cmdsite.CmdSite):
    """A bundle of Linux namespaces managed by nstool

    Commands are run in the namespaces by invoking 'nstool exec' on the
    parent site, pointed at the Unix socket given by sockpath.
    """

    # Path of the nstool control socket, as seen from the parent site
    sockpath: str
    # Site on which the nstool commands are run
    parent: cmdsite.CmdSite
    # Whether nstool itself is started with privilege on the parent site
    parent_priv: bool
    _pid: int

    def __init__(self, name: str, sockpath: str,
                 parent: cmdsite.CmdSite = cmdsite.BUILD_HOST,
                 parent_priv: bool = False) -> None:
        """Attach to the namespaces held behind an nstool socket

        name: name of this site
        sockpath: path of the nstool control socket
        parent: site from which the socket is reachable
        parent_priv: run 'nstool exec' with privilege on the parent

        Raises ValueError if sockpath does not fit in a Unix socket address.
        """
        # The limit applies to the encoded bytes, not the characters, and
        # one byte has to stay free for the terminating NUL
        if len(os.fsencode(sockpath)) >= UNIX_PATH_MAX:
            raise ValueError(
                f'Unix domain socket path "{sockpath}" is too long'
            )

        super().__init__(name)
        self.sockpath = sockpath
        self.parent = parent
        self.parent_priv = parent_priv
        # NOTE(review): presumably '-w' makes nstool wait until the holder's
        # socket is ready - confirm against nstool's usage text
        self.parent.fg(NSTOOL_BIN, 'info', '-wp', self.sockpath, timeout=1)

    def relative_pid(self, relative_to: cmdsite.CmdSite) -> int | None:
        """Return the PID of the nstool hold process as seen from a site

        relative_to must be a site which can see the nstool socket.  This
        matters when PID namespaces are in use, since the holder's PID
        then depends on where it is observed from.

        Returns None if nstool reports a PID of 0.
        """
        cmd = [NSTOOL_BIN, 'info', '-p', self.sockpath]
        relpid = int(relative_to.output(*cmd))
        return relpid or None

    def popen(self, *cmd: str, privilege: bool = False,
              **kwargs: Any) -> subprocess.Popen[bytes]:
        """Start cmd inside the namespaces, via 'nstool exec' on the parent

        privilege: pass '--keep-caps' to nstool
        kwargs: passed through to the parent site's popen()
        """
        hostcmd = [NSTOOL_BIN, 'exec']
        if privilege:
            hostcmd.append('--keep-caps')
        hostcmd += [self.sockpath, '--', *cmd]
        return self.parent.popen(*hostcmd, privilege=self.parent_priv,
                                 **kwargs)


@contextlib.contextmanager
def unshare(name: str, *opts: str,
            parent: cmdsite.CmdSite = cmdsite.BUILD_HOST,
            privilege: bool = False) -> Iterator[Unshare]:
    """Create new namespaces with unshare(1) and yield a site for them

    name: name of the new site, also used as the socket's file name
    opts: options passed to the 'unshare' command
    parent: site on which 'unshare' and nstool are run
    privilege: run 'unshare' with privilege on the parent site

    'unshare OPTS -- nstool hold SOCKPATH' is run in the background on the
    parent for the lifetime of the context.
    """
    # Create path for temporary nstool Unix socket
    with tempfile.TemporaryDirectory() as tmpd:
        sockpath = os.path.join(tmpd, name)
        cmd = ['unshare', *opts, '--', NSTOOL_BIN, 'hold', sockpath]
        with parent.bg(*cmd, privilege=privilege) as holder:
            try:
                yield Unshare(name, sockpath, parent=parent,
                              parent_priv=privilege)
            finally:
                # Each cleanup step is attempted even if the previous one
                # raised: stop the holder, reap it, then remove the socket
                try:
                    parent.fg(NSTOOL_BIN, 'stop', sockpath)
                finally:
                    try:
                        # NOTE(review): presumably gives the holder a moment
                        # to exit by itself before killing it - confirm
                        # against the object returned by CmdSite.bg()
                        holder.run(timeout=0.1)
                        holder.kill()
                    finally:
                        # The socket may already be gone
                        with contextlib.suppress(FileNotFoundError):
                            os.remove(sockpath)


def _userns_setup() -> Iterator[cmdsite.CmdSite]:
    """Yield a site in new namespaces created with 'unshare -Ucn'"""
    with unshare('usernetns', '-Ucn') as site:
        yield site


def _nested_setup() -> Iterator[cmdsite.CmdSite]:
    """Yield a site created with 'unshare -n' inside a '-Uc' site"""
    with unshare('userns', '-Uc') as userns:
        with unshare('netns', '-n', parent=userns, privilege=True) as netns:
            yield netns


def _pidns_setup() -> Iterator[cmdsite.CmdSite]:
    """Yield a site in new namespaces created with 'unshare -Upfn'"""
    with unshare('pidns', '-Upfn') as site:
        yield site


def connect_site() -> Iterator[Unshare]:
    """Yield an Unshare attached to an nstool holder run without unshare

    'nstool hold' is started directly with subprocess, so the resulting
    site exercises the Unshare class without creating any namespaces.
    """
    with tempfile.TemporaryDirectory() as tmpd:
        sockpath = os.path.join(tmpd, 'nons')
        holdcmd = [NSTOOL_BIN, 'hold', sockpath]
        with subprocess.Popen(holdcmd) as holder:
            try:
                yield Unshare("fakens", sockpath)
            finally:
                holder.kill()
                # The socket may never have been created (e.g. if the
                # Unshare constructor failed); don't let that mask the
                # original error
                with contextlib.suppress(FileNotFoundError):
                    os.remove(sockpath)


def selftests() -> None:
    """Register the exeter self tests for this module"""

    @exeter.test
    def test_userns() -> None:
        # CAP_SETUID must be missing on the build host, but available
        # with privilege inside the new namespaces
        cmd = ['capsh', '--has-p=CAP_SETUID']
        status = cmdsite.BUILD_HOST.fg(*cmd, check=False)
        assert status.returncode != 0
        with unshare('userns', '-Ucn') as ns:
            ns.fg(*cmd, privilege=True)

    @exeter.test
    def test_relative_pid() -> None:
        with unshare('pidns', '-Upfn') as site:
            # The holder is init (pid 1) within its own pidns
            exeter.assert_eq(site.relative_pid(site), 1)

    def sockdir_cleanup(setup: Callable[[], Iterator[cmdsite.CmdSite]]) \
            -> None:
        # Check that the temporary directory of every nstool socket is
        # gone once the site's context has exited
        cm = contextlib.contextmanager(setup)

        sockpaths: list[str] = []
        with cm() as site:
            # Walk up the chain of parents, collecting each Unshare's
            # socket path
            while isinstance(site, Unshare):
                sockpaths.append(site.sockpath)
                site = site.parent

        assert sockpaths
        for path in sockpaths:
            assert not os.path.exists(os.path.dirname(path))

    # General tests for all the nstool examples
    for setup in [_userns_setup, _nested_setup, _pidns_setup]:
        # Common cmdsite.CmdSite tests
        cmdsite.CmdSite.test(setup)

        exeter.register(f'{setup.__qualname__}|sockdir_cleanup',
                        sockdir_cleanup, setup)

    cmdsite.CmdSite.test(connect_site)


# Patch signature retained from the original posting: -- 2.46.0