def test_content_spec_b_offset_write_is_4G(self):
    """A content ``offset-write`` of 4G is rejected as an invalid gadget.yaml.

    4GiB is just outside 32 bits.  The YAML document below must keep its
    nesting: the asserted error path
    (``volumes:first-image:structure:0:content:0``) only exists when
    ``content`` is a list nested inside the first ``structure`` entry.
    """
    with ExitStack() as resources:
        cm = resources.enter_context(
            self.assertRaises(GadgetSpecificationError))
        parse("""\
volumes:
  first-image:
    schema: gpt
    bootloader: u-boot
    structure:
      - type: 00000000-0000-0000-0000-0000deadbeef
        size: 400M
        content:
          - image: foo.img
            offset-write: 4G
""")
    # XXX https://github.com/alecthomas/voluptuous/issues/239
    # The last path component is not stable, so split it off and accept
    # either of the two keys of the offending content entry.
    front, colon, end = str(cm.exception).rpartition(':')
    self.assertEqual(
        front,
        'Invalid gadget.yaml @ volumes:first-image:structure:0:content:0')
    self.assertIn(end, ['offset-write', 'image'])
# Example source code using Python's ExitStack()
def test_wrong_content_1(self):
    """``filesystem: none`` with source/target content is rejected.

    The structure entry declares ``filesystem: none`` but its content
    entry gives ``source``/``target`` instead of an ``image``.  The YAML
    must stay nested so that ``filesystem`` and ``content`` belong to the
    same structure entry.
    """
    with ExitStack() as resources:
        cm = resources.enter_context(
            self.assertRaises(GadgetSpecificationError))
        parse("""\
volumes:
  first-image:
    schema: gpt
    bootloader: u-boot
    structure:
      - type: 00000000-0000-0000-0000-0000deadbeef
        size: 400M
        filesystem: none
        content:
          - source: subdir/
            target: /
""")
    self.assertEqual(str(cm.exception),
                     'filesystem: none missing image file name')
def test_wrong_content_2(self):
    """``filesystem: ext4`` with image-only content is rejected.

    The structure entry declares ``filesystem: ext4`` but its content
    entry gives only an ``image``.  The YAML must stay nested so that
    ``filesystem`` and ``content`` belong to the same structure entry.
    """
    with ExitStack() as resources:
        cm = resources.enter_context(
            self.assertRaises(GadgetSpecificationError))
        parse("""\
volumes:
  first-image:
    schema: gpt
    bootloader: u-boot
    structure:
      - type: 00000000-0000-0000-0000-0000deadbeef
        size: 400M
        filesystem: ext4
        content:
          - image: foo.img
""")
    self.assertEqual(str(cm.exception),
                     'filesystem: vfat|ext4 missing source/target')
def test_mbr_structure_not_at_offset_zero_explicit(self):
    """An ``mbr`` role structure with an explicit non-zero offset is rejected.

    ``role``, ``size`` and ``offset`` must all be keys of the same
    structure entry, so the YAML nesting is significant.
    """
    with ExitStack() as resources:
        cm = resources.enter_context(
            self.assertRaises(GadgetSpecificationError))
        parse("""\
volumes:
  first-image:
    bootloader: u-boot
    structure:
      - role: mbr
        type: 00000000-0000-0000-0000-0000deadbeef
        size: 446
        offset: 10
""")
    self.assertEqual(str(cm.exception),
                     'mbr structure must start at offset 0')
def test_mbr_structure_not_at_offset_zero_implicit(self):
    """An ``mbr`` role structure listed after another structure is rejected.

    No ``offset`` is given; the mbr entry is simply second in the list,
    after a 2M structure.  The two list entries must stay separate, each
    with its own ``type`` and ``size``, so the YAML nesting is significant.
    """
    with ExitStack() as resources:
        cm = resources.enter_context(
            self.assertRaises(GadgetSpecificationError))
        parse("""\
volumes:
  first-image:
    bootloader: u-boot
    structure:
      - type: 00000000-0000-0000-0000-0000deadbeef
        size: 2M
      - role: mbr
        type: 00000000-0000-0000-0000-0000deadbeef
        size: 446
""")
    self.assertEqual(str(cm.exception),
                     'mbr structure must start at offset 0')
