def load_setting_export_timezone(logger, config_settings):
    """
    Resolve the timezone used for dates in the generated audit report.

    The value stored at ``config_settings['export_options']['timezone']`` is
    used when it is not None and is listed in ``pytz.all_timezones``. In every
    other case, including any exception raised while reading the setting, the
    timezone returned by ``get_localzone()`` is used instead.
    :param logger: the logger
    :param config_settings: config settings loaded from config file
    :return: ``str()`` of the configured timezone if valid, else ``str()`` of
             the local timezone for this machine
    """
    try:
        configured = config_settings['export_options']['timezone']
        if configured is not None and configured in pytz.all_timezones:
            return str(configured)
        local_zone = get_localzone()
        logger.info('No valid timezone found in config file, defaulting to local timezone')
        return str(local_zone)
    except Exception as ex:
        # Reading the setting failed (for example the config is not
        # subscriptable or a key is missing): report it and fall back.
        log_critical_error(logger, ex, 'Exception parsing timezone from config file')
        return str(get_localzone())
# Example source code using Python's get_localzone()
def wait_until_stable(self):
    """
    Poll the deployment status until it succeeds or the attempts run out.

    Stores the current time in the local timezone in
    ``self.its_run_start_time``, then performs up to 40 checks of
    ``self._show_current_status()``, sleeping 15 seconds before each check.

    :return: True as soon as a status check is truthy, False if none was
    """
    self.its_run_start_time = datetime.now(tzlocal.get_localzone())
    attempts_left = 40
    while attempts_left > 0:
        attempts_left -= 1
        time.sleep(15)
        if self._show_current_status():
            print("\nDeployment successful.\n")
            return True
        print("\nDeployment unready\n")
    print('Deployment failed...')
    # Alternative left disabled by the original author:
    # waiter = self.ecs.get_waiter('services_stable')
    # waiter.wait(cluster=self.clusterName, services=[self.serviceName])
    return False
def from_str(cls, time):
    '''Build a time_period object from a string such as ``1h``, ``2m`` or ``today``.

    ``today`` sets ``hours`` to the current hour in the local timezone.
    Digits followed by ``h`` or ``m`` (either case) set ``hours`` or
    ``minutes`` together with ``time_str``; minute values below 5 are raised
    to 5. Any other string returns the fresh ``cls()`` instance untouched.
    '''
    period = cls()
    if time == 'today':
        period.hours = int(dt.now(tzlocal.get_localzone()).hour)
        return period
    # Guard clause: anything that is not "<digits><h|m>" is left as-is.
    if not (time.lower().endswith(('h', 'm')) and time[:-1].isdigit()):
        return period
    unit = time[-1].lower()
    amount = int(time[:-1])
    if unit == 'h':
        period.time_str = 'Hour' if amount == 1 else 'Hours'
        period.hours = amount
    else:
        period.time_str = 'Minutes'
        period.minutes = max(amount, 5)
    return period
def __init__(self, test_id, status, message, timestamp=None):
    """
    Parametrized constructor for the TestProgress model

    The stored timestamp is always timezone-aware: a missing timestamp
    becomes the current time in UTC and a naive timestamp is tagged as UTC.
    An aware timestamp is stored unchanged.

    :param test_id: The value of the 'test_id' field of TestProgress model
    :type test_id: int
    :param status: The value of the 'status' field of TestProgress model
    :type status: TestStatus
    :param message: The value of the 'message' field of TestProgress model
    :type message: str
    :param timestamp: The value of the 'timestamp' field of TestProgress
     model (None by default)
    :type timestamp: datetime
    """
    self.test_id = test_id
    self.status = status
    if timestamp is None:
        # Ask for "now" directly in UTC. Building it from the naive local
        # time with localize(..., is_dst=None) raises AmbiguousTimeError
        # during the repeated hour at the end of DST, and the resulting
        # instant is the same anyway.
        timestamp = datetime.datetime.now(pytz.UTC)
    elif timestamp.tzinfo is None:
        timestamp = pytz.utc.localize(timestamp, is_dst=None)
    self.timestamp = timestamp
    self.message = message
