def add_credentials_options(self):
    """Add credentials to the Options dictionary (if necessary).

    ``api_username`` is always stored. When ``configuration.secure_auth``
    equals ``True`` a ``timestamp`` and an HMAC-SHA1 ``signature`` (keyed
    with the configured password, over username + timestamp + request URI)
    are stored; otherwise the password is stored as ``api_key``.
    """
    api_username = self.configuration.username
    api_key = self.configuration.password
    self.options['api_username'] = api_username

    # The equality test (rather than plain truthiness) is kept on purpose so
    # that values such as non-empty strings do not switch on signing.
    if self.configuration.secure_auth == True:  # noqa: E712
        timestamp = datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')
        # A blank domain name collapses to a single separator.
        if self.domain_name.strip() == '':
            domain_part = '/'
        else:
            domain_part = '/' + self.domain_name + '/'
        uri = '/' + self.configuration.sub_url + domain_part + self.service_name
        self.options['timestamp'] = timestamp
        params = ''.join([api_username, timestamp, uri])

        # hmac.new() requires bytes-like key and message on Python 3; text
        # values are encoded while values that are already bytes pass through.
        if not isinstance(api_key, (bytes, bytearray)):
            api_key = api_key.encode('utf-8')
        if not isinstance(params, (bytes, bytearray)):
            params = params.encode('utf-8')
        self.options['signature'] = hmac.new(
            api_key, params, digestmod=hashlib.sha1).hexdigest()
    else:
        self.options['api_key'] = api_key
# Example source code using Python's utcnow()
def check_if_media_sync_offset_satisfied(logger, settings, audit):
    """
    Decide whether an audit is old enough to be exported.

    The media sync offset is a number of seconds taken from the settings. An audit whose ``modified_at``
    time is not more than that many seconds in the past is skipped so its media has time to finish syncing.
    :param logger: The logger
    :param settings: Settings from command line and configuration file
    :param audit: Audit JSON
    :return: Boolean - True if the media sync offset is satisfied, otherwise False.
    """
    last_modified = dateutil.parser.parse(audit['modified_at'])
    utc_now = pytz.utc.localize(datetime.utcnow())
    time_since_modification = utc_now - last_modified
    required_offset = timedelta(seconds=settings[MEDIA_SYNC_OFFSET_IN_SECONDS])

    if time_since_modification > required_offset:
        return True

    # Too recent: report it and leave the audit for the next sync cycle.
    message = 'Audit {0} modified too recently, some media may not have completed syncing. Skipping export until next sync cycle'
    logger.info(message.format(audit['audit_id']))
    return False
def elapsed(self):
    """
    Return how long the threaded execution has run, in seconds.

    The result is a float with microsecond resolution: 0.0 before execution
    has begun, the begin-to-finish span once it has finished, and the span
    from begin to the current UTC time while it is still running.
    """
    began = self._execution_begin
    if began is None:
        # Nothing has started, so no time has elapsed.
        return 0.0

    finished = self._execution_finish
    if finished is None:
        # Still running: measure up to now.
        finished = datetime.utcnow()

    span = finished - began
    return (span.days * 86400) + span.seconds + (span.microseconds / 1e6)
def export(metadata, start, end, container_image_pattern):
    """Dump every app in ``APPS`` and write the run's ``metadata.json``.

    ``metadata`` is updated in place with ``start``/``end`` (ISO format plus
    a ``Z`` suffix) and a ``services`` list holding one ``dump_app`` result
    per app. Output goes to a directory under ``metadata["metrics_export"]``
    named with the current UTC timestamp followed by the measurement name;
    the directory is created when missing.
    """
    metadata["start"] = start.isoformat() + "Z"
    metadata["end"] = end.isoformat() + "Z"
    metadata["services"] = []

    stamp = datetime.utcnow().strftime("%Y%m%d%H%M%S-")
    out_dir = os.path.join(metadata["metrics_export"], stamp + metadata["measurement_name"])
    if not os.path.isdir(out_dir):
        os.makedirs(out_dir)

    services = metadata["services"]
    for app in APPS:
        services.append(dump_app(app, out_dir, start, end, container_image_pattern))

    metadata_file = os.path.join(out_dir, "metadata.json")
    with open(metadata_file, "w+") as handle:
        json.dump(metadata, handle, cls=Encoder, sort_keys=True, indent=4)
        handle.flush()
