python类scheduler()的实例源码

newcron.py 文件源码 项目:Problematica-public 作者: TechMaz 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def run(self):
        """Run the hard cron loop until the module-level stop flag is set.

        Every pass queues a single ``self.launch`` call for the next
        whole-minute boundary of ``time.time()`` and then blocks inside the
        scheduler until that call has been made.
        """
        minute_scheduler = sched.scheduler(time.time, time.sleep)
        logger.info('Hard cron daemon started')
        while True:
            if _cron_stopping:
                break
            # Time left until the current minute rolls over.
            delay = 60 - time.time() % 60
            minute_scheduler.enter(delay, 1, self.launch, ())
            minute_scheduler.run()
cache_invalidator.py 文件源码 项目:capstone 作者: rackerlabs 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def main():
    """Create the feed client and run ``start`` through a scheduler.

    ``start`` is queued with zero delay and receives the scheduler and the
    client as its arguments (presumably so it can re-schedule itself --
    confirm against ``start``). ``sc.run()`` then blocks until the scheduler
    queue is empty.

    Raises:
        SystemExit: if a ``RuntimeError`` is raised while building the client
            or while the scheduler is running; carries the error text.
    """
    try:
        client = CloudFeedClient(CONF.rackspace.feed_url)
        sc = sched.scheduler(time.time, time.sleep)
        # Method enter(delay, priority, action, argument)
        sc.enter(0, 1, start, (sc, client))
        sc.run()
    except RuntimeError as e:
        # ``BaseException.message`` was removed in Python 3, so reading it
        # would raise AttributeError and hide the real error. ``str(e)`` is
        # available on every Python version.
        raise SystemExit(str(e))
aDTN.py 文件源码 项目:aDTN-python 作者: megfault 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def __init__(self, batch_size, sending_interval, wireless_interface, data_store):
        """
        Build an aDTN instance together with its key manager, data store, an empty sending pool and a scheduler.

        Binds aDTNInnerPacket as the payload of aDTNPacket, and aDTNPacket as the payload of Ethernet frames whose
        type is ETHERTYPE. Opens an L2 socket on the wireless interface for sending and logs the MAC address in use.

        Sending/sniffing state, thread handles, packet counters and the start time are all initialised to None here.

        The wireless interface should be previously set to ad-hoc mode and its ESSID should be the same in other devices
        running aDTN.
        :param batch_size: number of packets to transmit at each sending operation
        :param sending_interval: number of seconds between two sending operations
        :param wireless_interface: wireless interface to send and receive packets
        :param data_store: value handed to DataStore() -- presumably the store's name or path, confirm against DataStore
        """
        # Configuration supplied by the caller.
        self._batch_size = batch_size
        self._sending_freq = sending_interval
        self._wireless_interface = wireless_interface

        # Collaborators.
        self._km = KeyManager()
        self.data_store = DataStore(data_store)
        self._sending_pool = []
        self._scheduler = sched.scheduler(time, sleep)

        # Runtime state, unset at construction time.
        self._sending = self._sniffing = None
        self._thread_send = self._thread_receive = None
        self._sent_pkt_counter = self._received_pkt_counter = self._decrypted_pkt_counter = None
        self._start_time = None

        # Link-layer setup.
        card = getcard(wireless_interface)
        self._mac_address = macget(card)
        self._sending_socket = L2Socket(iface=self._wireless_interface)
        bind_layers(aDTNPacket, aDTNInnerPacket)
        bind_layers(Ether, aDTNPacket, type=ETHERTYPE)
        log_debug("MAC address in use: {}".format(self._mac_address))

        # Stats file is named after the two sending parameters.
        self._stats_file_name = '{}_{}.stats'.format(batch_size, sending_interval)
