python类sleep()的实例源码

PCA9685.py 文件源码 项目:Adafruit_Python_PCA9685 作者: adafruit 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def set_pwm_freq(self, freq_hz):
        """Program the prescale register so PWM runs at ``freq_hz`` hertz.

        The prescale estimate is 25 MHz divided by 4096 (12-bit) and by the
        requested frequency, minus one, rounded to the nearest integer.
        MODE1 is rewritten with bit 0x10 set and bit 0x80 cleared (sleep)
        while PRESCALE is written, then restored; after a 5 ms pause MODE1
        is written once more with bit 0x80 set.
        """
        # 25MHz / 12-bit range / target frequency, minus one.
        estimate = 25000000.0 / 4096.0 / float(freq_hz) - 1.0
        logger.debug('Setting PWM frequency to {0} Hz'.format(freq_hz))
        logger.debug('Estimated pre-scale: {0}'.format(estimate))
        # Adding 0.5 before flooring rounds to the nearest whole number.
        prescale = int(math.floor(estimate + 0.5))
        logger.debug('Final pre-scale: {0}'.format(prescale))

        saved_mode = self._device.readU8(MODE1)
        sleep_mode = (saved_mode & 0x7F) | 0x10
        self._device.write8(MODE1, sleep_mode)   # go to sleep
        self._device.write8(PRESCALE, prescale)
        self._device.write8(MODE1, saved_mode)   # back to the previous mode
        time.sleep(0.005)
        self._device.write8(MODE1, saved_mode | 0x80)
test_read_write.py 文件源码 项目:nidaqmx-python 作者: ni 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def test_uint_multi_port(self, x_series_device, seed):
        """Write random integers across two digital output ports and read
        each one back, expecting the values read to equal those written."""
        # Seed the generator so port selection and values are repeatable.
        random.seed(seed)

        narrow_ports = [
            port for port in x_series_device.do_ports
            if port.do_port_width <= 16]
        do_ports = random.sample(narrow_ports, 2)

        total_port_width = sum(port.do_port_width for port in do_ports)

        with nidaqmx.Task() as task:
            channel_names = flatten_channel_string(
                [port.name for port in do_ports])
            task.do_channels.add_do_chan(
                channel_names,
                line_grouping=LineGrouping.CHAN_FOR_ALL_LINES)

            # Ten random values, each as wide as both ports combined.
            values_to_test = []
            for _ in range(10):
                values_to_test.append(
                    int(random.getrandbits(total_port_width)))

            def write_then_read(value):
                # Short pause between the write and the read-back.
                task.write(value)
                time.sleep(0.001)
                return task.read()

            values_read = [write_then_read(value) for value in values_to_test]

            assert values_read == values_to_test
test_stream_digital_readers_writers.py 文件源码 项目:nidaqmx-python 作者: ni 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_one_sample_one_line(self, x_series_device, seed):
        """Round-trip random booleans through one digital output line using
        the single-channel stream writer and reader."""
        # Seed the generator so the line choice and values are repeatable.
        random.seed(seed)

        chosen_line = random.choice(x_series_device.do_lines)
        do_line = chosen_line.name

        with nidaqmx.Task() as task:
            task.do_channels.add_do_chan(
                do_line, line_grouping=LineGrouping.CHAN_PER_LINE)

            writer = DigitalSingleChannelWriter(task.out_stream)
            reader = DigitalSingleChannelReader(task.in_stream)

            # Ten random single-bit values.
            values_to_test = []
            for _ in range(10):
                values_to_test.append(bool(random.getrandbits(1)))

            def write_then_read(value):
                # Short pause between the write and the read-back.
                writer.write_one_sample_one_line(value)
                time.sleep(0.001)
                return reader.read_one_sample_one_line()

            values_read = [write_then_read(value) for value in values_to_test]

            numpy.testing.assert_array_equal(values_read, values_to_test)
