def _add_proposed():
    """Add the proposed pocket as /etc/apt/sources.list.d/proposed.list

    Uses lsb_release()['DISTRIB_CODENAME'] to determine the correct stanza
    for the deb line.  The stanza template is looked up in
    ARCH_TO_PROPOSED_POCKET under platform.machine() and formatted with the
    release codename; an existing proposed.list is overwritten.

    The mapping is defined elsewhere; it is expected to use PROPOSED_POCKET
    for Intel architectures and PROPOSED_PORTS_POCKET for the others.

    :raises SourceConfigError: if the machine architecture has no entry in
        ARCH_TO_PROPOSED_POCKET.
    """
    release = lsb_release()['DISTRIB_CODENAME']
    arch = platform.machine()
    # Ask the mapping directly instead of scanning an iterator of its keys.
    if arch not in ARCH_TO_PROPOSED_POCKET:
        raise SourceConfigError("Arch {} not supported for (distro-)proposed"
                                .format(arch))
    with open('/etc/apt/sources.list.d/proposed.list', 'w') as apt:
        apt.write(ARCH_TO_PROPOSED_POCKET[arch].format(release))
python类iterkeys()的实例源码
def _add_proposed():
    """Add the proposed pocket as /etc/apt/sources.list.d/proposed.list

    Uses lsb_release()['DISTRIB_CODENAME'] to determine the correct stanza
    for the deb line.  The stanza template is looked up in
    ARCH_TO_PROPOSED_POCKET under platform.machine() and formatted with the
    release codename; an existing proposed.list is overwritten.

    The mapping is defined elsewhere; it is expected to use PROPOSED_POCKET
    for Intel architectures and PROPOSED_PORTS_POCKET for the others.

    :raises SourceConfigError: if the machine architecture has no entry in
        ARCH_TO_PROPOSED_POCKET.
    """
    release = lsb_release()['DISTRIB_CODENAME']
    arch = platform.machine()
    # Ask the mapping directly instead of scanning an iterator of its keys.
    if arch not in ARCH_TO_PROPOSED_POCKET:
        raise SourceConfigError("Arch {} not supported for (distro-)proposed"
                                .format(arch))
    with open('/etc/apt/sources.list.d/proposed.list', 'w') as apt:
        apt.write(ARCH_TO_PROPOSED_POCKET[arch].format(release))
def getFieldInfo(self):
    """
    Return a list of fields that are known and can be queried.

    On the first call every document returned by ``self.connect().find()``
    is read to collect the field names; the result is stored on
    ``self.fieldInfo`` and returned as-is by later calls.

    :return: a list of known fields, sorted by name.  Each entry is a
        dictionary with name, datatype, and optionally a description.
        This implementation fills in 'name' and a 'type' of 'unknown'.
    """
    if self.fieldInfo is None:
        # cache the fieldInfo so we don't process all of the documents
        # every time.
        # TODO: either have a maximum duration or some other method of
        # analyzing a subset of the table; on a large table this takes a
        # long time.
        coll = self.connect()
        # Only the field names matter, so collect them in a set instead of
        # merging every document's values into one dictionary.
        names = set()
        for result in coll.find():
            names.update(result)
        self.fieldInfo = [{'name': field, 'type': 'unknown'}
                          for field in sorted(names)]
    return self.fieldInfo
def _add_proposed():
    """Add the proposed pocket as /etc/apt/sources.list.d/proposed.list

    Uses lsb_release()['DISTRIB_CODENAME'] to determine the correct stanza
    for the deb line.  The stanza template is looked up in
    ARCH_TO_PROPOSED_POCKET under platform.machine() and formatted with the
    release codename; an existing proposed.list is overwritten.

    The mapping is defined elsewhere; it is expected to use PROPOSED_POCKET
    for Intel architectures and PROPOSED_PORTS_POCKET for the others.

    :raises SourceConfigError: if the machine architecture has no entry in
        ARCH_TO_PROPOSED_POCKET.
    """
    release = lsb_release()['DISTRIB_CODENAME']
    arch = platform.machine()
    # Ask the mapping directly instead of scanning an iterator of its keys.
    if arch not in ARCH_TO_PROPOSED_POCKET:
        raise SourceConfigError("Arch {} not supported for (distro-)proposed"
                                .format(arch))
    with open('/etc/apt/sources.list.d/proposed.list', 'w') as apt:
        apt.write(ARCH_TO_PROPOSED_POCKET[arch].format(release))