def __init__(self, loop, base_url,
             username=None, password=None, verify_ssl=True,
             tz=None, logger=None):
    """Create the wrapped ``RESTfm`` client and configure it.

    ``tz`` may be a timezone name (passed through ``timezone()``), None
    (the local timezone from ``get_localzone()`` is used) or any other
    object, which is stored as given. Basic auth is configured only when
    both ``username`` and ``password`` are supplied.
    """
    if logger is None:  # pragma: no coverage
        logger = mylogger
    rest_client = RESTfm(loop, logger)
    self._client = rest_client
    rest_client.verify_ssl = verify_ssl
    rest_client.base_url = base_url
    self._closed = False
    if tz is None:  # pragma: no coverage
        rest_client.timezone = get_localzone()
    elif isinstance(tz, (str,)):
        rest_client.timezone = timezone(tz)  # pragma: no coverage
    else:
        rest_client.timezone = tz
    if not (username is None or password is None):
        rest_client.basic_auth(username, password)  # pragma: no cover
def setUp(self):
    """Prepare a client, a non-local timezone, a reference date and a UUID."""
    super(TestCase, self).setUp()
    # Yakutsk, Russia is used to check that conversions against the local
    # zone come out right; switch to Vienna if the machine itself is on
    # Yakutsk time.
    foreign_zone = pytz.timezone('Asia/Yakutsk')
    if get_localzone() == foreign_zone:
        foreign_zone = pytz.timezone('Europe/Vienna')
    self.other_timezone = foreign_zone
    rest_client = Client(None, 'http://localhost').rest_client
    self.client = rest_client
    rest_client.timezone = foreign_zone
    utc_moment = datetime.datetime(1986, 3, 6, 10, 28, 47, tzinfo=pytz.UTC)
    vienna_moment = utc_moment.astimezone(pytz.timezone('Europe/Vienna'))
    self.a_date = vienna_moment.astimezone(get_localzone())
    self.a_uuid = uuid.uuid1()
def date_to_datetime(time_input, tz=None):
    """ Convert ISO 8601 and other date strings to datetime.datetime type.

    Input that carries no timezone information is interpreted as local time.

    Args:
        time_input (string): The time input string, in any format the date
            parser accepts.
        tz (string): The time zone for the returned data.

    Returns:
        (datetime.datetime): Python datetime.datetime object, or None when
            the parser raises ValueError for the input.
    """
    dt = None
    if tz is not None:
        tz = timezone(tz)
    try:
        dt = parser.parse(time_input)
        if dt.tzinfo is None:
            # A pytz zone has to be attached with localize(); passing it to
            # replace(tzinfo=...) applies the zone's first historical offset
            # (typically local mean time) and shifts the result by minutes.
            dt = timezone(get_localzone().zone).localize(dt)
        if tz is not None:
            dt = dt.astimezone(tz)
    except ValueError:
        # Best effort by design: unparseable input yields None.
        pass
    return dt
def parse_journal(data):
    """
    Parse systemd journal entries.

    This happens on the agent rather than the manager because:
    * it distributes this bit of load, and
    * it keeps the means of log acquisition entirely local to the agent, so
      the manager never needs to know that the journal is in use or which
      forwarding protocol is involved.

    The entry's ``__REALTIME_TIMESTAMP`` is localized to the local timezone
    with ``is_dst=None`` and converted to UTC before being formatted.
    """
    local_stamp = get_localzone().localize(data['__REALTIME_TIMESTAMP'], is_dst=None)
    utc_stamp = local_stamp.astimezone(pytz.utc)
    return dict(
        datetime=datetime.datetime.isoformat(utc_stamp),
        severity=data['PRIORITY'],
        facility=data['SYSLOG_FACILITY'],
        source=data.get('SYSLOG_IDENTIFIER', "unknown"),
        message=data['MESSAGE'],
    )
def parse_date_local(date, milliseconds=True):
    """Parse an ISO8601 or Epoch date into a datetime in the local timezone.

    The value is parsed by ``parse_date_utc`` and then converted to the
    timezone returned by ``get_localzone()``.

    **Note that this function should not be used in time calculations.**
    **It is primarily intended for displaying dates and times to the user.**

    Args:
        date (str): A date string in either iso8601 or Epoch format.
        milliseconds (bool): If True, then epoch times are treated as
            millisecond values, otherwise they are evaluated as seconds.

    Returns:
        datetime: The parsed date time in local time.
    """
    utc_moment = parse_date_utc(date, milliseconds)
    return utc_moment.astimezone(get_localzone())
