python类every()的实例源码

cricket.py 文件源码 项目:crickbuzz_cricket_score 作者: LinuxTerminali 项目源码 文件源码 阅读 83 收藏 0 点赞 0 评论 0
def schedule_match():
    """Refresh the scorecard every 30 seconds until the user hits Ctrl-C.

    Registers one recurring, tagged job on the global ``schedule`` queue and
    polls it once per second; KeyboardInterrupt exits via ``quit()``.
    """
    try:
        job = schedule.every(30).seconds.do(refresh_Scorecard)
        job.tag('score_updates', 'task')
        while True:
            schedule.run_pending()
            time.sleep(1)
    except KeyboardInterrupt:
        quit()

# NOTE(review): this comment does not match the function above (which
# registers, not cancels, the score-refresh job); the cancel helper it
# refers to was probably dropped when this page was scraped.
thermostat.py 文件源码 项目:thermostat_ita 作者: jpnos26 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def reloadSchedule():
    """Rebuild the ``schedule`` job queue from the on-disk thermostat schedule.

    Python 2 code (print statement, dict.iteritems).  Clears every existing
    job, selects the active schedule (the "heat" schedule, optionally
    overridden by the test schedule), then registers one setScheduledTemp
    job per (time, temperature) entry for each day.
    """
    with scheduleLock:          # serialize against the scheduler thread
        schedule.clear()

        activeSched = None

        with thermostatLock:    # guard reads of shared control/widget state
            thermoSched = JsonStore( "thermostat_schedule.json" )
            # NOTE(review): this compares the holdControl object itself (not
            # holdControl.state) to "down", so the condition is presumably
            # always True; the sibling RaspberryPiThermostat version uses
            # holdControl.state -- confirm which is intended.
            if holdControl != "down" :
                if heatControl.state == "down":
                    activeSched = thermoSched[ "heat" ]  
                    log( LOG_LEVEL_INFO, CHILD_DEVICE_SCHEDULER, MSG_SUBTYPE_CUSTOM + "/load", "heat" )
            if useTestSchedule: 
                # The test schedule overrides whatever was selected above.
                activeSched = getTestSchedule()
                log( LOG_LEVEL_INFO, CHILD_DEVICE_SCHEDULER, MSG_SUBTYPE_CUSTOM + "/load", "test" )
                print "Using Test Schedule!!!"

        if activeSched != None:
            # activeSched maps a day name to a list of [time, temperature]
            # entries; entry[0] feeds .at() and entry[1] is the set point.
            for day, entries in activeSched.iteritems():
                for i, entry in enumerate( entries ):
                    # Equivalent to e.g. schedule.every().monday.at("06:30").do(...)
                    getattr( schedule.every(), day ).at( entry[ 0 ] ).do( setScheduledTemp, entry[ 1 ] )
                    log( LOG_LEVEL_DEBUG, CHILD_DEVICE_SCHEDULER, MSG_SUBTYPE_TEXT, "Set " + day + ", at: " + entry[ 0 ] + " = " + str( entry[ 1 ] ) + scaleUnits )


##############################################################################
#                                                                            #
#       Web Server Interface                                                 #
#                                                                            #
##############################################################################

##############################################################################
#      encoding: UTF-8                                                       #
# Form based authentication for CherryPy. Requires the                       #
# Session tool to be loaded.                                                 #
##############################################################################
byr_util.py 文件源码 项目:BYR_BBS_Spider 作者: Dogless-plus 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def with_heartbeat(fn):
    """Decorator: call *fn* once immediately, then every HEARTBEAT seconds.

    The wrapped call never returns -- it enters an endless loop that drives
    the global ``schedule`` queue, printing a banner with the loop counter
    on each pass.

    Fix: the original wrapper accepted positional arguments only and
    silently dropped keyword arguments; both are now forwarded
    (``schedule.Job.do`` passes them through to *fn*).
    """
    def call_func(*args, **kwargs):
        loop = 0
        fn(*args, **kwargs)     # fire once right away
        schedule.every(HEARTBEAT).seconds.do(fn, *args, **kwargs)
        while True:
            print("#" * 15, "loop:%s" % loop, "#" * 15)
            schedule.run_pending()
            sleep(HEARTBEAT)
            loop += 1

    return call_func
