def occurred(at_):
    """Calculate when a service event occurred.

    Arguments:
      at_ (:py:class:`str`): When the event occurred.

    Returns:
      :py:class:`str`: The humanized occurrence time, or
        ``'time not available'`` when ``at_`` cannot be parsed.

    """
    try:
        occurred_at = parse(at_)
    except (TypeError, ValueError):
        logger.warning('failed to parse occurrence time %r', at_)
        return 'time not available'
    utc_now = datetime.now(tz=timezone.utc)
    # Only the subtraction is guarded: mixing an aware "now" with a naive
    # parsed value raises TypeError, in which case a naive "now" is used.
    try:
        elapsed = utc_now - occurred_at
    except TypeError:  # at_ is a naive datetime
        elapsed = datetime.now() - occurred_at
    return naturaltime(elapsed.total_seconds())
# python类naturaltime()的实例源码 (example source code using Python's naturaltime())
def update(self):
    """Refresh the finish-time label and the rerun button.

    Reads ``self.finish``, ``self.source_available``,
    ``self.compression_available`` and ``self.msg``; updates
    ``self.finish_label`` and ``self.rerun_btn``.
    """
    finished_delta = datetime.now(timezone.utc) - self.finish
    # Anything under a minute is shown as "just now" rather than in seconds.
    if finished_delta < timedelta(minutes=1):
        finished_str = "just now"
    else:
        finished_str = humanize.naturaltime(finished_delta)
    self.finish_label.set_text(finished_str)

    # The button is sensitive only when the source is available and the
    # compression flag is truthy.
    self.rerun_btn.set_sensitive(
        self.source_available == SourceAvailability.AVAILABLE
        and self.compression_available)

    # Default tooltip, replaced by the first matching condition below.
    tooltip = RERUN_TIP
    if self.source_available == SourceAvailability.MOUNTED:
        tooltip = rm_dev(self.msg['source']) + " is currently mounted"
    elif self.source_available == SourceAvailability.GONE:
        tooltip = rm_dev(self.msg['source']) + " is not currently available"
    elif self.source_available == SourceAvailability.UUID_MISMATCH:
        tooltip = 'current {} does not match cloned partition uuid'\
            .format(rm_dev(self.msg['source']))
    elif not self.compression_available:
        tooltip = extract_compression_option(self.msg['destination']) + \
            ' compression is not installed'
    self.rerun_btn.set_tooltip_text(tooltip)
async def raids(self, ctx):
    """Not a list of active raids"""
    # Declared ``async`` because the body awaits; a plain ``def`` containing
    # ``await`` is a SyntaxError.
    #
    # For every gym that appears in ``self.going_users`` or
    # ``self.arrived_users`` and currently has a raid in progress, report how
    # many users are heading there, then delete the invoking message.
    gym_ids = set(self.going_users) | set(self.arrived_users)
    lines = []
    for gym_id in gym_ids:
        gym = Gym.get(id=gym_id)
        monacle_gym = await self.get_monacle_gym(gym)
        now = datetime.datetime.now()
        if not (monacle_gym and monacle_gym.raid_start
                and monacle_gym.raid_start <= now <= monacle_gym.raid_end):
            continue
        # A gym may be present in only one of the two mappings, so use
        # ``.get`` instead of indexing both.
        num_users = (len(self.going_users.get(gym_id, ()))
                     + len(self.arrived_users.get(gym_id, ())))
        verb = ' user is' if num_users == 1 else ' users are'
        lines.append(
            str(num_users) + verb
            + ' on the way to the {} raid at {} - ends at {} ({}).\n'.format(
                monacle_gym.raid_pokemon.name,
                gym.title,
                monacle_gym.raid_end.strftime("%H:%M:%S"),
                # now - raid_end is negative while the raid is running, so
                # this is rendered relative to the future end time.
                humanize.naturaltime(now - monacle_gym.raid_end)))
    # Also covers the case where gyms are tracked but none has an active
    # raid, which previously produced an empty message.
    message = ''.join(lines) or 'There are no raids on at the moment'
    await self.bot.say(message)
    await self.bot.delete_message(ctx.message)
