def gen_values(self, n, reversed=False, shuffled=False, gen_dupes=False):
    """Return an iterator of ``(key, value)`` pairs for ``n`` keys.

    Keys are the integers ``0 .. n-1``; the i-th pair produced always
    carries the value ``2 * i``.

    :param n: number of distinct keys to generate.
    :param reversed: produce the keys in descending order.
    :param shuffled: shuffle the keys with a fixed seed, so the order is
      the same on every call.
    :param gen_dupes: after the full sequence, repeat every pair from
      position 10 onwards a second time.
    :return: a lazy iterator of 2-tuples.
    """
    # ``xrange``/``itertools.izip`` only exist on Python 2.  Pick a lazy
    # range for whichever interpreter is running instead of failing with
    # a NameError/AttributeError on Python 3.
    try:
        lazy_range = xrange
    except NameError:
        lazy_range = range

    keys = lazy_range(n - 1, -1, -1) if reversed else lazy_range(n)
    if shuffled:
        keys = list(keys)
        random.Random(1234827).shuffle(keys)

    def pairs():
        # enumerate() is lazy on both Python 2 and 3 (unlike zip()), and
        # the value paired with position i is 2 * i.
        return ((key, 2 * position) for position, key in enumerate(keys))

    if gen_dupes:
        return itertools.chain(pairs(), itertools.islice(pairs(), 10, None))
    return pairs()
# Example source code using Python's chain()
def _replace_cdata_list_attribute_values(self, tag_name, attrs):
"""Replaces class="foo bar" with class=["foo", "bar"]
Modifies its input in place.
"""
if self.cdata_list_attributes:
universal = self.cdata_list_attributes.get('*', [])
tag_specific = self.cdata_list_attributes.get(
tag_name.lower(), [])
for cdata_list_attr in itertools.chain(universal, tag_specific):
if cdata_list_attr in dict(attrs):
# Basically, we have a "class" attribute whose
# value is a whitespace-separated list of CSS
# classes. Split it into a list.
value = attrs[cdata_list_attr]
values = whitespace_re.split(value)
attrs[cdata_list_attr] = values
return attrs
def _parse_ipv6(a):
"""
Parse IPv6 address. Ideally we would use the ipaddress module in
Python3.3 but can't rely on having this.
Does not handle dotted-quad addresses or subnet prefix
>>> _parse_ipv6("::") == (0,) * 16
True
>>> _parse_ipv6("1234:5678::abcd:0:ff00")
(18, 52, 86, 120, 0, 0, 0, 0, 0, 0, 171, 205, 0, 0, 255, 0)
"""
l,_,r = a.partition("::")
l_groups = list(chain(*[divmod(int(x,16),256) for x in l.split(":") if x]))
r_groups = list(chain(*[divmod(int(x,16),256) for x in r.split(":") if x]))
zeros = [0] * (16 - len(l_groups) - len(r_groups))
return tuple(l_groups + zeros + r_groups)
def write(self, outfile, rows):
    """Write a PNG image to the output file.  `rows` should be
    an iterable that yields each row in boxed row flat pixel format.
    The rows should be the rows of the original image, so there
    should be ``self.height`` rows of ``self.width * self.planes`` values.
    If `interlace` is specified (when creating the instance), then
    an interlaced PNG file will be written.  Supply the rows in the
    normal image order; the interlacing is carried out internally.

    .. note ::

      Interlacing will require the entire image to be in working memory.

    :raises ValueError: in the non-interlaced case, when the number of
      rows written differs from ``self.height``.
    """
    if self.interlace:
        # 16-bit typecode for deep samples, 8-bit otherwise.
        fmt = 'H' if self.bitdepth > 8 else 'B'
        # from_iterable consumes `rows` lazily; chain(*rows) would first
        # unpack every row into an argument tuple.
        a = array(fmt, itertools.chain.from_iterable(rows))
        return self.write_array(outfile, a)

    nrows = self.write_passes(outfile, rows)
    if nrows != self.height:
        raise ValueError(
            "rows supplied (%d) does not match height (%d)" %
            (nrows, self.height))
