python类Timer()的实例源码

jenkinslight.py 文件源码 项目:rpi-jenkins-tower-light 作者: BramDriesen 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def blinking():
    if keepalive:
        threading.Timer(10.0, blinking).start()

        # Only blink when we are actually building
        if building or error:
            # If error, blink red.
            if error:
                color = "red"
            else:
                color = "yellow"

            alloff()
            pin = getcode(color)
            GPIO.output(pin, True)
            time.sleep(3)
            GPIO.output(pin, False)


# Check every 10s if we are building, if not or done get latest status
midi_hub_test.py 文件源码 项目:magenta 作者: tensorflow 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def testCaptureSequence_StopTime(self):
    start_time = 1.0
    stop_time = time.time() + 1.0

    self.capture_messages[-1].time += time.time()
    threading.Timer(0.1, self.send_capture_messages).start()

    captured_seq = self.midi_hub.capture_sequence(
        120, start_time, stop_time=stop_time)

    expected_seq = music_pb2.NoteSequence()
    expected_seq.tempos.add(qpm=120)
    expected_seq.total_time = stop_time
    testing_lib.add_track_to_sequence(
        expected_seq, 0,
        [Note(1, 64, 2, 5), Note(2, 64, 3, 4), Note(3, 64, 4, stop_time)])
    self.assertProtoEquals(captured_seq, expected_seq)
utils.py 文件源码 项目:Legion 作者: MooseDojo 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def execWait(cmd, outfile=None, timeout=0):
        result = ""
        env = os.environ
        proc = subprocess.Popen(cmd, executable='/bin/bash', env=env, stderr=subprocess.STDOUT, stdout=subprocess.PIPE, shell=True)

        if timeout:
            timer = threading.Timer(timeout, proc.kill)
            timer.start()
        result = proc.communicate()[0]
        if timeout:
            if timer.is_alive():
                timer.cancel()
        if outfile:
            if Utils.fileExists(outfile):
                print("FILE ALREADY EXISTS!!!!")
            else:
                tmp_result = "\033[0;33m(" + time.strftime(
                    "%Y.%m.%d-%H.%M.%S") + ") <pentest> #\033[0m " + cmd + Utils.newLine() + Utils.newLine() + result
                Utils.writeFile(tmp_result, outfile, 'ab')
        return result
iotclient.py 文件源码 项目:IotCenter 作者: panjanek 项目源码 文件源码 阅读 33 收藏 0 点赞 0 评论 0
def start(self):
        self.deviceHandler.start()
        if self.protocol == "udp":
            self.loadState()        
            self.logger.debug("udpHeartbeatSeconds = {0}".format(self.udpHeartbeatSeconds))
            self.logger.debug("udpDataPacketInterval = {0}".format(self.udpDataPacketInterval))
            self.udpServer = SocketServer.UDPServer(('0.0.0.0', 0), IotUDPHandler)
            self.udpServer.service = self
            self.udpServer.role = IotUDPHandler.CLIENT
            self.logger.info("starting UDP client at {0}:{1} connecting to {2}, state at {3}".format(self.udpServer.server_address[0], self.udpServer.server_address[1], self.serverAddr, self.stateFile))            
            timer = threading.Timer(0.5, self.repeat)
            timer.daemon = True
            timer.start()
            self.udpServer.serve_forever()      
        elif self.protocol == "ssl":
            while True:
                self.logger.info("Connecting by SSL to server at {0}".format(self.serverAddr))
                try:
                    sock = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
                    self.logger.debug("using caCertFile={0}, deviceCertFile={1}, deviceKeyFile={2}".format(self.caCertFile, self.deviceCertFile, self.deviceKeyFile))
                    sslSocket = ssl.wrap_socket(sock, ca_certs=self.caCertFile, cert_reqs=ssl.CERT_REQUIRED, certfile=self.deviceCertFile, keyfile=self.deviceKeyFile, ssl_version=ssl.PROTOCOL_TLSv1)     
                    sslSocket.connect((self.serverAddr.split(':')[0], int(self.serverAddr.split(':')[1])))   
                    servercert = sslSocket.getpeercert()
                    subject = dict(x[0] for x in servercert['subject'])
                    self.logger.info("Connected to server with valid certificate, CN={0}".format(subject['commonName']))  
                    self.sslSocket = sslSocket
                    sslThread = threading.Thread(target = self.sslListen, args = (self.sslSocket,))
                    sslThread.daemon = True
                    sslThread.start()
                    while True:
                        payload = self.deviceHandler.getMessagePayload()
                        self.logger.debug("Sending payload to {0} by SSL: {1}".format(self.serverAddr, payload))
                        iotcommon.sendMessage(self.sslSocket, payload)
                        time.sleep(self.sslIntervalSeconds)
                except Exception as e: 
                    self.logger.exception(e)
                time.sleep(10)
