def starts_when(iterable, condition):
    # type: (Iterable, Union[Callable, Any]) -> Iterable
    """Skip leading items until a condition is first met, then yield the rest.

    Args:
        iterable: the iterable to filter.
        condition: a callable taking one item; yielding starts at the first
            item for which it returns a true value. A non-callable value
            is compared for equality instead, i.e. it behaves like
            `lambda item: item == condition`.

    Returns:
        An `itertools.dropwhile` iterator that starts at the first
        matching item and then yields every remaining item unfiltered.

    Example:

        >>> list(starts_when(range(10), lambda x: x > 5))
        [6, 7, 8, 9]
        >>> list(starts_when(range(10), 7))
        [7, 8, 9]
    """
    if callable(condition):
        matches = condition
    else:
        def matches(item):
            return item == condition

    # dropwhile needs the *negated* test: drop while the item does not match.
    return itertools.dropwhile(lambda item: not matches(item), iterable)
# Example source code using Python's dropwhile()
def getMenu(today):
    """Build the menu entry for the 'Gólvonal' restaurant for a given day.

    The page behind ``URL`` is loaded with ``get_dom``, the text nodes of the
    ``h3``/``p`` elements inside the ``fck`` div are collected, everything
    before the line naming today's weekday is skipped, and the second and
    third non-empty lines from that point on are joined with ``<br>``.

    Args:
        today: object providing ``weekday()``; the result indexes
            ``days_lower`` (presumably a date/datetime -- confirm with caller).

    Returns:
        dict with the keys ``name``, ``url`` and ``menu``. ``menu`` is an
        empty string when anything goes wrong while fetching or parsing.
    """
    day = today.weekday()
    try:
        dom = get_dom(URL)
        text_nodes = dom.xpath("/html/body//div[@class='fck']/*[self::h3 or self::p]//text()")
        # Skip everything that precedes the line mentioning today's day name.
        from_today = dropwhile(lambda line: days_lower[day] not in line.lower(), text_nodes)
        # islice(…, 1, 3): drop the first remaining line, keep the next two.
        menu = '<br>'.join(islice(skip_empty_lines(from_today), 1, 3))
    except Exception:
        # Best effort: a broken page must not break the caller. Unlike a bare
        # ``except:``, this lets KeyboardInterrupt / SystemExit propagate.
        menu = ''

    return {
        'name': 'Gólvonal',
        'url': URL,
        'menu': menu
    }
def lines(self):
    """Return a line-numbered excerpt of the definition's source.

    The source lines are taken from ``self.definition._source`` using
    ``self.definition._slice``; trailing lines for which ``is_blank`` is
    true are removed. Each remaining line is prefixed with its line number
    (``self.definition.start`` plus its position), right-aligned in a
    6-character column. Output stops after the line at position 6, where an
    ellipsis marker is appended.

    Returns:
        str: the formatted excerpt (empty string if there are no lines).
    """
    definition = self.definition
    source_lines = definition._source[definition._slice]
    offset = definition.start
    # Strip trailing blank lines: reverse, drop the leading blanks, reverse back.
    lines_stripped = list(reversed(list(dropwhile(is_blank,
                                                  reversed(source_lines)))))
    # The original computed a width from the largest line number and then
    # unconditionally overwrote it with 6; only the fixed width is kept.
    numbers_width = 6
    parts = []
    for n, line in enumerate(lines_stripped):
        parts.append('%*d: %s' % (numbers_width, n + offset, line))
        if n > 5:
            parts.append(' ...\n')
            break
    return ''.join(parts)