def insert_earned_achievement(aid, data):
    """
    Store an earned achievement for a user/team.

    The keys tid, uid, name and description are popped from ``data`` (so the
    caller's dict is modified); whatever remains is stored under "data".

    Args:
        aid: the achievement id
        data: the data necessary to assess the achievement
              must include tid, uid, name, description
    """
    db = api.common.get_conn()

    tid = data.pop("tid")
    uid = data.pop("uid")
    name = data.pop("name")
    description = data.pop("description")

    # NOTE(review): utcnow() is naive and .timestamp() treats naive values as
    # local time - confirm the server runs in UTC or that readers expect this.
    record = {
        "aid": aid,
        "tid": tid,
        "uid": uid,
        "data": data,
        "name": name,
        "description": description,
        "timestamp": datetime.utcnow().timestamp(),
        "seen": False
    }
    db.earned_achievements.insert(record)
def block_before_competition(return_result):
    """
    Build a decorator that blocks a routing function until the competition has started.

    While the current time is not past the configured "start_time", the
    wrapped function is skipped and ``return_result`` is returned instead.
    """
    def decorator(f):
        """
        Wrap ``f`` with the start-time check.
        """
        @wraps(f)
        def wrapper(*args, **kwds):
            current = datetime.utcnow().timestamp()
            opens_at = api.config.get_settings()["start_time"].timestamp()
            if current > opens_at:
                return f(*args, **kwds)
            return return_result
        return wrapper
    return decorator
def block_after_competition(return_result):
    """
    Build a decorator that blocks a routing function once the competition has ended.

    While the current time is before the configured "end_time" the wrapped
    function runs normally; afterwards ``return_result`` is returned instead.
    """
    def decorator(f):
        """
        Wrap ``f`` with the end-time check.
        """
        @wraps(f)
        def wrapper(*args, **kwds):
            current = datetime.utcnow().timestamp()
            closes_at = api.config.get_settings()["end_time"].timestamp()
            if current < closes_at:
                return f(*args, **kwds)
            return return_result
        return wrapper
    return decorator
def LOGEXCEPTION(message):
    """Report ``message`` together with the current exception traceback.

    Uses ``LOGGER.exception`` when ``LOGGER`` is truthy; otherwise prints a
    line stamped with the current UTC time followed by ``format_exc()``.
    """
    if not LOGGER:
        stamp = datetime.utcnow().isoformat()
        print('%sZ [EXC!]: %s\nexception was: %s' % (stamp, message, format_exc()))
        return
    LOGGER.exception(message)
#######################
## UTILITY FUNCTIONS ##
#######################
# Utility function to report best scores
# modified from a snippet taken from:
# http://scikit-learn.org/stable/auto_examples/model_selection/plot_randomized_search.html
def LOGEXCEPTION(message):
    """Report ``message`` together with the current exception traceback.

    Uses ``LOGGER.exception`` when ``LOGGER`` is truthy; otherwise prints a
    line stamped with the current UTC time followed by ``format_exc()``.
    """
    if not LOGGER:
        stamp = datetime.utcnow().isoformat()
        print('%sZ [EXC!]: %s\nexception was: %s' % (stamp, message, format_exc()))
        return
    LOGGER.exception(message)
########################
## COLUMN DEFINITIONS ##
########################
# LC column definitions
# the first elem is the column description, the second is the format to use when
# writing a CSV LC column, the third is the type to use when parsing a CSV LC
# column
def LOGEXCEPTION(message):
    """Report ``message`` together with the current exception traceback.

    Uses ``LOGGER.exception`` when ``LOGGER`` is truthy; otherwise prints a
    line stamped with the current UTC time followed by ``format_exc()``.
    """
    if not LOGGER:
        stamp = datetime.utcnow().isoformat()
        print('%sZ [EXC!]: %s\nexception was: %s' % (stamp, message, format_exc()))
        return
    LOGGER.exception(message)
