python类ticks_ms()的实例源码

_ht.py 文件源码 项目:ulnoiot 作者: ulno 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __init__(self, name, pin, on_change=None):
        """Set up a DS18X20 device on the one-wire bus attached to ``pin``.

        Registers the device through ``Device.__init__``, scans the bus for
        sensor ROMs and starts a first temperature conversion.
        """
        import onewire
        import ds18x20
        gc.collect()
        Device.__init__(self, name, pin, on_change=on_change)
        bus = onewire.OneWire(pin)
        self.ds = ds18x20.DS18X20(bus)
        self.roms = self.ds.scan()
        # Timestamp taken right before the conversion is requested.
        self.lasttime = time.ticks_ms()
        self.ds.convert_temp()
        self.temp_list = None  # no readings collected yet
        self.getters[""] = self.value
templog.py 文件源码 项目:templogger 作者: aequitas 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def templog(sleep=True):
    """Log voltage and temperature to MQTT.

    Reads the sensor values, prints them and publishes them over MQTT.
    The board is reset when either the connection or the publish fails.

    Args:
        sleep: when true, finish by calling ``deepsleep`` with the number
            of milliseconds this run took.
    """
    start = time.ticks_ms()

    # get sensor values
    values = read_voltage()
    values.update(read_temps())
    print(values)

    # send values over MQTT if connected; reboot on any failure
    if not (wait_connect() and mqtt_send(values)):
        machine.reset()

    if sleep:
        # ticks_diff(new, old) yields the elapsed milliseconds; with the
        # arguments reversed the result is negative on current MicroPython.
        delta = time.ticks_diff(time.ticks_ms(), start)
        deepsleep(delta)
miniserver.py 文件源码 项目:esp-webui 作者: MarkR42 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def start_server():
    """Run the blocking web server loop on port 80.

    Connections are handled one at a time through ``accept_conn``. Once
    more than 8 seconds have passed since the last run, ``periodic_tasks``
    is invoked after the current ``accept_conn`` call returns. The
    listening socket is always closed when the function is left, including
    when the socket setup itself fails.
    """
    print("Starting web server")
    sock = socket.socket()
    try:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind(('', 80))
        sock.listen(1)
        sock.settimeout(5.0)

        print("Synchronous web server running...")
        gc.collect()
        print("gc.mem_free=", gc.mem_free())
        t0 = time.ticks_ms()
        while True:
            accept_conn(sock)
            now = time.ticks_ms()
            # ticks_ms() wraps around, so measure the interval with
            # ticks_diff() rather than plain subtraction.
            if time.ticks_diff(now, t0) > 8000:
                periodic_tasks()
                t0 = now
    finally:
        sock.close()
download.py 文件源码 项目:uPyLoader 作者: BetaRavener 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def _read_timeout(cnt, timeout_ms=2000):
    """Read exactly ``cnt`` characters from stdin.

    Returns the data read, or ``None`` when fewer characters arrived or,
    on platforms whose ``time`` module offers ``ticks_ms``, when the read
    took longer than ``timeout_ms`` milliseconds.
    """
    has_ticks = "ticks_ms" in dir(time)
    if has_ticks:
        started = time.ticks_ms()
    else:
        started = 0
    data = sys.stdin.read(cnt)
    if len(data) != cnt:
        return None
    if has_ticks and time.ticks_diff(time.ticks_ms(), started) > timeout_ms:
        return None
    return data
upload.py 文件源码 项目:uPyLoader 作者: BetaRavener 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def _read_timeout(cnt, timeout_ms=2000):
    """Read exactly ``cnt`` characters from stdin.

    Returns the data read, or ``None`` when fewer characters arrived or,
    on platforms whose ``time`` module offers ``ticks_ms``, when the read
    took longer than ``timeout_ms`` milliseconds.
    """
    has_ticks = "ticks_ms" in dir(time)
    if has_ticks:
        started = time.ticks_ms()
    else:
        started = 0
    data = sys.stdin.read(cnt)
    if len(data) != cnt:
        return None
    if has_ticks and time.ticks_diff(time.ticks_ms(), started) > timeout_ms:
        return None
    return data
