These are roughly equivalent to the 'context' subsystem in the existing
shell test framework.

Signed-off-by: David Gibson <david(a)gibson.dropbear.id.au>
---
 avocado/common.py | 134 +++++++++++++++++++++++++++++++++++++++++++++-
 1 file changed, 133 insertions(+), 1 deletion(-)

diff --git a/avocado/common.py b/avocado/common.py
index 036ef3d..94308b8 100644
--- a/avocado/common.py
+++ b/avocado/common.py
@@ -7,8 +7,15 @@
 # Copyright Red Hat
 # Author: David Gibson <david(a)gibson.dropbear.id.au>
 
+import json
+import os.path
+import sys
+
 import avocado
-from avocado.utils.process import system_output, CmdError
+from avocado.utils.process import system_output, SubProcess, CmdError
+
+
+NSTOOL = './test/nstool'
 
 
 class BaseTest(avocado.Test):
@@ -19,6 +26,65 @@ class BaseTest(avocado.Test):
         return system_output(cmd, **kwargs)
 
 
+class NsTool:
+    """Wrapper around the nstool control socket at sockpath.
+
+    The PID printed by 'nstool info -wp' is stored in the pid attribute.
+    """
+
+    def __init__(self, sockpath):
+        self.sockpath = sockpath
+        pid = system_output(
+            '{} info -wp {}'.format(NSTOOL, sockpath), timeout=1)
+        self.pid = int(pid)
+        print('NsTool object: sockpath={} PID={}'.format(sockpath, self.pid),
+              file=sys.stderr)
+
+    def _xcmd(self, cmd, sudo=False):
+        """Return the 'nstool exec' command line wrapping cmd"""
+        if sudo:
+            opts = '--keep-caps'
+        else:
+            opts = ''
+        return '{} exec {} {} -- {}'.format(NSTOOL, opts, self.sockpath, cmd)
+
+    def subprocess(self, cmd, sudo=False, **kwargs):
+        """Return a SubProcess running cmd via 'nstool exec'"""
+        return SubProcess(self._xcmd(cmd, sudo), **kwargs)
+
+    def system_output(self, cmd, sudo=False, **kwargs):
+        """Run cmd via 'nstool exec' and return its output"""
+        return system_output(self._xcmd(cmd, sudo), **kwargs)
+
+
+class NsToolUnshare(NsTool):
+    """NsTool whose 'nstool hold' process is started under unshare(1).
+
+    If parent is given, the holder is launched through that NsTool
+    (with sudo=True) instead of directly on the host.
+    """
+
+    def __init__(self, workdir, sockname, unshare_opts, parent=None):
+        sockpath = os.path.join(workdir, sockname)
+        holdcmd = 'unshare {} -- {} hold {}'.format(
+            unshare_opts, NSTOOL, sockpath)
+        if parent is None:
+            self.holder = SubProcess(holdcmd)
+        else:
+            self.holder = parent.subprocess(holdcmd, sudo=True)
+
+        self.holder.start()
+        super().__init__(sockpath)
+
+    def __del__(self):
+        cmd = '{} stop {}'.format(NSTOOL, self.sockpath)
+        try:
+            system_output(cmd)
+        finally:
+            # Stop the holder even if 'nstool stop' raised
+            self.holder.stop()
+
+
 #
 # Tests for the test infrastructure itself
 #
@@ -29,3 +95,69 @@ class HostExecTests(BaseTest):
 
     def test_false(self):
         self.assertRaises(CmdError, self.hostx, 'false')
+
+
+class UserNsTests(BaseTest):
+    """Check CAP_SETUID is available via nstool in an 'unshare -Uc' holder"""
+    def setUp(self):
+        super().setUp()
+
+        self.ns = NsToolUnshare(self.workdir, 'userns', '-Uc')
+
+    def tearDown(self):
+        del self.ns
+
+        super().tearDown()
+
+    def test(self):
+        capcmd = 'capsh --has-p=CAP_SETUID'
+        self.assertRaises(CmdError, self.hostx, capcmd)
+        self.ns.system_output(capcmd, sudo=True)
+
+
+class NestedNsTests(BaseTest):
+    """Check a netns can only be created when nested in the userns"""
+    def setUp(self):
+        super().setUp()
+
+        self.userns = NsToolUnshare(self.workdir, 'userns', '-Uc')
+        self.netns = NsToolUnshare(
+            self.workdir, 'netns', '-n', parent=self.userns)
+
+    def tearDown(self):
+        del self.netns
+        del self.userns
+        super().tearDown()
+
+    def test_unnested(self):
+        # Shouldn't have permission to create a netns without nesting
+        # it in the userns
+        self.assertRaises(CmdError, NsToolUnshare,
+                          self.workdir, 'netns2', '-n')
+
+    def test_nested(self):
+        self.netns.system_output('true')
+        output = self.netns.system_output('ip -j link show')
+        ifs = json.loads(output)
+        self.assertEqual(len(ifs), 1)
+        self.assertEqual(ifs[0]['ifname'], 'lo')
+
+
+class NsConnectTests(BaseTest):
+    """Check NsTool can attach to an 'nstool hold' started separately"""
+    def setUp(self):
+        super().setUp()
+
+        self.sockpath = os.path.join(self.workdir, 'hostns')
+        holdcmd = '{} hold {}'.format(NSTOOL, self.sockpath)
+        self.holder = SubProcess(holdcmd)
+        self.holder.start()
+
+    def tearDown(self):
+        self.holder.stop()
+
+        super().tearDown()
+
+    def test_connect(self):
+        hostns = NsTool(self.sockpath)
+        hostns.system_output('true')
-- 
2.40.0