def read_flat(self):
    """
    Read a PNG file and decode it into flat row flat pixel format.
    Returns (*width*, *height*, *pixels*, *metadata*).

    May use excessive memory.

    `pixels` are returned in flat row flat pixel format.

    See also the :meth:`read` method which returns pixels in the
    more stream-friendly boxed row flat pixel format.
    """
    x, y, pixel, meta = self.read()
    # 16-bit typecode for deep samples, 8-bit otherwise.
    arraycode = 'H' if meta['bitdepth'] > 8 else 'B'
    # from_iterable walks the rows lazily; chain(*pixel) would unpack
    # every row into an argument tuple before flattening.
    pixel = array(arraycode, itertools.chain.from_iterable(pixel))
    return x, y, pixel, meta
def testPAMin(self):
    """Check that the command line tool accepts a PAM file as input."""
    def do():
        return _main(['testPAMin'])

    # One row of three RGBA pixels, in flat row flat pixel format.
    flat = [255,0,0,255, 0,255,0,120, 0,0,255,30]

    # Build the PAM input: header first, then the raw sample bytes.
    source = BytesIO()
    source.write(strtobytes('P7\nWIDTH 3\nHEIGHT 1\nDEPTH 4\nMAXVAL 255\n'
                            'TUPLTYPE RGB_ALPHA\nENDHDR\n'))
    source.write(seqtobytes(flat))
    source.flush()
    source.seek(0)

    # Run the tool with the PAM data as input and capture what it writes.
    sink = BytesIO()
    testWithIO(source, sink, do)

    # Decode the captured output and compare it with the input pixels.
    r = Reader(bytes=sink.getvalue())
    _width, _height, pixels, _meta = r.read()
    self.assertTrue(r.alpha)
    self.assertTrue(not r.greyscale)
    self.assertEqual(list(itertools.chain(*pixels)), flat)
def partition_args(self, all_commands, args):
    """Split off the arguments that belong to the first command in ``args``.

    ``args[0]`` names the command.  The following args are collected
    until one of them names a command in ``all_commands`` -- unless the
    arg just before it is an option of the current command that takes a
    value, in which case it is treated as that option's value.

    A leading ``:`` is stripped from any collected arg other than a
    lone ``:``.

    :param all_commands: mapping of command name to command object; each
      command is expected to expose an ``arg_map`` mapping.
    :param args: sequence of argument strings, command name first.
    :return: ``[command, command_args]``.
    :raises RunnerError: if ``args[0]`` is not a known command.
    """
    name = args[0]
    try:
        command = all_commands[name]
    except KeyError:
        raise RunnerError('Unknown command: {name}'.format(name=name))
    args = args[1:]
    command_args = []
    # Pair every arg with its predecessor (None for the first one).
    prev_args = chain([None], args[:-1])
    for prev_arg, arg in zip(prev_args, args):
        if arg in all_commands:
            option = command.arg_map.get(prev_arg)
            if option is None or not option.takes_value:
                # `arg` starts the next command.
                break
        if arg.startswith(':') and arg != ':':
            arg = arg[1:]
        command_args.append(arg)
    return [command, command_args]
