def localZone(self):
    """Show the local time zone in the ``self.oop.scr`` widget.

    Everything from index '1.0' to the end of the widget is deleted first,
    then the value returned by ``tzlocal.get_localzone()`` is inserted at
    the insert cursor.
    """
    from tzlocal import get_localzone

    output = self.oop.scr
    output.delete('1.0', tk.END)
    output.insert(tk.INSERT, get_localzone())
# Format local US time with TimeZone info
python类get_localzone()的实例源码
Callbacks_Refactored.py 文件源码
项目:Python-GUI-Programming-Cookbook-Second-Edition
作者: PacktPublishing
项目源码
文件源码
阅读 23
收藏 0
点赞 0
评论 0
GUI.py 文件源码
项目:Python-GUI-Programming-Cookbook-Second-Edition
作者: PacktPublishing
项目源码
文件源码
阅读 17
收藏 0
点赞 0
评论 0
def localZone(self):
    """Insert the local time zone into ``self.scr`` at the insert cursor."""
    from tzlocal import get_localzone

    local_zone = get_localzone()
    self.scr.insert(tk.INSERT, local_zone)
# Format local US time with TimeZone info
def show_consumer_group():
    """Print the consumer group's checkpoint status in a loop.

    Each pass fetches the checkpoints through the client returned by
    ``prepare_test()``, prints one row per checkpoint while holding
    ``lock``, and then sleeps for one second. The loop ends when a
    ``KeyboardInterrupt`` is raised, after which an exit banner is printed.
    """
    client, option, project, logstore, consumer_group = prepare_test()
    local_tz = tzlocal.get_localzone()
    header = "***consumer\t\t\tshard\tcursor time\t\t\t\t\tupdate time"
    row_template = "{0}\t{1}\t\t{2}\t{3}"

    try:
        while True:
            result = client.get_check_point_fixed(project, logstore, consumer_group)
            # Hold the lock only while printing so the output block stays together.
            with lock:
                print("***** consumer group status*****")
                checkpoints = result.consumer_group_check_poins
                if checkpoints:
                    print(header)
                    for entry in checkpoints:
                        # 'updateTime' is scaled down by 1,000,000 before
                        # conversion (presumably microseconds -- confirm).
                        updated = datetime.fromtimestamp(
                            entry['updateTime']/1000000, local_tz)
                        cursor = datetime.fromtimestamp(
                            entry.get('checkpoint_previous_cursor_time', 0),
                            local_tz)
                        print(row_template.format(
                            entry["consumer"], entry['shard'], cursor, updated))
                else:
                    print("[]")
            time.sleep(1)
    except KeyboardInterrupt:
        print("***** exit *****")
def localzone(cls):
    """Return ``cls.from_pytz`` applied to the system's local time zone.

    Raises:
        UnknownTimeZone: when ``tzlocal.get_localzone()`` fails with
            ``pytz.exceptions.UnknownTimeZoneError``.
    """
    try:
        local_tz = tzlocal.get_localzone()
    except pytz.exceptions.UnknownTimeZoneError:
        raise UnknownTimeZone("Failed to guess local timezone")
    else:
        return cls.from_pytz(local_tz)