02_virtual_read.py 文件源码 项目:blynk-library-python 作者: vshymanskyy 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def v2_read_handler():
    """Write the tick counter, in whole seconds, to virtual pin 2."""
    seconds = time.ticks_ms() // 1000
    blynk.virtual_write(2, seconds)

# Start Blynk (this call should never return)
07_user_task.py 文件源码 项目:blynk-library-python 作者: vshymanskyy 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def my_user_task():
    """Periodic user task: print a marker and update virtual pin 2.

    Only non-blocking work belongs here.
    """
    print('Action')
    seconds = time.ticks_ms() // 1000
    blynk.virtual_write(2, seconds)
BlynkLib.py 文件源码 项目:blynk-library-python 作者: vshymanskyy 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def sleep_from_until (start, delay):
    """Idle until ``delay`` milliseconds have passed since ``start``.

    Args:
        start: a ``time.ticks_ms()`` timestamp marking the start of the wait.
        delay: how long to wait, in milliseconds, counted from ``start``.

    Returns:
        ``start + delay``, i.e. the nominal end of the wait.
    """
    # ticks_diff(new, old) gives the elapsed milliseconds. With the
    # arguments the other way round the value is never positive on current
    # MicroPython and the loop would not terminate.
    while time.ticks_diff(time.ticks_ms(), start) < delay:
        idle_func()
    return start + delay
BlynkLib.py 文件源码 项目:blynk-library-python 作者: vshymanskyy 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _run_task(self):
        """Invoke the registered user task when its period has elapsed.

        Does nothing when no task is set. The reference time is advanced by
        exactly one period per invocation, so a late call does not shift
        the schedule.
        """
        if not self._task:
            return
        c_millis = time.ticks_ms()
        # Use the wrap-safe tick helpers: plain subtraction/addition gives
        # wrong results once the ticks_ms() counter wraps around.
        if time.ticks_diff(c_millis, self._task_millis) >= self._task_period:
            self._task_millis = time.ticks_add(self._task_millis,
                                               self._task_period)
            self._task()
blynk.py 文件源码 项目:micropython-share 作者: pacmac 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def v0_read_handler():
    """Send the tick counter, in whole seconds, to virtual pin 0.

    The widget only receives a value when ``virtual_write`` is called.
    """
    seconds = time.ticks_ms() // 1000
    blynk.virtual_write(0, seconds)
    print('v0')

# register the virtual pin
blynklib.py 文件源码 项目:micropython-share 作者: pacmac 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def sleep_from_until(start, delay):
    """Idle the MCU until ``delay`` ms have elapsed since ``start``.

    Returns ``start + delay``.
    """
    while True:
        elapsed = time.ticks_diff(time.ticks_ms(), start)
        if elapsed >= delay:
            break
        machine.idle()
    return start + delay
blynklib.py 文件源码 项目:micropython-share 作者: pacmac 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _run_task(self):
        """Invoke the registered user task when its period has elapsed.

        Does nothing when no task is set. The reference time is advanced by
        exactly one period per invocation, so a late call does not shift
        the schedule.
        """
        if not self._task:
            return
        c_millis = time.ticks_ms()
        # Use the wrap-safe tick helpers: plain subtraction/addition gives
        # wrong results once the ticks_ms() counter wraps around.
        if time.ticks_diff(c_millis, self._task_millis) >= self._task_period:
            self._task_millis = time.ticks_add(self._task_millis,
                                               self._task_period)
            self._task()
sirene.py 文件源码 项目:Hackathon_CIASC_2017_Geo 作者: pedrovhb 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def mensagem_recebida(topico, payload):
    """Message callback: flash the red and blue LEDs as a siren.

    Blocks for ``tempo_sirene`` milliseconds while alternating the two
    LEDs on a 500 ms cycle.

    Args:
        topico: topic of the received message (unused).
        payload: payload of the received message (unused).
    """
    print('Alerta!')
    # Remember when the alert started.
    comeco = time.ticks_ms()

    # Until the siren duration is reached, alternate the red and blue LEDs.
    # ticks_diff() keeps the comparison correct across counter wrap-around.
    while time.ticks_diff(time.ticks_ms(), comeco) < tempo_sirene:
        # Sample the time once so both LEDs see the same phase.
        fase = time.ticks_ms() % 500
        led_vermelho.value(fase > 250)
        led_azul.value(fase <= 250)