def _iterate_polymorphic_properties(self, mappers=None):
"""Return an iterator of MapperProperty objects which will render into
a SELECT."""
if mappers is None:
mappers = self._with_polymorphic_mappers
if not mappers:
for c in self.iterate_properties:
yield c
else:
# in the polymorphic case, filter out discriminator columns
# from other mappers, as these are sometimes dependent on that
# mapper's polymorphic selectable (which we don't want rendered)
for c in util.unique_list(
chain(*[
list(mapper.iterate_properties) for mapper in
[self] + mappers
])
):
if getattr(c, '_is_polymorphic_discriminator', False) and \
(self.polymorphic_on is None or
c.columns[0] is not self.polymorphic_on):
continue
yield c
def _create_outerjoin(cls, left, right, onclause=None):
"""Return an ``OUTER JOIN`` clause element.
The returned object is an instance of :class:`.Join`.
Similar functionality is also available via the
:meth:`~.FromClause.outerjoin()` method on any
:class:`.FromClause`.
:param left: The left side of the join.
:param right: The right side of the join.
:param onclause: Optional criterion for the ``ON`` clause, is
derived from foreign key relationships established between
left and right otherwise.
To chain joins together, use the :meth:`.FromClause.join` or
:meth:`.FromClause.outerjoin` methods on the resulting
:class:`.Join` object.
"""
return cls(left, right, onclause, isouter=True)
def _froms(self):
    """Return the list of distinct FROM objects for this construct.

    Candidates come from the raw columns, the WHERE clause (if any)
    and the explicit ``_from_obj`` collection, in that order.
    """
    # would love to cache this,
    # but there's just enough edge cases, particularly now that
    # declarative encourages construction of SQL expressions
    # without tables present, to just regen this each time.
    collected = []
    seen = set()
    translate = self._from_cloned

    column_froms = _from_objects(*self._raw_columns)
    if self._whereclause is not None:
        where_froms = _from_objects(self._whereclause)
    else:
        where_froms = ()

    for candidate in itertools.chain(
            column_froms, where_froms, self._from_obj):
        if candidate is self:
            raise exc.InvalidRequestError(
                "select() construct refers to itself as a FROM")
        if translate and candidate in translate:
            candidate = translate[candidate]
        # Skip anything whose clone set overlaps one already collected.
        if seen.intersection(candidate._cloned_set):
            continue
        collected.append(candidate)
        seen.update(candidate._cloned_set)
    return collected
def __init__(self, selectable, equivalents=None,
             chain_to=None, adapt_required=False,
             include_fn=None, exclude_fn=None,
             adapt_on_names=False,
             allow_label_resolve=True,
             anonymize_labels=False):
    """Initialise the adapter on top of ``ClauseAdapter``.

    ``selectable``, ``equivalents``, ``include_fn``, ``exclude_fn``,
    ``adapt_on_names`` and ``anonymize_labels`` are forwarded unchanged
    to ``ClauseAdapter.__init__``.

    :param chain_to: when truthy, passed to ``self.chain()`` right after
      the base initialisation.
    :param adapt_required: stored as ``self.adapt_required``.
    :param allow_label_resolve: stored as ``self.allow_label_resolve``.
    """
    ClauseAdapter.__init__(self, selectable, equivalents,
                           include_fn=include_fn, exclude_fn=exclude_fn,
                           adapt_on_names=adapt_on_names,
                           anonymize_labels=anonymize_labels)
    # NOTE(review): chain() runs before the attributes below are set;
    # it may depend on the instance state at this point -- confirm
    # before reordering any of these statements.
    if chain_to:
        self.chain(chain_to)
    # Column lookup table populated through self._locate_col.
    self.columns = util.populate_column_dict(self._locate_col)
    # Wrap the lookup table only when an include/exclude filter is set.
    if self.include_fn or self.exclude_fn:
        self.columns = self._IncludeExcludeMapping(self, self.columns)
    self.adapt_required = adapt_required
    self.allow_label_resolve = allow_label_resolve
    self._wrap = None
def run(self):
    """Poll the watched files forever and trigger a reload on change.

    Each pass stats every module file plus ``self.extra_files``.  The
    first modification time seen for a file is only recorded; a later,
    newer modification time calls ``self.trigger_reload``.  Files that
    cannot be stat'ed are skipped.  Passes are separated by
    ``self._sleep(self.interval)``.
    """
    last_seen = {}
    while True:
        watched = chain(_iter_module_files(), self.extra_files)
        for path in watched:
            try:
                current = os.stat(path).st_mtime
            except OSError:
                continue

            previous = last_seen.get(path)
            if previous is None:
                # First sighting: remember it, nothing to compare yet.
                last_seen[path] = current
            elif current > previous:
                self.trigger_reload(path)
        self._sleep(self.interval)