def _add_proposed():
    """Add the proposed pocket as /etc/apt/sources.list.d/proposed.list

    Uses lsb_release()['DISTRIB_CODENAME'] to determine the correct stanza
    for the deb line.  The stanza template is looked up in
    ARCH_TO_PROPOSED_POCKET under platform.machine() and formatted with the
    release codename; an existing proposed.list is overwritten.

    The mapping is defined elsewhere; it is expected to use PROPOSED_POCKET
    for Intel architectures and PROPOSED_PORTS_POCKET for the others.

    :raises SourceConfigError: if the machine architecture has no entry in
        ARCH_TO_PROPOSED_POCKET.
    """
    release = lsb_release()['DISTRIB_CODENAME']
    arch = platform.machine()
    # Ask the mapping directly instead of scanning an iterator of its keys.
    if arch not in ARCH_TO_PROPOSED_POCKET:
        raise SourceConfigError("Arch {} not supported for (distro-)proposed"
                                .format(arch))
    with open('/etc/apt/sources.list.d/proposed.list', 'w') as apt:
        apt.write(ARCH_TO_PROPOSED_POCKET[arch].format(release))
def _add_proposed():
    """Add the proposed pocket as /etc/apt/sources.list.d/proposed.list

    Uses lsb_release()['DISTRIB_CODENAME'] to determine the correct stanza
    for the deb line.  The stanza template is looked up in
    ARCH_TO_PROPOSED_POCKET under platform.machine() and formatted with the
    release codename; an existing proposed.list is overwritten.

    The mapping is defined elsewhere; it is expected to use PROPOSED_POCKET
    for Intel architectures and PROPOSED_PORTS_POCKET for the others.

    :raises SourceConfigError: if the machine architecture has no entry in
        ARCH_TO_PROPOSED_POCKET.
    """
    release = lsb_release()['DISTRIB_CODENAME']
    arch = platform.machine()
    # Ask the mapping directly instead of scanning an iterator of its keys.
    if arch not in ARCH_TO_PROPOSED_POCKET:
        raise SourceConfigError("Arch {} not supported for (distro-)proposed"
                                .format(arch))
    with open('/etc/apt/sources.list.d/proposed.list', 'w') as apt:
        apt.write(ARCH_TO_PROPOSED_POCKET[arch].format(release))
def _add_proposed():
    """Add the proposed pocket as /etc/apt/sources.list.d/proposed.list

    Uses lsb_release()['DISTRIB_CODENAME'] to determine the correct stanza
    for the deb line.  The stanza template is looked up in
    ARCH_TO_PROPOSED_POCKET under platform.machine() and formatted with the
    release codename; an existing proposed.list is overwritten.

    The mapping is defined elsewhere; it is expected to use PROPOSED_POCKET
    for Intel architectures and PROPOSED_PORTS_POCKET for the others.

    :raises SourceConfigError: if the machine architecture has no entry in
        ARCH_TO_PROPOSED_POCKET.
    """
    release = lsb_release()['DISTRIB_CODENAME']
    arch = platform.machine()
    # Ask the mapping directly instead of scanning an iterator of its keys.
    if arch not in ARCH_TO_PROPOSED_POCKET:
        raise SourceConfigError("Arch {} not supported for (distro-)proposed"
                                .format(arch))
    with open('/etc/apt/sources.list.d/proposed.list', 'w') as apt:
        apt.write(ARCH_TO_PROPOSED_POCKET[arch].format(release))