def test_mkfs_ext4(self):
    """mkfs_ext4() on a populated directory leaves both files in the results.

    ``ubuntu_image.helpers.run`` is replaced by the MountMocker's ``run``,
    so the files are checked in the mock's results directory.
    """
    payloads = (('a.dat', b'01234'), ('b.dat', b'56789'))
    with ExitStack() as stack:
        scratch = stack.enter_context(TemporaryDirectory())
        mocker = MountMocker(os.path.join(scratch, 'results'))
        stack.enter_context(
            patch('ubuntu_image.helpers.run', mocker.run))
        # Populate a second temporary directory with the input files.
        source_dir = stack.enter_context(TemporaryDirectory())
        for name, data in payloads:
            with open(os.path.join(source_dir, name), 'wb') as out:
                out.write(data)
        # A throwaway file stands in for the image.
        image = stack.enter_context(NamedTemporaryFile())
        mkfs_ext4(image, source_dir)
        # The files went to the "mountpoint", so verify them through the
        # copy kept in the mock's results directory.
        for name, data in payloads:
            with open(os.path.join(mocker.results_dir, name), 'rb') as got:
                self.assertEqual(got.read(), data)
def test_mkfs_ext4_no_contents(self):
    """mkfs_ext4() on an empty directory never creates the results dir."""
    with ExitStack() as stack:
        scratch = stack.enter_context(TemporaryDirectory())
        expected_missing = os.path.join(scratch, 'results')
        mocker = MountMocker(expected_missing)
        stack.enter_context(
            patch('ubuntu_image.helpers.run', mocker.run))
        # This time the contents directory is left empty.
        empty_dir = stack.enter_context(TemporaryDirectory())
        # A throwaway file stands in for the image.
        image = stack.enter_context(NamedTemporaryFile())
        mkfs_ext4(image, empty_dir)
        # With nothing to copy, `sudo cp` is never invoked, so the mock's
        # shutil.copytree() never runs and the results directory is never
        # created.
        self.assertFalse(os.path.exists(expected_missing))
def test_mkfs_ext4_preserve_ownership(self):
    """mkfs_ext4(preserve_ownership=True) copies and flags ownership."""
    with ExitStack() as stack:
        scratch = stack.enter_context(TemporaryDirectory())
        mocker = MountMocker(os.path.join(scratch, 'results'))
        stack.enter_context(
            patch('ubuntu_image.helpers.run', mocker.run))
        # Populate a second temporary directory with a single input file.
        source_dir = stack.enter_context(TemporaryDirectory())
        with open(os.path.join(source_dir, 'a.dat'), 'wb') as out:
            out.write(b'01234')
        # A throwaway file stands in for the image.
        image = stack.enter_context(NamedTemporaryFile())
        mkfs_ext4(image, source_dir, preserve_ownership=True)
        copied = os.path.join(mocker.results_dir, 'a.dat')
        with open(copied, 'rb') as got:
            self.assertEqual(got.read(), b'01234')
        self.assertTrue(mocker.preserves_ownership)
def test_hook_fired(self):
    """Firing a hook runs the script with the supplied environment.

    The hook script appends ``$UBUNTU_IMAGE_TEST_ENV`` to a result file,
    so the file's exact content shows the variable was passed through.
    """
    with ExitStack() as stack:
        hooks_root = stack.enter_context(TemporaryDirectory())
        script_path = os.path.join(hooks_root, 'test-hook')
        output_path = os.path.join(hooks_root, 'result')
        script = """\
#!/bin/sh
echo -n "$UBUNTU_IMAGE_TEST_ENV" >>{}
""".format(output_path)
        with open(script_path, 'w') as out:
            out.write(script)
        os.chmod(script_path, 0o744)
        hooks = HookManager([hooks_root])
        hooks.fire('test-hook', {'UBUNTU_IMAGE_TEST_ENV': 'true'})
        # The script appends, so an exact match means it ran only once.
        self.assertTrue(os.path.exists(output_path))
        with open(output_path, 'r') as got:
            self.assertEqual(got.read(), 'true')
