Using our existing nstool C helper, add Python wrappers to easily run
commands in various namespaces.
Signed-off-by: David Gibson
---
test/tasst/__main__.py | 1 +
test/tasst/unshare.py | 166 +++++++++++++++++++++++++++++++++++++++++
2 files changed, 167 insertions(+)
create mode 100644 test/tasst/unshare.py
diff --git a/test/tasst/__main__.py b/test/tasst/__main__.py
index 4ea4c593..9cba8985 100644
--- a/test/tasst/__main__.py
+++ b/test/tasst/__main__.py
@@ -17,6 +17,7 @@ import exeter
MODULES = [
'cmdsite',
+ 'unshare',
]
diff --git a/test/tasst/unshare.py b/test/tasst/unshare.py
new file mode 100644
index 00000000..15b760b5
--- /dev/null
+++ b/test/tasst/unshare.py
@@ -0,0 +1,166 @@
+#! /usr/bin/env python3
+
+# SPDX-License-Identifier: GPL-2.0-or-later
+#
+# Copyright Red Hat
+# Author: David Gibson
+
+"""
+Test A Simple Socket Transport
+
+unshare.py - Create and run commands in Linux namespaces
+"""
+
+import contextlib
+import os
+import subprocess
+import tempfile
+from typing import Any, Callable, Iterator
+
+import exeter
+
+from . import cmdsite
+
+
+# Unix socket path limit (Linux sun_path size). FIXME: make more portable?
+UNIX_PATH_MAX = 108
+
+NSTOOL_BIN = './nstool'  # relative path to the nstool helper binary
+
+
+class Unshare(cmdsite.CmdSite):
+    """A bundle of Linux namespaces managed by nstool"""
+
+    sockpath: str  # path of the nstool control socket
+    parent: cmdsite.CmdSite  # site on which the nstool commands are run
+    parent_priv: bool  # 'privilege' value passed on to parent.popen()
+    _pid: int
+
+    def __init__(self, name: str, sockpath: str,
+                 parent: cmdsite.CmdSite = cmdsite.BUILD_HOST,
+                 parent_priv: bool = False) -> None:
+        """Bind to an nstool socket, running 'nstool info -wp' via parent"""
+        # The limit applies to the encoded bytes, not to str characters
+        if len(os.fsencode(sockpath)) > UNIX_PATH_MAX:
+            raise ValueError(
+                f'Unix domain socket path "{sockpath}" is too long'
+            )
+
+        super().__init__(name)
+        self.sockpath = sockpath
+        self.parent = parent
+        self.parent_priv = parent_priv
+        self.parent.fg(NSTOOL_BIN, 'info', '-wp', self.sockpath, timeout=1)
+
+    def relative_pid(self, relative_to: cmdsite.CmdSite) -> int | None:
+        """PID of the nstool holder as seen from relative_to, None if 0
+
+        relative_to must see the nstool socket (matters with PID namespaces)
+        """
+        cmd = [NSTOOL_BIN, 'info', '-p', self.sockpath]
+        return int(relative_to.output(*cmd)) or None
+
+    def popen(self, *cmd: str, privilege: bool = False,
+              **kwargs: Any) -> subprocess.Popen[bytes]:
+        """Start cmd via 'nstool exec' on parent (--keep-caps if privilege)"""
+        keep = ['--keep-caps'] if privilege else []
+        hostcmd = [NSTOOL_BIN, 'exec', *keep, self.sockpath, '--', *cmd]
+        return self.parent.popen(*hostcmd, privilege=self.parent_priv,
+                                 **kwargs)
+
+
+@contextlib.contextmanager
+def unshare(name: str, *opts: str,
+            parent: cmdsite.CmdSite = cmdsite.BUILD_HOST,
+            privilege: bool = False) -> Iterator[Unshare]:
+    """Hold namespaces made by 'unshare opts' on parent; yield an Unshare"""
+    with tempfile.TemporaryDirectory() as tmpd:  # holds the nstool socket
+        sockpath = os.path.join(tmpd, name)
+        cmd = ['unshare', *opts, '--', NSTOOL_BIN, 'hold', sockpath]
+        with parent.bg(*cmd, privilege=privilege) as holder:
+            try:
+                yield Unshare(name, sockpath, parent=parent,
+                              parent_priv=privilege)
+            finally:
+                # Each cleanup step runs even if an earlier one raised
+                try:
+                    parent.fg(NSTOOL_BIN, 'stop', sockpath)
+                finally:
+                    try:
+                        try:
+                            holder.run(timeout=0.1)
+                        finally:
+                            holder.kill()  # even when run() raised
+                    finally:
+                        with contextlib.suppress(FileNotFoundError):
+                            os.remove(sockpath)
+
+
+def _userns_setup() -> Iterator[cmdsite.CmdSite]:
+    with unshare('usernetns', '-Ucn') as usernetns:  # user + net namespaces
+        yield usernetns
+
+
+def _nested_setup() -> Iterator[cmdsite.CmdSite]:
+    with (unshare('userns', '-Uc') as outer,  # user namespace first, then
+          unshare('netns', '-n', parent=outer, privilege=True) as inner):
+        yield inner  # ... the network namespace unshared from inside it
+
+
+def _pidns_setup() -> Iterator[cmdsite.CmdSite]:
+    with unshare('pidns', '-Upfn') as pidns:  # user, PID (forked) and net
+        yield pidns
+
+
+def connect_site() -> Iterator[Unshare]:  # Unshare on a bare 'nstool hold'
+    with tempfile.TemporaryDirectory() as tmpd:
+        sockpath = os.path.join(tmpd, 'nons')
+        with subprocess.Popen([NSTOOL_BIN, 'hold', sockpath]) as holder:
+            try:
+                yield Unshare("fakens", sockpath)
+            finally:
+                holder.kill()
+                with contextlib.suppress(FileNotFoundError):
+                    os.remove(sockpath)  # may never have been created
+
+
+def selftests() -> None:
+    """Register this module's exeter self tests"""
+
+    @exeter.test
+    def test_userns() -> None:
+        # The host is expected to lack CAP_SETUID; a privileged command in
+        # a fresh user namespace is expected to have it
+        capcheck = ['capsh', '--has-p=CAP_SETUID']
+        on_host = cmdsite.BUILD_HOST.fg(*capcheck, check=False)
+        assert on_host.returncode != 0
+        with unshare('userns', '-Ucn') as ns:
+            ns.fg(*capcheck, privilege=True)
+
+    @exeter.test
+    def test_relative_pid() -> None:
+        # Seen from inside its own pidns, the holder is init (pid 1)
+        with unshare('pidns', '-Upfn') as site:
+            exeter.assert_eq(site.relative_pid(site), 1)
+
+    def sockdir_cleanup(setup: Callable[[], Iterator[cmdsite.CmdSite]]) \
+            -> None:
+        # Once setup's context exits, every socket directory must be gone
+        found: list[str] = []
+        with contextlib.contextmanager(setup)() as site:
+            while isinstance(site, Unshare):  # walk up the parent chain
+                found.append(site.sockpath)
+                site = site.parent
+        assert found
+        assert not any(os.path.exists(os.path.dirname(p)) for p in found)
+
+    # General tests for all the nstool examples
+    for setup in [_userns_setup, _nested_setup, _pidns_setup]:
+        # Common cmdsite.CmdSite & NetSite tests
+        cmdsite.CmdSite.test(setup)
+
+        exeter.register(f'{setup.__qualname__}|sockdir_cleanup',
+                        sockdir_cleanup, setup)
+
+    # Same common tests on a holder started without 'unshare'
+    cmdsite.CmdSite.test(connect_site)
--
2.46.0