import schedule
import time

def schedule_match():
    try:
        # Refresh the scorecard every 30 seconds; the tags allow targeted cancellation later.
        schedule.every(30).seconds.do(refresh_Scorecard).tag('score_updates', 'task')
        while True:
            schedule.run_pending()
            time.sleep(1)
    except KeyboardInterrupt:
        quit()
# Method used to cancel the scheduled match refresh
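The cancel helper the comment refers to is not included in the snippet; a minimal sketch, reusing the 'score_updates' tag set above, could look like this:

def cancel_match():
    # Clear only the jobs tagged 'score_updates'; any other scheduled jobs keep running.
    schedule.clear('score_updates')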
def reloadSchedule():
    with scheduleLock:
        schedule.clear()
        activeSched = None

        with thermostatLock:
            thermoSched = JsonStore("thermostat_schedule.json")

            if holdControl.state != "down":
                if heatControl.state == "down":
                    activeSched = thermoSched["heat"]
                    log(LOG_LEVEL_INFO, CHILD_DEVICE_SCHEDULER, MSG_SUBTYPE_CUSTOM + "/load", "heat")

        if useTestSchedule:
            activeSched = getTestSchedule()
            log(LOG_LEVEL_INFO, CHILD_DEVICE_SCHEDULER, MSG_SUBTYPE_CUSTOM + "/load", "test")
            print("Using Test Schedule!!!")

        if activeSched is not None:
            for day, entries in activeSched.items():
                for entry in entries:
                    getattr(schedule.every(), day).at(entry[0]).do(setScheduledTemp, entry[1])
                    log(LOG_LEVEL_DEBUG, CHILD_DEVICE_SCHEDULER, MSG_SUBTYPE_TEXT,
                        "Set " + day + ", at: " + entry[0] + " = " + str(entry[1]) + scaleUnits)
def with_heartbeat(fn):
    """Decorator: run fn once immediately, then every HEARTBEAT seconds, blocking forever."""
    def call_func(*args):
        loop = 0
        fn(*args)  # first run happens right away
        # HEARTBEAT is assumed to be a module-level interval constant (in seconds).
        schedule.every(HEARTBEAT).seconds.do(fn, *args)
        while True:
            print("#" * 15, "loop:%s" % loop, "#" * 15)
            schedule.run_pending()
            sleep(HEARTBEAT)
            loop += 1
    return call_func
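A usage sketch for the decorator above (fetch_status and its URL are hypothetical; HEARTBEAT is assumed to be defined at module level):

HEARTBEAT = 60  # assumed polling interval in seconds

@with_heartbeat
def fetch_status(url):
    # hypothetical example task
    print("polling", url)

# fetch_status("http://example.com")  # runs once, then every HEARTBEAT seconds; blocks forever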
def with_heartbeat_30s(fn):
    """Decorator: run fn once immediately, then every 30 seconds, blocking forever."""
    def call_func(*args):
        HEARTBEAT = 30
        loop = 0
        fn(*args)
        schedule.every(HEARTBEAT).seconds.do(fn, *args)
        while True:
            print("#" * 15, "loop:%s" % loop, "#" * 15)
            schedule.run_pending()
            sleep(HEARTBEAT)
            loop += 1
    return call_func
def with_heartbeat_1d(fn):
    """Decorator: run fn once immediately, then daily at 10:30, blocking forever."""
    def call_func(*args):
        HEARTBEAT = 30  # polling interval only; the job itself runs once a day
        loop = 0
        fn(*args)
        schedule.every().day.at("10:30").do(fn, *args)
        while True:
            print("#" * 15, "loop:%s" % loop, "#" * 15)
            schedule.run_pending()
            sleep(HEARTBEAT)
            loop += 1
    return call_func
def handle(self, *args, **options):
    schedule.every(1).hours.do(tasks.kill_obsolete_timers)
    schedule.every(5).seconds.do(tasks.send_reminders)
    self.stdout.write(self.style.SUCCESS('Starting job runner...'))
    while True:
        time.sleep(1)
        try:
            schedule.run_pending()
        except Exception:
            # Log the failure but keep the runner alive.
            traceback.print_exc()
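This handle() belongs to a Django management command; a minimal skeleton around it, assuming the command file is named run_jobs.py inside an app's management/commands/ directory, would be:

# management/commands/run_jobs.py  (file name is an assumption)
import time
import traceback

import schedule
from django.core.management.base import BaseCommand

from myapp import tasks  # hypothetical app module providing the two jobs

class Command(BaseCommand):
    help = "Run the scheduled reminder jobs"
    # handle() as defined above goes here

# Started with: python manage.py run_jobs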
def reloadSchedule():
    with scheduleLock:
        schedule.clear()
        activeSched = None

        with thermostatLock:
            thermoSched = JsonStore("thermostat_schedule.json")

            if holdControl.state != "down":
                if heatControl.state == "down":
                    activeSched = thermoSched["heat"]
                    log(LOG_LEVEL_INFO, CHILD_DEVICE_SCHEDULER, MSG_SUBTYPE_CUSTOM + "/load", "heat")
                elif coolControl.state == "down":
                    activeSched = thermoSched["cool"]
                    log(LOG_LEVEL_INFO, CHILD_DEVICE_SCHEDULER, MSG_SUBTYPE_CUSTOM + "/load", "cool")

        if useTestSchedule:
            activeSched = getTestSchedule()
            log(LOG_LEVEL_INFO, CHILD_DEVICE_SCHEDULER, MSG_SUBTYPE_CUSTOM + "/load", "test")
            print("Using Test Schedule!!!")

        if activeSched is not None:
            for day, entries in activeSched.items():
                for entry in entries:
                    getattr(schedule.every(), day).at(entry[0]).do(setScheduledTemp, entry[1])
                    log(LOG_LEVEL_DEBUG, CHILD_DEVICE_SCHEDULER, MSG_SUBTYPE_TEXT,
                        "Set " + day + ", at: " + entry[0] + " = " + str(entry[1]) + scaleUnits)
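The loops above imply that each day maps to a list of [time, temperature] pairs; an assumed shape for thermostat_schedule.json (all values illustrative) is:

# Assumed structure of thermostat_schedule.json (illustrative values):
example_sched = {
    "heat": {
        "monday":  [["06:30", 70], ["22:00", 62]],
        "tuesday": [["06:30", 70], ["22:00", 62]],
    },
    "cool": {
        "monday":  [["08:00", 76], ["22:00", 78]],
    },
}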
def schedule_actions():
    # Example: nohup python MyScheduledProgram.py &> ~/Desktop/output.log
    # Use `ps auxw` to see running processes.
    print(datetime.datetime.now())
    print("Starting to run")
    weekdays = ('monday', 'tuesday', 'wednesday', 'thursday', 'friday')

    # Buy today's positions (times zero-padded to the HH:MM form .at() expects)
    times = ['06:07', '06:24']
    for set_time in times:
        for day in weekdays:
            getattr(schedule.every(), day).at(set_time).do(action)

    # Sell yesterday's positions
    set_time = '06:01'
    for day in weekdays:
        getattr(schedule.every(), day).at(set_time).do(sell_scheduled)

    while True:
        schedule.run_pending()
        sys.stdout.flush()
        time.sleep(1)  # check once per second
def run(self):
    schedule.every(1).seconds.do(self.run_threaded, self.run_generator_dispatch)
    schedule.every(1).seconds.do(self.run_threaded, self.run_processor_dispatch)
    schedule.every(1).seconds.do(self.run_threaded, self.run_query_project_status)
    while True:
        schedule.run_pending()
        time.sleep(1)
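run_threaded is not shown in this snippet; the usual pattern from the schedule documentation, and a plausible sketch here, is:

import threading

def run_threaded(self, job_func):
    # Run each dispatch in its own thread so a slow job does not block the scheduler loop.
    threading.Thread(target=job_func).start()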
def job():
    # Purge all cached user albums; a commit is assumed to be needed to persist the delete.
    db.session.query(UserAlbum).delete()
    db.session.commit()
# schedule.every(1).minutes.do(job)
def schedule_function(self, func, time):
    print("Scheduled function " + func.__name__ + " at " + time)
    schedule.every().day.at(time).do(self.run_on_trading_day, func)
def schedule_checks(checkers):
    schedule.clear()
    for checker in checkers:
        conf = checker.conf
        period = conf["period"]
        logger.info(
            "Scheduling checks for %r every %r seconds",
            conf["name"],
            period,
        )
        schedule.every(period).seconds.do(checker.check)
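Each checker's conf is only read for "name" and "period" here; a minimal illustrative conf (field values are assumptions) might be:

conf = {
    "name": "http-healthcheck",  # hypothetical checker name
    "period": 60,                # seconds between runs of checker.check
}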
def run(self):
    def delete_job():
        try:
            Mail.delete_one_day_ago()
        except Exception:
            logger.exception("Exception raised in MailDeleteBatch#run()#delete_job()")
            raise

    schedule.every().hour.do(delete_job)
    while True:
        schedule.run_pending()
        time.sleep(1)
def write_new_file_and_scrape_all_data(filename, link_list, num_retries=10):
    """
    Writes a new file, scrapes data from every product link in a list, and
    appends each product's data to the previously created file.
    """
    open_new_file(filename)
    scrape_all_data_from_all_featured_products(filename, link_list, num_retries)
def write_new_file_and_dynamically_scrape_all_data(filename,
                                                   link_list,
                                                   interval,
                                                   num_retries=10):
    """
    Writes a new file, then reruns the scraper each time the specified interval
    has passed, continuously appending the data to the file.
    """
    open_new_file(filename)
    dynamically_scrape_data(filename, link_list, num_retries, interval)
def write_new_file_update_links_and_dynamically_scrape(filename,
                                                       interval,
                                                       num_retries=10):
    """
    Writes a new file, then, each time the specified interval has passed,
    updates the link list, reruns the scraper, and appends the data to the file.
    """
    open_new_file(filename)
    clean_links_and_dynamically_scrape(filename, interval, num_retries)
def dynamically_scrape_and_append_sales_data(filename,
                                             interval,
                                             num_retries=10):
    """
    Dynamically scrapes sales data and appends it to a file: generates a list
    of links, keeps only the links not seen on the previous pass, filters out
    bad links, and scrapes the remainder for sales data.
    """
    old_list = []

    def job():
        nonlocal old_list  # rebinding a parameter would not persist across runs
        new_list = collect_all_featured_links()
        new_links = remove_old_links(old_list, new_list)
        bad_links = collect_bad_links(new_links)
        clean_links = remove_bad_links_from_link_list(bad_links, new_links)
        scrape_and_append_sales_data_from_featured_links(filename,
                                                         clean_links,
                                                         num_retries)
        old_list = new_list

    job()  # run once immediately, then on the schedule below
    schedule.every(interval).hours.do(job)
    while True:
        schedule.run_pending()
        time.sleep(30)
    print("Dynamic scraping finished")  # unreachable: the loop above never exits
def scrape_combined_data_from_all_featured_products(data_filename,
                                                    sales_filename,
                                                    link_list,
                                                    num_retries=10):
    """
    Scrapes all data from every featured product and appends that data to
    the respective files.
    """
    for url in link_list:
        scrape_and_append_combined_data(url,
                                        data_filename,
                                        sales_filename,
                                        num_retries)
def dynamically_scrape_combined_data(data_filename,
                                     sales_filename,
                                     interval,
                                     num_retries=10):
    """
    Dynamically scrapes a continuously updated list of unique clean links and
    appends the data to the respective files.
    """
    old_list = []

    def job():
        nonlocal old_list  # see dynamically_scrape_and_append_sales_data above
        new_list = collect_all_featured_links()
        new_links = remove_old_links(old_list, new_list)
        bad_links = collect_bad_links(new_links)
        clean_links = remove_bad_links_from_link_list(bad_links, new_links)
        scrape_combined_data_from_all_featured_products(data_filename,
                                                        sales_filename,
                                                        clean_links,
                                                        num_retries)
        old_list = new_list

    job()  # run once immediately, then on the schedule below
    schedule.every(interval).hours.do(job)
    while True:
        schedule.run_pending()
        time.sleep(30)
    print("Dynamic scraping finished")  # unreachable: the loop above never exits
def main():
    if not _config.is_registered():
        register()
    refreshinstalled()
    system_notify()
    schedule.every(2).hours.do(refreshinstalled)
    schedule.every(10).minutes.do(system_notify)
    schedule.every(30).seconds.do(do_update)
    while True:
        schedule.run_pending()
        sleep(5)