def next_generator(self):
in_list = self._get_list_attribute_is_member_off()
if in_list is None:
return None
generator = itertools.dropwhile(lambda x: x is not self, in_list)
next(generator)
return generator
python类dropwhile()的实例源码
def previous(self):
in_list = self._get_list_attribute_is_member_off()
if in_list is None:
return None
next_node = list(itertools.dropwhile(lambda x: x is not self, reversed(in_list)))[1:]
return next_node[0] if next_node else None
def previous_generator(self):
in_list = self._get_list_attribute_is_member_off()
if in_list is None:
return None
generator = itertools.dropwhile(lambda x: x is not self, reversed(in_list))
next(generator)
return generator
def write_pot_file(potfile, msgs):
"""
Write the :param potfile: POT file with the :param msgs: contents,
previously making sure its format is valid.
"""
if os.path.exists(potfile):
# Strip the header
msgs = '\n'.join(dropwhile(len, msgs.split('\n')))
else:
msgs = msgs.replace('charset=CHARSET', 'charset=UTF-8')
with io.open(potfile, 'a', encoding='utf-8') as fp:
fp.write(msgs)
def write_pot_file(potfile, msgs):
"""
Write the :param potfile: POT file with the :param msgs: contents,
previously making sure its format is valid.
"""
if os.path.exists(potfile):
# Strip the header
msgs = '\n'.join(dropwhile(len, msgs.split('\n')))
else:
msgs = msgs.replace('charset=CHARSET', 'charset=UTF-8')
with io.open(potfile, 'a', encoding='utf-8') as fp:
fp.write(msgs)
def load_info(self, bot):
with open(os.path.join(self.path, 'plugin.yml')) as yml_file:
yml = yaml.safe_load(yml_file)
if 'enabled' not in yml or not yml['enabled']:
raise PluginNotEnabled
path = self.path.split('/')
name = path.pop()
# use the directory as plugin name if it is not set
if 'name' not in yml:
yml['name'] = name
self.name = yml['name']
self.log = create_logger(self.name)
self.log.info('Loading plugin {:s}'.format(self.name))
# set categories from rest of pathname
if 'categories' not in yml:
it = iter(path)
cat = [dropwhile(lambda x: x != 'plugins', it)][1:]
yml['categories'] = cat
self.categories = yml['categories']
if 'commands' in yml:
self.commands_info = yml['commands']
if 'events' in yml:
self.events_info = yml['events']
def write_pot_file(potfile, msgs):
"""
Write the :param potfile: POT file with the :param msgs: contents,
previously making sure its format is valid.
"""
if os.path.exists(potfile):
# Strip the header
msgs = '\n'.join(dropwhile(len, msgs.split('\n')))
else:
msgs = msgs.replace('charset=CHARSET', 'charset=UTF-8')
with io.open(potfile, 'a', encoding='utf-8') as fp:
fp.write(msgs)
def write_pot_file(potfile, msgs):
"""
Write the :param potfile: POT file with the :param msgs: contents,
previously making sure its format is valid.
"""
if os.path.exists(potfile):
# Strip the header
msgs = '\n'.join(dropwhile(len, msgs.split('\n')))
else:
msgs = msgs.replace('charset=CHARSET', 'charset=UTF-8')
with io.open(potfile, 'a', encoding='utf-8') as fp:
fp.write(msgs)
def __order_subjects__(self):
global_order = sorted(self.subjects.items(), key=itemgetter(1))
sort = list(global_order)
for year in range(self.years):
self.ordered_by_year[year] = list(takewhile(lambda x: x[1].year == year+1, sort))
sort = list(dropwhile(lambda x: x[1].year == year+1, sort))
return global_order
def write_pot_file(potfile, msgs):
"""
Write the :param potfile: POT file with the :param msgs: contents,
previously making sure its format is valid.
"""
pot_lines = msgs.splitlines()
if os.path.exists(potfile):
# Strip the header
lines = dropwhile(len, pot_lines)
else:
lines = []
found, header_read = False, False
for line in pot_lines:
if not found and not header_read:
found = True
line = line.replace('charset=CHARSET', 'charset=UTF-8')
if not line and not found:
header_read = True
lines.append(line)
msgs = '\n'.join(lines)
with io.open(potfile, 'a', encoding='utf-8') as fp:
fp.write(msgs)
def write_pot_file(potfile, msgs):
"""
Write the :param potfile: POT file with the :param msgs: contents,
previously making sure its format is valid.
"""
pot_lines = msgs.splitlines()
if os.path.exists(potfile):
# Strip the header
lines = dropwhile(len, pot_lines)
else:
lines = []
found, header_read = False, False
for line in pot_lines:
if not found and not header_read:
found = True
line = line.replace('charset=CHARSET', 'charset=UTF-8')
if not line and not found:
header_read = True
lines.append(line)
msgs = '\n'.join(lines)
with io.open(potfile, 'a', encoding='utf-8') as fp:
fp.write(msgs)
def write_pot_file(potfile, msgs):
"""
Write the :param potfile: POT file with the :param msgs: contents,
previously making sure its format is valid.
"""
pot_lines = msgs.splitlines()
if os.path.exists(potfile):
# Strip the header
lines = dropwhile(len, pot_lines)
else:
lines = []
found, header_read = False, False
for line in pot_lines:
if not found and not header_read:
found = True
line = line.replace('charset=CHARSET', 'charset=UTF-8')
if not line and not found:
header_read = True
lines.append(line)
msgs = '\n'.join(lines)
with io.open(potfile, 'a', encoding='utf-8') as fp:
fp.write(msgs)
def get_repo_line(result):
stdout = (line.strip() for line in result.stdout_lines)
stdout = itertools.dropwhile(lambda item: not item.startswith("***"),
stdout)
stdout = list(stdout)
if not stdout:
raise ValueError("Server {0} has no installed ceph-common".format(
result.srv.ip))
yield result.srv, stdout[1].split(" ", 1)[1]
def get_function_body(func):
source_lines = inspect.getsourcelines(func)[0]
source_lines = dropwhile(lambda x: x.startswith('@'), source_lines)
def_line = next(source_lines).strip()
if def_line.startswith('def ') and def_line.endswith(':'):
# Handle functions that are not one-liners
first_line = next(source_lines)
# Find the indentation of the first line
indentation = len(first_line) - len(first_line.lstrip())
return [first_line[indentation:]] + [line[indentation:] for line in source_lines]
else:
# Handle single line functions
return [def_line.rsplit(':')[-1].strip()]
def get_imports(obj):
source_lines = inspect.getsourcelines(sys.modules[obj.__module__])[0]
source_lines = dropwhile(lambda x: x.startswith('@'), source_lines)
import_lines = filter(lambda x: x.startswith('import ') or x.startswith('from '), source_lines)
return list(import_lines)
def lstrip(self):
""" Remove the left zeros and shift the exponent
Warning: 0 Not supported
"""
result = Number(list(reversed(list(
itertools.dropwhile(lambda x: x == 0, reversed(self.val))
))), self.exponent, self.sign)
if len(result.val) == 0:
result.val = [0]
result.sign = +1
result.exponent -= len(self.val) - len(result.val)
return result
def write_pot_file(potfile, msgs):
"""
Write the :param potfile: POT file with the :param msgs: contents,
previously making sure its format is valid.
"""
pot_lines = msgs.splitlines()
if os.path.exists(potfile):
# Strip the header
lines = dropwhile(len, pot_lines)
else:
lines = []
found, header_read = False, False
for line in pot_lines:
if not found and not header_read:
found = True
line = line.replace('charset=CHARSET', 'charset=UTF-8')
if not line and not found:
header_read = True
lines.append(line)
msgs = '\n'.join(lines)
with io.open(potfile, 'a', encoding='utf-8') as fp:
fp.write(msgs)
def data_deal_function():
# compress()????????????.????????????????,??????????????.
# ????????????????True?????
# ??,????????????.???????Python??????????,??????
# itertools.filterfalse()???????????,??????.???????????False???True???
for item in it.compress([1, 2, 3, 4, 5], [False, True, False, 0, 1]):
print(item)
# dropwhile()?takewhile()?????????????.??????????????????????????,???????????????.
# dropwhile()??????????????????????False.?takewhile()??????????False
# ??,????????????????????????(??dropwhile????,????????????,?takewhile?????????)
def __single_digit(n):
return n < 10
for n in it.dropwhile(__single_digit, range(20)):
print(n, end=" ")
for n in it.takewhile(__single_digit, range(20)):
print(n, end=" ")
# accumulate()?????????????????????????????(??????,????????????).??,???????
# [1,2,3,4]??,???result1?1.?????????result1?2??result2,????.????????functools???reduce()????
for n in it.accumulate([1, 2, 3, 4, ]):
print(n, end=" ")
def datapoints(self, query, maxdatapoints=None):
if query.statistics and (query.statistics not in ['Average', 'Sum', 'SampleCount',
'Maximum', 'Minimum']):
raise InvalidMetricQuery("Query statistic invalid value `{}`".format(query.statistics))
elif query.statistics:
statistics = query.statistics
else:
statistics = "Average"
if maxdatapoints:
# Calculate the Period where the number of datapoints
# returned are less than maxdatapoints.
# Get the first granularity that suits for return the maxdatapoints
seconds = (query.get_until() - query.get_since()).total_seconds()
period = next(dropwhile(lambda g: seconds / g > maxdatapoints, count(60, 60)))
else:
period = 60
# get a client using the region given by the query, or if it
# is None using the one given by the datasource or the profile
client = self._cw_client(region=query.region)
kwargs = {
'Namespace': query.namespace,
'MetricName': query.metricname,
'StartTime': query.get_since(),
'EndTime': query.get_until(),
'Period': period,
'Dimensions': [{
'Name': query.dimension_name,
'Value': query.dimension_value,
}],
'Statistics': [statistics]
}
datapoints = self._cw_call(client, "get_metric_statistics", **kwargs)
return [(point[statistics], time.mktime(point['Timestamp'].timetuple()))
for point in datapoints['Datapoints']]
def getMenu(today):
menu = ''
try:
day = today.weekday()
is_this_week = lambda date: datetime.strptime(date, '%Y-%m-%dT%H:%M:%S%z').date() > today.date() - timedelta(days=7)
is_today = lambda date: datetime.strptime(date, '%Y-%m-%dT%H:%M:%S%z').date() == today.date()
ignore_hashtags = lambda post: " ".join(word.lower() for word in post.split() if word[0] != "#")
daily_menu_filter = lambda post: is_today(post['created_time']) \
and "menü" in post['message'].lower()
weekly_menu_filter = lambda post: is_this_week(post['created_time']) \
and days_lower[day] in ignore_hashtags(post['message'])
weekly_menu = get_filtered_fb_post(FB_ID, weekly_menu_filter)
if weekly_menu:
menu_post = dropwhile(lambda line: days_lower[day] not in line.lower(), skip_empty_lines(weekly_menu.split('\n')))
else:
menu_post = get_filtered_fb_post(FB_ID, daily_menu_filter).split('\n')
menu_post = list(menu_post)
for i, line in enumerate(menu_post):
if "A:" in line:
menu = "<br>".join((menu_post[i-1], menu_post[i], menu_post[i+1]))
break
except:
pass
return {
'name': 'Kompót',
'url': FB_PAGE,
'menu': menu
}