def update_template_context(self, context):
    """Update the template context with some commonly used variables.

    Every registered template context processor is called and its
    result merged into ``context``: the application-wide processors
    first, then those of the current request's blueprint, if any.
    As of Flask 0.6, values already present in the context are not
    overridden when a context processor returns a value under the
    same key.

    :param context: the context as a dictionary that is updated in place
                    to add extra variables.
    """
    processors = self.template_context_processors[None]
    reqctx = _request_ctx_stack.top
    if reqctx is not None:
        blueprint = reqctx.request.blueprint
        if (blueprint is not None
                and blueprint in self.template_context_processors):
            processors = chain(
                processors, self.template_context_processors[blueprint])

    # Snapshot the caller's values so they can be restored afterwards.
    original = context.copy()
    for processor in processors:
        context.update(processor())

    # make sure the original values win.  This makes it easier to add
    # new variables in context processors without breaking existing
    # views.
    context.update(original)
def preprocess_request(self):
    """Run the hooks that precede the actual request dispatching.

    Every function registered with :meth:`before_request` is called.
    As soon as one of them returns a value other than ``None``, that
    value is returned, handled as if it were the return value of the
    view, and further request handling is stopped.

    The :meth:`url_value_processor` functions are triggered first,
    before any of the :meth:`before_request` functions.
    """
    blueprint = _request_ctx_stack.top.request.blueprint

    # URL value preprocessors: application-wide, then blueprint ones.
    url_processors = self.url_value_preprocessors.get(None, ())
    if blueprint is not None and blueprint in self.url_value_preprocessors:
        url_processors = chain(
            url_processors, self.url_value_preprocessors[blueprint])
    for processor in url_processors:
        processor(request.endpoint, request.view_args)

    # before_request hooks: application-wide, then blueprint ones.
    hooks = self.before_request_funcs.get(None, ())
    if blueprint is not None and blueprint in self.before_request_funcs:
        hooks = chain(hooks, self.before_request_funcs[blueprint])
    for hook in hooks:
        result = hook()
        if result is not None:
            return result
def process_response(self, response):
    """Hook for modifying the response object before it is sent to
    the WSGI server; can be overridden.  By default it calls all the
    :meth:`after_request` decorated functions.

    .. versionchanged:: 0.5
       As of Flask 0.5 the functions registered for after request
       execution are called in reverse order of registration.

    :param response: a :attr:`response_class` object.
    :return: a new response object or the same, has to be an
             instance of :attr:`response_class`.
    """
    ctx = _request_ctx_stack.top
    blueprint = ctx.request.blueprint

    # Per-request handlers run first, then the blueprint's, then the
    # application-wide ones -- the latter two in reverse registration
    # order.
    sources = [ctx._after_request_functions]
    if blueprint is not None and blueprint in self.after_request_funcs:
        sources.append(reversed(self.after_request_funcs[blueprint]))
    if None in self.after_request_funcs:
        sources.append(reversed(self.after_request_funcs[None]))

    for handler in chain(*sources):
        response = handler(response)

    if not self.session_interface.is_null_session(ctx.session):
        self.save_session(ctx.session, response)
    return response
def _by_version_descending(names):
    """
    Return the given filenames sorted in descending order of
    version number.

    >>> names = 'bar', 'foo', 'Python-2.7.10.egg', 'Python-2.7.2.egg'
    >>> _by_version_descending(names)
    ['Python-2.7.10.egg', 'Python-2.7.2.egg', 'foo', 'bar']
    >>> names = 'Setuptools-1.2.3b1.egg', 'Setuptools-1.2.3.egg'
    >>> _by_version_descending(names)
    ['Setuptools-1.2.3.egg', 'Setuptools-1.2.3b1.egg']
    >>> names = 'Setuptools-1.2.3b1.egg', 'Setuptools-1.2.3.post1.egg'
    >>> _by_version_descending(names)
    ['Setuptools-1.2.3.post1.egg', 'Setuptools-1.2.3b1.egg']
    """
    def version_key(name):
        """
        Sort key: every '-'-separated component of the filename stem,
        followed by the extension, each parsed as a version.
        """
        stem, ext = os.path.splitext(name)
        components = stem.split('-') + [ext]
        return [packaging.version.parse(part) for part in components]

    return sorted(names, key=version_key, reverse=True)