agent.py 文件源码 项目:stackimpact-python 作者: stackimpact 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def delay(self, timeout, func):
        def func_wrapper():
            try:
                func()
            except Exception:
                self.exception()

        t = threading.Timer(timeout, func_wrapper, ())
        t.start()

        return t
agent.py 文件源码 项目:stackimpact-python 作者: stackimpact 项目源码 文件源码 阅读 42 收藏 0 点赞 0 评论 0
def schedule(self, timeout, interval, func):
        tw = TimerWraper()

        def func_wrapper():
            start = time.time()

            try:
                func()
            except Exception:
                self.exception()

            with tw.cancel_lock:
                if not tw.canceled:
                    tw.timer = threading.Timer(abs(interval - (time.time() - start)), func_wrapper, ())
                    tw.timer.start()

        tw.timer = threading.Timer(timeout, func_wrapper, ())
        tw.timer.start()

        return tw
executor.py 文件源码 项目:bitcoin-trading-system 作者: vinicius-ronconi 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def execute(self):
        try:
            self.system.run()
        except (ReadTimeout, ConnectionError, JSONDecodeError):
            pass
        except exceptions.TradingSystemException as e:
            curr = datetime.now()
            print('{time} - {text}'.format(time=curr.strftime('%Y-%m-%d %H:%M:%S'), text=str(e)))
        except Exception as e:
            curr = datetime.now()
            print('{time} - {text} - {args}'.format(time=curr.strftime('%Y-%m-%d %H:%M:%S'), text=str(e), args=e.args))
            traceback.print_exc()

        if self.interval:
            threading.Timer(self.interval, self.execute).start()
consumer.py 文件源码 项目:seqlog 作者: tintoy 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def _schedule_auto_flush(self):
        """
        Schedule an automatic flush of the current batch.
        """

        if not self.auto_flush_timeout:
            return  # Auto-flush is disabled.

        self.state_lock.acquire()
        try:
            if self.flush_timer:
                return

            self.flush_timer = Timer(self.auto_flush_timeout, self.flush)
            self.flush_timer.daemon = True
            self.flush_timer.start()
        finally:
            self.state_lock.release()
test_debugger.py 文件源码 项目:PythonForWindows 作者: hakril 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_bp_exe_by_name(proc32_64_debug):
    class TSTBP(windows.debug.Breakpoint):
        COUNTER = 0
        def trigger(self, dbg, exc):
            TSTBP.COUNTER += 1
            assert TSTBP.COUNTER == 1
            # Kill the target in 0.5s
            # It's not too long
            # It's long enought to get trigger being recalled if implem is broken
            threading.Timer(0.5, proc32_64_debug.exit).start()

    exepe = proc32_64_debug.peb.exe
    entrypoint = exepe.get_OptionalHeader().AddressOfEntryPoint
    exename = os.path.basename(proc32_64_debug.peb.imagepath.str)
    d = windows.debug.Debugger(proc32_64_debug)
    # The goal is to test bp of format 'exename!offset' so we craft a string based on the entrypoint
    d.add_bp(TSTBP("{name}!{offset}".format(name=exename, offset=entrypoint)))
    d.loop()
    assert TSTBP.COUNTER == 1
