def __init__(self, name, pin, on_change=None):
    """Set up a DS18X20 temperature device on the given one-wire pin.

    Registers the device with the ``Device`` base initializer, scans the
    bus for sensor ROMs and kicks off a first temperature conversion.
    """
    # Driver modules are imported lazily, inside the constructor.
    import onewire
    import ds18x20

    gc.collect()
    Device.__init__(self, name, pin, on_change=on_change)

    bus = onewire.OneWire(pin)
    self.ds = ds18x20.DS18X20(bus)
    self.roms = self.ds.scan()

    # Remember when the conversion was started; no reading is cached yet.
    self.lasttime = time.ticks_ms()
    self.ds.convert_temp()
    self.temp_list = None

    # Expose self.value as the default (unnamed) getter.
    self.getters[""] = self.value
python类ticks_ms()的实例源码
def templog(sleep=True):
    """Log voltage and temperature to MQTT.

    Reads the sensor values, prints them and publishes them over MQTT.
    The board is reset when the connection cannot be established or the
    publish fails.

    Args:
        sleep: when true, call ``deepsleep()`` afterwards, passing the
            number of milliseconds this function has been running.
    """
    start = time.ticks_ms()

    # get sensor values
    values = read_voltage()
    values.update(read_temps())
    print(values)

    # send values over MQTT if connected
    if wait_connect():
        if not mqtt_send(values):
            machine.reset()
    else:
        # failed to connect, reboot
        machine.reset()

    if sleep:
        # ticks_diff(new, old) yields the elapsed time; with the arguments
        # the other way round the result is negative.
        delta = time.ticks_diff(time.ticks_ms(), start)
        deepsleep(delta)
def start_server():
    """Run the synchronous web server on port 80 until an error occurs.

    Connections are handled by ``accept_conn()``; roughly every 8 seconds
    ``periodic_tasks()`` is invoked between connections. The listening
    socket is always closed on exit, including when setup fails.
    """
    print("Starting web server")
    sock = socket.socket()
    try:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind(('', 80))
        sock.listen(1)
        sock.settimeout(5.0)
        print("Synchronous web server running...")
        gc.collect()
        print("gc.mem_free=", gc.mem_free())

        t0 = time.ticks_ms()
        while True:
            accept_conn(sock)
            now = time.ticks_ms()
            # ticks values wrap around, so they must be compared with
            # ticks_diff() rather than plain subtraction.
            if time.ticks_diff(now, t0) > 8000:
                periodic_tasks()
                t0 = now
    finally:
        sock.close()
def _read_timeout(cnt, timeout_ms=2000):
time_support = "ticks_ms" in dir(time)
s_time = time.ticks_ms() if time_support else 0
data = sys.stdin.read(cnt)
if len(data) != cnt or (time_support and time.ticks_diff(time.ticks_ms(), s_time) > timeout_ms):
return None
return data
def _read_timeout(cnt, timeout_ms=2000):
time_support = "ticks_ms" in dir(time)
s_time = time.ticks_ms() if time_support else 0
data = sys.stdin.read(cnt)
if len(data) != cnt or (time_support and time.ticks_diff(time.ticks_ms(), s_time) > timeout_ms):
return None
return data
def v2_read_handler():
    """Write the current tick count, in whole seconds, to virtual pin 2."""
    # The widget bound to this pin displays a time in seconds.
    seconds = time.ticks_ms() // 1000
    blynk.virtual_write(2, seconds)
# Start Blynk (this call should never return)
def my_user_task():
    """User task: log a marker and push the tick count to pin 2."""
    # Keep everything in here non-blocking.
    print('Action')
    seconds = time.ticks_ms() // 1000
    blynk.virtual_write(2, seconds)
