def nested(*contexts):
with contextlib.ExitStack() as stack:
yield [stack.enter_context(c) for c in contexts]
python类nested()的实例源码
def nested(*contexts):
"""
Reimplementation of nested in python 3.
"""
with ExitStack() as stack:
results = [
stack.enter_context(context)
for context in contexts
]
yield results
def create_upload_func(self, ns, definition, path, operation):
request_schema = definition.request_schema or Schema()
response_schema = definition.response_schema or Schema()
@self.add_route(path, operation, ns)
@wraps(definition.func)
def upload(**path_data):
request_data = load_query_string_data(request_schema)
if not request.files:
raise BadRequest("No files were uploaded")
uploads = [
temporary_upload(name, fileobj)
for name, fileobj
in request.files.items()
if not self.exclude_func(name, fileobj)
]
with nested(*uploads) as files:
response_data = definition.func(files, **merge_data(path_data, request_data))
if response_data is None:
return "", 204
return dump_response_data(response_schema, response_data, operation.value.default_code)
if definition.request_schema:
upload = qs(definition.request_schema)(upload)
if definition.response_schema:
upload = response(definition.response_schema)(upload)
return upload
def nested(*contexts):
with contextlib.ExitStack() as stack:
yield [stack.enter_context(c) for c in contexts]
def flatten(self, *args):
"""Flatten a nested tuple/list of tuples/lists"""
flattened_list = []
for element in args:
if isinstance(element, (tuple, list)):
flattened_list.extend(self.flatten(*element))
else:
flattened_list.extend([element])
return flattened_list
def _create_vm_in_aws_nova(self):
self._create_instance()
self._create_network()
with contextlib.nested(
mock.patch.object(EC2Driver, '_get_image_ami_id_from_meta'),
mock.patch.object(EC2Driver, '_process_network_info'),
mock.patch.object(EC2Driver, '_get_instance_sec_grps'),
) as (mock_image, mock_network, mock_secgrp):
mock_image.return_value = 'ami-1234abc'
mock_network.return_value = (self.subnet_id, '192.168.10.5', None,
None)
mock_secgrp.return_value = []
self._create_nova_vm()
def test_add_ssh_keys_key_exists(self):
fake_key = 'fake_key'
fake_key_data = 'abcdefgh'
self.conn.ec2_conn.import_key_pair(fake_key, fake_key_data)
with contextlib.nested(
mock.patch.object(boto.ec2.EC2Connection, 'get_key_pair'),
mock.patch.object(boto.ec2.EC2Connection, 'import_key_pair'),
) as (fake_get, fake_import):
fake_get.return_value = True
self.conn._add_ssh_keys(fake_key, fake_key_data)
fake_get.assert_called_once_with(fake_key)
fake_import.assert_not_called()
def test_add_ssh_keys_key_absent(self):
fake_key = 'fake_key'
fake_key_data = 'abcdefgh'
with contextlib.nested(
mock.patch.object(boto.ec2.EC2Connection, 'get_key_pair'),
mock.patch.object(boto.ec2.EC2Connection, 'import_key_pair'),
) as (fake_get, fake_import):
fake_get.return_value = False
self.conn._add_ssh_keys(fake_key, fake_key_data)
fake_get.assert_called_once_with(fake_key)
fake_import.assert_called_once_with(fake_key, fake_key_data)
def test_spawn_with_network_error(self):
self._create_instance()
with contextlib.nested(
mock.patch.object(EC2Driver, '_get_image_ami_id_from_meta'),
mock.patch.object(EC2Driver, '_process_network_info'),
mock.patch.object(EC2Driver, '_get_instance_sec_grps'),
) as (mock_image, mock_network, mock_secgrp):
mock_image.return_value = 'ami-1234abc'
mock_network.return_value = (None, None, None, None)
mock_secgrp.return_value = []
self.assertRaises(exception.BuildAbortException,
self._create_nova_vm)
self.reset()
def test_spawn_with_network_error_from_aws(self):
self._create_instance()
with contextlib.nested(
mock.patch.object(EC2Driver, '_get_image_ami_id_from_meta'),
mock.patch.object(EC2Driver, '_process_network_info'),
mock.patch.object(EC2Driver, '_get_instance_sec_grps'),
) as (mock_image, mock_network, mock_secgrp):
mock_image.return_value = 'ami-1234abc'
mock_network.return_value = (None, '192.168.10.5', None, None)
mock_secgrp.return_value = []
self.assertRaises(exception.BuildAbortException,
self._create_nova_vm)
self.reset()
def test_spawn_with_image_error(self):
self._create_instance()
self._create_network()
with contextlib.nested(
mock.patch.object(EC2Driver, '_get_image_ami_id_from_meta'),
mock.patch.object(EC2Driver, '_process_network_info'),
mock.patch.object(EC2Driver, '_get_instance_sec_grps'),
) as (mock_image, mock_network, mock_secgrp):
mock_image.side_effect = exception.BuildAbortException('fake')
mock_network.return_value = ('subnet-1234abc', '192.168.10.5',
None, None)
mock_secgrp.return_value = []
self.assertRaises(exception.BuildAbortException,
self._create_nova_vm)
self.reset()
def test_destroy_instance_not_found(self):
self._create_instance()
with contextlib.nested(
mock.patch.object(boto.ec2.EC2Connection, 'stop_instances'),
mock.patch.object(boto.ec2.EC2Connection, 'terminate_instances'),
mock.patch.object(EC2Driver, '_wait_for_state'),
) as (fake_stop, fake_terminate, fake_wait):
self.conn.destroy(self.context, self.instance, None, None)
fake_stop.assert_not_called()
fake_terminate.assert_not_called()
fake_wait.assert_not_called()
self.reset()
def _stubs(self, network_info, subnet_info, port_info):
self.ipam = quark.ipam.QuarkIpamANY()
with contextlib.nested(
mock.patch("neutron.common.rpc.get_notifier"),
mock.patch("neutron.quota.QUOTAS.limit_check")):
net = network_api.create_network(self.context, network_info)
mac = {'mac_address_range': dict(cidr="AA:BB:CC")}
self.context.is_admin = True
macrng_api.create_mac_address_range(self.context, mac)
self.context.is_admin = False
subnet_info['subnet']['network_id'] = net['id']
port_info['port']['network_id'] = net['id']
sub = subnet_api.create_subnet(self.context, subnet_info)
port = port_api.create_port(self.context, port_info)
yield net, sub, port
def unzip(infile, outfile):
with contextlib.nested(gzip.open(infile, 'rb'), open(outfile, 'w')) as (gz, out):
for data in iter(functools.partial(gz.read, 1024), ''):
out.write(data)
###########################################################################
# END: Utility functions
###########################################################################
def read_files(*file_paths):
files = []
for i, p in enumerate(file_paths):
if p:
files.append(open(p, mode="r"))
print 'Opened:', p
else:
files.append(EmptyFile())
print 'WARNING: no path provided for file {} in list.'.format(i)
with contextlib.nested(*files) as entered_files:
for lines in izip(*entered_files):
yield lines
def read_files(*file_paths):
files = []
for i, p in enumerate(file_paths):
if p:
files.append(open(p, mode="r"))
print 'Opened:', p
else:
files.append(EmptyFile())
print 'WARNING: no path provided for file {} in list.'.format(i)
with contextlib.nested(*files) as entered_files:
for lines in izip(*entered_files):
yield lines
def makeSummaryFiles(iniName):
os.chdir(dirs['tmp'])
datacsv = 'filtered_measurements.csv'
ndatacsv = 'fastest_measurements.csv'
bulkdatacsv = 'all_measurements.csv'
with nested( open( join(datacsv), 'w'),
open( join(ndatacsv), 'w')) as (df,ndf):
try:
#bdf = bz2.BZ2File( join(bulkdatacsv), 'w')
bdf = open( join(bulkdatacsv), 'w')
writeHeader(df); writeHeader(ndf); writeHeader(bdf)
walkDatFiles(df,ndf,bdf)
except (OSError,IOError), err:
if logger: logger.error(err)
finally:
bdf.close()
try:
if dirs['dat_sweep']:
copy2( join(datacsv), dirs['dat_sweep'])
copy2( join(ndatacsv), dirs['dat_sweep'])
copy2( join(bulkdatacsv), dirs['dat_sweep'])
if logger: logger.info('copy %s files to %s','*.csv',dirs['dat_sweep'])
except (OSError,IOError), err:
if logger: logger.error(err)
# =============================
# sweep .code & .log
# =============================
def test_interface_initially_up(self):
combined_id = WloadEndpointId("host_id", "orchestrator_id",
"workload_id", "endpoint_id")
ip_type = futils.IPV4
local_ep = self.create_endpoint(combined_id, ip_type)
ips = ["1.2.3.4"]
iface = "tapabcdef"
data = {
'state': "active",
'endpoint': "endpoint_id",
'mac': stub_utils.get_mac(),
'name': iface,
'ipv4_nets': ips,
'profile_ids': ["prof1"]
}
# We can only get on_interface_update calls after the first
# on_endpoint_update, so trigger that.
with nested(
mock.patch('calico.felix.devices.set_routes'),
mock.patch('calico.felix.devices.configure_interface_ipv4'),
mock.patch('calico.felix.devices.interface_up'),
mock.patch('calico.felix.devices.interface_exists'),
) as [m_set_routes, m_conf, m_iface_up, m_iface_exists]:
m_iface_up.return_value = True
m_iface_exists.return_value = True
local_ep.on_endpoint_update(data, async=True)
self.step_actor(local_ep)
self.assertEqual(local_ep._mac, data['mac'])
self.assertTrue(m_conf.called)
self.assertTrue(local_ep._device_in_sync)
# Interface is up so we should get the routes striaght away.
m_set_routes.assert_called_once_with(ip_type,
set(ips),
iface,
data['mac'],
reset_arp=True)
def test_interface_goes_down_removes_routes(self):
combined_id = WloadEndpointId("host_id", "orchestrator_id",
"workload_id", "endpoint_id")
ip_type = futils.IPV4
local_ep = self.create_endpoint(combined_id, ip_type)
ips = ["1.2.3.4"]
iface = "tapabcdef"
data = {
'state': "active",
'endpoint': "endpoint_id",
'mac': stub_utils.get_mac(),
'name': iface,
'ipv4_nets': ips,
'profile_ids': ["prof1"]
}
# We can only get on_interface_update calls after the first
# on_endpoint_update, so trigger that.
with nested(
mock.patch('calico.felix.devices.set_routes'),
mock.patch('calico.felix.devices.configure_interface_ipv4'),
mock.patch('calico.felix.devices.interface_up'),
mock.patch('calico.felix.devices.interface_exists'),
) as [m_set_routes, m_conf, m_iface_up, m_iface_exists]:
m_iface_up.return_value = True
m_iface_exists.return_value = True
local_ep.on_endpoint_update(data, async=True)
self.step_actor(local_ep)
self.assertTrue(m_conf.called) # Check we did the "UP" path.
# Now pretend send an interface update.
with mock.patch('calico.felix.devices.set_routes') as m_set_routes:
local_ep.on_interface_update(False, async=True)
self.step_actor(local_ep)
m_set_routes.assert_called_once_with(ip_type,
set(),
iface,
None)
self.assertTrue(local_ep._device_in_sync)
def test_configure_interface_ipv6_mainline(self):
"""
Test that configure_interface_ipv6_mainline
- opens and writes to the /proc system to enable proxy NDP on the
interface.
- calls ip -6 neigh to set up the proxy targets.
Mainline test has two proxy targets.
"""
m_open = mock.mock_open()
rc = futils.CommandOutput("", "")
if_name = "tap3e5a2b34222"
proxy_target = "2001::3:4"
open_patch = mock.patch('__builtin__.open', m_open, create=True)
m_check_call = mock.patch('calico.felix.futils.check_call',
return_value=rc)
with nested(open_patch, m_check_call) as (_, m_check_call):
devices.configure_interface_ipv6(if_name, proxy_target)
calls = [mock.call('/proc/sys/net/ipv6/conf/%s/proxy_ndp' %
if_name,
'wb'),
M_ENTER,
mock.call().write('1'),
M_CLEAN_EXIT]
m_open.assert_has_calls(calls)
ip_calls = [mock.call(["ip", "-6", "neigh", "add", "proxy",
str(proxy_target), "dev", if_name])]
m_check_call.assert_has_calls(ip_calls)