test_stream_digital_readers_writers.py 文件源码 项目:nidaqmx-python 作者: ni 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def test_one_sample_port_byte(self, x_series_device, seed):
        """Round-trip random values through a digital output port of at most
        8 lines using the byte-wide stream writer and reader."""
        # Seed the generator so the port choice and values are repeatable.
        random.seed(seed)

        byte_ports = [
            port for port in x_series_device.do_ports
            if port.do_port_width <= 8]
        do_port = random.choice(byte_ports)

        with nidaqmx.Task() as task:
            task.do_channels.add_do_chan(
                do_port.name, line_grouping=LineGrouping.CHAN_FOR_ALL_LINES)

            # Ten random values, each as wide as the chosen port.
            values_to_test = []
            for _ in range(10):
                values_to_test.append(
                    int(random.getrandbits(do_port.do_port_width)))

            writer = DigitalSingleChannelWriter(task.out_stream)
            reader = DigitalSingleChannelReader(task.in_stream)

            def write_then_read(value):
                # Short pause between the write and the read-back.
                writer.write_one_sample_port_byte(value)
                time.sleep(0.001)
                return reader.read_one_sample_port_byte()

            values_read = [write_then_read(value) for value in values_to_test]

            numpy.testing.assert_array_equal(values_read, values_to_test)
test_stream_digital_readers_writers.py 文件源码 项目:nidaqmx-python 作者: ni 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def test_one_sample_port_uint16(self, x_series_device, seed):
        """Round-trip random values through a digital output port of at most
        16 lines using the uint16 stream writer and reader."""
        # Seed the generator so the port choice and values are repeatable.
        random.seed(seed)

        candidate_ports = [
            port for port in x_series_device.do_ports
            if port.do_port_width <= 16]
        do_port = random.choice(candidate_ports)

        with nidaqmx.Task() as task:
            task.do_channels.add_do_chan(
                do_port.name, line_grouping=LineGrouping.CHAN_FOR_ALL_LINES)

            # Ten random values, each as wide as the chosen port.
            values_to_test = []
            for _ in range(10):
                values_to_test.append(
                    int(random.getrandbits(do_port.do_port_width)))

            writer = DigitalSingleChannelWriter(task.out_stream)
            reader = DigitalSingleChannelReader(task.in_stream)

            def write_then_read(value):
                # Short pause between the write and the read-back.
                writer.write_one_sample_port_uint16(value)
                time.sleep(0.001)
                return reader.read_one_sample_port_uint16()

            values_read = [write_then_read(value) for value in values_to_test]

            numpy.testing.assert_array_equal(values_read, values_to_test)
qiushimm.py 文件源码 项目:Python 作者: Guzi219 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def Start(self):
        """Start the background page loader and show pages as they arrive.

        Launches ``self.LoadPage`` on a new thread, waits two seconds, then
        keeps handing the first entry of ``self.pages`` to
        ``self.ShowOnePage`` (together with a running page number that starts
        at ``self.page``) for as long as ``self.enable`` stays true.  The
        final value of ``self.enable`` is printed once the loop ends.
        """
        self.enable = True
        page = self.page
        print(u'????????......')
        # Load pages on a separate thread so displaying does not wait on it.
        thread.start_new_thread(self.LoadPage, ())
        time.sleep(2)  # give the loader thread a head start
        while self.enable:
            if not self.pages:
                # Nothing queued yet: sleep briefly instead of busy-spinning,
                # which would burn CPU and compete with the loader thread.
                time.sleep(0.1)
                continue

            # Remove the oldest queued page and display it.
            now_page_items = self.pages.pop(0)
            self.ShowOnePage(now_page_items, page)
            page += 1

        print(self.enable)


# ----------- ?????? -----------
main0.py 文件源码 项目:PGO-mapscan-opt 作者: seikur0 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def do_full_login(account):
    """Log ``account`` in and store its session details in the accounts DB.

    ``account['type']`` selects the login path: ``'ptc'`` calls
    ``login_ptc``; ``'google'`` calls ``login_google`` followed by
    ``new_session``.  Any other type logs an error and terminates the
    process via ``sys.exit()``.

    After login the account row is written with ``INSERT OR REPLACE``; the
    write is retried indefinitely on ``sqlite3.OperationalError`` or
    ``sqlite3.InterfaceError`` and the function returns after the first
    successful commit.
    """
    # Hold the network lock for ``locktime`` seconds (presumably to space out
    # logins -- confirm against other users of the lock).  Release it in
    # ``finally`` so an interrupted sleep cannot leave the lock held forever.
    lock_network.acquire()
    try:
        time.sleep(locktime)
    finally:
        lock_network.release()

    if account['type'] == 'ptc':
        login_ptc(account)
    elif account['type'] == 'google':
        login_google(account)
        new_session(account)
    else:
        lprint('[{}] Error: Login type should be either ptc or google.'.format(account['num']))
        sys.exit()

    cursor_accs = db_accs.cursor()
    while True:
        try:
            cursor_accs.execute(
                "INSERT OR REPLACE INTO accounts VALUES(?,?,?,?,?,?,?)",
                [account['user'], account['access_token'],
                 account['access_expire_timestamp'], account['api_url'],
                 0, '0', '0'])
            db_accs.commit()
            return
        except sqlite3.OperationalError as e:
            lprint('[-] Sqlite operational error: {}, account: {} Retrying...'.format(e, account['user']))
        except sqlite3.InterfaceError as e:
            lprint('[-] Sqlite interface error: {}, account: {} Retrying...'.format(e, account['user']))