###################
## USEFUL CONFIG ##
###################
# used to find HATIDs
def LOGEXCEPTION(message):
    """Report ``message`` together with the current exception traceback.

    Uses ``LOGGER.exception`` when ``LOGGER`` is truthy; otherwise prints a
    line stamped with the current UTC time (via ``dtime``) followed by
    ``format_exc()``.
    """
    if not LOGGER:
        stamp = dtime.utcnow().isoformat()
        print('%sZ [EXC!]: %s\nexception was: %s' % (stamp, message, format_exc()))
        return
    LOGGER.exception(message)
###############################################
## MAGIC CONSTANTS FOR 2MASS TO COUSINS/SDSS ##
###############################################
# converting from JHK to BVRI
def LOGEXCEPTION(message):
    """Report ``message`` together with the current exception traceback.

    Uses ``LOGGER.exception`` when ``LOGGER`` is truthy; otherwise prints a
    line stamped with the current UTC time followed by ``format_exc()``.
    """
    if not LOGGER:
        stamp = datetime.utcnow().isoformat()
        print('%sZ [EXC!]: %s\nexception was: %s' % (stamp, message, format_exc()))
        return
    LOGGER.exception(message)
###################
## LOCAL IMPORTS ##
###################
# LC reading functions
def LOGEXCEPTION(message):
    """Report ``message`` together with the current exception traceback.

    Uses ``LOGGER.exception`` when ``LOGGER`` is truthy; otherwise prints a
    line stamped with the current UTC time followed by ``format_exc()``.
    """
    if not LOGGER:
        stamp = datetime.utcnow().isoformat()
        print('%sZ [EXC!]: %s\nexception was: %s' % (stamp, message, format_exc()))
        return
    LOGGER.exception(message)
###################
## LOCAL IMPORTS ##
###################
def LOGEXCEPTION(message):
    """Report ``message`` together with the current exception traceback.

    Uses ``LOGGER.exception`` when ``LOGGER`` is truthy; otherwise prints a
    line stamped with the current UTC time followed by ``format_exc()``.
    """
    if not LOGGER:
        stamp = datetime.utcnow().isoformat()
        print('%sZ [EXC!]: %s\nexception was: %s' % (stamp, message, format_exc()))
        return
    LOGGER.exception(message)
###########################
## FLARE MODEL FUNCTIONS ##
###########################
def LOGEXCEPTION(message):
    """Report ``message`` together with the current exception traceback.

    Uses ``LOGGER.exception`` when ``LOGGER`` is truthy; otherwise prints a
    line stamped with the current UTC time followed by ``format_exc()``.
    """
    if not LOGGER:
        stamp = datetime.utcnow().isoformat()
        print('%sZ [EXC!]: %s\nexception was: %s' % (stamp, message, format_exc()))
        return
    LOGGER.exception(message)
########################
## SOME OTHER IMPORTS ##
########################
def LOGEXCEPTION(message):
    """Report ``message`` together with the current exception traceback.

    Uses ``LOGGER.exception`` when ``LOGGER`` is truthy; otherwise prints a
    line stamped with the current UTC time followed by ``format_exc()``.
    """
    if not LOGGER:
        stamp = datetime.utcnow().isoformat()
        print('%sZ [EXC!]: %s\nexception was: %s' % (stamp, message, format_exc()))
        return
    LOGGER.exception(message)
###################
## LOCAL IMPORTS ##
###################
# LC reading functions
def LOGEXCEPTION(message):
    """Report ``message`` together with the current exception traceback.

    Uses ``LOGGER.exception`` when ``LOGGER`` is truthy; otherwise prints a
    line stamped with the current UTC time followed by ``format_exc()``.
    """
    if not LOGGER:
        stamp = datetime.utcnow().isoformat()
        print('%sZ [EXC!]: %s\nexception was: %s' % (stamp, message, format_exc()))
        return
    LOGGER.exception(message)