def get_services(ctx, cluster, sort_by):
    """Print a plain-text table of the services in a cluster.

    :param ctx: context object; ``ctx.obj['bw']`` supplies the client and
        ``ctx.obj['cluster']`` the default cluster.
    :param cluster: cluster to query; falls back to ``ctx.obj['cluster']``
        when falsy.
    :param sort_by: expression handed to ``jp`` to derive the sort key of
        each record; no sorting when falsy.
    """
    if not cluster:
        cluster = ctx.obj['cluster']
    bw = ctx.obj['bw']
    records = bw.get_services(cluster=cluster)
    if sort_by:
        records.sort(key=lambda r: jp(r, sort_by))

    # One reference time so every row's age is measured from the same instant.
    now = datetime.datetime.now(pytz.utc)
    out = []
    for r in records:
        out.append((
            r['serviceName'],
            display.simple_task_definition(r['taskDefinition']),
            r['desiredCount'],
            r['runningCount'],
            r['status'],
            humanize.naturaltime(now - r['createdAt']),
        ))

    headers = ['NAME', 'TASK DEFINITION', 'DESIRED', 'RUNNING',
               'STATUS', 'AGE']
    output = tabulate.tabulate(out, headers=headers, tablefmt='plain')
    click.echo(output)
def get_task(ctx, cluster, sort_by):
    """Print a plain-text table of the tasks in a cluster.

    :param ctx: context object; ``ctx.obj['bw']`` supplies the client and
        ``ctx.obj['cluster']`` the default cluster.
    :param cluster: cluster to query; falls back to ``ctx.obj['cluster']``
        when falsy.
    :param sort_by: expression handed to ``jp`` to derive the sort key of
        each record; no sorting when falsy.
    """
    if not cluster:
        cluster = ctx.obj['cluster']
    bw = ctx.obj['bw']
    records = bw.get_tasks(cluster=cluster)
    if sort_by:
        records.sort(key=lambda r: jp(r, sort_by))

    # One reference time so every row's age is measured from the same instant.
    now = datetime.datetime.now(pytz.utc)
    out = []
    for r in records:
        out.append((
            display.simple_task(r['taskArn']),
            r['lastStatus'],
            display.simple_task_definition(r['taskDefinitionArn']),
            humanize.naturaltime(now - r['createdAt']),
        ))

    headers = ['TASK ID', 'STATUS', 'TASK DEFINITION', 'AGE']
    output = tabulate.tabulate(out, headers=headers, tablefmt='plain')
    click.echo(output)
def humanize_dt_ago(dtime):
    """Format a datetime in the "23 minutes ago" style.

    Parameters
    ----------
    dtime : datetime.datetime
        A datetime object. It is converted with ``astimezone(LOCAL_TZ)``.

    Returns
    -------
    str
        Humanized string.
    """
    # Everything else here is UTC and offset-aware, but the underlying lib
    # uses datetime.now() as the comparison reference, so convert to local
    # time and drop the tzinfo to get a naive local datetime.
    local_naive = dtime.astimezone(LOCAL_TZ).replace(tzinfo=None)
    return humanize.naturaltime(local_naive)
def print_probes():
    """Redraw the probe list: a centered timestamp, then one line per probe.

    Each line shows the SSID right-justified in the left half of the terminal
    and the humanized ``last_seen`` time in the right half. The SSID colour
    depends on the age: red under 60 s, yellow under 180 s, green otherwise.
    """
    termx, termy = get_termsize()
    col1_width = termx // 2
    col2_width = termx - col1_width - 1
    # NOTE(review): assigned but never read in this function; kept as in the
    # original. It looks like a row cap that was never applied - confirm.
    DISPLAY_PROBES_MAX = termy - 2

    # print() with a single argument behaves the same on Python 2 and 3.
    print(TERM_RESET + TERM_POS_ZERO + str(datetime.now()).center(termx))

    for probe in probes:
        # ssid
        out = ''
        out += TERM_BOLD
        age = (datetime.now() - probe['last_seen']).total_seconds()
        if age < 60:
            out += TERM_RED
        elif age < 180:
            out += TERM_YELLOW
        else:
            out += TERM_GREEN
        out += probe['ssid'].rjust(col1_width)

        # time
        out += ' '
        out += TERM_RESET + TERM_BLUE
        out += humanize.naturaltime(probe['last_seen']).ljust(col2_width)

        print(out)