top_ten.py 文件源码 项目:BYR_BBS_Spider 作者: Dogless-plus 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def with_heartbeat_30s(fn):
    """Decorator: call *fn* once immediately, then every 30 seconds.

    The wrapped call never returns -- it polls the global ``schedule``
    queue in an endless loop, printing a banner with the loop counter.

    Fix: the original wrapper accepted positional arguments only and
    silently dropped keyword arguments; both are now forwarded.
    """
    def call_func(*args, **kwargs):
        HEARTBEAT = 30          # fixed period and polling interval, seconds
        loop = 0
        fn(*args, **kwargs)     # fire once right away
        schedule.every(HEARTBEAT).seconds.do(fn, *args, **kwargs)
        while True:
            print("#" * 15, "loop:%s" % loop, "#" * 15)
            schedule.run_pending()
            sleep(HEARTBEAT)
            loop += 1

    return call_func
top_ten.py 文件源码 项目:BYR_BBS_Spider 作者: Dogless-plus 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def with_heartbeat_1d(fn):
    """Decorator: call *fn* once immediately, then daily at 10:30.

    The wrapped call never returns -- it polls the global ``schedule``
    queue every 30 seconds, printing a banner with the loop counter.

    Fix: the original wrapper accepted positional arguments only and
    silently dropped keyword arguments; both are now forwarded.
    """
    def call_func(*args, **kwargs):
        HEARTBEAT = 30          # scheduler polling interval, seconds
        loop = 0
        fn(*args, **kwargs)     # fire once right away
        schedule.every().day.at("10:30").do(fn, *args, **kwargs)
        while True:
            print("#" * 15, "loop:%s" % loop, "#" * 15)
            schedule.run_pending()
            sleep(HEARTBEAT)
            loop += 1

    return call_func
trax_schedule.py 文件源码 项目:trax 作者: EliotBerriot 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def handle(self, *args, **options):
        """Management-command entry point: run the reminder/timer job loop.

        Registers the two recurring tasks, then polls the ``schedule``
        queue once per second forever, logging (but surviving) task errors.

        Fix: the bare ``except:`` also caught SystemExit and
        KeyboardInterrupt, making the runner impossible to stop cleanly;
        narrowed to ``except Exception``.
        """
        schedule.every(1).hours.do(tasks.kill_obsolete_timers)
        schedule.every(5).seconds.do(tasks.send_reminders)

        self.stdout.write(self.style.SUCCESS('Starting job runner...'))
        while True:
            time.sleep(1)
            try:
                schedule.run_pending()
            except Exception:
                # Keep the runner alive; report the failing task.
                traceback.print_exc()
thermostat.py 文件源码 项目:RaspberryPiThermostat 作者: scottpav 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def reloadSchedule():
    """Rebuild the ``schedule`` job queue from the on-disk thermostat schedule.

    Python 2 code (print statement, dict.iteritems).  Clears every existing
    job; unless hold mode is active, selects the "heat" or "cool" schedule
    from the JSON store (optionally overridden by the test schedule), then
    registers one setScheduledTemp job per (time, temperature) entry per day.
    """
    with scheduleLock:          # serialize against the scheduler thread
        schedule.clear()

        activeSched = None

        with thermostatLock:    # guard reads of shared control/widget state
            thermoSched = JsonStore( "thermostat_schedule.json" )

            # Hold mode pins the current set point: skip schedule selection.
            if holdControl.state != "down":
                if heatControl.state == "down":
                    activeSched = thermoSched[ "heat" ]  
                    log( LOG_LEVEL_INFO, CHILD_DEVICE_SCHEDULER, MSG_SUBTYPE_CUSTOM + "/load", "heat" )
                elif coolControl.state == "down":
                    activeSched = thermoSched[ "cool" ]  
                    log( LOG_LEVEL_INFO, CHILD_DEVICE_SCHEDULER, MSG_SUBTYPE_CUSTOM + "/load", "cool" )

                if useTestSchedule: 
                    # The test schedule overrides whatever was selected above.
                    activeSched = getTestSchedule()
                    log( LOG_LEVEL_INFO, CHILD_DEVICE_SCHEDULER, MSG_SUBTYPE_CUSTOM + "/load", "test" )
                    print "Using Test Schedule!!!"

        if activeSched != None:
            # activeSched maps a day name to a list of [time, temperature]
            # entries; entry[0] feeds .at() and entry[1] is the set point.
            for day, entries in activeSched.iteritems():
                for i, entry in enumerate( entries ):
                    # Equivalent to e.g. schedule.every().monday.at("06:30").do(...)
                    getattr( schedule.every(), day ).at( entry[ 0 ] ).do( setScheduledTemp, entry[ 1 ] )
                    log( LOG_LEVEL_DEBUG, CHILD_DEVICE_SCHEDULER, MSG_SUBTYPE_TEXT, "Set " + day + ", at: " + entry[ 0 ] + " = " + str( entry[ 1 ] ) + scaleUnits )