def pathgen(stroke, path, height, split_at_invisible, f=lambda v: not v.attribute.visible):
    """Generate the string fragments of SVG path elements for a stroke.

    Args:
        stroke: iterable of vertices exposing ``point`` (an ``(x, y)`` pair)
            and ``attribute.visible``.
        path: opening fragment of a path element; yielded first and again
            every time a new path is started.
        height: the y coordinate is emitted as ``height - y``.
        split_at_invisible: when true, a vertex whose ``attribute.visible``
            is ``False`` ends the current path and starts a new one.
        f: predicate selecting the vertices to skip after a split.

    Yields:
        str: the opening fragment, one ``'x, y '`` fragment per emitted
        vertex, and ``'" />'`` closers.
    """
    def coords(vertex):
        px, py = vertex.point
        return '{:.3f}, {:.3f} '.format(px, height - py)

    vertices = iter(stroke)
    # start first path
    yield path
    for vertex in vertices:
        yield coords(vertex)
        if not split_at_invisible or vertex.attribute.visible is not False:
            continue
        # end current and start new path
        yield '" />' + path
        # Fast-forward the shared iterator to the next vertex that `f`
        # rejects; the enclosing for-loop then resumes right after it.
        resumed = next(itertools.dropwhile(f, vertices), None)
        if resumed is None:
            break
        yield coords(resumed)
    # close current path
    yield '" />'
def getMenu(today):
    """Build the menu entry for the 'Amici Miei' restaurant for a given day.

    A Facebook post is looked up through ``get_filtered_fb_post`` using a
    filter that accepts posts created less than seven days before ``today``
    whose lower-cased message mentions more than three of the names in
    ``days_lower``. From the post's non-empty lines, everything before the
    line naming today's weekday is skipped and the following three lines
    are joined with ``<br>``.

    Args:
        today: object providing ``weekday()`` and ``date()`` (presumably a
            datetime -- confirm with caller).

    Returns:
        dict with the keys ``name``, ``url`` and ``menu``. ``menu`` is an
        empty string when anything goes wrong while fetching or parsing.
    """
    day = today.weekday()
    try:
        def is_this_week(date):
            created = datetime.strptime(date, '%Y-%m-%dT%H:%M:%S%z').date()
            return created > today.date() - timedelta(days=7)

        def menu_filter(post):
            if not is_this_week(post['created_time']):
                return False
            message = post['message'].lower()
            # A weekly menu post is expected to name most weekdays.
            return sum(name in message for name in days_lower) > 3

        post_text = get_filtered_fb_post(FB_ID, menu_filter)
        from_today = dropwhile(lambda line: days_lower[day] not in line.lower(),
                               skip_empty_lines(post_text.split('\n')))
        # islice(…, 1, 4): drop the day-name line itself, keep the next three.
        menu = '<br>'.join(islice(from_today, 1, 4))
    except Exception:
        # Best effort: a failed lookup must not break the caller. Unlike a
        # bare ``except:``, this lets KeyboardInterrupt / SystemExit propagate.
        menu = ''

    return {
        'name': 'Amici Miei',
        'url': FB_PAGE,
        'menu': menu
    }
def getFBMenu(today):
    """Return today's menu as an HTML string taken from a Facebook post.

    For weekdays (``today.weekday() < 5``) a post is looked up through
    ``get_filtered_fb_post`` using a filter that accepts posts created less
    than seven days before ``today`` whose lower-cased message contains
    "jelmagyarázat". If the post contains the marker "HETI MENÜ", the lines
    after the one naming today's weekday -- up to the next line naming any
    weekday -- are joined with ``<br>``. Otherwise a link to the page's
    cover image is returned.

    NOTE(review): the source this was recovered from had lost its
    indentation; the ``else`` branch is assumed to belong to the
    "HETI MENÜ" check rather than to the weekday check -- confirm.

    Args:
        today: object providing ``weekday()`` and ``date()`` (presumably a
            datetime -- confirm with caller).

    Returns:
        str: the menu markup, or an empty string on weekends and on any
        failure. The result is always a string: intermediate values are no
        longer stored in the returned variable, so a failure half-way can
        no longer leak a list, a lazy iterator or the raw post text.
    """
    day = today.weekday()
    menu = ''
    try:
        if day < 5:
            is_this_week = lambda date: datetime.strptime(date, '%Y-%m-%dT%H:%M:%S%z').date() > today.date() - timedelta(days=7)
            menu_filter = lambda post: is_this_week(post['created_time']) and "jelmagyarázat" in post['message'].lower()
            post_text = get_filtered_fb_post(FB_ID, menu_filter)
            post_parts = post_text.split("HETI MENÜ")
            if len(post_parts) > 1:
                weekly_lines = post_parts[1].strip().split("\n")
                # Skip up to and including the line naming today's weekday ...
                todays = islice(dropwhile(lambda l: days_lower[day] not in l, weekly_lines), 1, None)
                # ... and stop at the next line that names any weekday.
                todays = takewhile(lambda l: not any(name in l for name in days_lower), todays)
                menu = '<br>'.join(skip_empty_lines(todays))
            else:
                menu = f'<a href="{get_fb_cover_url(FB_ID)}">heti menü</a>'
    except Exception:
        # Best effort: fall back to an empty menu. Unlike a bare ``except:``,
        # this lets KeyboardInterrupt / SystemExit propagate.
        menu = ''
    return menu