def process_issues(repository, installation):
    """Warn about, or close, open issues carrying the 'Close?' label.

    For each such issue the time since the label was added is compared with
    ``current_app.stale_issue_warn_seconds`` and
    ``current_app.stale_issue_close_seconds``. Closing only happens when
    ``current_app.stale_issue_close`` is truthy. An issue that already has
    the matching bot comment is skipped.
    """
    now = time.time()

    # Get issues labeled as 'Close?'
    repo = RepoHandler(repository, 'master', installation)
    issuelist = repo.get_issues('open', 'Close?')

    for n in issuelist:

        print(f'Checking {n}')

        issue = IssueHandler(repository, n, installation)
        labeled_time = issue.get_label_added_date('Close?')
        if labeled_time is None:
            continue

        # Seconds since the label was added.
        dt = now - labeled_time

        if current_app.stale_issue_close and dt > current_app.stale_issue_close_seconds:
            # An existing epilogue comment means this issue was handled before.
            comment_ids = issue.find_comments('astropy-bot[bot]', filter_keep=is_close_epilogue)
            if len(comment_ids) == 0:
                print(f'-> CLOSING issue {n}')
                issue.submit_comment(ISSUE_CLOSE_EPILOGUE)
                issue.close()
            else:
                print(f'-> Skipping issue {n} (already closed)')
        elif dt > current_app.stale_issue_warn_seconds:
            # An existing warning comment means this issue was warned before.
            comment_ids = issue.find_comments('astropy-bot[bot]', filter_keep=is_close_warning)
            if len(comment_ids) == 0:
                print(f'-> WARNING issue {n}')
                issue.submit_comment(ISSUE_CLOSE_WARNING.format(
                    pasttime=naturaltime(dt),
                    futuretime=naturaldelta(current_app.stale_issue_close_seconds
                                            - current_app.stale_issue_warn_seconds)))
            else:
                print(f'-> Skipping issue {n} (already warned)')
        else:
            print(f'-> OK issue {n}')
def pretty_print_datetime(dt):
    """Return ``dt`` in local time as ``'<formatted> (<humanized>)'``.

    A naive ``dt`` (no usable tzinfo) is treated as UTC before conversion to
    the local timezone.
    """
    # Make sure the `dt` is timezone aware before pretty printing.
    if dt.tzinfo is None or dt.tzinfo.utcoffset(dt) is None:
        dt = dt.replace(tzinfo=tz.gettz('UTC'))
    dt_pretty = dt.astimezone(tz.tzlocal())

    dt_pretty_friendly = dt_pretty.strftime('%a %d %b, %I:%M%p')
    # The tzinfo is stripped so humanize receives a naive local datetime.
    dt_pretty_humanized = humanize.naturaltime(dt_pretty.replace(tzinfo=None))
    return '%s (%s)' % (dt_pretty_friendly, dt_pretty_humanized,)
def naturaltime(val):
    """Get humanized version of time.

    ``val`` is either a ``datetime`` or a value accepted by ``parse``. A
    naive value is taken to be UTC; an aware value is converted to UTC (the
    previous ``replace(tzinfo=...)`` overwrote its zone without converting).
    """
    if not isinstance(val, datetime):
        val = parse(val)
    if val.tzinfo is None or val.utcoffset() is None:
        # Naive: assume UTC, as the original did for datetime inputs.
        val = val.replace(tzinfo=pytz.utc)
    else:
        val = val.astimezone(pytz.utc)
    now = datetime.now(pytz.utc)
    return humanize.naturaltime(now - val)