#######################
## UTILITY FUNCTIONS ##
#######################
# this is from Tornado's source (MIT License):
# http://www.tornadoweb.org/en/stable/_modules/tornado/escape.html#squeeze
def get_package_loader(self, package, package_path):
    """Return a loader that resolves paths against resources of ``package``.

    The returned ``loader(path)`` yields ``(None, None)`` when ``path`` is
    ``None`` or the resource does not exist. Otherwise it yields the
    resource's base name plus an opener: ``self._opener`` on the real file
    name when the provider is a ``DefaultProvider``, else a callable that
    returns ``(stream, loadtime, 0)`` where ``loadtime`` is the UTC time at
    which this loader was created.
    """
    from pkg_resources import DefaultProvider, ResourceManager, get_provider

    loadtime = datetime.utcnow()
    provider = get_provider(package)
    manager = ResourceManager()
    filesystem_bound = isinstance(provider, DefaultProvider)

    def loader(path):
        if path is None:
            return None, None

        resource = posixpath.join(package_path, path)
        if not provider.has_resource(resource):
            return None, None

        basename = posixpath.basename(resource)
        if not filesystem_bound:
            # No real file behind the resource: hand back a stream lazily.
            return basename, lambda: (
                provider.get_resource_stream(manager, resource),
                loadtime,
                0
            )

        filename = provider.get_resource_filename(manager, resource)
        return basename, self._opener(filename)

    return loader
def container_sas(self):
    """Demonstrate reading a blob through a container-level SAS token.

    Creates a container holding one blob, generates a read-only shared
    access signature for that container which expires in an hour, reads the
    blob back through a service built from that token, and finally deletes
    the container.
    """
    container_name = self._create_container()
    self.service.create_blob_from_text(container_name, 'blob1', b'hello world')

    # The token grants read access to blobs in this container only and
    # expires one hour from now.
    expires_at = datetime.utcnow() + timedelta(hours=1)
    token = self.service.generate_container_shared_access_signature(
        container_name,
        ContainerPermissions.READ,
        expires_at,
    )

    # Build a second service that authenticates with the SAS alone.
    sas_service = BlockBlobService(
        account_name=self.account.account_name,
        sas_token=token,
    )
    blob = sas_service.get_blob_to_text(container_name, 'blob1')
    content = blob.content  # hello world

    self.service.delete_container(container_name)
def test_streams_tasks():
    """Check that ``streams_tasks`` schedules the daily precache grooming job.

    The scheduled time is produced by ``datetime.utcnow()`` inside
    ``streams_tasks``, so it cannot be compared for equality with a second
    ``utcnow()`` call made here. Instead the call is bracketed by two
    readings and the scheduled time must fall between them (this also holds
    when the clock is frozen).
    """
    mock_scheduler = Mock()

    earliest = datetime.utcnow()
    streams_tasks(mock_scheduler)
    latest = datetime.utcnow()

    assert mock_scheduler.schedule.call_count == 1
    args, kwargs = mock_scheduler.schedule.call_args
    assert tuple(args) == ()

    remaining = dict(kwargs)
    scheduled_time = remaining.pop('scheduled_time')
    assert earliest <= scheduled_time <= latest
    assert remaining == {
        'func': groom_redis_precaches,
        'interval': 60 * 60 * 24,
    }
def streams_tasks(scheduler):
    """Schedule ``groom_redis_precaches`` on ``scheduler``.

    The job is scheduled for the current UTC time with an interval of one
    day, expressed in seconds.
    """
    one_day = 60*60*24  # a day
    job = {
        'scheduled_time': datetime.utcnow(),
        'func': groom_redis_precaches,
        'interval': one_day,
    }
    scheduler.schedule(**job)
