python类utcnow()的实例源码

request.py 文件源码 项目:Cortex-Analyzers 作者: CERT-BDF 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def add_credentials_options(self):
        """Populate ``self.options`` with the authentication fields.

        The username is always stored under ``'api_username'``.  When
        ``configuration.secure_auth`` compares equal to ``True`` a UTC
        timestamp and an HMAC-SHA1 signature (keyed with the configured
        password) over username + timestamp + request URI are stored as
        well; otherwise the password itself is stored under ``'api_key'``.
        """
        config = self.configuration
        username = config.username
        secret = config.password

        self.options['api_username'] = username

        # Plain-key authentication: nothing to sign.
        if not (config.secure_auth == True):
            self.options['api_key'] = secret
            return

        stamp = datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')

        # A blank domain collapses to a single separator between the
        # sub-URL and the service name.
        prefix = '/' + config.sub_url
        if self.domain_name.strip() == '':
            middle = '/'
        else:
            middle = '/' + self.domain_name + '/'
        request_uri = prefix + middle + self.service_name

        self.options['timestamp'] = stamp
        message = ''.join((username, stamp, request_uri))
        digest = hmac.new(secret, message, digestmod=hashlib.sha1)
        self.options['signature'] = digest.hexdigest()
exporter.py 文件源码 项目:safetyculture-sdk-python 作者: SafetyCulture 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def check_if_media_sync_offset_satisfied(logger, settings, audit):
    """
    Decide whether an audit has been left unmodified long enough to export.

    The media sync offset is a number of seconds taken from the settings;
    it is the grace period granted for audit media to finish syncing
    before the audit is exported.
    :param logger:    The logger
    :param settings:  Settings mapping; read at MEDIA_SYNC_OFFSET_IN_SECONDS
    :param audit:     Audit JSON; 'modified_at' and 'audit_id' are read
    :return:          True when the time since 'modified_at' is strictly
                      greater than the offset, otherwise False.
    """
    last_modified = dateutil.parser.parse(audit['modified_at'])
    # NOTE(review): the subtraction below needs 'modified_at' to parse to a
    # timezone-aware value, since "now" is localized to UTC.
    utc_now = pytz.utc.localize(datetime.utcnow())
    age = utc_now - last_modified
    required_gap = timedelta(seconds=settings[MEDIA_SYNC_OFFSET_IN_SECONDS])

    if age > required_gap:
        return True

    # Offset not yet satisfied: skip this audit for now.
    logger.info('Audit {0} modified too recently, some media may not have completed syncing. Skipping export until next sync cycle'.format(
        audit['audit_id']))
    return False
SubProcess.py 文件源码 项目:newsreap 作者: caronc 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def elapsed(self):
        """
        Seconds spent executing, as a float that carries the microsecond
        part.

        Returns 0.0 when execution has not begun.  Once a finish time is
        recorded the result is finish minus begin; until then it is the
        current UTC time minus begin.
        """
        started = self._execution_begin
        if started is None:
            # Nothing has run yet
            return 0.0

        finished = self._execution_finish
        if finished is None:
            # Still running: measure up to this moment
            finished = datetime.utcnow()

        delta = finished - started
        return (delta.days * 86400) + delta.seconds + (delta.microseconds / 1e6)
export_metrics.py 文件源码 项目:rca-evaluation 作者: sieve-microservices 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def export(metadata, start, end, container_image_pattern):
    """
    Dump every app in APPS into a fresh export directory and write the
    metadata next to the dumps.

    ``metadata`` is updated in place: "start" and "end" receive the ISO
    form of the given bounds with a "Z" suffix, and "services" is replaced
    by a list holding one ``dump_app`` result per app.  The directory is
    ``metadata["metrics_export"]`` joined with a UTC timestamp prefix plus
    ``metadata["measurement_name"]``; the metadata is finally serialised
    to ``metadata.json`` inside it.
    """
    metadata["start"] = start.isoformat() + "Z"
    metadata["end"] = end.isoformat() + "Z"
    metadata["services"] = []

    stamp = datetime.utcnow().strftime("%Y%m%d%H%M%S-")
    out_dir = os.path.join(metadata["metrics_export"], stamp + metadata["measurement_name"])
    if not os.path.isdir(out_dir):
        os.makedirs(out_dir)

    for app in APPS:
        dumped = dump_app(app, out_dir, start, end, container_image_pattern)
        metadata["services"].append(dumped)

    target = os.path.join(out_dir, "metadata.json")
    with open(target, "w+") as f:
        json.dump(metadata, f, cls=Encoder, sort_keys=True, indent=4)
        f.flush()