updater.py 文件源码 项目:Cayenne-Agent 作者: myDevicesIoT 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def __init__(self, config, onUpdateConfig = None):
        """Initialise the updater thread and resolve its URLs and check rate.

        Reads the environment name from ``config`` (section ``Agent``, option
        ``Environment``, default ``'live'``), derives the module-level
        ``SETUP_URL`` / ``UPDATE_URL`` from it, and then lets the optional
        ``UpdateUrl``, ``UpdateCheckRate`` and ``SetupUrl`` entries of
        ``config.cloudConfig`` override them.

        :param config: application settings object; must provide
            ``get(section, option, fallback=...)`` and a ``cloudConfig``
            attribute supporting ``in`` and attribute access.
        :param onUpdateConfig: optional callback stored on the instance; it is
            not invoked in this method.
        """

        #disable debug after testing is finished
        #setDebug()
        Thread.__init__(self, name='updater')
        # Thread.setDaemon() is deprecated since Python 3.10; assigning the
        # ``daemon`` attribute is the supported equivalent.
        self.daemon = True
        self.appSettings = config
        self.onUpdateConfig = onUpdateConfig
        self.env = self.appSettings.get('Agent','Environment', fallback='live')
        global SETUP_URL
        global UPDATE_URL
        global TIME_TO_CHECK

        # NOTE(review): these module globals are extended in place, so building
        # a second instance in the same process appends the suffix again.
        if self.env == "live":
            SETUP_URL = SETUP_URL + SETUP_NAME
        else:
            SETUP_URL = SETUP_URL + self.env + "_" + SETUP_NAME
            UPDATE_URL = UPDATE_URL + self.env

        if 'UpdateUrl' in config.cloudConfig:
            UPDATE_URL = config.cloudConfig.UpdateUrl
        if 'UpdateCheckRate' in config.cloudConfig:
            interval = int(config.cloudConfig.UpdateCheckRate)
            # Add a random offset of up to ten intervals on top of the base rate.
            TIME_TO_CHECK = interval + random.randint(0, interval*10)
        if 'SetupUrl' in config.cloudConfig:
            SETUP_URL = config.cloudConfig.SetupUrl
        self.scheduler = scheduler(time, sleep)
        self.Continue = True
        self.currentVersion = ''
        self.newVersion = ''
        self.downloadUrl = ''
        self.UpdateCleanup()
        # Start time is initialised to one day in the past.
        self.startTime = datetime.now() - timedelta(days=1)
updater.py 文件源码 项目:Cayenne-Agent 作者: myDevicesIoT 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def run(self):
        """Thread body: sleep, queue an update check, run the scheduler; repeat.

        The loop keeps going for as long as ``self.Continue`` is truthy. The
        flag is only examined at the top of each pass.
        """
        debug('UpdaterThread started')
        while True:
            if not self.Continue:
                break
            sleep(TIME_TO_SLEEP)
            self.SetupUpdater()
            # Blocks until every queued event has been executed.
            self.scheduler.run()
        debug('UpdaterThread finished')
updater.py 文件源码 项目:Cayenne-Agent 作者: myDevicesIoT 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def SetupUpdater(self):
        """Queue one ``self.CheckUpdate`` call after a delay of ``TIME_TO_CHECK``."""
        delay, priority = TIME_TO_CHECK, 1
        self.scheduler.enter(delay, priority, self.CheckUpdate, ())
avs.py 文件源码 项目:python-avs 作者: lddias 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def recognize_speech(self):
        """
        Send a recognize-speech event and process the response.

        If the current speech profile is not listed in SPEECH_CLOUD_ENDPOINTING_PROFILES and an expect-speech
        timeout event is pending, that event is cancelled on the scheduler first. Recording is then started on the
        audio input device, the recognize payload built from that device is sent, and the parsed response is passed
        to handle_parts().
        """
        in_endpointing_profiles = self.speech_profile in SPEECH_CLOUD_ENDPOINTING_PROFILES
        if not in_endpointing_profiles and self.expect_speech_timeout_event:
            self.scheduler.cancel(self.expect_speech_timeout_event)

        self._audio_input_device.start_recording()

        payload = self._generate_recognize_payload(self._audio_input_device)
        response_parts = self.send_event_parse_response(payload)
        self.handle_parts(response_parts)

        logger.debug("Recognize dialog ID: {}".format(self._current_dialog_request_id))
avs.py 文件源码 项目:python-avs 作者: lddias 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def send_ping(self):
        """
        Ping AVS over the current connection, then re-queue this method.

        The ping carries eight zero bytes. After it is sent the method schedules itself again with a delay of
        _PING_RATE, so one initial call keeps the pings going for as long as the scheduler is being run.
        """
        logger.debug("PINGING AVS")
        opaque_data = b'\x00' * 8
        self._connection.ping(opaque_data)
        logger.info("PINGED AVS")

        delay, priority = _PING_RATE, 1
        self.scheduler.enter(delay, priority, self.send_ping)