##############################################################################
#                                                                            #
#       Web Server Interface                                                 #
#                                                                            #
##############################################################################
background.py 文件源码 项目:robin-folios 作者: jshoe 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def schedule_actions():
    """Register weekday buy/sell jobs, then poll the scheduler forever.

    Run detached, e.g.:  nohup python MyScheduledProgram.py &> ~/Desktop/output.log
    (use ``ps auxw`` to find the running process).
    """
    print(datetime.datetime.now())
    print("Starting to run")

    weekdays = ('monday', 'tuesday', 'wednesday', 'thursday', 'friday')

    # Buy today's positions at each configured time.
    for set_time in ['6:07', '06:24']:
        for day in weekdays:
            getattr(schedule.every(), day).at(set_time).do(action)

    # Sell yesterday's positions.
    for day in weekdays:
        getattr(schedule.every(), day).at('06:01').do(sell_scheduled)

    while True:
        schedule.run_pending()
        sys.stdout.flush()
        time.sleep(1)  # Check every 1 second
scheduler.py 文件源码 项目:xspider 作者: zym1115718204 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def run(self):
        """Spawn the three dispatch routines as threaded jobs, one tick per second."""
        dispatchers = (
            self.run_generator_dispatch,
            self.run_processor_dispatch,
            self.run_query_project_status,
        )
        for dispatcher in dispatchers:
            schedule.every(1).seconds.do(self.run_threaded, dispatcher)

        # Drive the queue forever.
        while True:
            schedule.run_pending()
            time.sleep(1)
scheduler.py 文件源码 项目:happyfridays 作者: jessicaraepetersen 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def job():
    """Purge every UserAlbum row.

    Fix: the bulk delete was issued but never committed, so the purge was
    rolled back when the session ended; the transaction is now committed.
    """
    db.session.query(UserAlbum).delete()
    db.session.commit()

# schedule.every(1).minutes.do(job)
base_algo.py 文件源码 项目:IBTrader 作者: quocble 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def schedule_function(self, func, time):
        """Register *func* to run via run_on_trading_day every day at *time*."""
        announcement = "Scheduled function " + func.__name__ + " at " + time
        print(announcement)
        daily_slot = schedule.every().day.at(time)
        daily_slot.do(self.run_on_trading_day, func)
main.py 文件源码 项目:kibitzr 作者: kibitzr 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def schedule_checks(checkers):
    """Drop all existing jobs and register one periodic check per checker.

    Each checker's period (seconds) comes from its ``conf["period"]``.
    """
    schedule.clear()
    for checker in checkers:
        period = checker.conf["period"]
        logger.info(
            "Scheduling checks for %r every %r seconds",
            checker.conf["name"],
            period,
        )
        schedule.every(period).seconds.do(checker.check)
maildelete.py 文件源码 项目:sh8email-django 作者: triplepy 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def run(self):
        """Run forever, purging day-old mail once per hour.

        Fix: the inner bare ``except:`` also caught SystemExit and
        KeyboardInterrupt and logged them as errors before re-raising;
        narrowed to ``except Exception`` so shutdown signals pass through
        untouched.
        """
        def delete_job():
            # Purge mail older than one day; log, then re-raise so the
            # failure surfaces out of run_pending().
            try:
                Mail.delete_one_day_ago()
            except Exception:
                logger.exception("Exception raised in MailDeleteBatch#run()#delete_job()")
                raise

        schedule.every().hour.do(delete_job)

        while True:
            schedule.run_pending()
            time.sleep(1)
collect_data.py 文件源码 项目:ebay 作者: fgscivittaro 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def write_new_file_and_scrape_all_data(filename, link_list, num_retries = 10):
    """Create a fresh output file, then scrape every product link into it.

    Thin convenience wrapper: truncates/creates *filename* first, then
    delegates the per-link scraping and appending to
    scrape_all_data_from_all_featured_products.
    """
    open_new_file(filename)
    scrape_all_data_from_all_featured_products(filename, link_list, num_retries)
collect_data.py 文件源码 项目:ebay 作者: fgscivittaro 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def write_new_file_and_dynamically_scrape_all_data(filename,
                                                   link_list,
                                                   interval,
                                                   num_retries = 10):
    """Create a fresh output file, then scrape on a repeating interval.

    Thin convenience wrapper: truncates/creates *filename*, then hands off
    to dynamically_scrape_data, which re-runs the scraper every *interval*
    and keeps appending results (it does not return).
    """
    open_new_file(filename)
    dynamically_scrape_data(filename, link_list, num_retries, interval)