def parse_date_local(date, milliseconds=True):
    """Parse an ISO8601 or Epoch date into a datetime in the local timezone.

    The value is parsed by ``parse_date_utc`` and then converted to the
    timezone returned by ``get_localzone()``.

    **Note that this function should not be used in time calculations.**
    **It is primarily intended for displaying dates and times to the user.**

    Args:
        date (str): A date string in either iso8601 or Epoch format.
        milliseconds (bool): If True, then epoch times are treated as
            millisecond values, otherwise they are evaluated as seconds.

    Returns:
        datetime: The parsed date time in local time.
    """
    utc_moment = parse_date_utc(date, milliseconds)
    return utc_moment.astimezone(get_localzone())
def refresh_client(self, from_dt=None, to_dt=None):
    """
    Refreshes the CalendarService endpoint, ensuring that the
    event data is up-to-date. If no 'from_dt' or 'to_dt' datetimes
    have been given, the range becomes this month (first day through
    last day).

    The JSON body of the response is stored in ``self.response``.
    """
    today = datetime.today()
    # monthrange() returns (weekday of the 1st, number of days in month).
    # Only the day count is a usable day-of-month; the month always starts
    # on day 1. Using the weekday as a day gave a wrong start date and
    # raised ValueError whenever the month began on a Monday (weekday 0).
    _, last_day = monthrange(today.year, today.month)
    if not from_dt:
        from_dt = datetime(today.year, today.month, 1)
    if not to_dt:
        to_dt = datetime(today.year, today.month, last_day)
    params = dict(self.params)
    params.update({
        'lang': 'en-us',
        'usertz': get_localzone().zone,
        'startDate': from_dt.strftime('%Y-%m-%d'),
        'endDate': to_dt.strftime('%Y-%m-%d')
    })
    req = self.session.get(self._calendar_refresh_url, params=params)
    self.response = req.json()
def __init__(self, project_name, cache_instance):
    """
    Initialize a ProjectStats class for the specified project.

    Besides storing the arguments, this loads the cache dates and records
    the ``data_ts`` value from the cache metadata of the last cache date,
    both as a raw timestamp and as a datetime in the local timezone.

    :param project_name: project name to calculate stats for
    :type project_name: str
    :param cache_instance: DataCache instance
    :type cache_instance: :py:class:`~.DiskDataCache`
    """
    logger.debug('Initializing ProjectStats for project: %s', project_name)
    self.project_name = project_name
    self.cache = cache_instance
    self.cache_data = {}
    self.cache_dates = self._get_cache_dates()
    last_entry = self._cache_get(self.cache_dates[-1])
    self.as_of_timestamp = last_entry['cache_metadata']['data_ts']
    as_of_utc = datetime.fromtimestamp(self.as_of_timestamp, utc)
    self.as_of_datetime = as_of_utc.astimezone(get_localzone())
def dispatch_history(self, request):
    """Return the change history of one vault as a JSON response.

    The vault is looked up from ``request.match_info['id']``. Changes are
    read from the queue returned by ``vault.backend.changes`` until a None
    item arrives. Each change contributes one item with the keys
    ``operation``, ``user_email``, ``created_at`` and ``path``.
    """
    vault = self.find_vault_by_id(request.match_info['id'])
    get_localzone()  # result is not used; call kept from the original
    changes = yield from vault.backend.changes(None, None, verbose=True)

    def decoded(info, field):
        # Server info values are bytes; decode with the vault's encoding.
        return info[field].decode(vault.config.encoding)

    entries = []
    while True:
        change = yield from changes.get()
        if change is None:
            break
        store_hash, metadata, server_info = change
        bundle = VirtualBundle(None, vault, store_hash=store_hash)
        yield from bundle.write_encrypted_metadata(Once(metadata))
        # The revision id is decoded but not reported (as in the original).
        _rev_id, created_at, operation, user_email = [
            decoded(server_info, field)
            for field in ('id', 'created_at', 'operation', 'email')
        ]
        entries.append({
            'operation': operation,
            'user_email': user_email,
            'created_at': created_at,
            'path': bundle.relpath
        })
    return JSONResponse({'items': entries})