avs.py 文件源码 项目:python-avs 作者: lddias 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def run(self):
        """
        Run one iteration of the AVS client main loop.

        The three steps always happen in this order:

        1. ``self.scheduler.run(blocking=False)`` -- with a standard ``sched.scheduler`` this executes only the
           events that are already due and returns without waiting (assumes ``self.scheduler`` is one -- confirm)
        2. ``self._handle_directives()`` -- handles outstanding directives
        3. ``self.player.run()`` -- runs the audio player (one state-machine iteration, per the original note)

        :return: None
        """
        self.scheduler.run(blocking=False)
        self._handle_directives()
        self.player.run()
test_sched.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_enter(self):
        """Events entered with relative delays run in order of increasing delay."""
        fired = []
        scheduler = sched.scheduler(time.time, time.sleep)
        delays = [0.5, 0.4, 0.3, 0.2, 0.1]
        # Enter the longest delay first so entry order and run order differ.
        for delay in delays:
            scheduler.enter(delay, 1, fired.append, (delay,))
        scheduler.run()
        self.assertEqual(fired, sorted(delays))
test_sched.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_enterabs(self):
        """Events entered with absolute times run in order of increasing time."""
        fired = []
        scheduler = sched.scheduler(time.time, time.sleep)
        # These absolute timestamps are far in the past, so nothing sleeps.
        stamps = [0.05, 0.04, 0.03, 0.02, 0.01]
        for stamp in stamps:
            scheduler.enterabs(stamp, 1, fired.append, (stamp,))
        scheduler.run()
        self.assertEqual(fired, sorted(stamps))
test_sched.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_enter_concurrent(self):
        """Check that events can be entered while ``run()`` executes in another thread.

        ``Timer`` and ``TIMEOUT`` are defined outside this block. The comments
        below assume ``Timer`` is a manually driven clock whose ``advance(n)``
        moves its time forward by ``n`` -- TODO confirm against its definition.
        """
        q = queue.Queue()
        fun = q.put
        timer = Timer()
        scheduler = sched.scheduler(timer.time, timer.sleep)
        # Two events are queued before the scheduler thread starts.
        scheduler.enter(1, 1, fun, (1,))
        scheduler.enter(3, 1, fun, (3,))
        t = threading.Thread(target=scheduler.run)
        t.start()
        timer.advance(1)
        # Only the first event is expected to have fired so far.
        self.assertEqual(q.get(timeout=TIMEOUT), 1)
        self.assertTrue(q.empty())
        # Enter more events while run() is active in the other thread; each
        # delay is x - 1 (presumably relative to a clock now at 1 -- confirm).
        for x in [4, 5, 2]:
            z = scheduler.enter(x - 1, 1, fun, (x,))
        timer.advance(2)
        # The newly entered 2 must come out before the pre-existing 3.
        self.assertEqual(q.get(timeout=TIMEOUT), 2)
        self.assertEqual(q.get(timeout=TIMEOUT), 3)
        self.assertTrue(q.empty())
        timer.advance(1)
        self.assertEqual(q.get(timeout=TIMEOUT), 4)
        self.assertTrue(q.empty())
        timer.advance(1)
        self.assertEqual(q.get(timeout=TIMEOUT), 5)
        self.assertTrue(q.empty())
        # With the queue drained, run() should return and the thread finish.
        timer.advance(1000)
        t.join(timeout=TIMEOUT)
        self.assertFalse(t.is_alive())
        self.assertTrue(q.empty())
        # The test expects the clock to read 5 despite the advance(1000) above.
        self.assertEqual(timer.time(), 5)
test_sched.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_priority(self):
        """Events due at the same time run in ascending priority-number order."""
        ran = []
        scheduler = sched.scheduler(time.time, time.sleep)
        priorities = [1, 2, 3, 4, 5]
        # Every event shares one absolute time, so only priority orders them.
        for prio in priorities:
            scheduler.enterabs(0.01, prio, ran.append, (prio,))
        scheduler.run()
        self.assertEqual(ran, priorities)
