def connection_from_pool_key(self, pool_key):
    """
    Return the :class:`ConnectionPool` registered under ``pool_key``.

    ``pool_key`` should be a namedtuple made only of immutable objects and
    must expose at least the ``scheme``, ``host`` and ``port`` fields.
    When no usable pool is cached under the key, a new one is built from
    those three fields, stored in ``self.pools`` and returned.
    """
    pools = self.pools
    with pools.lock:
        cached = pools.get(pool_key)
        if not cached:
            # Nothing usable is cached for this key: build a pool of the
            # desired type and remember it for later callers.
            scheme, host, port = pool_key.scheme, pool_key.host, pool_key.port
            cached = self._new_pool(scheme, host, port)
            pools[pool_key] = cached
    return cached
# Example source code for Python's namedtuple()
def driver_version(self):
    """
    collections.namedtuple: The major, minor and update portions of the
        installed version of NI-DAQmx.

    - major_version (int): The major portion of the installed version
        of NI-DAQmx, such as 7 for version 7.0.
    - minor_version (int): The minor portion of the installed version
        of NI-DAQmx, such as 0 for version 7.0.
    - update_version (int): The update portion of the installed
        version of NI-DAQmx, such as 1 for version 9.0.1.
    """
    field_names = ['major_version', 'minor_version', 'update_version']
    version_type = collections.namedtuple('DriverVersion', field_names)
    return version_type(
        major_version=self._major_version,
        minor_version=self._minor_version,
        update_version=self._update_version,
    )
def generate_leaders(*, leader_factory=None):
    """Yield one leaderboard entry per user, highest score first.

    Each entry is built by calling ``leader_factory`` with the keyword
    arguments ``ranking`` (1-based position), ``score``, ``user`` and
    ``registration``. When no factory is given, ``Leader`` is used.
    """
    make_leader = Leader if leader_factory is None else leader_factory

    cursor = db.get_cursor()
    cursor.execute("""
SELECT "serial", "score"
FROM "user"
ORDER BY "score" DESC
""")

    for ranking, row in enumerate(cursor, 1):
        serial, score = row
        yield make_leader(
            ranking=ranking,
            score=score,
            user=User(serial=serial),
            registration=registrations.get_registration(serial=serial),
        )
def connection_from_pool_key(self, pool_key, request_context=None):
    """
    Get a :class:`ConnectionPool` based on the provided pool key.

    ``pool_key`` should be a namedtuple that only contains immutable
    objects. At a minimum it must have the ``scheme``, ``host``, and
    ``port`` fields.

    :param pool_key: hashable key used to look the pool up in ``self.pools``.
    :param request_context: optional mapping with ``scheme``, ``host`` and
        ``port`` entries; it is forwarded unchanged to ``self._new_pool``.
        When omitted, the three values are read from ``pool_key`` instead.
    :returns: the pool cached under ``pool_key``, or a newly created pool
        that has been stored under that key.
    """
    with self.pools.lock:
        # If the scheme, host, or port doesn't match existing open
        # connections, open a new ConnectionPool.
        pool = self.pools.get(pool_key)
        if pool:
            return pool

        # Make a fresh ConnectionPool of the desired type.
        if request_context is None:
            # The default used to fail with "TypeError: 'NoneType' object
            # is not subscriptable"; the key carries the same information.
            scheme, host, port = pool_key.scheme, pool_key.host, pool_key.port
        else:
            scheme = request_context['scheme']
            host = request_context['host']
            port = request_context['port']
        # NOTE(review): a None request_context is passed through as-is;
        # presumably _new_pool applies its own defaults -- confirm.
        pool = self._new_pool(scheme, host, port, request_context=request_context)
        self.pools[pool_key] = pool
    return pool
def connection_from_pool_key(self, pool_key):
    """
    Return the :class:`ConnectionPool` registered under ``pool_key``.

    ``pool_key`` should be a namedtuple made only of immutable objects and
    must expose at least the ``scheme``, ``host`` and ``port`` fields.
    When no usable pool is cached under the key, a new one is built from
    those three fields, stored in ``self.pools`` and returned.
    """
    pools = self.pools
    with pools.lock:
        cached = pools.get(pool_key)
        if not cached:
            # Nothing usable is cached for this key: build a pool of the
            # desired type and remember it for later callers.
            scheme, host, port = pool_key.scheme, pool_key.host, pool_key.port
            cached = self._new_pool(scheme, host, port)
            pools[pool_key] = cached
    return cached