def main(args):
    """Build the requested moment, attach a timezone and hand it to ``show``.

    The moment comes from ``args.stamp`` (read as a UTC timestamp) or the
    current naive local time, optionally shifted by ``args.delta`` days.
    The zone is ``args.localize`` when given, otherwise UTC for a stamp and
    the local zone for "now".
    """
    from_stamp = args.stamp is not None
    if from_stamp:
        moment = datetime.datetime.utcfromtimestamp(args.stamp)
    else:
        moment = datetime.datetime.now()
    if args.delta is not None:
        moment = moment + datetime.timedelta(days=args.delta)
    # NOTE(review): when both a stamp and a localize zone are given, the UTC
    # wall-clock value is labelled with that zone rather than converted to
    # it - confirm this is intended.
    if args.localize:
        zone = pytz.timezone(args.localize)
    elif from_stamp:
        zone = pytz.utc
    else:
        zone = tzlocal.get_localzone()
    show(zone.localize(moment), show_zones=args.zones)
def __init__(self, weeks=0, days=0, hours=0, minutes=0, seconds=0, start_date=None,
             end_date=None, timezone=None):
    """
    Set up the interval, its timezone and the start/end bounds.

    A zero-length interval is replaced with one second. The timezone is
    taken, in order of preference, from the ``timezone`` argument, the
    tzinfo of ``start_date``, the tzinfo of ``end_date``, or the local
    timezone. Without a ``start_date`` the start is "now" plus one interval.

    :param weeks: number of weeks in the interval
    :param days: number of days in the interval
    :param hours: number of hours in the interval
    :param minutes: number of minutes in the interval
    :param seconds: number of seconds in the interval
    :param start_date: start bound, passed through ``convert_to_datetime``
    :param end_date: end bound, passed through ``convert_to_datetime``
    :param timezone: timezone for the trigger, passed through ``astimezone``
    """
    self.interval = timedelta(weeks=weeks, days=days, hours=hours, minutes=minutes,
                              seconds=seconds)
    self.interval_length = timedelta_seconds(self.interval)
    if self.interval_length == 0:
        self.interval = timedelta(seconds=1)
        self.interval_length = 1

    # Read tzinfo defensively: a truthy start_date/end_date that has no
    # tzinfo attribute (presumably strings are accepted here, since both
    # values go through convert_to_datetime below) must fall through to
    # the next source instead of raising AttributeError.
    if timezone:
        self.timezone = astimezone(timezone)
    elif getattr(start_date, 'tzinfo', None):
        self.timezone = start_date.tzinfo
    elif getattr(end_date, 'tzinfo', None):
        self.timezone = end_date.tzinfo
    else:
        self.timezone = get_localzone()

    start_date = start_date or (datetime.now(self.timezone) + self.interval)
    self.start_date = convert_to_datetime(start_date, self.timezone, 'start_date')
    self.end_date = convert_to_datetime(end_date, self.timezone, 'end_date')
def __init__(self, weeks=0, days=0, hours=0, minutes=0, seconds=0, start_date=None,
             end_date=None, timezone=None):
    """
    Set up the interval, its timezone and the start/end bounds.

    A zero-length interval is replaced with one second. The timezone is
    taken, in order of preference, from the ``timezone`` argument, the
    tzinfo of ``start_date``, the tzinfo of ``end_date``, or the local
    timezone. Without a ``start_date`` the start is "now" plus one interval.

    :param weeks: number of weeks in the interval
    :param days: number of days in the interval
    :param hours: number of hours in the interval
    :param minutes: number of minutes in the interval
    :param seconds: number of seconds in the interval
    :param start_date: start bound, passed through ``convert_to_datetime``
    :param end_date: end bound, passed through ``convert_to_datetime``
    :param timezone: timezone for the trigger, passed through ``astimezone``
    """
    self.interval = timedelta(weeks=weeks, days=days, hours=hours, minutes=minutes,
                              seconds=seconds)
    self.interval_length = timedelta_seconds(self.interval)
    if self.interval_length == 0:
        self.interval = timedelta(seconds=1)
        self.interval_length = 1

    # Read tzinfo defensively: a truthy start_date/end_date that has no
    # tzinfo attribute (presumably strings are accepted here, since both
    # values go through convert_to_datetime below) must fall through to
    # the next source instead of raising AttributeError.
    if timezone:
        self.timezone = astimezone(timezone)
    elif getattr(start_date, 'tzinfo', None):
        self.timezone = start_date.tzinfo
    elif getattr(end_date, 'tzinfo', None):
        self.timezone = end_date.tzinfo
    else:
        self.timezone = get_localzone()

    start_date = start_date or (datetime.now(self.timezone) + self.interval)
    self.start_date = convert_to_datetime(start_date, self.timezone, 'start_date')
    self.end_date = convert_to_datetime(end_date, self.timezone, 'end_date')