bot.py 文件源码 项目:foodAId 作者: Pregnor 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def _loop_messages(self):
        '''Poll the Slack RTM stream and dispatch bot events while running.

        Raises Exception if the initial RTM connection cannot be made.
        '''
        connected = self._slack.rtm_connect()
        if not connected:
            raise Exception("Could not connect to Slack RTM API.\nBot token might be invalid.")

        while self.is_running:
            try:
                events = self._slack.rtm_read()
            except websocket._exceptions.WebSocketConnectionClosedException:
                # Connection dropped: reconnect and poll again right away,
                # without the usual pause.
                self._slack.rtm_connect()
            else:
                # An empty or missing result simply yields no iterations.
                for event in self._get_bot_events(events) or ():
                    self._send_typing(event['channel'])
                    self._process_event(event)

                time.sleep(self._reaction_interval)
sensor_backlog_individual.py 文件源码 项目:cbapi-examples 作者: cbcommunity 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def query_forever(cb, interval, udp):
    """Poll ``cb.sensors()`` forever and emit one JSON summary per sensor.

    Each summary holds the sensor's ``computer_name`` and ``computer_sid``
    (both stripped), its ``id``, ``num_storefiles_bytes`` and
    ``num_eventlog_bytes``; it is serialized with ``json.dumps`` and passed
    to ``output`` together with ``udp``.

    Any ``Exception`` raised during a polling pass is printed and the loop
    carries on.  Between passes the function sleeps ``float(interval)``
    seconds; a non-numeric ``interval`` therefore raises out of the loop.
    This function does not return normally.
    """
    while True:
        try:
            for sensor in cb.sensors():
                summary = {
                    'computer_name': sensor['computer_name'].strip(),
                    'id': sensor['id'],
                    'computer_sid': sensor['computer_sid'].strip(),
                    'num_storefiles_bytes': sensor['num_storefiles_bytes'],
                    'num_eventlog_bytes': sensor['num_eventlog_bytes'],
                }
                output(json.dumps(summary), udp)
        except Exception as e:
            # Best-effort polling: report the failure and retry next cycle.
            print(e)

        time.sleep(float(interval))
