def pulse(led, delay):
for i in range(200):
led.duty(int(math.sin(i/10*math.pi)*500 + 500))
time.sleep_ms(delay)
python类sleep_ms()的实例源码
def pulse(led, delay):
for i in range(20):
led.duty(int(math.sin(i/10*math.pi)*500 + 500))
time.sleep_ms(delay)
def sleep_ms(self, mseconds):
try:
time.sleep_ms(mseconds)
except AttributeError:
machine.delay(mseconds)
def power_off(self):
self.clear()
self.command([0x20, 0x08])
# 0x20 - basic instruction set
# 0x08 - set display to blank (doesn't delete contents)
self.sleep_ms(10)
if self.pwr:
self.pwr.value(0) # turn off power
def beacon():
fader.stop()
for repeat in range(0, 10):
for pos in range(0, NEOPIXEL_COUNT):
for i in range(0, NEOPIXEL_COUNT):
if i == pos:
np[i] = (249, 72, 119)
else:
np[i] = (0, 0, 0)
np.write()
time.sleep_ms(100)
fader.start()
def read(self, raw=False):
was_active = self.active()
self.active(True)
while not self._valid():
time.sleep_ms(int(self._integration_time + 0.9))
data = tuple(self._register16(register) for register in (
_REGISTER_RDATA,
_REGISTER_GDATA,
_REGISTER_BDATA,
_REGISTER_CDATA,
))
self.active(was_active)
if raw:
return data
return self._temperature_and_lux(data)
ssd1306.py 文件源码
项目:SX127x_driver_for_MicroPython_on_ESP8266
作者: Wei1234c
项目源码
文件源码
阅读 18
收藏 0
点赞 0
评论 0
def poweron(self):
self.res(1)
time.sleep_ms(1)
self.res(0)
time.sleep_ms(10)
self.res(1)
ssd1306.py 文件源码
项目:SX127x_driver_for_MicroPython_on_ESP8266
作者: Wei1234c
项目源码
文件源码
阅读 19
收藏 0
点赞 0
评论 0
def poweron(self):
self.res(1)
time.sleep_ms(1)
self.res(0)
time.sleep_ms(10)
self.res(1)
def run(updates=5, sleepms=1, poll_rate_inputs=4, poll_rate_network=10):
# updates: send out a status every this amount of seconds.
# If 0, never send time based status updates only when change happened.
# sleepms: going o sleep for how long between each loop run
# poll_rate_network: how often to evaluate incoming network commands
# poll_rate_inputs: how often to evaluate inputs
_init_mqtt()
if updates == 0:
overflow = 1000
else:
overflow = updates * 1000
overflow *= poll_rate_network * poll_rate_inputs / sleepms
overflow = int(overflow)
# print("Overflow:",overflow)
counter = 0
while True:
if counter % poll_rate_network == 0:
_poll_subscripton()
if counter % poll_rate_inputs == 0:
device_list = []
for d in _devlist.values():
if d.update():
device_list.append(d)
if len(device_list) > 0:
_publish_status(device_list)
if updates != 0 and counter % (updates * 1000) == 0:
print("Publishing full update.")
_publish_status(ignore_time=True)
# execute things on timestack
now = time.ticks_ms()
for id in list(_timestack):
t, cb = _timestack[id]
if time.ticks_diff(now, t) >= 0:
del (_timestack[id])
cb(id)
time.sleep_ms(sleepms)
counter += 1
if counter >= overflow:
counter = 0
def acquire_out_buffer(self):
while self.out_buffer_lock == True:
time.sleep_ms(1) # Wait for release
self.irqstate=machine.disable_irq()
if self.out_buffer_lock == True: # TODO: check if this locking is enough
machine.enable_irq(self.irqstate)
return False
self.out_buffer_lock=True
return True
def sleep_ms(t): time.sleep(t/1000)
def receive(self, request=0, timeoutms=None):
# receive into network buffer,
# fill buffer once and decrypt
# if request>0 wait blocking for request number of bytes (if timeout
# given interrupt after timeoutms ms)
# timeout==None: block
# timeout==0: return immediately after trying to read something from
# network buffer
# timeout>0: try for time specified to read something before returning
# this function always returns a pointer to the buffer and
# number of bytes read (could be 0)
data = self.netbuf_in
data_mv = self.netbuf_in_mv
readbytes = 0
start_t = ticks_ms()
while readbytes < self.netbuf_size:
try:
if self.sock_read(data_mv[readbytes:readbytes+1]):
readbytes += 1 # result was not 0 or none
else:
if readbytes >= request:
break # break if not blocking to request size
except OSError as e:
if len(e.args) > 0 \
and (e.args[0] == errno.EAGAIN
or e.args[0] == errno.ETIMEDOUT):
if readbytes >= request:
break # break if not blocking to request size
else:
raise
if timeoutms is not None \
and ticks_diff(ticks_ms(), start_t) >= timeoutms:
break
sleep_ms(1) # prevent busy waiting
if readbytes>0: self.crypt_in.decrypt(data,length=readbytes)
return data,readbytes
def sub_cb(topic, msg):
if lcd.cursor_y * lcd.num_columns + lcd.cursor_x + len(str(msg)) > (lcd.num_lines * lcd.num_columns):
lcd.clear()
lcd.move_to(0,0)
lcd.putstr("Puffer[" + str(topic, "utf-8").split("/")[-1] + "]:" + str(msg, "utf-8") + "\n")
time.sleep_ms(3000)
def main():
c.set_callback(sub_cb)
c.connect()
#c.subscribe(CLIENT_ID + "/display")
c.subscribe("esp8266-35cacf00/temperature/0")
c.subscribe("esp8266-35cacf00/temperature/1")
c.subscribe("esp8266-35cacf00/temperature/2")
while True:
c.check_msg()
time.sleep_ms(500)
c.disconnect()
def read_temps():
"""Read DS18B20's."""
dat = machine.Pin(PIN_1W)
ds = ds18x20.DS18X20(onewire.OneWire(dat))
ds.convert_temp()
time.sleep_ms(750)
return {rom_to_hex(rom): ds.read_temp(rom) for rom in ds.scan()}
def _read(self):
was_active = self.active()
self.active(True)
if not was_active:
# if the sensor was off, wait for measurement
time.sleep_ms(_INTEGRATION_TIME[self._integration_time][1])
broadband = self._register16(_REGISTER_CHANNEL0)
ir = self._register16(_REGISTER_CHANNEL1)
self.active(was_active)
return broadband, ir
def sleep_ms(self, mseconds):
try:
time.sleep_ms(mseconds)
except AttributeError:
machine.delay(mseconds)
def power_off(self):
self.clear()
self.set_power(self.POWER_DOWN)
self.sleep_ms(10)
if self.pwr:
self.pwr.value(0) # turn off power
def led_state():
p2 = Pin(2, Pin.OUT)
p2.value(0)
time.sleep_ms(500)
p2.value(1)
time.sleep_ms(500)
p2.value(0)
time.sleep_ms(500)
p2.value(1)
def sleep_ms(self, mseconds):
try:
time.sleep_ms(mseconds)
except AttributeError:
machine.delay(mseconds)