achievement.py 文件源码 项目:picoCTF 作者: picoCTF 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def insert_earned_achievement(aid, data):
    """
    Record an earned achievement for a user/team.

    Args:
        aid: the achievement id
        data: the data necessary to assess the achievement; must include
              tid, uid, name and description. Those four keys are popped
              (the caller's dict is modified) and the remainder is stored
              under "data".
    """

    db = api.common.get_conn()

    tid = data.pop("tid")
    uid = data.pop("uid")
    name = data.pop("name")
    description = data.pop("description")

    record = {
        "aid": aid,
        "tid": tid,
        "uid": uid,
        "data": data,
        "name": name,
        "description": description,
        "timestamp": datetime.utcnow().timestamp(),
        "seen": False
    }
    db.earned_achievements.insert(record)
annotations.py 文件源码 项目:picoCTF 作者: picoCTF 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def block_before_competition(return_result):
    """
    Build a decorator that only lets the wrapped routing function run once
    the competition's "start_time" setting lies in the past; until then
    every call yields return_result instead.
    """

    def decorator(f):
        """
        Wrap f with the start-time check
        """

        @wraps(f)
        def wrapper(*args, **kwds):
            now = datetime.utcnow().timestamp()
            opens_at = api.config.get_settings()["start_time"].timestamp()
            if now > opens_at:
                return f(*args, **kwds)
            return return_result
        return wrapper
    return decorator
annotations.py 文件源码 项目:picoCTF 作者: picoCTF 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def block_after_competition(return_result):
    """
    Build a decorator that only lets the wrapped routing function run while
    the competition's "end_time" setting is still in the future; afterwards
    every call yields return_result instead.
    """

    def decorator(f):
        """
        Wrap f with the end-time check
        """

        @wraps(f)
        def wrapper(*args, **kwds):
            now = datetime.utcnow().timestamp()
            closes_at = api.config.get_settings()["end_time"].timestamp()
            if now < closes_at:
                return f(*args, **kwds)
            return return_result
        return wrapper
    return decorator
rfclass.py 文件源码 项目:astrobase 作者: waqasbhatti 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def LOGEXCEPTION(message):
    """Report *message* along with the current exception traceback.

    Forwards to ``LOGGER.exception`` when the module-level ``LOGGER`` is
    truthy; otherwise prints a line stamped with the current UTC time.
    """
    if not LOGGER:
        stamp = datetime.utcnow().isoformat()
        print('%sZ [EXC!]: %s\nexception was: %s' % (stamp, message, format_exc()))
        return
    LOGGER.exception(message)



#######################
## UTILITY FUNCTIONS ##
#######################

# Utility function to report best scores
# modified from a snippet taken from:
# http://scikit-learn.org/stable/auto_examples/model_selection/plot_randomized_search.html
k2hat.py 文件源码 项目:astrobase 作者: waqasbhatti 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def LOGEXCEPTION(message):
    """Report *message* along with the current exception traceback.

    Forwards to ``LOGGER.exception`` when the module-level ``LOGGER`` is
    truthy; otherwise prints a line stamped with the current UTC time.
    """
    if not LOGGER:
        stamp = datetime.utcnow().isoformat()
        print('%sZ [EXC!]: %s\nexception was: %s' % (stamp, message, format_exc()))
        return
    LOGGER.exception(message)



########################
## COLUMN DEFINITIONS ##
########################

# LC column definitions
# the first elem is the column description, the second is the format to use when
# writing a CSV LC column, the third is the type to use when parsing a CSV LC
# column
hplc.py 文件源码 项目:astrobase 作者: waqasbhatti 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def LOGEXCEPTION(message):
    """Report *message* along with the current exception traceback.

    Forwards to ``LOGGER.exception`` when the module-level ``LOGGER`` is
    truthy; otherwise prints a line stamped with the current UTC time.
    """
    if not LOGGER:
        stamp = datetime.utcnow().isoformat()
        print('%sZ [EXC!]: %s\nexception was: %s' % (stamp, message, format_exc()))
        return
    LOGGER.exception(message)



