def templog(sleep=True):
    """Log voltage and temperature to MQTT, then optionally deep-sleep.

    Reads the sensors, prints the merged readings and publishes them.  If
    the connection cannot be established, or the publish fails, the board
    is reset via machine.reset().

    sleep: when true, deepsleep() is called with the number of
        milliseconds spent in this function.
    """
    start = time.ticks_ms()
    # get sensor values
    values = read_voltage()
    values.update(read_temps())
    print(values)
    # send values over MQTT if connected
    if wait_connect():
        if not mqtt_send(values):
            machine.reset()
    else:
        # failed to connect, reboot
        machine.reset()
    if sleep:
        # ticks_diff(ticks1, ticks2) is ticks1 - ticks2 with wrap-around
        # handling, so the newer timestamp goes first to get a
        # non-negative elapsed time.
        # NOTE(review): deepsleep() is defined elsewhere; presumably it
        # subtracts this elapsed time from the sleep period - confirm.
        delta = time.ticks_diff(time.ticks_ms(), start)
        deepsleep(delta)
# Example source code using Python's ticks_diff()
def __init__(self, pno=4, bsz=1024, dbgport=None):
    """Set up the edge-timing ring buffer on an input pin and start it.

    pno: pin number, passed to Pin(pno, Pin.IN).
    bsz: requested buffer length; rounded up to the next power of two so
        that indices can be wrapped with a bit mask (self.MASK).
    dbgport: optional debug port object; a DummyPort() is used when None.
        Its low() method is called here.

    Raises ValueError if bsz is smaller than 1.
    """
    if bsz < 1:
        raise ValueError("bsz must be at least 1")
    # Round up with integer shifts.  log(bsz)/log(2) can land a hair above
    # an exact integer for some powers of two, which would double the
    # buffer after ceil().
    size = 1
    while size < bsz:
        size <<= 1
    self.BUFSZ = size
    self.MASK = size - 1
    self.pin = pno
    self.ir = Pin(pno, Pin.IN)
    # The generator must be parenthesized when it is not the sole argument.
    self.buf = array.array('i', (0 for _ in range(size)))
    self.ledge = 0
    self.wptr = 0
    self.rptr = 0
    self.dbgport = DummyPort() if dbgport is None else dbgport
    self.dbgport.low()
    # cache ticks functions for native call
    self.ticks_diff = time.ticks_diff
    self.ticks_us = time.ticks_us
    self.start()
def timed_function(f, *args, **kwargs):
    """Decorator that logs free heap and run time for every call of f.

    The extra *args/**kwargs of the decorator itself are accepted but not
    used.  The wrapped function's return value is passed through.
    """
    import time
    # str(f) looks like "<function name at 0x...>"; the second word is
    # taken as the name.
    myname = str(f).split(' ')[1]

    def new_func(*args, **kwargs):
        t = time.ticks_us()
        result = f(*args, **kwargs)
        # Newer timestamp first: ticks_diff(ticks1, ticks2) is
        # ticks1 - ticks2, so this yields a positive duration in
        # microseconds.
        delta = time.ticks_diff(time.ticks_us(), t)
        log.debug('GC: {} Function {} Time = {:6.3f}ms'.format(str(gc.mem_free()), myname, delta/1000))
        return result
    return new_func
# @timed_function
def _read_timeout(cnt, timeout_ms=2000):
    """Read exactly cnt characters from stdin, or return None.

    None is returned when fewer than cnt characters arrive, or - on
    platforms whose time module offers ticks_ms - when the read took
    longer than timeout_ms.  The timeout is only evaluated after the
    read has returned; it does not interrupt the read.
    """
    has_ticks = "ticks_ms" in dir(time)
    started = time.ticks_ms() if has_ticks else 0
    data = sys.stdin.read(cnt)
    if len(data) != cnt:
        return None
    if has_ticks and time.ticks_diff(time.ticks_ms(), started) > timeout_ms:
        return None
    return data
def _read_timeout(cnt, timeout_ms=2000):
    """Return cnt characters read from stdin, or None on short/late reads.

    A read counts as late only where time.ticks_ms exists and more than
    timeout_ms milliseconds passed between the start and the end of the
    read call.
    """
    can_time = "ticks_ms" in dir(time)
    t0 = time.ticks_ms() if can_time else 0
    chunk = sys.stdin.read(cnt)
    short = len(chunk) != cnt
    # The clock is only consulted when the length check passed.
    late = (not short) and can_time \
        and time.ticks_diff(time.ticks_ms(), t0) > timeout_ms
    return None if (short or late) else chunk
