def disable_wifi_ap():
    """Switch off the built-in access point and print the resulting status.

    Uses the module-level ``network`` import.
    """
    ap_if = network.WLAN(network.AP_IF)
    ap_if.active(False)
    status = ap_if.status()
    print('Disabled access point, network status is %s' % status)
# Example source code for the Python WLAN class (python类WLAN的实例源码)
def disable_wifi_ap():
    """Deactivate the access-point interface, then report its ``status()``."""
    access_point = network.WLAN(network.AP_IF)
    access_point.active(False)
    message = 'Disabled access point, network status is %s' % access_point.status()
    print(message)
def getClientID():
    """Build a client identifier from the WLAN interface's MAC address.

    Returns:
        ``"ESP_"`` followed by the upper-cased hex digits of the MAC, with
        the first six hex digits dropped.
    """
    from ubinascii import hexlify
    import network

    mac_hex = hexlify(network.WLAN().config('mac')).decode()
    # ``decode()`` already yields a str, so the former outer
    # ``str(..., "utf-8")`` was a redundant second decode (and CPython
    # refuses to decode a str at all).
    return "ESP_" + mac_hex[6:].upper()
def periodic_tasks():
    """Housekeeping hook, meant to be called every few seconds.

    When the clock does not read later than 2016-01-01 and the station
    interface is connected, an NTP sync is attempted. A sync that fails
    with ``OSError`` is reported on stdout and otherwise ignored.
    """
    # A clock that is not past this reference is treated as "never set".
    reference = time.mktime((2016, 1, 1, 0, 0, 0, 0, 0, 0))
    if time.time() > reference:
        return  # Time has been set; nothing to do.

    station = network.WLAN(network.STA_IF)
    if not station.isconnected():
        return  # Station interface is not connected; skip the sync.

    # Trying to sync with ntp
    try:
        ntptime.settime()
    except OSError:
        print("Failed to sync with ntp")
def wait_connect():
    """Wait for wifi to be connected before attempting mqtt.

    Polls the station interface up to ``CONNECT_WAIT`` times, sleeping one
    second after each unsuccessful poll.

    Returns:
        True as soon as the interface reports a connection (its
        ``ifconfig()`` tuple is printed first); False when every poll fails.
    """
    for _ in range(CONNECT_WAIT):
        station = network.WLAN(network.STA_IF)
        if not station.isconnected():
            time.sleep(1)
            continue
        print(station.ifconfig())
        return True
    # Loop ran to completion without seeing a connection.
    return False
def WorkMode_STA(essid,pwd):
    """Join a wifi network in station mode and return the interface's IP.

    Blocks (busy-waiting) until the interface reports a connection.

    Args:
        essid: Name of the wifi network to join.
        pwd: Password of that network.

    Returns:
        The first element of ``ifconfig()``, i.e. the IP address.
    """
    station = network.WLAN(network.STA_IF)  # station-mode interface
    station.active(True)  # activate it
    station.connect(essid, pwd)
    # Spin until the connection is established.
    while True:
        if station.isconnected():
            break
    # ifconfig() yields (ip, subnet, gateway, dns)
    print('IP:', station.ifconfig()[0])
    return station.ifconfig()[0]
def WorkMode_AP(essid,pwd):
    """Bring up the access-point interface and return its IP address.

    Args:
        essid: Network name the access point should advertise.
        pwd: Password for the access point.

    Returns:
        The first element of ``ifconfig()``, i.e. the access point's IP.
    """
    ap_mode = network.WLAN(network.AP_IF)  # access-point interface
    ap_mode.active(True)  # activate it
    # WLAN.config() only accepts settings as keyword arguments; the previous
    # positional call config(essid, pwd) raised instead of configuring the AP.
    ap_mode.config(essid=essid, password=pwd)
    ip = ap_mode.ifconfig()[0]
    print('IP:', ip)
    return ip
def do_connect(ssid='TurnipSmart', password='turnip2016'):
    """Restart the station interface and connect it to a wifi network.

    Pin 2 is driven low while connecting and high once the interface
    reports a connection. Blocks (busy-waiting) until connected.

    Args:
        ssid: Network name. Defaults to the value that used to be
            hard-coded, so existing zero-argument callers are unaffected.
        password: Network password. Same backward-compatible default.
    """
    sta_if = network.WLAN(network.STA_IF)
    p2 = Pin(2, Pin.OUT)
    # Start from a deactivated interface.
    sta_if.active(False)
    if not sta_if.isconnected():
        # Pin.value() is used instead of low()/high(), which are not
        # available on current MicroPython releases.
        p2.value(0)
        print('connecting to network...')
        sta_if.active(True)
        sta_if.connect(ssid, password)
        while not sta_if.isconnected():
            pass
    if sta_if.isconnected():
        print('connect success')
        p2.value(1)
    print('network config:', sta_if.ifconfig())
