def validateLogDate(self, lines):
    """Check the first ``# Time: `` header in *lines* against a 2-hour window.

    The timestamp after the prefix is parsed as ``%y%m%d %H:%M:%S``, tagged
    as UTC, converted to the zone named by
    ``self._GENERAL_CONFIG["TIMEZONE"]`` and made naive again before being
    subtracted from ``self._now``.

    :param lines: iterable of log lines; falsy entries are skipped.
    :returns: ``False`` when the first timestamped line is more than two
        hours older than ``self._now``; ``True`` otherwise, including when
        no line carries a timestamp.
    """
    max_age = timedelta(hours=2)
    time_prefix = "# Time: "
    for entry in lines:
        if not entry or not entry.startswith(time_prefix):
            continue
        stamp_utc = datetime.strptime(entry[8:], "%y%m%d %H:%M:%S").replace(tzinfo=tz.tzutc())
        target_zone = zoneinfo.gettz(self._GENERAL_CONFIG["TIMEZONE"])
        # Drop tzinfo after conversion so the value can be subtracted from
        # self._now (presumably a naive datetime in that zone -- confirm).
        stamp_local = stamp_utc.astimezone(target_zone).replace(tzinfo=None)
        print(self._now, stamp_local)
        print("diff :", self._now - stamp_local)
        # Only the first timestamped line decides the outcome.
        return not ((self._now - stamp_local) > max_age)
    return True
python类tzutc()的实例源码
def validate_log_date(self, line):
    """Tell whether the timestamp carried by *line* is within the last 2 hours.

    The line is first matched against ``REG_GENERAL_ERR``; if that fails and
    the line contains ``BEGIN_DEADLOCK``, ``REG_DEADLOCK`` is tried instead.
    Group 1 of the match is parsed as ``%Y-%m-%d %H:%M:%S``, tagged as UTC,
    converted to the zone named by ``self._GENERAL_CONFIG["TIMEZONE"]`` and
    made naive before being compared with ``self._now``.

    :param line: a single log line.
    :returns: ``False`` when the parsed time is more than two hours older
        than ``self._now``; ``True`` otherwise, including when no timestamp
        can be extracted from the line.
    """
    max_age = timedelta(hours=2)

    match = REG_GENERAL_ERR.match(line)
    if not match and BEGIN_DEADLOCK in line:
        match = REG_DEADLOCK.match(line)
    if not match:
        # Nothing to parse. This also covers a deadlock marker whose line
        # does not fit REG_DEADLOCK, which previously crashed on
        # ``None.group(1)``.
        return True

    log_time = datetime.strptime(match.group(1), "%Y-%m-%d %H:%M:%S")
    target_zone = zoneinfo.gettz(self._GENERAL_CONFIG["TIMEZONE"])
    log_time = log_time.replace(tzinfo=tz.tzutc()).astimezone(target_zone)
    # Strip tzinfo so the subtraction against self._now works
    # (presumably self._now is naive -- confirm).
    log_time = log_time.replace(tzinfo=None)
    return (self._now - log_time) <= max_age
def test_recurrence(self):
    # Date-valued UNTILs in rrules must land in a reasonable timezone and
    # still include the final day (12/28 for this fixture).
    ics_text = get_test_file("recurrence.ics")
    calendar = base.readOne(ics_text, findBegin=False)
    occurrences = list(calendar.vevent.getrruleset())
    expected_by_index = (
        (0, datetime.datetime(2006, 1, 26, 23, 0, tzinfo=tzutc())),
        (1, datetime.datetime(2006, 2, 23, 23, 0, tzinfo=tzutc())),
        (-1, datetime.datetime(2006, 12, 28, 23, 0, tzinfo=tzutc())),
    )
    for index, expected in expected_by_index:
        self.assertEqual(occurrences[index], expected)