def index():
    """Display list of the user's repositories.

    Renders the template named by ``GITHUB_TEMPLATE_INDEX``. The context has
    ``connected=False`` when there is no session token; otherwise it carries
    the sorted repositories and a humanized last-sync time.
    """
    github = GitHubAPI(user_id=current_user.id)
    token = github.session_token
    ctx = dict(connected=False)

    if token:
        # The user is authenticated and the token we have is still valid.
        if github.account.extra_data.get('login') is None:
            github.init_account()
            db.session.commit()

        # Sync if needed
        if request.method == 'POST' or github.check_sync():
            # When we're in an XHR request, we want to synchronously sync hooks
            github.sync(async_hooks=(not request.is_xhr))
            db.session.commit()

        # Generate the repositories view object
        extra_data = github.account.extra_data
        repos = extra_data['repos']
        if repos:
            # 'Enhance' our repos dict, from our database model
            db_repos = Repository.query.filter(
                Repository.github_id.in_([int(k) for k in repos.keys()]),
            ).all()
            for repo in db_repos:
                repos[str(repo.github_id)]['instance'] = repo
                repos[str(repo.github_id)]['latest'] = GitHubRelease(
                    repo.latest_release())

        last_sync = humanize.naturaltime(
            (utcnow() - parse_timestamp(extra_data['last_sync'])))

        ctx.update({
            'connected': True,
            'repos': sorted(repos.items(), key=lambda x: x[1]['full_name']),
            'last_sync': last_sync,
        })

    return render_template(current_app.config['GITHUB_TEMPLATE_INDEX'], **ctx)
def human_readable_last_seen(self):
    """Return ``self.last_seen`` rendered by ``humanize.naturaltime``."""
    return humanize.naturaltime(self.last_seen)
def time():
    """Return the humanized time elapsed since the module-level ``starttime``."""
    # An aware UTC "now" is used, so ``starttime`` must be aware as well.
    return humanize.naturaltime(datetime.now(pytz.UTC) - starttime)
def humanize_datetime_filter(dt: datetime, fmt=None):
    """Render ``dt`` as a relative time with humanize, using the zh_CN locale.

    The translation catalogue is loaded from ``etc/humanize``. ``fmt`` is
    accepted but not used by this function.
    """
    humanize.i18n.activate('zh_CN', path='etc/humanize')
    return humanize.naturaltime(dt)
def create_app(object_name):
    """
    A Flask application factory.

    object_name: the python path of the config object,
                 e.g. oniongate.settings.ProdConfig

    Returns the configured ``Flask`` application.
    """
    app = Flask(__name__, instance_relative_config=True)
    app.config.from_object(object_name)

    # An instance-level config.py is optional.
    try:
        app.config.from_pyfile("config.py")
    except FileNotFoundError:
        pass

    # Create zone file directory if it doesn't exist. exist_ok avoids the
    # check-then-create race of a separate isdir() test.
    zone_directory = app.config.get('zone_dir') or os.path.join(app.instance_path, 'zones')
    os.makedirs(zone_directory, exist_ok=True)
    app.config["zone_dir"] = zone_directory

    api_bp = Blueprint('api', __name__)
    api = Api(api_bp)

    CORS(app, resources={r"/api/*": {"origins": "*"}})

    db.init_app(app)

    # register our blueprints
    app.register_blueprint(main_bp)
    app.register_blueprint(api_bp, url_prefix='/api/v1')

    api.add_resource(Domains, '/domains', '/domains/<domain_name>')
    api.add_resource(Records, '/records/<domain_name>', '/records/<domain_name>/<record_id>')
    api.add_resource(Proxies, '/proxies', '/proxies/<ip_address>')

    # Expose humanize.naturaltime to templates as the `naturaltime` filter.
    app.jinja_env.filters['naturaltime'] = humanize.naturaltime

    return app
def slang_time(self):
    """Returns human slang representation of time."""
    # A naive datetime in ``self.local_timezone`` is requested before it is
    # handed to humanize.
    dt = self.datetime(naive=True, to_timezone=self.local_timezone)
    return humanize.naturaltime(dt)