async def wifi_config():
    """Load ``config.json``, join the configured wifi network and set up MQTT.

    Declared ``async`` because the body awaits ``asyncio.sleep`` while
    waiting for the wifi connection; ``await`` is not valid in a plain
    ``def`` under standard Python.

    Side effects:
        Sets the globals ``cfg`` (parsed configuration) and ``client``
        (connected ``MQTTClient`` subscribed to this device's switch topic).
        Reads the global ``CLIENT_ID``, on which ``.decode("utf-8")`` is
        called, so it is expected to be bytes.
    """
    global cfg
    global client
    global CLIENT_ID
    print('load configuration')
    # The context manager closes the file even when reading/parsing fails.
    with open('config.json') as f:
        cfg = json.loads(f.read())
    print(cfg)
    print('starting wifi connection')
    wlan = network.WLAN(network.STA_IF)
    wlan.active(True)
    wlan.connect(cfg['ap']['ssid'], cfg['ap']['pwd'])
    # Yield to the scheduler between polls instead of busy-waiting.
    while not wlan.isconnected():
        print('wait connection...')
        await asyncio.sleep(1)
    wlan.ifconfig()  # NOTE(review): result is discarded — was a print intended?
    mqtt_cfg = cfg['mqtt']
    client = MQTTClient(
        CLIENT_ID, mqtt_cfg["broker"],
        port=mqtt_cfg["port"],
        keepalive=mqtt_cfg["keepalive"])
    # Last-will message, registered before connecting.
    will_msg = {'id': CLIENT_ID,
                'status': False,
                'msg': 'The connection from this device is lost:('
                }
    client.set_last_will('/device/will/status', json.dumps(will_msg))
    client.set_callback(on_message)
    client.connect()
    client.subscribe('/device/{0}/switch'.format(CLIENT_ID.decode("utf-8")), 0)
def setup():
    """Create the DHT22 sensor, join wifi, then hand the MQTT settings to ``wifi``.

    Stores the sensor (on pin 2) in the global ``d``. Waits, calling
    ``machine.idle()`` between polls, until the station interface reports
    a connection.
    """
    global d
    d = dht.DHT22(Pin(2))

    station = network.WLAN(network.STA_IF)
    station.active(True)

    settings = load_config()
    ap_settings = settings['ap']
    ssid, pwd = ap_settings['ssid'], ap_settings['pwd']
    station.connect(ssid, pwd)
    while True:
        if station.isconnected():
            break
        machine.idle()

    wifi(settings['mqtt'])
def ifconfig():
    """Print the IP configuration of each active WLAN interface.

    The station interface is reported first, then the access point;
    an inactive interface prints nothing.
    """
    import network

    station = network.WLAN(network.STA_IF)
    if station.active():
        sta_ip, sta_subnet, sta_gateway, sta_dns = station.ifconfig()
        sta_report = "Station:\n\tip: {}\n\tsubnet: {}\n\tgateway: {}\n\tdns: {}"
        print(sta_report.format(sta_ip, sta_subnet, sta_gateway, sta_dns))

    access_point = network.WLAN(network.AP_IF)
    if access_point.active():
        ap_ip, ap_subnet, ap_gateway, ap_dns = access_point.ifconfig()
        ap_report = "Access Point:\n\tip: {}\n\tsubnet: {}\n\tgateway: {}\n\tdns: {}"
        print(ap_report.format(ap_ip, ap_subnet, ap_gateway, ap_dns))
def get_sta_stats(self):
    """Return ``self.get_wlan_stats`` evaluated for the station interface."""
    return self.get_wlan_stats(network.WLAN(network.STA_IF))
def get_ap_stats(self):
    """Return access-point statistics, including a ``'config'`` entry.

    The result of ``self.get_wlan_stats`` for the AP interface is extended
    with ``self.get_wlan_config_stats`` under the key ``'config'``.
    """
    access_point = network.WLAN(network.AP_IF)
    stats = self.get_wlan_stats(access_point)
    config_stats = self.get_wlan_config_stats(access_point)
    stats['config'] = config_stats
    return stats