def timestamp_to_datetime(input_timestamp, tz=None):
    """
    Converts epoch timestamp into datetime object.

    The value is first tried as seconds. If that is out of range for a
    datetime it is retried as milliseconds, and finally as microseconds.

    :param input_timestamp: Epoch timestamp. Second, millisecond or microsecond inputs accepted.
    :type input_timestamp: int
    :param tz: String representation of timezone accepted by pytz. eg. 'Asia/Hong_Kong'. If param is unfilled, system timezone is used.
    :return: timezone aware datetime object
    :raises ValueError: if the input is not an integer value (from ``int()``)
    """
    # int() replaces the Python 2-only long(); on Python 2 int() already
    # promotes to long when needed, so behaviour there is unchanged.
    input_timestamp = int(input_timestamp)
    if tz is None:
        tz = get_localzone()
    else:
        tz = timezone(tz)

    # Depending on platform and magnitude, an out-of-range timestamp is
    # reported as ValueError, OverflowError or OSError.
    range_errors = (ValueError, OverflowError, OSError)
    try:
        return datetime.fromtimestamp(input_timestamp, tz=tz)
    except range_errors:
        pass
    try:
        # timestamp granularity is milliseconds
        return datetime.fromtimestamp(input_timestamp / 1000.0, tz=tz)
    except range_errors:
        # timestamp granularity is microseconds; a failure here propagates
        return datetime.fromtimestamp(input_timestamp / 1000000.0, tz=tz)
def _get_session_tz(self):
    """ Return a tzinfo for the session's TIMEZONE parameter, with fallbacks.

    An empty parameter is treated as 'UTC'. If pytz does not know the zone
    name, a warning is logged and the local timezone is returned when
    tzlocal is available, otherwise UTC.
    """
    try:
        zone_name = self.get_parameter(u'TIMEZONE') or 'UTC'
        return pytz.timezone(zone_name)
    except pytz.exceptions.UnknownTimeZoneError:
        logger.warning('converting to tzinfo failed')
        if tzlocal is not None:
            return tzlocal.get_localzone()
        try:
            return datetime.timezone.utc
        except AttributeError:  # py2k
            return pytz.timezone('UTC')
def process_shakemap(shake_id=None):
    """Process the shakemap identified by shake_id for realtime shake.

    Notifies the realtime REST endpoint with the current local time, then
    calls ``process_event`` repeatedly until it returns a truthy value.
    Exceptions from ``process_event`` are logged and the call is retried.
    """
    LOGGER.info('Inotify received new shakemap')
    notify_realtime_rest(datetime.datetime.now(tz=get_localzone()))
    while True:
        try:
            handled = process_event(
                working_dir=working_dir,
                event_id=shake_id,
                locale=locale_option)
        except Exception as e:  # pylint: disable=W0702
            LOGGER.info('Process event failed')
            LOGGER.exception(e)
            LOGGER.info('Retrying to process event')
            continue
        if handled:
            break
    LOGGER.info('Shakemap %s handled' % (shake_id, ))
def __init__(self, weeks=0, days=0, hours=0, minutes=0, seconds=0, start_date=None,
             end_date=None, timezone=None):
    """
    Set up the interval, its timezone and the start/end bounds.

    A zero-length interval is replaced with one second. The timezone is
    taken, in order of preference, from the ``timezone`` argument, the
    tzinfo of ``start_date``, the tzinfo of ``end_date``, or the local
    timezone. Without a ``start_date`` the start is "now" plus one interval.

    :param weeks: number of weeks in the interval
    :param days: number of days in the interval
    :param hours: number of hours in the interval
    :param minutes: number of minutes in the interval
    :param seconds: number of seconds in the interval
    :param start_date: start bound, passed through ``convert_to_datetime``
    :param end_date: end bound, passed through ``convert_to_datetime``
    :param timezone: timezone for the trigger, passed through ``astimezone``
    """
    self.interval = timedelta(weeks=weeks, days=days, hours=hours, minutes=minutes,
                              seconds=seconds)
    self.interval_length = timedelta_seconds(self.interval)
    if self.interval_length == 0:
        self.interval = timedelta(seconds=1)
        self.interval_length = 1

    # Read tzinfo defensively: a truthy start_date/end_date that has no
    # tzinfo attribute (presumably strings are accepted here, since both
    # values go through convert_to_datetime below) must fall through to
    # the next source instead of raising AttributeError.
    if timezone:
        self.timezone = astimezone(timezone)
    elif getattr(start_date, 'tzinfo', None):
        self.timezone = start_date.tzinfo
    elif getattr(end_date, 'tzinfo', None):
        self.timezone = end_date.tzinfo
    else:
        self.timezone = get_localzone()

    start_date = start_date or (datetime.now(self.timezone) + self.interval)
    self.start_date = convert_to_datetime(start_date, self.timezone, 'start_date')
    self.end_date = convert_to_datetime(end_date, self.timezone, 'end_date')