def sleep_from_until(start, delay):
    """Call idle_func() repeatedly until delay ms have passed since start.

    start: a time.ticks_ms() timestamp.
    delay: number of milliseconds to wait, counted from start.

    Returns start + delay, intended as the start of the next period.
    """
    # ticks_diff(ticks1, ticks2) is ticks1 - ticks2, so the current time
    # goes first; with the arguments swapped the difference is never
    # positive and the wait does not end until the counter wraps.
    while time.ticks_diff(time.ticks_ms(), start) < delay:
        idle_func()
    return start + delay
def sleep_from_until(start, delay):
    """Idle via machine.idle() until delay ms have elapsed since start.

    start is a time.ticks_ms() timestamp.  Returns start + delay.
    """
    while True:
        elapsed = time.ticks_diff(time.ticks_ms(), start)
        if elapsed >= delay:
            break
        machine.idle()
    return start + delay
def _update(self):
    """Advance or finish the current turn once its time is up.

    Does nothing while no turn is in progress (turn_start is None) or
    while fewer than turn_time_ms milliseconds have passed since
    turn_start.  Otherwise triggers the next turn if angle_list still
    has entries, or calls _release() if it is empty.
    """
    if self.turn_start is None:
        return  # no turn in progress
    now = time.ticks_ms()
    if time.ticks_diff(now, self.turn_start) < self.turn_time_ms:
        return  # current turn still running
    if len(self.angle_list) == 0:
        self._release()
    else:
        self._trigger_next_turn()
def _publish_status(device_list=None, ignore_time=False):
    """Publish the getter values of devices over MQTT.

    device_list: iterable of devices to publish; all devices in _devlist
        when None.
    ignore_time: publish even if less than MIN_PUBLISH_TIME_US
        microseconds have passed since the last publish.

    Devices whose value() is None are skipped.  Each getter is published
    under "<_topic>/<device name>", with "/<getter key>" appended for
    non-empty getter keys.  Any exception while publishing is printed and
    followed by a call to _init_mqtt().
    """
    global _last_publish
    if device_list is None:
        device_list = _devlist.values()
    current_time = time.ticks_us()
    if not ignore_time and \
            time.ticks_diff(current_time,
                            _last_publish) < MIN_PUBLISH_TIME_US:
        return  # rate limit: too soon after the previous publish
    _last_publish = current_time
    try:
        for d in device_list:
            if d.value() is None:
                continue
            rt = (_topic + "/" + d.name).encode()
            for s in d.getters:
                if s == "":
                    t = rt
                else:
                    # Use a bytes separator: rt is bytes, and mixing
                    # bytes with str is a TypeError on CPython.
                    t = rt + b"/" + s.encode()
                my_value = d.getters[s]()
                print('Publishing', t, my_value)
                if isinstance(my_value, (bytes, bytearray)):
                    _client.publish(t, my_value)
                else:
                    _client.publish(t, str(my_value).encode())
    except Exception as e:
        print('Trouble publishing, re-init network.')
        print(e)
        _init_mqtt()
# ======= Setup and execution
def run(updates=5, sleepms=1, poll_rate_inputs=4, poll_rate_network=10):
    """Main loop: poll the network and devices, publish, run timers.

    updates: a full status is published every updates * 1000 loop passes
        (about `updates` seconds at the default sleepms of 1, ignoring
        loop overhead).  If 0, no periodic full status is sent; only
        changes are published.
    sleepms: how long to sleep, in ms, at the end of each loop pass.
    poll_rate_inputs: devices are updated every this many passes.
    poll_rate_network: incoming network commands are evaluated every
        this many passes.

    This function never returns.
    """
    _init_mqtt()
    base = 1000 if updates == 0 else updates * 1000
    # The pass counter wraps at this value.
    overflow = int(base * (poll_rate_network * poll_rate_inputs / sleepms))
    counter = 0
    while True:
        if counter % poll_rate_network == 0:
            _poll_subscripton()
        if counter % poll_rate_inputs == 0:
            # update() is called on every device; those returning a true
            # value are published.
            changed = [d for d in _devlist.values() if d.update()]
            if changed:
                _publish_status(changed)
        if updates != 0 and counter % (updates * 1000) == 0:
            print("Publishing full update.")
            _publish_status(ignore_time=True)
        # Run due timer callbacks.  Iterate over a copy of the keys
        # because entries are deleted while looping.
        now = time.ticks_ms()
        for key in list(_timestack):
            due, callback = _timestack[key]
            if time.ticks_diff(now, due) >= 0:
                del _timestack[key]
                callback(key)
        time.sleep_ms(sleepms)
        counter += 1
        if counter >= overflow:
            counter = 0
