def do_connect(ssid, pwd, TYPE, hard_reset=True):
interface = network.WLAN(TYPE)
# Stage zero if credential are null disconnect
if not pwd or not ssid :
print('Disconnecting ', TYPE)
interface.active(False)
return None
if TYPE == network.AP_IF:
interface.active(True)
time.sleep_ms(200)
interface.config(essid=ssid, password=pwd)
return interface
if hard_reset:
interface.active(True)
interface.connect(ssid, pwd)
# Stage one check for default connection
print('Connecting')
for t in range(120):
time.sleep_ms(250)
if interface.isconnected():
print('Yes! Connected')
return interface
if t == 60 and not hard_reset:
# if still not connected
interface.active(True)
interface.connect(ssid, pwd)
# No way we are not connected
print('Cant connect', ssid)
return None
#----------------------------------------------------------------
# MAIN PROGRAM STARTS HERE
python类sleep_ms()的实例源码
def deauth(_ap,_client,type,reason):
# 0 - 1 type, subtype c0: deauth (a0: disassociate)
# 2 - 3 duration (SDK takes care of that)
# 4 - 9 reciever (target)
# 10 - 15 source (ap)
# 16 - 21 BSSID (ap)
# 22 - 23 fragment & squence number
# 24 - 25 reason code (1 = unspecified reason)
packet=bytearray([0xC0,0x00,0x00,0x00,0xBB,0xBB,0xBB,0xBB,0xBB,0xBB,0xCC, 0xCC, 0xCC, 0xCC, 0xCC, 0xCC,0xCC, 0xCC, 0xCC, 0xCC, 0xCC, 0xCC,0x00, 0x00,0x01, 0x00])
for i in range(0,6):
packet[4 + i] =_client[i]
packet[10 + i] = packet[16 + i] =_ap[i]
#set type
packet[0] = type;
packet[24] = reason
result=sta_if.send_pkt_freedom(packet)
if result==0:
time.sleep_ms(1)
return True
else:
return False
def read_temp():
# the device is on GPIO12
dat = machine.Pin(5)
# create the onewire object
ds = ds18x20.DS18X20(onewire.OneWire(dat))
# scan for devices on the bus
roms = ds.scan()
# print('found devices:', roms)
# print('temperature:', end=' ')
ds.convert_temp()
time.sleep_ms(750)
temp = ds.read_temp(roms[0])
# print(temp)
return temp
def active(self, value=None):
if value is None:
return self._active
value = bool(value)
if self._active == value:
return
self._active = value
enable = self._register8(_REGISTER_ENABLE)
if value:
self._register8(_REGISTER_ENABLE, enable | _ENABLE_PON)
time.sleep_ms(3)
self._register8(_REGISTER_ENABLE,
enable | _ENABLE_PON | _ENABLE_AEN)
else:
self._register8(_REGISTER_ENABLE,
enable & ~(_ENABLE_PON | _ENABLE_AEN))
def loop():
show_temp=True
while True:
g=get()
display.fill(0)
display.text("Weather in",0,0)
display.text(g['name'],0,8)
if left_button.value()==0 or right_button.value()==0:
show_temp=not show_temp
if show_temp:
celsius=g['main']['temp']-273.15
display.text("Temp/C: %.1f"%celsius,0,24)
display.text("Temp/F: %.1f"%(celsius*1.8+32),0,32)
else: # humidity
display.text("Humidity: %d%%"%g['main']['humidity'],0,24)
if lower_button.value()==0:
break
display.show()
time.sleep_ms(500)
def __init__(self, i2c, i2c_addr, num_lines, num_columns):
self.i2c = i2c
self.i2c_addr = i2c_addr
self.i2c.writeto(self.i2c_addr, bytes([0]))
sleep_ms(20) # Allow LCD time to powerup
# Send reset 3 times
self.hal_write_init_nibble(self.LCD_FUNCTION_RESET)
sleep_ms(5) # need to delay at least 4.1 msec
self.hal_write_init_nibble(self.LCD_FUNCTION_RESET)
sleep_ms(1)
self.hal_write_init_nibble(self.LCD_FUNCTION_RESET)
sleep_ms(1)
# Put LCD into 4 bit mode
self.hal_write_init_nibble(self.LCD_FUNCTION)
sleep_ms(1)
LcdApi.__init__(self, num_lines, num_columns)
cmd = self.LCD_FUNCTION
if num_lines > 1:
cmd |= self.LCD_FUNCTION_2LINES
self.hal_write_command(cmd)
def hal_write_command(self, cmd):
"""Writes a command to the LCD.
Data is latched on the falling edge of E.
"""
byte=((self.backlight << SHIFT_BACKLIGHT) |
(((cmd >> 4) & 0x0f) << SHIFT_DATA))
self.i2c.writeto(self.i2c_addr, bytes([byte | MASK_E]))
self.i2c.writeto(self.i2c_addr, bytes([byte]))
byte=((self.backlight << SHIFT_BACKLIGHT) |
((cmd & 0x0f) << SHIFT_DATA))
self.i2c.writeto(self.i2c_addr, bytes([byte | MASK_E]))
self.i2c.writeto(self.i2c_addr, bytes([byte]))
if cmd <= 3:
# The home and clear commands require a worst
# case sleep_ms of 4.1 msec
sleep_ms(5)
def readDS18x20():
from machine import Pin
import onewire, ds18x20
OneWirePin = 0
# the device is on GPIO12
dat = Pin(OneWirePin)
# create the onewire object
ds = ds18x20.DS18X20(onewire.OneWire(dat))
# scan for devices on the bus
roms = ds.scan()
ds.convert_temp()
time.sleep_ms(750)
values = []
for rom in roms:
values.append(ds.read_temp(rom))
print(values)
return values
def get_sensor_avg(self, samples, softstart=100):
'''Return the average readings from the sensors over the
given number of samples. Discard the first softstart
samples to give things time to settle.'''
sample = self.read_sensors()
counters = [0] * 7
for i in range(samples + softstart):
# the sleep here is to ensure we read a new sample
# each time
time.sleep_ms(2)
sample = self.read_sensors()
if i < softstart:
continue
for j, val in enumerate(sample):
counters[j] += val
return [x//samples for x in counters]
def measure(self):
buf = self.buf
address = self.address
# wake sensor
try:
self.i2c.writeto(address, b'')
except OSError:
pass
# read 4 registers starting at offset 0x00
self.i2c.writeto(address, b'\x03\x00\x04')
# wait at least 1.5ms
time.sleep_ms(2)
# read data
self.i2c.readfrom_mem_into(address, 0, buf)
crc = ustruct.unpack('<H', bytearray(buf[-2:]))[0]
if (crc != self.crc16(buf[:-2])):
raise Exception("checksum error")
def measure(self):
buf = self.buf
address = self.address
# wake sensor
try:
self.i2c.writeto(address, b'')
except OSError:
pass
# read 4 registers starting at offset 0x00
self.i2c.writeto(address, b'\x03\x00\x04')
# wait at least 1.5ms
time.sleep_ms(2)
# read data
self.i2c.readfrom_mem_into(address, 0, buf)
# debug print
print(ustruct.unpack('BBBBBBBB', buf))
crc = ustruct.unpack('<H', bytearray(buf[-2:]))[0]
if (crc != self.crc16(buf[:-2])):
raise Exception("checksum error")
def measure(self):
self.wakeup()
# request measure
wbuf = bytearray( 3 )
wbuf[0] = AM2315_READREG
wbuf[1] = 0x00 # Start at adress 0x00
wbuf[2] = 0x04 # Request 4 byte data
self.i2c.writeto( self.addr, wbuf ) # b'\x03\x00\x04' )
# wait 1.5+ ms before reading
time.sleep_ms( 2 )
# Request 8 bytes from sensor
#rbuf = bytearray( 8 )
self.i2c.readfrom_mem_into(self.addr, 0, self.rbuf) # Read from Reg 0
return self.check_response()
def status(self):
ow = onewire.OneWire(Pin(2))
ds = ds18x20.DS18X20(ow)
roms = ds.scan()
if not len(roms):
print("Nao encontrei um dispositivo onewire.")
return None
ds.convert_temp()
time.sleep_ms(750)
temp = 0
for i in range(10):
for rom in roms:
temp += ds.read_temp(rom)
return temp / 10
# envia a temperatura para o broker
def _init__(self, cspin):
self.cspin = Pin(cspin, mode=Pin.OUT)
self.cspin.value(1)
self.x = 0
self.y = 0
self.z = 0
self.t = 0
self.spi = SPI(0, mode=SPI.MASTER, baudrate=1000000, polarity=0, phase=0, firstbit=SPI.MSB)
time.sleep_ms(1000)
self.SPIwriteOneRegister(0x1F, 0x52) #RESET
time.sleep_ms(10)
self.DEVICEID = self.SPIreadOneRegister(0x00)
self.DEVID_MST = self.SPIreadOneRegister(0x01)
self.PART_ID = self.SPIreadOneRegister(0x02)
self.REV_ID = self.SPIreadOneRegister(0x03)
self.STATUS = self.getStatus()
def __init__(self, csPIN, ISRPin, ActivityThreshold=70, ActivityTime=5, InactivityThreshold=1000, InactivityTime=5):
self.steps = 0
ADXL362.__init__(self, csPIN)
self.isrpin = Pin(ISRPin, mode=Pin.IN)
self.isrpin.callback(trigger=Pin.IRQ_RISING, handler=self._isr_handler)
self.setupDCActivityInterrupt(ActivityThreshold, ActivityTime)
self.setupDCInactivityInterrupt(InactivityThreshold, InactivityTime)
#ACTIVITY/INACTIVITY CONTROL REGISTER
self.SPIwriteOneRegister(0x27, 0B111111) #B111111 Link/Loop:11 (Loop Mode), Inactive Ref Mode:1,Inactive Enable (under threshold):1,Active Ref Mode:1, Active Enable (over threshold):1
#MAP INTERRUPT 1 REGISTER.
self.SPIwriteOneRegister(0x2A,0B10010000) #B00010000 0=HIGH on ISR,AWAKE:0,INACT:0,ACT:1,FIFO_OVERRUN:0,FIFO_WATERMARK:0,FIFO_READY:0,DATA_READY:0
#POWER CONTROL REGISTER
self.SPIwriteOneRegister(0x2D, 0B1110) #B1011 Wake-up:1,Autosleep:0,measure:11=Reserved (Maybe works better)
self.beginMeasure()
time.sleep_ms(100)
def send_cmd(self, cmd_request, response_size=6, read_delay_ms=100):
"""
Send a command to the sensor and read (optionally) the response
The responsed data is validated by CRC
"""
try:
self.i2c.start();
self.i2c.writeto(self.i2c_addr, cmd_request);
if not response_size:
self.i2c.stop();
return
time.sleep_ms(read_delay_ms)
data = self.i2c.readfrom(self.i2c_addr, response_size)
self.i2c.stop();
for i in range(response_size//3):
if not self._check_crc(data[i*3:(i+1)*3]): # pos 2 and 5 are CRC
raise SHT30Error(SHT30Error.CRC_ERROR)
if data == bytearray(response_size):
raise SHT30Error(SHT30Error.DATA_ERROR)
return data
except OSError as ex:
if 'I2C' in ex.args[0]:
raise SHT30Error(SHT30Error.BUS_ERROR)
raise ex
def run(func):
result = func()
for wait in result:
time.sleep_ms(wait)
def _send(self, data, send_anyway=False):
if self._tx_count < MAX_MSG_PER_SEC or send_anyway:
retries = 0
while retries <= MAX_TX_RETRIES:
try:
self.conn.send(data)
self._tx_count += 1
break
except socket.error as er:
if er.args[0] != EAGAIN:
raise
else:
time.sleep_ms(RE_TX_DELAY)
retries += 1
def read(self, channel):
self._write_register(_REGISTER_CONFIG, _CQUE_NONE | _CLAT_NONLAT |
_CPOL_ACTVLOW | _CMODE_TRAD | _DR_1600SPS | _MODE_SINGLE |
_OS_SINGLE | _GAINS[self.gain] | _CHANNELS[channel])
while not self._read_register(_REGISTER_CONFIG) & _OS_NOTBUSY:
time.sleep_ms(1)
return self._read_register(_REGISTER_CONVERT)
def diff(self, channel1, channel2):
self._write_register(_REGISTER_CONFIG, _CQUE_NONE | _CLAT_NONLAT |
_CPOL_ACTVLOW | _CMODE_TRAD | _DR_1600SPS | _MODE_SINGLE |
_OS_SINGLE | _GAINS[self.gain] | _DIFFS[(channel1, channel2)])
while not self._read_register(_REGISTER_CONFIG) & _OS_NOTBUSY:
time.sleep_ms(1)
return self._read_register(_REGISTER_CONVERT)
def blink(n):
for i in range(n):
led.value(0)
time.sleep_ms(100)
led.value(1)
time.sleep_ms(100)
#if machine.reset_cause() == machine.SOFT_RESET:
# print ("Soft reset, doing nothing")
# print('Connected!! network config:', wifi.wlan.ifconfig())
def vpage(self,delay=10):
show = False
self.write_cmd(SET_START_LINE | 0)
for x in range(self.height-1):
self.write_cmd(SET_START_LINE | x)
if not show:
self.show()
show = True
time.sleep_ms(delay)
self.fill(0)
def reset(self, res):
res.value(1)
time.sleep_ms(1)
res.value(0)
time.sleep_ms(20)
res.value(1)
time.sleep_ms(20)
def vpage(self,delay=10):
show = False
self.write_cmd(SET_START_LINE | 0)
for x in range(self.height-1):
self.write_cmd(SET_START_LINE | x)
if not show:
self.show()
show = True
time.sleep_ms(delay)
self.fill(0)
def reset(self, res):
res.value(1)
time.sleep_ms(1)
res.value(0)
time.sleep_ms(20)
res.value(1)
time.sleep_ms(20)
def _send(self, data, send_anyway=False):
if self._tx_count < MAX_MSG_PER_SEC or send_anyway:
retries = 0
while retries <= MAX_TX_RETRIES:
try:
self.conn.send(data)
self._tx_count += 1
break
except socket.error as er:
if er.args[0] != EAGAIN:
raise
else:
time.sleep_ms(RE_TX_DELAY)
retries += 1
def calibrate(self, samples=300):
max_val = 0
for _ in range(samples):
val = self.pin()
if val > max_val:
max_val = val
time.sleep_ms(10)
self.threshold = max_val * 1.2
def __init__(self, i2c=None, sda='P22', scl='P21'):
if i2c is not None:
self.i2c = i2c
else:
self.i2c = I2C(0, mode=I2C.MASTER, pins=(sda, scl))
self.sda = sda
self.scl = scl
self.clk_cal_factor = 1
self.reg = bytearray(6)
self.wake_int = False
self.wake_int_pin = False
self.wake_int_pin_rising_edge = True
try:
self.read_fw_version()
except Exception:
time.sleep_ms(2)
try:
# init the ADC for the battery measurements
self.poke_memory(ANSELC_ADDR, 1 << 2)
self.poke_memory(ADCON0_ADDR, (0x06 << _ADCON0_CHS_POSN) | _ADCON0_ADON_MASK)
self.poke_memory(ADCON1_ADDR, (0x06 << _ADCON1_ADCS_POSN))
# enable the pull-up on RA3
self.poke_memory(WPUA_ADDR, (1 << 3))
# make RC5 an input
self.set_bits_in_memory(TRISC_ADDR, 1 << 5)
# set RC6 and RC7 as outputs and enable power to the sensors and the GPS
self.mask_bits_in_memory(TRISC_ADDR, ~(1 << 6))
self.mask_bits_in_memory(TRISC_ADDR, ~(1 << 7))
if self.read_fw_version() < 6:
raise ValueError('Firmware out of date')
except Exception:
raise Exception('Board not detected')
def activity(self):
if not self.debounced:
time.sleep_ms(self.act_dur)
self.debounced = True
if self.int_pin():
return True
return False
def activity(self):
if not self.debounced:
time.sleep_ms(self.act_dur)
self.debounced = True
if self.int_pin():
return True
return False