# tcex_utils.py (file source)
# Project: threatconnect-developer-docs
# Author: ThreatConnect-Inc
# Project source
# File source
# Views: 21
# Favorites: 0
# Likes: 0
# Comments: 0
def date_to_datetime(time_input, tz=None):
    """ Convert ISO 8601 and other date strings to datetime.datetime type.

    Input that carries no timezone information is interpreted as local time.

    Args:
        time_input (string): The time input string, in any format the date
            parser accepts.
        tz (string): The time zone for the returned data.

    Returns:
        (datetime.datetime): Python datetime.datetime object, or None when
            the parser raises ValueError for the input.
    """
    dt = None
    if tz is not None:
        tz = timezone(tz)
    try:
        dt = parser.parse(time_input)
        if dt.tzinfo is None:
            # A pytz zone has to be attached with localize(); passing it to
            # replace(tzinfo=...) applies the zone's first historical offset
            # (typically local mean time) and shifts the result by minutes.
            dt = timezone(get_localzone().zone).localize(dt)
        if tz is not None:
            dt = dt.astimezone(tz)
    except ValueError:
        # Best effort by design: unparseable input yields None.
        pass
    return dt
def convert_to_local_from_utc(utc_time):
    """Tag ``utc_time`` as UTC and return it converted to the local timezone.

    Any tzinfo already present on ``utc_time`` is replaced by UTC first.
    """
    local_zone = tzlocal.get_localzone()
    aware_utc = utc_time.replace(tzinfo=pytz.utc)
    return aware_utc.astimezone(local_zone)
# test_exporter_settings.py (file source)
# Project: safetyculture-sdk-python
# Author: SafetyCulture
# Project source
# File source
# Views: 18
# Favorites: 0
# Likes: 0
# Comments: 0
def test_use_local_timezone_if_none_given(self):
    """Empty or missing config settings must yield the local timezone."""
    expected_zone = str(get_localzone())
    for config_setting in ({}, None, ''):
        # Format one complete template. Calling .format() on a chain of
        # concatenated literals only applies to the last literal, which
        # left the "{0}" placeholder unfilled in the failure message.
        message = '{0!r} should cause {1} to be returned'.format(config_setting, expected_zone)
        self.assertEqual(exp.load_setting_export_timezone(logger, config_setting), expected_zone, msg=message)
def iso8601_as_datetime(iso,
                        localize=False  # Default into local time zone
                        ):
    """Parse an ISO 8601 string into a datetime.

    When ``localize`` is true and the parsed value has no timezone, it is
    localized to the zone returned by ``get_localzone()``.

    Raises ValueError("Invalid ISO8601 date") when isodate reports an
    ISO8601Error.
    """
    try:
        parsed = isodate.parse_datetime(iso)
    except isodate.isoerror.ISO8601Error:
        raise ValueError("Invalid ISO8601 date")
    if not localize or parsed.tzinfo is not None:
        return parsed
    return get_localzone().localize(parsed)
# TODO: This function exists in datetime as .isoformat()
def converter(self, timestamp):
    """Return ``timestamp`` as a UTC time tuple.

    The timestamp is handed to ``arrow.get`` together with the local
    timezone, shifted to UTC, and returned as the ``timetuple()`` of the
    resulting datetime.
    """
    return arrow.get(timestamp, tz=get_localzone()).to('UTC').datetime.timetuple()
def modtime():
    """ Return the current local time as an ISO-format string, without microseconds. """
    if not six.PY2:
        local_now = datetime.now(timezone.utc).astimezone()
    else:
        # Python 2 path: build an aware UTC time, then convert it to the
        # zone reported by get_localzone().
        local_zone = get_localzone()
        utc_now = datetime.utcnow().replace(tzinfo=pytz.utc)
        local_now = utc_now.astimezone(local_zone)
    return local_now.replace(microsecond=0).isoformat()