def iter_units_for_relation_name(relation_name):
    """Yield every unit taking part in the named relation.

    Walks each relation id returned for ``relation_name`` and, for each
    id, every related unit, yielding one named tuple per
    (relation id, unit) pair.

    Usage:
        data = [(u.rid, u.unit)
                for u in iter_units_for_relation_name(relation_name)]

    :param relation_name: string relation name
    :yield: ``RelatedUnit`` named tuple with ``rid`` and ``unit`` fields
    """
    related_unit_type = namedtuple('RelatedUnit', 'rid, unit')
    for relation_id in relation_ids(relation_name):
        for unit_name in related_units(relation_id):
            yield related_unit_type(rid=relation_id, unit=unit_name)
def iter_units_for_relation_name(relation_name):
    """Yield every unit taking part in the named relation.

    Walks each relation id returned for ``relation_name`` and, for each
    id, every related unit, yielding one named tuple per
    (relation id, unit) pair.

    Usage:
        data = [(u.rid, u.unit)
                for u in iter_units_for_relation_name(relation_name)]

    :param relation_name: string relation name
    :yield: ``RelatedUnit`` named tuple with ``rid`` and ``unit`` fields
    """
    related_unit_type = namedtuple('RelatedUnit', 'rid, unit')
    for relation_id in relation_ids(relation_name):
        for unit_name in related_units(relation_id):
            yield related_unit_type(rid=relation_id, unit=unit_name)
def parse_argspec(callable_):
    """
    Takes a callable and returns a tuple with the list of Argument objects,
    the name of *args, and the name of **kwargs.
    If *args or **kwargs is not present, it will be None.

    This returns a namedtuple called Argspec that has three fields named:
    args, starargs, and kwargs.

    Each positional parameter becomes ``Argument(name, default)``, where
    the default is ``Argument.no_default`` for parameters that have none.
    For bound methods the leading (already bound) parameter is dropped.
    """
    # inspect.getargspec was removed in Python 3.11 and, before that,
    # rejected callables with keyword-only arguments or annotations.
    # Prefer getfullargspec where it exists; its first four fields line up
    # with getargspec's (args, varargs, keywords, defaults).
    get_spec = getattr(inspect, 'getfullargspec', None) or inspect.getargspec
    args, varargs, keywords, defaults = get_spec(callable_)[:4]
    defaults = list(defaults or [])

    if getattr(callable_, '__self__', None) is not None:
        # This is a bound method, drop the self param.
        args = args[1:]

    # Defaults always belong to the trailing parameters.
    first_default = len(args) - len(defaults)
    return Argspec(
        [Argument(arg, Argument.no_default
                  if n < first_default else defaults[n - first_default])
         for n, arg in enumerate(args)],
        varargs,
        keywords,
    )
def test_checks_should_trigger(self):
    # Registers several events that share a counting rule and checks that
    # one handle_data call consults should_trigger once per event.
    class CountingRule(Always):
        count = 0

        def should_trigger(self, dt, env):
            CountingRule.count += 1
            return True

    for rule_class in [CountingRule] * 5:
        rule = rule_class()
        self.em.add_event(Event(rule, lambda context, data: None))

    # Minimal stand-in for an algorithm: only the attribute named below.
    FakeAlgo = namedtuple('FakeAlgo', ['trading_environment'])
    fake_algo = FakeAlgo(trading_environment="fake_env")
    now = datetime.datetime.now()
    self.em.handle_data(fake_algo, None, now)

    self.assertEqual(CountingRule.count, 5)
