def elapsed_time(start, end):
"""Calculate the elapsed time for a service activity.
Arguments:
start (:py:class:`str`): The activity start time.
end (:py:class:`str`): The activity end time.
Returns:
:py:class:`tuple`: The start and end times and humanized elapsed
time.
"""
start_time = safe_parse(start)
end_time = safe_parse(end)
if start_time is None or end_time is None:
logger.warning('failed to generate elapsed time')
text = 'elapsed time not available'
else:
text = 'took {}'.format(naturaldelta(parse(end) - parse(start)))
return to_utc_timestamp(start_time), to_utc_timestamp(end_time), text
python类naturaldelta()的实例源码
def humanize_duration(delta):
"""format for a single duration value - takes datatime.timedelta as arg
Parameters
----------
delta : datetime.timedelta
A time delta
Returns
-------
str
Humanize delta to duration.
"""
return humanize.naturaldelta(delta)
# various utility functions
def cmd_remind(self, event, duration, content):
if Reminder.count_for_user(event.author.id) > 30:
return event.msg.reply(':warning: you an only have 15 reminders going at once!')
remind_at = parse_duration(duration)
if remind_at > (datetime.utcnow() + timedelta(seconds=5 * YEAR_IN_SEC)):
return event.msg.reply(':warning: thats too far in the future, I\'ll forget!')
r = Reminder.create(
message_id=event.msg.id,
remind_at=remind_at,
content=content
)
self.reminder_task.set_next_schedule(r.remind_at)
event.msg.reply(':ok_hand: I\'ll remind you at {} ({})'.format(
r.remind_at.isoformat(),
humanize.naturaldelta(r.remind_at - datetime.utcnow()),
))
def temprole(self, event, user, role, duration, reason=None):
member = event.guild.get_member(user)
if not member:
raise CommandFail('invalid user')
self.can_act_on(event, member.id)
role_id = role if isinstance(role, (int, long)) else event.config.role_aliases.get(role.lower())
if not role_id or role_id not in event.guild.roles:
raise CommandFail('invalid or unknown role')
if role_id in member.roles:
raise CommandFail(u'{} is already in that role'.format(member.user))
expire_dt = parse_duration(duration)
Infraction.temprole(self, event, member, role_id, reason, expire_dt)
self.queue_infractions()
self.confirm_action(event, maybe_string(
reason,
u':ok_hand: {u} is now in the {r} role for {t} (`{o}`)',
u':ok_hand: {u} is now in the {r} role for {t}',
r=event.guild.roles[role_id].name,
u=member.user,
t=humanize.naturaldelta(expire_dt - datetime.utcnow()),
))
def tempban(self, event, duration, user, reason=None):
member = event.guild.get_member(user)
if member:
self.can_act_on(event, member.id)
expires_dt = parse_duration(duration)
Infraction.tempban(self, event, member, reason, expires_dt)
self.queue_infractions()
self.confirm_action(event, maybe_string(
reason,
u':ok_hand: temp-banned {u} for {t} (`{o}`)',
u':ok_hand: temp-banned {u} for {t}',
u=member.user,
t=humanize.naturaldelta(expires_dt - datetime.utcnow()),
))
else:
raise CommandFail('invalid user')
def update(self, increment=1):
self.duration = datetime.timedelta(seconds=time.monotonic() - self.started_at)
self.current += increment
try:
percent = 100 * (self.current / float(self.total))
filled_length = int(round(self.columns * self.current / float(self.total)))
except ZeroDivisionError:
percent = 0
filled_length = 0
if self.quiet:
return
if self.log:
logger = logging.getLogger(__name__)
logger.info("{s.prelude} {percent:.1f}% = {s.current}/{s.total} {s.suffix} ({s.duration} elapsed)".format(s=self, percent=percent))
return
bar = '=' * filled_length + '-' * (self.columns - filled_length)
out = "\r{s.prelude} [{bar}] {percent:.1f}% {s.current}/{s.total} {s.suffix} ({duration} elapsed)".format(bar=bar, s=self, percent=percent, duration=humanize.naturaldelta(self.duration))
self.longest_line = max(self.longest_line, len(out))
sys.stdout.write(out)
sys.stdout.flush()
def estimate_time(builds):
"""Update the working build with an estimated completion time.
Takes a simple average over the previous builds, using those
whose outcome is ``'passed'``.
Arguments:
builds (:py:class:`list`): All builds.
"""
try:
index, current = next(
(index, build) for index, build in enumerate(builds[:4])
if build['outcome'] == 'working'
)
except StopIteration:
return # no in-progress builds
if current.get('started_at') is None:
current['elapsed'] = 'estimate not available'
return
usable = [
current for current in builds[index + 1:]
if current['outcome'] == 'passed' and current['duration'] is not None
]
if not usable:
current['elapsed'] = 'estimate not available'
return
average_duration = int(sum(build['duration'] for build in usable) /
float(len(usable)))
finish = current['started_at'] + average_duration
remaining = (datetime.fromtimestamp(finish) -
datetime.now()).total_seconds()
if remaining >= 0:
current['elapsed'] = '{} left'.format(naturaldelta(remaining))
else:
current['elapsed'] = 'nearly done'
def natural_eta(self):
return humanize.naturaldelta(self.eta)
def natural_overall_eta(self):
return humanize.naturaldelta(self.avg * self.grand_total)
def nat_elapsed(self):
return humanize.naturaldelta(self.elapsed_td)
def process_issues(repository, installation):
now = time.time()
# Get issues labeled as 'Close?'
repo = RepoHandler(repository, 'master', installation)
issuelist = repo.get_issues('open', 'Close?')
for n in issuelist:
print(f'Checking {n}')
issue = IssueHandler(repository, n, installation)
labeled_time = issue.get_label_added_date('Close?')
if labeled_time is None:
continue
dt = now - labeled_time
if current_app.stale_issue_close and dt > current_app.stale_issue_close_seconds:
comment_ids = issue.find_comments('astropy-bot[bot]', filter_keep=is_close_epilogue)
if len(comment_ids) == 0:
print(f'-> CLOSING issue {n}')
issue.submit_comment(ISSUE_CLOSE_EPILOGUE)
issue.close()
else:
print(f'-> Skipping issue {n} (already closed)')
elif dt > current_app.stale_issue_warn_seconds:
comment_ids = issue.find_comments('astropy-bot[bot]', filter_keep=is_close_warning)
if len(comment_ids) == 0:
print(f'-> WARNING issue {n}')
issue.submit_comment(ISSUE_CLOSE_WARNING.format(pasttime=naturaltime(dt),
futuretime=naturaldelta(current_app.stale_issue_close_seconds - current_app.stale_issue_warn_seconds)))
else:
print(f'-> Skipping issue {n} (already warned)')
else:
print(f'-> OK issue {n}')
def format_duration(self, seconds):
time_format = self._settings.get(['time_format'], merged=True)
if seconds == None:
return "N/A"
delta = datetime.timedelta(seconds=seconds)
time_format = self._settings.get(['time_format'], merged=True)
if time_format == "FUZZY":
return humanize.naturaldelta(delta)
elif time_format == "EXACT":
return octoprint.util.get_formatted_timedelta(delta)
else:
return self.humanize_duration(seconds)
def humanize_timedelta_filter(dt: timedelta, fmt=None):
"""??humanize??????????????"""
humanize.i18n.activate('zh_CN', path='etc/humanize')
return humanize.naturaldelta(dt)
def stats(self):
chain_balances = self.calculate_balances()
coins_mined = abs(chain_balances['network'])
blockchain_start = timestamp2datetime(self[0].timestamp)
blockchain_end = timestamp2datetime(self[-1].timestamp)
total_duration = blockchain_start - blockchain_end
return u'{} britcoins | {} per britcoin'.format(
coins_mined,
humanize.naturaldelta(total_duration.total_seconds() / coins_mined)
)
def relative_datetime(self):
"""Return human-readable relative time string."""
now = datetime.now(timezone.utc)
tense = "from now" if self.created_at > now else "ago"
return "{0} {1}".format(humanize.naturaldelta(now - self.created_at), tense)
def natural_last_modified(self):
last_modified = parsedate_to_datetime(self.last_modified)
now = datetime.now(timezone.utc)
tense = "from now" if last_modified > now else "ago"
return "{0} {1}".format(humanize.naturaldelta(now - last_modified), tense)
def update_remaining(self):
if self.last_message.get('estimated_finish'):
estimated_remaining = self.last_message['estimated_finish'] - datetime.now(timezone.utc)
if estimated_remaining < timedelta(seconds=5):
estimated_remaining_str = 'a few seconds'
else:
estimated_remaining_str = humanize.naturaldelta(estimated_remaining)
self.estimated_completion.value_label.set_text(estimated_remaining_str)
self.estimated_completion.show_all()
def updateCard(self, trainer):
dailyDiff = await self.getDiff(trainer, 1)
level=trainer.level
embed=discord.Embed(timestamp=dailyDiff.new_date, colour=int(trainer.team().colour.replace("#", ""), 16))
try:
embed.set_author(name=trainer.username, icon_url=trainer.account().discord().avatar_url)
except:
embed.set_author(name=trainer.username)
embed.add_field(name='Level', value=level.level)
if level.level != 40:
embed.add_field(name='XP', value='{:,} / {:,}'.format(trainer.update.xp-level.total_xp,level.xp_required))
else:
embed.add_field(name='Total XP', value='{}'.format(humanize.intword(trainer.update.xp)))
if dailyDiff.change_xp and dailyDiff.change_time:
gain = '{:,} since {}. '.format(dailyDiff.change_xp, humanize.naturalday(dailyDiff.old_date))
if dailyDiff.change_time.days>1:
gain += "That's {:,} xp/day.".format(round(dailyDiff.change_xp/dailyDiff.change_time.days))
embed.add_field(name='Gain', value=gain)
if trainer.goal_daily and dailyDiff.change_time.days>0:
dailyGoal = trainer.goal_daily
embed.add_field(name='Daily completion', value='{}% towards {:,}'.format(pycent.percentage(dailyDiff.change_xp/max(1,dailyDiff.change_time.days), dailyGoal), dailyGoal))
if trainer.goal_total and trainer.goal_total!=0:
totalGoal = trainer.goal_total
else:
totalGoal = None
if totalGoal:
totalDiff = await self.getDiff(trainer, 7)
embed.add_field(name='Goal remaining', value='{:,} out of {}'.format(totalGoal-totalDiff.new_xp, humanize.intword(totalGoal)))
if totalDiff.change_time.seconds>=1:
eta = lambda x, y, z: round(x/(y/z))
eta = eta(totalGoal-totalDiff.new_xp, totalDiff.change_xp, totalDiff.change_time.total_seconds())
eta = totalDiff.new_date+datetime.timedelta(seconds=eta)
embed.add_field(name='Goal ETA', value=humanize.naturaltime(eta.replace(tzinfo=None)))
if totalDiff.change_time.total_seconds()<583200:
embed.description = "ETA may be inaccurate. Using {} of data.".format(humanize.naturaldelta(totalDiff.change_time))
embed.set_footer(text="Total XP: {:,}".format(dailyDiff.new_xp))
return embed
def seen(self, event, user):
try:
msg = Message.select(Message.timestamp).where(
Message.author_id == user.id
).order_by(Message.timestamp.desc()).limit(1).get()
except Message.DoesNotExist:
return event.msg.reply(u"I've never seen {}".format(user))
event.msg.reply(u'I last saw {} {} ago (at {})'.format(
user,
humanize.naturaldelta(datetime.utcnow() - msg.timestamp),
msg.timestamp
))
def voice_log(self, event, user):
if isinstance(user, DiscoUser):
user = user.id
sessions = GuildVoiceSession.select(
GuildVoiceSession.user_id,
GuildVoiceSession.channel_id,
GuildVoiceSession.started_at,
GuildVoiceSession.ended_at
).where(
(GuildVoiceSession.user_id == user) &
(GuildVoiceSession.guild_id == event.guild.id)
).order_by(GuildVoiceSession.started_at.desc()).limit(10)
tbl = MessageTable()
tbl.set_header('Channel', 'Joined At', 'Duration')
for session in sessions:
tbl.add(
unicode(self.state.channels.get(session.channel_id) or 'UNKNOWN'),
'{} ({} ago)'.format(
session.started_at.isoformat(),
humanize.naturaldelta(datetime.utcnow() - session.started_at)),
humanize.naturaldelta(session.ended_at - session.started_at) if session.ended_at else 'Active')
event.msg.reply(tbl.compile())
def command_about(self, event):
embed = MessageEmbed()
embed.set_author(name='Rowboat', icon_url=self.client.state.me.avatar_url, url='https://rowboat.party/')
embed.description = BOT_INFO
embed.add_field(name='Servers', value=str(Guild.select().count()), inline=True)
embed.add_field(name='Uptime', value=humanize.naturaldelta(datetime.utcnow() - self.startup), inline=True)
event.msg.reply(embed=embed)
def human_next(self):
return humanize.naturaldelta(self.get_next())
def update(self, bins, increment=1):
self.duration = datetime.timedelta(seconds=time.monotonic() - self.started_at)
self.updates += increment
start = min([x[0] for x in bins])
endin = max([x[1] for x in bins])
percent_start = ((start - self._start) / float(self.totalrange))
percent_endin = ((endin - self._start) / float(self.totalrange))
filledStart = int(round(self.columns * percent_start))
filledEndin = int(round(self.columns * percent_endin))
percent = sum([filledStart, filledEndin]) / 2
if not self.logger.isEnabledFor(logging.WARNING):
return
if self.logger.isEnabledFor(logging.INFO):
self.logger.info("{s.prelude} consider time range {start}-{endin} = {percent:.1f}% {s.updates} {s.suffix} ({s.duration} elapsed)".format(s=self, start=start, endin=endin, bin=bin, percent=percent))
return
if filledStart == filledEndin:
filledEndin = max(self.columns, filledEndin + 1) # clamp
bar = '-' * filledStart + '=' * (filledEndin - filledStart) + '-' * (self.columns - filledEndin)
out = "\r{s.prelude} [{bar}] {percent:.1f}% {s.updates} {s.suffix} ({duration} elapsed)".format(bar=bar, s=self, percent=percent, duration=humanize.naturaldelta(self.duration))
self.longest_line = max(self.longest_line, len(out))
sys.stdout.write(out)
sys.stdout.write(" " * (len(out) - self.longest_line)) # white out over remaining characters from previous line
sys.stdout.flush()
def process_pull_requests(repository, installation):
now = time.time()
# Get issues labeled as 'Close?'
repo = RepoHandler(repository, 'master', installation)
pull_requests = repo.open_pull_requests()
# User config
enable_autoclose = repo.get_config_value(
'autoclose_stale_pull_request', True)
for n in pull_requests:
print(f'Checking {n}')
pr = PullRequestHandler(repository, n, installation)
if 'keep-open' in pr.labels:
print('-> PROTECTED by label, skipping')
continue
commit_time = pr.last_commit_date
dt = now - commit_time
if current_app.stale_pull_requests_close and dt > current_app.stale_pull_requests_close_seconds:
comment_ids = pr.find_comments('astropy-bot[bot]', filter_keep=is_close_epilogue)
if not enable_autoclose:
print(f'-> Skipping issue {n} (auto-close disabled)')
elif len(comment_ids) == 0:
print(f'-> CLOSING issue {n}')
pr.submit_comment(PULL_REQUESTS_CLOSE_EPILOGUE)
pr.close()
else:
print(f'-> Skipping issue {n} (already closed)')
elif dt > current_app.stale_pull_requests_warn_seconds:
comment_ids = pr.find_comments('astropy-bot[bot]', filter_keep=is_close_warning)
if len(comment_ids) == 0:
print(f'-> WARNING issue {n}')
pr.submit_comment(PULL_REQUESTS_CLOSE_WARNING.format(pasttime=naturaldelta(dt),
futuretime=naturaldelta(current_app.stale_pull_requests_close_seconds - current_app.stale_pull_requests_warn_seconds)))
else:
print(f'-> Skipping issue {n} (already warned)')
else:
print(f'-> OK issue {n}')
def v2_playbook_on_stats(self, stats):
if stats.failures and stats.ok:
color = 'warning'
elif not stats.failures:
color = 'good'
else:
color = 'danger'
end_ts = datetime.now()
delta = humanize.naturaldelta(end_ts - self.start_ts) if self.start_ts else 'unknown'
title = "Playbook {name} run in {time}".format(name=self.playbook_name, time=delta)
hosts = stats.processed.keys()
payload = {'attachments': [
{
'title': title,
'color': color,
'author_name': self.user,
'fields': [{
'title': '',
'value': "{emoji} {host}".format(
emoji=':boom:' if host in stats.failures else ':ok_hand:',
host=host,
),
'short': True,
} for host in hosts],
# For IRC users
'fallback': '\n'.join([title, "by {0}".format(self.user)] + [
' - {host}: {status}'.format(
host=host,
status='failed' if host in stats.failures else 'ok',
)
for host in hosts
]),
}
]}
if os.environ.get('VAGRANT_EXECUTABLE'): # Do not post on slack when playbook is run by Vagrant
print('{delimiter}POST {url}\n{payload}{delimiter}'.format(
url=self.slack_hook_url,
payload=json.dumps(payload, indent=True),
delimiter='\n' + '-' * 30 + ' slack ' + '-' * 30 + '\n',
))
elif not self.skip_slack:
requests.post(self.slack_hook_url, data=json.dumps(payload))
def server(self, event, guild_id=None):
guild = self.state.guilds.get(guild_id) if guild_id else event.guild
if not guild:
raise CommandFail('invalid server')
content = []
content.append(u'**\u276F Server Information**')
created_at = to_datetime(guild.id)
content.append(u'Created: {} ago ({})'.format(
humanize.naturaldelta(datetime.utcnow() - created_at),
created_at.isoformat(),
))
content.append(u'Members: {}'.format(len(guild.members)))
content.append(u'Features: {}'.format(', '.join(guild.features) or 'none'))
content.append(u'\n**\u276F Counts**')
text_count = sum(1 for c in guild.channels.values() if not c.is_voice)
voice_count = len(guild.channels) - text_count
content.append(u'Roles: {}'.format(len(guild.roles)))
content.append(u'Text: {}'.format(text_count))
content.append(u'Voice: {}'.format(voice_count))
content.append(u'\n**\u276F Members**')
status_counts = defaultdict(int)
for member in guild.members.values():
if not member.user.presence:
status = Status.OFFLINE
else:
status = member.user.presence.status
status_counts[status] += 1
for status, count in sorted(status_counts.items(), key=lambda i: str(i[0]), reverse=True):
content.append(u'<{}> - {}'.format(
STATUS_EMOJI[status], count
))
embed = MessageEmbed()
if guild.icon:
embed.set_thumbnail(url=guild.icon_url)
embed.color = get_dominant_colors_guild(guild)
embed.description = '\n'.join(content)
event.msg.reply('', embed=embed)
def trigger_reminder(self, reminder):
message = reminder.message_id
channel = self.state.channels.get(message.channel_id)
if not channel:
self.log.warning('Not triggering reminder, channel %s was not found!',
message.channel_id)
reminder.delete_instance()
return
msg = channel.send_message(u'<@{}> you asked me at {} ({} ago) to remind you about: {}'.format(
message.author_id,
reminder.created_at,
humanize.naturaldelta(reminder.created_at - datetime.utcnow()),
S(reminder.content)
))
# Add the emoji options
msg.add_reaction(SNOOZE_EMOJI)
msg.add_reaction(GREEN_TICK_EMOJI)
try:
mra_event = self.wait_for_event(
'MessageReactionAdd',
message_id=msg.id,
conditional=lambda e: (
(e.emoji.name == SNOOZE_EMOJI or e.emoji.id == GREEN_TICK_EMOJI_ID) and
e.user_id == message.author_id
)
).get(timeout=30)
except gevent.Timeout:
reminder.delete_instance()
return
finally:
# Cleanup
msg.delete_reaction(SNOOZE_EMOJI)
msg.delete_reaction(GREEN_TICK_EMOJI)
if mra_event.emoji.name == SNOOZE_EMOJI:
reminder.remind_at = datetime.utcnow() + timedelta(minutes=20)
reminder.save()
msg.edit(u'Ok, I\'ve snoozed that reminder for 20 minutes.')
return
reminder.delete_instance()
def infraction_search(self, event, query=None):
q = (Infraction.guild_id == event.guild.id)
if query and isinstance(query, list) and isinstance(query[0], DiscoUser):
query = query[0].id
elif query:
query = ' '.join(query)
if query and (isinstance(query, int) or query.isdigit()):
q &= (
(Infraction.id == int(query)) |
(Infraction.user_id == int(query)) |
(Infraction.actor_id == int(query)))
elif query:
q &= (Infraction.reason ** query)
user = User.alias()
actor = User.alias()
infractions = Infraction.select(Infraction, user, actor).join(
user,
on=((Infraction.user_id == user.user_id).alias('user'))
).switch(Infraction).join(
actor,
on=((Infraction.actor_id == actor.user_id).alias('actor'))
).where(q).order_by(Infraction.created_at.desc()).limit(6)
tbl = MessageTable()
tbl.set_header('ID', 'Created', 'Type', 'User', 'Moderator', 'Active', 'Reason')
for inf in infractions:
type_ = {i.index: i for i in Infraction.Types.attrs}[inf.type_]
reason = inf.reason or ''
if len(reason) > 256:
reason = reason[:256] + '...'
if inf.active:
active = 'yes'
if inf.expires_at:
active += ' (expires in {})'.format(humanize.naturaldelta(inf.expires_at - datetime.utcnow()))
else:
active = 'no'
tbl.add(
inf.id,
inf.created_at.isoformat(),
str(type_),
unicode(inf.user),
unicode(inf.actor),
active,
clamp(reason, 128)
)
event.msg.reply(tbl.compile())
def tempmute(self, event, user, duration=None, reason=None):
if not duration and reason:
duration = parse_duration(reason.split(' ')[0], safe=True)
if duration:
if ' ' in reason:
reason = reason.split(' ', 1)[-1]
else:
reason = None
elif duration:
duration = parse_duration(duration)
member = event.guild.get_member(user)
if member:
self.can_act_on(event, member.id)
if not event.config.mute_role:
raise CommandFail('mute is not setup on this server')
if event.config.mute_role in member.roles:
raise CommandFail(u'{} is already muted'.format(member.user))
# If we have a duration set, this is a tempmute
if duration:
# Create the infraction
Infraction.tempmute(self, event, member, reason, duration)
self.queue_infractions()
self.confirm_action(event, maybe_string(
reason,
u':ok_hand: {u} is now muted for {t} (`{o}`)',
u':ok_hand: {u} is now muted for {t}',
u=member.user,
t=humanize.naturaldelta(duration - datetime.utcnow()),
))
else:
existed = False
# If the user is already muted check if we can take this from a temp
# to perma mute.
if event.config.mute_role in member.roles:
existed = Infraction.clear_active(event, member.id, [Infraction.Types.TEMPMUTE])
# The user is 100% muted and not tempmuted at this point, so lets bail
if not existed:
raise CommandFail(u'{} is already muted'.format(member.user))
Infraction.mute(self, event, member, reason)
existed = u' [was temp-muted]' if existed else ''
self.confirm_action(event, maybe_string(
reason,
u':ok_hand: {u} is now muted (`{o}`)' + existed,
u':ok_hand: {u} is now muted' + existed,
u=member.user,
))
else:
raise CommandFail('invalid user')