test_sched.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_cancel(self):
        """Cancelled events never run; the remaining ones still run in order."""
        fired = []
        scheduler = sched.scheduler(time.time, time.sleep)
        base = time.time()
        offsets = [0.01, 0.02, 0.03, 0.04, 0.05]
        events = [
            scheduler.enterabs(base + offset, 1, fired.append, (offset,))
            for offset in offsets
        ]
        # Drop the earliest and the latest event before running.
        scheduler.cancel(events[0])
        scheduler.cancel(events[-1])
        scheduler.run()
        self.assertEqual(fired, [0.02, 0.03, 0.04])
test_sched.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_empty(self):
        """``empty()`` is true before entering events and again after ``run()``."""
        sink = []
        scheduler = sched.scheduler(time.time, time.sleep)
        self.assertTrue(scheduler.empty())
        for stamp in [0.05, 0.04, 0.03, 0.02, 0.01]:
            scheduler.enterabs(stamp, 1, sink.append, (stamp,))
        self.assertFalse(scheduler.empty())
        scheduler.run()
        self.assertTrue(scheduler.empty())
test_sched.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_queue(self):
        """``queue`` lists pending events ordered by time, not by entry order."""
        sink = []
        callback = sink.append  # never invoked: the scheduler is not run here
        scheduler = sched.scheduler(time.time, time.sleep)
        base = time.time()
        events_by_offset = {}
        # Enter the events out of chronological order.
        for offset in [0.05, 0.01, 0.02, 0.04, 0.03]:
            events_by_offset[offset] = scheduler.enterabs(base + offset, 1, callback)
        # The queue property is supposed to return an ordered list of
        # upcoming events.
        expected = [events_by_offset[offset] for offset in sorted(events_by_offset)]
        self.assertEqual(scheduler.queue, expected)
test_sched.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_args_kwargs(self):
        """``argument`` and ``kwargs`` are passed through to the action unchanged."""
        calls = []

        def recorder(*args, **kwargs):
            # Record the call first so the final assertion can see it happened.
            calls.append(None)
            self.assertEqual(args, (1, 2, 3))
            self.assertEqual(kwargs, {"foo": 1})

        scheduler = sched.scheduler(time.time, time.sleep)
        scheduler.enterabs(0.01, 1, recorder, argument=(1, 2, 3), kwargs={"foo": 1})
        scheduler.run()
        self.assertEqual(calls, [None])
test_sched.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def test_run_non_blocking(self):
        """``run(blocking=False)`` returns without running events that are not due."""
        fired = []
        scheduler = sched.scheduler(time.time, time.sleep)
        # Delays of 10, 9, 8, 7 and 6: none is due when run() is called.
        for delay in range(10, 5, -1):
            scheduler.enter(delay, 1, fired.append, (delay,))
        scheduler.run(blocking=False)
        self.assertEqual(fired, [])
TimeScheduler.py 文件源码 项目:automatic-repo 作者: WZQ1397 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def fun():
    """Queue two ``print_event`` calls on the module-level ``scheduler``.

    The first has delay 10, priority 2 and label ``'10'``; the second has
    delay 20, priority 1 and label ``'20'``. Both receive the module-level
    ``start`` value as their second argument.
    """
    pending = ((10, 2, '10'), (20, 1, '20'))
    for delay, priority, label in pending:
        scheduler.enter(delay, priority, print_event, (label, start))
newcron.py 文件源码 项目:rekall-agent-server 作者: rekall-innovations 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def run(self):
        """Drive the hard cron loop until the module-level stop flag is set.

        On each pass one ``self.launch`` call is queued for the next
        whole-minute boundary of ``time.time()``; the scheduler then blocks
        until that call has been made.
        """
        cron_scheduler = sched.scheduler(time.time, time.sleep)
        logger.info('Hard cron daemon started')
        while True:
            if _cron_stopping:
                break
            # Time remaining in the current minute.
            until_next_minute = 60 - time.time() % 60
            cron_scheduler.enter(until_next_minute, 1, self.launch, ())
            cron_scheduler.run()


问题


面经


文章

微信
公众号

扫码关注公众号