def export_actions(logger, settings, sc_client):
"""
Export all actions created after date specified.

The date is the one returned by get_last_successful_actions_export. Actions fetched from the client are
saved to a CSV file under settings[EXPORT_PATH], and the actions sync marker is set to the current UTC time.
:param logger: The logger
:param settings: Settings from command line and configuration file
:param sc_client: instance of safetypy.SafetyCulture class
"""
logger.info('Exporting iAuditor actions')
# Only actions newer than the previous successful export are requested.
last_successful_actions_export = get_last_successful_actions_export(logger)
actions_array = sc_client.get_audit_actions(last_successful_actions_export)
# A None result is skipped - presumably it signals a failed or empty request; verify against safetypy.
if actions_array is not None:
logger.info('Found ' + str(len(actions_array)) + ' actions')
save_exported_actions_to_csv_file(logger, settings[EXPORT_PATH], actions_array)
# NOTE(review): confirm whether the marker update below is meant to run only when actions were returned.
# The marker is written as UTC with the milliseconds fixed at 000.
utc_iso_datetime_now = datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.000Z')
update_actions_sync_marker_file(logger, utc_iso_datetime_now)
def utctimestamp():
    """Return the whole seconds between 1970-01-01 and the current UTC time.

    Microseconds are discarded, so the result is an int.
    """
    since_epoch = datetime.utcnow() - datetime(1970, 1, 1)
    whole_days_in_seconds = since_epoch.days * 24 * 3600
    return since_epoch.seconds + whole_days_in_seconds
def send_remind_activating_email(username):
    """Email the administrators that ``username`` has requested activation.

    Nothing is sent when either the EMAIL_FROM_ADDRESS or the
    ADMIN_EMAIL_ADDRESS setting is blank (an empty string or an empty pair
    of quotes). ADMIN_EMAIL_ADDRESS may hold several space-separated
    addresses, optionally wrapped in double quotes. Errors raised while
    sending are logged rather than propagated.
    """
    blank_values = ['\'\'', '\"\"', '']
    email_from_address = settings.get('EMAIL_FROM_ADDRESS')
    admin_email_address = settings.get('ADMIN_EMAIL_ADDRESS')
    if email_from_address in blank_values or admin_email_address in blank_values:
        return

    # The template continuation lines start at column 0 on purpose: any
    # indentation would become part of the message body.
    request_notice = '''<p> An activating request for %s in <a href='%s'>%s</a> has been sent</p>
<p> Please check it !</p>
<br/><br/>
<p> Docklet Team, SEI, PKU</p>
'''
    text = ''.join([
        '<html><h4>Dear ' + 'admin' + ':</h4>',
        request_notice % (username, env.getenv("PORTAL_URL"), env.getenv("PORTAL_URL")),
        '<p>' + str(datetime.utcnow()) + '</p>',
        '</html>',
    ])
    subject = 'An activating request in Docklet has been sent'

    # Strip one pair of surrounding double quotes before splitting.
    if admin_email_address[0] == '"':
        recipients_field = admin_email_address[1:-1]
    else:
        recipients_field = admin_email_address
    admins_addr = recipients_field.split(" ")
    alladdr = ", ".join(admins_addr)

    msg = MIMEMultipart()
    msg['Subject'] = Header(subject, 'utf-8')
    msg['From'] = email_from_address
    msg['To'] = alladdr
    msg.attach(MIMEText(text, 'html', 'utf-8'))

    server = smtplib.SMTP()
    server.connect()
    try:
        server.sendmail(email_from_address, admins_addr, msg.as_string())
    except Exception as e:
        logger.error(e)
    server.close()
def __init__(self, title, content=''):
    """Set up a new item with its title and optional content.

    The creation date is taken from the current UTC time and the status
    starts out as 'open'.
    """
    self.title, self.content = title, content
    self.create_date, self.status = datetime.utcnow(), 'open'
def check_competition_active():
    """
    Tell whether the current time lies strictly between the configured start and end times.
    """
    settings = api.config.get_settings()
    starts_at = settings["start_time"].timestamp()
    current = datetime.utcnow().timestamp()
    if not starts_at < current:
        return False
    return current < settings["end_time"].timestamp()
def get_time():
    """Return a ``WebSuccess`` whose data is the current time as whole epoch seconds."""
    # NOTE(review): utcnow() is naive and .timestamp() treats naive values as
    # local time - confirm the server runs in UTC.
    epoch_seconds = int(datetime.utcnow().timestamp())
    return WebSuccess(data=epoch_seconds)