def save_ap_config(self, api_request):
    """Apply access-point settings taken from an API request.

    Args:
        api_request: Mapping whose ``'body'`` entry provides ``'essid'``,
            ``'channel'`` and ``'hidden'``.

    Returns:
        ``self.get_wlan_config_stats`` for the AP interface, evaluated
        after the new settings have been applied.
    """
    settings = api_request['body']
    access_point = network.WLAN(network.AP_IF)
    logging.info("config: {}".format(settings))
    # The MAC address is not applied (it was commented out of this call).
    essid = settings['essid']
    channel = settings['channel']
    hidden = settings['hidden']
    access_point.config(essid=essid, channel=channel, hidden=hidden)
    return self.get_wlan_config_stats(access_point)
def get_sta_stats(self):
    """Collect statistics for the station (STA) interface.

    Returns:
        Whatever ``self.get_wlan_stats`` yields for that interface.
    """
    station = network.WLAN(network.STA_IF)
    stats = self.get_wlan_stats(station)
    return stats
async def main(self):
    """Wait for wifi and the broker, then publish a debug report every minute.

    Declared ``async`` because the body uses ``await``, which is not valid
    inside a plain ``def`` under standard Python. Never returns: after the
    connection phase it loops forever.
    """
    loop.create_task(self.led_ctrl())
    sta_if = WLAN(STA_IF)
    conn = False
    while not conn:
        while not sta_if.isconnected():
            await asyncio.sleep(1)
            self.dprint('Awaiting WiFi.')  # Repeats forever if no stored connection.
        await asyncio.sleep(3)
        try:
            await self.connect()
        except OSError:
            self.close()  # Close socket
            self.dprint('Awaiting broker.')
        else:
            # Only reached when connect() did not raise.
            conn = True

    self.dprint('Starting.')
    self.outage = False
    n = 0  # Number of reports published so far (one per 60 s sleep).
    while True:
        await asyncio.sleep(60)
        gc.collect()  # For RAM stats.
        msg = 'Mins: {} repubs: {} outages: {} RAM free: {} alloc: {} Longest outage: {}s'.format(
            n, self.REPUB_COUNT, self.outages, gc.mem_free(), gc.mem_alloc(), self.max_outage)
        self.pub_msg('debug', msg)
        n += 1
# Topic names in dict enables multiple Sonoff units to run this code. Only main.py differs.
def do_connect():
    """Activate the station interface and connect it if it is not connected.

    Blocks (busy-waiting) until the interface reports a connection, then
    prints its ``ifconfig()`` tuple.
    """
    import network

    station = network.WLAN(network.STA_IF)
    station.active(True)
    needs_connect = not station.isconnected()
    if needs_connect:
        print('connecting to network...')
        station.connect("MY_WIFI_SSID", "MY_PASSWORD")
        # Spin until the connection is up.
        while True:
            if station.isconnected():
                break
    print('network config:', station.ifconfig())
def __init__(self,bb_mqtt_id="None",mqtt_broker="None",mqtt_port=0,mqtt_user="None",mqtt_passwd="None",topic="None"):
    """Set up relay pins, MQTT client, station interface and programs.

    Args:
        bb_mqtt_id: Accepted but not used; the client id is always
            ``"ElectroDragon"``.
        mqtt_broker: Broker address handed to ``MQTTClient``.
        mqtt_port: Broker port handed to ``MQTTClient``.
        mqtt_user: User name handed to ``MQTTClient``.
        mqtt_passwd: Password handed to ``MQTTClient``.
        topic: Stored as ``self.topic``.
    """
    # Pins 12 and 13 are configured as outputs for the two relays.
    self.relay_one = Pin(12, Pin.OUT)
    self.relay_two = Pin(13, Pin.OUT)

    self.bb_mqtt_id = "ElectroDragon"
    self.mqtt_broker, self.mqtt_port = mqtt_broker, mqtt_port
    self.mqtt_user, self.mqtt_passwd = mqtt_user, mqtt_passwd
    self.topic = topic

    self.mqtt = MQTTClient(
        self.bb_mqtt_id,
        self.mqtt_broker,
        self.mqtt_port,
        self.mqtt_user,
        self.mqtt_passwd,
    )
    self.sta_if = network.WLAN(network.STA_IF)

    # Program name (bytes) -> pair of floats.
    # NOTE(review): presumably [minimum, maximum] temperature — confirm.
    self.programs = {
        b'fermentation': [18.0, 22.0],
        b'maturation': [0.0, 2.0],
        b'priming': [23.0, 25.0],
    }