def _add_proposed():
    """Add the proposed pocket as /etc/apt/sources.list.d/proposed.list

    Uses lsb_release()['DISTRIB_CODENAME'] to determine the correct stanza
    for the deb line.  The stanza template is looked up in
    ARCH_TO_PROPOSED_POCKET under platform.machine() and formatted with the
    release codename; an existing proposed.list is overwritten.

    The mapping is defined elsewhere; it is expected to use PROPOSED_POCKET
    for Intel architectures and PROPOSED_PORTS_POCKET for the others.

    :raises SourceConfigError: if the machine architecture has no entry in
        ARCH_TO_PROPOSED_POCKET.
    """
    release = lsb_release()['DISTRIB_CODENAME']
    arch = platform.machine()
    # Ask the mapping directly instead of scanning an iterator of its keys.
    if arch not in ARCH_TO_PROPOSED_POCKET:
        raise SourceConfigError("Arch {} not supported for (distro-)proposed"
                                .format(arch))
    with open('/etc/apt/sources.list.d/proposed.list', 'w') as apt:
        apt.write(ARCH_TO_PROPOSED_POCKET[arch].format(release))
def get_active_version(self):
    """Return the ``self.supported`` entry for the active API version.

    The version key is resolved once and remembered in ``self._active``.
    It is read from the ``settings`` attribute named by
    ``self.SETTINGS_KEY`` (looked up under ``self.service_type``) and falls
    back to ``self.preferred`` when nothing is configured.

    :raises exceptions.ConfigurationError: if the resolved key is a string,
        or if it is not a key of ``self.supported``.
    """
    # Fast path: the version was already resolved by an earlier call.
    if self._active is not None:
        return self.supported[self._active]

    configured = getattr(settings, self.SETTINGS_KEY, {})
    key = configured.get(self.service_type)
    if key is None:
        # TODO(gabriel): support API version discovery here; we'll leave
        # the setting in as a way of overriding the latest available
        # version.
        key = self.preferred

    # Since we do a key lookup in the supported dict the type matters,
    # let's ensure people know if they use a string when the key isn't.
    if isinstance(key, six.string_types):
        msg = ('The version "%s" specified for the %s service should be '
               'either an integer or a float, not a string.' %
               (key, self.service_type))
        raise exceptions.ConfigurationError(msg)

    if key in self.supported:
        self._active = key
        return self.supported[self._active]

    # Provide a helpful error message if the specified version isn't in the
    # supported list.
    choices = ", ".join([str(k) for k in six.iterkeys(self.supported)])
    msg = ('%s is not a supported API version for the %s service, '
           ' choices are: %s' % (key, self.service_type, choices))
    raise exceptions.ConfigurationError(msg)
test_multiple_transport_zones_basic_ops.py 文件源码
项目:vmware-nsx-tempest-plugin
作者: openstack
项目源码
文件源码
阅读 25
收藏 0
点赞 0
评论 0
def run_mtz_basic_ops(self, router_type):
    """Boot one server on every network of the MTZ topology and test it.

    The topology comes from
    ``self._test_router_with_network_and_mtz_networks(router_type)``, which
    returns a router and a mapping of network id to
    ``(s_id, network, subnet, security_group)``.  A server is created on
    each network without waiting for boot, and is recorded in
    ``self.tp_svrs`` under the network id.  Once all servers are requested
    the method waits for them to become active and runs the connectivity
    test.

    :param router_type: passed through to the topology builder.
    """
    self.tp_svrs = {}
    _router, nets = self._test_router_with_network_and_mtz_networks(
        router_type)
    servers_client = self.manager.servers_client
    for net_id, (s_id, network, subnet, security_group) in nets.items():
        # NOTE: choosing the client per network is currently disabled;
        # every server is created with self.manager.servers_client.  The
        # disabled alternative was:
        #   servers_client = (self.manager.servers_client if s_id is None
        #                     else self.admin_manager.servers_client)
        security_groups = [{'name': security_group['id']}]
        svr = self.create_server_on_network(
            network, security_groups,
            name=network['name'],
            servers_client=servers_client,
            wait_on_boot=False)
        self.tp_svrs[net_id] = dict(server=svr, s_id=s_id,
                                    network=network, subnet=subnet,
                                    security_group=security_group,
                                    servers_client=servers_client)
    self.wait_for_servers_become_active(self.tp_svrs)
    self.run_servers_connectivity_test(self.tp_svrs)