iotclient.py 文件源码 项目:IotCenter 作者: panjanek 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def start(self):
        """Start the device handler, then run the client over UDP or SSL.

        * ``protocol == "udp"``: loads saved state, creates a UDP server
          bound to ``('0.0.0.0', 0)``, schedules ``self.repeat`` on a daemon
          timer 0.5 s later and blocks in ``serve_forever()``.
        * ``protocol == "ssl"``: loops forever connecting to
          ``self.serverAddr`` ("host:port"), starts a daemon thread running
          ``self.sslListen`` and sends the device payload every
          ``self.sslIntervalSeconds``.  Any exception is logged and the
          connection is retried after 10 seconds.
        * Any other protocol: only the device handler is started.
        """
        self.deviceHandler.start()

        if self.protocol == "udp":
            self.loadState()
            self.logger.debug("udpHeartbeatSeconds = {0}".format(self.udpHeartbeatSeconds))
            self.logger.debug("udpDataPacketInterval = {0}".format(self.udpDataPacketInterval))

            self.udpServer = SocketServer.UDPServer(('0.0.0.0', 0), IotUDPHandler)
            self.udpServer.service = self
            self.udpServer.role = IotUDPHandler.CLIENT
            boundAddress = self.udpServer.server_address
            self.logger.info("starting UDP client at {0}:{1} connecting to {2}, state at {3}".format(boundAddress[0], boundAddress[1], self.serverAddr, self.stateFile))

            heartbeatTimer = threading.Timer(0.5, self.repeat)
            heartbeatTimer.daemon = True
            heartbeatTimer.start()
            self.udpServer.serve_forever()
        elif self.protocol == "ssl":
            while True:
                self.logger.info("Connecting by SSL to server at {0}".format(self.serverAddr))
                try:
                    rawSocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                    self.logger.debug("using caCertFile={0}, deviceCertFile={1}, deviceKeyFile={2}".format(self.caCertFile, self.deviceCertFile, self.deviceKeyFile))
                    # NOTE(review): ssl.wrap_socket and PROTOCOL_TLSv1 are
                    # deprecated in current Python releases -- revisit when
                    # porting.
                    sslSocket = ssl.wrap_socket(
                        rawSocket,
                        ca_certs=self.caCertFile,
                        cert_reqs=ssl.CERT_REQUIRED,
                        certfile=self.deviceCertFile,
                        keyfile=self.deviceKeyFile,
                        ssl_version=ssl.PROTOCOL_TLSv1)

                    # serverAddr is "host:port".
                    addressParts = self.serverAddr.split(':')
                    sslSocket.connect((addressParts[0], int(addressParts[1])))

                    peerCert = sslSocket.getpeercert()
                    subject = dict(entry[0] for entry in peerCert['subject'])
                    self.logger.info("Connected to server with valid certificate, CN={0}".format(subject['commonName']))

                    self.sslSocket = sslSocket
                    listener = threading.Thread(target=self.sslListen, args=(self.sslSocket,))
                    listener.daemon = True
                    listener.start()

                    # Send payloads until something raises.
                    while True:
                        payload = self.deviceHandler.getMessagePayload()
                        self.logger.debug("Sending payload to {0} by SSL: {1}".format(self.serverAddr, payload))
                        iotcommon.sendMessage(self.sslSocket, payload)
                        time.sleep(self.sslIntervalSeconds)
                except Exception as e:
                    self.logger.exception(e)
                # Wait before attempting to reconnect.
                time.sleep(10)
display.py 文件源码 项目:IotCenter 作者: panjanek 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def displaySensor1(self, number, description, trend):
        """Show a new sensor-1 reading on the canvas.

        Sets the value text to ``number`` with one decimal followed by the
        degree-Celsius sign, records the update time in ``self.sensor1ts``,
        colours the value and big icon with ``self.mapColor(number)`` and,
        when ``description`` is not None, updates the description text.  The
        small icon shows a bullet that a helper thread clears 0.5 s later.
        The big icon shows a down-right arrow for ``trend == -1``, an
        up-right arrow for ``trend == 1`` and nothing otherwise.
        """
        reading = "{0:.1f}".format(number) + u'\u2103'
        self.canvas.itemconfigure(self.txtSensor1, text=reading)
        self.sensor1ts = datetime.datetime.now()

        color = self.mapColor(number)
        if description is not None:
            self.canvas.itemconfigure(self.txtSensor1Desc, text=description)
        self.canvas.itemconfigure(self.txtSensor1, fill=color)
        self.canvas.itemconfigure(self.txtSensor1BigIcon, fill=color)

        # Flash the small "updated" marker; a helper thread clears it later.
        self.canvas.itemconfigure(self.txtSensor1SmallIcon, text=u'\u2022')

        def hide():
            time.sleep(0.5)
            self.canvas.itemconfigure(self.txtSensor1SmallIcon, text="")

        threading.Thread(target=hide).start()

        if trend == -1:
            arrow = u'\u2198'
        elif trend == 1:
            arrow = u'\u2197'
        else:
            arrow = ""
        self.canvas.itemconfigure(self.txtSensor1BigIcon, text=arrow)
