python类now()的实例源码

feedgenerator.py 文件源码 项目:liberator 作者: libscie 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def latest_post_date(self):
        """
        Returns the latest item's pubdate or updateddate. If no items
        have either of these attributes this returns the current UTC date/time.
        """
        latest_date = None
        date_keys = ('updateddate', 'pubdate')

        for item in self.items:
            for date_key in date_keys:
                item_date = item.get(date_key)
                if item_date:
                    if latest_date is None or item_date > latest_date:
                        latest_date = item_date

        # datetime.now(tz=utc) is slower, as documented in django.utils.timezone.now
        return latest_date or datetime.datetime.utcnow().replace(tzinfo=utc)
client.py 文件源码 项目:mongodb-monitoring 作者: jruaux 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def restart(self, timeout=None):
        """Restarts this Splunk instance.

        The service is unavailable until it has successfully restarted.

        If a *timeout* value is specified, ``restart`` blocks until the service
        resumes or the timeout period has been exceeded. Otherwise, ``restart`` returns
        immediately.

        :param timeout: A timeout period, in seconds.
        :type timeout: ``integer``
        """
        msg = { "value": "Restart requested by " + self.username + "via the Splunk SDK for Python"}
        # This message will be deleted once the server actually restarts.
        self.messages.create(name="restart_required", **msg)
        result = self.post("/services/server/control/restart")
        if timeout is None: 
            return result
        start = datetime.now()
        diff = timedelta(seconds=timeout)
        while datetime.now() - start < diff:
            try:
                self.login()
                if not self.restart_required:
                    return result
            except Exception, e:
                sleep(1)
        raise Exception, "Operation time out."
client.py 文件源码 项目:mongodb-monitoring 作者: jruaux 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def search(self, query, **kwargs):
        """Runs a search using a search query and any optional arguments you
        provide, and returns a `Job` object representing the search.

        :param query: A search query.
        :type query: ``string``
        :param kwargs: Arguments for the search (optional):

            * "output_mode" (``string``): Specifies the output format of the
              results.

            * "earliest_time" (``string``): Specifies the earliest time in the
              time range to
              search. The time string can be a UTC time (with fractional
              seconds), a relative time specifier (to now), or a formatted
              time string.

            * "latest_time" (``string``): Specifies the latest time in the time
              range to
              search. The time string can be a UTC time (with fractional
              seconds), a relative time specifier (to now), or a formatted
              time string.

            * "rf" (``string``): Specifies one or more fields to add to the
              search.

        :type kwargs: ``dict``
        :rtype: class:`Job`
        :returns: An object representing the created job.
        """
        return self.jobs.create(query, **kwargs)
client.py 文件源码 项目:mongodb-monitoring 作者: jruaux 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def touch(self):
        """Extends the expiration time of the search to the current time (now) plus
        the time-to-live (ttl) value.

        :return: The :class:`Job`.
        """
        self.post("control", action="touch")
        return self
QABacktestclass.py 文件源码 项目:QUANTAXIS 作者: yutiansut 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def __init__(self):

        self.backtest_type = 'day'
        self.account = QA_Account()
        self.market = QA_Market()
        self.order = QA_QAMarket_bid_list()
        self.bid = QA_QAMarket_bid()
        self.setting = QA_Setting()
        self.clients = self.setting.client
        self.user = self.setting.QA_setting_user_name
        self.market_data = []
        self.now = None
        self.last_time = None
        self.strategy_start_date = ''
        self.strategy_start_time = ''
        self.strategy_end_date = ''
        self.strategy_end_time = ''
        self.today = None
        self.strategy_stock_list = []
        self.trade_list = []
        self.start_real_id = 0
        self.end_real_id = 0
        self.temp = {}
        self.commission_fee_coeff = 0.0015
        self.account_d_value = []
        self.account_d_key = []
        self.benchmark_type = 'index'
        self.market_data_dict = {}
        self.backtest_print_log = True  # ??
        self.if_save_to_mongo = True
        self.if_save_to_csv = True
        self.stratey_version = 'V1'
        self.topic_name = 'EXAMPLE'
        self.outside_data = []
        self.outside_data_dict = []
        self.outside_data_hashable = {}
        self.absoult_path = sys.path[0]
        self.dirs = '{}{}QUANTAXIS_RESULT{}{}{}{}{}'.format(
            self.absoult_path, os.sep, os.sep, self.topic_name, os.sep, self.stratey_version, os.sep)