def _find_get_patten_match_single(dic, match_func):
    """
    Find the single matching key in a dict.

    The key is returned only when it is the one and only key accepted by
    ``match_func``.

    Args:
        dic (dict): Mapping from a key (str) to corresponding metric (numeric)
            from one output of LogReporter.
        match_func (func): A function that takes a variable (str) as input and
            returns a bool if it is a valid match.

    Returns:
        str or None: The matching key, or ``None`` when no key matches or
        when more than one key matches.
    """
    found_key = None
    for key in dic:
        if not match_func(key):
            continue
        if found_key is not None:
            # A second match makes the result ambiguous; stop right away.
            return None
        found_key = key
    return found_key
def get_callable_args(function, required_only=False):
    """Get names of callable arguments.

    Special arguments (like ``*args`` and ``**kwargs``) are not included into
    output.

    If required_only is True, optional arguments (with default values)
    are not included into output.

    The names are returned as a list, in signature order.
    """
    sig = get_signature(function)
    variadic_kinds = (Parameter.VAR_POSITIONAL, Parameter.VAR_KEYWORD)
    names = []
    for param_name, p in six.iteritems(sig.parameters):
        if p.kind in variadic_kinds:
            continue
        if required_only and p.default is not Parameter.empty:
            continue
        names.append(param_name)
    return names
def iterkeys(self):
    """Unsorted iterator over keys

    Keys of ``self.base`` are tried against ``self.patterns`` in order and
    the first pattern that matches decides the yielded name: the ``'key'``
    group when ``self.replace`` is true, otherwise the whole match.  Each
    name is yielded once.  Afterwards the keys from
    ``DictMethods.iterkeys(self)`` that were not already yielded follow.
    """
    regexes = self.patterns
    use_key_group = self.replace
    emitted = set()
    for base_key in six.iterkeys(self.base):
        # Lazily find the first pattern that matches this key, if any.
        attempts = (regex.match(base_key) for regex in regexes)
        hit = next((m for m in attempts if m), None)
        if hit is None:
            continue
        name = hit.group('key') if use_key_group else hit.group()
        if name not in emitted:
            emitted.add(name)
            yield name
    for own_key in DictMethods.iterkeys(self):
        if own_key not in emitted:
            yield own_key
def test_name1_tls_sni_01_1(self, mock_poll):
    """One name: one challenge answered, one poll, one cleanup."""
    self.mock_net.request_domain_challenges.side_effect = functools.partial(
        gen_dom_authzr, challs=acme_util.CHALLENGES)
    mock_poll.side_effect = self._validate_all

    authzr = self.handler.get_authorizations(["0"])

    self.assertEqual(self.mock_net.answer_challenge.call_count, 1)

    # The poll must have been made once, with an update for name "0" only.
    self.assertEqual(mock_poll.call_count, 1)
    chall_update = mock_poll.call_args[0][0]
    polled_names = list(six.iterkeys(chall_update))
    self.assertEqual(polled_names, ["0"])
    self.assertEqual(len(chall_update.values()), 1)

    cleanup = self.mock_auth.cleanup
    self.assertEqual(cleanup.call_count, 1)
    # Test if list first element is TLSSNI01, use typ because it is an achall
    first_achall = cleanup.call_args[0][0][0]
    self.assertEqual(first_achall.typ, "tls-sni-01")

    self.assertEqual(len(authzr), 1)