display.py 文件源码 项目:IotCenter 作者: panjanek 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def repeat(self):
        """Once a second, refresh the clock and blank stale sensor readings.

        A sensor's value text and big icon are cleared when more than
        ``self.expirationSeconds`` have passed since its last-update
        timestamp (``sensor1ts`` / ``sensor2ts``).  Exceptions are logged and
        the loop continues; this method never returns.
        """
        def isExpired(lastUpdate):
            age = datetime.datetime.now() - lastUpdate
            return age.total_seconds() > self.expirationSeconds

        while True:
            try:
                self.displayTime()
                if isExpired(self.sensor1ts):
                    self.canvas.itemconfigure(self.txtSensor1, text="")
                    self.canvas.itemconfigure(self.txtSensor1BigIcon, text="")
                if isExpired(self.sensor2ts):
                    self.canvas.itemconfigure(self.txtSensor2, text="")
                    self.canvas.itemconfigure(self.txtSensor2BigIcon, text="")
            except Exception as e:
                self.logger.exception(e)
            except:
                # NOTE(review): anything not derived from Exception is
                # dropped silently here -- confirm this is intended.
                pass
            time.sleep(1)
dispycos_client8.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def client_proc(computation, njobs, task=None):
    """Schedule ``computation``, start ``njobs`` remote tasks and feed each.

    For every remote task that starts successfully, a local task is created
    that sends it five random numbers followed by ``None`` and prints the
    result the remote task sends back.  The computation is closed at the
    end.  Raises Exception if the computation cannot be scheduled.
    """
    # The scheduler accepts one computation at a time; on a shared scheduler
    # this one is queued until previously scheduled computations are done.
    schedule_failed = yield computation.schedule()
    if schedule_failed:
        raise Exception('Could not schedule computation')

    # Feeds 5 requests to the remote process (compute_task).
    def send_requests(rtask, task=None):
        # Tell the remote task which local task should receive its result.
        rtask.send(task)
        for _ in range(5):
            # Even if the recipient is not at a "yield" (e.g. running a long
            # computation or a thread-blocking call such as 'time.sleep', as
            # in this case), the message is accepted by another scheduler
            # (netpycos.Pycos) at the receiver and placed in the recipient's
            # message queue.
            rtask.send(random.uniform(10, 20))
            # Simulate a delay before the next input is available.
            yield task.sleep(random.uniform(2, 5))
        # None marks the end of input.
        rtask.send(None)
        result = yield task.receive()  # wait for the computed result
        print('    %s computed result: %.4f' % (rtask.location, result))

    for job_index in range(njobs):
        rtask = yield computation.run(compute_task)
        if not isinstance(rtask, pycos.Task):
            # The remote task could not be started; skip this job.
            continue
        print('  job %d processed by %s' % (job_index, rtask.location))
        pycos.Task(send_requests, rtask)

    yield computation.close()
dispycos_client8.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def client_proc(computation, njobs, task=None):
    """Schedule ``computation``, start ``njobs`` remote tasks and feed each.

    For every remote task that starts successfully, a local task is created
    that sends it five random numbers followed by ``None`` and prints the
    result the remote task sends back.  The computation is closed at the
    end.  Raises Exception if the computation cannot be scheduled.
    """
    # The scheduler accepts one computation at a time; on a shared scheduler
    # this one is queued until previously scheduled computations are done.
    schedule_failed = yield computation.schedule()
    if schedule_failed:
        raise Exception('Could not schedule computation')

    # Feeds 5 requests to the remote process (compute_task).
    def send_requests(rtask, task=None):
        # Tell the remote task which local task should receive its result.
        rtask.send(task)
        for _ in range(5):
            # Even if the recipient is not at a "yield" (e.g. running a long
            # computation or a thread-blocking call such as 'time.sleep', as
            # in this case), the message is accepted by another scheduler
            # (netpycos.Pycos) at the receiver and placed in the recipient's
            # message queue.
            rtask.send(random.uniform(10, 20))
            # Simulate a delay before the next input is available.
            yield task.sleep(random.uniform(2, 5))
        # None marks the end of input.
        rtask.send(None)
        result = yield task.receive()  # wait for the computed result
        print('    %s computed result: %.4f' % (rtask.location, result))

    for job_index in range(njobs):
        rtask = yield computation.run(compute_task)
        if not isinstance(rtask, pycos.Task):
            # The remote task could not be started; skip this job.
            continue
        print('  job %d processed by %s' % (job_index, rtask.location))
        pycos.Task(send_requests, rtask)

    yield computation.close()