###################
## USEFUL CONFIG ##
###################

# used to find HATIDs
magnitudes.py 文件源码 项目:astrobase 作者: waqasbhatti 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def LOGEXCEPTION(message):
    """Report *message* along with the current exception traceback.

    Forwards to ``LOGGER.exception`` when the module-level ``LOGGER`` is
    truthy; otherwise prints a line stamped with the current UTC time
    (obtained through this module's ``dtime`` name).
    """
    if not LOGGER:
        stamp = dtime.utcnow().isoformat()
        print('%sZ [EXC!]: %s\nexception was: %s' % (stamp, message, format_exc()))
        return
    LOGGER.exception(message)


###############################################
## MAGIC CONSTANTS FOR 2MASS TO COUSINS/SDSS ##
###############################################

# converting from JHK to BVRI
generation.py 文件源码 项目:astrobase 作者: waqasbhatti 项目源码 文件源码 阅读 48 收藏 0 点赞 0 评论 0
def LOGEXCEPTION(message):
    """Report *message* along with the current exception traceback.

    Forwards to ``LOGGER.exception`` when the module-level ``LOGGER`` is
    truthy; otherwise prints a line stamped with the current UTC time.
    """
    if not LOGGER:
        stamp = datetime.utcnow().isoformat()
        print('%sZ [EXC!]: %s\nexception was: %s' % (stamp, message, format_exc()))
        return
    LOGGER.exception(message)


###################
## LOCAL IMPORTS ##
###################

# LC reading functions
plotbase.py 文件源码 项目:astrobase 作者: waqasbhatti 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def LOGEXCEPTION(message):
    """Report *message* along with the current exception traceback.

    Forwards to ``LOGGER.exception`` when the module-level ``LOGGER`` is
    truthy; otherwise prints a line stamped with the current UTC time.
    """
    if not LOGGER:
        stamp = datetime.utcnow().isoformat()
        print('%sZ [EXC!]: %s\nexception was: %s' % (stamp, message, format_exc()))
        return
    LOGGER.exception(message)



###################
## LOCAL IMPORTS ##
###################
flares.py 文件源码 项目:astrobase 作者: waqasbhatti 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def LOGEXCEPTION(message):
    """Report *message* along with the current exception traceback.

    Forwards to ``LOGGER.exception`` when the module-level ``LOGGER`` is
    truthy; otherwise prints a line stamped with the current UTC time.
    """
    if not LOGGER:
        stamp = datetime.utcnow().isoformat()
        print('%sZ [EXC!]: %s\nexception was: %s' % (stamp, message, format_exc()))
        return
    LOGGER.exception(message)



###########################
## FLARE MODEL FUNCTIONS ##
###########################
__init__.py 文件源码 项目:astrobase 作者: waqasbhatti 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def LOGEXCEPTION(message):
    """Report *message* along with the current exception traceback.

    Forwards to ``LOGGER.exception`` when the module-level ``LOGGER`` is
    truthy; otherwise prints a line stamped with the current UTC time.
    """
    if not LOGGER:
        stamp = datetime.utcnow().isoformat()
        print('%sZ [EXC!]: %s\nexception was: %s' % (stamp, message, format_exc()))
        return
    LOGGER.exception(message)



########################
## SOME OTHER IMPORTS ##
########################
lcproc.py 文件源码 项目:astrobase 作者: waqasbhatti 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def LOGEXCEPTION(message):
    """Report *message* along with the current exception traceback.

    Forwards to ``LOGGER.exception`` when the module-level ``LOGGER`` is
    truthy; otherwise prints a line stamped with the current UTC time.
    """
    if not LOGGER:
        stamp = datetime.utcnow().isoformat()
        print('%sZ [EXC!]: %s\nexception was: %s' % (stamp, message, format_exc()))
        return
    LOGGER.exception(message)


###################
## LOCAL IMPORTS ##
###################

# LC reading functions
hatlc.py 文件源码 项目:astrobase 作者: waqasbhatti 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def LOGEXCEPTION(message):
    """Report *message* along with the current exception traceback.

    Forwards to ``LOGGER.exception`` when the module-level ``LOGGER`` is
    truthy; otherwise prints a line stamped with the current UTC time.
    """
    if not LOGGER:
        stamp = datetime.utcnow().isoformat()
        print('%sZ [EXC!]: %s\nexception was: %s' % (stamp, message, format_exc()))
        return
    LOGGER.exception(message)