def test_name1_tls_sni_01_1_http_01_1_dns_1(self, mock_poll):
    """One name with three preferred challenge types: all three answered."""
    self.mock_net.request_domain_challenges.side_effect = functools.partial(
        gen_dom_authzr, challs=acme_util.CHALLENGES, combos=False)
    mock_poll.side_effect = self._validate_all
    # Extend the authenticator's preferences with HTTP-01 and DNS-01.
    chall_pref = self.mock_auth.get_chall_pref.return_value
    chall_pref.append(challenges.HTTP01)
    chall_pref.append(challenges.DNS01)

    authzr = self.handler.get_authorizations(["0"])

    self.assertEqual(self.mock_net.answer_challenge.call_count, 3)

    # The poll must have been made once, with an update for name "0" only.
    self.assertEqual(mock_poll.call_count, 1)
    chall_update = mock_poll.call_args[0][0]
    polled_names = list(six.iterkeys(chall_update))
    self.assertEqual(polled_names, ["0"])
    self.assertEqual(len(chall_update.values()), 1)

    cleanup = self.mock_auth.cleanup
    self.assertEqual(cleanup.call_count, 1)
    # Every cleaned-up achall must be of one of the three expected types;
    # use typ because it is an achall
    expected_types = ["tls-sni-01", "http-01", "dns-01"]
    for achall in cleanup.call_args[0][0]:
        self.assertTrue(achall.typ in expected_types)

    # Length of authorizations list
    self.assertEqual(len(authzr), 1)
def test_name3_tls_sni_01_3(self, mock_poll):
    """Three names: one challenge answered and polled for each of them."""
    self.mock_net.request_domain_challenges.side_effect = functools.partial(
        gen_dom_authzr, challs=acme_util.CHALLENGES)
    mock_poll.side_effect = self._validate_all

    names = ["0", "1", "2"]
    authzr = self.handler.get_authorizations(names)

    self.assertEqual(self.mock_net.answer_challenge.call_count, 3)

    # Check poll call
    self.assertEqual(mock_poll.call_count, 1)
    chall_update = mock_poll.call_args[0][0]
    self.assertEqual(len(list(six.iterkeys(chall_update))), 3)
    # Each name must be present with exactly one challenge.
    for name in names:
        self.assertTrue(name in list(six.iterkeys(chall_update)))
        self.assertEqual(len(chall_update[name]), 1)

    self.assertEqual(self.mock_auth.cleanup.call_count, 1)

    self.assertEqual(len(authzr), 3)
def _add_proposed():
    """Add the proposed pocket as /etc/apt/sources.list.d/proposed.list

    Uses lsb_release()['DISTRIB_CODENAME'] to determine the correct stanza
    for the deb line.  The stanza template is looked up in
    ARCH_TO_PROPOSED_POCKET under platform.machine() and formatted with the
    release codename; an existing proposed.list is overwritten.

    The mapping is defined elsewhere; it is expected to use PROPOSED_POCKET
    for Intel architectures and PROPOSED_PORTS_POCKET for the others.

    :raises SourceConfigError: if the machine architecture has no entry in
        ARCH_TO_PROPOSED_POCKET.
    """
    release = lsb_release()['DISTRIB_CODENAME']
    arch = platform.machine()
    # Ask the mapping directly instead of scanning an iterator of its keys.
    if arch not in ARCH_TO_PROPOSED_POCKET:
        raise SourceConfigError("Arch {} not supported for (distro-)proposed"
                                .format(arch))
    with open('/etc/apt/sources.list.d/proposed.list', 'w') as apt:
        apt.write(ARCH_TO_PROPOSED_POCKET[arch].format(release))
def _add_proposed():
    """Add the proposed pocket as /etc/apt/sources.list.d/proposed.list

    Uses lsb_release()['DISTRIB_CODENAME'] to determine the correct stanza
    for the deb line.  The stanza template is looked up in
    ARCH_TO_PROPOSED_POCKET under platform.machine() and formatted with the
    release codename; an existing proposed.list is overwritten.

    The mapping is defined elsewhere; it is expected to use PROPOSED_POCKET
    for Intel architectures and PROPOSED_PORTS_POCKET for the others.

    :raises SourceConfigError: if the machine architecture has no entry in
        ARCH_TO_PROPOSED_POCKET.
    """
    release = lsb_release()['DISTRIB_CODENAME']
    arch = platform.machine()
    # Ask the mapping directly instead of scanning an iterator of its keys.
    if arch not in ARCH_TO_PROPOSED_POCKET:
        raise SourceConfigError("Arch {} not supported for (distro-)proposed"
                                .format(arch))
    with open('/etc/apt/sources.list.d/proposed.list', 'w') as apt:
        apt.write(ARCH_TO_PROPOSED_POCKET[arch].format(release))