def default(self):
    """Return last changes in truncated unified diff format.

    Runs ``self.git.log`` with ``-1 -p --no-color --format=%s`` and returns
    the first output line followed by everything after the first line that
    starts with ``+++``, joined with newlines. If no line starts with
    ``+++``, only the first line is returned.
    """
    log_result = self.git.log('-1', '-p', '--no-color', '--format=%s')
    lines = ensure_unicode(log_result.stdout).splitlines()

    subject = lines[:1]
    # Skip the diff preamble up to the first '+++' line ...
    from_marker = itertools.dropwhile(
        lambda x: not x.startswith('+++'),
        lines[1:],
    )
    # ... and drop that '+++' line itself.
    diff_body = itertools.islice(from_marker, 1, None)
    return u'\n'.join(itertools.chain(subject, diff_body))
def _trim_silence(self, audio: ndarray) -> ndarray:
    """Strip leading and trailing samples that lie below the silence threshold.

    A sample counts as silent when it is smaller than
    ``self.silence_threshold_for_not_normalized_sound``. The end is trimmed
    first (by trimming the start of the flipped array), then the start.

    Args:
        audio: samples to trim; iterated element by element.

    Returns:
        A new array holding the samples from the first to the last
        non-silent one.
    """
    def drop_leading_silence(samples: ndarray) -> ndarray:
        def is_silent(sample):
            return sample < self.silence_threshold_for_not_normalized_sound
        return numpy.array(list(dropwhile(is_silent, samples)))

    # Trailing silence is leading silence of the reversed signal.
    without_tail = flipud(drop_leading_silence(flipud(audio)))
    return drop_leading_silence(without_tail)
def write_pot_file(potfile, msgs):
    """
    Append the :param msgs: contents to the :param potfile: POT file,
    adjusting them first so the resulting file stays valid.

    If the file already exists, every line of ``msgs`` up to (but not
    including) the first empty line is dropped. Otherwise the
    ``charset=CHARSET`` placeholder is replaced with ``charset=UTF-8``.
    """
    if not os.path.exists(potfile):
        content = msgs.replace('charset=CHARSET', 'charset=UTF-8')
    else:
        # Strip the header: dropwhile(len, ...) skips lines until the first
        # empty one.
        content = '\n'.join(dropwhile(len, msgs.split('\n')))
    with io.open(potfile, 'a', encoding='utf-8') as fp:
        fp.write(content)
