def reconfigure(**kwargs):
"""Reconfigures the given kwargs, restoring the current configuration for
only those kwargs when the contextmanager exits.
"""
conf_namespace = staticconf.config.get_namespace(namespace)
starting_config = {
k: v for k, v in conf_namespace.get_config_values().iteritems()
if k in kwargs
}
configure_from_dict(kwargs)
try:
yield
finally:
final_config = {
k: v for k, v in conf_namespace.get_config_values().iteritems()
if k not in kwargs
}
final_config.update(starting_config)
staticconf.config.get_namespace(namespace).clear()
configure_from_dict(final_config)
python类contextmanager()的实例源码
def capture_new_data_pipeline_messages(topic):
"""contextmanager that moves to the tail of the given topic, and waits to
receive new messages, returning a function that can be called zero or more
times which will retrieve decoded data pipeline messages from the topic.
Returns:
Callable[[int], List[Message]]: Function that takes a single
optional argument, count, and returns up to count decoded data pipeline
messages. This function does not block, and will return however many
messages are available immediately. Default count is 100.
"""
with capture_new_messages(topic) as get_kafka_messages:
def get_data_pipeline_messages(count=100):
kafka_messages = get_kafka_messages(count)
return [
create_from_offset_and_message(kafka_message)
for kafka_message in kafka_messages
]
yield get_data_pipeline_messages
def randommock():
"""Returns a contextmanager that mocks random.random() at a specific value
Usage::
def test_something(randommock):
with randommock(0.55):
# test stuff...
"""
@contextlib.contextmanager
def _randommock(value):
with mock.patch('random.random') as mock_random:
mock_random.return_value = value
yield
return _randommock
def _get_logger_for_contextmanager(log):
"""Get the canonical logger from a context manager.
Parameters
----------
log : Logger or None
The explicit logger passed to the context manager.
Returns
-------
log : Logger
The logger to use in the context manager.
"""
if log is not None:
return log
# We need to walk up through the context manager, then through
# @contextmanager and finally into the top level calling frame.
return _logger_for_frame(_getframe(3))
def test_generator_based_context_manager_throw():
@contextlib.contextmanager
@_core.enable_ki_protection
def protected_manager():
assert _core.currently_ki_protected()
try:
yield
finally:
assert _core.currently_ki_protected()
with protected_manager():
assert not _core.currently_ki_protected()
with pytest.raises(KeyError):
# This is the one that used to fail
with protected_manager():
raise KeyError
def cross_validation_lock(self):
"""
A contextmanager for running a block with our cross validation lock set
to True.
At the end of the block, the lock's value is restored to its value
prior to entering the block.
"""
if self._cross_validation_lock:
yield
return
else:
try:
self._cross_validation_lock = True
yield
finally:
self._cross_validation_lock = False
def logged_in():
@contextlib.contextmanager
def _login(user):
setattr(ctx_stack.top, 'jwt_user', None)
if isinstance(user, str) and user == 'NOT_LOGGED_IN':
_TOKENS.append(None)
res = None
else:
_TOKENS.append(
flask_jwt.create_access_token(identity=user.id, fresh=True)
)
res = user
yield res
_TOKENS.pop(-1)
setattr(ctx_stack.top, 'jwt_user', None)
yield _login
_TOKENS.clear()
_TOKENS.clear()
def test_weak_keys_destroy_while_iterating(self):
# Issue #7105: iterators shouldn't crash when a key is implicitly removed
dict, objects = self.make_weak_keyed_dict()
self.check_weak_destroy_while_iterating(dict, objects, 'keys')
self.check_weak_destroy_while_iterating(dict, objects, 'items')
self.check_weak_destroy_while_iterating(dict, objects, 'values')
self.check_weak_destroy_while_iterating(dict, objects, 'keyrefs')
dict, objects = self.make_weak_keyed_dict()
@contextlib.contextmanager
def testcontext():
try:
it = iter(dict.items())
next(it)
# Schedule a key/value for removal and recreate it
v = objects.pop().arg
gc.collect() # just in case
yield Object(v), v
finally:
it = None # should commit all removals
self.check_weak_destroy_and_mutate_while_iterating(dict, testcontext)
def test_weak_values_destroy_while_iterating(self):
# Issue #7105: iterators shouldn't crash when a key is implicitly removed
dict, objects = self.make_weak_valued_dict()
self.check_weak_destroy_while_iterating(dict, objects, 'keys')
self.check_weak_destroy_while_iterating(dict, objects, 'items')
self.check_weak_destroy_while_iterating(dict, objects, 'values')
self.check_weak_destroy_while_iterating(dict, objects, 'itervaluerefs')
self.check_weak_destroy_while_iterating(dict, objects, 'valuerefs')
dict, objects = self.make_weak_valued_dict()
@contextlib.contextmanager
def testcontext():
try:
it = iter(dict.items())
next(it)
# Schedule a key/value for removal and recreate it
k = objects.pop().arg
gc.collect() # just in case
yield k, Object(k)
finally:
it = None # should commit all removals
self.check_weak_destroy_and_mutate_while_iterating(dict, testcontext)
def test_weak_keys_destroy_while_iterating(self):
# Issue #7105: iterators shouldn't crash when a key is implicitly removed
dict, objects = self.make_weak_keyed_dict()
self.check_weak_destroy_while_iterating(dict, objects, 'iterkeys')
self.check_weak_destroy_while_iterating(dict, objects, 'iteritems')
self.check_weak_destroy_while_iterating(dict, objects, 'itervalues')
self.check_weak_destroy_while_iterating(dict, objects, 'iterkeyrefs')
dict, objects = self.make_weak_keyed_dict()
@contextlib.contextmanager
def testcontext():
try:
it = iter(dict.iteritems())
next(it)
# Schedule a key/value for removal and recreate it
v = objects.pop().arg
gc.collect() # just in case
yield Object(v), v
finally:
it = None # should commit all removals
gc.collect()
self.check_weak_destroy_and_mutate_while_iterating(dict, testcontext)
def test_weak_values_destroy_while_iterating(self):
# Issue #7105: iterators shouldn't crash when a key is implicitly removed
dict, objects = self.make_weak_valued_dict()
self.check_weak_destroy_while_iterating(dict, objects, 'iterkeys')
self.check_weak_destroy_while_iterating(dict, objects, 'iteritems')
self.check_weak_destroy_while_iterating(dict, objects, 'itervalues')
self.check_weak_destroy_while_iterating(dict, objects, 'itervaluerefs')
dict, objects = self.make_weak_valued_dict()
@contextlib.contextmanager
def testcontext():
try:
it = iter(dict.iteritems())
next(it)
# Schedule a key/value for removal and recreate it
k = objects.pop().arg
gc.collect() # just in case
yield k, Object(k)
finally:
it = None # should commit all removals
gc.collect()
self.check_weak_destroy_and_mutate_while_iterating(dict, testcontext)
def test_weak_keys_destroy_while_iterating(self):
# Issue #7105: iterators shouldn't crash when a key is implicitly removed
dict, objects = self.make_weak_keyed_dict()
self.check_weak_destroy_while_iterating(dict, objects, 'iterkeys')
self.check_weak_destroy_while_iterating(dict, objects, 'iteritems')
self.check_weak_destroy_while_iterating(dict, objects, 'itervalues')
self.check_weak_destroy_while_iterating(dict, objects, 'iterkeyrefs')
dict, objects = self.make_weak_keyed_dict()
@contextlib.contextmanager
def testcontext():
try:
it = iter(dict.iteritems())
next(it)
# Schedule a key/value for removal and recreate it
v = objects.pop().arg
gc.collect() # just in case
yield Object(v), v
finally:
it = None # should commit all removals
gc.collect()
self.check_weak_destroy_and_mutate_while_iterating(dict, testcontext)
def test_weak_values_destroy_while_iterating(self):
# Issue #7105: iterators shouldn't crash when a key is implicitly removed
dict, objects = self.make_weak_valued_dict()
self.check_weak_destroy_while_iterating(dict, objects, 'iterkeys')
self.check_weak_destroy_while_iterating(dict, objects, 'iteritems')
self.check_weak_destroy_while_iterating(dict, objects, 'itervalues')
self.check_weak_destroy_while_iterating(dict, objects, 'itervaluerefs')
dict, objects = self.make_weak_valued_dict()
@contextlib.contextmanager
def testcontext():
try:
it = iter(dict.iteritems())
next(it)
# Schedule a key/value for removal and recreate it
k = objects.pop().arg
gc.collect() # just in case
yield k, Object(k)
finally:
it = None # should commit all removals
gc.collect()
self.check_weak_destroy_and_mutate_while_iterating(dict, testcontext)
def redirect_stderr(x):
""" Redirects stderr to another file-like object.
This is some compatibility code to support Python 3.4.
"""
if hasattr(contextlib, 'redirect_stderr'):
result = contextlib.redirect_stderr
else:
@contextlib.contextmanager
def result(x):
""" Stand-in for Python 3.5's `redirect_stderr`.
Notes: Non-reentrant, non-threadsafe
"""
old_stderr = sys.stderr
sys.stderr = x
yield
sys.stder = old_stderr
return result(x)
###############################################################################
def test_weak_keys_destroy_while_iterating(self):
# Issue #7105: iterators shouldn't crash when a key is implicitly removed
dict, objects = self.make_weak_keyed_dict()
self.check_weak_destroy_while_iterating(dict, objects, 'keys')
self.check_weak_destroy_while_iterating(dict, objects, 'items')
self.check_weak_destroy_while_iterating(dict, objects, 'values')
self.check_weak_destroy_while_iterating(dict, objects, 'keyrefs')
dict, objects = self.make_weak_keyed_dict()
@contextlib.contextmanager
def testcontext():
try:
it = iter(dict.items())
next(it)
# Schedule a key/value for removal and recreate it
v = objects.pop().arg
gc.collect() # just in case
yield Object(v), v
finally:
it = None # should commit all removals
self.check_weak_destroy_and_mutate_while_iterating(dict, testcontext)
def test_weak_values_destroy_while_iterating(self):
# Issue #7105: iterators shouldn't crash when a key is implicitly removed
dict, objects = self.make_weak_valued_dict()
self.check_weak_destroy_while_iterating(dict, objects, 'keys')
self.check_weak_destroy_while_iterating(dict, objects, 'items')
self.check_weak_destroy_while_iterating(dict, objects, 'values')
self.check_weak_destroy_while_iterating(dict, objects, 'itervaluerefs')
self.check_weak_destroy_while_iterating(dict, objects, 'valuerefs')
dict, objects = self.make_weak_valued_dict()
@contextlib.contextmanager
def testcontext():
try:
it = iter(dict.items())
next(it)
# Schedule a key/value for removal and recreate it
k = objects.pop().arg
gc.collect() # just in case
yield k, Object(k)
finally:
it = None # should commit all removals
self.check_weak_destroy_and_mutate_while_iterating(dict, testcontext)
def setup_with_context_manager(testcase, cm):
"""
Use a contextmanager in a test setUp that persists until teardown.
So instead of:
with ctxmgr(a, b, c) as v:
# do something with v that only persists for the `with` statement
use:
def setUp(self):
self.v = setup_with_context_manager(self, ctxmgr(a, b, c))
def test_foo(self):
# do something with self.v
"""
val = cm.__enter__()
testcase.addCleanup(cm.__exit__, None, None, None)
return val
def open_expiring(filename, at, unless_modified=True, unless_accessed=True, *args):
"""A contextmanager that provides a open file descriptor to a file that
will be scheduled for expiry on exit.
:param str filename: The file to open and schedule for expiry.
:param at: A datetime object or string as recognized by the `at` command.
:param bool unless_modified: Whether a condition has to be added to the job
expiry script to expire the path only if it has not been modified since
it was scheduled for expiry. Default: True
:param bool unless_accessed: Whether a condition has to be added to the job
expiry script to expire the path only if it has not been accessed since
it was scheduled for expiry. Default: True
:param *args: Any additional arguments to be passed on to the `open` builtin.
:return: An open file object.
:rtype: file
"""
try:
with open(filename, *args) as fd:
yield fd
except:
raise
else:
expire_path(filename, at, unless_modified, unless_accessed)
def cross_validation_lock(self):
"""
A contextmanager for running a block with our cross validation lock set
to True.
At the end of the block, the lock's value is restored to its value
prior to entering the block.
"""
if self._cross_validation_lock:
yield
return
else:
try:
self._cross_validation_lock = True
yield
finally:
self._cross_validation_lock = False
def test_weak_keys_destroy_while_iterating(self):
# Issue #7105: iterators shouldn't crash when a key is implicitly removed
dict, objects = self.make_weak_keyed_dict()
self.check_weak_destroy_while_iterating(dict, objects, 'iterkeys')
self.check_weak_destroy_while_iterating(dict, objects, 'iteritems')
self.check_weak_destroy_while_iterating(dict, objects, 'itervalues')
self.check_weak_destroy_while_iterating(dict, objects, 'iterkeyrefs')
dict, objects = self.make_weak_keyed_dict()
@contextlib.contextmanager
def testcontext():
try:
it = iter(dict.iteritems())
next(it)
# Schedule a key/value for removal and recreate it
v = objects.pop().arg
gc.collect() # just in case
yield Object(v), v
finally:
it = None # should commit all removals
gc.collect()
self.check_weak_destroy_and_mutate_while_iterating(dict, testcontext)
def test_weak_values_destroy_while_iterating(self):
# Issue #7105: iterators shouldn't crash when a key is implicitly removed
dict, objects = self.make_weak_valued_dict()
self.check_weak_destroy_while_iterating(dict, objects, 'iterkeys')
self.check_weak_destroy_while_iterating(dict, objects, 'iteritems')
self.check_weak_destroy_while_iterating(dict, objects, 'itervalues')
self.check_weak_destroy_while_iterating(dict, objects, 'itervaluerefs')
dict, objects = self.make_weak_valued_dict()
@contextlib.contextmanager
def testcontext():
try:
it = iter(dict.iteritems())
next(it)
# Schedule a key/value for removal and recreate it
k = objects.pop().arg
gc.collect() # just in case
yield k, Object(k)
finally:
it = None # should commit all removals
gc.collect()
self.check_weak_destroy_and_mutate_while_iterating(dict, testcontext)
def test_weak_keys_destroy_while_iterating(self):
# Issue #7105: iterators shouldn't crash when a key is implicitly removed
dict, objects = self.make_weak_keyed_dict()
self.check_weak_destroy_while_iterating(dict, objects, 'keys')
self.check_weak_destroy_while_iterating(dict, objects, 'items')
self.check_weak_destroy_while_iterating(dict, objects, 'values')
self.check_weak_destroy_while_iterating(dict, objects, 'keyrefs')
dict, objects = self.make_weak_keyed_dict()
@contextlib.contextmanager
def testcontext():
try:
it = iter(dict.items())
next(it)
# Schedule a key/value for removal and recreate it
v = objects.pop().arg
gc.collect() # just in case
yield Object(v), v
finally:
it = None # should commit all removals
gc.collect()
self.check_weak_destroy_and_mutate_while_iterating(dict, testcontext)
# Issue #21173: len() fragile when keys are both implicitly and
# explicitly removed.
dict, objects = self.make_weak_keyed_dict()
self.check_weak_del_and_len_while_iterating(dict, testcontext)
def test_weak_values_destroy_while_iterating(self):
# Issue #7105: iterators shouldn't crash when a key is implicitly removed
dict, objects = self.make_weak_valued_dict()
self.check_weak_destroy_while_iterating(dict, objects, 'keys')
self.check_weak_destroy_while_iterating(dict, objects, 'items')
self.check_weak_destroy_while_iterating(dict, objects, 'values')
self.check_weak_destroy_while_iterating(dict, objects, 'itervaluerefs')
self.check_weak_destroy_while_iterating(dict, objects, 'valuerefs')
dict, objects = self.make_weak_valued_dict()
@contextlib.contextmanager
def testcontext():
try:
it = iter(dict.items())
next(it)
# Schedule a key/value for removal and recreate it
k = objects.pop().arg
gc.collect() # just in case
yield k, Object(k)
finally:
it = None # should commit all removals
gc.collect()
self.check_weak_destroy_and_mutate_while_iterating(dict, testcontext)
dict, objects = self.make_weak_valued_dict()
self.check_weak_del_and_len_while_iterating(dict, testcontext)
def ViewMethod(w, tname, visibility, out_tname, name, params=(), throws=(),
override=False):
view_tname = 'View' if tname == out_tname else out_tname + '.View'
with Method(w, visibility, view_tname, name, params, throws,
override):
@contextlib.contextmanager
def view():
w('return new {}(model) {{', view_tname)
with w.indent():
@contextlib.contextmanager
def enumerator_method(src_var=None):
with EnumeratorMethod(w, 'public', out_tname,
source=src_var and (src_var, 'View.this', tname)) as enumerator:
yield enumerator
yield enumerator_method
yield view
w('}};')
def check_flake8(file_name, contents):
from flake8.main import check_code
if not file_name.endswith(".py"):
raise StopIteration
@contextlib.contextmanager
def stdout_redirect(where):
sys.stdout = where
try:
yield where
finally:
sys.stdout = sys.__stdout__
ignore = {
"W291", # trailing whitespace; the standard tidy process will enforce no trailing whitespace
"E501", # 80 character line length; the standard tidy process will enforce line length
}
output = StringIO.StringIO()
with stdout_redirect(output):
check_code(contents, ignore=ignore)
for error in output.getvalue().splitlines():
_, line_num, _, message = error.split(":", 3)
yield line_num, message.strip()
def z_add_owned_piece(self, p):
#freeze:wasfrozen = False
#freeze:if self.isFrozen:
#freeze: wasfrozen = True
#freeze: self.unfreeze()
self.owned_pieces.add(p)
self._update()
#freeze:if wasfrozen:
#freeze: self.freeze()
#freeze:def freeze(self):
#freeze: self.isFrozen = True
#freeze: self.owned_pieces = list(self.owned_pieces)
#freeze:def unfreeze(self):
#freeze: self.isFrozen = False
#freeze: self.owned_pieces = set(self.owned_pieces)
#freeze:@contextlib.contextmanager
#freeze:def frozen(self):
#freeze: self.freeze()
#freeze: yield
#freeze: self.unfreeze()
def set_game(self, g):
self.game = g
self.cfg.set_game(g)
#freeze:def freeze(self):
#freeze: """Lock the board in place."""
#freeze: self.isfrozen = True
#freeze: #self.pieces = list(self.pieces)
#freeze: for player in self.players:
#freeze: player.freeze()
#freeze:def unfreeze(self):
#freeze: """Unlock the board."""
#freeze: self.isfrozen = False
#freeze: #self.pieces = set(self.pieces)
#freeze: for player in self.players:
#freeze: player.unfreeze()
#freeze:@contextlib.contextmanager
#freeze:def frozen(self):
#freeze: self.freeze()
#freeze: yield
#freeze: self.unfreeze()
def singleton_contextmanager(func):
class CtxManager():
def __init__(self, func):
self.count = 0
self.func_cm = contextmanager(func)
self._lock = RLock()
def __enter__(self):
with self._lock:
if self.count == 0:
self.ctm = self.func_cm()
self.obj = self.ctm.__enter__()
self.count += 1
def __exit__(self, *args):
with self._lock:
self.count -= 1
if self.count > 0:
return
self.ctm.__exit__(*sys.exc_info())
del self.ctm
del self.obj
return CtxManager(func)
def singleton_contextmanager_method(func):
cached_attr_name = '__singleton_contextmanager_method__' + func.__name__
# Wrap with a context manager to get proper IPython documentation
@contextmanager
@wraps(func)
def inner(self):
with _singleton_contextmanager_method_attr_lock:
try:
cm = getattr(self, cached_attr_name)
except AttributeError:
cm = singleton_contextmanager(partial(func, self))
setattr(self, cached_attr_name, cm)
with cm as val:
yield val
return inner
def open(filename, **credentials):
"""
A contextmanager to open the KeePass file with `filename`. Use a `password`
and/or `keyfile` named argument for decryption.
Files are identified using their signature and a reader suitable for
the file format is intialized and returned.
Note: `keyfile` is currently not supported for v3 KeePass files.
"""
kdb = None
try:
with io.open(filename, 'rb') as stream:
signature = common.read_signature(stream)
cls = get_kdb_reader(signature)
kdb = cls(stream, **credentials)
yield kdb
kdb.close()
except:
if kdb:
kdb.close()
raise