#######################
## UTILITY FUNCTIONS ##
#######################

# this is from Tornado's source (MIT License):
# http://www.tornadoweb.org/en/stable/_modules/tornado/escape.html#squeeze
wsgi.py 文件源码 项目:Flask_Blog 作者: sugarguo 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def get_package_loader(self, package, package_path):
        """Build a loader that serves resources from inside *package*.

        The returned ``loader(path)`` yields ``(None, None)`` for a ``None``
        path or a resource the provider does not have.  Otherwise it yields
        ``(basename, opener)``: for a ``DefaultProvider`` the opener comes
        from ``self._opener`` on the resource's file name; for any other
        provider it is a callable returning ``(stream, load time, 0)``,
        where the load time is the UTC moment this method was called.
        """
        from pkg_resources import (
            DefaultProvider,
            ResourceManager,
            get_provider,
        )
        loaded_at = datetime.utcnow()
        provider = get_provider(package)
        manager = ResourceManager()
        on_filesystem = isinstance(provider, DefaultProvider)

        def loader(path):
            if path is None:
                return None, None
            resource = posixpath.join(package_path, path)
            if not provider.has_resource(resource):
                return None, None
            name = posixpath.basename(resource)
            if on_filesystem:
                opener = self._opener(
                    provider.get_resource_filename(manager, resource))
            else:
                def opener():
                    stream = provider.get_resource_stream(manager, resource)
                    return stream, loaded_at, 0
            return name, opener
        return loader
sas_usage.py 文件源码 项目:blobfs 作者: mbartoli 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def container_sas(self):
        """Demonstrate reading a blob through a container-level SAS token.

        Creates a scratch container holding one blob, generates a read-only
        shared access signature that expires one hour from the current UTC
        time, reads the blob back through a second service client that is
        authenticated only by that token, and deletes the container.
        """
        container_name = self._create_container()

        # try/finally so the scratch container is removed even when one of
        # the steps below raises.
        try:
            self.service.create_blob_from_text(container_name, 'blob1', b'hello world')

            # Access only to the blobs in the given container
            # Read permissions to access blobs
            # Expires in an hour
            token = self.service.generate_container_shared_access_signature(
                container_name,
                ContainerPermissions.READ,
                datetime.utcnow() + timedelta(hours=1),
            )

            # Create a service and use the SAS
            sas_service = BlockBlobService(
                account_name=self.account.account_name,
                sas_token=token,
            )

            blob = sas_service.get_blob_to_text(container_name, 'blob1')
            content = blob.content # hello world
        finally:
            self.service.delete_container(container_name)
test_tasks.py 文件源码 项目:socialhome 作者: jaywink 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def test_streams_tasks():
    """streams_tasks must register exactly one daily grooming job.

    NOTE(review): the expected ``scheduled_time`` is a second ``utcnow()``
    call, so this presumably runs with the clock frozen - confirm.
    """
    scheduler = Mock()
    streams_tasks(scheduler)
    expected_kwargs = dict(
        scheduled_time=datetime.utcnow(),
        func=groom_redis_precaches,
        interval=60 * 60 * 24,
    )
    scheduler.schedule.assert_called_once_with(**expected_kwargs)
tasks.py 文件源码 项目:socialhome 作者: jaywink 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def streams_tasks(scheduler):
    """Schedule groom_redis_precaches on *scheduler*: first run at the
    current UTC time, then repeating every 24 hours."""
    first_run = datetime.utcnow()
    one_day = 60 * 60 * 24  # seconds
    scheduler.schedule(
        scheduled_time=first_run,
        func=groom_redis_precaches,
        interval=one_day,
    )
exporter.py 文件源码 项目:safetyculture-sdk-python 作者: SafetyCulture 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def export_actions(logger, settings, sc_client):
    """
    Export the actions returned for the last successful export marker.

    When the client returns None nothing is written and the sync marker is
    left untouched; otherwise the actions are saved to CSV and the marker
    is updated to the current UTC time.
    :param logger:      The logger
    :param settings:    Settings from command line and configuration file
    :param sc_client:   instance of safetypy.SafetyCulture class
    """
    logger.info('Exporting iAuditor actions')
    since = get_last_successful_actions_export(logger)
    actions = sc_client.get_audit_actions(since)
    if actions is None:
        return

    logger.info('Found ' + str(len(actions)) + ' actions')
    save_exported_actions_to_csv_file(logger, settings[EXPORT_PATH], actions)
    # Marker is written with millisecond precision fixed at .000
    marker = datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%S.000Z')
    update_actions_sync_marker_file(logger, marker)