__init__.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _suspend(self, task, timeout, alarm_value, state):
        """Internal use only. See sleep/suspend in Task.

        Moves 'task' from the scheduled set to the suspended set in 'state',
        optionally registering a timeout, while holding 'self._lock'.

        Returns:
          -1 if 'task' is not the currently running task, or if 'timeout' is
             neither None nor a float/int;
          the payload of the first queued message if 'state' is
             Pycos._AwaitMsg_ and that message was queued with the same state
             (the task is not suspended in this case);
          'alarm_value' if 'timeout' is <= 0 (the task is not suspended);
          0 once the task has been marked suspended.
        """
        self._lock.acquire()
        # Only the task that is currently running may suspend itself.
        if self.__cur_task != task:
            self._lock.release()
            logger.warning('invalid "suspend" - "%s" != "%s"', task, self.__cur_task)
            return -1
        tid = task._id
        # A message already waiting for this state is delivered immediately
        # instead of suspending the task.
        if state == Pycos._AwaitMsg_ and task._msgs:
            s, update = task._msgs[0]
            if s == state:
                task._msgs.popleft()
                self._lock.release()
                return update
        if timeout is None:
            # No timeout: the task stays suspended until resumed elsewhere.
            task._timeout = None
        else:
            if not isinstance(timeout, (float, int)):
                logger.warning('invalid timeout %s', timeout)
                self._lock.release()
                return -1
            if timeout <= 0:
                # Non-positive timeout: hand back the alarm value right away.
                self._lock.release()
                return alarm_value
            else:
                # Absolute deadline; the extra 0.0001 is presumably a small
                # safety margin -- TODO confirm.
                task._timeout = _time() + timeout + 0.0001
                heappush(self._timeouts, (task._timeout, tid, alarm_value))
        # Move the task from the scheduled set to the suspended set.
        self._scheduled.discard(tid)
        self._suspended.add(tid)
        task._state = state
        self._lock.release()
        return 0
dispycos.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def __discover_node(self, msg, task=None):
        """Locate the dispycos node announced by ``msg`` and fetch its info.

        Makes up to 10 attempts (0.1 s apart) to locate the 'dispycos_node'
        task at ``msg.location``.  On success, any node previously recorded
        for that address is discarded; when the current computation has a
        status task, it is notified that the old node's remote tasks, servers
        and the node itself were abandoned.  The node is then recorded in
        ``self._disabled_nodes`` (created if missing), bound to the located
        task, and its details are requested via ``__get_node_info``.

        The generator ends after the first successful attempt, or after all
        10 attempts have failed.
        """
        for _ in range(10):
            node_task = yield Task.locate('dispycos_node', location=msg.location,
                                          timeout=MsgTimeout)
            if not isinstance(node_task, Task):
                # Not found yet; pause briefly and retry.
                yield task.sleep(0.1)
                continue
            self._disabled_nodes.pop(msg.location.addr, None)
            node = self._nodes.pop(msg.location.addr, None)
            if node:
                logger.warning('Rediscovered dispycosnode at %s; discarding previous incarnation!',
                               msg.location.addr)
                self._disabled_nodes.pop(node.addr, None)
                if self._cur_computation:
                    status_task = self._cur_computation.status_task
                else:
                    status_task = None
                if status_task:
                    # Report everything that ran on the old incarnation as
                    # abandoned: each remote task, each server, then the node.
                    for server in node.servers.values():
                        for rtask, job in server.rtasks.values():
                            status = pycos.MonitorException(rtask, (Scheduler.TaskAbandoned, None))
                            status_task.send(status)
                        status_task.send(DispycosStatus(Scheduler.ServerAbandoned,
                                                        server.task.location))
                    info = DispycosNodeInfo(node.name, node.addr, node.cpus, node.platform,
                                            node.avail_info)
                    status_task.send(DispycosStatus(Scheduler.NodeAbandoned, info))
            node = self._disabled_nodes.get(msg.location.addr, None)
            if not node:
                node = Scheduler._Node(msg.name, msg.location.addr)
                self._disabled_nodes[msg.location.addr] = node
            node.task = node_task
            yield self.__get_node_info(node, task=task)
            # A bare ``return`` ends the generator on every Python version;
            # ``raise StopIteration`` inside a generator is converted to
            # RuntimeError on Python 3.7+ (PEP 479).
            return