def sleep_from_until(start, delay):
    """Idle until ``delay`` ms have passed since the ``start`` tick value.

    Calls ``idle_func()`` repeatedly while waiting.

    Returns:
        ``start + delay``, i.e. the tick value the wait was aiming for.
    """
    # ticks_diff(new, old) gives the elapsed time. With the arguments the
    # other way round the result is never positive and the loop would not
    # terminate.
    while time.ticks_diff(time.ticks_ms(), start) < delay:
        idle_func()
    return start + delay
def _run_task(self):
if self._task:
c_millis = time.ticks_ms()
if c_millis - self._task_millis >= self._task_period:
self._task_millis += self._task_period
self._task()
def v0_read_handler():
    """Send the tick count in whole seconds to virtual pin 0 and log it."""
    # The value only reaches the widget through an explicit virtual write.
    seconds = time.ticks_ms() // 1000
    blynk.virtual_write(0, seconds)
    print('v0')
# register the virtual pin
def sleep_from_until(start, delay):
    """Idle until ``delay`` ms have elapsed since tick value ``start``.

    Returns ``start + delay``.
    """
    while True:
        elapsed = time.ticks_diff(time.ticks_ms(), start)
        if elapsed >= delay:
            break
        machine.idle()
    return start + delay
def _run_task(self):
if self._task:
c_millis = time.ticks_ms()
if c_millis - self._task_millis >= self._task_period:
self._task_millis += self._task_period
self._task()
def mensagem_recebida(topico, payload):
    """Handle a received message by flashing the red and blue LEDs.

    The LEDs alternate (250 ms each within a 500 ms cycle) until
    ``tempo_sirene`` milliseconds have passed.

    Args:
        topico: topic of the received message (unused).
        payload: message payload (unused).
    """
    print('Alerta!')
    # Remember when the alert started.
    comeco = time.ticks_ms()
    # Alternate the red and blue LEDs until the siren duration is over.
    while True:
        agora = time.ticks_ms()
        # Wrap-safe elapsed-time check.
        if time.ticks_diff(agora, comeco) >= tempo_sirene:
            break
        # One tick sample drives both LEDs so they are always complementary.
        fase = agora % 500
        led_vermelho.value(fase > 250)
        led_azul.value(fase <= 250)