def search(current, query=None):
    """Search clients by client id, label, or hostname prefix.

    A query starting with ``C.`` matches a client id exactly, one starting
    with ``label:`` matches a label or custom label, and anything else is a
    hostname prefix match. At most 1000 rows are returned.

    Returns ``dict(data=[...])`` with one dict per matching row.

    Raises:
        ValueError: if ``query`` is empty or not provided.
    """
    if not query:
        raise ValueError("query must be provided.")

    query = query.strip()
    condition = current.db.clients.id > 0
    orderby = None

    audit.log(current, "ClientSearch", query=query)

    # Search for a client ID directly.
    if query.startswith("C."):
        condition = current.db.clients.client_id == query
    elif query.startswith("label:"):
        label = query.split(":", 1)[1]
        condition = (current.db.clients.labels == label) | (
            current.db.clients.custom_labels == label)
        orderby = current.db.clients.id
    else:
        # AppEngine uses Bigtable which does not support `like` operation. We
        # only support a prefix match.
        condition = ((current.db.clients.hostname >= query) &
                     (current.db.clients.hostname < query + u"\ufffd"))

    result = []
    for row in current.db(condition).select(
            orderby_on_limitby=False, limitby=(0, 1000),
            orderby=orderby):
        # Drop empty labels and duplicates.
        labels = {x for x in row.labels if x}
        result.append(dict(
            last=row.last,
            last_humanized=humanize.naturaltime(
                datetime.datetime.now()-row.last),
            client_id=row.client_id,
            summary=json.loads(row.summary),
            labels=sorted(labels),
            # These are the only labels which may be modified.
            custom_labels=row.custom_labels))

    return dict(data=result)
async def gym(self, ctx, *, gym: str):
    """
    Lookup a gym, responds with an image, title, description and a google maps link.
    Gyms that have active raids are prioritized over gyms that do not.
    """
    # Declared ``async`` because the body awaits; a plain ``def`` containing
    # ``await`` is a SyntaxError.
    hit, monacle_gym = await self.find_gym(gym)
    if not hit:
        return
    description = "{}\n[Get Directions](https://www.google.com/maps/?daddr={},{})".format(hit.description, hit.latitude, hit.longitude)
    embed = discord.Embed(title=hit.title, url='https://www.pokemongomap.info'+hit.url, description=description)
    embed.set_thumbnail(url=hit.image)

    # Static map template shared by both branches; {3} is the marker colour.
    static_map = 'https://maps.googleapis.com/maps/api/staticmap?center={0},{1}&zoom=15&size=250x125&maptype=roadmap&markers=color:{3}%7C{0},{1}&key={2}'
    # NOTE(review): API key hard-coded in source - it should be moved to
    # configuration and rotated. Left unchanged here to preserve behaviour.
    maps_key = 'AIzaSyCEadifeA8X02v2OKv-orZWm8nQf1Q2EZ4'

    if monacle_gym:
        team_colour = TEAM_COLORS[monacle_gym.team]
        embed.set_image(url=static_map.format(hit.latitude, hit.longitude, maps_key, "0x{:02X}".format(team_colour)))
        embed.color = team_colour
        if monacle_gym.slots_available > 0:
            embed.add_field(name='Slots available', value=monacle_gym.slots_available)
        embed.add_field(name='Owned by', value=monacle_gym.team_name)
        now = datetime.datetime.now()
        # Raid details are shown only while the raid is in progress.
        if monacle_gym.raid_start and monacle_gym.raid_start <= now <= monacle_gym.raid_end:
            raid_pokemon = monacle_gym.raid_pokemon
            embed.add_field(name='Raid level', value=monacle_gym.raid_level)
            embed.add_field(name='Raid Pokemon', value=raid_pokemon.name)
            embed.add_field(name='CP', value=raid_pokemon.cp)
            embed.add_field(name='Moveset', value=MOVES[raid_pokemon.move_1]+' / '+MOVES[raid_pokemon.move_2])
            embed.add_field(name='Started at', value=monacle_gym.raid_start.strftime("%H:%M:%S"))
            embed.add_field(name='Ends at', value="{} ({})".format(monacle_gym.raid_end.strftime("%H:%M:%S"), humanize.naturaltime(now-monacle_gym.raid_end)))
    else:
        # No live gym data: use a white marker.
        embed.set_image(url=static_map.format(hit.latitude, hit.longitude, maps_key, 'white'))

    await self.bot.say(embed=embed)
