python类run_pending()的实例源码

cricket.py 文件源码 项目:crickbuzz_cricket_score 作者: LinuxTerminali 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def schedule_match():
    try:
        schedule.every(30).seconds.do(refresh_Scorecard).tag('score_updates', 'task')
        while 1:
            schedule.run_pending()
            time.sleep(1)
    except KeyboardInterrupt:
        quit()

# method used to cancel the schedule match
thermostat.py 文件源码 项目:thermostat_ita 作者: jpnos26 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def startScheduler():
    log( LOG_LEVEL_INFO, CHILD_DEVICE_SCHEDULER, MSG_SUBTYPE_TEXT, "Started" )
    while True:
        if holdControl.state == "normal":
            with scheduleLock:
                log( LOG_LEVEL_DEBUG, CHILD_DEVICE_SCHEDULER, MSG_SUBTYPE_TEXT, "Running pending" )
                schedule.run_pending()

        time.sleep( 10 )
byr_util.py 文件源码 项目:BYR_BBS_Spider 作者: Dogless-plus 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def with_heartbeat(fn):
    # timedtask wrapper
    def call_func(*args):
        loop = 0
        fn(*args)
        schedule.every(HEARTBEAT).seconds.do(fn, *args)
        while 1:
            print("#" * 15, "loop:%s" % loop, "#" * 15)
            schedule.run_pending()
            sleep(HEARTBEAT)
            loop += 1

    return call_func
top_ten.py 文件源码 项目:BYR_BBS_Spider 作者: Dogless-plus 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def with_heartbeat_30s(fn):
    # timedtask wrapper
    def call_func(*args):
        HEARTBEAT = 30
        loop = 0
        fn(*args)
        schedule.every(HEARTBEAT).seconds.do(fn, *args)
        while 1:
            print("#" * 15, "loop:%s" % loop, "#" * 15)
            schedule.run_pending()
            sleep(HEARTBEAT)
            loop += 1

    return call_func
top_ten.py 文件源码 项目:BYR_BBS_Spider 作者: Dogless-plus 项目源码 文件源码 阅读 14 收藏 0 点赞 0 评论 0
def with_heartbeat_1d(fn):
    # timedtask wrapper
    def call_func(*args):
        HEARTBEAT = 30
        loop = 0
        fn(*args)
        schedule.every().day.at("10:30").do(fn, *args)
        while 1:
            print("#" * 15, "loop:%s" % loop, "#" * 15)
            schedule.run_pending()
            sleep(HEARTBEAT)
            loop += 1

    return call_func
trax_schedule.py 文件源码 项目:trax 作者: EliotBerriot 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def handle(self, *args, **options):
        schedule.every(1).hours.do(tasks.kill_obsolete_timers)
        schedule.every(5).seconds.do(tasks.send_reminders)

        self.stdout.write(self.style.SUCCESS('Starting job runner...'))
        while True:
            time.sleep(1)
            try:
                schedule.run_pending()
            except:
                traceback.print_exc()
thermostat.py 文件源码 项目:RaspberryPiThermostat 作者: scottpav 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def startScheduler():
    log( LOG_LEVEL_INFO, CHILD_DEVICE_SCHEDULER, MSG_SUBTYPE_TEXT, "Started" )
    while True:
        if holdControl.state == "normal":
            with scheduleLock:
                log( LOG_LEVEL_DEBUG, CHILD_DEVICE_SCHEDULER, MSG_SUBTYPE_TEXT, "Running pending" )
                schedule.run_pending()

        time.sleep( 10 )
bot.py 文件源码 项目:SoftcatalaTelegramBot 作者: Softcatala 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def spawn():
    while True:
        schedule.run_pending()
        time.sleep(1)
background.py 文件源码 项目:robin-folios 作者: jshoe 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def schedule_actions():
    # Example:     nohup python MyScheduledProgram.py &> ~/Desktop/output.log
    # ps auxw to see running ones
    print(datetime.datetime.now())
    print("Starting to run")
    times = ['6:07', '06:24']

    # Buy today's positions
    for set_time in times:
        schedule.every().monday.at(set_time).do(action)
        schedule.every().tuesday.at(set_time).do(action)
        schedule.every().wednesday.at(set_time).do(action)
        schedule.every().thursday.at(set_time).do(action)
        schedule.every().friday.at(set_time).do(action)

    # Sell yesterday's positions
    set_time = '06:01'
    schedule.every().monday.at(set_time).do(sell_scheduled)
    schedule.every().tuesday.at(set_time).do(sell_scheduled)
    schedule.every().wednesday.at(set_time).do(sell_scheduled)
    schedule.every().thursday.at(set_time).do(sell_scheduled)
    schedule.every().friday.at(set_time).do(sell_scheduled)

    while True:
        schedule.run_pending()
        sys.stdout.flush()
        time.sleep(1) # Check every 1 second
scheduler.py 文件源码 项目:web-dev-for-data-scientists 作者: realpython 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def run():
    while True:
        schedule.run_pending()
        time.sleep(1)