def test_hook_error(self):
    """A failing hook script raises HookError carrying the failure details.

    The script writes ``error`` to stderr and exits with status 1; the
    exception must expose the hook name, path, return code and stderr.
    """
    with ExitStack() as stack:
        hooks_root = stack.enter_context(TemporaryDirectory())
        script_path = os.path.join(hooks_root, 'test-hook')
        script = dedent("""\
#!/bin/sh
echo -n "error" 1>&2
exit 1
""")
        with open(script_path, 'w') as out:
            out.write(script)
        os.chmod(script_path, 0o744)
        hooks = HookManager([hooks_root])
        # Hook script failures must be reported through HookError.
        with self.assertRaises(HookError) as caught:
            hooks.fire('test-hook')
        failure = caught.exception
        self.assertEqual(failure.hook_name, 'test-hook')
        self.assertEqual(failure.hook_path, script_path)
        self.assertEqual(failure.hook_retcode, 1)
        self.assertEqual(failure.hook_stderr, 'error')
def atomic(dst, encoding='utf-8'):
    """Open a temporary file for writing using the given encoding.

    This is a generator that yields an open file object, into which you
    can write text or bytes depending on the encoding it was opened with.
    When the generator is resumed normally, the temporary file is closed
    and moved atomically over the destination.  If an exception is thrown
    in at the yield point (or the move itself fails), the temporary file
    is removed and the exception propagates.

    NOTE(review): no decorator is visible on this definition; callers in
    this file use it as a context manager, so it is presumably wrapped
    with ``contextlib.contextmanager`` -- confirm.

    :param dst: The path name of the target file.
    :param encoding: The encoding to use for the open file.  If None, then
        file is opened in binary mode.
    """
    # The temporary file lives next to the destination so that the final
    # move stays on one filesystem.
    directory = os.path.dirname(dst)
    mode = 'wb' if encoding is None else 'wt'
    fp = NamedTemporaryFile(
        mode=mode, encoding=encoding, dir=directory, delete=False)
    try:
        with fp:
            yield fp
        # os.replace() overwrites an existing destination on every
        # platform, whereas os.rename() fails on Windows if dst exists.
        os.replace(fp.name, dst)
    except BaseException:
        # delete=False means nobody else removes the temporary file, so do
        # it here.  Cleanup is best effort: never mask the original error.
        try:
            os.unlink(fp.name)
        except OSError:
            pass
        raise
def update_changelog(repo, series, version):
    """Rewrite debian/changelog for the given series and return the version.

    The changelog under ``repo.working_dir`` is read, its distributions
    set to ``series`` and its version set to
    ``'<version>+<series release>ubuntu1'``, then written back through
    ``atomic()``.

    :param repo: Object whose ``working_dir`` contains ``debian/changelog``.
    :param series: Series name; one of the keys of the table below.
    :param version: Base version string to which the suffix is appended.
    :return: The new version string written to the changelog.
    :raises KeyError: If ``series`` is not a known series name.
    """
    # Resolve the series before touching any file, so that an unknown name
    # fails without opening the changelog or creating a temporary file.
    series_version = {
        'bionic': '18.04',
        'artful': '17.10',
        'zesty': '17.04',
        'xenial': '16.04',
        }[series]
    new_version = '{}+{}ubuntu1'.format(version, series_version)
    debian_changelog = os.path.join(
        repo.working_dir, 'debian', 'changelog')
    # Update d/changelog.
    with ExitStack() as resources:
        infp = resources.enter_context(
            open(debian_changelog, 'r', encoding='utf-8'))
        outfp = resources.enter_context(atomic(debian_changelog))
        changelog = Changelog(infp)
        changelog.distributions = series
        changelog.version = new_version
        changelog.write_to_open_file(outfp)
    return new_version