async def updateCard(self, trainer):
    """Build and return a ``discord.Embed`` summarising a trainer's progress.

    Uses ``self.getDiff(trainer, 1)`` for the daily figures and, when the
    trainer has a total goal, ``self.getDiff(trainer, 7)`` for the goal
    progress and ETA.
    """
    # Declared ``async`` because the body awaits; a plain ``def`` containing
    # ``await`` is a SyntaxError.
    dailyDiff = await self.getDiff(trainer, 1)
    level = trainer.level
    embed = discord.Embed(timestamp=dailyDiff.new_date, colour=int(trainer.team().colour.replace("#", ""), 16))
    # Fall back to a plain author line when the avatar lookup fails.
    # Narrowed from a bare ``except`` so KeyboardInterrupt/SystemExit and
    # task cancellation are not swallowed.
    try:
        embed.set_author(name=trainer.username, icon_url=trainer.account().discord().avatar_url)
    except Exception:
        embed.set_author(name=trainer.username)
    embed.add_field(name='Level', value=level.level)
    if level.level != 40:
        embed.add_field(name='XP', value='{:,} / {:,}'.format(trainer.update.xp-level.total_xp, level.xp_required))
    else:
        embed.add_field(name='Total XP', value='{}'.format(humanize.intword(trainer.update.xp)))

    if dailyDiff.change_xp and dailyDiff.change_time:
        gain = '{:,} since {}. '.format(dailyDiff.change_xp, humanize.naturalday(dailyDiff.old_date))
        if dailyDiff.change_time.days > 1:
            gain += "That's {:,} xp/day.".format(round(dailyDiff.change_xp/dailyDiff.change_time.days))
        embed.add_field(name='Gain', value=gain)
        if trainer.goal_daily and dailyDiff.change_time.days > 0:
            dailyGoal = trainer.goal_daily
            embed.add_field(name='Daily completion', value='{}% towards {:,}'.format(pycent.percentage(dailyDiff.change_xp/max(1, dailyDiff.change_time.days), dailyGoal), dailyGoal))

    # A goal of 0 or None means no total goal is set.
    totalGoal = trainer.goal_total or None
    if totalGoal:
        totalDiff = await self.getDiff(trainer, 7)
        remaining = totalGoal - totalDiff.new_xp
        embed.add_field(name='Goal remaining', value='{:,} out of {}'.format(remaining, humanize.intword(totalGoal)))
        elapsed = totalDiff.change_time.total_seconds()
        # total_seconds() rather than .seconds: .seconds is only the
        # sub-day component and is 0 for an exact multiple of 24 hours.
        # The change_xp check avoids dividing by a zero XP rate.
        if elapsed >= 1 and totalDiff.change_xp:
            seconds_left = round(remaining / (totalDiff.change_xp / elapsed))
            eta = totalDiff.new_date + datetime.timedelta(seconds=seconds_left)
            embed.add_field(name='Goal ETA', value=humanize.naturaltime(eta.replace(tzinfo=None)))
        # 583200 s = 6.75 days.
        if elapsed < 583200:
            embed.description = "ETA may be inaccurate. Using {} of data.".format(humanize.naturaldelta(totalDiff.change_time))
    embed.set_footer(text="Total XP: {:,}".format(dailyDiff.new_xp))

    return embed
def ago_filter(dt):
    """
    Format a datetime using humanize.naturaltime, "ago"

    :param dt: datetime to compare to now
    :type dt: datetime.datetime
    :return: ago string; empty string when ``dt`` is ``''``, ``None`` or an
      ``Undefined`` instance
    :rtype: str
    """
    if dt == '' or dt is None or isinstance(dt, Undefined):
        return ''
    return naturaltime(dtnow() - dt)