timeout_decorator.py 文件源码 项目:ComplexityEstimator 作者: marwin1991 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def exit_after(s):
    def outer(fn):
        def inner(*args, **kwargs):
            try:
                timer = Timer(s, quit_function)
                timer.start()
                result = fn(*args, **kwargs)
            except KeyboardInterrupt:
                logger = init_logger("exit_after_decorator")
                logger.info("Timeout reached!")
                print("Timeout reached!")
                timer.cancel()
                return -1
            finally:
                timer.cancel()
            return result
        return inner
    return outer
writing.py 文件源码 项目:nstock 作者: ybenitezf 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def commit(self, restart=True):
        if self.period:
            self.timer.cancel()

        with self.lock:
            ramreader = self._get_ram_reader()
            self._make_ram_index()

        if self.bufferedcount:
            self.writer.add_reader(ramreader)
        self.writer.commit(**self.commitargs)
        self.bufferedcount = 0

        if restart:
            self.writer = self.index.writer(**self.writerargs)
            if self.period:
                self.timer = threading.Timer(self.period, self.commit)
                self.timer.start()
magicmirrorplatform.py 文件源码 项目:AlexaPi 作者: alexa-pi 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def mm_heartbeat(self):
        # Check if stop or set next timer
        if self.shutdown:
            return
        threading.Timer(self.hb_timer, self.mm_heartbeat).start()

        address = ("http://" + self.mm_host + ":" + self.mm_port + "/alexapi?action=AVSHB")

        logger.debug("Sending MM Heatbeat")

        try:
            response = urlopen(address).read()
        except URLError as err:
            logger.error("URLError: %s", err.reason)
            return

        logger.debug("Response: " + response)
client.py 文件源码 项目:loving-ai 作者: opencog 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def process_response(self, response):
        if response is not None:
            answer = response.get('text')
            if not self.ignore_indicator:
                self.process_indicator(answer)
            response['text'] = norm(answer)
            self.last_response = response
            if self.response_listener is None:
                self.stdout.write('{}[by {}]: {}\n'.format(
                    self.botname, response.get('botid'),
                    response.get('text')))
            else:
                try:
                    threading.Timer(0, self.response_listener.on_response, (self.session, response)).start()
                except Exception as ex:
                    logger.error(ex)
client.py 文件源码 项目:loving-ai 作者: opencog 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def process_indicator(self, reply):
        cmd, timeout = None, None
        for match in re.findall(r'\[.*\]', reply):
            match = match.strip()
            match = match.replace(' ', '')
            if match == '[loopback=0]':
                self.cancel_timer()
                return
            match = match.replace(']', '')
            match = match.replace('[', '')
            if '=' in match:
                cmd, timeout = match.split('=')
                self.timeout = float(timeout)/1000
            else:
                cmd = match
            cmd = '[{}]'.format(cmd)
        if self.timeout is not None and cmd is not None:
            self.cancel_timer()
            self.timer = threading.Timer(self.timeout, self.ask, (cmd, ))
            self.timer.start()
            logger.info("Start {} timer with timeout {}".format(
                cmd, self.timeout))
fsmonitor.py 文件源码 项目:onedrive-e 作者: tobecontinued 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def _process_move_from_event(self, drive, local_parent_path, ent_name, is_folder):
        # First try finding the item in database.
        rel_parent_path = _get_rel_parent_path(drive, local_parent_path)
        item_store = self._items_store_man.get_item_storage(drive)
        q = item_store.get_items_by_id(local_parent_path=local_parent_path, item_name=ent_name)
        try:
            item_id, item = q.popitem()
            if item.is_folder != is_folder:
                raise KeyError()
        except KeyError:
            # If the record does not match, sync the parent after some time.
            threading.Timer(self.SYNC_PARENT_DELAY_SEC, self._sync_parent_dir_of, (drive, rel_parent_path))
            return
        task = delete_task.DeleteItemTask(parent_task=self._task_bases[drive], rel_parent_path=rel_parent_path,
                                          item_name=ent_name, is_folder=is_folder)
        task.item_obj = item
        self._delayed_tasks.add(task)
        threading.Timer(self.MOVE_DETECTION_DELAY_SEC, self._enqueue_delayed_task, task)