def __call__(self, f):
    """Decorate test function *f* so it runs inside its fixtures.

    The wrapper picks the fixture instances out of the call's positional
    and keyword arguments, orders them with
    ``topological_sort_instances()``, enters each as a context manager
    and then calls the wrapped function.  The wrapped function is finally
    parametrized with ``fixture_permutations()`` under the names reported
    by ``fixture_arg_names()``.
    """
    @wrapt.decorator
    def test_with_fixtures(wrapped, instance, args, kwargs):
        candidates = list(args) + list(kwargs.values())
        selected = []
        for candidate in candidates:
            if (candidate.__class__ in self.fixture_classes
                    or candidate.scenario in self.requested_fixtures):
                selected.append(candidate)
        ordered = self.topological_sort_instances(selected)
        if not six.PY2:
            with contextlib.ExitStack() as stack:
                for fixture in ordered:
                    stack.enter_context(fixture)
                return wrapped(*args, **kwargs)
        else:
            # Python 2 has no ExitStack; contextlib.nested does the same job.
            with contextlib.nested(*list(ordered)):
                return wrapped(*args, **kwargs)

    ff = test_with_fixtures(f)
    arg_names = self.fixture_arg_names(ff)
    parametrize = pytest.mark.parametrize(
        ','.join(arg_names), self.fixture_permutations())
    return parametrize(ff)
def test_create_snapshot(self):
    """take_snapshot() builds a Snapshot from the raw traces it is given.

    ``is_tracing``, ``get_traceback_limit`` and ``_get_traces`` are
    patched on the tracemalloc module so the snapshot content is fixed.
    """
    raw_traces = [(5, (('a.py', 2),))]
    stubs = (
        ('is_tracing', True),
        ('get_traceback_limit', 5),
        ('_get_traces', raw_traces),
    )
    with contextlib.ExitStack() as stack:
        for attribute, value in stubs:
            stack.enter_context(
                patch.object(tracemalloc, attribute, return_value=value))

        snapshot = tracemalloc.take_snapshot()
        self.assertEqual(snapshot.traceback_limit, 5)
        self.assertEqual(len(snapshot.traces), 1)

        only_trace = snapshot.traces[0]
        self.assertEqual(only_trace.size, 5)
        self.assertEqual(len(only_trace.traceback), 1)

        frame = only_trace.traceback[0]
        self.assertEqual(frame.filename, 'a.py')
        self.assertEqual(frame.lineno, 2)
def setUp(self):
    """Save the log level and install a short-lived event loop.

    The current log level and the current event loop are both restored
    during cleanup.
    """
    self.addCleanup(log.setLevel, log.getEffectiveLevel())
    self.resources = ExitStack()
    # Install a fresh event loop that is scheduled to stop almost at once.
    # That lets the calls to main() in these tests return almost
    # immediately; otherwise the foreground test process would hang.
    #
    # This probably introduces a race condition.  It depends on whether
    # the call_later() can fire before run_forever() does, or could keep
    # it from completing all its tasks.  In that case an error or warning
    # would likely appear on stderr, which may or may not fail the test.
    # This has only been seen once and there is not enough information to
    # know for sure.
    previous_loop = asyncio.get_event_loop()
    short_lived_loop = asyncio.new_event_loop()
    short_lived_loop.call_later(0.1, short_lived_loop.stop)
    asyncio.set_event_loop(short_lived_loop)
    # Put the previous loop back when the resources are closed.
    self.resources.callback(asyncio.set_event_loop, previous_loop)
    self.addCleanup(self.resources.close)
def test_exception_handler_exception(self):
    """A HELO against the erroring handler yields a 500 response.

    The response text must name the ValueError, and the handler must have
    recorded a ValueError instance in its ``error`` attribute.
    """
    handler = ErroringErrorHandler()
    controller = ErrorController(handler)
    controller.start()
    self.addCleanup(controller.stop)
    # Silence console logging for the duration of the exchange; depending
    # on timing, the exception may or may not be logged.
    silenced = patch('aiosmtpd.smtp.log.exception')
    with silenced, SMTP(controller.hostname, controller.port) as client:
        reply = client.helo('example.com')
    code, response = reply
    self.assertEqual(code, 500)
    self.assertEqual(response,
                     b'Error: (ValueError) ErroringErrorHandler test')
    self.assertIsInstance(handler.error, ValueError)