def get_callable_args(function, required_only=False):
    """Get names of callable arguments.

    Special arguments (like ``*args`` and ``**kwargs``) are not included into
    output.

    If required_only is True, optional arguments (with default values)
    are not included into output.

    The names are returned as a list, in signature order.
    """
    sig = get_signature(function)
    variadic_kinds = (Parameter.VAR_POSITIONAL, Parameter.VAR_KEYWORD)
    names = []
    for param_name, p in six.iteritems(sig.parameters):
        if p.kind in variadic_kinds:
            continue
        if required_only and p.default is not Parameter.empty:
            continue
        names.append(param_name)
    return names
ir_lowering_match.py 文件源码
项目:graphql-compiler
作者: kensho-technologies
项目源码
文件源码
阅读 50
收藏 0
点赞 0
评论 0
def _flatten_location_translations(location_translations):
    """If location A translates to B, and B to C, then make A translate directly to C.

    Args:
        location_translations: dict of Location -> Location, where the key translates to the value.
                               Mutated in place for efficiency and simplicity of implementation.

    NOTE(review): the chain is followed recursively until a destination that
    is not itself a key is reached, so a cyclic translation would never reach
    that stopping point -- confirm callers cannot produce one.
    """
    pending = set(location_translations)

    def _resolve(location):
        """Return the fully-flattened translation for the given location."""
        target = location_translations[location]
        if target not in location_translations:
            # "target" cannot be translated, no further flattening required.
            return target

        # "target" can itself be translated -- resolve it first (it then no
        # longer needs a pass of its own), and point "location" at the final
        # translation as well.
        pending.discard(target)
        final_target = _resolve(target)
        location_translations[location] = final_target
        return final_target

    while pending:
        _resolve(pending.pop())
def _ensure_arguments_are_provided(expected_types, arguments):
    """Ensure that all arguments expected by the query were actually provided.

    Only the argument names are compared; the values of both mappings are
    ignored here.

    Raises:
        GraphQLInvalidArgumentError: if the names in "arguments" differ from
            the names in "expected_types" in either direction.
    """
    # This function only checks that the arguments were specified,
    # and does not check types. Type checking is done as part of the actual formatting step.
    expected = set(expected_types)
    provided = set(arguments)
    if expected == provided:
        return

    missing_args = expected - provided
    unexpected_args = provided - expected
    message = (u'Missing or unexpected arguments found: '
               u'missing {}, unexpected '
               u'{}'.format(missing_args, unexpected_args))
    raise GraphQLInvalidArgumentError(message)
######
# Public API
######
def sorted_dict(value):
    # type: (Mapping) -> Any
    """
    Sorts a dict's keys to avoid leaking information about the
    backend's handling of unordered dicts.

    Mappings become an OrderedDict with keys in sorted order, non-string
    sequences become a list, and both are processed recursively.  Any other
    value is returned unchanged.
    """
    if isinstance(value, Mapping):
        ordered = OrderedDict()
        for key in sorted(value):
            ordered[key] = sorted_dict(value[key])
        return ordered

    # Strings are sequences too, but must stay intact.
    if isinstance(value, Sequence) and not isinstance(value, string_types):
        return [sorted_dict(item) for item in value]

    return value