def __init__(self, bb_mqtt_id="None", mqtt_broker="None", mqtt_port=0, mqtt_user="None", mqtt_passwd="None",
             step="None"):
    """Store sleep settings, MQTT parameters and temperature thresholds.

    Args:
        bb_mqtt_id: Client id, stored as ``self.mqtt_id``.
        mqtt_broker: Broker address handed to ``MQTTClient``.
        mqtt_port: Broker port handed to ``MQTTClient``.
        mqtt_user: User name handed to ``MQTTClient``.
        mqtt_passwd: Password handed to ``MQTTClient``.
        step: Stored as ``self.STEP``.
    """
    # Sleep settings.
    self.MINUTES_TO_SLEEP = 1
    # NOTE(review): the factor is 10 * 1000 per "minute"; if SLEEP_TIME is in
    # milliseconds this is 10 s rather than 60 s — confirm the intent.
    self.SLEEP_TIME = self.MINUTES_TO_SLEEP * 10 * 1000
    self.DO_SLEEP = False

    # Step identifiers, also used as keys of ``self.tempFor``.
    self.MATURATION = 1
    self.BREWING = 0
    self.STEP = step

    # MQTT connection parameters.
    self.mqtt_broker, self.mqtt_port = mqtt_broker, mqtt_port
    self.mqtt_user, self.mqtt_passwd = mqtt_user, mqtt_passwd
    self.mqtt_id = bb_mqtt_id

    self.sta_if = network.WLAN(network.STA_IF)

    # Fermentation thresholds.
    self.brew_low, self.brew_ok = 17, 20
    self.brew_bit_high, self.brew_very_high = 22, 23

    # Maturation thresholds.
    self.mat_low, self.mat_ok = 4, 6
    self.mat_bit_high, self.mat_very_high = 8, 9

    # temps: low, ok, bit high, high
    # NOTE(review): the maturation row differs from the mat_* values above
    # (0/4/7/9 vs 4/6/8/9) — confirm which set is intended.
    self.tempFor = {
        self.BREWING: [17, 20, 22, 23],
        self.MATURATION: [0, 4, 7, 9],
    }

    self.myMQTT = MQTTClient(
        self.mqtt_id,
        self.mqtt_broker,
        self.mqtt_port,
        self.mqtt_user,
        self.mqtt_passwd,
    )
def __init__(self, bb_mqtt_id="None", mqtt_broker="None", mqtt_port=0, mqtt_user="None", mqtt_passwd="None",
             topic="None"):
    """Set up relays, MQTT client and program limits, then connect to MQTT.

    Blocks until the station interface is connected before calling
    ``self.mqtt.connect()``.

    Args:
        bb_mqtt_id: Accepted but not used; the client id is always
            ``"ElectroDragon"``.
        mqtt_broker: Broker address handed to ``MQTTClient``.
        mqtt_port: Broker port handed to ``MQTTClient``.
        mqtt_user: User name handed to ``MQTTClient``.
        mqtt_passwd: Password handed to ``MQTTClient``.
        topic: Stored as ``self.topic``.
    """
    # Pins 12 and 13 are configured as outputs for the two relays.
    self.relay_one = Pin(12, Pin.OUT)
    self.relay_two = Pin(13, Pin.OUT)
    self.bb_mqtt_id = "ElectroDragon"
    self.mqtt_broker = mqtt_broker
    self.mqtt_port = mqtt_port
    self.mqtt_user = mqtt_user
    self.mqtt_passwd = mqtt_passwd
    self.topic = topic
    self.mqtt = MQTTClient(self.bb_mqtt_id, self.mqtt_broker, self.mqtt_port, self.mqtt_user, self.mqtt_passwd)
    self.sta_if = network.WLAN(network.STA_IF)
    self.programs = {'fermentation': [18.0, 22.0], 'maturation': [0.0, 2.0], 'priming': [20.0, 23.0]}
    self.style = 'fermentation'
    # NOTE(review): programLoad() is defined elsewhere; presumably it may
    # update self.programs / self.style before the limits are read — verify.
    self.programLoad()
    self.MINIMUM = self.programs[self.style][0]
    self.MAXIMUM = self.programs[self.style][1]
    self.relay_status = "OFF"
    print("Waiting IP...")
    # Poll the interface's connection state rather than the length of the IP
    # string: the former "len(ip) >= 11" test never became true for valid
    # short addresses such as "10.0.0.5", so the loop could spin forever.
    while not self.sta_if.isconnected():
        time.sleep_ms(80)
    self.mqtt.connect()