def _by_version_descending(names):
    """
    Given a list of filenames, return them in descending order
    by version number.

    >>> names = 'bar', 'foo', 'Python-2.7.10.egg', 'Python-2.7.2.egg'
    >>> _by_version_descending(names)
    ['Python-2.7.10.egg', 'Python-2.7.2.egg', 'foo', 'bar']
    >>> names = 'Setuptools-1.2.3b1.egg', 'Setuptools-1.2.3.egg'
    >>> _by_version_descending(names)
    ['Setuptools-1.2.3.egg', 'Setuptools-1.2.3b1.egg']
    >>> names = 'Setuptools-1.2.3b1.egg', 'Setuptools-1.2.3.post1.egg'
    >>> _by_version_descending(names)
    ['Setuptools-1.2.3.post1.egg', 'Setuptools-1.2.3b1.egg']
    """
    def _by_version(name):
        """
        Parse each component of the filename
        """
        name, ext = os.path.splitext(name)
        parts = itertools.chain(name.split('-'), [ext])
        return [packaging.version.parse(part) for part in parts]

    return sorted(names, key=_by_version, reverse=True)
def _pad_version(left, right):
    left_split, right_split = [], []

    # Get the release segment of our versions
    left_split.append(list(itertools.takewhile(lambda x: x.isdigit(), left)))
    right_split.append(list(itertools.takewhile(lambda x: x.isdigit(), right)))

    # Get the rest of our versions
    left_split.append(left[len(left_split[0]):])
    right_split.append(right[len(right_split[0]):])

    # Insert our padding
    left_split.insert(
        1,
        ["0"] * max(0, len(right_split[0]) - len(left_split[0])),
    )
    right_split.insert(
        1,
        ["0"] * max(0, len(left_split[0]) - len(right_split[0])),
    )

    return (
        list(itertools.chain(*left_split)),
        list(itertools.chain(*right_split)),
    )
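# Illustrative usage (not from the original source): the shorter release
# segment is zero-padded so both versions can be compared
# component-by-component, e.g. when matching "1.2" against "1.2.3":
assert _pad_version(["1", "2"], ["1", "2", "3"]) == (
    ["1", "2", "0"],
    ["1", "2", "3"],
)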
def _hash_comparison(self):
    """
    Return a comparison of actual and expected hash values.

    Example::

        Expected sha256 abcdeabcdeabcdeabcdeabcdeabcdeabcdeabcdeabcde
                     or 123451234512345123451234512345123451234512345
             Got        bcdefbcdefbcdefbcdefbcdefbcdefbcdefbcdefbcdef
    """
    def hash_then_or(hash_name):
        # For now, all the decent hashes have 6-char names, so we can get
        # away with hard-coding space literals.
        return chain([hash_name], repeat('    or'))

    lines = []
    for hash_name, expecteds in iteritems(self.allowed):
        prefix = hash_then_or(hash_name)
        lines.extend(('    Expected %s %s' % (next(prefix), e))
                     for e in expecteds)
        lines.append('    Got        %s\n' %
                     self.gots[hash_name].hexdigest())
        prefix = '    or'
    return '\n'.join(lines)
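# Illustrative only: hash_then_or yields the hash name exactly once,
# then an aligned "or" for every subsequent expected value.
from itertools import chain, repeat

prefix = chain(['sha256'], repeat('    or'))
assert next(prefix) == 'sha256'
assert next(prefix) == '    or'
assert next(prefix) == '    or'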
def _build_multipart(cls, data):
    """
    Build up the MIME payload for the POST data
    """
    boundary = b'--------------GHSKFJDLGDS7543FJKLFHRE75642756743254'
    sep_boundary = b'\n--' + boundary
    end_boundary = sep_boundary + b'--'
    end_items = end_boundary, b"\n",
    builder = functools.partial(
        cls._build_part,
        sep_boundary=sep_boundary,
    )
    part_groups = map(builder, data.items())
    parts = itertools.chain.from_iterable(part_groups)
    body_items = itertools.chain(parts, end_items)
    content_type = 'multipart/form-data; boundary=%s' % boundary.decode('ascii')
    return b''.join(body_items), content_type
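# `_build_part` is defined elsewhere in the class and is not part of this
# snippet. A minimal sketch of the shape it must have (hypothetical, not
# the setuptools original): given one (name, value) item, yield the byte
# chunks for that form field; map() then produces one group per item and
# chain.from_iterable flattens them into a single stream of chunks.
def _build_part_sketch(item, sep_boundary):
    key, value = item
    if not isinstance(value, bytes):
        value = str(value).encode('utf-8')
    yield sep_boundary
    yield ('\nContent-Disposition: form-data; name="%s"' % key).encode('utf-8')
    yield b'\n\n'
    yield value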
def find_data_files(self, package, src_dir):
    """Return filenames for package's data files in 'src_dir'"""
    patterns = self._get_platform_patterns(
        self.package_data,
        package,
        src_dir,
    )
    globs_expanded = map(glob, patterns)
    # flatten the expanded globs into an iterable of matches
    globs_matches = itertools.chain.from_iterable(globs_expanded)
    glob_files = filter(os.path.isfile, globs_matches)
    files = itertools.chain(
        self.manifest_files.get(package, []),
        glob_files,
    )
    return self.exclude_data_files(package, src_dir, files)
def exclude_data_files(self, package, src_dir, files):
    """Filter filenames for package's data files in 'src_dir'"""
    files = list(files)
    patterns = self._get_platform_patterns(
        self.exclude_package_data,
        package,
        src_dir,
    )
    match_groups = (
        fnmatch.filter(files, pattern)
        for pattern in patterns
    )
    # flatten the groups of matches into an iterable of matches
    matches = itertools.chain.from_iterable(match_groups)
    bad = set(matches)
    keepers = (
        fn
        for fn in files
        if fn not in bad
    )
    # ditch dupes
    return list(_unique_everseen(keepers))
def _build_paths(self, name, spec_path_lists, exists):
    """
    Given an environment variable name and specified paths,
    return a pathsep-separated string of paths containing
    unique, extant, directories from those paths and from
    the environment variable. Raise an error if no paths
    are resolved.
    """
    # flatten spec_path_lists
    spec_paths = itertools.chain.from_iterable(spec_path_lists)
    env_paths = safe_env.get(name, '').split(os.pathsep)
    paths = itertools.chain(spec_paths, env_paths)
    extant_paths = list(filter(os.path.isdir, paths)) if exists else paths
    if not extant_paths:
        msg = "%s environment variable is empty" % name.upper()
        raise distutils.errors.DistutilsPlatformError(msg)
    unique_paths = self._unique_everseen(extant_paths)
    return os.pathsep.join(unique_paths)
# from Python docs
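# The `_unique_everseen` helper called above is not included in this
# snippet; the note refers to the `unique_everseen` recipe in the
# itertools documentation. A sketch of that recipe, assuming the
# vendored copy matches the docs:
from itertools import filterfalse

def unique_everseen(iterable, key=None):
    "List unique elements, preserving order. Remember all elements ever seen."
    # unique_everseen('AAAABBBCCDAABBB') --> A B C D
    # unique_everseen('ABBCcAD', str.lower) --> A B C D
    seen = set()
    seen_add = seen.add
    if key is None:
        for element in filterfalse(seen.__contains__, iterable):
            seen_add(element)
            yield element
    else:
        for element in iterable:
            k = key(element)
            if k not in seen:
                seen_add(k)
                yield element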
def read_flat(self):
    """
    Read a PNG file and decode it into flat row flat pixel format.
    Returns (*width*, *height*, *pixels*, *metadata*).

    May use excessive memory.

    `pixels` are returned in flat row flat pixel format.

    See also the :meth:`read` method which returns pixels in the
    more stream-friendly boxed row flat pixel format.
    """
    x, y, pixel, meta = self.read()
    arraycode = 'BH'[meta['bitdepth'] > 8]
    pixel = array(arraycode, itertools.chain(*pixel))
    return x, y, pixel, meta
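# Illustrative only: chain(*rows) flattens boxed rows (one list per
# scanline) into the flat row flat pixel layout before packing them
# into a typed array.
from array import array
import itertools

rows = [[255, 0, 0], [0, 255, 0]]
flat = array('B', itertools.chain(*rows))
assert list(flat) == [255, 0, 0, 0, 255, 0]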
def _get_platform_patterns(spec, package, src_dir):
    """
    yield platform-specific path patterns (suitable for glob
    or fn_match) from a glob-based spec (such as
    self.package_data or self.exclude_package_data)
    matching package in src_dir.
    """
    raw_patterns = itertools.chain(
        spec.get('', []),
        spec.get(package, []),
    )
    return (
        # Each pattern has to be converted to a platform-specific path
        os.path.join(src_dir, convert_path(pattern))
        for pattern in raw_patterns
    )
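# Illustrative usage (hypothetical spec dict): patterns under the ''
# key apply to every package and are chained with the package-specific
# ones, each rooted at src_dir.
spec = {'': ['*.txt'], 'mypkg': ['data/*.json']}
patterns = list(_get_platform_patterns(spec, 'mypkg', 'src/mypkg'))
# On POSIX: ['src/mypkg/*.txt', 'src/mypkg/data/*.json']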
def find_a_bracket(self, caret_pt):
    """
    Locate the next bracket after the caret in the current line.
    If none is found, execution must be aborted.
    Return (bracket, brackets, bracket_pt).

    Example: ('(', ('(', ')'), 1337).
    """
    caret_row, caret_col = self.view.rowcol(caret_pt)
    line_text = self.view.substr(Region(caret_pt, self.view.line(caret_pt).b))
    try:
        found_brackets = min([(line_text.index(bracket), bracket)
                              for bracket in chain(*self.pairs)
                              if bracket in line_text])
    except ValueError:
        return None, None, None

    bracket_a, bracket_b = [(a, b) for (a, b) in self.pairs
                            if found_brackets[1] in (a, b)][0]
    return (found_brackets[1], (bracket_a, bracket_b),
            self.view.text_point(caret_row, caret_col + found_brackets[0]))
def _check_listening_on_services_ports(services, test=False):
    """Check that the unit is actually listening (has the port open) on the
    ports that the service specifies are open. If test is True then the
    function returns the services with ports that are open rather than
    closed.

    Returns an OrderedDict of service: ports and a list of booleans

    @param services: OrderedDict(service: [port, ...], ...)
    @param test: default=False, if False, test for closed, otherwise open.
    @returns OrderedDict(service: [port-not-open, ...]...), [boolean]
    """
    test = not(not(test))  # ensure test is True or False
    all_ports = list(itertools.chain(*services.values()))
    ports_states = [port_has_listener('0.0.0.0', p) for p in all_ports]
    map_ports = OrderedDict()
    matched_ports = [p for p, opened in zip(all_ports, ports_states)
                     if opened == test]  # essentially opened xor test
    for service, ports in services.items():
        set_ports = set(ports).intersection(matched_ports)
        if set_ports:
            map_ports[service] = set_ports
    return map_ports, ports_states
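# Illustrative only: chain(*services.values()) flattens the per-service
# port lists into one sequence before the listener checks run.
import itertools
from collections import OrderedDict

services = OrderedDict([('nginx', [80, 443]), ('sshd', [22])])
assert list(itertools.chain(*services.values())) == [80, 443, 22]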
def peek(iterable):
    """
    Peek ahead in an iterable.

    Parameters
    ----------
    iterable : iterable

    Returns
    -------
    first : object
        First element of ``iterable``
    stream : iterable
        Iterable containing ``first`` and all other elements from ``iterable``
    """
    iterable = iter(iterable)
    ahead = next(iterable)
    return ahead, chain([ahead], iterable)
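# Illustrative usage: the peeked element is stitched back on with
# chain, so the returned stream still yields every element.
first, stream = peek(x * x for x in range(4))
assert first == 0
assert list(stream) == [0, 1, 4, 9]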
def attach_pipeline(self, pipeline, name, chunksize=None):
    """
    Register a pipeline to be computed at the start of each day.
    """
    if self._pipelines:
        raise NotImplementedError("Multiple pipelines are not supported.")
    if chunksize is None:
        # Make the first chunk smaller to get more immediate results:
        # (one week, then every half year)
        chunks = iter(chain([5], repeat(126)))
    else:
        chunks = iter(repeat(int(chunksize)))
    self._pipelines[name] = pipeline, chunks

    # Return the pipeline to allow expressions like
    # p = attach_pipeline(Pipeline(), 'name')
    return pipeline
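# Illustrative only: the default chunk schedule yields one small chunk
# (5 trading days) for quick first results, then 126-day chunks forever.
from itertools import chain, islice, repeat

chunks = iter(chain([5], repeat(126)))
assert list(islice(chunks, 4)) == [5, 126, 126, 126]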
def _group_lengths(grouping):
    """Convert a localeconv-style grouping into a (possibly infinite)
    iterable of integers representing group lengths.
    """
    # The result from localeconv()['grouping'], and the input to this
    # function, should be a list of integers in one of the
    # following three forms:
    #
    #   (1) an empty list, or
    #   (2) a nonempty list of positive integers + [0], or
    #   (3) a list of positive integers + [locale.CHAR_MAX]
    from itertools import chain, repeat
    if not grouping:
        return []
    elif grouping[-1] == 0 and len(grouping) >= 2:
        return chain(grouping[:-1], repeat(grouping[-2]))
    elif grouping[-1] == _locale.CHAR_MAX:
        return grouping[:-1]
    else:
        raise ValueError('unrecognised format for grouping')
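# Illustrative usage: a US-style grouping of [3, 0] repeats the last
# positive group length indefinitely (thousands separators), while an
# Indian-style [3, 2, 0] gives one group of 3 and then 2s forever.
from itertools import islice

assert list(islice(_group_lengths([3, 0]), 4)) == [3, 3, 3, 3]
assert list(islice(_group_lengths([3, 2, 0]), 4)) == [3, 2, 2, 2]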
def _get_headnode_dict(fixer_list):
    """ Accepts a list of fixers and returns a dictionary
    of head node type --> fixer list.  """
    head_nodes = collections.defaultdict(list)
    every = []
    for fixer in fixer_list:
        if fixer.pattern:
            try:
                heads = _get_head_types(fixer.pattern)
            except _EveryNode:
                every.append(fixer)
            else:
                for node_type in heads:
                    head_nodes[node_type].append(fixer)
        else:
            if fixer._accept_type is not None:
                head_nodes[fixer._accept_type].append(fixer)
            else:
                every.append(fixer)
    for node_type in chain(pygram.python_grammar.symbol2number.itervalues(),
                           pygram.python_grammar.tokens):
        head_nodes[node_type].extend(every)
    return dict(head_nodes)
def _match_label_with_color(label, colors, bg_label, bg_color):
    """Return `unique_labels` and `color_cycle` for label array and color list.

    Colors are cycled for normal labels, but the background color should only
    be used for the background.
    """
    # Temporarily set background color; it will be removed later.
    if bg_color is None:
        bg_color = (0, 0, 0)
    bg_color = _rgb_vector([bg_color])

    unique_labels = list(set(label.flat))
    # Ensure that the background label is in front to match call to `chain`.
    if bg_label in unique_labels:
        unique_labels.remove(bg_label)
        unique_labels.insert(0, bg_label)

    # Modify labels and color cycle so background color is used only once.
    color_cycle = itertools.cycle(colors)
    color_cycle = itertools.chain(bg_color, color_cycle)

    return unique_labels, color_cycle
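# Illustrative only: chaining the background color in front of an
# endless cycle means it is consumed exactly once, after which the
# regular palette repeats.
import itertools

palette = itertools.chain([(0, 0, 0)],
                          itertools.cycle([(1, 0, 0), (0, 1, 0)]))
first_four = list(itertools.islice(palette, 4))
assert first_four == [(0, 0, 0), (1, 0, 0), (0, 1, 0), (1, 0, 0)]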
def predict_beatmap(self, beatmap, *mods, **mods_scalar):
    """Predict the user's accuracy for the given beatmap.

    Parameters
    ----------
    beatmap : Beatmap
        The map to predict the performance of.
    *mods
        A sequence of mod dictionaries to predict for.
    **mods_scalar
        Mods to predict for.

    Returns
    -------
    accuracy : float
        The user's expected accuracy in the range [0, 1].
    """
    for mod_name in 'hidden', 'hard_rock', 'half_time', 'double_time':
        mods_scalar.setdefault(mod_name, False)

    return self.predict([
        (beatmap, ms) for ms in chain(mods, [mods_scalar])
    ])
def compile(self, ttFont):
    # First make sure that all the data lines up properly. Format 4
    # must have all its data lined up consecutively. If not this will fail.
    for curLoc, nxtLoc in zip(self.locations, self.locations[1:]):
        assert curLoc[1] == nxtLoc[0], "Data must be consecutive in indexSubTable format 4"

    offsets = list(self.locations[0]) + [loc[1] for loc in self.locations[1:]]
    # Image data offset must be less than or equal to the minimum of locations.
    # Resetting this offset may change the value for round tripping but is safer
    # and allows imageDataOffset to not be required to be in the XML version.
    self.imageDataOffset = min(offsets)
    offsets = [offset - self.imageDataOffset for offset in offsets]
    glyphIds = list(map(ttFont.getGlyphID, self.names))
    # Create an iterator over the ids plus a padding value.
    idsPlusPad = list(itertools.chain(glyphIds, [0]))

    dataList = [EblcIndexSubTable.compile(self, ttFont)]
    dataList.append(struct.pack(">L", len(glyphIds)))
    tmp = [struct.pack(codeOffsetPairFormat, *cop)
           for cop in zip(idsPlusPad, offsets)]
    dataList += tmp
    data = bytesjoin(dataList)
    return data
def openapi2httpdomain(spec, **options):
    generators = []

    # The OpenAPI spec may contain JSON references, common properties, etc.
    # Rendering the spec as-is would require multiple if-s around the code,
    # so to simplify the flow we normalize the spec into the one (expected)
    # shape first.
    _normalize_spec(spec, **options)

    # If 'paths' are passed, ensure they exist within the OpenAPI spec;
    # otherwise raise an error and ask the user to fix that.
    if 'paths' in options:
        if not set(options['paths']).issubset(spec['paths']):
            raise ValueError(
                'One or more paths are not defined in the spec: %s.' % (
                    ', '.join(set(options['paths']) - set(spec['paths'])),
                )
            )

    for endpoint in options.get('paths', spec['paths']):
        for method, properties in spec['paths'][endpoint].items():
            generators.append(_httpresource(endpoint, method, properties))

    return iter(itertools.chain(*generators))
def __iter__(self):
    stack = [(self.__graphRoot, chain(self.__pkgRoot.getDirectDepSteps(),
                                      self.__pkgRoot.getIndirectDepSteps()))]
    yield (self.__graphRoot, self.__pkgRoot)
    done = set([self.__graphRoot.key()])
    while stack:
        try:
            childPkg = next(stack[-1][1]).getPackage()
            childNode = stack[-1][0][childPkg.getName()].node
            if childNode.key() not in done:
                done.add(childNode.key())
                yield (childNode, childPkg)
                stack.append((childNode, chain(childPkg.getDirectDepSteps(),
                                               childPkg.getIndirectDepSteps())))
        except StopIteration:
            stack.pop()
def __load(self):
    # load cards
    with open(self.cardJSON, 'r', encoding='utf8') as file:
        cards = json.load(file)
    with open(self.tokenJSON, 'r', encoding='utf8') as file:
        tokens = json.load(file)

    # json to db full of text
    for name, card in itertools.chain(cards.items(), tokens.items()):
        clean = CardDB.cleanName(name)
        if clean in self.__db:
            log.error("load() duplicate name, already in the db: %s",
                      clean)
            raise Exception('duplicate card')

        self.__db[clean] = formatter.createCardText(card, self.constants)

    self.tokens = [CardDB.cleanName(name) for name in tokens.keys()]

    # finally load temp file
    self.refreshTemp()
def exists(self, submission_id, cards):
    """Test if the request is a duplicate and insert any new cards.

    :return: true if all cards are already posted for parent
    """
    query = ('SELECT card FROM topcomment'
             ' WHERE submission_id = ?'
             ' AND card IN (%s)' % ','.join('?' * len(cards)))
    params = list(itertools.chain((submission_id,), cards))
    foundCards = [row[0] for row in self.conn.execute(query, params)]
    inserted = False
    for card in cards:
        if card not in foundCards:
            inserted = True
            self.conn.execute(
                "INSERT INTO topcomment (submission_id, card) VALUES (?, ?)",
                (submission_id, card))
    self.conn.commit()
    return not inserted
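# Illustrative only (hypothetical values): chain prepends the submission
# id to the card values so the parameter list lines up with the "?"
# placeholders in order.
import itertools

params = list(itertools.chain(('abc123',), ['Ragnaros', 'Ysera']))
assert params == ['abc123', 'Ragnaros', 'Ysera']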