def hr_time(self):
    """Return the humanized time elapsed since ``self.upload_time``."""
    # ``self.upload_time`` is subtracted from time.time(), so it is a
    # number on the same epoch-seconds scale.
    return humanize.naturaltime(time.time() - self.upload_time)
def _jinja2_filter_datetime_local(datetime):
    """Return ``naturaltime(datetime)``, or the string ``'None'`` for ``None``."""
    if datetime is None:
        return 'None'
    return naturaltime(datetime)
def human_since(since, include_tz=False):
    """Return the humanized time elapsed since the timestamp string ``since``.

    With ``include_tz`` the reference "now" is aware (local timezone);
    otherwise it is naive. The parsed ``since`` has to match that awareness,
    because subtracting aware and naive datetimes raises ``TypeError``.
    """
    tz = dateutil.tz.tzlocal() if include_tz else None
    return naturaltime(datetime.datetime.now(tz=tz) - dateutil.parser.parse(since))
def on_guild_member_add(self, event):
    """Log a GUILD_MEMBER_ADD action with the account's humanized age.

    The log entry is tagged ``' :new:'`` when
    ``event.config.new_member_threshold`` is set and the account is younger
    than that many seconds.
    """
    # presumably to_datetime/to_unix derive the creation time from the user
    # id - verify against their definitions.
    created = humanize.naturaltime(datetime.utcnow() - to_datetime(event.user.id))
    new = (
        event.config.new_member_threshold and
        (time.time() - to_unix(event.user.id)) < event.config.new_member_threshold
    )

    self.log_action(Actions.GUILD_MEMBER_ADD, event, new=' :new:' if new else '', created=created)
def format_time(time_event):
    """Return a formatted time and humanized time for a time event.

    ``time_event`` is a ``datetime.datetime`` or a value accepted by
    ``dateutil.parser.parse``. On failure the string
    ``"(Time format error)"`` is returned instead of raising.
    """
    try:
        if not isinstance(time_event, datetime.datetime):
            time_event = dateutil.parser.parse(time_event)
        # Use the event's own tzinfo for "now" so aware and naive values
        # are each compared with a matching reference.
        tz_info = time_event.tzinfo
        time_diff = datetime.datetime.now(tz_info) - time_event
        humanize_format = humanize.naturaltime(time_diff)
        # %Y-%m-%d is the portable spelling of %F (not available on every
        # platform's strftime).
        time_format = time_event.strftime("%Y-%m-%d %X")
        return time_format + " (" + humanize_format + ")"
    except (AttributeError, TypeError, ValueError, OverflowError):
        # dateutil signals unparseable input with ValueError, TypeError or
        # OverflowError, not only AttributeError.
        return "(Time format error)"
def check_notes(self, target, mask, data, event):
    """Deliver any stored memos addressed to the nick in ``mask``.

    Announces how many memos are waiting, sends each one (to ``target`` when
    the memo is public, otherwise privately) and deletes it from the
    database.
    """
    del data, event
    try:
        msgs = self.db.filter(Memo, {'recipient': mask.nick.lower()})
    except Memo.DoesNotExist:
        return

    if len(msgs) == 0:
        return

    msgword = "message" if len(msgs) < 2 else "messages"  # Fix: I have 1 messages for you!

    # Avoid telling people they have messages in public, if any of them are set public=False
    if contains_private_messages(msgs):
        self.msg(mask, mask.nick, "I have %s %s for you, %s!" % (len(msgs), msgword, mask.nick))
    else:
        self.msg(mask, target, "I have %s %s for you, %s!" % (len(msgs), msgword, mask.nick))

    # Actually deliver the memos
    for msg in msgs:
        # We don't care about the timezone really, only the relative time
        # from the local system clock. Second resolution matches the stored
        # timestamp format.
        now = datetime.datetime.now().replace(microsecond=0)
        reltime = humanize.naturaltime(now - datetime.datetime.strptime(msg.timestamp, "%a %b %d %H:%M:%S %Y"))
        message_text = "%s // %s // %s" % (msg.sender, reltime, msg.text)
        if msg.public:
            self.msg(mask, target, message_text)
        else:
            self.bot.privmsg(mask.nick, message_text)
        self.db.delete(msg)
    self.db.commit()
