def to_utc_timestamp(date_time):
"""Convert a naive or timezone-aware datetime to UTC timestamp.
Arguments:
date_time (:py:class:`datetime.datetime`): The datetime to
convert.
Returns:
:py:class:`int`: The timestamp (in seconds).
"""
if date_time is None:
return
if date_time.tzname() is None:
timestamp = date_time.replace(tzinfo=timezone.utc).timestamp()
else:
timestamp = date_time.timestamp()
return int(round(timestamp, 0))
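# Usage sketch (not part of the original snippet); assumes
# `from datetime import datetime, timezone` at module level.
def _demo_to_utc_timestamp():
    naive = datetime(2017, 1, 18, 14, 40)
    aware = datetime(2017, 1, 18, 14, 40, tzinfo=timezone.utc)
    # Naive datetimes are interpreted as UTC, so both forms agree:
    assert to_utc_timestamp(naive) == to_utc_timestamp(aware)
    # None is passed through:
    assert to_utc_timestamp(None) is None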
def occurred(at_):
"""Calculate when a service event occurred.
Arguments:
at_ (:py:class:`str`): When the event occurred.
Returns:
:py:class:`str`: The humanized occurrence time.
"""
try:
occurred_at = parse(at_)
except (TypeError, ValueError):
logger.warning('failed to parse occurrence time %r', at_)
return 'time not available'
utc_now = datetime.now(tz=timezone.utc)
try:
return naturaltime((utc_now - occurred_at).total_seconds())
except TypeError: # at_ is a naive datetime
return naturaltime((datetime.now() - occurred_at).total_seconds())
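# occurred() relies on module-level names not shown in this snippet; a
# plausible set of imports (an assumption, not confirmed by the source) is:
#
#     import logging
#     from datetime import datetime, timezone
#     from dateutil.parser import parse
#     from humanize import naturaltime
#
#     logger = logging.getLogger(__name__)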
def calculate_timeout(http_date):
"""Extract request timeout from e.g. ``Retry-After`` header.
Note:
Per :rfc:`2616#section-14.37`, the ``Retry-After`` header can
be either an integer number of seconds or an HTTP date. This
function can handle either.
Arguments:
http_date (:py:class:`str`): The date to parse.
Returns:
:py:class:`int`: The timeout, in seconds.
"""
try:
return int(http_date)
except ValueError:
date_after = parse(http_date)
utc_now = datetime.now(tz=timezone.utc)
return int((date_after - utc_now).total_seconds())
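# Usage sketch (illustrative, not from the original source); assumes the
# imports suggested above plus `from datetime import timedelta`.
def _demo_calculate_timeout():
    # Integer form: returned as-is.
    assert calculate_timeout('120') == 120
    # HTTP-date form: an instant one minute away yields roughly 60 seconds.
    retry_after = datetime.now(tz=timezone.utc) + timedelta(minutes=1)
    timeout = calculate_timeout(retry_after.strftime('%a, %d %b %Y %H:%M:%S %z'))
    assert 0 <= timeout <= 60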
def reset(self, variantId, buildId, resultHash):
self.__variantId = variantId
self.__buildId = buildId
self.__resultHash = resultHash
self.__recipes = None
self.__defines = {}
u = os.uname()
self.__build = {
'sysname' : u.sysname,
'nodename' : u.nodename,
'release' : u.release,
'version' : u.version,
'machine' : u.machine,
'date' : datetime.now(timezone.utc).isoformat(),
}
self.__env = ""
self.__metaEnv = {}
self.__scms = []
self.__deps = []
self.__tools = {}
self.__sandbox = None
self.__id = None
def test_throttling_compute_fine(SETTINGS):
SETTINGS.RATE_LIMIT_THRESHOLD = 0
from jenkins_epo.procedures import compute_throttling
# Consumed 1/5 calls at 2/3 of the time.
now = datetime(2017, 1, 18, 14, 40, tzinfo=timezone.utc)
reset = datetime(2017, 1, 18, 15, tzinfo=timezone.utc)
remaining = 4000
seconds = compute_throttling(
now=now,
rate_limit=dict(rate=dict(
limit=5000, remaining=remaining,
reset=reset.timestamp(),
)),
)
    assert 0 == seconds  # Fine!
def convert_time(time):
# 2014-11-05T09:00:00Z
# yyyy-mm-ddThh:mm:ssZ
# 2013-06-12T16:00:00+09:00
if time is None:
return None
    if len(time) < 19:
        # too short to contain a full date and time
        return None
year = int(time[:4])
month = int(time[5:7])
date = int(time[8:10])
hour = int(time[11:13])
minute = int(time[14:16])
second = int(time[17:19])
if len(time) > 21:
tz = timezone(timedelta(hours=9))
else:
tz = timezone.utc
dt = datetime.datetime(year, month, date, hour, minute, second, 0, tz)
return dt
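# The branch above assumes every long timestamp carries a +09:00 offset. A
# more general sketch for inputs ending in 'Z' or an explicit offset (an
# alternative, not the original code; assumes `from datetime import datetime`
# and Python 3.7+, where %z accepts offsets with colons):
def _parse_iso8601(time):
    if time is None or len(time) < 19:
        return None
    if time.endswith('Z'):
        # Normalize the 'Z' suffix to an explicit offset for %z.
        time = time[:-1] + '+00:00'
    return datetime.strptime(time, '%Y-%m-%dT%H:%M:%S%z')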
def event_remaining(dt_start, dt_end):
now = datetime.datetime.now(timezone.utc)
diff_end = dt_end - now
diff_start = now - dt_start
if diff_start.total_seconds() < 0:
return "Event has not started yet!"
elif diff_end.total_seconds() < 0:
return "Event has ended!"
else:
seconds = diff_end.seconds
hours = seconds // 3600
seconds -= hours * 3600
minutes = seconds // 60
seconds -= minutes * 60
return "Event ends in {} days, {} hours, {} minutes and {} seconds.".format(diff_end.days, hours, minutes, seconds)
###### Sunshine ######
def header(self):
dDc = self.configs.Dc_cell
Dc_min, Dc_max = self.configs.Dc_limit
header = fits.Header()
header["BUNIT"] = (self.configs.unit, "Data unit")
header["zmin"] = (self.configs.zmin, "HI simulation minimum redshift")
header["zmax"] = (self.configs.zmax, "HI simulation maximum redshift")
header["dz"] = (self.configs.dz, "HI simulation redshift step size")
header["Dc_min"] = (Dc_min, "[cMpc] comoving distance at zmin")
header["Dc_max"] = (Dc_max, "[cMpc] comoving distance at zmax")
header["Dc_step"] = (dDc, "[cMpc] comoving distance between slices")
header["Lside"] = (self.configs.Lside, "[cMpc] Simulation side length")
header["Nside"] = (self.configs.Nside, "Number of cells at each side")
header["DATE"] = (datetime.now(timezone.utc).astimezone().isoformat(),
"File creation date")
header.add_history(" ".join(sys.argv))
header.extend(self.wcs.to_header(), update=True)
return header
def write_slice(self, outfile, data, z, clobber=False):
freq = z2freq(z)
Dc = cosmo.comoving_distance(z).value # [Mpc]
header = fits.Header()
header["BUNIT"] = (self.header["BUNIT"],
self.header.comments["BUNIT"])
header["Lside"] = (self.header["Lside"],
self.header.comments["Lside"])
header["Nside"] = (self.header["Nside"],
self.header.comments["Nside"])
header["REDSHIFT"] = (z, "redshift of this slice")
header["FREQ"] = (freq, "[MHz] observed HI signal frequency")
header["Dc"] = (Dc, "[cMpc] comoving distance")
header["DATE"] = (datetime.now(timezone.utc).astimezone().isoformat(),
"File creation date")
header.add_history(" ".join(sys.argv))
hdu = fits.PrimaryHDU(data=data, header=header)
    try:
        # newer astropy renamed the ``clobber`` argument to ``overwrite``
        hdu.writeto(outfile, overwrite=clobber)
    except TypeError:
        # fall back for astropy versions that only know ``clobber``
        hdu.writeto(outfile, clobber=clobber)
logger.info("Wrote slice to file: %s" % outfile)
def header(self):
dDc = self.Dc_cell
header = fits.Header()
header["BUNIT"] = (str(self.unit), "Data unit")
header["zmin"] = (self.zmin, "HI simulation minimum redshift")
header["zmax"] = (self.zmax, "HI simulation maximum redshift")
header["Dc_min"] = (self.Dc_min, "[cMpc] comoving distance at zmin")
header["Dc_max"] = (self.Dc_max, "[cMpc] comoving distance at zmax")
header["Dc_step"] = (dDc, "[cMpc] comoving distance between slices")
header["Lside"] = (self.Lside, "[cMpc] Simulation side length")
header["Nside"] = (self.Nside, "Number of cells at each side")
header["DATE"] = (datetime.now(timezone.utc).astimezone().isoformat(),
"File creation date")
header.add_history(" ".join(sys.argv))
header.extend(self.wcs.to_header(), update=True)
return header
def create_game():
"""Create a new game."""
form = GameCreateForm(request.form)
_set_game_create_choices(form)
if form.validate_on_submit():
white = Player.get_by_id(form.white_id.data)
black = Player.get_by_id(form.black_id.data)
played_at = None
if form.played_at.data is not None:
played_at = form.played_at.data.astimezone(timezone.utc)
game = Game.create(
white=white,
black=black,
winner=form.winner.data,
handicap=form.handicap.data,
komi=form.komi.data,
season=form.season.data,
episode=form.episode.data,
played_at=played_at
)
messenger.notify_slack(_slack_game_msg(game))
return jsonify(game.to_dict()), 201
else:
return jsonify(**form.errors), 404
def _slack_game_msg(game):
if game.winner is Color.white:
msg = '<{w_url}|{w_name}> (W) defeated <{b_url}|{b_name}> (B)'
else:
msg = '<{b_url}|{b_name}> (B) defeated <{w_url}|{w_name}> (W)'
result = (msg + ' at {handicap} stones, {komi}.5 komi at <!date^{date_val}'
'^{{time}} on {{date_num}}|{date_string}> '
'(S{season:0>2}E{episode:0>2})')
# Gross hack around the fact that we retrieve as naive DateTimes.
# See: https://github.com/massgo/league/issues/93
utc_time = int(game.played_at.replace(tzinfo=timezone.utc).timestamp())
return result.format(w_name=game.white.full_name,
w_url=url_for('dashboard.get_player',
player_id=game.white.id, _external=True),
b_name=game.black.full_name,
b_url=url_for('dashboard.get_player',
player_id=game.black.id, _external=True),
handicap=game.handicap,
komi=game.komi,
date_string=game.played_at,
date_val=utc_time,
season=game.season,
episode=game.episode)
def update_game():
"""Update an existing game."""
form = GameUpdateForm(request.form)
_set_game_create_choices(form)
if form.validate_on_submit():
white = Player.get_by_id(form.white_id.data)
black = Player.get_by_id(form.black_id.data)
played_at = None
if form.played_at.data is not None:
played_at = form.played_at.data.astimezone(timezone.utc)
game = Game.get_by_id(form.game_id.data)
game.update(
white=white,
black=black,
winner=form.winner.data,
handicap=form.handicap.data,
komi=form.komi.data,
season=form.season.data,
episode=form.episode.data,
played_at=played_at
)
return jsonify(game.to_dict()), 200
else:
return jsonify(**form.errors), 404
def _step(self, exc=None):
"""
Wrapper around `Task._step()` to automatically dispatch a
`TaskExecState.BEGIN` event.
"""
if not self._in_progress:
self._start = datetime.now(timezone.utc)
source = {'task_exec_id': self.uid}
if self._template:
source['task_template_id'] = self._template.uid
if self._workflow:
source['workflow_template_id'] = self._workflow.template.uid
source['workflow_exec_id'] = self._workflow.uid
self._source = EventSource(**source)
self._in_progress = True
data = {
'type': TaskExecState.BEGIN.value,
'content': self._inputs
}
self._broker.dispatch(
data,
topics=workflow_exec_topics(self._source._workflow_exec_id),
source=self._source,
)
super()._step(exc)
def get_next_event(self):
"""Access to the next Event in the calendar.
Returns:
The Event object corresponding to the next event in the calendar
or None if there is no event.
"""
now = datetime.now(timezone.utc)
while self.event_list and self.event_list[0].end < now:
self.event_list.pop(0)
if len(self.event_list) == 0:
return None
elif self.event_list[0].start > now:
return self.event_list[0]
elif len(self.event_list) == 1:
return None
else:
return self.event_list[1]
def get_now_event(self):
"""Access to the current Event in the calendar.
Returns:
The Event object corresponding to the current event in the calendar
or None if there is no event.
"""
now = datetime.now(timezone.utc)
while self.event_list and self.event_list[0].end < now:
self.event_list.pop(0)
if len(self.event_list) == 0:
return None
elif self.event_list[0].start < now < self.event_list[0].end:
return self.event_list[0]
else:
return None
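# Behaviour sketch for the two accessors above (illustrative only; `cal` is a
# hypothetical instance, and Event a stand-in with `start`/`end` attributes):
#
#     Event = namedtuple('Event', 'start end')
#     now = datetime.now(timezone.utc)
#     cal.event_list = [
#         Event(now - timedelta(hours=1), now + timedelta(hours=1)),  # ongoing
#         Event(now + timedelta(hours=2), now + timedelta(hours=3)),  # upcoming
#     ]
#     cal.get_now_event()   # -> the ongoing event
#     cal.get_next_event()  # -> the upcoming event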
def make_new_entry(self, rel_path, id_handler):
"""
Generates a new entry for the specified path.
Note: This will mutate the id_handler!
"""
# Try to match to an existing book.
e_id = id_handler.new_id()
abs_path = os.path.join(read_from_config('media_loc').path, rel_path)
lmtime = os.path.getmtime(abs_path)
added_dt = datetime.utcfromtimestamp(lmtime)
last_modified = added_dt.replace(tzinfo=timezone.utc)
entry_obj = oh.Entry(id=e_id, path=rel_path,
date_added=datetime.now(timezone.utc),
last_modified=last_modified,
type='Book', table=self.BOOK_TABLE_NAME, data_id=None,
hashseed=_rand.randint(0, 2**32))
return entry_obj
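# Note: the two-step utcfromtimestamp() + replace(tzinfo=...) above can be
# collapsed into one aware conversion with the same result (and it avoids
# utcfromtimestamp(), which is deprecated since Python 3.12):
#
#     last_modified = datetime.fromtimestamp(lmtime, tz=timezone.utc)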
def map_external_gallery_data_to_internal(gallery_data: DataDict) -> GalleryData:
internal_gallery_data = GalleryData(
gallery_data['gid'],
token=gallery_data['token'],
archiver_key=gallery_data['archiver_key'],
title=unescape(gallery_data['title']),
title_jpn=unescape(gallery_data['title_jpn']),
thumbnail_url=gallery_data['thumb'],
category=gallery_data['category'],
provider=constants.provider_name,
uploader=gallery_data['uploader'],
posted=datetime.fromtimestamp(int(gallery_data['posted']), timezone.utc),
filecount=gallery_data['filecount'],
filesize=gallery_data['filesize'],
expunged=gallery_data['expunged'],
rating=gallery_data['rating'],
tags=translate_tag_list(gallery_data['tags']),
)
m = re.search(constants.default_fjord_tags, ",".join(internal_gallery_data.tags))
if m:
internal_gallery_data.fjord = True
if constants.ex_thumb_url in internal_gallery_data.thumbnail_url:
internal_gallery_data.thumbnail_url = internal_gallery_data.thumbnail_url.replace(constants.ex_thumb_url, constants.ge_thumb_url)
return internal_gallery_data
def t_date(s):
"""
TaskWarrior provides times as UTC timestamps in ISO 8601
"""
year = int(s[0:4])
month = int(s[4:6])
day = int(s[6:8])
hour = int(s[9:11])
minute = int(s[11:13])
second = int(s[13:15])
# This is UTC time
ts = datetime(year, month, day, hour, minute, second)
# Convert to local time
local_time = ts.replace(tzinfo=timezone.utc).astimezone(tz=None)
# Convert to ISO display format, and remove timezone offset
iso_format = local_time.isoformat(sep=" ")[:-6]
return iso_format
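# Equivalent sketch using strptime instead of manual slicing (an alternative
# formulation, not the original code):
def _t_date_strptime(s):
    ts = datetime.strptime(s[:15], '%Y%m%dT%H%M%S')
    local_time = ts.replace(tzinfo=timezone.utc).astimezone(tz=None)
    return local_time.isoformat(sep=' ')[:-6]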
# TODO: move to separate module
def convert_between_tz_and_utc(self, tz, utc):
dston = self.dston.replace(tzinfo=tz)
# Because 1:MM on the day DST ends is taken as being standard time,
# there is no spelling in tz for the last hour of daylight time.
# For purposes of the test, the last hour of DST is 0:MM, which is
# taken as being daylight time (and 1:MM is taken as being standard
# time).
dstoff = self.dstoff.replace(tzinfo=tz)
for delta in (timedelta(weeks=13),
DAY,
HOUR,
timedelta(minutes=1),
timedelta(microseconds=1)):
self.checkinside(dston, tz, utc, dston, dstoff)
for during in dston + delta, dstoff - delta:
self.checkinside(during, tz, utc, dston, dstoff)
self.checkoutside(dstoff, tz, utc)
for outside in dston - delta, dstoff + delta:
self.checkoutside(outside, tz, utc)
def test_easy(self):
    # Despite the name of this test, the end cases are excruciating.
self.convert_between_tz_and_utc(Eastern, utc_real)
self.convert_between_tz_and_utc(Pacific, utc_real)
self.convert_between_tz_and_utc(Eastern, utc_fake)
self.convert_between_tz_and_utc(Pacific, utc_fake)
# The next is really dancing near the edge. It works because
# Pacific and Eastern are far enough apart that their "problem
# hours" don't overlap.
self.convert_between_tz_and_utc(Eastern, Pacific)
self.convert_between_tz_and_utc(Pacific, Eastern)
# OTOH, these fail! Don't enable them. The difficulty is that
# the edge case tests assume that every hour is representable in
# the "utc" class. This is always true for a fixed-offset tzinfo
    # class (like utc_real and utc_fake), but not for Eastern or Central.
# For these adjacent DST-aware time zones, the range of time offsets
# tested ends up creating hours in the one that aren't representable
# in the other. For the same reason, we would see failures in the
# Eastern vs Pacific tests too if we added 3*HOUR to the list of
# offset deltas in convert_between_tz_and_utc().
#
# self.convert_between_tz_and_utc(Eastern, Central) # can't work
# self.convert_between_tz_and_utc(Central, Eastern) # can't work
def parse_match(self, response):
item = MatchInfo()
item['id'] = parse_qs(response.xpath('//div[@class="clearfix subnav level-1"]//li//a/@href').extract()[3])['id'][0]
item['area'] = response.xpath('//div[@class="clearfix subnav level-1"]//li//a/text()').extract()[1]
item['competition'] = response.xpath('//div[@class="clearfix subnav level-1"]//li//a/text()').extract()[2]
item['home_team'] = response.xpath('//div[@class="container left"]//a/text()').extract_first()
item['away_team'] = response.xpath('//div[@class="container right"]//a/text()').extract_first()
item['ht_last5'] = ''.join(response.xpath('//div[@class="container left"]//a/text()').extract()[1:6])
item['at_last5'] = ''.join(response.xpath('//div[@class="container right"]//a/text()').extract()[1:6])
item['datetime'] = datetime.fromtimestamp(int(response.xpath('//div[@class="details clearfix"]/dl/dt[.="Date"]/following-sibling::dd[preceding-sibling::dt[1]/text()="Date"]//span/@data-value').extract_first()), timezone.utc).isoformat(' ')
#item['competition'] = response.xpath('//div[@class="details clearfix"]/dl/dt[.="Competition"]/following-sibling::dd[preceding-sibling::dt[1]/text()="Competition"]/a/text()').extract_first()
item['game_week'] = response.xpath('//div[@class="details clearfix"]/dl/dt[.="Game week"]/following-sibling::dd[preceding-sibling::dt[1]/text()="Game week"]/text()').extract_first()
item['kick_off'] = response.xpath('//div[@class="details clearfix"]/dl/dt[.="Kick-off"]/following-sibling::dd[preceding-sibling::dt[1]/text()="Kick-off"]//span/text()').extract_first()
item['venue'] = response.xpath('//div[@class="details clearfix"]/dl/dt[.="Venue"]/following-sibling::dd[preceding-sibling::dt[1]/text()="Venue"]//a/text()').extract_first()
item['updated'] = datetime.utcnow().isoformat(' ')
yield item
#self.log('URL: {}'.format(response.url))
def _decode(self, o):
if isinstance(o, dict):
if len(o) == 1:
if "$escape" in o:
return self._decode_escaped(o['$escape'])
if "$date" in o:
return datetime.fromtimestamp(o["$date"] / 1000.0, timezone.utc)
if "$binary" in o:
return b64decode(o['$binary'])
if len(o) == 2 and "$type" in o and "$value" in o:
try:
reviver = self.custom_type_hooks[o['$type']]
except KeyError:
raise UnknownTypeError(o["$type"])
return reviver(o["$value"])
if self.object_pairs_hook is not None:
return self.object_pairs_hook((k, self._decode(v)) for k, v in o.items())
return {k: self._decode(v) for k, v in o.items()}
if isinstance(o, (list, tuple)):
return [self._decode(v) for v in o]
return o
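# Decoding sketch (illustrative values; `decoder` stands for a hypothetical
# instance of the class this method belongs to):
#
#     decoder._decode({'$date': 1484750400000})
#     # -> datetime(2017, 1, 18, 14, 40, tzinfo=timezone.utc)
#     decoder._decode({'$binary': 'aGVsbG8='})
#     # -> b'hello'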
def benchmark():
ds = aw_datastore.Datastore(aw_datastore.storages.PeeweeStorage, testing=True)
api = aw_server.api.ServerAPI(ds, testing=True)
print(api.get_info())
bucket_id = "test-benchmark"
try:
api.create_bucket(bucket_id, "test", "test", "test")
except Exception as e:
print(e)
print("Benchmarking... this will take 30 seconds")
for i in range(120):
sleep(0.1)
api.heartbeat(bucket_id, Event(timestamp=datetime.now(tz=tz.utc), data={"test": str(int(i))}), pulsetime=0.3)
def _create_heartbeat_events(start=None, delta=timedelta(seconds=1)):
    # A datetime.now() default would be evaluated only once, at import time,
    # so take None and resolve the start time at call time instead.
    if start is None:
        start = datetime.now(tz=timezone.utc)
    e1_ts = start
    e2_ts = e1_ts + delta
    # Needed since server (or underlying datastore) drops precision up to milliseconds.
    # Update: Even with millisecond precision it sometimes fails. (tried using `round` and `int`)
    # Now rounding down to 10ms precision to prevent random failure.
    # 10ms precision at least seems to work well.
    # TODO: Figure out why it sometimes fails with millisecond precision. Would probably
    # be useful to find the microsecond values where it consistently always fails.
    e1_ts = e1_ts.replace(microsecond=e1_ts.microsecond // 10000 * 10000)
    e2_ts = e2_ts.replace(microsecond=e2_ts.microsecond // 10000 * 10000)
e1 = Event(timestamp=e1_ts, data={"label": "test"})
e2 = Event(timestamp=e2_ts, data={"label": "test"})
return e1, e2
def test_midnight_heartbeats(client, bucket):
now = datetime.now(tz=timezone.utc)
midnight = now.replace(hour=23, minute=50)
events = _create_periodic_events(20, start=midnight, delta=timedelta(minutes=1))
label_ring = ["1", "1", "2", "3", "4"]
for i, e in enumerate(events):
e.data["label"] = label_ring[i % len(label_ring)]
client.heartbeat(bucket, e, pulsetime=90)
recv_events_merged = client.get_events(bucket, limit=-1)
assert len(recv_events_merged) == 4 / 5 * len(events)
recv_events_after_midnight = client.get_events(bucket, start=midnight + timedelta(minutes=10))
pprint(recv_events_after_midnight)
assert len(recv_events_after_midnight) == int(len(recv_events_merged) / 2)
def test_astimezone(self):
d = Pendulum(2015, 1, 15, 18, 15, 34)
now = Pendulum(2015, 1, 15, 18, 15, 34)
self.assertEqual('UTC', d.timezone_name)
self.assertPendulum(d, now.year, now.month, now.day, now.hour, now.minute)
d = d.astimezone('Europe/Paris')
self.assertEqual('Europe/Paris', d.timezone_name)
self.assertPendulum(d, now.year, now.month, now.day, now.hour + 1, now.minute)
if sys.version_info >= (3, 2):
d = d.astimezone(timezone.utc)
self.assertEqual('+00:00', d.timezone_name)
self.assertPendulum(d, now.year, now.month, now.day, now.hour, now.minute)
d = d.astimezone(timezone(timedelta(hours=-8)))
self.assertEqual('-08:00', d.timezone_name)
self.assertPendulum(d, now.year, now.month, now.day, now.hour - 8, now.minute)
def parse_tweets(raw_tweets, source, now=None):
"""
Parses a list of raw tweet lines from a twtxt file
and returns a list of :class:`Tweet` objects.
:param list raw_tweets: list of raw tweet lines
:param Source source: the source of the given tweets
:param Datetime now: the current datetime
:returns: a list of parsed tweets :class:`Tweet` objects
:rtype: list
"""
if now is None:
now = datetime.now(timezone.utc)
tweets = []
for line in raw_tweets:
try:
tweet = parse_tweet(line, source, now)
except (ValueError, OverflowError) as e:
logger.debug("{0} - {1}".format(source.url, e))
else:
tweets.append(tweet)
return tweets