def datetime2timestamp(dt, default_timezone=None):
    """Convert a datetime into seconds elapsed since 1970-01-01 UTC.

    :type dt: datetime
    :param dt: The datetime to convert.

    :type default_timezone: tzinfo
    :param default_timezone: Zone attached to ``dt`` when ``dt`` is naive;
        ``None`` means ``tzutc()``. Ignored for aware datetimes.

    :returns: The timestamp
    """
    if dt.tzinfo is None:
        fallback_zone = tzutc() if default_timezone is None else default_timezone
        dt = dt.replace(tzinfo=fallback_zone)
    unix_epoch = datetime.datetime(1970, 1, 1)
    # Shift the wall-clock value to UTC, then measure from the naive epoch.
    utc_wall_clock = dt.replace(tzinfo=None) - dt.utcoffset()
    elapsed = utc_wall_clock - unix_epoch
    if not hasattr(elapsed, "total_seconds"):
        # Manual equivalent for timedelta objects lacking total_seconds().
        return (elapsed.microseconds + (elapsed.seconds + elapsed.days * 24 * 3600) * 10**6) / 10**6
    return elapsed.total_seconds()  # Works in Python 2.7+
def test_import_anno_ok_2(wa_image):
    """Import one annotation, then create another, checking ``created`` on both."""
    started_at = datetime.now(tz.tzutc())

    # import first because CRUD.create changes created time in input
    wa_image['id'] = 'naomi-xx-imported'
    CRUD.import_annos([wa_image])
    imported = Anno._default_manager.get(pk=wa_image['id'])
    assert imported is not None
    assert Anno._default_manager.count() == 1

    # The imported record must be dated more than 25 hours before the test
    # started, i.e. import is expected to keep the original created date.
    assert imported.created < (started_at - timedelta(hours=25))

    # Now create a second annotation from the same payload under a new id.
    wa_image['id'] = 'naomi-xx'
    fresh = CRUD.create_anno(wa_image)
    assert fresh is not None
    assert Anno._default_manager.count() == 2

    # The created record must be dated within a minute of the test start.
    assert (started_at - timedelta(minutes=1)) < fresh.created
def test_import_anno_ok(wa_image):
    """Import an annotation together with one reply and check both survive."""
    reply = make_wa_object(
        age_in_hours=1, reply_to=wa_image['id'])
    started_at = datetime.now(tz.tzutc())

    CRUD.import_annos([wa_image, reply])
    imported = Anno._default_manager.get(pk=wa_image['id'])
    assert imported is not None

    # The reply must be attached to the imported parent.
    assert imported.total_replies == 1
    assert imported.replies[0].anno_id == reply['id']

    # The imported parent must be dated more than 25 hours before the test
    # started, i.e. import is expected to keep the original created date.
    assert imported.created < (started_at - timedelta(hours=25))
def test_deleted_entry(self):
    # Build the payload bottom-up so each nesting level has a name.
    stamp = "2013-09-09T16:17:12.600Z"
    space_link = {
        "sys": {
            "type": "Link",
            "linkType": "Space",
            "id": "cfexampleapi"
        }
    }
    payload = {
        "sys": {
            "space": space_link,
            "id": "5c6VY0gWg0gwaIeYkUUiqG",
            "type": "DeletedEntry",
            "createdAt": stamp,
            "updatedAt": stamp,
            "deletedAt": stamp,
            "revision": 1
        }
    }
    deleted_entry = DeletedEntry(payload)

    expected_deleted_at = datetime.datetime(2013, 9, 9, 16, 17, 12, 600000, tzinfo=tzutc())
    self.assertEqual(deleted_entry.id, '5c6VY0gWg0gwaIeYkUUiqG')
    self.assertEqual(deleted_entry.deleted_at, expected_deleted_at)
    self.assertEqual(str(deleted_entry), "<DeletedEntry id='5c6VY0gWg0gwaIeYkUUiqG'>")
