def _get_local_timestamp(record):
"""
Get the record's UTC timestamp as an ISO-formatted date / time string.
:param record: The LogRecord.
:type record: StructuredLogRecord
:return: The ISO-formatted date / time string.
:rtype: str
"""
timestamp = datetime.fromtimestamp(
timestamp=record.created,
tz=tzlocal()
)
return timestamp.isoformat(sep=' ')
python类tzlocal()的实例源码
def check_item(item):
current_time = datetime.now(tz.tzlocal())
start = item.get_start_time()
end = item.get_end_time()
if start <= current_time < end:
logging.debug('within the start and end time, %s should be %s',
item.__class__.__name__,
item.get_status_name(1))
item.set_status(1)
else:
logging.debug('outside the start and end time, %s should be %s',
item.__class__.__name__,
item.get_status_name(0))
item.set_status(0)
def testTzAll(self):
from dateutil.tz import tzutc
from dateutil.tz import tzoffset
from dateutil.tz import tzlocal
from dateutil.tz import tzfile
from dateutil.tz import tzrange
from dateutil.tz import tzstr
from dateutil.tz import tzical
from dateutil.tz import gettz
from dateutil.tz import tzwin
from dateutil.tz import tzwinlocal
tz_all = ["tzutc", "tzoffset", "tzlocal", "tzfile", "tzrange",
"tzstr", "tzical", "gettz"]
tz_all += ["tzwin", "tzwinlocal"] if sys.platform.startswith("win") else []
lvars = locals()
for var in tz_all:
self.assertIsNot(lvars[var], None)
def get_file_stat(path):
"""
This is a helper function that given a local path return the size of
the file in bytes and time of last modification.
"""
try:
stats = os.stat(path)
except IOError as e:
raise ValueError('Could not retrieve file stat of "%s": %s' % (
path, e))
try:
update_time = datetime.fromtimestamp(stats.st_mtime, tzlocal())
except (ValueError, OSError, OverflowError):
# Python's fromtimestamp raises value errors when the timestamp is out
# of range of the platform's C localtime() function. This can cause
# issues when syncing from systems with a wide range of valid
# timestamps to systems with a lower range. Some systems support
# 64-bit timestamps, for instance, while others only support 32-bit.
# We don't want to fail in these cases, so instead we pass along none.
update_time = None
return stats.st_size, update_time
def print_objects(object_list):
'''Print name, size, and creation date of objects in list.
Parameters
----------
object_list : list (of boto3 objects)
'''
object_names = objects2names(object_list)
if len(object_names):
maxlen = max(map(len, object_names))
dates = [t.last_modified.astimezone(tzlocal()).strftime('%Y/%m/%d (%H:%M:%S)')\
for t in object_list]
padding = '{0: <%i} {1} {2}M'%(min(maxlen+3, 70))
sizes = [round(t.meta.data['Size']/2.**20,1) for t in object_list]
info = [padding.format(name[-100:],date,size) for name,date,size in zip(object_names, dates, sizes)]
print('\n'.join(info))
# object naming convention
##############################
def calculate_time_offset(latest_date, auto, preferred_offset):
if auto:
preferred_offset = int(datetime.now(tzlocal()).strftime("%z")[0:3])
print("Detected offset: UTC{:+03d}:00".format(preferred_offset))
if 11 >= preferred_offset > 10:
preferred_offset = 10
print("Offset is greater than +10, +10 will be used...")
elif 12 >= preferred_offset > 11:
preferred_offset = -12
print("Offset is greater than +10, -12 will be used...")
himawari_offset = 10 # UTC+10:00 is the time zone that himawari is over
offset = int(preferred_offset - himawari_offset)
offset_tmp = datetime.fromtimestamp(mktime(latest_date)) + timedelta(hours=offset)
offset_time = offset_tmp.timetuple()
return offset_time
def my_local_time():
# METHOD 1: Hardcode zones:
# from_zone = tz.gettz('UTC')
# to_zone = tz.gettz('America/New_York')
# METHOD 2: Auto-detect zones:
from_zone = tz.tzutc()
to_zone = tz.tzlocal()
utc = datetime.utcnow()
# utc = datetime.strptime('2011-01-21 02:37:21', '%Y-%m-%d %H:%M:%S')
# Tell the datetime object that it's in UTC time zone since
# datetime objects are 'naive' by default
utc = utc.replace(tzinfo=from_zone)
# Convert time zone
return utc.astimezone(to_zone)
def local_time(self,ttime,year,month,day):
match = re.search(r'(.{1,2}):(.{2}) ?(.{2})',ttime)
if match:
hour = int(match.group(1))
minute = int(match.group(2))
ampm = match.group(3)
if ampm == "pm":
if hour < 12:
hour = hour + 12
hour = hour % 24
else:
if hour == 12:
hour = 0
london = timezone('Europe/Copenhagen')
utc = timezone('UTC')
utc_dt = datetime.datetime(int(year),int(month),int(day),hour,minute,0,tzinfo=utc)
to_zone = tz.tzlocal()
loc_dt = utc_dt.astimezone(to_zone)
return loc_dt
#ttime = "%02d:%02d" % (loc_dt.hour,loc_dt.minute)
return ttime
def date_as_iso8601(self):
from dateutil.tz import tzlocal
ts = self.meta.get("date", None)
if ts is None: return None
# TODO: Take timezone from config instead of tzlocal()
tz = tzlocal()
ts = ts.astimezone(tz)
offset = tz.utcoffset(ts)
offset_sec = (offset.days * 24 * 3600 + offset.seconds)
offset_hrs = offset_sec // 3600
offset_min = offset_sec % 3600
if offset:
tz_str = '{0:+03d}:{1:02d}'.format(offset_hrs, offset_min // 60)
else:
tz_str = 'Z'
return ts.strftime("%Y-%m-%d %H:%M:%S") + tz_str
def parse_timestamp(value):
"""Parse a timestamp into a datetime object.
Supported formats:
* iso8601
* rfc822
* epoch (value is an integer)
This will return a ``datetime.datetime`` object.
"""
if isinstance(value, (int, float)):
# Possibly an epoch time.
return datetime.datetime.fromtimestamp(value, tzlocal())
else:
try:
return datetime.datetime.fromtimestamp(float(value), tzlocal())
except (TypeError, ValueError):
pass
try:
return dateutil.parser.parse(value)
except (TypeError, ValueError) as e:
raise ValueError('Invalid timestamp "%s": %s' % (value, e))
def apply(self, input_list):
now = datetime.datetime.now(tzlocal())
try:
days_delta = self.config['minimum_age']
except KeyError:
LOGGER.info('Minimum age not set in config, using default 90 days')
days_delta = 90
ago = datetime.timedelta(days=days_delta)
too_young = []
for role in input_list:
if role.create_date > now - ago:
LOGGER.info('Role {name} created too recently to cleanup. ({date})'.format(
name=role.role_name, date=role.create_date))
too_young.append(role)
return too_young
def initialize(self):
super(AppLogger, self).initialize()
config = AppLogger._config
now = datetime.now(tzlocal())
self._accessid = uuid.uuid4().hex[:6]
self._uniqueid = "UNIQID"
self._accesssec = now
self._accesstime = now.strftime(config['datefmt'])
if len(self.handlers) == 0:
if config['filelog']:
self._add_file_handler(config)
stream_handler = logging.StreamHandler()
stream_handler.setLevel(config['verbosity'])
stream_handler.setFormatter(
ColoredFormatter(config['fmt'], config['datefmt']))
self.addHandler(stream_handler)
message = "LOG Start with ACCESSID=[%s] UNIQUEID=[%s] ACCESSTIME=[%s]"
self.info(message % (self._accessid, self._uniqueid, self._accesstime))
def format_event(device, event):
from_zone = tz.tzutc()
to_zone = tz.tzlocal()
utc = event['created_at'].replace(tzinfo=from_zone)
local_time = utc.astimezone(to_zone)
local_time_string = local_time.strftime('%I:%M %p ')
local_time_string += ADDON.getLocalizedString(30300)
local_time_string += local_time.strftime(' %A %b %d %Y')
event_name = ''
if event['kind'] == 'on_demand':
event_name = ADDON.getLocalizedString(30301)
if event['kind'] == 'motion':
event_name = ADDON.getLocalizedString(30302)
if event['kind'] == 'ding':
event_name = ADDON.getLocalizedString(30303)
return ' '.join([device.name, event_name, local_time_string])
def combine_histories_and_messages(histories, smsMessages):
cmdSuccessStyle = 'class="commandSuccess"'
cmdErrorStyle = 'class="commandError"'
basicStyle = ''
from_zone = tz.tzutc()
to_zone = tz.tzlocal()
combinedList = []
for history in histories:
combinedList.append({"action": history.get('state'), "timestamp": history.get('changedAt'), "style": basicStyle})
for smsMessage in smsMessages:
style = cmdSuccessStyle if smsMessage.get('status').lower() == 'processed' else cmdErrorStyle
createdAt = smsMessage.get('createdAt').replace(tzinfo=from_zone)
combinedList.append({"action": smsMessage.get('body').lower(), "timestamp": createdAt.astimezone(to_zone), "style": style})
combinedList.sort(key=lambda c: c.get("timestamp"))
return combinedList
def getPredictionLoop():
# Get eventual glucose from Loop via NS
eventualGlucoseRequest = "api/v1/devicestatus.json"
eventualGlucoseURL = nsURL + eventualGlucoseRequest
eventualGlucoseResponse = urllib.urlopen(eventualGlucoseURL).read().decode('utf-8')
eventualGlucoseData = json.loads(eventualGlucoseResponse)
# I'm unsure how to better accomplish what is happening below; the correct device entry may not be the 0th or 1st entry in the returned array ... need to search for it?
try:
eventualGlucose = eventualGlucoseData[0]["loop"]["predicted"]["values"][-1]
predictionStartTime = dateutil.parser.parse(eventualGlucoseData[0]["loop"]["predicted"]["startDate"])
predictionEndTime = predictionStartTime + datetime.timedelta(minutes=(5*(len(eventualGlucoseData[0]["loop"]["predicted"]["values"])-5)))
except:
eventualGlucose = eventualGlucoseData[1]["loop"]["predicted"]["values"][-1]
predictionStartTime = dateutil.parser.parse(eventualGlucoseData[1]["loop"]["predicted"]["startDate"])
predictionEndTime = predictionStartTime + datetime.timedelta(minutes=(5*(len(eventualGlucoseData[1]["loop"]["predicted"]["values"])-5)))
print("Eventual Glucose (Loop): " + str(eventualGlucose) + " " + glucoseUnit + " at " + predictionEndTime.astimezone(tzlocal()).strftime("%-I:%M:%S %p on %A, %B %d, %Y"))
print(" ... predicted at " + predictionStartTime.astimezone(tzlocal()).strftime("%-I:%M:%S %p on %A, %B %d, %Y"))
return eventualGlucose
def localized_date(date=None, timezone=None):
"""
Returns the given date, localized to the default timezone.
If no date is given, return the localized current date.
timezone should be a valid timezone object obtained via pytz.
"""
if not timezone:
timezone = TZ
if not date:
date = pytz.utc.localize(datetime.utcnow())
if not date.tzinfo:
# Attempt to guezz current server timezone
# Should be ok for any date coming fom a naive call to datetime.now()
date = date.replace(tzinfo=tzlocal())
return date.astimezone(timezone)
def convert_data(self, line, prev_func):
datetime_pattern = '\d+-\d+-\d+ \d+:\d+'
r = re.findall(datetime_pattern, line)
if len(r) == 0:
return None
t = arrow.get(r[0], tzinfo=tz.tzlocal())
day_of_week = t.isoweekday()
hour = int(t.format('HH'))
minute = int(t.format('mm'))
if day_of_week == 6 or day_of_week == 7:
is_holiday = 1
else:
is_holiday = 0
return np.array([day_of_week, hour, minute, prev_func,
is_holiday], dtype=np.int32)
def get_last_commit_before_due_date(self, aname, group):
due_date_str = self.course_config[aname]["deadline"]
due_date_object = datetime.strptime(due_date_str, "%Y-%m-%d %H:%M")
due_date_object_utc = due_date_object.replace(tzinfo=tz.tzlocal()).astimezone(tz.tzutc())
# above: convert due date to utc because commits are in utc and we want to compare them to the due date
repo = self.repos[get_assessment_repo_name(group, self.config, aname)]
get_time_f = lambda commit: datetime.strptime(commit.commit.committer["date"], gh3_time_fmt)
# # hopefully committer is the better choice than author?
latest_commit = max(repo.commits(until=due_date_object_utc), key=get_time_f)
return latest_commit.sha
# note: in the unlikely event of two commits at the same time, i think max takes the first one. which is good because
# i'm guessing the commits are ordered anyway? ok this is not really relevant.
# TODO: fix this to work with multiple student teams
def __get_localtime(self, iso_time, timezone = tz.tzlocal()):
d = self.__get_datetime(iso_time)
try:
d = d.astimezone(timezone) # aware object can be in any timezone
except ValueError: # naive
d = d.replace(tzinfo = timezone)
return d
def __parse_time(self, time_string, current_time, format = '%H:%M:%S'):
time = datetime.strptime(time_string, format).replace(year = current_time.year, month = current_time.month, day = current_time.day, tzinfo = tz.tzlocal())
return self.__get_localtime(time)
def __get_compiled_policy(self, light_policy, current_time = None, enforce_update = False):
if current_time is None:
current_time = datetime.now().replace(tzinfo = tz.tzlocal())
if enforce_update or self.__compiled_policy_date is None or current_time.date() != self.__compiled_policy_date.date():
self.__unregister_all_device_for_monitor()
self.__compiled_policy = self.__compile_policy(light_policy, current_time)
self.__compiled_policy_date = current_time
self.__logger.info('Local policy cache updated: %s', self.__compiled_policy)
return self.__compiled_policy
def calculate_light_brightness(self, current_time = None, light_policy = None):
self.__logger.debug('Calculating light brightness..')
if current_time is None:
current_time = datetime.now().replace(tzinfo = tz.tzlocal())
calculated_light_brigtness = []
compiled_policy = self.__compiled_policy
if light_policy is not None:
compiled_policy = self.__compile_policy(light_policy, current_time)
if compiled_policy is None:
self.__logger.error("No policy is found. Skip light update")
return calculated_light_brigtness
for bulb in compiled_policy:
calculated_light = { "bulb_ip" : bulb["bulb_ip"] }
if 'light_on_only_when_device_online' in bulb and not self.__at_least_one_device_online(bulb["light_on_only_when_device_online"]):
# if required devices are not online, turn off the light
calculated_light["calculated_brightness"] = 0
else:
for policy in bulb['policies']:
brightness = self.__calculate_light_brightness(current_time, policy)
if brightness > -1:
calculated_light["calculated_brightness"] = brightness
calculated_light["policy_matched"] = policy
break
if 'calculated_brightness' in calculated_light:
calculated_light_brigtness.append(calculated_light)
self.__logger.debug('Calculated light brightness: %s', calculated_light_brigtness)
return calculated_light_brigtness
def time_now():
"""
Return the time in the current time zone
"""
return datetime.datetime.now(tzlocal())
def _format_date(self, dt, fmt):
if not dt:
return ''
# Format dates using local timezone
if dt.tzinfo:
return dt.astimezone(tzlocal()).strftime(fmt)
else:
return dt.strftime(fmt)
def get_start_time(self):
"""
Determine the time the door should open.
Returns:
Datetime
"""
# minimum door opening is 8am local time
# if it opens too early the raccoons arrive for breakfast :(
return parser.parse('08:00:00').replace(tzinfo=tz.tzlocal())
def main():
settings = get_settings_data(sysargs.settings)
if sysargs.loglevel:
logging.getLogger().setLevel(getattr(logging, sysargs.loglevel.upper(), logging.WARNING))
api_data = get_api_data(settings,sysargs.date)
# the results look like 2015-12-10T15:47:11+00:00
# so we need to convert from UTC to whatever the local time is
utc = parser.parse(api_data['results']['sunrise'])
sunrise_lcl = utc.astimezone(tz.tzlocal())
utc = parser.parse(api_data['results']['sunset'])
sunset_lcl = utc.astimezone(tz.tzlocal())
# this data is also in api_data['results']['day_length']
daylight_hours = (sunset_lcl - sunrise_lcl).total_seconds() / 60 / 60
if daylight_hours < settings['ideal-day']:
light_lcl = sunrise_lcl - timedelta(hours=(settings['ideal-day']-daylight_hours))
light_lcl = light_lcl.strftime("%H:%M:%S%z")
else:
light_lcl = 0
# write it out to a cache file, basically just makes it so we don't have
# to talk to the sunrise API all the time
data = {}
data['date_local'] = sunrise_lcl.strftime("%Y-%m-%d")
data['sunrise_localtime'] = sunrise_lcl.strftime("%H:%M:%S%z")
data['sunset_localtime'] = sunset_lcl.strftime("%H:%M:%S%z")
data['daylight_hours'] = daylight_hours
data['light_on_localtime'] = light_lcl
writeJSONData(settings,data)
def utcToLocal(dt):
utc = tz.gettz('UTC')
local = tz.tzlocal()
dtUtc = dt.replace(tzinfo=utc)
return dtUtc.astimezone(local)
def from_timestring_to_utctimestamp(self, timestamp, type=None):
# Sigh, TrainPredictions include seconds, BusPredictions don't
if type == 'train':
timestr_format = "%Y%m%d %H:%M:%S"
else:
timestr_format = "%Y%m%d %H:%M"
local_datetime = datetime.strptime(timestamp, timestr_format).replace(tzinfo=tz.tzlocal())
return (local_datetime - datetime(1970, 1, 1, tzinfo=tz.tzutc())).total_seconds()
def get_requested_time_hr(self):
return datetime.fromtimestamp(self.requested_time).replace(tzinfo=tz.tzlocal()).hour
def __init__(self, **kwargs):
self.stop_name = kwargs.get("stpnm")
self.route = kwargs.get("rt")
self.direction = kwargs.get("rtdir")
self.arrival_time = self.from_timestring_to_utctimestamp(kwargs.get("prdtm"), type='bus')
self.distance = int(kwargs.get("dstp", 0))
self.requested_time = kwargs.get('requested_time') # self.from_timestring_to_utctimestamp(kwargs.get("tmstmp"), type='bus')
self.vehicle_id = kwargs.get("vid")
self.eta = self.set_eta()
# def get_requested_time_hour(self):
# self.from_timestring_to_utctimestamp()
# return int(datetime.fromtimestamp(self.requested_time).replace(tzinfo=tz.tzlocal()).hour)