def gdq(bot, trigger):
    """Announce the Games Done Quick countdown or the run currently in progress.

    The schedule page is fetched and parsed. If fetching or parsing fails,
    or no upcoming game is found, a countdown to the hard-coded start date
    is announced instead. Before the first scheduled run a countdown to
    that run is announced; otherwise the current and next games are.

    :param bot: object whose ``say`` method delivers the message
    :param trigger: the triggering event (unused)
    :return: the result of ``bot.say`` on the early-exit paths, else ``None``
    """
    now = datetime.utcnow()
    now = now.replace(tzinfo=timezone.utc)
    delta = datetime(2018, 1, 7, 16, 30, tzinfo=timezone.utc) - now
    textdate = "January 7"
    url = 'https://gamesdonequick.com/schedule'
    # Shared by every "could not read the schedule" exit below.
    fallback = "GDQ is {0} days away ({1})".format(delta.days, textdate)

    # `except Exception` (not a bare except) keeps KeyboardInterrupt and
    # SystemExit propagating; the timeout stops a stalled server from
    # blocking the bot indefinitely.
    try:
        x = requests.get(url, timeout=10).content
    except Exception:
        return bot.say(fallback)

    bs = BeautifulSoup(x)
    try:
        run = bs.find("table", {"id": "runTable"}).tbody
    except Exception:
        return bot.say(fallback)

    try:
        gdqstart = datetime.strptime(run.td.getText(), '%Y-%m-%dT%H:%M:%SZ')
        gdqstart = gdqstart.replace(tzinfo=timezone.utc)
    except Exception:
        return bot.say(fallback)

    (game, runner, console, comment, eta, nextgame, nextrunner, nexteta, nextconsole, nextcomment) = getinfo(run, now)
    if not nextgame:
        return bot.say(fallback)

    if now < gdqstart:
        tts = gdqstart - now
        if tts.days <= 3:
            return bot.say("GDQ is {0}H{1}M away. First game: {2} by {3} ETA: {4} Comment: {5} | https://gamesdonequick.com/schedule".format(int(tts.total_seconds() // 3600), int((tts.total_seconds() % 3600) // 60), nextgame, nextrunner, nexteta, nextcomment))
        return bot.say("GDQ is {0} days away ({1}) | https://gamesdonequick.com/schedule".format(tts.days, gdqstart.strftime('%m/%d/%Y')))

    if nextgame == 'done':
        return bot.say("GDQ is {0} days away ({1} [estimated])".format(delta.days, textdate))

    if game:
        if comment:
            bot.say("Current Game: {0} by {1} ETA: {2} Comment: {3} | Next Game: {4} by {5} | http://www.twitch.tv/gamesdonequick | https://gamesdonequick.com/schedule".format(game, runner, eta, comment, nextgame, nextrunner))
        else:
            bot.say("Current Game: {0} by {1} ETA: {2} | Next Game: {3} by {4} | http://www.twitch.tv/gamesdonequick | https://gamesdonequick.com/schedule".format(game, runner, eta, nextgame, nextrunner))
    else:
        bot.say("Current Game: setup?? | Next Game {0} by {1} | http://www.twitch.tv/gamesdonequick | https://gamesdonequick.com/schedule".format(nextgame, nextrunner))
def get_time(time, compare_time):
    """Resolve ``time`` against ``compare_time``.

    A missing or blank ``time`` yields the current UTC time. Otherwise the
    value is parsed with ``parse_time``; the parsed value is returned only
    when it differs from ``compare_time`` by more than ``ONE_SECOND`` in
    either direction. In every other case (parsing gave a falsy result, or
    the two times are within a second of each other) ``None`` is returned.
    """
    if not time or not time.strip():
        return datetime.utcnow()

    parsed = parse_time(time)
    if not parsed:
        return None
    if parsed == compare_time:
        return None

    if parsed > compare_time:
        gap = parsed - compare_time
    else:
        gap = compare_time - parsed
    return parsed if gap > ONE_SECOND else None
def iso_time_now():
    """Return the current UTC time formatted with ``ISO_TIME_FORMAT``."""
    current = datetime.utcnow()
    return current.strftime(ISO_TIME_FORMAT)