def _add_proposed():
    """Add the proposed pocket as /etc/apt/sources.list.d/proposed.list

    Uses lsb_release()['DISTRIB_CODENAME'] to determine the correct stanza
    for the deb line.  The stanza template is looked up in
    ARCH_TO_PROPOSED_POCKET under platform.machine() and formatted with the
    release codename; an existing proposed.list is overwritten.

    The mapping is defined elsewhere; it is expected to use PROPOSED_POCKET
    for Intel architectures and PROPOSED_PORTS_POCKET for the others.

    :raises SourceConfigError: if the machine architecture has no entry in
        ARCH_TO_PROPOSED_POCKET.
    """
    release = lsb_release()['DISTRIB_CODENAME']
    arch = platform.machine()
    # Ask the mapping directly instead of scanning an iterator of its keys.
    if arch not in ARCH_TO_PROPOSED_POCKET:
        raise SourceConfigError("Arch {} not supported for (distro-)proposed"
                                .format(arch))
    with open('/etc/apt/sources.list.d/proposed.list', 'w') as apt:
        apt.write(ARCH_TO_PROPOSED_POCKET[arch].format(release))
def test_get_product_metadata_path(self):
    """The lookup returns one entry, keyed by the product, with two tiles."""
    product = 'S2A_OPER_PRD_MSIL1C_PDMC_20160311T194734_R031_V20160311T011614_20160311T011614'

    v = get_product_metadata_path(product)

    # Exactly one entry is expected, keyed by the product name.
    keys = list(iterkeys(v))
    assert len(keys) == 1
    only_key = keys[0]
    assert 'tiles' in v[only_key]
    assert 'metadata' in v[only_key]
    assert only_key == product

    # Both paths must end in the expected file names.
    metadata = v[only_key]['metadata'].split('/')
    tiles = v[only_key]['tiles'][0].split('/')
    self.assertEqual(metadata[-1], 'metadata.xml')
    self.assertEqual(tiles[-1], 'tileInfo.json')

    self.assertEqual(len(v[product]['tiles']), 2)
def test_get_product_metadata_path_new_format(self):
    """The lookup returns one entry, keyed by the product, with one tile."""
    product = 'S2A_MSIL1C_20161211T190342_N0204_R113_T10SDG_20161211T190344'

    v = get_product_metadata_path(product)

    # Exactly one entry is expected, keyed by the product name.
    keys = list(iterkeys(v))
    assert len(keys) == 1
    only_key = keys[0]
    assert 'tiles' in v[only_key]
    assert 'metadata' in v[only_key]
    assert only_key == product

    # Both paths must end in the expected file names.
    metadata = v[only_key]['metadata'].split('/')
    tiles = v[only_key]['tiles'][0].split('/')
    self.assertEqual(metadata[-1], 'metadata.xml')
    self.assertEqual(tiles[-1], 'tileInfo.json')

    self.assertEqual(len(v[product]['tiles']), 1)
def _structs_eq(a, b, comparison_func):
    """Compare two structs field by field.

    ``a`` must be a dict.  The structs are equal when ``b`` is a dict of the
    same size with the same field names and ``comparison_func`` accepts the
    values of every field.  Each pair of values is checked in both argument
    orders, so an asymmetric ``comparison_func`` must agree both ways.

    :param a: the reference struct (a dict).
    :param b: the value compared against ``a``.
    :param comparison_func: callable taking two field values and returning
        a truthy result when they are considered equal.
    :return: True when the structs are equal, False otherwise.
    """
    assert isinstance(a, dict)
    if not isinstance(b, dict):
        return False
    # TODO support multiple mappings from same field name.
    if len(a) != len(b):
        return False
    for left, right in ((a, b), (b, a)):
        for key in left:
            # Same size does not imply same field names: a field missing on
            # the other side means "not equal", not a KeyError.
            if key not in right:
                return False
            if not comparison_func(left[key], right[key]):
                return False
    return True
