def parse_tweet(raw_tweet, source, now=None):
"""
Parses a single raw tweet line from a twtxt file
and returns a :class:`Tweet` object.
:param str raw_tweet: a single raw tweet line
:param Source source: the source of the given tweet
:param Datetime now: the current datetime
:returns: the parsed tweet
:rtype: Tweet
"""
if now is None:
now = datetime.now(timezone.utc)
raw_created_at, text = raw_tweet.split("\t", 1)
created_at = parse_iso8601(raw_created_at)
if created_at > now:
raise ValueError("Tweet is from the future")
return Tweet(click.unstyle(text.strip()), created_at, source)
python类utc()的实例源码
def set_time_range(self, time_range):
# allow a single time to be passed in place of a range
if type(time_range) not in [tuple, list]:
time_range = (time_range, time_range)
# translate the times to unix timestamps
def parse_time(time):
if type(time) in [int, float, str]:
time = int(time)
# realistic timestamp range
if 10**8 < time < 10**13:
return time
# otherwise archive.org timestamp format (possibly truncated)
time_string = str(time)[::-1].zfill(14)[::-1]
time = datetime.strptime(time_string, self.timestamp_format)
time = time.replace(tzinfo=timezone.utc)
return time.timestamp()
self.time_range = [parse_time(time) for time in time_range]
def create_service_principal(options, sub_config):
credentials = ServicePrincipalCredentials(
tenant=options['tenant_id'],
client_id=options['script_service_principal_client_id'],
secret=options['script_service_principal_secret'],
resource='https://graph.windows.net'
)
rbac_client = GraphRbacManagementClient(
credentials, tenant_id=options['tenant_id'])
# Create Service Principal
current_time = datetime.now(timezone.utc)
key = {
'start_date': current_time.isoformat(),
'end_date': current_time.replace(year=current_time.year + 3).isoformat(),
'key_id': str(uuid.uuid4()),
'value': str(uuid.uuid4())
}
sub_config['secret_key'] = key['value']
sub_config['service_principal'] = rbac_client.service_principals.create({
'app_id': sub_config['application'].app_id,
'account_enabled': True,
'password_credentials': [key]
})
return sub_config
def test_model_with_history_creates_changes_on_creation(self):
model_data = {
'name': 'Daffy Duck',
}
response = self.client.post('/animal/', data=json.dumps(model_data), content_type='application/json')
self.assertEqual(response.status_code, 200)
self.assertEqual(1, Changeset.objects.count())
cs = Changeset.objects.get()
self.assertEqual('testuser', cs.user.username)
self.assertAlmostEqual(datetime.now(tz=timezone.utc), cs.date, delta=timedelta(seconds=1))
self.assertEqual(5, Change.objects.count())
self.assertEqual(1, Change.objects.filter(changeset=cs, model='Animal', field='name', before='null', after='"Daffy Duck"').count())
self.assertEqual(1, Change.objects.filter(changeset=cs, model='Animal', field='id', before='null', after=Animal.objects.get().id).count())
self.assertEqual(1, Change.objects.filter(changeset=cs, model='Animal', field='caretaker', before='null', after='null').count())
self.assertEqual(1, Change.objects.filter(changeset=cs, model='Animal', field='zoo', before='null', after='null').count())
self.assertEqual(1, Change.objects.filter(changeset=cs, model='Animal', field='deleted', before='null', after='false').count())
def test_model_with_history_creates_changes_on_creation(self):
model_data = {
'name': 'Daffy Duck',
}
response = self.client.post('/animal/', data=json.dumps(model_data), content_type='application/json')
self.assertEqual(response.status_code, 200)
self.assertEqual(1, Changeset.objects.count())
cs = Changeset.objects.get()
self.assertEqual('testuser', cs.user.username)
self.assertAlmostEqual(datetime.now(tz=timezone.utc), cs.date, delta=timedelta(seconds=1))
self.assertEqual(5, Change.objects.count())
self.assertEqual(1, Change.objects.filter(changeset=cs, model='Animal', field='name', before='null', after='"Daffy Duck"').count())
self.assertEqual(1, Change.objects.filter(changeset=cs, model='Animal', field='id', before='null', after=Animal.objects.get().id).count())
self.assertEqual(1, Change.objects.filter(changeset=cs, model='Animal', field='caretaker', before='null', after='null').count())
self.assertEqual(1, Change.objects.filter(changeset=cs, model='Animal', field='zoo', before='null', after='null').count())
self.assertEqual(1, Change.objects.filter(changeset=cs, model='Animal', field='deleted', before='null', after='false').count())
def extract_event_info(ctxt, event, mode='good'):
global gbl_channel
my_list=[]
create_field(my_list, "namespaces", event.metadata.namespace)
create_field(my_list, "name", event.metadata.name)
create_field(my_list, "labels", event.metadata.labels)
create_field(my_list, "status.message", str(event.status.container_statuses[0].state))
create_field(my_list, "status.start_time", event.status.start_time)
create_field(my_list, "status.phase", event.status.phase)
c = datetime.now(timezone.utc) - event.status.start_time
create_field(my_list, "Runing since", str(c))
text=[
{
"color" : mode,
"pretext" : "Just from information from k8s" + ctxt ,
"fields" : my_list
}
]
send_slack_msg(gbl_channel, ":k8s:", text)
def convert_between_tz_and_utc(self, tz, utc):
dston = self.dston.replace(tzinfo=tz)
# Because 1:MM on the day DST ends is taken as being standard time,
# there is no spelling in tz for the last hour of daylight time.
# For purposes of the test, the last hour of DST is 0:MM, which is
# taken as being daylight time (and 1:MM is taken as being standard
# time).
dstoff = self.dstoff.replace(tzinfo=tz)
for delta in (timedelta(weeks=13),
DAY,
HOUR,
timedelta(minutes=1),
timedelta(microseconds=1)):
self.checkinside(dston, tz, utc, dston, dstoff)
for during in dston + delta, dstoff - delta:
self.checkinside(during, tz, utc, dston, dstoff)
self.checkoutside(dstoff, tz, utc)
for outside in dston - delta, dstoff + delta:
self.checkoutside(outside, tz, utc)
def test_easy(self):
# Despite the name of this test, the endcases are excruciating.
self.convert_between_tz_and_utc(Eastern, utc_real)
self.convert_between_tz_and_utc(Pacific, utc_real)
self.convert_between_tz_and_utc(Eastern, utc_fake)
self.convert_between_tz_and_utc(Pacific, utc_fake)
# The next is really dancing near the edge. It works because
# Pacific and Eastern are far enough apart that their "problem
# hours" don't overlap.
self.convert_between_tz_and_utc(Eastern, Pacific)
self.convert_between_tz_and_utc(Pacific, Eastern)
# OTOH, these fail! Don't enable them. The difficulty is that
# the edge case tests assume that every hour is representable in
# the "utc" class. This is always true for a fixed-offset tzinfo
# class (lke utc_real and utc_fake), but not for Eastern or Central.
# For these adjacent DST-aware time zones, the range of time offsets
# tested ends up creating hours in the one that aren't representable
# in the other. For the same reason, we would see failures in the
# Eastern vs Pacific tests too if we added 3*HOUR to the list of
# offset deltas in convert_between_tz_and_utc().
#
# self.convert_between_tz_and_utc(Eastern, Central) # can't work
# self.convert_between_tz_and_utc(Central, Eastern) # can't work
def get_volume_from_history(history, candle_size):
"""
Returns volume for given candle_size
:param history: history data
:param candle_size: in minutes
:return: Calculated volume for given candle_size
"""
volume = 0.0
epoch_now = int(time.time())
epoch_candle_start = epoch_now - candle_size * 60
pattern = '%Y-%m-%dT%H:%M:%S'
for item in history:
time_string = item['TimeStamp'].split('.', 1)[0]
dt = datetime.datetime.strptime(time_string, pattern)
item_epoch = dt.replace(tzinfo=timezone.utc).timestamp()
if item_epoch >= epoch_candle_start:
quantity = item['Quantity']
volume += quantity
return volume
def tinkers_construct_file(tinkers_construct) -> addon.File:
"""Tinkers construct file."""
return addon.File(
id=2338518,
mod=tinkers_construct,
name='TConstruct-1.10.2-2.5.6b.jar',
date=datetime(
year=2016, month=10, day=22,
hour=15, minute=11, second=19,
tzinfo=timezone.utc,
),
release=proxy.Release.Release,
url='https://addons.cursecdn.com/files/2338/518/TConstruct-1.10.2-2.5.6b.jar',
dependencies=[74924],
)
def tinkers_update(tinkers_construct) -> addon.File:
"""Update for tinkers_construct_file."""
return addon.File(
id=2353329,
mod=tinkers_construct,
name='TConstruct-1.10.2-2.6.1.jar',
date=datetime(
year=2016, month=12, day=7,
hour=18, minute=35, second=45,
tzinfo=timezone.utc,
),
release=proxy.Release.Release,
url='https://addons.cursecdn.com/files/2353/329/TConstruct-1.10.2-2.6.1.jar',
dependencies=[74924],
)
def mantle_file(mantle) -> addon.File:
"""Mantle (Tinkers dependency) file."""
return addon.File(
id=2366244,
mod=mantle,
name='Mantle-1.10.2-1.1.4.jar',
date=datetime(
year=2017, month=1, day=9,
hour=19, minute=40, second=41,
tzinfo=timezone.utc,
),
release=proxy.Release.Release,
url='https://addons.cursecdn.com/files/2366/244/Mantle-1.10.2-1.1.4.jar',
dependencies=[],
)
def test_file_init():
"""Does the File initialization behaves as expected?"""
m = addon.Mod(id=42, name=str(), summary=str())
addon.File(
id=42, mod=m,
name='test.jar', date=datetime.now(tz=timezone.utc),
release=addon.Release.Release, url='https://httpbin.org',
)
addon.File(
id=43, mod=m,
name='test.jar', date=datetime.now(tz=timezone.utc),
release=addon.Release.Alpha, url='https://httpbin.org',
)
with pytest.raises(TypeError):
addon.File(
id='43', mod=m,
name='test.jar', date=datetime.now(tz=timezone.utc),
release=addon.Release.Beta, url=None,
)
assert len(responses.calls) == 0
def test_calculate_timeout_http_date():
three_minutes_later = datetime.now(tz=timezone.utc) + timedelta(minutes=3)
http_date = '%a, %d %b %Y %H:%M:%S %Z'
assert 179 <= Service.calculate_timeout(
three_minutes_later.strftime(http_date),
) <= 181
def getinfo(run,now):
schedule = run.find_all('tr',attrs={'class':None})
game,runner,console,comment,eta,nextgame,nextrunner,nextconsole,nexteta,nextcomment = '','','','','','','','','',''
for item in schedule:
group = item.find_all('td')
try:
group2 = item.find_next_sibling().find_all('td')
except:
nextgame = False
return (game, runner, console, comment, eta, nextgame, nextrunner, nexteta, nextconsole, nextcomment)
st = group[0].getText()
#estfix = timedelta(hours=-5)
starttime = datetime.strptime(st, '%Y-%m-%dT%H:%M:%SZ' )
starttime = starttime.replace(tzinfo=timezone.utc)
#starttime = starttime + estfix
try:
offset = datetime.strptime(group2[0].getText().strip(), "%H:%M:%S")
endtime = starttime + timedelta(hours = offset.hour, minutes = offset.minute, seconds=offset.second)
except:
endtime = datetime(2011,1,1,12,00)
if starttime < now and endtime > now:
game = group[1].getText()
runner = group[2].getText()
#console = group[3].getText()
comment = group2[1].getText()
eta = group2[0].getText().strip()
if starttime > now:
nextgame = group[1].getText()
nextrunner = group[2].getText()
#nextconsole = group[3].getText()
nexteta = group2[0].getText().strip()
nextcomment = group2[1].getText()
break
else:
nextgame = 'done'
nextrunner = 'done'
return (game, runner, console, comment, eta, nextgame, nextrunner, nexteta, nextconsole, nextcomment)
def gdq(bot, trigger):
now = datetime.utcnow()
now = now.replace(tzinfo=timezone.utc)
delta = datetime(2018,1,7,16,30,tzinfo=timezone.utc) - now
textdate = "January 7"
url = 'https://gamesdonequick.com/schedule'
try:
x = requests.get(url).content
except:
return bot.say("GDQ is {0} days away ({1})".format(delta.days,textdate))
bs = BeautifulSoup(x)
try:
run = bs.find("table",{"id":"runTable"}).tbody
except:
return bot.say("GDQ is {0} days away ({1})".format(delta.days, textdate))
try:
gdqstart = datetime.strptime(run.td.getText(), '%Y-%m-%dT%H:%M:%SZ')
gdqstart = gdqstart.replace(tzinfo=timezone.utc)
except:
return bot.say("GDQ is {0} days away ({1})".format(delta.days, textdate))
(game, runner, console, comment, eta, nextgame, nextrunner, nexteta, nextconsole, nextcomment) = getinfo(run,now)
if not nextgame:
return bot.say("GDQ is {0} days away ({1})".format(delta.days,textdate))
if now < gdqstart:
tts = gdqstart - now
if tts.days <= 3:
return bot.say("GDQ is {0}H{1}M away. First game: {2} by {3} ETA: {4} Comment: {5} | https://gamesdonequick.com/schedule".format(int(tts.total_seconds() // 3600),int((tts.total_seconds() % 3600) // 60), nextgame, nextrunner, nexteta, nextcomment))
else:
return bot.say("GDQ is {0} days away ({1}) | https://gamesdonequick.com/schedule".format(tts.days,gdqstart.strftime('%m/%d/%Y')))
if nextgame == 'done':
return bot.say("GDQ is {0} days away ({1} [estimated])".format(delta.days,textdate))
if game:
if comment:
bot.say("Current Game: {0} by {1} ETA: {2} Comment: {3} | Next Game: {4} by {5} | http://www.twitch.tv/gamesdonequick | https://gamesdonequick.com/schedule".format(game, runner, eta, comment, nextgame, nextrunner))
else:
bot.say("Current Game: {0} by {1} ETA: {2} | Next Game: {3} by {4} | http://www.twitch.tv/gamesdonequick | https://gamesdonequick.com/schedule".format(game, runner, eta, nextgame, nextrunner))
else:
bot.say("Current Game: setup?? | Next Game {0} by {1} | http://www.twitch.tv/gamesdonequick | https://gamesdonequick.com/schedule".format(nextgame, nextrunner))
def get_processed_at_dict():
return {"processed_at": datetime.now(timezone.utc).isoformat()}
def read(self):
out = {k: None for k in self.FIELDS + self.EXTRA_FIELD}
start = datetime.now()
self.ser.reset_input_buffer()
while not all(out.values()):
line = self.readline()
if (datetime.now() - start).total_seconds() > self.timeout:
break
line = re.sub(r'[\x00-\x1F]|\r|\n|\t|\$', "", line)
cmd = line.split(',')[0]
if cmd not in ['GNGGA', 'GNRMC']:
continue
try:
msg = pynmea2.parse(line)
for key in out:
if hasattr(msg, key):
out[key] = getattr(msg, key)
except pynmea2.ParseError as e:
print("Parse error:", e)
if out['datestamp'] is not None and out['timestamp'] is not None:
timestamp = datetime.combine(out['datestamp'], out['timestamp']).replace(tzinfo=timezone.utc)
out['timestamp'] = timestamp.isoformat()
else:
del out['timestamp']
if out[self.FIELDS[-1]] is not None:
out[self.FIELDS[-1]] *= self.KNOTS_PER_KMPH
if out.get('latitude') is not None and out.get('longitude') is not None:
if out['latitude'] != 0.0 and out['longitude'] != 0.0:
out['pos'] = {
'type': 'Point',
'coordinates': [out['longitude'], out['latitude']]
}
del out['latitude']
del out['longitude']
for f in self.EXTRA_FIELD:
if f in out:
del out[f]
return out
def time_date(msg):
if msg == b'\xff\xff\xff\xff\xff\xff\xff\xff':
return 'err'
return datetime(
year=msg[5] + 1985,
month=msg[3],
day=msg[4] // 4 + 1,
hour=msg[2],
minute=msg[1],
second=msg[0] // 4,
tzinfo=timezone.utc
).isoformat()
def _wrap_payload(payload_key, payload):
now = datetime.now()
timestamp = now.replace(tzinfo=timezone.utc).timestamp()
wrapper = {
'meta': {
'method': 'fernet',
'timestamp': timestamp,
'timezone': 'utc'
},
payload_key: payload
}
return wrapper