QABacktestclass.py 文件源码 项目:QUANTAXIS 作者: yutiansut 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def __save_strategy_files(self):

        file_name = '{}backtest_{}.py'.format(
            self.dirs, self.account.account_cookie)

        with open(sys.argv[0], 'rb') as p:
            data = p.read()

            collection = self.setting.client.quantaxis.strategy

            collection.insert({'cookie': self.account.account_cookie, 'name': self.strategy_name,
                               'topic': self.topic_name, 'version': self.stratey_version, 'user': self.user, 'datetime': datetime.datetime.now(),
                               'content': data.decode('utf-8'),
                               'dirs': self.dirs,
                               'absoultpath': self.absoult_path})
            with open(file_name, 'wb') as f:

                f.write(data)
QABacktestclass.py 文件源码 项目:QUANTAXIS 作者: yutiansut 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def _make_slice(self):

        QA_Setting.client.quantaxis.slice.insert({
            'cookie': self.account.account_cookie,
            'account_message': self.__messages,
            'account_d_value': self.account_d_value,
            'account_d_key': self.account_d_key,
            'now': self.now,
            'today': self.today,
            'running_date': self.running_date,
            'strategy_stock_list': self.strategy_stock_list,
            'dirs': self.dirs,

        })
QABacktestclass.py 文件源码 项目:QUANTAXIS 作者: yutiansut 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def _deal_from_order_queue(self):

        # ??bar?????,????
        __result = []
        self.order.__init__()
        if len(self.account.order_queue) >= 1:
            __bid_list = self.order.from_dataframe(self.account.order_queue.query(
                'status!=200').query('status!=500').query('status!=400'))

            for item in __bid_list:
                # ?????????????
                item.date = self.today
                item.datetime = self.now

                __bid, __market = self.__wrap_bid(self, item)

                __message = self.__send_bid(
                    __bid, __market)

                if isinstance(__message, dict):
                    if __message['header']['status'] in ['200', 200]:
                        self.__sync_order_LM(
                            'trade', __bid, __message['header']['order_id'], __message['header']['trade_id'], __message)
                    else:
                        self.__sync_order_LM('wait')
        else:
            self.__QA_backtest_log_info(
                'FROM BACKTEST: Order Queue is empty at %s!' % self.now)
            pass
QABacktestclass.py 文件源码 项目:QUANTAXIS 作者: yutiansut 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def before_backtest(self):
            global start_time
            start_time = datetime.now()
            global risk_position

            print(self.market_data_hashable)
            input()
QABacktestclass.py 文件源码 项目:QUANTAXIS 作者: yutiansut 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def end_backtest(self):
            global start_time
            end_time = datetime.now()
            cost_time = (end_time - start_time).total_seconds()
            QA.QA_util_log_info('???? {} {}'.format(cost_time, 'seconds'))

            self.if_save_to_csv = True
            self.if_save_to_mongo = True