ProjectFlask.py 文件源码 项目:Project1 作者: GeldofCaitlin 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def run_schedule():
    while 1:
        schedule.run_pending()
        time.sleep(1)
scheduler.py 文件源码 项目:xspider 作者: zym1115718204 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def run(self):
        schedule.every(1).seconds.do(self.run_threaded, self.run_generator_dispatch)
        schedule.every(1).seconds.do(self.run_threaded, self.run_processor_dispatch)
        schedule.every(1).seconds.do(self.run_threaded, self.run_query_project_status)

        while True:
            schedule.run_pending()
            time.sleep(1)
maildelete.py 文件源码 项目:sh8email-django 作者: triplepy 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def run(self):
        def delete_job():
            try:
                Mail.delete_one_day_ago()
            except:
                logger.exception("Exception raised in MailDeleteBatch#run()#delete_job()")
                raise

        schedule.every().hour.do(delete_job)

        while True:
            schedule.run_pending()
            time.sleep(1)
collect_sales_data.py 文件源码 项目:ebay 作者: fgscivittaro 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def dynamically_scrape_and_append_sales_data(filename,
                                             interval,
                                             num_retries = 10):
    """
    Dynamically scrapes sales data and appends the data to a file by generating
    a list of links, checking it against an old list and only keeping new links,
    and scraping those links for sales data.
    """

    old_list = []

    def job(old_list):
        new_list = collect_all_featured_links()
        new_links = remove_old_links(old_list, new_list)
        bad_links = collect_bad_links(new_links)
        clean_links = remove_bad_links_from_link_list(bad_links, new_links)

        scrape_and_append_sales_data_from_featured_links(filename,
                                                         clean_links,
                                                         num_retries)

        old_list = new_list

    job(old_list)
    schedule.every(interval).hours.do(job)

    while True:
        schedule.run_pending()
        time.sleep(30)

    print "Dynamic scraping finished"
combined.py 文件源码 项目:ebay 作者: fgscivittaro 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def dynamically_scrape_combined_data(data_filename,
                                     sales_filename,
                                     interval,
                                     num_retries = 10):
    """
    Dynamically scrapes a continuously updated list of unique clean links and
    appends the data to their respective files.
    """

    old_list = []

    def job(old_list):
        new_list = collect_all_featured_links()
        new_links = remove_old_links(old_list, new_list)
        bad_links = collect_bad_links(new_links)
        clean_links = remove_bad_links_from_link_list(bad_links, new_links)

        scrape_combined_data_from_all_featured_products(data_filename,
                                                        sales_filename,
                                                        clean_links,
                                                        num_retries)

        old_list = new_list

    job(old_list)
    schedule.every(interval).hours.do(job)

    while True:
        schedule.run_pending()
        time.sleep(30)

    print "Dynamic scraping finished"
agent.py 文件源码 项目:agent 作者: upd89 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def main():
    if not _config.is_registered():
        register()
    refreshinstalled()
    system_notify()
    schedule.every(2).hours.do(refreshinstalled)
    schedule.every(10).minutes.do(system_notify)
    schedule.every(30).seconds.do(do_update)

    while True:
        schedule.run_pending()
        sleep(5)
d_con.py 文件源码 项目:PiAlarm 作者: KyleKing 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def run_sched():
    """Loop through the schedule to check if new task"""
    global running
    # schedule.every().hour.do(job)
    # schedule.every().day.at("10:30").do(job)
    while running:
        schedule.run_pending()
        sleep(1)
lcd.py 文件源码 项目:PiAlarm 作者: KyleKing 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def run_sched(self):
        """Loop through the schedule to check if new task"""
        cg.send('> Started Thread w/ self._c = {}'.format(self._checkSchedule))
        while self._checkSchedule:
            schedule.run_pending()
            sleep(1)
        cg.send('> Ended thread w/ self._c = {}'.format(self._checkSchedule))
einhorn.py 文件源码 项目:einhorn_fuer_deutschland 作者: mackaiver 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def main():

    schedule.every(10).minutes.do(fck_afd)
    schedule.every().day.at("12:00").do(tweet_time_of_day)

    while True:
        schedule.run_pending()
        time.sleep(60)
delete_cache.py 文件源码 项目:OpenFinData 作者: FedorArbuzovHSE 项目源码 文件源码 阅读 15 收藏 0 点赞 0 评论 0
def delete_cache(first_time='0:05', second_time='1:05'):
    try:
        # Deleting folders from 1am to 23pm at 24.05 o'clock
        schedule.every().day.at(first_time).do(delete_unnecessary_folders23)
        # Deleting folders from which we created during 24th hour at 1.05 o'clock
        schedule.every().day.at(second_time).do(delete_unnecessary_folders24)

        while 1:
            schedule.run_pending()
            sleep(50)
    except Exception as e:
        print("{0} Error: {1}".format(strftime("%Y-%m-%d %H:%M:%S", gmtime()), e))
        schedule.clear()
        print('Notification: Timer is reset')
        delete_cache()


问题


面经


文章

微信
公众号

扫码关注公众号