def test_weak_keys_destroy_while_iterating(self):
# Issue #7105: iterators shouldn't crash when a key is implicitly removed
dict, objects = self.make_weak_keyed_dict()
self.check_weak_destroy_while_iterating(dict, objects, 'keys')
self.check_weak_destroy_while_iterating(dict, objects, 'items')
self.check_weak_destroy_while_iterating(dict, objects, 'values')
self.check_weak_destroy_while_iterating(dict, objects, 'keyrefs')
dict, objects = self.make_weak_keyed_dict()
@contextlib.contextmanager
def testcontext():
try:
it = iter(dict.items())
next(it)
# Schedule a key/value for removal and recreate it
v = objects.pop().arg
gc.collect() # just in case
yield Object(v), v
finally:
it = None # should commit all removals
gc.collect()
self.check_weak_destroy_and_mutate_while_iterating(dict, testcontext)
python类contextmanager()的实例源码
def test_weak_values_destroy_while_iterating(self):
# Issue #7105: iterators shouldn't crash when a key is implicitly removed
dict, objects = self.make_weak_valued_dict()
self.check_weak_destroy_while_iterating(dict, objects, 'keys')
self.check_weak_destroy_while_iterating(dict, objects, 'items')
self.check_weak_destroy_while_iterating(dict, objects, 'values')
self.check_weak_destroy_while_iterating(dict, objects, 'itervaluerefs')
self.check_weak_destroy_while_iterating(dict, objects, 'valuerefs')
dict, objects = self.make_weak_valued_dict()
@contextlib.contextmanager
def testcontext():
try:
it = iter(dict.items())
next(it)
# Schedule a key/value for removal and recreate it
k = objects.pop().arg
gc.collect() # just in case
yield k, Object(k)
finally:
it = None # should commit all removals
gc.collect()
self.check_weak_destroy_and_mutate_while_iterating(dict, testcontext)
def run_xz_decompress(stdout, stderr=None):
"""Run xz --decompress and return a contextmanager object for the proc."""
xz = None
try:
xz = subprocess.Popen(["xz", "--decompress", "--stdout"],
stdin=subprocess.PIPE, stdout=stdout,
stderr=stderr)
yield xz
finally:
if not xz:
raise OSError("You must have an 'xz' binary in PATH.")
xz.stdin.close()
result = xz.wait()
if result != 0:
raise IOError("xz --decompress returned %d." % result)
def redirect_stderr(x):
""" Redirects stderr to another file-like object.
This is some compatibility code to support Python 3.4.
"""
if hasattr(contextlib, 'redirect_stderr'):
result = contextlib.redirect_stderr
else:
@contextlib.contextmanager
def result(x):
""" Stand-in for Python 3.5's `redirect_stderr`.
Notes: Non-reentrant, non-threadsafe
"""
old_stderr = sys.stderr
sys.stderr = x
yield
sys.stder = old_stderr
return result(x)
###############################################################################
def query2(self, query, bindata=None):
"""
Alias for query method which make use of the python 'with' statement.
The contextmanager approach ensures that the generated cursor object instance
is always closed whenever the execution goes out of the 'with' statement block.
Example:
>> with self.console.storage.query(query) as cursor:
>> if not cursor.EOF:
>> cursor.getRow()
>> ...
:param query: The query to execute.
:param bindata: Data to bind to the given query.
"""
cursor = None
try:
cursor = self.query(query, bindata)
yield cursor
finally:
if cursor:
cursor.close()
def reconfigure(ns=namespace, **kwargs):
"""Reconfigures the given kwargs, restoring the current configuration for
only those kwargs when the contextmanager exits.
Args:
ns: Namespace of the conf
"""
conf_namespace = staticconf.config.get_namespace(ns)
starting_config = {
k: v for k, v in conf_namespace.get_config_values().iteritems()
if k in kwargs
}
staticconf.DictConfiguration(kwargs, namespace=ns)
try:
yield
finally:
final_config = {
k: v for k, v in conf_namespace.get_config_values().iteritems()
if k not in kwargs
}
final_config.update(starting_config)
staticconf.config.get_namespace(ns).clear()
staticconf.DictConfiguration(final_config, namespace=ns)
def _register_signal_handlers(self):
"""Register the handler SIGUSR2, which will toggle a profiler on and off.
"""
try:
signal.signal(signal.SIGINT, self._handle_shutdown_signal)
signal.signal(signal.SIGTERM, self._handle_shutdown_signal)
signal.signal(signal.SIGUSR2, self._handle_profiler_signal)
yield
finally:
# Cleanup for the profiler signal handler has to happen here,
# because signals that are handled don't unwind up the stack in the
# way that normal methods do. Any contextmanager or finally
# statement won't live past the handler function returning.
signal.signal(signal.SIGUSR2, signal.SIG_DFL)
if self._profiler_running:
self._disable_profiler()
test_driver.py 文件源码
项目:Trusted-Platform-Module-nova
作者: BU-NU-CLOUD-SP16
项目源码
文件源码
阅读 20
收藏 0
点赞 0
评论 0
def test_create_without_pause(self):
self.flags(virt_type='lxc', group='libvirt')
@contextlib.contextmanager
def fake_lxc_disk_handler(*args, **kwargs):
yield
drvr = libvirt_driver.LibvirtDriver(fake.FakeVirtAPI(), False)
instance = objects.Instance(**self.test_instance)
with test.nested(
mock.patch.object(drvr, '_lxc_disk_handler',
side_effect=fake_lxc_disk_handler),
mock.patch.object(drvr, 'plug_vifs'),
mock.patch.object(drvr, 'firewall_driver'),
mock.patch.object(drvr, '_create_domain'),
mock.patch.object(drvr, 'cleanup')) as (
_handler, cleanup, firewall_driver, create, plug_vifs):
domain = drvr._create_domain_and_network(self.context, 'xml',
instance, None, None)
self.assertEqual(0, create.call_args_list[0][1]['pause'])
self.assertEqual(0, domain.resume.call_count)
def test_contextmanager_finally(self):
state = []
@contextmanager
def woohoo():
state.append(1)
try:
yield 42
finally:
state.append(999)
with self.assertRaises(ZeroDivisionError):
with woohoo() as x:
self.assertEqual(state, [1])
self.assertEqual(x, 42)
state.append(x)
raise ZeroDivisionError()
self.assertEqual(state, [1, 42, 999])
def test_contextmanager_except(self):
state = []
@contextmanager
def woohoo():
state.append(1)
try:
yield 42
except ZeroDivisionError as e:
state.append(e.args[0])
self.assertEqual(state, [1, 42, 999])
with woohoo() as x:
self.assertEqual(state, [1])
self.assertEqual(x, 42)
state.append(x)
raise ZeroDivisionError(999)
self.assertEqual(state, [1, 42, 999])
def test_contextmanager_except_pep479(self):
code = """\
from __future__ import generator_stop
from contextlib import contextmanager
@contextmanager
def woohoo():
yield
"""
locals = {}
exec(code, locals, locals)
woohoo = locals['woohoo']
stop_exc = StopIteration('spam')
try:
with woohoo():
raise stop_exc
except Exception as ex:
self.assertIs(ex, stop_exc)
else:
self.fail('StopIteration was suppressed')
def test_contextmanager_as_decorator(self):
@contextmanager
def woohoo(y):
state.append(y)
yield
state.append(999)
state = []
@woohoo(1)
def test(x):
self.assertEqual(state, [1])
state.append(x)
test('something')
self.assertEqual(state, [1, 'something', 999])
# Issue #11647: Ensure the decorated function is 'reusable'
state = []
test('something else')
self.assertEqual(state, [1, 'something else', 999])
# Detailed exception chaining checks only make sense on Python 3
def cross_validation_lock(self):
"""
A contextmanager for running a block with our cross validation lock set
to True.
At the end of the block, the lock's value is restored to its value
prior to entering the block.
"""
if self._cross_validation_lock:
yield
return
else:
try:
self._cross_validation_lock = True
yield
finally:
self._cross_validation_lock = False
def setup_with_context_manager(testcase, cm):
"""Use a contextmanager to setUp a test case.
If you have a context manager you like::
with ctxmgr(a, b, c) as v:
# do something with v
and you want to have that effect for a test case, call this function from
your setUp, and it will start the context manager for your test, and end it
when the test is done::
def setUp(self):
self.v = setup_with_context_manager(self, ctxmgr(a, b, c))
def test_foo(self):
# do something with self.v
"""
val = cm.__enter__()
testcase.addCleanup(cm.__exit__, None, None, None)
return val
def contextmanager(fn):
def oops(*args, **kw):
raise RuntimeError("Python 2.5 or above is required to use "
"context managers.")
oops.__name__ = fn.__name__
return oops
def observation(observe, notify):
"""Simple boilerplate to link to the 'with' statement.
Contextlib's contextmanager decorator is a very convenient way to
create simple context managers, specifically the __enter__ and
__exit__ special methods.
"""
proxy = Observation(observe, notify)
try:
yield proxy
finally:
proxy.close()
def named_context(self, name):
@contextlib.contextmanager
def context():
self.named_contexts.append(name)
try:
yield
finally:
self.assertEqual(self.named_contexts.pop(), name)
return context
def named_context(self, name):
@contextlib.contextmanager
def context():
self.named_contexts.append(name)
try:
yield
finally:
self.assertEqual(self.named_contexts.pop(), name)
return context
def named_context(self, name):
@contextlib.contextmanager
def context():
self.named_contexts.append(name)
try:
yield
finally:
self.assertEqual(self.named_contexts.pop(), name)
return context
def _validContext(func):
# Defined inside USBContext so we can access "self.__*".
@contextlib.contextmanager
def refcount(self):
with self.__context_cond:
if not self.__context_p and self.__auto_open:
# BBB
warnings.warn(
'Use "with USBContext() as context:" for safer cleanup'
' on interpreter shutdown. See also USBContext.open().',
DeprecationWarning,
)
self.open()
self.__context_refcount += 1
try:
yield
finally:
with self.__context_cond:
self.__context_refcount -= 1
if not self.__context_refcount:
self.__context_cond.notifyAll()
if inspect.isgeneratorfunction(func):
def wrapper(self, *args, **kw):
with refcount(self):
if self.__context_p:
# pylint: disable=not-callable
for value in func(self, *args, **kw):
# pylint: enable=not-callable
yield value
else:
def wrapper(self, *args, **kw):
with refcount(self):
if self.__context_p:
# pylint: disable=not-callable
return func(self, *args, **kw)
# pylint: enable=not-callable
functools.update_wrapper(wrapper, func)
return wrapper
# pylint: enable=no-self-argument,protected-access