def test_deleted_asset(self):
    # Build the payload bottom-up so each nesting level has a name.
    stamp = "2013-09-09T16:17:12.600Z"
    space_link = {
        "sys": {
            "type": "Link",
            "linkType": "Space",
            "id": "cfexampleapi"
        }
    }
    payload = {
        "sys": {
            "space": space_link,
            "id": "5c6VY0gWg0gwaIeYkUUiqG",
            "type": "DeletedAsset",
            "createdAt": stamp,
            "updatedAt": stamp,
            "deletedAt": stamp,
            "revision": 1
        }
    }
    deleted_asset = DeletedAsset(payload)

    expected_deleted_at = datetime.datetime(2013, 9, 9, 16, 17, 12, 600000, tzinfo=tzutc())
    self.assertEqual(deleted_asset.id, '5c6VY0gWg0gwaIeYkUUiqG')
    self.assertEqual(deleted_asset.deleted_at, expected_deleted_at)
    self.assertEqual(str(deleted_asset), "<DeletedAsset id='5c6VY0gWg0gwaIeYkUUiqG'>")
def datetime2timestamp(dt, default_timezone=None):
    """Return the number of seconds between the Unix epoch and ``dt``.

    :type dt: datetime
    :param dt: The datetime to convert.

    :type default_timezone: tzinfo
    :param default_timezone: Zone assumed for a naive ``dt``. When left as
        ``None``, ``tzutc()`` is used. Has no effect on aware datetimes.

    :returns: The timestamp
    """
    if dt.tzinfo is None:
        assumed_zone = default_timezone
        if assumed_zone is None:
            assumed_zone = tzutc()
        dt = dt.replace(tzinfo=assumed_zone)

    epoch_start = datetime.datetime(1970, 1, 1)
    # Removing the offset from the naive wall-clock value gives naive UTC.
    naive_utc = dt.replace(tzinfo=None) - dt.utcoffset()
    span = naive_utc - epoch_start

    if hasattr(span, "total_seconds"):
        return span.total_seconds()  # Works in Python 2.7+
    whole_seconds = span.seconds + span.days * 24 * 3600
    return (span.microseconds + whole_seconds * 10**6) / 10**6
def _build_datetime(parts):
timestamp = parts.get('timestamp')
if timestamp:
tz_utc = tz.tzutc()
return datetime.fromtimestamp(timestamp, tz=tz_utc)
am_pm = parts.get('am_pm')
hour = parts.get('hour', 0)
if am_pm == 'pm' and hour < 12:
hour += 12
elif am_pm == 'am' and hour == 12:
hour = 0
return datetime(year=parts.get('year', 1), month=parts.get('month', 1),
day=parts.get('day', 1), hour=hour, minute=parts.get('minute', 0),
second=parts.get('second', 0), microsecond=parts.get('microsecond', 0),
tzinfo=parts.get('tzinfo'))
def format_event(device, event):
    """Render one event as ``"<device name> <event kind> <local time>"``.

    ``event['created_at']`` is tagged as UTC and converted to the local
    zone before formatting. The kind label and the separator between the
    time and the date are fetched through ``ADDON.getLocalizedString``.

    :param device: object exposing a ``name`` attribute.
    :param event: mapping with ``'created_at'`` and ``'kind'`` entries.
    :returns: the space-joined description string.
    """
    utc_zone, local_zone = tz.tzutc(), tz.tzlocal()
    local_dt = event['created_at'].replace(tzinfo=utc_zone).astimezone(local_zone)

    when_text = local_dt.strftime('%I:%M %p ')
    when_text = when_text + ADDON.getLocalizedString(30300)
    when_text = when_text + local_dt.strftime(' %A %b %d %Y')

    # Unknown kinds keep an empty label.
    kind_label = ''
    kind_string_ids = (('on_demand', 30301), ('motion', 30302), ('ding', 30303))
    for kind_name, string_id in kind_string_ids:
        if event['kind'] == kind_name:
            kind_label = ADDON.getLocalizedString(string_id)

    return ' '.join([device.name, kind_label, when_text])