def do_download_db(database_name=None, master_pwd=None, base_url=None,
                   backup_format='zip', filename=None, api='9.0',
                   hostname=None):
    """Download a database backup through the backend selected by ``api``.

    Args:
        database_name: Name of the database to back up. Also used to build
            the default ``base_url``, ``filename`` and SSH host name.
        master_pwd: Master password forwarded to the HTTP backends.
        base_url: Server URL; defaults to
            ``https://<database_name>.odoo.apertoso.net``. Trailing slashes
            are removed.
        backup_format: Backup format, also used as the default file
            extension.
        filename: Target file name; defaults to
            ``<database_name>_<local timestamp>.<backup_format>``.
        api: ``'9.0'`` (or anything starting with it), ``'8.0'`` or
            ``'ssh'``.
        hostname: SSH server to use when ``api == 'ssh'``; defaults to
            ``openerp.production.<database_name>.clients.apertoso.net``.

    Returns:
        Whatever the selected ``do_download_db_*`` helper returns.

    Raises:
        NotImplementedError: if ``api`` names an unsupported backend.
    """
    if base_url is None:
        base_url = 'https://%s.odoo.apertoso.net' % database_name
    # Only trailing slashes should go; str.strip('/') would also eat
    # leading ones.
    base_url = base_url.rstrip('/')

    if filename is None:
        ts = datetime.datetime.now(get_localzone()).strftime(
            '%Y%m%d-%H%M%S-%Z')
        filename = "%s_%s.%s" % (database_name, ts, backup_format)

    if api.startswith('9.0'):
        return do_download_db_v9(
            database_name=database_name,
            master_pwd=master_pwd, base_url=base_url,
            backup_format=backup_format,
            filename=filename)
    elif api == '8.0':
        return do_download_db_v8(
            database_name=database_name,
            master_pwd=master_pwd, base_url=base_url,
            backup_format=backup_format,
            filename=filename)
    elif api == 'ssh':
        server_name = hostname or \
            'openerp.production.{}.clients.apertoso.net'.format(database_name)
        return do_download_db_ssh(database_name=database_name,
                                  server=server_name,
                                  filename=filename)
    else:
        raise NotImplementedError("No support for api %s" % api)
def get_display_timezone():
    ''' Returns the timezone selected by the ``ckan.display_timezone``
    configuration setting.

    A missing or empty setting falls back to ``'utc'``; the special value
    ``'server'`` selects the local timezone reported by tzlocal. Any other
    value is passed to ``pytz.timezone``.

    :rtype: timezone
    '''
    configured = config.get('ckan.display_timezone')
    name = configured if configured else 'utc'
    if name != 'server':
        return pytz.timezone(name)
    return tzlocal.get_localzone()
def test_server_timezone(self):
    # The display timezone is expected to equal the local zone reported by
    # tzlocal (presumably with the setting configured as 'server' -- the
    # configuration is not visible here).
    displayed = h.get_display_timezone()
    expected = tzlocal.get_localzone()
    eq_(displayed, expected)
def print_status(i, status):
    """Print one status entry as a single formatted line.

    Args:
        i: Index shown in square brackets at the start of the line.
        status: Mapping with ``created_at``, ``id``, ``user`` (holding
            ``screen_name`` and ``id``), ``text`` and optionally ``photo``
            (holding ``largeurl``).
    """
    # 'created_at' carries a literal "+0000" offset, so it is parsed as UTC
    # and then converted to the local zone for display.
    created_utc = datetime.strptime(
        status['created_at'], "%a %b %d %H:%M:%S +0000 %Y"
    ).replace(tzinfo=pytz.utc)
    created_local = created_utc.astimezone(tzlocal.get_localzone())

    row = {
        u'index': i,
        u'created': created_local.strftime("%Y-%m-%d %H:%M:%S"),
        u'id': status['id'],
        u'screen_name': status['user']['screen_name'],
        u'user_id': status['user']['id'],
        u'text': status['text'],
        u'photo': status['photo']['largeurl'] if status.get('photo') else "",
    }
    print(u"[%(index)s]%(created)s(%(id)s)%(screen_name)s(%(user_id)s): %(text)s %(photo)s" % row)