utils.py 文件源码 项目:steth 作者: openstack 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def execute(cmd, shell=False, root=False, timeout=10):
    try:
        if root:
            cmd.insert(0, 'sudo')
        LOG.info(cmd)
        subproc = subprocess.Popen(cmd, stdout=subprocess.PIPE,
                                   stderr=subprocess.PIPE, shell=shell)
        timer = Timer(timeout, lambda proc: proc.kill(), [subproc])
        timer.start()
        subproc.wait()
        stdcode = subproc.returncode
        stdout = subproc.stdout.readlines()
        stderr = subproc.stderr.readlines()
        timer.cancel()

        def list_strip(lines):
            return [line.strip() for line in lines]
        return stdcode, list_strip(stderr) if stdcode else list_strip(stdout)
    except Exception as e:
        LOG.error(e)
        raise
kodidevkit.py 文件源码 项目:KodiDevKit 作者: phil65 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def on_selection_modified_async(self, view):
        """
        ST API method: gets called when selection changed
        """
        if len(view.sel()) > 1 or not INFOS.addon:
            return None
        try:
            region = view.sel()[0]
        except Exception:
            return None
        if region == self.prev_selection:
            return None
        self.prev_selection = region
        delay = self.settings.get("tooltip_delay", 200)
        if self.timer:
            self.timer.cancel()
        self.timer = Timer(delay / 1000, self.show_tooltip, (view,))
        self.timer.start()
excelRTDServer.py 文件源码 项目:OSPTF 作者: xSploited 项目源码 文件源码 阅读 67 收藏 0 点赞 0 评论 0
def Update(self):
    # Get our wake-up thread ready...
    self.ticker = threading.Timer(self.INTERVAL, self.Update)
    try:
      # Check if any of our topics have new info to pass on
      if len(self.topics):     
        refresh = False
        for topic in self.topics.itervalues():
          topic.Update(self)
          if topic.HasChanged():
            refresh = True
          topic.Reset()

        if refresh:
          self.SignalExcel()
    finally:
      self.ticker.start() # Make sure we get to run again
util.py 文件源码 项目:FlowIDE 作者: tptee 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def debounce(func):
    def debounced(self, *args, **kwargs):
        flow_settings = find_flow_settings(
            self.view.window().project_data()
        )
        debounce_ms = flow_settings.get('debounce_ms')

        def call_func():
            func(self, *args, **kwargs)
        try:
            debounced.timer.cancel()
        except(AttributeError):
            pass

        debounced.timer = Timer(debounce_ms / 1000, call_func)
        debounced.timer.start()
    return debounced
helpers.py 文件源码 项目:Prism 作者: Stumblinbear 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def repeat(start_time, repeat_time):
    if repeat_time < 1:
        logging.error('Repeating function must have a repeat time greater than 1 second')

        def repeat_inner(func):
            return func
        return repeat_inner

    def repeat_inner(func):
        @wraps(func)
        def func_inner():
            t = threading.Timer(repeat_time, func_inner)
            t.daemon = True
            t.start()
            return func()
        t = threading.Timer(start_time, func_inner)
        t.daemon = True
        t.start()
        return func_inner
    return repeat_inner
test.py 文件源码 项目:klondike 作者: planetlabs 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def doRpcServer(port, stopTimeSec):
    class EchoHandler(object):
        def Echo123(self, msg1, msg2, msg3):
            return ("1:%s 2:%s 3:%s" % (msg1, msg2, msg3))
        def EchoStruct(self, msg):
            return ("%s" % msg)

    addr = msgpackrpc.Address('localhost', port)
    server = msgpackrpc.Server(EchoHandler())
    server.listen(addr)
    # run thread to stop it after stopTimeSec seconds if > 0
    if stopTimeSec > 0:
        def myStopRpcServer():
            server.stop()
        t = threading.Timer(stopTimeSec, myStopRpcServer)
        t.start()
    server.start()