def write_pot_file(potfile, msgs):
    """
    Write the :param potfile: POT file with the :param msgs: contents,
    previously making sure its format is valid.

    The contents are appended to the file. If the file already exists,
    every line of ``msgs`` up to (but not including) the first empty line is
    dropped. Otherwise the first ``charset=CHARSET`` placeholder found
    before the first empty line is replaced with ``charset=UTF-8``.
    Lines are re-joined with ``\\n``, so a trailing newline in ``msgs`` is
    not preserved.
    """
    pot_lines = msgs.splitlines()
    if os.path.exists(potfile):
        # Strip the header
        lines = dropwhile(len, pot_lines)
    else:
        lines = []
        found, header_read = False, False
        for line in pot_lines:
            if not found and not header_read:
                # Only mark the placeholder as handled once it has actually
                # been seen; setting `found` unconditionally limited the
                # replacement to the very first line of the contents.
                if 'charset=CHARSET' in line:
                    found = True
                    line = line.replace('charset=CHARSET', 'charset=UTF-8')
            if not line and not found:
                # First empty line: header is over, stop looking.
                header_read = True
            lines.append(line)
    msgs = '\n'.join(lines)
    with io.open(potfile, 'a', encoding='utf-8') as fp:
        fp.write(msgs)
def _gen_words(match, splits):
    """Yield one word per entry of ``splits``, resolving numeric entries.

    Leading falsy groups of ``match.groups()`` are discarded first. An entry
    of ``splits`` that ``int()`` can parse is replaced by the remaining
    group at that index; any other entry is yielded unchanged.

    Args:
        match: object providing ``groups()`` (e.g. a regex match).
        splits: iterable of entries, each either literal text or an index.
    """
    captured = list(it.dropwhile(lambda group: not group, match.groups()))
    for token in splits:
        try:
            index = int(token)
        except ValueError:
            index = None
        if index is None:
            # Not a number: pass the literal text through.
            yield token
        else:
            yield next(it.islice(captured, index, index + 1))
def write_pot_file(potfile, msgs):
    """
    Write the :param potfile: POT file with the :param msgs: contents,
    previously making sure its format is valid.

    The contents are appended to the file. If the file already exists,
    every line of ``msgs`` up to (but not including) the first empty line is
    dropped. Otherwise the first ``charset=CHARSET`` placeholder found
    before the first empty line is replaced with ``charset=UTF-8``.
    Lines are re-joined with ``\\n``, so a trailing newline in ``msgs`` is
    not preserved.
    """
    pot_lines = msgs.splitlines()
    if os.path.exists(potfile):
        # Strip the header
        lines = dropwhile(len, pot_lines)
    else:
        lines = []
        found, header_read = False, False
        for line in pot_lines:
            if not found and not header_read:
                # Only mark the placeholder as handled once it has actually
                # been seen; setting `found` unconditionally limited the
                # replacement to the very first line of the contents.
                if 'charset=CHARSET' in line:
                    found = True
                    line = line.replace('charset=CHARSET', 'charset=UTF-8')
            if not line and not found:
                # First empty line: header is over, stop looking.
                header_read = True
            lines.append(line)
    msgs = '\n'.join(lines)
    with io.open(potfile, 'a', encoding='utf-8') as fp:
        fp.write(msgs)
def get_first_occurence(occurences, from_=None, until=None):
    """Return the first occurrence inside the optional ``[from_, until]`` window.

    Args:
        occurences: iterable of comparable values, assumed to be in
            ascending order (the window is applied with
            takewhile/dropwhile, which only inspect the leading items).
        from_: if not None, leading values smaller than this are skipped.
        until: if not None, iteration stops at the first value greater
            than this.

    Returns:
        The first remaining value.

    Raises:
        IndexError: if no value remains.

    Only as many items as needed are consumed, so unbounded iterables are
    supported (the previous implementation built a full list first).
    """
    results = occurences
    if until is not None:
        results = takewhile(lambda x: x <= until, results)
    if from_ is not None:
        results = dropwhile(lambda x: x < from_, results)
    for first in results:
        return first
    # Keep the exception type (and message) callers saw from `list(...)[0]`.
    raise IndexError('list index out of range')