def choose_aws_role(assertion):
    """Choose AWS role from SAML assertion.

    Lists the roles found in the assertion's AWS role attribute on stdout
    (numbered from 1) and asks the user to pick one.

    :param assertion: base64-encoded SAML assertion XML.
    :returns: ``RoleTuple(principal_arn, role_arn)`` for the chosen role;
        the two fields are the comma-separated parts of the attribute
        value, in the order they appear.
    :raises ValueError: if the answer is not an integer.
    :raises IndexError: if the answer is not one of the listed numbers.
    """
    aws_attribute_role = 'https://aws.amazon.com/SAML/Attributes/Role'
    attribute_urn = '{urn:oasis:names:tc:SAML:2.0:assertion}Attribute'
    attribute_value_urn = '{urn:oasis:names:tc:SAML:2.0:assertion}AttributeValue'
    role_tuple = namedtuple("RoleTuple", ["principal_arn", "role_arn"])

    roles = []
    root = ET.fromstring(base64.b64decode(assertion))
    for saml2attribute in root.iter(attribute_urn):
        if saml2attribute.get('Name') != aws_attribute_role:
            continue
        for saml2attributevalue in saml2attribute.iter(attribute_value_urn):
            roles.append(role_tuple(*saml2attributevalue.text.split(',')))

    for index, role in enumerate(roles, 1):
        role_name = role.role_arn.split('/')[1]
        print("%d: %s" % (index, role_name))

    # input() returns text on Python 3, so it must be converted before the
    # 1-based answer can be turned into a list index.
    role_choice = int(input('Please select the AWS role: ')) - 1
    if not 0 <= role_choice < len(roles):
        # Without this check an answer of 0 (or a negative number) would
        # silently select a role counted from the end of the list.
        raise IndexError('role selection out of range')
    return roles[role_choice]
def create_legacy_kmeans_nodes(f, new_group_name, legacy_group_name, namedtuple, clustering_key):
    """ Soft-link a legacy-structured (CR 1.2) kmeans subgroup (dest) to a new-style (CR 1.3) subgroup (src).

    The old-style was a group called 'kmeans' with subgroups named _K.
    The new-style is a group called 'clustering' with subgroups named kmeans_K_clusters, etc.

    The legacy group is created under ``f.root`` in every case; the
    subgroup and its links are only created for kmeans clustering keys.
    One soft link is made per entry of ``namedtuple._fields``; fields whose
    new-style node does not exist are reported on stderr and skipped. """
    legacy_root = f.create_group(f.root, legacy_group_name)

    cluster_type, cluster_param = parse_clustering_key(clustering_key)
    if cluster_type != CLUSTER_TYPE_KMEANS:
        return

    legacy_key = format_legacy_clustering_key(cluster_type, cluster_param)
    legacy_subgroup = f.create_group(legacy_root, legacy_key)

    for field in namedtuple._fields:
        target = '/%s/_%s/%s' % (new_group_name, clustering_key, field)
        if target not in f:
            sys.stderr.write('Skipped soft-link of legacy dataset to %s; node doesn\'t exist\n' % target)
            continue
        # NOTE: coerce `target` to 'str' here because pytables chokes on unicode `target`
        f.create_soft_link(legacy_subgroup, field, target=str(target))
def refresh_manager(self, fake_cluster, fake_database):
    """Build a FullRefreshManager with hand-made options and initialised global state.

    The options are a namedtuple with the given cluster and database,
    ``self.config_path``, dry-run enabled, verbosity 0, ``DEFAULT_CAP`` as
    the per-source throughput cap and 1000 as the total throughput cap.
    """
    manager = FullRefreshManager()

    option_fields = [
        'cluster', 'database', 'config_path', 'dry_run', 'verbose',
        'per_source_throughput_cap', 'total_throughput_cap',
    ]
    Options = namedtuple('Options', option_fields)
    manager.options = Options(
        cluster=fake_cluster,
        database=fake_database,
        config_path=self.config_path,
        dry_run=True,
        verbose=0,
        per_source_throughput_cap=DEFAULT_CAP,
        total_throughput_cap=1000,
    )

    manager._init_global_state()
    return manager
def make_params(**kwargs):
    """Create a Params tuple for BenchmarkCNN from kwargs.

    Every field starts at the default recorded in _DEFAULT_PARAMS; the
    given keyword arguments then replace the matching fields.

    Args:
      **kwargs: kwarg values will override the default values.

    Returns:
      Params namedtuple for constructing BenchmarkCNN.
    """
    # Collect each parameter's default value, keyed by parameter name.
    defaults = {}
    for param_name in _DEFAULT_PARAMS:
        defaults[param_name] = _DEFAULT_PARAMS[param_name].default_value

    params = Params(**defaults)
    return params._replace(**kwargs)