def _update(self):
    """Exchange data with the i2c peer unless bus access is suspended.

    While a suspension is active (suspend_start set and no more than
    suspend_time ms elapsed) nothing happens.  Otherwise a queued message
    (msgq) is written, BUFFER_SIZE bytes are read, and the reply is
    parsed as: bytes 0-1 message counter (big endian), byte 2 payload
    length, byte 3 requested suspend time, payload from byte 4 on.
    A reply starting with 255, 255 is treated as garbage and suspends bus
    access for 1000 ms.
    """
    t = time.ticks_ms()
    if self.suspend_start is not None \
            and time.ticks_diff(t, self.suspend_start) <= self.suspend_time:
        return  # bus access still suspended
    self.suspend_start = None  # done waiting
    try:
        if self.msgq is not None:
            self.pin.writeto(self.addr, self.msgq)
            self.msgq = None
        s = self.pin.readfrom(self.addr, self.BUFFER_SIZE)
    except Exception:
        # Not a bare except: KeyboardInterrupt and SystemExit must still
        # be able to stop the program.
        print("Trouble accessing i2c.")
        return
    if s[0] == 255 and s[1] == 255:  # illegal read
        print("Got garbage, waiting 1s before accessing bus again.")
        print("data:", s)
        self.suspend_time = 1000  # take a break
        self.suspend_start = time.ticks_ms()
        return
    count = s[0] * 256 + s[1]
    if self.count == count:
        return  # same counter as the last accepted message
    length = s[2]
    self.suspend_time = s[3]
    # scale timer: with the high bit set, the low 7 bits count units of
    # 100 ms
    if self.suspend_time & 128:
        self.suspend_time = (self.suspend_time & 127) * 100
        if self.suspend_time > 5000:  # should not happen
            print("Garbage time -> waiting 1s before accessing bus again.")
            print("data:", s)
            self.suspend_time = 1000
    if self.suspend_time > 0:
        self.suspend_start = time.ticks_ms()
        print("Bus suspend for", self.suspend_time, "ms requested.")
    if length <= self.BUFFER_SIZE:  # discard if too big
        self.count = count
        self.current_value = s[4:4 + length]
def _update(self):
    """Take a new measurement if more than INTERVAL has passed.

    The stored distance is replaced only when the new reading differs
    from it by at least self.precision.  The time of the measurement is
    recorded either way.
    """
    since_last = ticks_diff(ticks_ms(), self._last_measured)
    if since_last > self.INTERVAL:
        reading = self._measure()
        change = abs(reading - self._distance)
        if change >= self.precision:
            self._distance = reading
        self._last_measured = ticks_ms()
def flush(self):
    """Send the output buffer if it has been waiting longer than INTERVAL.

    When the buffer is empty (out_fill == 0) only the last-sent timestamp
    is reset.  Otherwise, once more than INTERVAL has passed since
    out_last_sent, the buffer is acquired, sent with _send() and released.
    """
    t = time.ticks_ms()
    if self.out_fill == 0:  # reset time, if there is nothing to send
        self.out_last_sent = t
    elif time.ticks_diff(t, self.out_last_sent) > self.INTERVAL:
        self.acquire_out_buffer()
        try:
            self._send()
        finally:
            # Release even if _send() raises, so the buffer is not left
            # acquired.
            self.release_out_buffer()
def ticks_diff(a, b):
    """Return the plain difference a - b (no wrap-around handling)."""
    difference = a - b
    return difference
def receive(self, request=0, timeoutms=None):
    """Read bytes from the socket into the network buffer and decrypt them.

    Bytes are read one at a time until one of these happens:
    - the buffer holds netbuf_size bytes,
    - a read yields nothing (or fails with EAGAIN/ETIMEDOUT) and at
      least `request` bytes have already been read,
    - timeoutms is not None and that many ms have passed since the call
      started (timeoutms == 0 therefore ends after the first attempt).
    With timeoutms None the call keeps trying until `request` bytes are in.

    Any other OSError is re-raised.  If anything was read, the buffer is
    handed to crypt_in.decrypt() together with the byte count.

    Returns a tuple (buffer, number of bytes read); the count may be 0.
    """
    buf = self.netbuf_in
    view = self.netbuf_in_mv
    count = 0
    t0 = ticks_ms()
    while count < self.netbuf_size:
        got_byte = False
        try:
            if self.sock_read(view[count:count + 1]):
                got_byte = True  # result was not 0 or None
        except OSError as e:
            nothing_yet = len(e.args) > 0 \
                and (e.args[0] == errno.EAGAIN
                     or e.args[0] == errno.ETIMEDOUT)
            if not nothing_yet:
                raise
        if got_byte:
            count += 1
        elif count >= request:
            break  # not blocking for more than the requested size
        if timeoutms is not None \
                and ticks_diff(ticks_ms(), t0) >= timeoutms:
            break
        sleep_ms(1)  # prevent busy waiting
    if count > 0:
        self.crypt_in.decrypt(buf, length=count)
    return buf, count
