def process_input(self, s, cl, addr):
print('client connected from', addr)
cl.settimeout(None)
if not hasattr(cl, 'readline'):
# CPython
client_stream = cl.makefile("rwb")
else:
# MicroPython
client_stream = cl
req = client_stream.readline()
print(req)
while True:
data = client_stream.readline()
if not data or data == b'\r\n':
break
method, path, protocol = req.split(b' ')
if method.lower() == b'get':
if path == b'/':
self.send(cl, 200, filename='index.html', content_type='text/html; charset=utf-8')
elif path == b'/commands':
commands = ', '.join('"' + c + '"' for c in sorted(WebApp.commands))
self.send(cl, 200, '[' + commands + ']', content_type='application/json')
else:
self.send(cl, 404)
elif method.lower() == b'post':
try:
func = WebApp.commands[path.lstrip(b'/').decode('ascii')]
except KeyError:
self.send(cl, 404)
return
self.send(cl, 200)
return func
else:
self.send(cl, 400)
#mem = gc.mem_alloc()
#gc.collect()
#print("Freeing", mem - gc.mem_alloc(), "now free", gc.mem_free())
python类mem_alloc()的实例源码
def _garbage_collect(self):
while True:
await asyncio.sleep_ms(100)
gc.collect()
gc.threshold(gc.mem_free() // 4 + gc.mem_alloc())
# Very basic window class. Cuts a rectangular hole in a screen on which content may be drawn
sx127x.py 文件源码
项目:SX127x_driver_for_MicroPython_on_ESP8266
作者: Wei1234c
项目源码
文件源码
阅读 24
收藏 0
点赞 0
评论 0
def collect_garbage(self):
gc.collect()
if config_lora.IS_MICROPYTHON:
print('[Memory - free: {} allocated: {}]'.format(gc.mem_free(), gc.mem_alloc()))
sx127x.py 文件源码
项目:SX127x_driver_for_MicroPython_on_ESP8266
作者: Wei1234c
项目源码
文件源码
阅读 22
收藏 0
点赞 0
评论 0
def collect_garbage(self):
gc.collect()
if config_lora.IS_MICROPYTHON:
print('[Memory - free: {} allocated: {}]'.format(gc.mem_free(), gc.mem_alloc()))
def _garbage_collect(self):
while True:
await asyncio.sleep_ms(100)
gc.collect()
gc.threshold(gc.mem_free() // 4 + gc.mem_alloc())
# Very basic window class. Cuts a rectangular hole in a screen on which content may be drawn
def _idle_thread(self):
if self.gc_enable and (self.last_gc == 0 or after(self.last_gc) > GCTIME):
gc.collect()
gc.threshold(gc.mem_free() // 4 + gc.mem_alloc())
self.last_gc = ticks_us()
if self.heartbeat is not None and (self.last_heartbeat == 0 or after(self.last_heartbeat) > HBTIME):
if platform == 'pyboard':
self.heartbeat.toggle()
elif platform == 'esp8266':
self.heartbeat(not self.heartbeat())
self.last_heartbeat = ticks_us()
def _garbage_collect():
while True:
await asyncio.sleep_ms(500)
gc.collect()
gc.threshold(gc.mem_free() // 4 + gc.mem_alloc())
def mem(level=None):
import gc
mem_alloc = gc.mem_alloc()
mem_free = gc.mem_free()
capacity = mem_alloc + mem_free
print(" capacity\tfree\tusage")
print(" {}\t{}\t{}%".format(capacity, mem_free, int(
((capacity - mem_free) / capacity) * 100.0)))
if level:
import micropython
micropython.mem_info(level)
def get_memory_stats(self):
mem_alloc = gc.mem_alloc()
mem_free = gc.mem_free()
return {
'mem_alloc': mem_alloc,
'mem_free': mem_free
}
def get_gc_stats(self):
import gc
return {
'mem_alloc': gc.mem_alloc(),
'mem_free': gc.mem_free()
}
def handle_command(self, args):
import gc
mem_alloc = gc.mem_alloc()
mem_free = gc.mem_free()
capacity = mem_alloc + mem_free
print(" capacity\tfree\tusage")
print(" {}\t{}\t{}%".format(capacity, mem_free, int(
((capacity - mem_free) / capacity) * 100.0)))
if "-i" in args:
import micropython
micropython.mem_info(1)
def main(self):
loop.create_task(self.led_ctrl())
sta_if = WLAN(STA_IF)
conn = False
while not conn:
while not sta_if.isconnected():
await asyncio.sleep(1)
self.dprint('Awaiting WiFi.') # Repeats forever if no stored connection.
await asyncio.sleep(3)
try:
await self.connect()
conn = True
except OSError:
self.close() # Close socket
self.dprint('Awaiting broker.')
self.dprint('Starting.')
self.outage = False
n = 0
while True:
await asyncio.sleep(60)
gc.collect() # For RAM stats.
msg = 'Mins: {} repubs: {} outages: {} RAM free: {} alloc: {} Longest outage: {}s'.format(
n, self.REPUB_COUNT, self.outages, gc.mem_free(), gc.mem_alloc(), self.max_outage)
self.pub_msg('debug', msg)
n += 1
# Topic names in dict enables multiple Sonoff units to run this code. Only main.py differs.
def from_pyboard(self):
client = self.client
while True:
istr = await self.await_obj(20) # wait for string (poll interval 20ms)
s = istr.split(SEP)
command = s[0]
if command == PUBLISH:
await client.publish(s[1], s[2], bool(s[3]), int(s[4]))
# If qos == 1 only returns once PUBACK received.
self.send(argformat(STATUS, PUBOK))
elif command == SUBSCRIBE:
await client.subscribe(s[1], int(s[2]))
client.subscriptions[s[1]] = int(s[2]) # re-subscribe after outage
elif command == MEM:
gc.collect()
gc.threshold(gc.mem_free() // 4 + gc.mem_alloc())
self.send(argformat(MEM, gc.mem_free(), gc.mem_alloc()))
elif command == TIME:
t = await client.get_time()
self.send(argformat(TIME, t))
else:
self.send(argformat(STATUS, UNKNOWN, 'Unknown command:', istr))
# Runs when channel has synchronised. No return: Pyboard resets ESP on fail.
# Get parameters from Pyboard. Process them. Connect. Instantiate client. Start
# from_pyboard() task. Wait forever, updating connected status.
def _memory(self):
count = 0
while self.isconnected(): # Ensure just one instance.
await asyncio.sleep(1) # Quick response to outage.
count += 1
count %= 20
if not count:
gc.collect()
print('RAM free {} alloc {}'.format(gc.mem_free(), gc.mem_alloc()))
def cb_status():
datetime = datenow()
chipid = config.get_config('chipid')
macaddr = config.get_config('mac')
address = config.get_config('address')
starttime = config.get_config('starttime')
conf = json.dumps(config.get_config(None))
return '<h2>Device %s</h2>' \
'<p>MacAddr: %s' \
'<p>Address: %s' \
'<p>Free Mem: %d (alloc %d)' \
'<p>Date Time: %s' \
'<p>Start Time: %s' \
'</div>' % (chipid, macaddr, address, gc.mem_free(), gc.mem_alloc(), datetime, starttime)
def run(self):
trash_counter = 10
trash = None
while True:
gc.collect()
gc.threshold(gc.mem_free() // 4 + gc.mem_alloc())
self.handle_duty_cycle()
trash_counter -= 1
if trash_counter == 0:
self.handle_year_changed()
trash = self.get_trash_tomorrow()
trash_counter = 10
print(trash)
if not self.wlan.isconnected():
print("Reconnecting to {0}...".format(self.essid))
self.wlan.connect(self.essid, self.password)
if trash != None:
if "b" in self.types:
self.set_red("b" in trash)
p = False
if "p" in self.types:
p = p or ("p" in trash)
if "p4" in self.types:
p = p or ("p4" in trash)
self.set_blue(p)
r = False
if "r" in self.types:
r = r or ("r" in trash)
if "r2" in self.types:
r = r or ("r2" in trash)
if "r4" in self.types:
r = r or ("r4" in trash)
self.set_white(r)
if "g" in self.types:
self.set_yellow("g" in trash)
time.sleep(1)