def connection_from_pool_key(self, pool_key, request_context=None):
    """
    Get a :class:`ConnectionPool` based on the provided pool key.

    ``pool_key`` should be a namedtuple that only contains immutable
    objects. At a minimum it must have the ``scheme``, ``host``, and
    ``port`` fields.

    :param pool_key: hashable key used to look the pool up in ``self.pools``.
    :param request_context: optional mapping with ``scheme``, ``host`` and
        ``port`` entries; it is forwarded unchanged to ``self._new_pool``.
        When omitted, the three values are read from ``pool_key`` instead.
    :returns: the pool cached under ``pool_key``, or a newly created pool
        that has been stored under that key.
    """
    with self.pools.lock:
        # If the scheme, host, or port doesn't match existing open
        # connections, open a new ConnectionPool.
        pool = self.pools.get(pool_key)
        if pool:
            return pool

        # Make a fresh ConnectionPool of the desired type.
        if request_context is None:
            # The default used to fail with "TypeError: 'NoneType' object
            # is not subscriptable"; the key carries the same information.
            scheme, host, port = pool_key.scheme, pool_key.host, pool_key.port
        else:
            scheme = request_context['scheme']
            host = request_context['host']
            port = request_context['port']
        # NOTE(review): a None request_context is passed through as-is;
        # presumably _new_pool applies its own defaults -- confirm.
        pool = self._new_pool(scheme, host, port, request_context=request_context)
        self.pools[pool_key] = pool
    return pool
def lookup(tag):
    """
    Look up the tag information for a tag number.

    :param tag: Integer tag number
    :returns: Taginfo namedtuple, taken from TAGS_V2 when the tag is listed
        there; otherwise a TagInfo holding just the tag value and its name
        from TAGS, with "unknown" as the name of unrecognized tags.
    """
    fallback_name = TAGS.get(tag, 'unknown')
    fallback_info = TagInfo(tag, fallback_name)
    return TAGS_V2.get(tag, fallback_info)
##
# Map tag numbers to tag info.
#
# id: (Name, Type, Length, enum_values)
#
def collect_moves(self, reader, name):
    """Return the ``Moves`` record of the first row in ``reader`` that matches ``name``.

    A name whose last dash-separated part is all digits must equal the
    row's first column exactly, and the reported pokemon is the title-cased
    part before the first dash. Any other name matches when it occurs
    inside the first column and is reported title-cased as a whole.
    Returns None when no row matches.
    """
    Moves = namedtuple('Moves', ['pokemon', 'gen', 'color', 'moves', 'versions'])

    name_parts = name.split('-')
    exact_match = name_parts[-1].isdigit()
    display_name = name_parts[0].title() if exact_match else name.title()

    for row in reader:
        row_name = row[0]
        matched = (name == row_name) if exact_match else (name in row_name)
        if not matched:
            continue
        # Column 1 is translated through ``switcher``; columns 2-4 hold
        # Python literals.
        generation = switcher[row[1]]
        color = int(ast.literal_eval(row[2]))
        moves = ast.literal_eval(row[3])
        versions = ast.literal_eval(row[4])
        return Moves(display_name, generation, color, moves, versions)
    return None
def test_non_frozen_udts(self):
    """
    Test to ensure that non frozen udt's work with C* >3.6.

    Creates a table with a non-frozen UDT column, inserts a row, updates a
    single UDT field in place and checks both the updated value and that
    the table's CQL does not declare the column as frozen.

    @since 3.7.0
    @jira_ticket PYTHON-498
    @expected_result Non frozen UDT's are supported

    @test_category data_types, udt
    """
    table_name = self.function_table_name

    self.session.execute("USE {0}".format(self.keyspace_name))
    self.session.execute("CREATE TYPE user (state text, has_corn boolean)")
    self.session.execute("CREATE TABLE {0} (a int PRIMARY KEY, b user)".format(table_name))

    User = namedtuple('user', ('state', 'has_corn'))
    self.cluster.register_user_type(self.keyspace_name, "user", User)

    self.session.execute("INSERT INTO {0} (a, b) VALUES (%s, %s)".format(table_name), (0, User("Nebraska", True)))
    # Updating one field in place is only possible on a non-frozen UDT.
    self.session.execute("UPDATE {0} SET b.has_corn = False where a = 0".format(table_name))
    result = self.session.execute("SELECT * FROM {0}".format(table_name))
    self.assertFalse(result[0].b.has_corn)

    table_sql = self.cluster.metadata.keyspaces[self.keyspace_name].tables[table_name].as_cql_query()
    # CQL spells frozen types as ``frozen<type>``; the previous check for
    # the literal "<frozen>" could never fail.
    self.assertNotIn("frozen<", table_sql)