def isr(self, pin):
    """Interrupt handler: request a gyro reset, debounced to 10 ms.

    Calls arriving less than 10 ms after the last accepted one are
    ignored.  The pin argument is not used.
    """
    since_last = time.ticks_diff(time.ticks_ms(), self.last_isr)
    if since_last >= 10:
        print('! reset gyro request')
        self.flag_reset_gyro = True
        self.last_isr = time.ticks_ms()
def serve(self):
    """Read the MPU in an endless loop and send positions over UDP.

    Every pass reads a position.  A reading is sent as JSON to
    192.168.4.2:8000 when at least write_interval ms have passed since
    the last send, and the garbage collector is run when at least
    gc_interval ms have passed since the last collection.  A pending
    gyro reset request (flag_reset_gyro) is honoured before reading.

    This method never returns.
    """
    print('starting mpu server on port {}'.format(self.port))
    lastgc = lastsent = lastread = time.ticks_ms()
    while True:
        now = time.ticks_ms()
        write_dt = time.ticks_diff(now, lastsent)
        read_dt = time.ticks_diff(now, lastread)
        gc_dt = time.ticks_diff(now, lastgc)
        # Wait out the rest of the millisecond if the last read was less
        # than 1 ms ago.
        time.sleep_ms(max(0, 1-read_dt))
        if self.flag_reset_gyro:
            self.mpu.filter.reset_gyro()
            self.flag_reset_gyro = False
        values = self.mpu.read_position()
        lastread = now
        if write_dt >= self.write_interval:
            lastsent = time.ticks_ms()
            self.sock.sendto(tojson(values), ('192.168.4.2', 8000))
        if gc_dt >= self.gc_interval:
            gc.collect()
            # Restart the interval; otherwise gc.collect() would run on
            # every pass once gc_interval has elapsed for the first time.
            lastgc = time.ticks_ms()
def input(self, vals):
    """Feed one sensor sample into the filter.

    vals: sequence of readings; items 0-2 are used as accelerometer data
        and items 4-6 as gyroscope data (item 3 is skipped - presumably
        the temperature reading, confirm against the caller).

    Updates accel_pos, gyro_pos and the first two entries of filter_pos
    (pitch and roll).  The first sample only seeds all three positions
    from the accelerometer.
    """
    now = time.ticks_ms()
    # unpack sensor readings
    accel_raw = vals[0:3]
    gyro_rates = vals[4:7]
    # convert accelerometer reading to degrees
    accel = self.calculate_accel_pos(*accel_raw)
    self.accel_pos = accel
    if self.last == 0:
        # First chunk of data: accept the accelerometer position as is.
        # All three attributes refer to the same object here.
        self.filter_pos = accel
        self.gyro_pos = accel
        self.last = now
        return
    # Elapsed time in seconds since the previous sample; needed because
    # the gyroscope reports degrees/second.
    elapsed_s = time.ticks_diff(now, self.last)/1000
    self.last = now
    # change in position according to the gyroscope
    gyro_step = [rate * elapsed_s for rate in gyro_rates]
    self.gyro_pos = [pos + step
                     for pos, step in zip(self.gyro_pos, gyro_step)]
    # Blend gyro and accelerometer for pitch (0) and roll (1).
    for axis in (0, 1):
        self.filter_pos[axis] = (
            self.gyro_weight * (self.filter_pos[axis] + gyro_step[axis])
            + (1-self.gyro_weight) * self.accel_pos[axis])
def waiting(self):
    """Block for at least 5000 ms, then set DO_SLEEP and print a notice.

    The elapsed time is re-checked in steps of 5 ms.
    """
    began = time.ticks_ms()
    elapsed = time.ticks_diff(time.ticks_ms(), began)
    while not elapsed >= 5000:
        elapsed = time.ticks_diff(time.ticks_ms(), began)
        time.sleep_ms(5)
    self.DO_SLEEP = True
    print("Ordem para dormir")
def timeseq_isr(self, p):
    '''Record the time since the previous edge in the ring buffer.

    The stored value is the microsecond difference to the last edge,
    multiplied by +1 when p.value() is 1 and by -1 when it is 0.  The
    debug port is driven high for the duration of the handler.
    '''
    # Take the timestamp first so the rest of the handler does not add
    # to the measured interval.
    edge = self.ticks_us()
    self.dbgport.high()
    sign = 2 * p.value() - 1
    elapsed = self.ticks_diff(edge, self.ledge)
    self.ledge = edge
    slot = self.wptr
    self.buf[slot] = elapsed * sign
    # BUFSZ is a power of two, so masking wraps the write index.
    self.wptr = (slot + 1) & self.MASK
    self.dbgport.low()
def __call__(self, pin):
    """Record the stop time and the time elapsed since self.start.

    Stores the current microsecond tick in self.stop and the difference
    to self.start in self.flight.  The pin argument is not used.
    """
    stopped = time.ticks_us()
    self.stop = stopped
    self.flight = time.ticks_diff(stopped, self.start)