dispycos_client8.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def client_proc(computation, njobs, task=None):
    """Schedule ``computation``, start ``njobs`` remote tasks and feed each.

    For every remote task that starts successfully, a local task is created
    that sends it five random numbers followed by ``None`` and prints the
    result the remote task sends back.  The computation is closed at the
    end.  Raises Exception if the computation cannot be scheduled.
    """
    # The scheduler accepts one computation at a time; on a shared scheduler
    # this one is queued until previously scheduled computations are done.
    schedule_failed = yield computation.schedule()
    if schedule_failed:
        raise Exception('Could not schedule computation')

    # Feeds 5 requests to the remote process (compute_task).
    def send_requests(rtask, task=None):
        # Tell the remote task which local task should receive its result.
        rtask.send(task)
        for _ in range(5):
            # Even if the recipient is not at a "yield" (e.g. running a long
            # computation or a thread-blocking call such as 'time.sleep', as
            # in this case), the message is accepted by another scheduler
            # (netpycos.Pycos) at the receiver and placed in the recipient's
            # message queue.
            rtask.send(random.uniform(10, 20))
            # Simulate a delay before the next input is available.
            yield task.sleep(random.uniform(2, 5))
        # None marks the end of input.
        rtask.send(None)
        result = yield task.receive()  # wait for the computed result
        print('    %s computed result: %.4f' % (rtask.location, result))

    for job_index in range(njobs):
        rtask = yield computation.run(compute_task)
        if not isinstance(rtask, pycos.Task):
            # The remote task could not be started; skip this job.
            continue
        print('  job %d processed by %s' % (job_index, rtask.location))
        pycos.Task(send_requests, rtask)

    yield computation.close()
__init__.py 文件源码 项目:pycos 作者: pgiri 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def _suspend(self, task, timeout, alarm_value, state):
        """Internal use only. See sleep/suspend in Task.

        Moves 'task' from the scheduled set to the suspended set in 'state',
        optionally registering a timeout, while holding 'self._lock'.

        Returns:
          -1 if 'task' is not the currently running task, or if 'timeout' is
             neither None nor a float/int;
          the payload of the first queued message if 'state' is
             Pycos._AwaitMsg_ and that message was queued with the same state
             (the task is not suspended in this case);
          'alarm_value' if 'timeout' is <= 0 (the task is not suspended);
          0 once the task has been marked suspended.
        """
        self._lock.acquire()
        # Only the task that is currently running may suspend itself.
        if self.__cur_task != task:
            self._lock.release()
            logger.warning('invalid "suspend" - "%s" != "%s"', task, self.__cur_task)
            return -1
        tid = task._id
        # A message already waiting for this state is delivered immediately
        # instead of suspending the task.
        if state == Pycos._AwaitMsg_ and task._msgs:
            s, update = task._msgs[0]
            if s == state:
                task._msgs.popleft()
                self._lock.release()
                return update
        if timeout is None:
            # No timeout: the task stays suspended until resumed elsewhere.
            task._timeout = None
        else:
            if not isinstance(timeout, (float, int)):
                logger.warning('invalid timeout %s', timeout)
                self._lock.release()
                return -1
            if timeout <= 0:
                # Non-positive timeout: hand back the alarm value right away.
                self._lock.release()
                return alarm_value
            else:
                # Absolute deadline; the extra 0.0001 is presumably a small
                # safety margin -- TODO confirm.
                task._timeout = _time() + timeout + 0.0001
                heappush(self._timeouts, (task._timeout, tid, alarm_value))
        # Move the task from the scheduled set to the suspended set.
        self._scheduled.discard(tid)
        self._suspended.add(tid)
        task._state = state
        self._lock.release()
        return 0
pg_gw_utils.py 文件源码 项目:charm-plumgrid-gateway 作者: openstack 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def stop_pg():
    '''
    Stops the 'plumgrid' service, then pauses for two seconds.
    '''
    service_name = 'plumgrid'
    service_stop(service_name)
    # Pause after issuing the stop before returning to the caller.
    pause_seconds = 2
    time.sleep(pause_seconds)
pg_gw_utils.py 文件源码 项目:charm-plumgrid-gateway 作者: openstack 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def remove_iovisor():
    '''
    Runs 'rmmod iovisor' to remove the iovisor kernel module, then pauses
    for one second.
    '''
    rmmod_command = ['rmmod', 'iovisor']
    failure_message = 'Error Removing IOVisor Kernel Module'
    _exec_cmd(cmd=rmmod_command, error_msg=failure_message)
    time.sleep(1)


问题


面经


文章

微信
公众号

扫码关注公众号