def nested(*managers):
exits = []
vars = []
exc = (None, None, None)
try:
for mgr in managers:
exit = mgr.__exit__
enter = mgr.__enter__
vars.append(enter())
exits.append(exit)
yield vars
except:
exc = sys.exc_info()
finally:
while exits:
exit = exits.pop()
try:
if exit(*exc):
exc = (None, None, None)
except:
exc = sys.exc_info()
if exc != (None, None, None):
reraise(exc[0], exc[1], exc[2])
python类nested()的实例源码
def nested(*managers):
exits = []
vars = []
exc = (None, None, None)
try:
for mgr in managers:
exit = mgr.__exit__
enter = mgr.__enter__
vars.append(enter())
exits.append(exit)
yield vars
except:
exc = sys.exc_info()
finally:
while exits:
exit = exits.pop()
try:
if exit(*exc):
exc = (None, None, None)
except:
exc = sys.exc_info()
if exc != (None, None, None):
reraise(exc[0], exc[1], exc[2])
def nested(*managers):
exits = []
vars = []
exc = (None, None, None)
try:
for mgr in managers:
exit = mgr.__exit__
enter = mgr.__enter__
vars.append(enter())
exits.append(exit)
yield vars
except:
exc = sys.exc_info()
finally:
while exits:
exit = exits.pop()
try:
if exit(*exc):
exc = (None, None, None)
except:
exc = sys.exc_info()
if exc != (None, None, None):
reraise(exc[0], exc[1], exc[2])
def nested(*managers):
exits = []
vars = []
exc = (None, None, None)
try:
for mgr in managers:
exit = mgr.__exit__
enter = mgr.__enter__
vars.append(enter())
exits.append(exit)
yield vars
except:
exc = sys.exc_info()
finally:
while exits:
exit = exits.pop()
try:
if exit(*exc):
exc = (None, None, None)
except:
exc = sys.exc_info()
if exc != (None, None, None):
reraise(exc[0], exc[1], exc[2])
def test_spawn(self):
self._create_instance()
self._create_network()
with contextlib.nested(
mock.patch.object(EC2Driver, '_get_image_ami_id_from_meta'),
mock.patch.object(EC2Driver, '_process_network_info'),
mock.patch.object(EC2Driver, '_get_instance_sec_grps'),
) as (mock_image, mock_network, mock_secgrp):
mock_image.return_value = 'ami-1234abc'
mock_network.return_value = (self.subnet_id, '192.168.10.5', None,
None)
mock_secgrp.return_value = []
self._create_nova_vm()
fake_instances = self.fake_ec2_conn.get_only_instances()
self.assertEqual(len(fake_instances), 1)
inst = fake_instances[0]
self.assertEqual(inst.vpc_id, self.vpc.id)
self.assertEqual(self.subnet_id, inst.subnet_id)
self.assertEqual(inst.tags['Name'], 'fake_instance')
self.assertEqual(inst.tags['openstack_id'], self.uuid)
self.assertEqual(inst.image_id, 'ami-1234abc')
self.assertEqual(inst.region.name, self.region_name)
self.assertEqual(inst.key_name, 'None')
self.assertEqual(inst.instance_type, 't2.small')
self.reset()
def test_spawn_with_key(self):
self._create_instance(key_name='fake_key', key_data='fake_key_data')
self._create_network()
with contextlib.nested(
mock.patch.object(EC2Driver, '_get_image_ami_id_from_meta'),
mock.patch.object(EC2Driver, '_process_network_info'),
mock.patch.object(EC2Driver, '_get_instance_sec_grps'),
) as (mock_image, mock_network, mock_secgrp):
mock_image.return_value = 'ami-1234abc'
mock_network.return_value = (self.subnet_id, '192.168.10.5', None,
None)
mock_secgrp.return_value = []
self._create_nova_vm()
fake_instances = self.fake_ec2_conn.get_only_instances()
self.assertEqual(len(fake_instances), 1)
inst = fake_instances[0]
self.assertEqual(inst.key_name, 'fake_key')
self.reset()
def test_destory_instance_terminated_on_aws(self):
self._create_vm_in_aws_nova()
fake_instances = self.fake_ec2_conn.get_only_instances()
self.fake_ec2_conn.stop_instances(instance_ids=[fake_instances[0].id])
self.fake_ec2_conn.terminate_instances(
instance_ids=[fake_instances[0].id])
with contextlib.nested(
mock.patch.object(boto.ec2.EC2Connection, 'stop_instances'),
mock.patch.object(boto.ec2.EC2Connection, 'terminate_instances'),
mock.patch.object(EC2Driver, '_wait_for_state'),
) as (fake_stop, fake_terminate, fake_wait):
self.conn.destroy(self.context, self.instance, None, None)
fake_stop.assert_not_called()
fake_terminate.assert_not_called()
fake_wait.assert_not_called()
self.reset()
def nested(*managers):
exits = []
vars = []
exc = (None, None, None)
try:
for mgr in managers:
exit = mgr.__exit__
enter = mgr.__enter__
vars.append(enter())
exits.append(exit)
yield vars
except:
exc = sys.exc_info()
finally:
while exits:
exit = exits.pop()
try:
if exit(*exc):
exc = (None, None, None)
except:
exc = sys.exc_info()
if exc != (None, None, None):
reraise(exc[0], exc[1], exc[2])
def nested(*managers):
exits = []
vars = []
exc = (None, None, None)
try:
for mgr in managers:
exit = mgr.__exit__
enter = mgr.__enter__
vars.append(enter())
exits.append(exit)
yield vars
except:
exc = sys.exc_info()
finally:
while exits:
exit = exits.pop()
try:
if exit(*exc):
exc = (None, None, None)
except:
exc = sys.exc_info()
if exc != (None, None, None):
reraise(exc[0], exc[1], exc[2])
def _stubs(self, network_info, subnet_info, ports_info):
self.ipam = quark.ipam.QuarkIpamANY()
with contextlib.nested(
mock.patch("neutron.common.rpc.get_notifier"),
mock.patch("neutron.quota.QUOTAS.limit_check")):
net = network_api.create_network(self.context, network_info)
mac = {'mac_address_range': dict(cidr="AA:BB:CC")}
self.context.is_admin = True
macrng_api.create_mac_address_range(self.context, mac)
self.context.is_admin = False
subnet_info['subnet']['network_id'] = net['id']
sub = subnet_api.create_subnet(self.context, subnet_info)
ports = []
for port_info in ports_info:
port_info['port']['network_id'] = net['id']
ports.append(port_api.create_port(self.context, port_info))
yield net, sub, ports
def nested(*managers):
exits = []
vars = []
exc = (None, None, None)
try:
for mgr in managers:
exit = mgr.__exit__
enter = mgr.__enter__
vars.append(enter())
exits.append(exit)
yield vars
except:
exc = sys.exc_info()
finally:
while exits:
exit = exits.pop()
try:
if exit(*exc):
exc = (None, None, None)
except:
exc = sys.exc_info()
if exc != (None, None, None):
reraise(exc[0], exc[1], exc[2])
def nested(*managers):
exits = []
vars = []
exc = (None, None, None)
try:
for mgr in managers:
exit = mgr.__exit__
enter = mgr.__enter__
vars.append(enter())
exits.append(exit)
yield vars
except:
exc = sys.exc_info()
finally:
while exits:
exit = exits.pop()
try:
if exit(*exc):
exc = (None, None, None)
except:
exc = sys.exc_info()
if exc != (None, None, None):
reraise(exc[0], exc[1], exc[2])
def vulture():
""" try to find dead code paths """
with api.quiet():
if not api.local('which vulture').succeeded:
print 'vulture not found, installing it'
api.local('pip install vulture')
ignore_functions_grep = 'egrep -v "{0}"'.format(
'|'.join(VULTURE_IGNORE_FUNCTIONS))
excluded = ",".join(VULTURE_EXCLUDE_PATHS)
excluded_paths = (' --exclude ' + excluded) if excluded else ''
vulture_cmd = '\n vulture {pkg_name}{exclude}{pipes}'
vulture_cmd = vulture_cmd.format(
pkg_name=PKG_NAME,
exclude=excluded_paths,
pipes='|'.join(['', ignore_functions_grep]))
changedir = api.lcd(os.path.dirname(__file__))
warn_only = api.settings(warn_only=True)
be_quit = api.hide('warnings')
with contextlib.nested(changedir, warn_only, be_quit):
result = api.local(vulture_cmd, capture=True)
exit_code = result.return_code
print result.strip()
raise SystemExit(exit_code)
def nested(*managers):
exits = []
vars = []
exc = (None, None, None)
try:
for mgr in managers:
exit = mgr.__exit__
enter = mgr.__enter__
vars.append(enter())
exits.append(exit)
yield vars
except:
exc = sys.exc_info()
finally:
while exits:
exit = exits.pop()
try:
if exit(*exc):
exc = (None, None, None)
except:
exc = sys.exc_info()
if exc != (None, None, None):
reraise(exc[0], exc[1], exc[2])
def nested(*managers):
exits = []
vars = []
exc = (None, None, None)
try:
for mgr in managers:
exit = mgr.__exit__
enter = mgr.__enter__
vars.append(enter())
exits.append(exit)
yield vars
except:
exc = sys.exc_info()
finally:
while exits:
exit = exits.pop()
try:
if exit(*exc):
exc = (None, None, None)
except:
exc = sys.exc_info()
if exc != (None, None, None):
reraise(exc[0], exc[1], exc[2])
def _patch_and_run_amira(
self, region_name, queue_name, contents, created_objects):
"""Patches all the external dependencies and runs AMIRA."""
self._results_uploader_mock = MagicMock()
with nested(
patch.object(
S3Handler, '__init__', autospec=True, return_value=None),
patch.object(
S3Handler, 'get_contents_as_string', autospec=True,
side_effect=contents),
patch.object(
SqsHandler, '__init__', autospec=True, return_value=None),
patch.object(
SqsHandler, 'get_created_objects', autospec=True,
side_effect=created_objects),
) as (
__,
self._patched_get_contents_as_string,
__,
self._patched_get_created_objects,
):
amira = AMIRA(region_name, queue_name)
amira.register_results_uploader(self._results_uploader_mock)
amira.run()
def test_run_rule_calls_garbage_collect(ea):
start_time = '2014-09-26T00:00:00Z'
end_time = '2014-09-26T12:00:00Z'
ea.buffer_time = datetime.timedelta(hours=1)
ea.run_every = datetime.timedelta(hours=1)
with contextlib.nested(mock.patch.object(ea.rules[0]['type'], 'garbage_collect'),
mock.patch.object(ea, 'run_query')) as (mock_gc, mock_get_hits):
ea.run_rule(ea.rules[0], ts_to_dt(end_time), ts_to_dt(start_time))
# Running ElastAlert every hour for 12 hours, we should see self.garbage_collect called 12 times.
assert mock_gc.call_count == 12
# The calls should be spaced 1 hour apart
expected_calls = [ts_to_dt(start_time) + datetime.timedelta(hours=i) for i in range(1, 13)]
for e in expected_calls:
mock_gc.assert_any_call(e)
def __call__(self, f):
@wrapt.decorator
def test_with_fixtures(wrapped, instance, args, kwargs):
fixture_instances = [i for i in list(args) + list(kwargs.values())
if i.__class__ in self.fixture_classes or i.scenario in self.requested_fixtures]
dependency_ordered_fixtures = self.topological_sort_instances(fixture_instances)
if six.PY2:
with contextlib.nested(*list(dependency_ordered_fixtures)):
return wrapped(*args, **kwargs)
else:
with contextlib.ExitStack() as stack:
for fixture in dependency_ordered_fixtures:
stack.enter_context(fixture)
return wrapped(*args, **kwargs)
ff = test_with_fixtures(f)
arg_names = self.fixture_arg_names(ff)
return pytest.mark.parametrize(','.join(arg_names), self.fixture_permutations())(ff)
_pyv8runtime.py 文件源码
项目:plugin.video.unofficial9anime
作者: Prometheusx-git
项目源码
文件源码
阅读 22
收藏 0
点赞 0
评论 0
def _exec_(self, source):
source = '''\
(function() {{
{0};
{1};
}})()'''.format(
encode_unicode_codepoints(self._source),
encode_unicode_codepoints(source)
)
source = str(source)
# backward compatibility
with contextlib.nested(PyV8.JSContext(), PyV8.JSEngine()) as (ctxt, engine):
js_errors = (PyV8.JSError, IndexError, ReferenceError, SyntaxError, TypeError)
try:
script = engine.compile(source)
except js_errors as e:
raise exceptions.RuntimeError(e)
try:
value = script.run()
except js_errors as e:
raise exceptions.ProgramError(e)
return self.convert(value)
def nested(*managers):
exits = []
vars = []
exc = (None, None, None)
try:
for mgr in managers:
exit = mgr.__exit__
enter = mgr.__enter__
vars.append(enter())
exits.append(exit)
yield vars
except:
exc = sys.exc_info()
finally:
while exits:
exit = exits.pop()
try:
if exit(*exc):
exc = (None, None, None)
except:
exc = sys.exc_info()
if exc != (None, None, None):
reraise(exc[0], exc[1], exc[2])
def exec_(self, source):
source = '''\
(function() {{
{0};
{1};
}})()'''.format(
encode_unicode_codepoints(self._source),
encode_unicode_codepoints(source)
)
source = str(source)
import PyV8
import contextlib
#backward compatibility
with contextlib.nested(PyV8.JSContext(), PyV8.JSEngine()) as (ctxt, engine):
js_errors = (PyV8.JSError, IndexError, ReferenceError, SyntaxError, TypeError)
try:
script = engine.compile(source)
except js_errors as e:
raise RuntimeError(e)
try:
value = script.run()
except js_errors as e:
raise ProgramError(e)
return self.convert(value)
def nested(*managers):
exits = []
vars = []
exc = (None, None, None)
try:
for mgr in managers:
exit = mgr.__exit__
enter = mgr.__enter__
vars.append(enter())
exits.append(exit)
yield vars
except:
exc = sys.exc_info()
finally:
while exits:
exit = exits.pop()
try:
if exit(*exc):
exc = (None, None, None)
except:
exc = sys.exc_info()
if exc != (None, None, None):
reraise(exc[0], exc[1], exc[2])
def _exec_(self, source):
source = '''\
(function() {{
{0};
{1};
}})()'''.format(
encode_unicode_codepoints(self._source),
encode_unicode_codepoints(source)
)
source = str(source)
# backward compatibility
with contextlib.nested(PyV8.JSContext(), PyV8.JSEngine()) as (ctxt, engine):
js_errors = (PyV8.JSError, IndexError, ReferenceError, SyntaxError, TypeError)
try:
script = engine.compile(source)
except js_errors as e:
raise exceptions.RuntimeError(e)
try:
value = script.run()
except js_errors as e:
raise exceptions.ProgramError(e)
return self.convert(value)
def nested(*managers):
exits = []
vars = []
exc = (None, None, None)
try:
for mgr in managers:
exit = mgr.__exit__
enter = mgr.__enter__
vars.append(enter())
exits.append(exit)
yield vars
except:
exc = sys.exc_info()
finally:
while exits:
exit = exits.pop()
try:
if exit(*exc):
exc = (None, None, None)
except:
exc = sys.exc_info()
if exc != (None, None, None):
reraise(exc[0], exc[1], exc[2])
def nested(*managers):
exits = []
vars = []
exc = (None, None, None)
try:
for mgr in managers:
exit = mgr.__exit__
enter = mgr.__enter__
vars.append(enter())
exits.append(exit)
yield vars
except:
exc = sys.exc_info()
finally:
while exits:
exit = exits.pop()
try:
if exit(*exc):
exc = (None, None, None)
except:
exc = sys.exc_info()
if exc != (None, None, None):
reraise(exc[0], exc[1], exc[2])
def nested(*managers):
exits = []
vars = []
exc = (None, None, None)
try:
for mgr in managers:
exit = mgr.__exit__
enter = mgr.__enter__
vars.append(enter())
exits.append(exit)
yield vars
except:
exc = sys.exc_info()
finally:
while exits:
exit = exits.pop()
try:
if exit(*exc):
exc = (None, None, None)
except:
exc = sys.exc_info()
if exc != (None, None, None):
reraise(exc[0], exc[1], exc[2])
def test_pkey_calls_paramiko_RSAKey(self):
with contextlib.nested(
mock.patch('paramiko.RSAKey.from_private_key'),
mock.patch('cStringIO.StringIO')) as (rsa_mock, cs_mock):
cs_mock.return_value = mock.sentinel.csio
pkey = 'mykey'
ssh.Client('localhost', 'root', pkey=pkey)
rsa_mock.assert_called_once_with(mock.sentinel.csio)
cs_mock.assert_called_once_with('mykey')
rsa_mock.reset_mock()
cs_mock.rest_mock()
pkey = mock.sentinel.pkey
# Shouldn't call out to load a file from RSAKey, since
# a sentinel isn't a basestring...
ssh.Client('localhost', 'root', pkey=pkey)
rsa_mock.assert_not_called()
cs_mock.assert_not_called()
def nested(*managers):
exits = []
vars = []
exc = (None, None, None)
try:
for mgr in managers:
exit = mgr.__exit__
enter = mgr.__enter__
vars.append(enter())
exits.append(exit)
yield vars
except:
exc = sys.exc_info()
finally:
while exits:
exit = exits.pop()
try:
if exit(*exc):
exc = (None, None, None)
except:
exc = sys.exc_info()
if exc != (None, None, None):
reraise(exc[0], exc[1], exc[2])
def nested(*managers):
exits = []
vars = []
exc = (None, None, None)
try:
for mgr in managers:
exit = mgr.__exit__
enter = mgr.__enter__
vars.append(enter())
exits.append(exit)
yield vars
except:
exc = sys.exc_info()
finally:
while exits:
exit = exits.pop()
try:
if exit(*exc):
exc = (None, None, None)
except:
exc = sys.exc_info()
if exc != (None, None, None):
reraise(exc[0], exc[1], exc[2])
def nested(*managers):
exits = []
vars = []
exc = (None, None, None)
try:
for mgr in managers:
exit = mgr.__exit__
enter = mgr.__enter__
vars.append(enter())
exits.append(exit)
yield vars
except:
exc = sys.exc_info()
finally:
while exits:
exit = exits.pop()
try:
if exit(*exc):
exc = (None, None, None)
except:
exc = sys.exc_info()
if exc != (None, None, None):
reraise(exc[0], exc[1], exc[2])