collect_data.py 文件源码 项目:ebay 作者: fgscivittaro 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def write_new_file_update_links_and_dynamically_scrape(filename,
                                                       interval,
                                                       num_retries = 10):
    """Create a fresh output file, then scrape a self-updating link list.

    Thin convenience wrapper: truncates/creates *filename*, then hands off
    to clean_links_and_dynamically_scrape, which refreshes the link list
    and re-runs the scraper every *interval*, appending continuously (it
    does not return).
    """
    open_new_file(filename)
    clean_links_and_dynamically_scrape(filename, interval, num_retries)
collect_sales_data.py 文件源码 项目:ebay 作者: fgscivittaro 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def dynamically_scrape_and_append_sales_data(filename,
                                             interval,
                                             num_retries = 10):
    """
    Dynamically scrapes sales data and appends the data to a file by generating
    a list of links, checking it against an old list and only keeping new links,
    and scraping those links for sales data.  Runs forever.

    Fixes over the original:
      * ``schedule.every(interval).hours.do(job)`` registered ``job`` without
        the ``old_list`` argument it required, so every scheduled run raised
        TypeError.
      * ``old_list = new_list`` only rebound a local inside ``job``; the
        outer list never changed, so deduplication against previous runs
        could not work.  State now lives in a mutable dict (this code
        predates ``nonlocal``).
      * Removed the unreachable ``print`` after the infinite loop.
    """
    state = {'old_list': []}

    def job():
        # One scrape pass: fetch current links, keep only unseen ones,
        # drop bad ones, scrape the rest, then remember what we saw.
        new_list = collect_all_featured_links()
        new_links = remove_old_links(state['old_list'], new_list)
        bad_links = collect_bad_links(new_links)
        clean_links = remove_bad_links_from_link_list(bad_links, new_links)

        scrape_and_append_sales_data_from_featured_links(filename,
                                                         clean_links,
                                                         num_retries)

        state['old_list'] = new_list

    job()                                   # first pass immediately
    schedule.every(interval).hours.do(job)  # then every *interval* hours

    while True:
        schedule.run_pending()
        time.sleep(30)
combined.py 文件源码 项目:ebay 作者: fgscivittaro 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def scrape_combined_data_from_all_featured_products(data_filename,
                                                    sales_filename,
                                                    link_list,
                                                    num_retries = 10):
    """
    Scrapes all data from every featured product and appends that data to
    their respective files (product data and sales data are written to
    separate files).
    """
    for product_url in link_list:
        scrape_and_append_combined_data(product_url,
                                        data_filename,
                                        sales_filename,
                                        num_retries)
combined.py 文件源码 项目:ebay 作者: fgscivittaro 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def dynamically_scrape_combined_data(data_filename,
                                     sales_filename,
                                     interval,
                                     num_retries = 10):
    """
    Dynamically scrapes a continuously updated list of unique clean links and
    appends the data to their respective files.  Runs forever.

    Fixes over the original:
      * ``schedule.every(interval).hours.do(job)`` registered ``job`` without
        the ``old_list`` argument it required, so every scheduled run raised
        TypeError.
      * ``old_list = new_list`` only rebound a local inside ``job``; the
        outer list never changed, so deduplication against previous runs
        could not work.  State now lives in a mutable dict (this code
        predates ``nonlocal``).
      * Removed the unreachable ``print`` after the infinite loop.
    """
    state = {'old_list': []}

    def job():
        # One scrape pass: fetch current links, keep only unseen ones,
        # drop bad ones, scrape the rest, then remember what we saw.
        new_list = collect_all_featured_links()
        new_links = remove_old_links(state['old_list'], new_list)
        bad_links = collect_bad_links(new_links)
        clean_links = remove_bad_links_from_link_list(bad_links, new_links)

        scrape_combined_data_from_all_featured_products(data_filename,
                                                        sales_filename,
                                                        clean_links,
                                                        num_retries)

        state['old_list'] = new_list

    job()                                   # first pass immediately
    schedule.every(interval).hours.do(job)  # then every *interval* hours

    while True:
        schedule.run_pending()
        time.sleep(30)
agent.py 文件源码 项目:agent 作者: upd89 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def main():
    """Agent entry point: register if needed, sync once, then run the job loop."""
    if not _config.is_registered():
        register()

    # Initial full sync before entering the loop.
    refreshinstalled()
    system_notify()

    # Recurring jobs, table-driven.
    recurring = (
        (schedule.every(2).hours, refreshinstalled),
        (schedule.every(10).minutes, system_notify),
        (schedule.every(30).seconds, do_update),
    )
    for slot, task in recurring:
        slot.do(task)

    # Poll the queue forever, five-second granularity.
    while True:
        schedule.run_pending()
        sleep(5)


问题


面经


文章

微信
公众号

扫码关注公众号