def ask_user_input():
    """Interactively collect and confirm the restore parameters.

    Fills the module-level ``restoreparams`` dict with ``mountpath``,
    ``timepoint`` (made timezone-aware as UTC or local time, depending on
    the user's answer) and ``sid``, and passes the time point to
    ``restore.set_restore_target_time``. A summary is printed for
    confirmation.

    If the user does not confirm that the system is isolated, or rejects the
    summary, the module-level ``exitvalue`` is set to 1 and the function
    returns early.
    """
    global restoreparams, exitvalue
    # print() with a single argument behaves identically on Python 2 and 3.
    is_safe = ui.ask_yn("Is this system isolated with no access to production database storage")
    if is_safe != "Y":
        print("Exiting. Please execute this script in an isolated environment.")
        exitvalue = 1
        return
    restoreparams['mountpath'] = ui.ask_directory("Directory where to mount clone:", False)
    restoreparams['timepoint'] = ui.ask_timestamp("Restore database to time point")
    is_utc = ui.ask_yn("Was the timestamp in UTC (answer N for local time)")
    if is_utc == "Y":
        tz = pytz.utc
    else:
        tz = get_localzone()
    # Attach the chosen zone to the timestamp entered by the user.
    restoreparams['timepoint'] = tz.localize(restoreparams['timepoint'])
    restore.set_restore_target_time(restoreparams['timepoint'])
    restoreparams['sid'] = ui.ask_string("Target instance name:", 8, True)
    #
    splitter = "######################################"
    print(splitter)
    print("")
    print("Database unique name: %s" % configname)
    print("Oracle home: %s" % Configuration.get("oraclehome", "generic"))
    print("Clone mount path: %s" % restoreparams['mountpath'])
    print("Target instance SID: %s" % restoreparams['sid'])
    print("Restore target time UTC: %s" % restoreparams['timepoint'].astimezone(pytz.utc))
    print("Restore target time local: %s" % restoreparams['timepoint'].astimezone(get_localzone()))
    print("Restored from snapshot: %s" % restore.sourcesnapid)
    #
    print("")
    is_ok = ui.ask_yn("Are these parameters correct")
    if is_ok != "Y":
        print("Exiting. Please execute this script again.")
        exitvalue = 1
        return
    print("")
    print(splitter)
def _set_parameters(self):
    """Load restore parameters from ``autorestore.cfg`` and publish them.

    Reads ``autorestore.cfg`` from ``self._mountdest``, fills
    ``self._dbparams`` (``dbname``, ``restoretarget``, ``bctfile``), updates
    ``Configuration.substitutions`` with the values read, and sets
    ``self._initfile`` to ``init.ora`` under ``self._restoredest``.

    The restore target is ``self.targettime`` converted to the local zone
    when it is set; otherwise the ``lasttime`` value stored in the config
    file is used.
    """
    dbconfig = SafeConfigParser()
    dbconfig.read(os.path.join(self._mountdest, 'autorestore.cfg'))
    self._dbparams['dbname'] = dbconfig.get('dbparams','db_name')
    if self.targettime is None:
        self._dbparams['restoretarget'] = datetime.strptime(dbconfig.get('dbparams','lasttime'), '%Y-%m-%d %H:%M:%S')
    else:
        self._dbparams['restoretarget'] = self.targettime.astimezone(get_localzone())
    self._dbparams['bctfile'] = dbconfig.get('dbparams','bctfile')
    Configuration.substitutions.update({
        'db_name': self._dbparams['dbname'],
        'db_compatible': dbconfig.get('dbparams','compatible'),
        'db_files': dbconfig.get('dbparams','db_files'),
        'db_undotbs': dbconfig.get('dbparams','undo_tablespace'),
        'db_block_size': dbconfig.get('dbparams','db_block_size'),
        # 'lastscn': dbconfig.get('dbparams','lastscn'),
        'lasttime': self._dbparams['restoretarget'].strftime('%Y-%m-%d %H:%M:%S'),
        'dbid': Configuration.get('dbid', self._configname),
        'instancenumber': Configuration.get('autorestoreinstancenumber', self._configname),
        'thread': Configuration.get('autorestorethread', self._configname),
        'backupfinishedtime': dbconfig.get('dbparams','backup-finished'),
        'bctfile': self._dbparams['bctfile'],
        'autorestoredestination': self._restoredest,
        'mountdestination': self._mountdest,
    })
    try:
        Configuration.substitutions.update({'cdb': dbconfig.get('dbparams','enable_pluggable_database')})
    except Exception:
        # Best-effort: if the option cannot be read, fall back to 'FALSE'.
        # Catching Exception rather than everything lets KeyboardInterrupt
        # and SystemExit propagate.
        Configuration.substitutions.update({'cdb': 'FALSE'})
    self._initfile = os.path.join(self._restoredest, 'init.ora')
    Configuration.substitutions.update({
        'initora': self._initfile,
    })