recipe-578977.py 文件源码 项目:code 作者: ActiveState 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def SqlExecute(conn, sqlquery=''):

    """
    Executes sqlquery and returns lists with column names and data
    The connection info is passed as a dictionary with these required keys:
    servername, username,password
    If username is empty will use integrated security
    These keys are optional: defdb, colseparator
    """

    if 'colseparator' not in conn.keys():
        conn['colseparator'] = chr(1)
    if conn['username'] == '':
        constr = "sqlcmd -E -S" + conn['servername'] + "  /w 8192 -W " + ' -s' + conn['colseparator'] + '  '
    else:
        constr = "sqlcmd -U" + conn['username'] + " -P" + conn['password'] + ' -S' + conn['servername'] + '  /w 8192 -W  -s' + conn['colseparator'] + '  '

    # now we execute
    try:
        data = subprocess.Popen(constr + '-Q"' + sqlquery + '"', stdout=subprocess.PIPE).communicate()
    except Exception as inst:
        print('Exception in SqlExecute:', inst)
        return -1

    records = []
    lst = data[0].splitlines()
    # lst[0] column names;  lst[1] dashed lines, (skip); lst[2:] data
    # now we decode
    for x in lst:
        try:
            #try default utf-8 decoding
            line = x.decode()
        except UnicodeDecodeError:
            #in case of weird characters this one works most of the time
            line = x.decode('ISO-8859-1')
        lst2 = line.split(conn['colseparator'])
        records.append(lst2)
    fieldnames = records[0]
    data = records[2:]

    return data, fieldnames
recipe-578977.py 文件源码 项目:code 作者: ActiveState 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def DatedString():
    """
    Returns dated string with this format
    2014_12_30_135857_4581860
    """

    from datetime import datetime
    now = str(datetime.now())
    now = now.replace('-', '_')
    now = now.replace(' ', '_')
    now = now.replace(':', '')
    now = now.replace('.', '_') + '0'

    return now
client.py 文件源码 项目:Splunk_CBER_App 作者: MHaggis 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def restart(self, timeout=None):
        """Restarts this Splunk instance.

        The service is unavailable until it has successfully restarted.

        If a *timeout* value is specified, ``restart`` blocks until the service
        resumes or the timeout period has been exceeded. Otherwise, ``restart`` returns
        immediately.

        :param timeout: A timeout period, in seconds.
        :type timeout: ``integer``
        """
        msg = { "value": "Restart requested by " + self.username + "via the Splunk SDK for Python"}
        # This message will be deleted once the server actually restarts.
        self.messages.create(name="restart_required", **msg)
        result = self.post("/services/server/control/restart")
        if timeout is None: 
            return result
        start = datetime.now()
        diff = timedelta(seconds=timeout)
        while datetime.now() - start < diff:
            try:
                self.login()
                if not self.restart_required:
                    return result
            except Exception, e:
                sleep(1)
        raise Exception, "Operation time out."
client.py 文件源码 项目:Splunk_CBER_App 作者: MHaggis 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def search(self, query, **kwargs):
        """Runs a search using a search query and any optional arguments you
        provide, and returns a `Job` object representing the search.

        :param query: A search query.
        :type query: ``string``
        :param kwargs: Arguments for the search (optional):

            * "output_mode" (``string``): Specifies the output format of the
              results.

            * "earliest_time" (``string``): Specifies the earliest time in the
              time range to
              search. The time string can be a UTC time (with fractional
              seconds), a relative time specifier (to now), or a formatted
              time string.

            * "latest_time" (``string``): Specifies the latest time in the time
              range to
              search. The time string can be a UTC time (with fractional
              seconds), a relative time specifier (to now), or a formatted
              time string.

            * "rf" (``string``): Specifies one or more fields to add to the
              search.

        :type kwargs: ``dict``
        :rtype: class:`Job`
        :returns: An object representing the created job.
        """
        return self.jobs.create(query, **kwargs)
client.py 文件源码 项目:Splunk_CBER_App 作者: MHaggis 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def touch(self):
        """Extends the expiration time of the search to the current time (now) plus
        the time-to-live (ttl) value.

        :return: The :class:`Job`.
        """
        self.post("control", action="touch")
        return self