def occurences_from(from_, dates, is_ints=False):
    """Skip the leading entries of ``dates`` that are smaller than ``from_``.

    Args:
        from_: lower bound; compared with ``<`` against each entry.
        dates: iterable of values to filter.
        is_ints: when true, a truthy ``from_`` is first converted with
            ``dt2int``; a falsy ``from_`` or a falsy conversion result
            disables filtering.

    Returns:
        ``dates`` itself when there is no bound, otherwise a lazy
        ``dropwhile`` iterator over it.
    """
    threshold = from_
    if is_ints:
        converted = dt2int(from_) if from_ else None
        threshold = converted or None

    if threshold is None:
        return dates
    return dropwhile(lambda x: x < threshold, dates)
#@region.cache_on_arguments()
def pig_latin(word):
    """Translate a single word into pig latin.

    If ``is_vowel`` accepts the first character, ``'yay'`` is appended.
    Otherwise the leading run of characters accepted by ``is_consonant`` is
    moved to the end of the word and ``'ay'`` is appended.

    Args:
        word: non-empty string (the first character is inspected).
    """
    if is_vowel(word[0]):
        return word + 'yay'

    # Everything from the first non-consonant onwards ...
    tail = ''.join(dropwhile(is_consonant, word))
    # ... and the consonant cluster that preceded it.
    head = word[:len(word) - len(tail)]
    return tail + head + 'ay'
def pathgen(stroke, style, height, split_at_invisible, stroke_color_mode, f=lambda v: not v.attribute.visible):
    """Generate the string fragments of SVG path elements for a stroke.

    Nothing is yielded for strokes with fewer than two vertices.

    Args:
        stroke: sized, indexable sequence of vertices exposing ``point``
            (an ``(x, y)`` pair), ``attribute.visible`` and
            ``attribute.color``.
        style: dict of attribute names to values written into each path
            element. Its ``"stroke"`` entry is overwritten in place when a
            vertex color is used.
        height: the y coordinate is emitted as ``height - y``.
        split_at_invisible: when true, a vertex whose ``attribute.visible``
            is ``False`` ends the current path and starts a new one.
        stroke_color_mode: ``'BASE'`` keeps ``style`` untouched, ``'FIRST'``
            uses the first vertex's color, anything else the last vertex's.
        f: predicate selecting the vertices to skip after a split.

    Yields:
        str: the opening fragment, one ``'x, y '`` fragment per emitted
        vertex, and ``'" />'`` closers.
    """
    if len(stroke) <= 1:
        return ""

    if stroke_color_mode != 'BASE':
        # try to use the color of the first or last vertex
        try:
            index = -1
            if stroke_color_mode == 'FIRST':
                index = 0
            style["stroke"] = format_rgb(stroke[index].attribute.color)
        except (ValueError, IndexError):
            # default is linestyle base color
            pass

    # put style attributes into a single svg path definition
    attributes = "".join('{}="{}" '.format(k, v) for k, v in style.items())
    path = '\n<path ' + attributes + 'd=" M '

    def coords(vertex):
        px, py = vertex.point
        return '{:.3f}, {:.3f} '.format(px, height - py)

    vertices = iter(stroke)
    # start first path
    yield path
    for vertex in vertices:
        yield coords(vertex)
        if not split_at_invisible or vertex.attribute.visible is not False:
            continue
        # end current and start new path
        yield '" />' + path
        # Fast-forward the shared iterator to the next vertex that `f`
        # rejects; the enclosing for-loop then resumes right after it.
        resumed = next(itertools.dropwhile(f, vertices), None)
        if resumed is None:
            break
        yield coords(resumed)
    # close current path
    yield '" />'
def split_by(pred, seq):
    """
    Split off the leading run of items that pass the predicate.

    Returns a pair ``(head, tail)``: ``head`` iterates over the leading
    items for which ``pred`` holds, ``tail`` over everything after them.
    Because ``seq`` is duplicated first, this also works when ``seq`` is a
    one-shot iterator, unlike calling takewhile and dropwhile on it
    separately.

    NOTE(review): ``_tee``, ``_takewhile`` and ``_dropwhile`` are presumably
    aliases of the itertools functions of the same name -- confirm against
    the module imports.
    """
    head_source, tail_source = _tee(seq)
    head = _takewhile(pred, head_source)
    tail = _dropwhile(pred, tail_source)
    return head, tail