def TimeEnd(self, timeend):
    """Store the end time as a timezone-aware datetime in ``self._timeend``.

    None means "now" in UTC. A naive datetime is localized to the system
    local timezone after a warning is logged. An aware datetime is stored
    unchanged.

    Raises TypeError when ``timeend`` is neither None nor a datetime.
    """
    if timeend is None:
        self._timeend = datetime.now(tz=pytz.utc)
        return
    if not isinstance(timeend, datetime):
        raise TypeError("req.TimeEnd must be a datetime.datetime object.")
    # Always use timezone-aware datetime.
    if timeend.tzinfo is None:
        _logger.warning('Naive HistDataReq.TimeEnd. '
                        'Assumeing system local time zone.')
        timeend = get_localzone().localize(timeend)
    self._timeend = timeend
def startScheduler():
    """Create the schema and default roles, then start the background scheduler.

    The scheduler runs in the local timezone when tzlocal can name it, and
    in UTC otherwise. ``notify.task`` is scheduled every
    ``config.SCAN_INTERVAL`` seconds, first firing two seconds from now.
    """
    db.create_all()
    # create default roles!
    if not db.session.query(models.Role).filter(models.Role.name == "admin").first():
        admin_role = models.Role(name='admin', description='Administrator Role')
        user_role = models.Role(name='user', description='User Role')
        db.session.add(admin_role)
        db.session.add(user_role)
        db.session.commit()
    try:
        import tzlocal
        tz = tzlocal.get_localzone()
        logger.info("local timezone: %s", tz)
    except Exception:
        # Best effort: without tzlocal (or if it fails) fall back to UTC
        # below. A bare except would also swallow KeyboardInterrupt and
        # SystemExit.
        tz = None
    # Read .zone defensively so a tzinfo object without that attribute is
    # simply used as-is instead of raising AttributeError.
    if not tz or getattr(tz, 'zone', None) == "local":
        logger.error('Local timezone name could not be determined. Scheduler will display times in UTC for any log '
                     'messages. To resolve this set up /etc/timezone with correct time zone name.')
        tz = pytz.utc
    # in debug mode this is executed twice :(
    # DONT run flask in auto reload mode when testing this!
    scheduler = BackgroundScheduler(logger=sched_logger, timezone=tz)
    scheduler.add_job(notify.task, 'interval', seconds=config.SCAN_INTERVAL, max_instances=1,
                      start_date=datetime.datetime.now(tz) + datetime.timedelta(seconds=2))
    scheduler.start()
    # NOTE(review): this binds a local name only. If a module-level `sched`
    # is meant to hold the scheduler, a `global sched` declaration is
    # needed - confirm against the rest of the module.
    sched = scheduler
    # notify.task()
def get_next_day_cutoff_seconds(hour):
    """Return the next cutoff for a particular hour as seconds since epoch.

    The cutoff is the next time the local clock reads ``hour``:00:00. That
    is today if this moment is still ahead, otherwise tomorrow.
    """
    # Function-scope import so the block does not rely on the module
    # already importing timedelta.
    from datetime import timedelta

    local_tz = tzlocal.get_localzone()
    now = datetime.now()
    cutoff = now.replace(hour=hour, minute=0, second=0, microsecond=0)
    # Roll over only when today's cutoff has already passed. Adding a day
    # unconditionally would skip a cutoff that is still ahead today (e.g.
    # 04:00 when it is 01:00); never adding one returns a time in the past.
    if cutoff <= now:
        cutoff += timedelta(days=1)
    # Localize after the date arithmetic so the wall-clock hour is kept
    # across a DST change.
    dt_cutoff = local_tz.localize(cutoff)
    epoch = datetime(1970, 1, 1, tzinfo=pytz.utc)
    return (dt_cutoff - epoch).total_seconds()
def localDatetime(datetime_or_timestamp):
    """Return the given moment converted to the local timezone.

    A number is first turned into a datetime by ``utcDatetime``; any other
    value is expected to be a datetime and is converted directly with
    ``astimezone``.
    """
    # isinstance() also accepts subclasses of int/float, which the exact
    # type() comparison sent down the datetime path, where astimezone()
    # then failed.
    if isinstance(datetime_or_timestamp, (int, float)):
        dt = utcDatetime(datetime_or_timestamp)
    else:
        dt = datetime_or_timestamp
    return dt.astimezone(tzlocal.get_localzone())