osdk.py 文件源码 项目:ekko 作者: openstack 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def utctimestamp():
    """Return the whole seconds between 1970-01-01 00:00:00 and the current
    UTC time as an int; the microsecond part is discarded."""
    since_epoch = datetime.utcnow() - datetime(1970, 1, 1)
    whole_days_in_seconds = since_epoch.days * 24 * 3600
    return since_epoch.seconds + whole_days_in_seconds
userManager.py 文件源码 项目:docklet 作者: unias 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def send_remind_activating_email(username):
    """Mail the administrators that *username* requested activation.

    Returns without sending when either the EMAIL_FROM_ADDRESS or the
    ADMIN_EMAIL_ADDRESS setting is an empty value ('', "''" or '""').
    ADMIN_EMAIL_ADDRESS is a space-separated list, optionally wrapped in
    double quotes.  A failure inside ``sendmail`` is logged, not raised.
    """
    empty_values = ['\'\'', '\"\"', '']
    email_from_address = settings.get('EMAIL_FROM_ADDRESS')
    admin_email_address = settings.get('ADMIN_EMAIL_ADDRESS')
    if email_from_address in empty_values or admin_email_address in empty_values:
        return

    # HTML body: greeting, request notice with portal link, UTC send time.
    request_html = '''<p>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;An activating request for %s in <a href='%s'>%s</a> has been sent</p>
               <p>&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;Please check it !</p>
               <br/><br/>
               <p> Docklet Team, SEI, PKU</p>
            ''' % (username, env.getenv("PORTAL_URL"), env.getenv("PORTAL_URL"))
    text = ''.join([
        '<html><h4>Dear ' + 'admin' + ':</h4>',
        request_html,
        '<p>' + str(datetime.utcnow()) + '</p>',
        '</html>',
    ])
    subject = 'An activating request in Docklet has been sent'

    # Strip the surrounding double quotes, if any, before splitting.
    if admin_email_address[0] == '"':
        raw_admins = admin_email_address[1:-1]
    else:
        raw_admins = admin_email_address
    admins_addr = raw_admins.split(" ")
    alladdr = ", ".join(admins_addr)

    msg = MIMEMultipart()
    textmsg = MIMEText(text, 'html', 'utf-8')
    msg['Subject'] = Header(subject, 'utf-8')
    msg['From'] = email_from_address
    msg['To'] = alladdr
    msg.attach(textmsg)

    s = smtplib.SMTP()
    s.connect()
    try:
        s.sendmail(email_from_address, admins_addr, msg.as_string())
    except Exception as e:
        logger.error(e)
    s.close()
model.py 文件源码 项目:docklet 作者: unias 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def __init__(self, title, content=''):
        """Initialise the record with its title and optional content.

        ``create_date`` is set to the current UTC time and ``status``
        starts out as 'open'.
        """
        self.status = 'open'
        self.create_date = datetime.utcnow()
        self.title, self.content = title, content
utilities.py 文件源码 项目:picoCTF 作者: picoCTF 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def check_competition_active():
    """
    Tell whether the current time lies strictly between the "start_time"
    and "end_time" settings.
    """

    settings = api.config.get_settings()

    opened_at = settings["start_time"].timestamp()
    now = datetime.utcnow().timestamp()
    if not opened_at < now:
        return False
    return now < settings["end_time"].timestamp()
app.py 文件源码 项目:picoCTF 作者: picoCTF 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def get_time():
    """Return a WebSuccess whose data is the current time as integer
    seconds.

    NOTE(review): ``timestamp()`` on the naive ``utcnow()`` value is
    interpreted in the host's local timezone, so this matches real epoch
    seconds only when the host runs in UTC - confirm that is intended.
    """
    epoch_seconds = int(datetime.utcnow().timestamp())
    return WebSuccess(data=epoch_seconds)