def _ts_to_date(self, ts):
    """Convert timestamp ``ts`` to a datetime in the local time zone."""
    local_zone = tzlocal.get_localzone()
    return datetime.fromtimestamp(ts, local_zone)
def human_date_to_datetime(time_input, tz='UTC'):
    """ Turn a human readable date (e.g. 30 days ago) into a
    datetime.datetime by way of the parsedatetime module.

    The input is interpreted in the machine's local time zone and the result
    is converted to ``tz``.

    Accepted inputs include, for example:
        * August 25th, 2008
        * 25 Aug 2008
        * Aug 25 5pm
        * 5pm August 25
        * next saturday
        * tomorrow
        * next thursday at 4pm
        * at 4pm
        * eod
        * tomorrow eod
        * eod tuesday
        * eoy
        * eom
        * in 5 minutes
        * 5 minutes from now
        * 5 hours before now
        * 2 hours before noon
        * 2 days from tomorrow

    Args:
        time_input (string): The time input string (see formats above).
        tz (string): The time zone for the returned data.

    Returns:
        (datetime.datetime): Python datetime.datetime object, or None when
        the parser reports a status of 0.
    """
    calendar = pdt.Calendar()
    system_tz = timezone(get_localzone().zone)
    parsed, parse_status = calendar.parseDT(time_input, tzinfo=system_tz)
    converted = parsed.astimezone(timezone(tz))
    # A status of 0 is treated as "nothing could be parsed".
    return None if parse_status == 0 else converted
def due_date(self, due_date):
    """ Sets or clears the due date.

    A truthy ``due_date`` must be a ``datetime``; it is converted to the
    local time zone and only its date part is stored under ``'date'``.
    A falsy value removes any stored ``'date'`` entry.

    Raises:
        TypeError: if ``due_date`` is truthy but not a ``datetime``.
    """
    if not due_date:
        # Clearing: drop the entry if there is one.
        if 'date' in self.__task_dict:
            del self.__task_dict['date']
        return

    if not isinstance(due_date, datetime):
        raise TypeError

    local_due = due_date.astimezone(get_localzone())
    self.__task_dict['date'] = local_due.date()
def due_date(self, due_date):
    """ Sets or clears the due date.

    Passing a ``datetime`` stores its local-time-zone date under ``'date'``;
    passing a falsy value removes that entry when present.

    Raises:
        TypeError: if a truthy non-``datetime`` value is passed.
    """
    clearing = not due_date
    if not clearing and not isinstance(due_date, datetime):
        raise TypeError

    if clearing:
        if 'date' in self.__task_dict:
            del self.__task_dict['date']
    else:
        self.__task_dict['date'] = \
            due_date.astimezone(get_localzone()).date()
def get_event_detail(self, pguid, guid):
    """
    Fetch the details of a single event, identified by a pguid
    (a calendar) and a guid (the event's ID).

    The decoded JSON reply is stored on ``self.response`` and the first
    element of its ``'Event'`` list is returned.
    """
    # Work on a copy so the shared ``self.params`` is left untouched.
    query = dict(self.params)
    query.update({'lang': 'en-us', 'usertz': get_localzone().zone})

    detail_url = '%s/%s/%s' % (self._calendar_event_detail_url, pguid, guid)
    reply = self.session.get(detail_url, params=query)
    self.response = reply.json()
    return self.response['Event'][0]
def _local_tz(self):
    """Return the local timezone as reported by ``get_localzone()``."""
    local_zone = get_localzone()
    return local_zone
async def time(self, timezone_code):
    """Return current time and time in the timezone listed.

    **Dependencies**: pip install pytz tzlocal

    Keyword arguments:
    timezone_code -- Code for timezone
    """
    # Declared ``async`` because the body awaits ``self.bot.say``; ``await``
    # inside a plain ``def`` is a SyntaxError.
    local_time = datetime.now(get_localzone())
    # The code is upper-cased before the timezone lookup.
    converted_time = datetime.now(timezone(timezone_code.upper()))
    await self.bot.say("```Local time : {} \n\n {} : {}```"
                       .format(local_time, timezone_code, converted_time))
