def run(self):
    """Drive the hard cron daemon loop.

    Until the module-level ``_cron_stopping`` flag becomes truthy, one call
    to ``self.launch`` is queued for the next whole-minute boundary of
    ``time.time()`` and the scheduler is run, which blocks until that call
    has been executed.
    """
    cron = sched.scheduler(time.time, time.sleep)
    logger.info('Hard cron daemon started')
    while not _cron_stopping:
        # Seconds remaining until the wall clock reaches the next multiple of 60.
        until_next_minute = 60 - time.time() % 60
        cron.enter(until_next_minute, 1, self.launch, ())
        cron.run()
Example source code for Python's scheduler() class
def main():
    """Create the feed client and run ``start`` through a scheduler.

    ``start(sc, client)`` is queued with zero delay and the scheduler is run
    until its queue is empty.

    Raises:
        SystemExit: when a ``RuntimeError`` escapes, carrying the error text
            as the exit message.
    """
    try:
        client = CloudFeedClient(CONF.rackspace.feed_url)
        sc = sched.scheduler(time.time, time.sleep)
        # Method enter(delay, priority, action, argument)
        sc.enter(0, 1, start, (sc, client))
        sc.run()
    except RuntimeError as e:
        # Exceptions have no ``message`` attribute on Python 3, so reading it
        # unconditionally raised AttributeError here. Keep the attribute when
        # it exists and fall back to the string form otherwise.
        raise SystemExit(getattr(e, 'message', str(e)))
def __init__(self, batch_size, sending_interval, wireless_interface, data_store):
    """
    Initialize an aDTN instance: its key manager, its data store, an empty sending pool, a scheduler and a
    layer-2 sending socket bound to the wireless interface.
    Bind aDTNInnerPacket as the payload of aDTNPacket, and aDTNPacket as the payload of Ethernet frames whose
    type is ETHERTYPE.
    Sending/sniffing state, worker threads, packet counters and the start time are all initialized to None.
    The wireless interface should be previously set to ad-hoc mode and its ESSID should be the same in other
    devices running aDTN.
    :param batch_size: number of packets to transmit at each sending operation
    :param sending_interval: number of seconds between two sending operations
    :param wireless_interface: wireless interface to send and receive packets
    :param data_store: value handed unchanged to DataStore(...); its meaning is defined by DataStore
    """
    # Caller-supplied transmission settings.
    self._batch_size = batch_size
    self._sending_freq = sending_interval
    self._wireless_interface = wireless_interface

    # Collaborators and message bookkeeping.
    self._km = KeyManager()
    self.data_store = DataStore(data_store)
    self._sending_pool = []
    self._scheduler = sched.scheduler(time, sleep)

    # Runtime state that is not populated at construction time.
    self._sending = self._sniffing = None
    self._thread_send = self._thread_receive = None
    self._sent_pkt_counter = None
    self._received_pkt_counter = None
    self._decrypted_pkt_counter = None
    self._start_time = None

    # Network setup: resolve the MAC address, open the socket, register the packet layers.
    card = getcard(wireless_interface)
    self._mac_address = macget(card)
    self._sending_socket = L2Socket(iface=self._wireless_interface)
    bind_layers(aDTNPacket, aDTNInnerPacket)
    bind_layers(Ether, aDTNPacket, type=ETHERTYPE)
    log_debug("MAC address in use: {}".format(self._mac_address))

    stats_name = '{}_{}.stats'.format(batch_size, sending_interval)
    self._stats_file_name = stats_name
def __init__(self, config, onUpdateConfig = None):
    """Set up the daemon 'updater' thread and resolve the update endpoints.

    Rewrites the module-level SETUP_URL, UPDATE_URL and TIME_TO_CHECK from
    the configured environment and from any overrides in
    ``config.cloudConfig``, creates the scheduler, runs ``UpdateCleanup``
    and backdates ``startTime`` by one day.

    :param config: application settings; must offer
        ``get(section, option, fallback=...)`` and a ``cloudConfig`` attribute
    :param onUpdateConfig: optional value stored on the instance as
        ``self.onUpdateConfig``
    """
    #disable debug after testing is finished
    #setDebug()
    Thread.__init__(self, name='updater')
    # Thread.setDaemon() is deprecated since Python 3.10; assigning the
    # ``daemon`` attribute is the supported equivalent.
    self.daemon = True
    self.appSettings = config
    self.onUpdateConfig = onUpdateConfig
    self.env = self.appSettings.get('Agent','Environment', fallback='live')
    global SETUP_URL
    global UPDATE_URL
    global TIME_TO_CHECK
    # NOTE(review): these globals are extended in place, so constructing a
    # second instance in the same process appends the suffixes again.
    if self.env == "live":
        SETUP_URL = SETUP_URL + SETUP_NAME
    else:
        # Non-live environments get environment-specific setup and update URLs.
        SETUP_URL = SETUP_URL + self.env + "_" + SETUP_NAME
        UPDATE_URL = UPDATE_URL + self.env
    # Explicit cloud configuration values override the derived defaults.
    if 'UpdateUrl' in config.cloudConfig:
        UPDATE_URL = config.cloudConfig.UpdateUrl
    if 'UpdateCheckRate' in config.cloudConfig:
        interval = int(config.cloudConfig.UpdateCheckRate)
        # Add a random offset of up to ten intervals to the configured rate.
        TIME_TO_CHECK = interval + random.randint(0, interval*10)
    if 'SetupUrl' in config.cloudConfig:
        SETUP_URL = config.cloudConfig.SetupUrl
    self.scheduler = scheduler(time, sleep)
    self.Continue = True
    self.currentVersion = ''
    self.newVersion = ''
    self.downloadUrl = ''
    self.UpdateCleanup()
    self.startTime = datetime.now() - timedelta(days=1)
