Use our existing nstool C helper, add python wrappers to easily run commands in various namespaces. Signed-off-by: David Gibson <david(a)gibson.dropbear.id.au> --- test/Makefile | 2 +- test/tasst/exesite.py | 8 ++ test/tasst/nstool.py | 168 ++++++++++++++++++++++++++++++++++++++++++ 3 files changed, 177 insertions(+), 1 deletion(-) create mode 100644 test/tasst/nstool.py diff --git a/test/Makefile b/test/Makefile index 58159c83..da542d33 100644 --- a/test/Makefile +++ b/test/Makefile @@ -226,7 +226,7 @@ $(VENV): $(VENV)/bin/pip install -e ./$(PLUGIN) .PHONY: avocado-assets -avocado-assets: +avocado-assets: nstool .PHONY: avocado avocado: avocado-assets $(VENV) diff --git a/test/tasst/exesite.py b/test/tasst/exesite.py index 811b670e..2e15129f 100644 --- a/test/tasst/exesite.py +++ b/test/tasst/exesite.py @@ -161,6 +161,14 @@ def test_site(sitefn): test_has_lo)(sitefn) +def test_isolated_site(sitefn): + def test_isolated_net(s): + with s as site: + assert_eq(site.ifs(), ['lo']) + + return test_output(test_isolated_net)(test_site(sitefn)) + + class RealHost(Site): """Represents the host on which the tests are running (as opposed to some simulated host created by the tests) diff --git a/test/tasst/nstool.py b/test/tasst/nstool.py new file mode 100644 index 00000000..f05c420d --- /dev/null +++ b/test/tasst/nstool.py @@ -0,0 +1,168 @@ +#! /usr/bin/env avocado-runner-avocado-classless + +# SPDX-License-Identifier: GPL-2.0-or-later +# +# Copyright Red Hat +# Author: David Gibson <david(a)gibson.dropbear.id.au> + +""" +Test A Simple Socket Transport + +nstool.py - Run commands in namespaces via 'nstool' +""" + +import contextlib +import os +import tempfile + +from avocado.utils.process import CmdError +from avocado_classless.test import assert_eq, assert_raises, test_output + +from tasst.exesite import Site, REAL_HOST, test_isolated_site, test_site +from tasst.typecheck import typecheck + +# FIXME: Can this be made more portable? # pylint: disable=W0511 +UNIX_PATH_MAX = 108 + +NSTOOL_BIN = './nstool' + + +class NsToolSite(Site): + """A bundle of Linux namespaces managed by nstool""" + + def __init__(self, name, sockpath, parent=REAL_HOST): + if len(sockpath) > UNIX_PATH_MAX: + raise ValueError( + f'Unix domain socket path "{sockpath}" is too long' + ) + + super().__init__(name) + self.sockpath = typecheck(sockpath, str) + self.parent = typecheck(parent, Site) + self._pid = None + + def __enter__(self): + pid = self.parent.output(f'{NSTOOL_BIN} info -wp {self.sockpath}', + verbose=False, timeout=1) + self._pid = int(pid) + return self + + def __exit__(self, *exc_details): + pass + + # PID of the nstool hold process as seen by the test host + def pid(self): + return self._pid + + # PID of the nstool hold process as seen by another site + # (important when using PID namespaces) + def relative_pid(self, relative_to): + relpid = relative_to.output(f'{NSTOOL_BIN} info -p {self.sockpath}') + return int(relpid) + + def hostify(self, cmd, *, sudo=False, **kwargs): + nst_args = self.sockpath + if sudo: + nst_args = '--keep-caps ' + nst_args + return f'{NSTOOL_BIN} exec {nst_args} -- {cmd}', kwargs + + +(a)contextlib.contextmanager +def unshare_site(nsname, unshare_opts, parent=REAL_HOST, sudo=False): + unshare_opts = typecheck(unshare_opts, str) + parent = typecheck(parent, Site) + sudo = typecheck(sudo, bool) + parent.require_cmds('unshare', NSTOOL_BIN) + + # Create path for temporary nstool Unix socket + # + # Using Avocado's workdir often gives paths that are too lonhg for + # Unix sockets + with tempfile.TemporaryDirectory() as tmpd: + sockpath = os.path.join(tmpd, nsname) + holdcmd = f'unshare {unshare_opts} -- {NSTOOL_BIN} hold {sockpath}' + with parent.bg(holdcmd, sudo=sudo) as holder: + try: + with NsToolSite(nsname, sockpath, parent=parent) as site: + yield site + finally: + try: + parent.fg(f'{NSTOOL_BIN} stop {sockpath}') + finally: + try: + holder.run(timeout=0.1) + finally: + try: + os.remove(sockpath) + except FileNotFoundError: + pass + + +TEST_EXC = ValueError + + +def test_sockdir_cleanup(s): + def mess(sockpaths): + with s as site: + ns = site + while isinstance(ns, NsToolSite): + sockpaths.append(ns.sockpath) + ns = ns.parent + raise TEST_EXC + + sockpaths = [] + assert_raises(TEST_EXC, mess, sockpaths) + assert sockpaths + for path in sockpaths: + assert not os.path.exists(os.path.dirname(path)) + + +def test_userns(nstool_site): + REAL_HOST.require_cmds('capsh') + with nstool_site as ns: + ns.require_cmds('capsh') + capcmd = 'capsh --has-p=CAP_SETUID' + assert_raises(CmdError, REAL_HOST.fg, capcmd) + ns.fg(capcmd, sudo=True) + + +@test_output(test_userns, test_sockdir_cleanup) +@test_isolated_site +def userns_site(): + return unshare_site('usernetns', '-Ucn') + + +@test_output(test_sockdir_cleanup) +@test_isolated_site +(a)contextlib.contextmanager +def nested_site(): + with unshare_site('userns', '-Uc') as userns: + with unshare_site('netns', '-n', parent=userns, sudo=True) as netns: + yield netns + + +def test_relative_pid(s): + with s as site: + # The holder is init (pid 1) within its own pidns + assert_eq(site.relative_pid(site), 1) + + +@test_output(test_relative_pid, test_sockdir_cleanup) +@test_isolated_site +def pidns_site(): + return unshare_site('pidns', '-Upfn') + + +@test_site +(a)contextlib.contextmanager +def connect_site(): + with tempfile.TemporaryDirectory() as tmpd: + sockpath = os.path.join(tmpd, 'nons') + holdcmd = f'{NSTOOL_BIN} hold {sockpath}' + try: + with REAL_HOST.bg(holdcmd, ignore_status=True, + context_timeout=0.1): + with NsToolSite("fake ns", sockpath) as site: + yield site + finally: + os.remove(sockpath) -- 2.41.0