midi_hub_test.py 文件源码 项目:magenta 作者: tensorflow 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def testCaptureSequence_StopSignal(self):
    start_time = 1.0

    threading.Timer(0.1, self.send_capture_messages).start()

    captured_seq = self.midi_hub.capture_sequence(
        120, start_time,
        stop_signal=midi_hub.MidiSignal(type='control_change', control=1))

    expected_seq = music_pb2.NoteSequence()
    expected_seq.tempos.add(qpm=120)
    expected_seq.total_time = 6.0
    testing_lib.add_track_to_sequence(
        expected_seq, 0,
        [Note(1, 64, 2, 5), Note(2, 64, 3, 4), Note(3, 64, 4, 6)])
    self.assertProtoEquals(captured_seq, expected_seq)
midi_hub_test.py 文件源码 项目:magenta 作者: tensorflow 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def testCaptureSequence_Mono(self):
    start_time = 1.0

    threading.Timer(0.1, self.send_capture_messages).start()
    self.midi_hub = midi_hub.MidiHub(self.port, self.port,
                                     midi_hub.TextureType.MONOPHONIC)
    captured_seq = self.midi_hub.capture_sequence(
        120, start_time,
        stop_signal=midi_hub.MidiSignal(type='control_change', control=1))

    expected_seq = music_pb2.NoteSequence()
    expected_seq.tempos.add(qpm=120)
    expected_seq.total_time = 6
    testing_lib.add_track_to_sequence(
        expected_seq, 0,
        [Note(1, 64, 2, 3), Note(2, 64, 3, 4), Note(3, 64, 4, 6)])
    self.assertProtoEquals(captured_seq, expected_seq)
reminder.py 文件源码 项目:Jarvis 作者: sukeesh 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def addReminder(name, time, uuid, body='', urgency=0, hidden=True):
    """
    Queue reminder.

    Show notification at the specified time. With the given name as title and an optional body
    for further information.
    The mandatory is used to identify the reminder and remove it with removeReminder().
    If the reminder should show up in the list printed by 'remind print' hidden (default: True)
    should be set to false. In this case the reminder is requeued at startup. If reminders are
    used e.g. with a todo list for due dates, hidden should probably be set to true so that the
    list is not cluttered with automatically created data.
    If the reminder needs a different priority, it can be set with urgency to critical (=2),
    high (=1) or normal (=0, default).
    """
    waitTime = time - dt.now()
    n = notify2.Notification(name, body)
    n.set_urgency(urgency)
    timerList[uuid] = Timer(waitTime.total_seconds(), showAlarm, [n, name])
    timerList[uuid].start()
    newItem = {'name': name, 'time': time, 'hidden': hidden, 'uuid': uuid}
    reminderList['items'].append(newItem)
    reminderList['items'] = sort(reminderList['items'])
    write_file("reminderlist.txt", reminderList)
relay.py 文件源码 项目:BoilerPlate 作者: wyaron 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def test_relay():
   """Test relay on and off cycle"""

   # check if the output is high
   print 'current control output is: ', is_output_high(), ' (should be off)'

   # start the relay
   start_relay()

   print 'current control output is: ', is_output_high(), ' (should be on)'

   # setup a timer to stop the relay after 5 seconds
   t = Timer(5, stop_relay)
   t.start()

   # wait for the timer to finish
   t.join()
test_llcp_tco.py 文件源码 项目:nfcpy 作者: nfcpy 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_recv(self, tco):
        pdu = nfc.llcp.pdu.UnnumberedInformation(1, 1, HEX('1122'))
        assert tco.enqueue(pdu) is True
        assert tco.recv() == pdu
        threading.Timer(0.01, tco.close).start()
        with pytest.raises(nfc.llcp.Error) as excinfo:
            tco.recv()
        assert excinfo.value.errno == errno.EPIPE
        with pytest.raises(nfc.llcp.Error) as excinfo:
            tco.recv()
        assert excinfo.value.errno == errno.ESHUTDOWN