def run(self):
    """Thread body: repeatedly sleep, queue an update check and run the scheduler.

    The loop ends once ``self.Continue`` is falsy at the start of an
    iteration.
    """
    debug('UpdaterThread started')
    while True:
        if not self.Continue:
            break
        sleep(TIME_TO_SLEEP)
        self.SetupUpdater()
        # Blocks until the scheduler queue is empty.
        self.scheduler.run()
    debug('UpdaterThread finished')
def SetupUpdater(self):
    """Queue a single ``self.CheckUpdate`` call ``TIME_TO_CHECK`` seconds ahead, at priority 1."""
    check_priority = 1
    no_arguments = ()
    self.scheduler.enter(TIME_TO_CHECK, check_priority, self.CheckUpdate, no_arguments)
def recognize_speech(self):
    """
    Start recording, send the recognize payload built from the audio input device and handle the
    parts of the parsed response.

    When the current speech profile is not in SPEECH_CLOUD_ENDPOINTING_PROFILES and an
    expect-speech timeout event is set, that event is cancelled on the scheduler first.
    """
    if (self.speech_profile not in SPEECH_CLOUD_ENDPOINTING_PROFILES
            and self.expect_speech_timeout_event):
        self.scheduler.cancel(self.expect_speech_timeout_event)

    self._audio_input_device.start_recording()

    payload = self._generate_recognize_payload(self._audio_input_device)
    response_parts = self.send_event_parse_response(payload)
    self.handle_parts(response_parts)

    logger.debug("Recognize dialog ID: {}".format(self._current_dialog_request_id))
def send_ping(self):
    """
    Ping the connection with eight zero bytes, then re-queue this method on the
    scheduler to run again after _PING_RATE seconds.
    """
    logger.debug("PINGING AVS")
    opaque_data = b'\x00' * 8
    self._connection.ping(opaque_data)
    logger.info("PINGED AVS")
    # Self-rescheduling: each ping queues the next one.
    self.scheduler.enter(_PING_RATE, 1, self.send_ping)
def run(self):
    """
    Perform one pass of the client's main loop, in this order:
    1. run scheduled tasks that are already due, without blocking
    2. handle directives via ``_handle_directives``
    3. run the player once via ``player.run``
    :return: None
    """
    scheduler = self.scheduler
    scheduler.run(blocking=False)

    self._handle_directives()

    player = self.player
    player.run()
def test_enter(self):
l = []
fun = lambda x: l.append(x)
scheduler = sched.scheduler(time.time, time.sleep)
for x in [0.5, 0.4, 0.3, 0.2, 0.1]:
z = scheduler.enter(x, 1, fun, (x,))
scheduler.run()
self.assertEqual(l, [0.1, 0.2, 0.3, 0.4, 0.5])
def test_enterabs(self):
l = []
fun = lambda x: l.append(x)
scheduler = sched.scheduler(time.time, time.sleep)
for x in [0.05, 0.04, 0.03, 0.02, 0.01]:
z = scheduler.enterabs(x, 1, fun, (x,))
scheduler.run()
self.assertEqual(l, [0.01, 0.02, 0.03, 0.04, 0.05])
def test_enter_concurrent(self):
    """Check that events entered while ``scheduler.run`` is active in another thread still fire in time order.

    Uses ``Timer`` and ``TIMEOUT``, which are defined outside this view;
    ``Timer`` presumably provides a manually advanced clock (confirm against
    its definition).
    """
    q = queue.Queue()
    fun = q.put
    timer = Timer()
    scheduler = sched.scheduler(timer.time, timer.sleep)
    # Two events are queued before the scheduler thread starts.
    scheduler.enter(1, 1, fun, (1,))
    scheduler.enter(3, 1, fun, (3,))
    t = threading.Thread(target=scheduler.run)
    t.start()
    timer.advance(1)
    # Only the first event is expected so far.
    self.assertEqual(q.get(timeout=TIMEOUT), 1)
    self.assertTrue(q.empty())
    # Enter further events, out of order, while the scheduler thread is running.
    for x in [4, 5, 2]:
        z = scheduler.enter(x - 1, 1, fun, (x,))
    timer.advance(2)
    self.assertEqual(q.get(timeout=TIMEOUT), 2)
    self.assertEqual(q.get(timeout=TIMEOUT), 3)
    self.assertTrue(q.empty())
    timer.advance(1)
    self.assertEqual(q.get(timeout=TIMEOUT), 4)
    self.assertTrue(q.empty())
    timer.advance(1)
    self.assertEqual(q.get(timeout=TIMEOUT), 5)
    self.assertTrue(q.empty())
    # With every event consumed, the scheduler thread is expected to finish.
    timer.advance(1000)
    t.join(timeout=TIMEOUT)
    self.assertFalse(t.is_alive())
    self.assertTrue(q.empty())
    # The reported time is expected to stay at 5 despite the large advance above.
    self.assertEqual(timer.time(), 5)
