def setup_conn(port, accept_handler):
    """Create the module-wide listening socket on *port* and return it.

    The socket is stored in the global ``listen_s``, bound to
    ``0.0.0.0:port`` with address reuse enabled and a backlog of one.
    When *accept_handler* is truthy it is attached through socket
    option 20.  One start-up line is printed for every WLAN interface
    that reports itself active.
    """
    global listen_s

    listen_s = sock = socket.socket()
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

    # The first getaddrinfo() result carries the sockaddr in slot 4.
    bind_addr = socket.getaddrinfo("0.0.0.0", port)[0][4]
    sock.bind(bind_addr)
    sock.listen(1)

    if accept_handler:
        # NOTE(review): 20 is presumably the port-specific
        # "callback on incoming connection" option -- confirm for the target.
        sock.setsockopt(socket.SOL_SOCKET, 20, accept_handler)

    for if_id in (network.AP_IF, network.STA_IF):
        wlan = network.WLAN(if_id)
        if not wlan.active():
            continue
        print("WebREPL daemon started on ws://%s:%d" % (wlan.ifconfig()[0], port))

    return sock
python类STA_IF的实例源码
def connect(ssid, auth, timeout=16000):
    """Bring up the station interface and join the network *ssid*.

    The interface object is kept in the global ``uplink``.  The
    connection state is polled every 100 ms.

    Returns True as soon as the interface reports a connection, or
    False once *timeout* milliseconds (as measured with
    ``ticks_ms``/``ticks_diff``) have passed without one.
    """
    from network import WLAN, STA_IF, AP_IF
    global uplink

    uplink = WLAN(STA_IF)
    uplink.active(True)
    uplink.connect(ssid, auth)

    t_start = ticks_ms()
    while not uplink.isconnected():
        if ticks_diff(ticks_ms(), t_start) >= timeout:
            return False
        sleep_ms(100)
    return True
def scan_wifi(self):
    """Delegate to ``Board.scan_wifi`` using the station interface id."""
    from network import STA_IF
    return Board.scan_wifi(self, STA_IF)
# @timed_function
def connect_to_wifi(self, ssid, password, wait_for_ip):
    """Delegate to ``Board.connect_to_wifi`` using the station interface id.

    *ssid*, *password* and *wait_for_ip* are passed through unchanged;
    the result of the delegated call is returned.
    """
    from network import STA_IF
    return Board.connect_to_wifi(self, ssid, password, STA_IF, wait_for_ip)
def __init__(self):
    """Set default state: a station-mode WLAN handle and no credentials."""
    self.timeout = 60
    self.net = WLAN(STA_IF)
    # No credentials or token are known until they are supplied later.
    self._ssid = None
    self._password = None
    self.token = None
def start(port=23, key=None, nostop=False):  # TODO: take simpler default key as it will be reset
    """Start the netrepl server socket on *port*.

    port   -- TCP port to listen on.
    key    -- key material; ``None`` means "use ``netrepl_cfg.key``".
              A missing or empty key becomes 32 zero bytes, a key of
              length 64 is decoded from hex, anything else is used as is.
              The resulting key is written back to ``netrepl_cfg.key``.
    nostop -- when true and a server socket already exists, return
              without touching it; otherwise ``stop()`` is called first.

    A start-up line is printed for each active WLAN interface.
    """
    global _server_socket, netrepl_cfg

    # An already running server is left alone when the caller asks for it.
    if nostop and _server_socket:
        return

    stop()

    if key is None:
        key = netrepl_cfg.key
    if key is None or len(key) == 0:
        key = bytearray(32)  # empty default key
    elif len(key) == 64:
        key = ubinascii.unhexlify(key)
    netrepl_cfg.key = key

    # The cipher state itself is only set up once a client has connected.

    srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    _server_socket = srv
    srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

    bind_addr = socket.getaddrinfo("0.0.0.0", port)[0][4]
    srv.bind(bind_addr)
    srv.listen(1)
    # NOTE(review): option 20 presumably registers the accept callback
    # on this port -- confirm for the target firmware.
    srv.setsockopt(socket.SOL_SOCKET, 20, accept_telnet_connect)

    for if_id in (network.AP_IF, network.STA_IF):
        iface = network.WLAN(if_id)
        if not iface.active():
            continue
        print("\nnetrepl: UlnoIOT netrepl server started on {}:{}".format(iface.ifconfig()[0], port))