def _wait_for_completion(self):
    """
    Poll the stack until its simplified status leaves IN_PROGRESS.

    Each iteration refreshes the status, calls ``_log_new_events`` and then
    sleeps for four seconds. ``most_recent_event_datetime`` is initialised
    to three seconds before the current UTC time before polling starts.

    :returns: The final stack status.
    :rtype: sceptre.stack_status.StackStatus
    """
    lookback = timedelta(seconds=3)
    self.most_recent_event_datetime = datetime.now(tzutc()) - lookback

    status = StackStatus.IN_PROGRESS
    while status == StackStatus.IN_PROGRESS:
        raw_status = self.get_status()
        status = self._get_simplified_status(raw_status)
        self._log_new_events()
        time.sleep(4)
    return status
def datetime2timestamp(dt, default_timezone=None):
    """Compute the Unix timestamp for the given datetime instance.

    :type dt: datetime
    :param dt: The datetime to convert.

    :type default_timezone: tzinfo
    :param default_timezone: Zone applied to ``dt`` only if ``dt`` is naive.
        Passing ``None`` selects ``tzutc()``.

    :returns: The timestamp
    """
    is_naive = dt.tzinfo is None
    if is_naive and default_timezone is None:
        default_timezone = tzutc()
    if is_naive:
        dt = dt.replace(tzinfo=default_timezone)

    origin = datetime.datetime(1970, 1, 1)
    # Wall-clock value minus its UTC offset, measured from the naive epoch.
    delta = (dt.replace(tzinfo=None) - dt.utcoffset()) - origin

    if not hasattr(delta, "total_seconds"):
        # Manual equivalent for timedelta objects lacking total_seconds().
        return (delta.microseconds + (delta.seconds + delta.days * 24 * 3600) * 10**6) / 10**6
    return delta.total_seconds()  # Works in Python 2.7+
def combine_histories_and_messages(histories, smsMessages):
    """Merge history records and SMS messages into one list sorted by timestamp.

    Every output item is a dict with ``"action"``, ``"timestamp"`` and
    ``"style"`` keys. History rows take ``state`` / ``changedAt`` unchanged
    and an empty style. SMS rows take the lower-cased ``body``, the
    ``createdAt`` value tagged as UTC and converted to the local zone, and a
    success or error style depending on whether ``status`` lower-cases to
    ``'processed'``.

    :param histories: iterable of mappings with ``state`` and ``changedAt``.
    :param smsMessages: iterable of mappings with ``status``, ``createdAt``
        and ``body``.
    :returns: the combined list, sorted ascending by ``"timestamp"``.
    """
    success_attr = 'class="commandSuccess"'
    error_attr = 'class="commandError"'
    plain_attr = ''
    utc_zone = tz.tzutc()
    local_zone = tz.tzlocal()

    timeline = [
        {"action": record.get('state'), "timestamp": record.get('changedAt'), "style": plain_attr}
        for record in histories
    ]

    for sms in smsMessages:
        was_processed = sms.get('status').lower() == 'processed'
        row_style = success_attr if was_processed else error_attr
        sent_at_utc = sms.get('createdAt').replace(tzinfo=utc_zone)
        timeline.append({
            "action": sms.get('body').lower(),
            "timestamp": sent_at_utc.astimezone(local_zone),
            "style": row_style,
        })

    timeline.sort(key=lambda row: row.get("timestamp"))
    return timeline