def test_raise_error_on_nonexisting_udts(self):
    """
    Test for ensuring that an error is raised for operating on a nonexisting udt or an invalid keyspace
    """
    c = Cluster(protocol_version=PROTOCOL_VERSION)
    try:
        s = c.connect(self.keyspace_name, wait_for_all_pools=True)
        User = namedtuple('user', ('age', 'name'))

        # Registering against a keyspace that does not exist must fail.
        with self.assertRaises(UserTypeDoesNotExist):
            c.register_user_type("some_bad_keyspace", "user", User)

        # So must registering a type that the keyspace does not define.
        with self.assertRaises(UserTypeDoesNotExist):
            c.register_user_type("system", "user", User)

        # A table cannot reference a UDT that was never created.
        with self.assertRaises(InvalidRequest):
            s.execute("CREATE TABLE mytable (a int PRIMARY KEY, b frozen<user>)")
    finally:
        # Release the connections even when one of the checks above fails.
        c.shutdown()
def connection_from_pool_key(self, pool_key, request_context=None):
    """
    Get a :class:`ConnectionPool` based on the provided pool key.

    ``pool_key`` should be a namedtuple that only contains immutable
    objects. At a minimum it must have the ``scheme``, ``host``, and
    ``port`` fields.

    :param pool_key: hashable key used to look the pool up in ``self.pools``.
    :param request_context: optional mapping with ``scheme``, ``host`` and
        ``port`` entries; it is forwarded unchanged to ``self._new_pool``.
        When omitted, the three values are read from ``pool_key`` instead.
    :returns: the pool cached under ``pool_key``, or a newly created pool
        that has been stored under that key.
    """
    with self.pools.lock:
        # If the scheme, host, or port doesn't match existing open
        # connections, open a new ConnectionPool.
        pool = self.pools.get(pool_key)
        if pool:
            return pool

        # Make a fresh ConnectionPool of the desired type.
        if request_context is None:
            # The default used to fail with "TypeError: 'NoneType' object
            # is not subscriptable"; the key carries the same information.
            scheme, host, port = pool_key.scheme, pool_key.host, pool_key.port
        else:
            scheme = request_context['scheme']
            host = request_context['host']
            port = request_context['port']
        # NOTE(review): a None request_context is passed through as-is;
        # presumably _new_pool applies its own defaults -- confirm.
        pool = self._new_pool(scheme, host, port, request_context=request_context)
        self.pools[pool_key] = pool
    return pool
def lookup(tag):
    """
    Look up the tag information for a tag number.

    :param tag: Integer tag number
    :returns: Taginfo namedtuple, taken from TAGS_V2 when the tag is listed
        there; otherwise a TagInfo holding just the tag value and its name
        from TAGS, with "unknown" as the name of unrecognized tags.
    """
    fallback_name = TAGS.get(tag, 'unknown')
    fallback_info = TagInfo(tag, fallback_name)
    return TAGS_V2.get(tag, fallback_info)
##
# Map tag numbers to tag info.
#
# id: (Name, Type, Length, enum_values)
#
# The length here differs from the length in the tiff spec. For
# numbers, the tiff spec is for the number of fields returned. We
# agree here. For string-like types, the tiff spec uses the length of
# field in bytes. In Pillow, we are using the number of expected
# fields, in general 1 for string-like types.
def stats2(sarray, names=None):
    """Calculate means and (co)variances for structured array data.

    :param sarray: structured array, indexed by field name.
    :param names: names of the fields to analyse; defaults to all of
        ``sarray.dtype.names``.
    :returns: ``Stats2`` namedtuple with, in this order, an ``ave_<name>``
        and a ``var_<name>`` field for every name and a ``cov_<a>_<b>``
        field for every ordered pair of distinct names.
    """
    if names is None:
        names = sarray.dtype.names
    names = list(names)
    nvar = len(names)

    data = tuple(sarray[name] for name in names)
    # np.cov returns a 0-d array for a single variable, which has no
    # diagonal; promote it so one-field input works like any other.
    cov = np.atleast_2d(np.cov(data))

    fields = ['ave_' + name for name in names]
    fields += ['var_' + name for name in names]
    fields += ['cov_' + n1 + "_" + n2 for n1, n2 in permutations(names, 2)]

    # Values are assembled in the same order as the field names above;
    # permutations() pairs indices exactly as it pairs the names.
    values = list(np.mean(data, axis=1))
    values += list(cov.diagonal())
    values += [cov[i, j] for i, j in permutations(range(nvar), 2)]

    NamedStats = namedtuple('Stats2', fields)
    return NamedStats(*values)