def _pad_version(left, right):
left_split, right_split = [], []
# Get the release segment of our versions
left_split.append(list(itertools.takewhile(lambda x: x.isdigit(), left)))
right_split.append(list(itertools.takewhile(lambda x: x.isdigit(), right)))
# Get the rest of our versions
left_split.append(left[len(left_split[0]):])
right_split.append(right[len(right_split[0]):])
# Insert our padding
left_split.insert(
1,
["0"] * max(0, len(right_split[0]) - len(left_split[0])),
)
right_split.insert(
1,
["0"] * max(0, len(left_split[0]) - len(right_split[0])),
)
return (
list(itertools.chain(*left_split)),
list(itertools.chain(*right_split)),
)
def _hash_comparison(self):
"""
Return a comparison of actual and expected hash values.
Example::
Expected sha256 abcdeabcdeabcdeabcdeabcdeabcdeabcdeabcdeabcde
or 123451234512345123451234512345123451234512345
Got bcdefbcdefbcdefbcdefbcdefbcdefbcdefbcdefbcdef
"""
def hash_then_or(hash_name):
# For now, all the decent hashes have 6-char names, so we can get
# away with hard-coding space literals.
return chain([hash_name], repeat(' or'))
lines = []
for hash_name, expecteds in iteritems(self.allowed):
prefix = hash_then_or(hash_name)
lines.extend((' Expected %s %s' % (next(prefix), e))
for e in expecteds)
lines.append(' Got %s\n' %
self.gots[hash_name].hexdigest())
prefix = ' or'
return '\n'.join(lines)
def retry(delays=(0, 1, 5, 30, 180, 600, 3600),
          exception=Exception,
          report=lambda *args: None):
    """Decorator factory that retries a function when it raises.

    The decorated function is attempted ``len(delays) + 1`` times at
    most.  After each failed attempt but the last, the next value from
    ``delays`` is slept for; once the delays are exhausted the last
    exception is re-raised.

    :param delays: iterable of seconds to wait between attempts.
    :param exception: exception class (or tuple of classes) that
      triggers a retry; anything else propagates immediately.
    :param report: callable invoked with message parts on every failure;
      on the final failure it receives the list of all problems seen.
    :return: a decorator preserving the wrapped function's metadata.
    """
    # Local import so the block does not rely on a file-level import.
    import functools

    def wrapper(function):
        @functools.wraps(function)
        def wrapped(*args, **kwargs):
            problems = []
            # The trailing None marks the final attempt with no delay
            # left.
            for delay in itertools.chain(delays, [None]):
                try:
                    return function(*args, **kwargs)
                except exception as problem:
                    problems.append(problem)
                    if delay is None:
                        report("retryable failed definitely:", problems)
                        raise
                    report("retryable failed:", problem,
                           "-- delaying for %ds" % delay)
                    time.sleep(delay)
        return wrapped
    return wrapper
def abstract_brackets(formula, variables_re=''):
    """Replace bracketed parts of ``formula`` with placeholder names.

    While ``split_formula`` returns a truthy result, its ``'within'``
    part is stored under a freshly generated placeholder and the
    formula is rebuilt as ``leading + placeholder + trailing``.

    Returns a named tuple ``(formula, new_variables)`` where
    ``new_variables`` maps each placeholder to the text it replaced.
    """
    new_variables = {}
    parts = split_formula(formula)
    while parts:
        # The placeholder must not match the caller's variables nor any
        # placeholder generated so far.
        taken = itertools.chain((variables_re,), new_variables.keys())
        substitute = no_re_matches(combine_re_expressions(taken))
        formula = parts['leading'] + substitute + parts['trailing']
        new_variables[substitute] = parts['within']
        parts = split_formula(formula)

    if formula in new_variables:
        # in case of extraneous brackets: the whole formula collapsed
        # into one placeholder, so start over from what it stands for
        return abstract_brackets(new_variables[formula], variables_re)

    result_type = namedtuple('abstract_brackets', ('formula', 'new_variables'))
    return result_type(formula, new_variables)
# Splits formula into 2 parts (leading and trailing) around the operator (from settings.order_of_operations) with the lowest priority.
# If there are no operators in formula, returns None.