def process(self, msg):
    """
    `schedule: 10s cmd: test`
    `schedule: every friday cmd: test`
    `schedule: every friday at 10:30 cmd: test`
    """
    # With a ``cmd`` parameter a job is registered and confirmed; without
    # one the currently known jobs are listed.
    params = msg.extract_parameters(
        ['schedule'], r"\s?(.*cmd:?|\S*['-]*|\w*.+|'[^']+')")
    params.update(msg.extract_parameters(['cmd']))

    if params['cmd'] is not None:
        cal = parsedatetime.Calendar()
        try:
            when, status = cal.parse(params['schedule'])
            when = datetime(*when[:6])
        except Exception as e:
            # Report the raw schedule text: ``when`` is unbound if
            # cal.parse() itself raised.
            msg.reply("Cannot parse %s with %s" % (params['schedule'], e))
            return True

        msg.bot.state.jobs.update(params['cmd'], {
            'every': "every" in params['schedule'],
            'when': when,
            'raw': params,
            'delta': when - datetime.now().replace(microsecond=0),
            'cmd': params['cmd'],
            'user': msg.user
        })
        msg.reply("Your job: %s is scheduled in %s. %s" % (
            params['cmd'], humanize.naturaltime(when), when))
    else:
        msg.reply("Your jobs: %s" % '\n'.join([
            repr(j) for j in msg.bot.state.jobs.all()]))
def __repr__(self):
    """Return ``<humanized next run-command>`` for this object."""
    return "<%s-%s>" % (
        humanize.naturaltime(self.get_next()),
        self.cmd)
def format_time(time):
    """Return ``"<humanized> (<time> UTC)"``, or ``"Unknown"``.

    ``"Unknown"`` is returned when ``time`` is ``None`` or earlier than
    ``UNKNOWN_CUTOFF``.
    """
    if time is None or time < UNKNOWN_CUTOFF:
        return "Unknown"
    # ``time`` is labelled UTC in the output; humanize compares against local
    # naive time, so shift it by the current local-UTC offset first.
    local_offset = datetime.now() - datetime.utcnow()
    return "{} ({} UTC)".format(
        humanize.naturaltime(time + local_offset), time)
def ap_list(host):
    """Print one comma-separated summary line per access point.

    ``host[0]`` is expanded with ``service.expand_host``. For each resulting
    access point the name, host, client count, load averages, memory and
    uptime are printed, with rows ordered by ``cmp_host``.
    """
    # cmp_to_key works on Python 2.7 and 3; a positional cmp function passed
    # to sorted() is Python 2 only.
    from functools import cmp_to_key

    hosts = service.expand_host(host[0])

    def _calc_load(x):
        # Float divisor so integer inputs are not truncated on Python 2.
        return x / 65535.0

    @coroutine
    def _run():
        rows = []
        aps = yield service.create_multiple_ap(hosts)
        details = yield service.ap_list(aps)
        header_out('name, host, #clients, loadavg, mem, uptime')
        for ap in details:
            row = []
            row.append(_val(ap, 'board.hostname'))
            row.append(_val(ap, 'host'))
            row.append('%s' % (_val(ap, 'num_clients')))
            row.append('%.2f / %.2f / %.2f' % (
                _val(ap, 'system.load.0', _calc_load),
                _val(ap, 'system.load.1', _calc_load),
                _val(ap, 'system.load.2', _calc_load)))
            row.append('%s / %s' %
                       (_val(ap, 'system.memory.free', naturalsize),
                        _val(ap, 'system.memory.total', naturalsize)))
            row.append('%s' % (_val(ap, 'system.uptime', naturaltime)))
            rows.append(', '.join(row))

        out('\n'.join(sorted(rows, key=cmp_to_key(cmp_host))))

    IOLoop.instance().run_sync(_run)