# =============================================================================
# Logical Data Link
# =============================================================================
test_llcp_tco.py 文件源码 项目:nfcpy 作者: nfcpy 项目源码 文件源码 阅读 56 收藏 0 点赞 0 评论 0
def test_recvfrom(self, tco):
        pdu = nfc.llcp.pdu.Symmetry()
        assert tco.enqueue(pdu) is False
        pdu = nfc.llcp.pdu.UnnumberedInformation(1, 1, (tco.recv_miu+1) * b'1')
        assert tco.enqueue(pdu) is False
        pdu = nfc.llcp.pdu.UnnumberedInformation(1, 1, HEX('1122'))
        assert tco.enqueue(pdu) is True
        assert tco.recvfrom() == (pdu.data, pdu.ssap)
        threading.Timer(0.01, tco.close).start()
        with pytest.raises(nfc.llcp.Error) as excinfo:
            tco.recvfrom()
        assert excinfo.value.errno == errno.EPIPE
        with pytest.raises(nfc.llcp.Error) as excinfo:
            tco.recvfrom()
        assert excinfo.value.errno == errno.ESHUTDOWN


# =============================================================================
# Data Link Connection
# =============================================================================
test_llcp_llc.py 文件源码 项目:nfcpy 作者: nfcpy 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_accept_connect(self, llc, ldl, dlc, peer_miu, send_miu):
            with pytest.raises(nfc.llcp.Error) as excinfo:
                llc.accept(object())
            assert excinfo.value.errno == errno.ENOTSOCK
            with pytest.raises(nfc.llcp.Error) as excinfo:
                llc.accept(ldl)
            assert excinfo.value.errno == errno.EOPNOTSUPP
            with pytest.raises(nfc.llcp.Error) as excinfo:
                llc.accept(dlc)
            assert excinfo.value.errno == errno.EINVAL
            connect_pdu = nfc.llcp.pdu.Connect(4, 32, peer_miu)
            threading.Timer(0.01, llc.dispatch, (connect_pdu,)).start()
            llc.bind(dlc, b'urn:nfc:sn:snep')
            llc.listen(dlc, 0)
            sock = llc.accept(dlc)
            assert isinstance(sock, nfc.llcp.tco.DataLinkConnection)
            assert llc.getsockopt(sock, nfc.llcp.SO_SNDMIU) == send_miu
            assert llc.getpeername(sock) == 32
            assert llc.getsockname(sock) == 4
kivy_screens.py 文件源码 项目:bptc_wallet 作者: ceddie 项目源码 文件源码 阅读 37 收藏 0 点赞 0 评论 0
def __init__(self, network, defaults, app):
        self.network = network
        self.defaults = defaults
        self.pushing = False
        self.app = app
        super().__init__()

        # endless loop for updating information displayed
        def update_statistics():
            self.ids.listening_interface_label.text = 'Listening interface: {}:{}'.format(bptc.ip, bptc.port)
            self.ids.event_count_label.text = '{} events, {} confirmed'.format(len(self.hashgraph.lookup_table.keys()),
                                                                               len(self.hashgraph.ordered_events))
            self.ids.last_push_sent_label.text = 'Last push sent: {}'.format(self.network.last_push_sent)
            self.ids.last_push_received_label.text = 'Last push received: {}'.format(self.network.last_push_received)

            t = threading.Timer(1, update_statistics)
            t.daemon = True
            t.start()

        update_statistics()
ShockStatemachine.py 文件源码 项目:SmartDoorControl 作者: xiaokaizh 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def convert(a):
    ctState.currentState = a
    if ctState.currentState != st.nomal:
        print("CurrentState %s" % a)
    if ctState.currentState == st.knockOr:
        global knockNum
        knockNum += 1
        if knockNum == 2:
            ctState.currentState = st.knock
    if ctState.currentState == st.knock:
        # url = "192.168.12.101"
        # port = 9001
        # MessageHandle.CallAlarm(url, port, 1)
        print "Tong zhi Yong hu :you ren qiao men"




# Timer???


问题


面经


文章

微信
公众号

扫码关注公众号