#
# Special iteration
#
def write_pot_file(potfile, msgs):
    """
    Write the :param potfile: POT file with the :param msgs: contents,
    previously making sure its format is valid.

    The contents are appended to the file. If the file already exists,
    every line of ``msgs`` up to (but not including) the first empty line is
    dropped. Otherwise the first ``charset=CHARSET`` placeholder found
    before the first empty line is replaced with ``charset=UTF-8``.
    Lines are re-joined with ``\\n``, so a trailing newline in ``msgs`` is
    not preserved.
    """
    pot_lines = msgs.splitlines()
    if os.path.exists(potfile):
        # Strip the header
        lines = dropwhile(len, pot_lines)
    else:
        lines = []
        found, header_read = False, False
        for line in pot_lines:
            if not found and not header_read:
                # Only mark the placeholder as handled once it has actually
                # been seen; setting `found` unconditionally limited the
                # replacement to the very first line of the contents.
                if 'charset=CHARSET' in line:
                    found = True
                    line = line.replace('charset=CHARSET', 'charset=UTF-8')
            if not line and not found:
                # First empty line: header is over, stop looking.
                header_read = True
            lines.append(line)
    msgs = '\n'.join(lines)
    with io.open(potfile, 'a', encoding='utf-8') as fp:
        fp.write(msgs)
def source(self):
    """Return the source code for the definition.

    The lines are taken from ``self._source`` using ``self._slice``;
    trailing lines that are empty (after stripping whitespace) or that
    start with ``#`` are removed before the rest is concatenated.
    """
    kept = list(self._source[self._slice])
    # Peel blank and comment-only lines off the end.
    while kept:
        stripped = kept[-1].strip()
        if stripped == '' or stripped.startswith('#'):
            kept.pop()
        else:
            break
    return ''.join(kept)
def dropwhile(predicate, col):
    '''
    Skip the leading elements of a collection for which the predicate
    holds and return everything from the first non-matching element
    onwards as a list.
    '''
    remaining = idropwhile(predicate, col)
    return list(remaining)
def ios_chunk(line_it):
    """Return the lines of the next GoogleAnalytics chunk.

    Lines are consumed from ``line_it`` up to and including the first one
    containing ``b': GoogleAnalytics '``. The returned lazy iterator then
    yields the following lines until one contains ``b'timestamp = '``
    (that line is consumed but not yielded).

    Args:
        line_it: iterable of bytes lines.

    Raises:
        StopIteration: if no line contains the start marker.
    """
    start_marker = b': GoogleAnalytics '
    end_marker = b'timestamp = '

    def before_chunk(line):
        return start_marker not in line

    def inside_chunk(line):
        return end_marker not in line

    from_marker = itertools.dropwhile(before_chunk, line_it)
    # Discard the marker line itself.
    next(from_marker)
    return itertools.takewhile(inside_chunk, from_marker)
def memoryPills(pills):
    """Return the three entries that follow the first even-length one.

    The list is padded with three empty strings so that entries missing
    after the first even-length string come back as ``""``.

    Args:
        pills: list of strings.

    Returns:
        list of exactly three strings.
    """
    padded = pills + [""] * 3
    from_even = dropwhile(lambda pill: len(pill) % 2 == 1, padded)
    # Skip the even-length entry itself.
    next(from_even)
    first = next(from_even)
    second = next(from_even)
    third = next(from_even)
    return [first, second, third]