gdq.py 文件源码 项目:sopel-modules 作者: phixion 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def gdq(bot, trigger):
    """Announce the Games Done Quick countdown or the current/next run.

    Fetches the public schedule page and reads the run table.  If the page
    cannot be fetched or parsed, or no upcoming game is found, a plain
    "days away" message based on the hard-coded start date is sent.
    Before the scheduled start a countdown is sent (hours/minutes within
    three days, otherwise days); once started, the current and next game
    are announced.

    :param bot:      object whose ``say`` method sends the message
    :param trigger:  unused here
    :return:         the result of ``bot.say`` on the early-exit paths,
                     otherwise None
    """
    now = datetime.utcnow().replace(tzinfo=timezone.utc)
    delta = datetime(2018, 1, 7, 16, 30, tzinfo=timezone.utc) - now
    textdate = "January 7"
    url = 'https://gamesdonequick.com/schedule'
    # Shared by every path on which the schedule is unavailable.
    fallback = "GDQ is {0} days away ({1})".format(delta.days, textdate)

    # ``except Exception`` rather than a bare ``except`` so that
    # KeyboardInterrupt/SystemExit are not swallowed by the fallbacks.
    try:
        x = requests.get(url).content
    except Exception:
        return bot.say(fallback)
    bs = BeautifulSoup(x)
    try:
        run = bs.find("table", {"id": "runTable"}).tbody
    except Exception:
        return bot.say(fallback)
    try:
        gdqstart = datetime.strptime(run.td.getText(), '%Y-%m-%dT%H:%M:%SZ')
        gdqstart = gdqstart.replace(tzinfo=timezone.utc)
    except Exception:
        return bot.say(fallback)

    (game, runner, console, comment, eta, nextgame, nextrunner, nexteta, nextconsole, nextcomment) = getinfo(run, now)
    if not nextgame:
        return bot.say(fallback)

    if now < gdqstart:
        tts = gdqstart - now
        if tts.days <= 3:
            return bot.say("GDQ is {0}H{1}M away.  First game: {2} by {3} ETA: {4} Comment: {5} | https://gamesdonequick.com/schedule".format(int(tts.total_seconds() // 3600), int((tts.total_seconds() % 3600) // 60), nextgame, nextrunner, nexteta, nextcomment))
        else:
            return bot.say("GDQ is {0} days away ({1}) | https://gamesdonequick.com/schedule".format(tts.days, gdqstart.strftime('%m/%d/%Y')))

    if nextgame == 'done':
        return bot.say("GDQ is {0} days away ({1} [estimated])".format(delta.days, textdate))
    if game:
        if comment:
            bot.say("Current Game: {0} by {1} ETA: {2} Comment: {3} | Next Game: {4} by {5} | http://www.twitch.tv/gamesdonequick | https://gamesdonequick.com/schedule".format(game, runner, eta, comment, nextgame, nextrunner))
        else:
            bot.say("Current Game: {0} by {1} ETA: {2} | Next Game: {3} by {4} | http://www.twitch.tv/gamesdonequick | https://gamesdonequick.com/schedule".format(game, runner, eta, nextgame, nextrunner))
    else:
        bot.say("Current Game: setup?? | Next Game {0} by {1} | http://www.twitch.tv/gamesdonequick | https://gamesdonequick.com/schedule".format(nextgame, nextrunner))
time_format.py 文件源码 项目:tornado-ssdb-project 作者: ego008 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def get_time(time, compare_time):
    """Resolve a submitted time string against a reference time.

    A missing or blank *time* yields the current UTC time.  Otherwise the
    string is parsed with ``parse_time``; the parsed value is returned only
    when it differs from *compare_time* by more than ONE_SECOND in either
    direction.  Unparseable, equal or near-equal values yield None.
    """
    if not time or not time.strip():
        return datetime.utcnow()

    parsed = parse_time(time)
    if not parsed:
        return None
    if parsed == compare_time:
        return None

    # Always subtract the smaller from the larger.
    if parsed > compare_time:
        gap = parsed - compare_time
    else:
        gap = compare_time - parsed
    return parsed if gap > ONE_SECOND else None
time_format.py 文件源码 项目:tornado-ssdb-project 作者: ego008 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def iso_time_now():
    """Return the current UTC time formatted with ISO_TIME_FORMAT."""
    current = datetime.utcnow()
    return current.strftime(ISO_TIME_FORMAT)


问题


面经


文章

微信
公众号

扫码关注公众号