async def main(self):
    """Top-level coroutine: connect, then report statistics once a minute.

    Declared ``async`` because the body uses ``await``; a plain ``def``
    containing ``await`` is a syntax error on CPython.

    Phase 1 polls the station interface once a second until it reports
    a connection, pauses three seconds, then awaits ``self.connect()``.
    An ``OSError`` from that call closes the socket via ``self.close()``
    and restarts the phase.

    Phase 2 never returns: every 60 seconds it runs a garbage
    collection and hands a statistics line to ``self.pub_msg('debug', ...)``.
    """
    # NOTE(review): `loop` is not defined here; it is expected to be a
    # module-level event loop object -- confirm against the rest of the file.
    loop.create_task(self.led_ctrl())
    sta_if = WLAN(STA_IF)

    connected = False
    while not connected:
        while not sta_if.isconnected():
            await asyncio.sleep(1)
            self.dprint('Awaiting WiFi.')  # Repeats forever if no stored connection.
        await asyncio.sleep(3)
        try:
            await self.connect()
        except OSError:
            self.close()  # Close socket
            self.dprint('Awaiting broker.')
        else:
            connected = True

    self.dprint('Starting.')
    self.outage = False
    minutes = 0
    while True:
        await asyncio.sleep(60)
        gc.collect()  # For RAM stats.
        msg = 'Mins: {} repubs: {} outages: {} RAM free: {} alloc: {} Longest outage: {}s'.format(
            minutes, self.REPUB_COUNT, self.outages, gc.mem_free(), gc.mem_alloc(), self.max_outage)
        self.pub_msg('debug', msg)
        minutes += 1
# Keeping topic names in a dict enables multiple Sonoff units to run this code. Only main.py differs.
def setup_conn(self, port):
    """Create ``self.listen_s`` listening on *port* and hook up ``self.accept_conn``.

    The socket is bound to ``0.0.0.0:port`` with address reuse enabled
    and a backlog of one.  The bind uses the *port* argument, so the
    port reported in the start-up message is the one actually bound
    (previously ``self.port`` was bound while *port* was printed).

    A start-up line is printed for every active WLAN interface.
    """
    self.listen_s = socket.socket()
    self.listen_s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

    # The first getaddrinfo() result carries the sockaddr in slot 4.
    addr = socket.getaddrinfo("0.0.0.0", port)[0][4]
    self.listen_s.bind(addr)
    self.listen_s.listen(1)
    # NOTE(review): option 20 presumably registers the accept callback
    # on this port -- confirm for the target firmware.
    self.listen_s.setsockopt(socket.SOL_SOCKET, 20, self.accept_conn)

    for if_id in (network.AP_IF, network.STA_IF):
        iface = network.WLAN(if_id)
        if iface.active():
            # NOTE(review): instances have no __name__ unless the class
            # provides one -- verify this attribute exists.
            print("Server {} started on {}:{}".format(self.__name__, iface.ifconfig()[0], port))
def do_get(clisock, uri, content_length):
    """Write an HTTP 200 response containing a device status page to *clisock*.

    The page shows the local time, uptime derived from ``time.ticks_ms()``,
    flash id and size, the Python implementation and platform, the unique
    id in hex, and one table row per WLAN interface (MAC, active,
    connected, IP, gateway).

    clisock        -- object with a ``write(bytes)`` method.
    uri            -- accepted for interface compatibility; not used.
    content_length -- accepted for interface compatibility; not used.

    All formatted fragments are built as ``str`` and then encoded,
    because ``bytes.format()`` is a MicroPython-only extension that does
    not exist on CPython.  The bytes sent are unchanged.
    """
    def send(template, *args):
        # Format as text first, encode once at the I/O boundary.
        clisock.write(template.format(*args).encode('utf-8'))

    clisock.write(b'HTTP/1.0 200 OK\r\n'
                  b'Content-type: text/html; charset=utf-8\r\n'
                  b'\r\n')
    clisock.write(b'<!DOCTYPE html><html><head><title>Current time</title></head>')
    clisock.write(b'<body>The current time is: ')
    # Only the first six fields of the time tuple are used by the template.
    send('{}-{:02d}-{:02d} {:02d}:{:02d}:{:02d}', *time.localtime())

    # Integer arithmetic throughout: whole seconds -> h / m / s.
    uptime_m, uptime_s = divmod(time.ticks_ms() // 1000, 60)
    uptime_h, uptime_m = divmod(uptime_m, 60)
    send('<p>Uptime: {:02d}h {:02d}:{:02d}', uptime_h, uptime_m, uptime_s)

    send('<p>Flash ID: {:x}', esp.flash_id())
    send('<p>Flash size: {:d}', esp.flash_size())
    send('<p>Python: {:s} on {:s} ', str(sys.implementation), sys.platform)

    clisock.write(b'<p>Unique ID: ')
    for octet in machine.unique_id():
        send('{:02x}', octet)

    clisock.write(b'\n<h2>Network interfaces</h2>\n')
    clisock.write(b'\n<table><tr><th>mac<th>active</th><th>connected</th><th>IP</th><th>Gateway</th>')
    for if_id in (network.STA_IF, network.AP_IF):
        wlan = network.WLAN(if_id)
        # Show MAC address.
        clisock.write(b'<tr>')
        clisock.write(b'<td>')
        for octet in wlan.config('mac'):
            send('{:02X}', octet)
        send('<td>{:}</td>', wlan.active())
        send('<td>{:}</td>', wlan.isconnected())
        # ifconfig() yields (ip, netmask, gateway, dns); show ip and gateway.
        cfg = wlan.ifconfig()
        for item in (cfg[0], cfg[2]):
            send('<td>{:}</td>', item)
    clisock.write(b'\n</table>\n')