def enumerateslice(iterable, first, last):
    """Lazily yield ``(index, value)`` pairs with ``first <= index < last``.

    Args:
        iterable: the items to enumerate.
        first: lowest index to include.
        last: first index to exclude; iteration stops once it is reached.

    Returns:
        An iterator of ``(index, value)`` tuples.
    """
    # Index into the pair instead of using tuple-parameter lambdas
    # (``lambda (i, v): ...``), which are a SyntaxError on Python 3.
    indexed = enumerate(iterable)
    from_first = itertools.dropwhile(lambda pair: pair[0] < first, indexed)
    return itertools.takewhile(lambda pair: pair[0] < last, from_first)
## Convert fractional day into datetime.time
def collect_docstring(lines):
    """Return document docstring if it exists.

    Lines are skipped until one starts with three double quotes; lines are
    then collected until the collected text ends with three double quotes
    followed by a newline. The surrounding quotes are cut off, carriage
    returns are removed and newlines are replaced by spaces.

    Args:
        lines: iterable of text lines, each expected to end with a newline.

    Returns:
        str: the flattened docstring text, or an empty string if no line
        starts with three double quotes.
    """
    lines = dropwhile(lambda x: not x.startswith('"""'), lines)
    doc = ""
    for line in lines:
        doc += line
        # `len(doc) > 4` keeps a bare opening line (just the three quotes and
        # a newline) from being mistaken for the closing one, which used to
        # make every docstring written in that layout come back empty.
        if len(doc) > 4 and doc.endswith('"""\n'):
            break
    # Cut the opening quotes (3 chars) and the closing quotes + newline (4).
    return doc[3:-4].replace("\r", "").replace("\n", " ")
def validate_turtle(contents):
    """Assert that a Turtle document has the expected prefixes and layout.

    Checks that the ``books``, ``isbn`` and ``xsd`` prefixes are declared,
    then checks ``NUM_SUBJECTS`` consecutive subject blocks. Blocks start
    after the leading empty/whitespace/``@prefix`` lines and are separated
    by an empty or whitespace-only line. Each block must have
    ``NUM_TRIPLES_PER_SUBJ`` lines that follow the grouped
    ``subject ; ... ; ... .`` shape verified below.

    Args:
        contents: the Turtle document as a single string.

    Raises:
        AssertionError: on the first expectation that is not met.
    """
    all_lines = contents.splitlines()

    # Separate prefixes from the body
    prefix_lines = [line for line in all_lines if line.startswith(u"@prefix")]
    prefixes = {
        "books": "http://www.books.org/",
        "isbn": "http://www.books.org/isbn/",
        "xsd": "http://www.w3.org/2001/XMLSchema#"
    }

    # Validate that specified prefixes are there
    for pre, url in prefixes.items():
        pattern = r"@prefix {}:[\s]* <{}> \.".format(pre, url)
        assert any([re.match(pattern, x) is not None for x in prefix_lines]), \
            "{} is not found among prefixes".format(pre)

    # Validate subject grouping
    # Move the cursor until the first subject. The patterns are raw strings
    # so that `\s` is not an invalid string escape, and the iterator is no
    # longer named `iter`, which shadowed the builtin.
    body_lines = dropwhile(
        lambda x: len(x) == 0 or re.match(r"[\s]+$", x) or x.startswith(u"@prefix"),
        all_lines)

    # Check the block for each subject
    for s in range(NUM_SUBJECTS):
        # takewhile also consumes the separator line that ends the block, so
        # the next round starts on the following subject.
        this_sub_lines = list(takewhile(
            lambda x: len(x) != 0 and not re.match(r"[\s]+$", x), body_lines))
        assert len(this_sub_lines) == NUM_TRIPLES_PER_SUBJ

        # First line is where subject is defined
        subj_line = this_sub_lines[0]
        assert subj_line.startswith(u"isbn:")
        assert subj_line.endswith(";")

        # Rest of the lines starts with some whitespace
        assert all([re.match(r"^[\s]+", x) for x in this_sub_lines[1:]])

        # Next two lines end with ;
        assert all([x.endswith(u";") for x in this_sub_lines[1:(NUM_TRIPLES_PER_SUBJ-1)]])

        # Last line ends with a dot
        assert this_sub_lines[-1].endswith(u".")

        # Each line has a "books:" for the predicate
        assert all(["books:" in x for x in this_sub_lines])

        # One of the lines has true or false
        assert any(["true" in x or "false" in x for x in this_sub_lines])

        # Two of the lines has xsd:
        assert sum([1 for x in this_sub_lines if "xsd:" in x]) == 2