sunrise.py 文件源码 项目:FlipDotWorker 作者: ArduinoHannover 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def sunrise(self,when=None):
  """
  return the time of sunrise as a datetime.time object
  when is a datetime.datetime object. If none is given
  a local time zone is assumed (including daylight saving
  if present)
  """
  if when is None : when = datetime.now(tz=LocalTimezone())
  self.__preptime(when)
  self.__calc()
  return sun.__timefromdecimalday(self.sunrise_t)
sunrise.py 文件源码 项目:FlipDotWorker 作者: ArduinoHannover 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def sunset(self,when=None):
  if when is None : when = datetime.now(tz=LocalTimezone())
  self.__preptime(when)
  self.__calc()
  return sun.__timefromdecimalday(self.sunset_t)
sunrise.py 文件源码 项目:FlipDotWorker 作者: ArduinoHannover 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def solarnoon(self,when=None):
  if when is None : when = datetime.now(tz=LocalTimezone())
  self.__preptime(when)
  self.__calc()
  return sun.__timefromdecimalday(self.solarnoon_t)
helper.py 文件源码 项目:plexivity 作者: mutschler 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def generateSSLCert():
    if not os.path.exists(os.path.join(config.DATA_DIR, 'plexivity.key')) or not os.path.exists(os.path.join(config.DATA_DIR, 'plexivity.crt')):
        logger.warning("plexivity was started with ssl support but no cert was found, trying to generating cert and key now")
        try:
            from OpenSSL import crypto, SSL
            from socket import gethostname


            # create a key pair
            k = crypto.PKey()
            k.generate_key(crypto.TYPE_RSA, 1024)

            # create a self-signed cert
            cert = crypto.X509()
            cert.get_subject().C = "US"
            cert.get_subject().ST = "plex land"
            cert.get_subject().L = "plex land"
            cert.get_subject().O = "plexivity"
            cert.get_subject().OU = "plexivity"
            cert.get_subject().CN = gethostname()
            cert.set_serial_number(1000)
            cert.gmtime_adj_notBefore(0)
            cert.gmtime_adj_notAfter(10*365*24*60*60)
            cert.set_issuer(cert.get_subject())
            cert.set_pubkey(k)
            cert.sign(k, 'sha1')

            open(os.path.join(config.DATA_DIR, 'plexivity.crt'), "wt").write(crypto.dump_certificate(crypto.FILETYPE_PEM, cert))
            open(os.path.join(config.DATA_DIR, 'plexivity.key'), "wt").write(crypto.dump_privatekey(crypto.FILETYPE_PEM, k))
            logger.info("ssl cert and key generated and saved to: %s" % config.DATA_DIR)
        except:
            logger.error("unable to generate ssl key and cert")
helper.py 文件源码 项目:plexivity 作者: mutschler 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def startScheduler():
    db.create_all()
    #create default roles!
    if not db.session.query(models.Role).filter(models.Role.name == "admin").first():
        admin_role = models.Role(name='admin', description='Administrator Role')
        user_role = models.Role(name='user', description='User Role')
        db.session.add(admin_role)
        db.session.add(user_role)
        db.session.commit()

    try:
        import tzlocal

        tz = tzlocal.get_localzone()
        logger.info("local timezone: %s" % tz)
    except:
        tz = None

    if not tz or tz.zone == "local":
        logger.error('Local timezone name could not be determined. Scheduler will display times in UTC for any log'
                 'messages. To resolve this set up /etc/timezone with correct time zone name.')
        tz = pytz.utc
    #in debug mode this is executed twice :(
    #DONT run flask in auto reload mode when testing this!
    scheduler = BackgroundScheduler(logger=sched_logger, timezone=tz)
    scheduler.add_job(notify.task, 'interval', seconds=config.SCAN_INTERVAL, max_instances=1,
                      start_date=datetime.datetime.now(tz) + datetime.timedelta(seconds=2))
    scheduler.start()
    sched = scheduler
    #notify.task()


问题


面经


文章

微信
公众号

扫码关注公众号