def time_params_to_dates(args):
    """Translate time-related command-line options into a (from, until) pair.

    Precedence, as implemented:

    1. ``--days`` (times 24) or else ``--hours`` gives a window of that many
       hours ending now.
    2. ``--from`` is parsed with ``parse_date``; ``--until`` is parsed the
       same way when given, otherwise the window ends now.
    3. With none of the above, the window is the last day.

    ``--until`` is only consulted when ``--from`` is present.

    :param args: mapping holding ``--hours``, ``--days``, ``--from`` and
        ``--until`` entries.
    :returns: tuple ``(from_date, until_date)``.
    """
    current = datetime.now(tz=tzutc())

    if args['--hours'] or args['--days']:
        span_hours = int(args['--days']) * 24 if args['--days'] else int(args['--hours'])
        return current - timedelta(hours=span_hours), current

    if not args['--from']:
        return current - timedelta(days=1), current

    from_date = parse_date(args['--from'])
    until_date = parse_date(args['--until']) if args['--until'] else current
    return from_date, until_date
def __init__(self, token):
    """
    Container for storing Windows Live Refresh Token.

    Subclass of :class:`Token`

    WARNING: Only invoke when creating a FRESH token, because the issue
    date is taken from the current UTC time. Do not use it to turn a saved
    token back into an object.

    The validity end is set to 14 days after the issue date.

    Args:
        token (str): The JWT Refresh-Token
    """
    issued_at = datetime.now(tzutc())
    lifetime = timedelta(days=14)
    super(RefreshToken, self).__init__(token, issued_at, issued_at + lifetime)
def test_scratchbuild(self):
    """
    Build a 2.0 vcalendar from scratch and compare it with the fixture file.
    """
    expected_ics = get_test_file("simple_2_0_test.ics")

    pacific = dateutil.tz.tzical(
        "test_files/timezones.ics").get('US/Pacific')

    cal = base.newFromBehavior('vcalendar', '2.0')
    cal.add('vevent')
    cal.vevent.add('dtstart').value = datetime.datetime(2006, 5, 9)
    cal.vevent.add('description').value = "Test event"
    cal.vevent.add('created').value = datetime.datetime(2006, 1, 1, 10, tzinfo=pacific)
    cal.vevent.add('uid').value = "Not very random UID"
    cal.vevent.add('dtstamp').value = datetime.datetime(2017, 6, 26, 0, tzinfo=tzutc())

    # Line endings are normalized on both sides before comparing.
    actual = cal.serialize().replace('\r\n', '\n')
    expected = expected_ics.replace('\r\n', '\n')
    self.assertEqual(actual, expected)
def test_recurrence(self):
    """
    Date-valued UNTILs in rrules must land in a reasonable timezone and
    still include the final day (12/28 for this fixture).
    """
    ics_text = get_test_file("recurrence.ics")
    calendar = base.readOne(ics_text)
    occurrences = list(calendar.vevent.getrruleset())

    first, second, last = occurrences[0], occurrences[1], occurrences[-1]
    self.assertEqual(first, datetime.datetime(2006, 1, 26, 23, 0, tzinfo=tzutc()))
    self.assertEqual(second, datetime.datetime(2006, 2, 23, 23, 0, tzinfo=tzutc()))
    self.assertEqual(last, datetime.datetime(2006, 12, 28, 23, 0, tzinfo=tzutc()))
def cleanup_instances(self):
    """Cleanup Instances.

    Walk ``self.unprotected_servers`` and delete every server whose status
    is ``"ERROR"`` or whose age exceeds ``self.age_limit`` hours. Age is
    the current UTC time minus the parsed ``created_at`` value. Each
    deletion is reported through ``_indp`` before the delete call.
    """
    now = datetime.datetime.now(tzutc())
    age_cutoff = datetime.timedelta(hours=self.age_limit)

    for server in self.unprotected_servers:
        age = now - dateutil.parser.parse(server.created_at)
        errored = server.status == "ERROR"
        if not errored and not (age > age_cutoff):
            # Healthy and young enough: keep it.
            continue
        message = "Deleting {name} Errored: {error} Age: {age}".format(
            name=server.name, error=errored, age=age)
        _indp(message)
        self.conn.compute.delete_server(server.id)