def kw_to_sqlalchemy(sqlalchemy_table_cls, kw):
    """Build a ``sqlalchemy_table_cls`` instance from the mapping ``kw``.

    For every ``column, value`` pair in ``kw`` the column's type is looked
    up with ``_find_type``; the value is coerced to that type's
    ``python_type`` when it is not already an instance of it, and the
    result is set as an attribute on the new instance. Columns for which
    the lookup raises ``NameError`` are skipped.

    Returns the populated instance.
    """
    sqlalchemy_table = sqlalchemy_table_cls()
    for column, value in kw.iteritems():
        try:
            column_type = _find_type(sqlalchemy_table_cls, column)
            python_type = column_type.python_type
        except NameError:
            # No usable type for this key: ignore it.
            continue
        try:
            if not issubclass(type(value), python_type):
                if issubclass(python_type, bool):
                    # Only the string 'true' (any letter case) maps to True.
                    column_value = True if value.lower() == 'true' else False
                else:
                    column_value = python_type(value)
            else:
                column_value = value
        except UnicodeEncodeError:
            # Coercion could not encode the value; keep it as unicode text.
            column_value = unicode(value)
        finally:
            # NOTE(review): the original indentation was lost; placing the
            # statements below inside ``finally`` is a reconstruction --
            # confirm. As written they also run when an exception other than
            # UnicodeEncodeError escapes the ``try``, in which case
            # ``column_value`` may be unbound or left over from the previous
            # column.
            if issubclass(python_type, datetime):
                if column_type.timezone:
                    # Convert to local time
                    tz = get_localzone()
                    # NOTE(review): with pytz, is_dst=None presumably makes
                    # ambiguous or non-existent local times raise -- confirm.
                    column_value = tz.localize(column_value, is_dst=None)
            setattr(sqlalchemy_table, column, column_value)
    return sqlalchemy_table
def print_log(self, verbose=False):
    """Print the change log of every vault in ``self.vaults``.

    Generator-based coroutine (uses ``yield from``). For each vault the
    backend is opened and its changes are read from a queue until a ``None``
    item is received. One line is printed per change, with the creation time
    converted to the local time zone; with ``verbose`` the revision id and
    user e-mail are printed as well. Vaults that raise
    ``VaultNotInitialized`` are logged and skipped. Finally ``self.wait()``
    is awaited.
    """
    local_tz = get_localzone()
    for vault in self.vaults:
        try:
            yield from vault.backend.open()
        except VaultNotInitialized:
            logger.error('%s has not been initialized. Use "syncrypt init" to register the folder as vault.' % vault)
            continue

        changes = yield from vault.backend.changes(None, None, verbose=verbose)
        entry = yield from changes.get()
        # A None entry marks the end of the change stream.
        while entry is not None:
            store_hash, metadata, server_info = entry
            bundle = VirtualBundle(None, vault, store_hash=store_hash)
            yield from bundle.write_encrypted_metadata(Once(metadata))

            encoding = vault.config.encoding
            rev_id = server_info['id'].decode(encoding)
            created = iso8601.parse_date(server_info['created_at'].decode())
            created_at = created.astimezone(local_tz).strftime('%x %X')
            operation = server_info['operation'].decode(vault.config.encoding)

            if not verbose:
                print("%s | %-9s %s" % (created_at, operation, bundle.relpath))
            else:
                user_email = server_info['email'].decode(vault.config.encoding)
                print("%s | %s | %s | %-9s %s" % (created_at, rev_id, user_email,
                                                  operation, bundle.relpath))

            entry = yield from changes.get()
    yield from self.wait()
def is_uploaded_after(self, package, when):
    """Return whether ``package`` was uploaded at or after ``when``.

    Returns False if ``self.is_uploaded(package)`` is false. Otherwise the
    modification time of the first file from
    ``self.uploaded_files_for(package)``, expressed in the local time zone,
    is compared against ``when``.
    """
    if not self.is_uploaded(package):
        return False

    first_file = self.uploaded_files_for(package)[0]
    modified_at = datetime.datetime.fromtimestamp(
        os.path.getmtime(first_file), tzlocal.get_localzone())
    return modified_at >= when