def _add_proposed():
    """Add the proposed pocket as /etc/apt/sources.list.d/proposed.list

    Uses lsb_release()['DISTRIB_CODENAME'] to determine the correct stanza
    for the deb line.  The stanza template is looked up in
    ARCH_TO_PROPOSED_POCKET under platform.machine() and formatted with the
    release codename; an existing proposed.list is overwritten.

    The mapping is defined elsewhere; it is expected to use PROPOSED_POCKET
    for Intel architectures and PROPOSED_PORTS_POCKET for the others.

    :raises SourceConfigError: if the machine architecture has no entry in
        ARCH_TO_PROPOSED_POCKET.
    """
    release = lsb_release()['DISTRIB_CODENAME']
    arch = platform.machine()
    # Ask the mapping directly instead of scanning an iterator of its keys.
    if arch not in ARCH_TO_PROPOSED_POCKET:
        raise SourceConfigError("Arch {} not supported for (distro-)proposed"
                                .format(arch))
    with open('/etc/apt/sources.list.d/proposed.list', 'w') as apt:
        apt.write(ARCH_TO_PROPOSED_POCKET[arch].format(release))
def _add_proposed():
    """Add the proposed pocket as /etc/apt/sources.list.d/proposed.list

    Uses lsb_release()['DISTRIB_CODENAME'] to determine the correct stanza
    for the deb line.  The stanza template is looked up in
    ARCH_TO_PROPOSED_POCKET under platform.machine() and formatted with the
    release codename; an existing proposed.list is overwritten.

    The mapping is defined elsewhere; it is expected to use PROPOSED_POCKET
    for Intel architectures and PROPOSED_PORTS_POCKET for the others.

    :raises SourceConfigError: if the machine architecture has no entry in
        ARCH_TO_PROPOSED_POCKET.
    """
    release = lsb_release()['DISTRIB_CODENAME']
    arch = platform.machine()
    # Ask the mapping directly instead of scanning an iterator of its keys.
    if arch not in ARCH_TO_PROPOSED_POCKET:
        raise SourceConfigError("Arch {} not supported for (distro-)proposed"
                                .format(arch))
    with open('/etc/apt/sources.list.d/proposed.list', 'w') as apt:
        apt.write(ARCH_TO_PROPOSED_POCKET[arch].format(release))
def _add_proposed():
    """Add the proposed pocket as /etc/apt/sources.list.d/proposed.list

    Uses lsb_release()['DISTRIB_CODENAME'] to determine the correct stanza
    for the deb line.  The stanza template is looked up in
    ARCH_TO_PROPOSED_POCKET under platform.machine() and formatted with the
    release codename; an existing proposed.list is overwritten.

    The mapping is defined elsewhere; it is expected to use PROPOSED_POCKET
    for Intel architectures and PROPOSED_PORTS_POCKET for the others.

    :raises SourceConfigError: if the machine architecture has no entry in
        ARCH_TO_PROPOSED_POCKET.
    """
    release = lsb_release()['DISTRIB_CODENAME']
    arch = platform.machine()
    # Ask the mapping directly instead of scanning an iterator of its keys.
    if arch not in ARCH_TO_PROPOSED_POCKET:
        raise SourceConfigError("Arch {} not supported for (distro-)proposed"
                                .format(arch))
    with open('/etc/apt/sources.list.d/proposed.list', 'w') as apt:
        apt.write(ARCH_TO_PROPOSED_POCKET[arch].format(release))
def CalculatePlaneThickness(self, planesDict):
    """Calculates the plane thickness for each structure.

    The keys of ``planesDict`` are converted to floats and sorted; the
    thickness is the smallest difference between two neighbouring values.

    :param planesDict: mapping whose keys are the plane positions (any
        value accepted by ``float``); the mapped values are not used.
    :return: the smallest spacing between neighbouring planes, or 0 when
        there are fewer than two planes.
    """
    # Positions of every plane in the structure, in ascending order
    planes = sorted(float(z) for z in planesDict)

    # Spacing between each pair of neighbouring planes
    gaps = [upper - lower for lower, upper in zip(planes, planes[1:])]

    # If the thickness was not detected, set it to 0.  Taking the minimum
    # directly avoids a magic "not found" value that real spacings could
    # reach or exceed.
    return min(gaps) if gaps else 0