def get_after(sentinel, iterable):
    """Get the value after `sentinel` in an `iterable`.

    Raises:
        StopIteration: if `sentinel` does not occur or is the last item.
    """
    remaining = iter(iterable)
    for el in remaining:
        # Same test as the dropwhile predicate: stop once `el != sentinel`
        # is no longer true.
        if not el != sentinel:
            break
    else:
        raise StopIteration
    return next(remaining)
def write_pot_file(potfile, msgs):
    """
    Write the :param potfile: POT file with the :param msgs: contents,
    previously making sure its format is valid.

    The contents are appended to the file. If the file already exists,
    every line of ``msgs`` up to (but not including) the first empty line is
    dropped. Otherwise the first ``charset=CHARSET`` placeholder found
    before the first empty line is replaced with ``charset=UTF-8``.
    Lines are re-joined with ``\\n``, so a trailing newline in ``msgs`` is
    not preserved.
    """
    pot_lines = msgs.splitlines()
    if os.path.exists(potfile):
        # Strip the header
        lines = dropwhile(len, pot_lines)
    else:
        lines = []
        found, header_read = False, False
        for line in pot_lines:
            if not found and not header_read:
                # Only mark the placeholder as handled once it has actually
                # been seen; setting `found` unconditionally limited the
                # replacement to the very first line of the contents.
                if 'charset=CHARSET' in line:
                    found = True
                    line = line.replace('charset=CHARSET', 'charset=UTF-8')
            if not line and not found:
                # First empty line: header is over, stop looking.
                header_read = True
            lines.append(line)
    msgs = '\n'.join(lines)
    with io.open(potfile, 'a', encoding='utf-8') as fp:
        fp.write(msgs)
def parse_pr_message(message):
    """Split a pull-request message into its title and body.

    The title is the first line. The body is everything after it, with the
    empty lines directly following the title removed.

    Args:
        message: the full message text.

    Returns:
        tuple: ``(title, body)``, or ``(None, None)`` for an empty message.
    """
    all_lines = message.split("\n")
    if len(message) == 0:
        return None, None

    title, rest = all_lines[0], all_lines[1:]
    # operator.not_ is true for empty strings, so leading blank lines go.
    body_lines = itertools.dropwhile(operator.not_, rest)
    return title, "\n".join(body_lines)
def write_pot_file(potfile, msgs):
    """
    Append the :param msgs: contents to the :param potfile: POT file,
    adjusting them first so the resulting file stays valid.

    If the file already exists, every line of ``msgs`` up to (but not
    including) the first empty line is dropped. Otherwise the
    ``charset=CHARSET`` placeholder is replaced with ``charset=UTF-8``.
    """
    if not os.path.exists(potfile):
        content = msgs.replace('charset=CHARSET', 'charset=UTF-8')
    else:
        # Strip the header: dropwhile(len, ...) skips lines until the first
        # empty one.
        content = '\n'.join(dropwhile(len, msgs.split('\n')))
    with io.open(potfile, 'a', encoding='utf-8') as fp:
        fp.write(content)
def next(self):
    """Return the element that follows this one in its containing list.

    The list is obtained from ``self._get_list_attribute_is_member_off()``;
    elements are compared by identity.

    Returns:
        The element right after ``self``, or ``None`` when there is no
        containing list, ``self`` is not in it, or ``self`` is the last
        element.
    """
    siblings = self._get_list_attribute_is_member_off()
    if siblings is None:
        return None

    seen_self = False
    for node in siblings:
        if seen_self:
            return node
        if node is self:
            seen_self = True
    return None