def connection_from_pool_key(self, pool_key, request_context=None):
    """
    Get a :class:`ConnectionPool` based on the provided pool key.

    ``pool_key`` should be a namedtuple that only contains immutable
    objects. At a minimum it must have the ``scheme``, ``host``, and
    ``port`` fields.

    :param pool_key: hashable key used to look the pool up in ``self.pools``.
    :param request_context: optional mapping with ``scheme``, ``host`` and
        ``port`` entries; it is forwarded unchanged to ``self._new_pool``.
        When omitted, the three values are read from ``pool_key`` instead.
    :returns: the pool cached under ``pool_key``, or a newly created pool
        that has been stored under that key.
    """
    with self.pools.lock:
        # If the scheme, host, or port doesn't match existing open
        # connections, open a new ConnectionPool.
        pool = self.pools.get(pool_key)
        if pool:
            return pool

        # Make a fresh ConnectionPool of the desired type.
        if request_context is None:
            # The default used to fail with "TypeError: 'NoneType' object
            # is not subscriptable"; the key carries the same information.
            scheme, host, port = pool_key.scheme, pool_key.host, pool_key.port
        else:
            scheme = request_context['scheme']
            host = request_context['host']
            port = request_context['port']
        # NOTE(review): a None request_context is passed through as-is;
        # presumably _new_pool applies its own defaults -- confirm.
        pool = self._new_pool(scheme, host, port, request_context=request_context)
        self.pools[pool_key] = pool
    return pool
def abstract_brackets(formula, variables_re=''):
    # Repeatedly asks split_formula for a leading/within/trailing split of
    # the formula and replaces the 'within' part by a placeholder, until
    # split_formula returns nothing. The result is a namedtuple holding the
    # rewritten formula and a dict mapping each placeholder to the text it
    # replaced.
    # NOTE(review): placeholders come from no_re_matches(); presumably they
    # are strings matched by neither variables_re nor any earlier
    # placeholder -- confirm against that helper.
    new_variables = {}
    parts = split_formula(formula)
    while parts:
        known_patterns = itertools.chain((variables_re,), new_variables)
        placeholder = no_re_matches(combine_re_expressions(known_patterns))
        formula = parts['leading'] + placeholder + parts['trailing']
        new_variables[placeholder] = parts['within']
        parts = split_formula(formula)

    if formula in new_variables:
        # In case of extraneous brackets: the whole formula collapsed into
        # a single placeholder, so start again from what it stands for.
        return abstract_brackets(new_variables[formula], variables_re)

    result_type = namedtuple('abstract_brackets', ('formula', 'new_variables'))
    return result_type(formula=formula, new_variables=new_variables)
# Splits the formula into two parts (leading and trailing) at the operator
# with the lowest priority (taken from settings.order_of_operations).
# Returns None if the formula contains no operators.
def iter_units_for_relation_name(relation_name):
    """Yield every unit taking part in the named relation.

    Walks each relation id returned for ``relation_name`` and, for each
    id, every related unit, yielding one named tuple per
    (relation id, unit) pair.

    Usage:
        data = [(u.rid, u.unit)
                for u in iter_units_for_relation_name(relation_name)]

    :param relation_name: string relation name
    :yield: ``RelatedUnit`` named tuple with ``rid`` and ``unit`` fields
    """
    related_unit_type = namedtuple('RelatedUnit', 'rid, unit')
    for relation_id in relation_ids(relation_name):
        for unit_name in related_units(relation_id):
            yield related_unit_type(rid=relation_id, unit=unit_name)