# Conectar-se ao broker MQTT
def go_sleep(sleep_minutes):
    """Perform deepsleep to save energy.

    Arms RTC alarm 0 as the wake source, prints the planned sleep
    duration and enters deep sleep.
    """
    rtc = machine.RTC()
    rtc.irq(trigger=rtc.ALARM0, wake=machine.DEEPSLEEP)

    # Subtract the seconds reported by ticks_ms() so that the wake-up
    # period stays more consistent.
    already_up_s = ticks_ms() // 1000
    sleep_seconds = (sleep_minutes * 60) - already_up_s
    rtc.alarm(rtc.ALARM0, sleep_seconds * 1000)

    minutes_part = str(sleep_seconds // 60)
    seconds_part = str(sleep_seconds % 60)
    print(":".join((minutes_part, seconds_part)) + " minutes deep sleep NOW!")

    machine.deepsleep()
def _trigger_next_turn(self):
    """Start the next queued turn and record when it began."""
    self.turn_start = time.ticks_ms()
    if len(self.angle_list) <= 0:
        return
    # Drive to the first pending angle, then drop it from the queue.
    next_angle = self.angle_list[0]
    self.write_angle(next_angle)
    del self.angle_list[0]
def _update(self):
if self.turn_start is not None: # turn in process
current=time.ticks_ms()
if time.ticks_diff(current, self.turn_start) >= self.turn_time_ms:
if len(self.angle_list) > 0:
self._trigger_next_turn()
else:
self._release()
def do_later(time_delta_s, callback, id=None):
    """Register ``callback`` to be run ``time_delta_s`` seconds from now.

    The entry is stored in ``_timestack`` under ``id``. When no id is
    given, one is derived from the callback and the due time.

    Returns:
        The id under which the entry was stored.
    """
    due = time.ticks_add(time.ticks_ms(), int(time_delta_s * 1000))
    key = id if id is not None else hash((hash(callback), due))
    _timestack[key] = (due, callback)
    return key
def run(updates=5, sleepms=1, poll_rate_inputs=4, poll_rate_network=10):
    """Run the main polling loop forever.

    Args:
        updates: send out a full status every this amount of seconds.
            If 0, never send time-based status updates; only publish
            when a change happened.
        sleepms: how long to sleep (ms) between loop iterations.
        poll_rate_inputs: evaluate inputs every this many iterations.
        poll_rate_network: evaluate incoming network commands every this
            many iterations.
    """
    _init_mqtt()

    # Iteration count at which the loop counter is reset to zero.
    base = 1000 if updates == 0 else updates * 1000
    overflow = int(base * (poll_rate_network * poll_rate_inputs / sleepms))

    tick = 0
    while True:
        if tick % poll_rate_network == 0:
            _poll_subscripton()

        if tick % poll_rate_inputs == 0:
            # Collect the devices whose update() reports a change.
            changed = [dev for dev in _devlist.values() if dev.update()]
            if len(changed) > 0:
                _publish_status(changed)

        if updates != 0 and tick % (updates * 1000) == 0:
            print("Publishing full update.")
            _publish_status(ignore_time=True)

        # Execute the entries on the timestack that are due.
        now = time.ticks_ms()
        for key in list(_timestack):
            due, callback = _timestack[key]
            if time.ticks_diff(now, due) >= 0:
                del _timestack[key]
                callback(key)

        time.sleep_ms(sleepms)

        tick += 1
        if tick >= overflow:
            tick = 0
def __init__(self, name, pin, dht_dev, delay,
             on_change=None, report_change=True):
    """Set up a DHT-based device exposing temperature and humidity.

    Stores the driver object and delay, triggers a first measurement and
    then registers the device through ``Device.__init__``.
    """
    self.delay = delay
    import dht
    self.dht = dht_dev

    # Record when the first measurement was triggered.
    self.lasttime = time.ticks_ms()
    self.dht.measure()

    value_getters = {
        "temperature": self.temperature,
        "humidity": self.humidity,
    }
    Device.__init__(
        self,
        name,
        pin,
        on_change=on_change,
        getters=value_getters,
        report_change=report_change,
    )
def time_controlled_measure(self):
    """Read all sensors and start a new conversion once the delay is over.

    Nothing happens while the time since the last conversion lies within
    ``DS18X20.MEASURE_DELAY``. A negative difference also triggers a read.
    """
    newtime = time.ticks_ms()
    elapsed = newtime - self.lasttime
    if 0 <= elapsed <= DS18X20.MEASURE_DELAY:
        return

    self.temp_list = []
    for rom in self.roms:
        self.temp_list.append(self.ds.read_temp(rom))
    # A single sensor is exposed as a plain value instead of a list.
    if len(self.temp_list) == 1:
        self.temp_list = self.temp_list[0]

    self.ds.convert_temp()
    self.lasttime = newtime
def _update(self):
# Poll the I2C peer at self.addr: send the queued message (if any), read
# BUFFER_SIZE bytes, and decode the header bytes (16-bit counter, payload
# length, requested bus-suspend time) before storing the payload.
# NOTE(review): indentation was lost in this copy of the file; confirm the
# exact nesting of the branches below against the original source.
t = time.ticks_ms()
#if self.suspend_start is not None \
# and time.ticks_diff(t,self.suspend_start) <= self.suspend_time:
# print("Bus access suspended.")
# Only touch the bus when no suspension is pending or the requested
# suspension time has passed.
if self.suspend_start is None \
or time.ticks_diff(t,self.suspend_start) > self.suspend_time:
self.suspend_start = None # done waiting
try:
# Send the pending outgoing message first, then read the reply buffer.
if self.msgq is not None:
self.pin.writeto(self.addr, self.msgq)
self.msgq = None
s = self.pin.readfrom(self.addr,self.BUFFER_SIZE)
except:
# Any failure of the bus access is only reported, not raised.
print("Trouble accessing i2c.")
else:
# Two leading 0xFF bytes are treated as an invalid read.
if s[0]==255 and s[1]==255: # illegal read
print("Got garbage, waiting 1s before accessing bus again.")
print("data:", s)
self.suspend_time=1000 # take a break
self.suspend_start = time.ticks_ms();
else:
# Bytes 0-1 form a big-endian counter; a changed counter marks new data.
count = s[0]*256+s[1]
if self.count != count:
# Byte 2 is the payload length, byte 3 the requested suspend time.
l = s[2];
self.suspend_time = s[3];
# scale timer
# With bit 7 set, the low 7 bits are multiplied by 100.
if self.suspend_time & 128:
self.suspend_time = (self.suspend_time & 127) * 100
if self.suspend_time>5000: # should not happen
print("Garbage time -> waiting 1s before accessing bus again.")
print("data:",s)
self.suspend_time=1000
# A non-zero suspend time starts a new suspension period from now.
if self.suspend_time > 0:
self.suspend_start = time.ticks_ms();
print("Bus suspend for",self.suspend_time,"ms requested.")
# The payload starts at byte 4; counter and value are only updated when
# the announced length passes the size check.
if l <= self.BUFFER_SIZE: # discard if too big
self.count = count
self.current_value = s[4:4+l]
def _update(self):
    """Take a new measurement once more than INTERVAL ticks have passed.

    The stored distance only changes when the new value differs from it
    by at least ``self.precision``.
    """
    since_last = ticks_diff(ticks_ms(), self._last_measured)
    if since_last <= self.INTERVAL:
        return

    value = self._measure()
    changed_enough = abs(value - self._distance) >= self.precision
    if changed_enough:
        self._distance = value
    self._last_measured = ticks_ms()
def __init__(self, device, command_str):
    """Prepare playback of a whitespace-separated command program.

    Args:
        device: the device the program is played on.
        command_str: program text; it is split on whitespace into
            individual tokens stored in ``self.program``.
    """
    self.device = device
    self.playing = True
    # Split once; the original performed this identical split twice.
    self.program = command_str.split()
    self.step = 0
    self.fade_goals = {}
    self.length = 0
    self.starttime = 0
    self.delta = 0
    self.lasttime = time.ticks_ms()
    self.effect = None
def _send(self):
    """Send the filled part of the output buffer and reset the fill level."""
    # TODO: does this need to be unblocking?
    fill = self.out_fill
    self.cs.send(self.out_buffer, length=fill)
    self.out_fill = 0
    # Remember when data last went out.
    self.out_last_sent = time.ticks_ms()
def flush(self):
    """Send buffered output if it has been waiting longer than INTERVAL.

    With an empty buffer the "last sent" timestamp is simply reset to
    now. Otherwise, once more than ``self.INTERVAL`` ticks have passed
    since the last send, the buffer is acquired, sent and released.
    """
    t = time.ticks_ms()
    if self.out_fill == 0:  # reset time, if there is nothing to send
        self.out_last_sent = t
    elif time.ticks_diff(t, self.out_last_sent) > self.INTERVAL:
        self.acquire_out_buffer()
        try:
            self._send()
        finally:
            # Release even when sending fails, so the buffer is not left
            # acquired.
            self.release_out_buffer()
def ticks_ms():
    """Return the current ``time.time()`` value in whole milliseconds."""
    seconds = time.time()
    return int(seconds * 1000)
def receive(self, request=0, timeoutms=None):
# Read bytes from the socket into self.netbuf_in, hand the bytes read to
# self.crypt_in.decrypt(), and return the tuple (buffer, bytes_read).
# NOTE(review): indentation was lost in this copy of the file; confirm
# whether the timeout check and sleep_ms(1) run on every loop iteration.
# receive into network buffer,
# fill buffer once and decrypt
# if request>0 wait blocking for request number of bytes (if timeout
# given interrupt after timeoutms ms)
# timeout==None: block
# timeout==0: return immediately after trying to read something from
# network buffer
# timeout>0: try for time specified to read something before returning
# this function always returns a pointer to the buffer and
# number of bytes read (could be 0)
data = self.netbuf_in
data_mv = self.netbuf_in_mv
readbytes = 0
start_t = ticks_ms()
# Never read more than the buffer can hold.
while readbytes < self.netbuf_size:
try:
# Read a single byte into the next free slot of the buffer view.
if self.sock_read(data_mv[readbytes:readbytes+1]):
readbytes += 1 # result was not 0 or none
else:
if readbytes >= request:
break # break if not blocking to request size
except OSError as e:
# EAGAIN / ETIMEDOUT are handled like "nothing read"; any other
# OSError is re-raised.
if len(e.args) > 0 \
and (e.args[0] == errno.EAGAIN
or e.args[0] == errno.ETIMEDOUT):
if readbytes >= request:
break # break if not blocking to request size
else:
raise
# Stop once the optional timeout (in ms since start_t) has been reached.
if timeoutms is not None \
and ticks_diff(ticks_ms(), start_t) >= timeoutms:
break
sleep_ms(1) # prevent busy waiting
# Decrypt only when at least one byte was received.
if readbytes>0: self.crypt_in.decrypt(data,length=readbytes)
return data,readbytes
def time_controlled_measure():
    """Trigger a measurement unless the last one is recent enough.

    A measurement is taken when more than ``MEASURE_DELAY`` ticks have
    passed since ``_lasttime``, or when the difference is negative.
    """
    global _lasttime
    newtime = time.ticks_ms()
    elapsed = newtime - _lasttime
    if 0 <= elapsed <= MEASURE_DELAY:
        return
    d.measure()
    _lasttime = newtime
def time_controlled_measure():
    """Measure again only after ``MEASURE_DELAY`` ticks (or on a negative gap)."""
    global _lasttime
    newtime = time.ticks_ms()
    gap = newtime - _lasttime
    measurement_due = gap < 0 or gap > MEASURE_DELAY
    if measurement_due:
        d.measure()
        _lasttime = newtime
def makegauge(self):
    '''
    Generator refreshing the raw measurements.

    Each full cycle starts a temperature conversion (command 0x2E), waits,
    reads the raw temperature, starts a pressure conversion, waits again
    and reads the three raw pressure bytes. The generator yields None
    while waiting or after a failed read, and True at the end of a cycle.
    '''
    # Pressure wait in ms, indexed by self.oversample_setting.
    delays = (5, 8, 14, 25)
    while True:
        self._bmp_i2c.writeto_mem(self._bmp_addr, 0xF4, bytearray([0x2E]))
        t_start = time.ticks_ms()
        # ticks_diff() keeps the wait correct when the tick counter wraps.
        while time.ticks_diff(time.ticks_ms(), t_start) <= 5:  # 5 ms delay
            yield None
        try:
            self.UT_raw = self._bmp_i2c.readfrom_mem(self._bmp_addr, 0xF6, 2)
        except Exception:
            # Read failed: report "not ready" and carry on with the cycle.
            yield None

        self._bmp_i2c.writeto_mem(
            self._bmp_addr, 0xF4,
            bytearray([0x34 + (self.oversample_setting << 6)]))
        t_pressure_ready = delays[self.oversample_setting]
        t_start = time.ticks_ms()
        while time.ticks_diff(time.ticks_ms(), t_start) <= t_pressure_ready:
            yield None
        try:
            self.MSB_raw = self._bmp_i2c.readfrom_mem(self._bmp_addr, 0xF6, 1)
            self.LSB_raw = self._bmp_i2c.readfrom_mem(self._bmp_addr, 0xF7, 1)
            self.XLSB_raw = self._bmp_i2c.readfrom_mem(self._bmp_addr, 0xF8, 1)
        except Exception:
            yield None
        yield True