def test_priority(self):
l = []
fun = lambda x: l.append(x)
scheduler = sched.scheduler(time.time, time.sleep)
for priority in [1, 2, 3, 4, 5]:
z = scheduler.enterabs(0.01, priority, fun, (priority,))
scheduler.run()
self.assertEqual(l, [1, 2, 3, 4, 5])
def test_cancel(self):
l = []
fun = lambda x: l.append(x)
scheduler = sched.scheduler(time.time, time.sleep)
now = time.time()
event1 = scheduler.enterabs(now + 0.01, 1, fun, (0.01,))
event2 = scheduler.enterabs(now + 0.02, 1, fun, (0.02,))
event3 = scheduler.enterabs(now + 0.03, 1, fun, (0.03,))
event4 = scheduler.enterabs(now + 0.04, 1, fun, (0.04,))
event5 = scheduler.enterabs(now + 0.05, 1, fun, (0.05,))
scheduler.cancel(event1)
scheduler.cancel(event5)
scheduler.run()
self.assertEqual(l, [0.02, 0.03, 0.04])
def test_empty(self):
l = []
fun = lambda x: l.append(x)
scheduler = sched.scheduler(time.time, time.sleep)
self.assertTrue(scheduler.empty())
for x in [0.05, 0.04, 0.03, 0.02, 0.01]:
z = scheduler.enterabs(x, 1, fun, (x,))
self.assertFalse(scheduler.empty())
scheduler.run()
self.assertTrue(scheduler.empty())
def test_queue(self):
l = []
fun = lambda x: l.append(x)
scheduler = sched.scheduler(time.time, time.sleep)
now = time.time()
e5 = scheduler.enterabs(now + 0.05, 1, fun)
e1 = scheduler.enterabs(now + 0.01, 1, fun)
e2 = scheduler.enterabs(now + 0.02, 1, fun)
e4 = scheduler.enterabs(now + 0.04, 1, fun)
e3 = scheduler.enterabs(now + 0.03, 1, fun)
# queue property is supposed to return an order list of
# upcoming events
self.assertEqual(scheduler.queue, [e1, e2, e3, e4, e5])
def test_args_kwargs(self):
flag = []
def fun(*a, **b):
flag.append(None)
self.assertEqual(a, (1,2,3))
self.assertEqual(b, {"foo":1})
scheduler = sched.scheduler(time.time, time.sleep)
z = scheduler.enterabs(0.01, 1, fun, argument=(1,2,3), kwargs={"foo":1})
scheduler.run()
self.assertEqual(flag, [None])
def test_run_non_blocking(self):
l = []
fun = lambda x: l.append(x)
scheduler = sched.scheduler(time.time, time.sleep)
for x in [10, 9, 8, 7, 6]:
scheduler.enter(x, 1, fun, (x,))
scheduler.run(blocking=False)
self.assertEqual(l, [])
def fun():
    """Queue two ``print_event`` calls on the outer ``scheduler``.

    One runs after 10 seconds at priority 2, the other after 20 seconds at
    priority 1. Each receives its delay as a string label together with
    ``start``.
    """
    for delay, priority in ((10, 2), (20, 1)):
        scheduler.enter(delay, priority, print_event, (str(delay), start))
def run(self):
    """Hard cron daemon main loop.

    On every pass a single ``self.launch`` call is queued for the moment
    ``time.time()`` next reaches a multiple of 60 seconds, and the scheduler
    is run until that call has completed. The loop exits once the
    module-level ``_cron_stopping`` flag is truthy at the start of a pass.
    """
    minute_scheduler = sched.scheduler(time.time, time.sleep)
    logger.info('Hard cron daemon started')
    while True:
        if _cron_stopping:
            break
        started_at = time.time()
        delay = 60 - started_at % 60
        minute_scheduler.enter(delay, 1, self.launch, ())
        minute_scheduler.run()