# Conectar-se ao broker MQTT
ds18b20_mqtt.py 文件源码 项目:MicroPython 作者: Euter2 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def go_sleep(sleep_minutes):
    """Arm the RTC alarm and enter deep sleep to save energy.

    The seconds already counted by ``ticks_ms()`` are subtracted from the
    requested period to make the wake-up period more consistent.
    """
    rtc = machine.RTC()
    rtc.irq(trigger=rtc.ALARM0, wake=machine.DEEPSLEEP)
    requested_s = sleep_minutes * 60
    sleep_seconds = requested_s - (ticks_ms() // 1000)
    rtc.alarm(rtc.ALARM0, sleep_seconds * 1000)
    minutes_part = str(sleep_seconds // 60)
    seconds_part = str(sleep_seconds % 60)
    message = minutes_part + ":" + seconds_part + " minutes deep sleep NOW!"
    print(message)
    machine.deepsleep()
_servo.py 文件源码 项目:ulnoiot 作者: ulno 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _trigger_next_turn(self):
        """Restart the turn timer and start the next queued angle, if any."""
        self.turn_start = time.ticks_ms()
        if len(self.angle_list) < 1:
            return
        next_angle = self.angle_list[0]
        self.write_angle(next_angle)
        # Drop the angle only after it has been written.
        del self.angle_list[0]
_servo.py 文件源码 项目:ulnoiot 作者: ulno 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _update(self):
        """Advance the servo once the current turn has had its full time.

        When a turn is in progress and ``turn_time_ms`` has elapsed, either
        the next queued angle is started or the servo is released.
        """
        if self.turn_start is None:
            return  # no turn in progress
        elapsed = time.ticks_diff(time.ticks_ms(), self.turn_start)
        if elapsed < self.turn_time_ms:
            return
        pending = len(self.angle_list)
        if pending:
            self._trigger_next_turn()
        else:
            self._release()
devices.py 文件源码 项目:ulnoiot 作者: ulno 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def do_later(time_delta_s, callback, id=None):
    """Schedule ``callback`` to run ``time_delta_s`` seconds from now.

    The entry is stored in ``_timestack`` under ``id``; when no id is given
    one is derived from the callback and the due time. Returns the id used.
    """
    delay_ms = int(time_delta_s * 1000)
    due = time.ticks_add(time.ticks_ms(), delay_ms)
    key = hash((hash(callback), due)) if id is None else id
    _timestack[key] = (due, callback)
    return key
devices.py 文件源码 项目:ulnoiot 作者: ulno 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def run(updates=5, sleepms=1, poll_rate_inputs=4, poll_rate_network=10):
    """Main loop: poll network and devices, publish status, run timers.

    Never returns; the loop has no exit condition.

    Args:
        updates: send out a full status every this amount of seconds.
            If 0, never send time-based status updates, only when a change
            happened. The check is made on the loop counter (every
            ``updates * 1000`` runs), so the real interval depends on
            ``sleepms`` and on how long a loop run takes.
        sleepms: how long to sleep, in milliseconds, between loop runs.
        poll_rate_inputs: devices are updated on every n-th loop run.
        poll_rate_network: incoming network commands are evaluated on
            every n-th loop run.
    """
    _init_mqtt()
    # ``overflow`` is the loop count at which ``counter`` is reset to 0.
    if updates == 0:
        overflow = 1000
    else:
        overflow = updates * 1000
    overflow *= poll_rate_network * poll_rate_inputs / sleepms
    overflow = int(overflow)
    #    print("Overflow:",overflow)
    counter = 0

    while True:
        if counter % poll_rate_network == 0:
            _poll_subscripton()
        if counter % poll_rate_inputs == 0:
            # Collect the devices whose update() reported something to publish.
            device_list = []
            for d in _devlist.values():
                if d.update():
                    device_list.append(d)
            if len(device_list) > 0:
                _publish_status(device_list)
        if updates != 0 and counter % (updates * 1000) == 0:
            print("Publishing full update.")
            _publish_status(ignore_time=True)
        # execute things on timestack
        now = time.ticks_ms()
        # Iterate over a copy of the keys: entries are deleted while looping.
        for id in list(_timestack):
            t, cb = _timestack[id]
            if time.ticks_diff(now, t) >= 0:
                # Remove the entry before calling the callback.
                del (_timestack[id])
                cb(id)

        time.sleep_ms(sleepms)
        counter += 1
        if counter >= overflow:
            counter = 0
_ht.py 文件源码 项目:ulnoiot 作者: ulno 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def __init__(self, name, pin, dht_dev, delay,
                 on_change=None, report_change=True):
        """Wrap an already constructed DHT sensor object as a device.

        Stores the sensor and measuring delay, triggers a first measurement
        and registers ``temperature`` and ``humidity`` getters through
        ``Device.__init__``.
        """
        self.delay = delay
        import dht
        self.dht = dht_dev
        # Timestamp taken right before the first measurement.
        self.lasttime = time.ticks_ms()
        self.dht.measure()
        getters = {
            "temperature": self.temperature,
            "humidity": self.humidity,
        }
        Device.__init__(self, name, pin,
                        on_change=on_change,
                        getters=getters,
                        report_change=report_change)
_ht.py 文件源码 项目:ulnoiot 作者: ulno 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def time_controlled_measure(self):
        """Collect sensor readings if the measuring delay has passed.

        Readings are taken when more than ``DS18X20.MEASURE_DELAY`` ticks
        have passed since the last run, or when the tick difference is
        negative. ``temp_list`` becomes a list of readings, or the bare
        reading when there is exactly one sensor; afterwards a new
        conversion is started.
        """
        newtime = time.ticks_ms()
        elapsed = newtime - self.lasttime
        if 0 <= elapsed <= DS18X20.MEASURE_DELAY:
            return  # too early for new readings
        readings = []
        self.temp_list = readings
        for rom in self.roms:
            readings.append(self.ds.read_temp(rom))
        if len(readings) == 1:
            # A single sensor is reported as a plain value, not a list.
            self.temp_list = readings[0]
        self.ds.convert_temp()
        self.lasttime = newtime
_i2c_connector.py 文件源码 项目:ulnoiot 作者: ulno 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def _update(self):
        """Exchange data with the I2C peer unless bus access is suspended.

        Sends a queued message (``msgq``), if any, then reads
        ``BUFFER_SIZE`` bytes from ``addr``. As used below, the answer is
        laid out as: bytes 0-1 a 16-bit counter (high byte first), byte 2
        the payload length, byte 3 a requested suspend time, and the payload
        from byte 4 on. A changed counter marks new data, which is stored in
        ``current_value``.
        """
        t = time.ticks_ms()
        # Only touch the bus when no suspension is active or it has expired.
        if self.suspend_start is None \
            or time.ticks_diff(t,self.suspend_start) > self.suspend_time:
            self.suspend_start = None  # done waiting
            try:
                if self.msgq is not None:
                    self.pin.writeto(self.addr, self.msgq)
                    self.msgq = None
                s = self.pin.readfrom(self.addr,self.BUFFER_SIZE)
            # NOTE(review): bare except hides every error type, not only
            # bus errors - consider narrowing it.
            except:
                print("Trouble accessing i2c.")
            else:
                if s[0]==255 and s[1]==255: # illegal read
                    print("Got garbage, waiting 1s before accessing bus again.")
                    print("data:", s)
                    self.suspend_time=1000 # take a break
                    self.suspend_start = time.ticks_ms();
                else:
                    # 16-bit counter, high byte first.
                    count = s[0]*256+s[1]
                    if self.count != count:
                        l = s[2];
                        self.suspend_time = s[3];
                        # scale timer: with bit 7 set, the low 7 bits are
                        # multiplied by 100; otherwise the value is used
                        # as it is
                        if self.suspend_time & 128:
                            self.suspend_time = (self.suspend_time & 127) * 100
                            if self.suspend_time>5000: # should not happen
                                print("Garbage time -> waiting 1s before accessing bus again.")
                                print("data:",s)
                                self.suspend_time=1000
                        if self.suspend_time > 0:
                            self.suspend_start = time.ticks_ms();
                            print("Bus suspend for",self.suspend_time,"ms requested.")
                        if l <= self.BUFFER_SIZE: # discard if too big
                            self.count = count
                            self.current_value = s[4:4+l]
_hcsr04.py 文件源码 项目:ulnoiot 作者: ulno 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def _update(self):
        """Take a new distance measurement once ``INTERVAL`` has passed.

        The stored distance is only replaced when the new value differs
        from it by at least ``precision``.
        """
        since_last = ticks_diff(ticks_ms(), self._last_measured)
        if since_last <= self.INTERVAL:
            return
        value = self._measure()
        change = abs(value - self._distance)
        if change >= self.precision:
            self._distance = value
        self._last_measured = ticks_ms()
_rgb_animator.py 文件源码 项目:ulnoiot 作者: ulno 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def __init__(self, device, command_str):
        """Prepare an animation program for ``device``.

        Args:
            device: the device the animation is attached to.
            command_str: whitespace-separated program; it is split into
                tokens and stored in ``program``.
        """
        # The original split command_str twice; once is enough.
        self.program = command_str.split()
        self.device = device
        self.playing = True
        self.step = 0
        self.fade_goals = {}
        self.length = 0
        self.starttime = 0
        self.delta = 0
        self.lasttime = time.ticks_ms()
        self.effect = None
unetrepl.py 文件源码 项目:ulnoiot 作者: ulno 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def _send(self):
        """Send the filled part of the output buffer and mark it empty.

        Also records the time of this send in ``out_last_sent``.
        """
        # TODO: does this need to be unblocking?
        pending = self.out_fill
        self.cs.send(self.out_buffer, length=pending)
        self.out_fill = 0
        self.out_last_sent = time.ticks_ms()
unetrepl.py 文件源码 项目:ulnoiot 作者: ulno 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def flush(self):
        """Send buffered output when it has waited longer than ``INTERVAL``.

        With an empty buffer only the last-sent timestamp is refreshed.
        """
        now = time.ticks_ms()
        if self.out_fill == 0:
            # Nothing to send: just reset the reference time.
            self.out_last_sent = now
            return
        waited = time.ticks_diff(now, self.out_last_sent)
        if waited <= self.INTERVAL:
            return
        self.acquire_out_buffer()
        self._send()
        self.release_out_buffer()
crypt_socket.py 文件源码 项目:ulnoiot 作者: ulno 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def ticks_ms():
    """Return ``time.time()`` converted to whole milliseconds."""
    millis = time.time() * 1000
    return int(millis)
crypt_socket.py 文件源码 项目:ulnoiot 作者: ulno 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def receive(self, request=0, timeoutms=None):
        """Read from the socket into the network buffer and decrypt it.

        Bytes are read one at a time into ``netbuf_in`` until one of these
        happens:

        * the buffer is full (``netbuf_size`` bytes),
        * a read returns nothing, or fails with EAGAIN/ETIMEDOUT, and at
          least ``request`` bytes have been read,
        * ``timeoutms`` is not None and that many milliseconds have passed
          (checked after each read attempt, so ``timeoutms=0`` returns
          after a single attempt).

        With ``timeoutms=None`` there is no time limit on waiting for
        ``request`` bytes. Other ``OSError``s are re-raised.

        Returns:
            A tuple ``(data, readbytes)``: the input buffer (decrypted in
            place when anything was read) and the number of bytes read,
            which may be 0.
        """
        data = self.netbuf_in
        data_mv = self.netbuf_in_mv
        readbytes = 0
        start_t = ticks_ms()
        while readbytes < self.netbuf_size:
            try:
                # Read a single byte into the next free buffer position.
                if self.sock_read(data_mv[readbytes:readbytes+1]):
                    readbytes += 1 # result was not 0 or none
                else:
                    if readbytes >= request:
                        break  # break if not blocking to request size
            except OSError as e:
                # EAGAIN/ETIMEDOUT are handled like an empty read.
                if len(e.args) > 0 \
                        and (e.args[0] == errno.EAGAIN
                        or e.args[0] == errno.ETIMEDOUT):
                    if readbytes >= request:
                        break  # break if not blocking to request size
                else:
                    raise
            if timeoutms is not None \
                    and ticks_diff(ticks_ms(), start_t) >= timeoutms:
                break
            sleep_ms(1) # prevent busy waiting
        if readbytes>0: self.crypt_in.decrypt(data,length=readbytes)
        return data,readbytes
ulno_iot_ht.py 文件源码 项目:ulnoiot 作者: ulno 项目源码 文件源码 阅读 32 收藏 0 点赞 0 评论 0
def time_controlled_measure():
    """Call ``d.measure()`` if the measuring delay has passed.

    A measurement is taken when more than ``MEASURE_DELAY`` ticks have
    passed since the last one, or when the tick difference is negative.
    """
    global _lasttime

    newtime = time.ticks_ms()
    elapsed = newtime - _lasttime
    if 0 <= elapsed <= MEASURE_DELAY:
        return  # too early for the next measurement
    d.measure()
    _lasttime = newtime
ulno_iot_ht.py 文件源码 项目:ulnoiot 作者: ulno 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def time_controlled_measure():
    """Call ``d.measure()`` if the measuring delay has passed.

    A measurement is taken when more than ``MEASURE_DELAY`` ticks have
    passed since the last one, or when the tick difference is negative.
    """
    global _lasttime

    newtime = time.ticks_ms()
    elapsed = newtime - _lasttime
    if 0 <= elapsed <= MEASURE_DELAY:
        return  # too early for the next measurement
    d.measure()
    _lasttime = newtime
bmp180.py 文件源码 项目:micropython-dev-kit 作者: db4linq 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def makegauge(self):
        '''
        Generator refreshing the raw measurements.

        Each cycle writes 0x2E to register 0xF4, waits, reads two bytes
        from 0xF6 into UT_raw, then writes the oversampling-dependent
        command, waits again and reads MSB_raw, LSB_raw and XLSB_raw from
        0xF6..0xF8.

        Yields None while waiting and after a failed read, and True at the
        end of each cycle.
        '''
        # Wait per oversample_setting (0..3); compared against ms ticks.
        delays = (5, 8, 14, 25)
        while True:
            self._bmp_i2c.writeto_mem(self._bmp_addr, 0xF4, bytearray([0x2E]))
            t_start = time.ticks_ms()
            # ticks_diff() stays correct when the tick counter wraps around,
            # unlike plain subtraction.
            while time.ticks_diff(time.ticks_ms(), t_start) <= 5:  # 5 ms delay
                yield None
            try:
                self.UT_raw = self._bmp_i2c.readfrom_mem(self._bmp_addr, 0xF6, 2)
            except Exception:
                # Failed read: report "not ready" but carry on with the cycle.
                yield None
            self._bmp_i2c.writeto_mem(self._bmp_addr, 0xF4, bytearray([0x34+(self.oversample_setting << 6)]))
            t_pressure_ready = delays[self.oversample_setting]
            t_start = time.ticks_ms()
            while time.ticks_diff(time.ticks_ms(), t_start) <= t_pressure_ready:
                yield None
            try:
                self.MSB_raw = self._bmp_i2c.readfrom_mem(self._bmp_addr, 0xF6, 1)
                self.LSB_raw = self._bmp_i2c.readfrom_mem(self._bmp_addr, 0xF7, 1)
                self.XLSB_raw = self._bmp_i2c.readfrom_mem(self._bmp_addr, 0xF8, 1)
            except Exception:
                yield None
            yield True


问题


面经


文章

微信
公众号

扫码关注公众号