def iter_units_for_relation_name(relation_name):
    """Yield every unit taking part in the named relation.

    Walks each relation id returned for ``relation_name`` and, for each
    id, every related unit, yielding one named tuple per
    (relation id, unit) pair.

    Usage:
        data = [(u.rid, u.unit)
                for u in iter_units_for_relation_name(relation_name)]

    :param relation_name: string relation name
    :yield: ``RelatedUnit`` named tuple with ``rid`` and ``unit`` fields
    """
    related_unit_type = namedtuple('RelatedUnit', 'rid, unit')
    for relation_id in relation_ids(relation_name):
        for unit_name in related_units(relation_id):
            yield related_unit_type(rid=relation_id, unit=unit_name)
def iter_units_for_relation_name(relation_name):
    """Yield every unit taking part in the named relation.

    Walks each relation id returned for ``relation_name`` and, for each
    id, every related unit, yielding one named tuple per
    (relation id, unit) pair.

    Usage:
        data = [(u.rid, u.unit)
                for u in iter_units_for_relation_name(relation_name)]

    :param relation_name: string relation name
    :yield: ``RelatedUnit`` named tuple with ``rid`` and ``unit`` fields
    """
    related_unit_type = namedtuple('RelatedUnit', 'rid, unit')
    for relation_id in relation_ids(relation_name):
        for unit_name in related_units(relation_id):
            yield related_unit_type(rid=relation_id, unit=unit_name)
def iter_units_for_relation_name(relation_name):
    """Yield every unit taking part in the named relation.

    Walks each relation id returned for ``relation_name`` and, for each
    id, every related unit, yielding one named tuple per
    (relation id, unit) pair.

    Usage:
        data = [(u.rid, u.unit)
                for u in iter_units_for_relation_name(relation_name)]

    :param relation_name: string relation name
    :yield: ``RelatedUnit`` named tuple with ``rid`` and ``unit`` fields
    """
    related_unit_type = namedtuple('RelatedUnit', 'rid, unit')
    for relation_id in relation_ids(relation_name):
        for unit_name in related_units(relation_id):
            yield related_unit_type(rid=relation_id, unit=unit_name)
def iter_units_for_relation_name(relation_name):
    """Yield every unit taking part in the named relation.

    Walks each relation id returned for ``relation_name`` and, for each
    id, every related unit, yielding one named tuple per
    (relation id, unit) pair.

    Usage:
        data = [(u.rid, u.unit)
                for u in iter_units_for_relation_name(relation_name)]

    :param relation_name: string relation name
    :yield: ``RelatedUnit`` named tuple with ``rid`` and ``unit`` fields
    """
    related_unit_type = namedtuple('RelatedUnit', 'rid, unit')
    for relation_id in relation_ids(relation_name):
        for unit_name in related_units(relation_id):
            yield related_unit_type(rid=relation_id, unit=unit_name)
def iter_units_for_relation_name(relation_name):
    """Yield every unit taking part in the named relation.

    Walks each relation id returned for ``relation_name`` and, for each
    id, every related unit, yielding one named tuple per
    (relation id, unit) pair.

    Usage:
        data = [(u.rid, u.unit)
                for u in iter_units_for_relation_name(relation_name)]

    :param relation_name: string relation name
    :yield: ``RelatedUnit`` named tuple with ``rid`` and ``unit`` fields
    """
    related_unit_type = namedtuple('RelatedUnit', 'rid, unit')
    for relation_id in relation_ids(relation_name):
        for unit_name in related_units(relation_id):
            yield related_unit_type(rid=relation_id, unit=unit_name)
def iter_units_for_relation_name(relation_name):
    """Yield every unit taking part in the named relation.

    Walks each relation id returned for ``relation_name`` and, for each
    id, every related unit, yielding one named tuple per
    (relation id, unit) pair.

    Usage:
        data = [(u.rid, u.unit)
                for u in iter_units_for_relation_name(relation_name)]

    :param relation_name: string relation name
    :yield: ``RelatedUnit`` named tuple with ``rid`` and ``unit`` fields
    """
    related_unit_type = namedtuple('RelatedUnit', 'rid, unit')
    for relation_id in relation_ids(relation_name):
        for unit_name in related_units(relation_id):
            yield related_unit_type(rid=relation_id, unit=unit_name)