def get_onboard_button_events(self, btn, bcc_key, on_single_click, on_double_click):
import gc
from machine import Timer
if btn.value() == 0:
self.button_click_counter[bcc_key] += 1
if self.button_click_counter[bcc_key] == 1:
log.info("single-click registered (mem free: " + str(gc.mem_free()) + ")")
sc = getattr(tk, on_single_click)
sc()
elif self.button_click_counter[bcc_key] == 2:
log.info("double click registered (mem free: " + str(gc.mem_free()) + ")")
sc = getattr(tk, on_double_click)
sc()
else:
pass
gtim = Timer(1)
gtim.init(period=300, mode=Timer.ONE_SHOT, callback=lambda t:self.reset_onboard_button_event_counter(bcc_key))
# @timed_function
python类mem_free()的实例源码
def show_display(self):
"""The function that will show the display. First clean the display then
show all data. For network names if the name is longer than the display
it split in two and shown on row 1,2. When all the data is written the
esp will sleep for 10000 ms"""
self.oled.fill(0)
self.oled.show()
if len(self.name) > 15:
self.oled.text(self.name[0:15],0,0)
self.oled.text(self.name[15:int(len(self.name))],0,10)
else:
self.oled.text(self.name,0,0)
self.oled.text(self.strengt,30,20)
self.oled.text(self.status,30,30)
self.oled.text(self.kanaal, 30,40)
self.oled.text((str(gc.mem_free())+ " B"), 30,50)
self.oled.show()
utime.sleep_ms(10000)
def start_server():
print("Starting web server")
sock = socket.socket()
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.bind( ('', 80) ); sock.listen(1)
sock.settimeout(5.0)
print("Synchronous web server running...")
gc.collect()
print("gc.mem_free=", gc.mem_free())
t0 = time.ticks_ms()
try:
while True:
accept_conn(sock)
now = time.ticks_ms()
if (now - t0) > 8000:
periodic_tasks()
t0 = now
finally:
sock.close()
def timed_function(f, *args, **kwargs):
import time
myname = str(f).split(' ')[1]
def new_func(*args, **kwargs):
t = time.ticks_us()
result = f(*args, **kwargs)
delta = time.ticks_diff(t, time.ticks_us())
log.debug('GC: {} Function {} Time = {:6.3f}ms'.format(str(gc.mem_free()), myname, delta/1000))
return result
return new_func
# @timed_function
def mem_cleanup(self):
import gc
log.debug("Invoking garbage collection ...")
gc.collect()
mem = gc.mem_free()
if 6001 <= mem <= 10000:
log.warn("Memory is low: " + str(mem))
elif 4001 <= mem <= 6000:
log.warn("Memory is very low: " + str(mem))
elif mem < 4000:
log.critical("Memory is extremely low: {}".format(str(mem)))
else:
log.debug("Memory is currently: " + str(mem))
def process_input(self, s, cl, addr):
print('client connected from', addr)
cl.settimeout(None)
if not hasattr(cl, 'readline'):
# CPython
client_stream = cl.makefile("rwb")
else:
# MicroPython
client_stream = cl
req = client_stream.readline()
print(req)
while True:
data = client_stream.readline()
if not data or data == b'\r\n':
break
method, path, protocol = req.split(b' ')
if method.lower() == b'get':
if path == b'/':
self.send(cl, 200, filename='index.html', content_type='text/html; charset=utf-8')
elif path == b'/commands':
commands = ', '.join('"' + c + '"' for c in sorted(WebApp.commands))
self.send(cl, 200, '[' + commands + ']', content_type='application/json')
else:
self.send(cl, 404)
elif method.lower() == b'post':
try:
func = WebApp.commands[path.lstrip(b'/').decode('ascii')]
except KeyError:
self.send(cl, 404)
return
self.send(cl, 200)
return func
else:
self.send(cl, 400)
#mem = gc.mem_alloc()
#gc.collect()
#print("Freeing", mem - gc.mem_alloc(), "now free", gc.mem_free())
def heap():
yield
while True:
cpuload = (10000 - asyncio.sched.idlecount) / 100
asyncio.sched.idlecount = 0
log.info ("Memory free: %d cpu load: %d %% " , gc.mem_free() , cpuload )
yield
def heap():
yield
while True:
cpuload = (10000 - asyncio.sched.idlecount) / 100
cpuidle = asyncio.sched.idlecount / 10
asyncio.sched.idlecount = 0
log.info ("Memory free: %d cpu idlecount/sec: %d %% time:%s" , gc.mem_free() , cpuidle, klok.toString() )
yield
sx127x.py 文件源码
项目:SX127x_driver_for_MicroPython_on_ESP8266
作者: Wei1234c
项目源码
文件源码
阅读 21
收藏 0
点赞 0
评论 0
def collect_garbage(self):
gc.collect()
if config_lora.IS_MICROPYTHON:
print('[Memory - free: {} allocated: {}]'.format(gc.mem_free(), gc.mem_alloc()))
sx127x.py 文件源码
项目:SX127x_driver_for_MicroPython_on_ESP8266
作者: Wei1234c
项目源码
文件源码
阅读 23
收藏 0
点赞 0
评论 0
def collect_garbage(self):
gc.collect()
if config_lora.IS_MICROPYTHON:
print('[Memory - free: {} allocated: {}]'.format(gc.mem_free(), gc.mem_alloc()))
def show_free():
print("free ram:", gc.mem_free())
def loop_dht(o):
global d
global oled
global CLIENT_ID
global client
global a1
global a2
global a3
delay = o
topic = 'micro/{0}/temperature'.format(CLIENT_ID.decode("utf-8"))
while True:
try:
d.measure()
oled.fill(0)
soils = [{'id': 1, 'value': read_soil(a1)},
{'id': 2, 'value': read_soil(a2)},
{'id': 3, 'value': read_soil(a3)}]
oled.text('SOIL SENSOR', 20, 5)
oled.text('T: {0:.1f}C,{1:.1f}%'.format(d.temperature(), d.humidity()), 3, 20)
oled.text('A1:{0} A2:{1}'.format(soils[0]['value']['msg'],soils[1]['value']['msg']), 3, 35)
oled.text('A3:{0}'.format(soils[2]['value']['msg']), 3, 50)
oled.show()
msg = json.dumps({
'Id': CLIENT_ID,
'heap': gc.mem_free(),
'temperature': '{0:.2f}'.format(d.temperature()),
'humidity': '{0:.2f}'.format(d.humidity()),
'soils': soils
})
print(msg)
client.publish(topic, msg)
except OSError as e:
if e.args[0] == errno.ETIMEDOUT:
print('error dht: ', e)
time.sleep(delay)
def dth_read():
global client
global d
global oled
global CLIENT_ID
global loop
topic = 'micro/{0}/temperature'.format(CLIENT_ID.decode("utf-8"))
while True:
try:
d.measure()
oled.fill(0)
oled.text('ESP32', 45, 5)
oled.text('MicroPython', 20, 20)
oled.text('T:{0:.2f} C'.format(d.temperature()), 3, 35)
oled.text('H:{0:.2f} %'.format(d.humidity()), 3, 50)
oled.show()
msg = json.dumps({
'heap': gc.mem_free(), 'Type':7,
'Id': CLIENT_ID,
'temperature': '{0:.2f}'.format(d.temperature()),
'humidity': '{0:.2f}'.format(d.humidity())
})
print(topic, msg)
client.publish(topic, msg)
except OSError as e:
if e.args[0] == errno.ETIMEDOUT:
print('error dht: ', e)
else:
loop.stop()
print('error mqtt client: ', e)
print('restart mqtt client')
cfg = load_config()
mqtt_cfg = cfg['mqtt']
print('restart mqtt client: ', mqtt_cfg)
loop.run_until_complete(mqtt_connection(mqtt_cfg))
loop.run_forever()
continue
await asyncio.sleep(5)
def run():
global oled
global CLIENT_ID
while True:
try:
t, h = get_temperature()
msg = json.dumps({ 'Heap': gc.mem_free(), 'Type':7, 'id': CLIENT_ID, 'temperature': '{0:.2f}'.format(t), 'humidity': '{0:.2f}'.format(h)})
print(msg)
client.publish('micro/{0}/temperature'.format(CLIENT_ID.decode("utf-8")), msg)
except OSError as e:
if e.args[0] == errno.ETIMEDOUT:
print('error dht: ', e)
time.sleep(5)
def mem(level=None):
import gc
mem_alloc = gc.mem_alloc()
mem_free = gc.mem_free()
capacity = mem_alloc + mem_free
print(" capacity\tfree\tusage")
print(" {}\t{}\t{}%".format(capacity, mem_free, int(
((capacity - mem_free) / capacity) * 100.0)))
if level:
import micropython
micropython.mem_info(level)
def get_memory_stats(self):
mem_alloc = gc.mem_alloc()
mem_free = gc.mem_free()
return {
'mem_alloc': mem_alloc,
'mem_free': mem_free
}
def get_gc_stats(self):
import gc
return {
'mem_alloc': gc.mem_alloc(),
'mem_free': gc.mem_free()
}
def handle_command(self, args):
import gc
mem_alloc = gc.mem_alloc()
mem_free = gc.mem_free()
capacity = mem_alloc + mem_free
print(" capacity\tfree\tusage")
print(" {}\t{}\t{}%".format(capacity, mem_free, int(
((capacity - mem_free) / capacity) * 100.0)))
if "-i" in args:
import micropython
micropython.mem_info(1)
def run(self, *args, **kwargs):
def callback():
log.info("memory free %d", gc.mem_free())
super(Discovery, self).run(callback=callback, callback_time=self.discovery_time)
def run(self, *args, **kwargs):
""" run """
def callback():
""" callback """
log.debug("memory free %d", gc.mem_free())
super(Eddystone, self).run(callback=callback, callback_time=1000)
def main(self):
loop.create_task(self.led_ctrl())
sta_if = WLAN(STA_IF)
conn = False
while not conn:
while not sta_if.isconnected():
await asyncio.sleep(1)
self.dprint('Awaiting WiFi.') # Repeats forever if no stored connection.
await asyncio.sleep(3)
try:
await self.connect()
conn = True
except OSError:
self.close() # Close socket
self.dprint('Awaiting broker.')
self.dprint('Starting.')
self.outage = False
n = 0
while True:
await asyncio.sleep(60)
gc.collect() # For RAM stats.
msg = 'Mins: {} repubs: {} outages: {} RAM free: {} alloc: {} Longest outage: {}s'.format(
n, self.REPUB_COUNT, self.outages, gc.mem_free(), gc.mem_alloc(), self.max_outage)
self.pub_msg('debug', msg)
n += 1
# Topic names in dict enables multiple Sonoff units to run this code. Only main.py differs.
def from_pyboard(self):
client = self.client
while True:
istr = await self.await_obj(20) # wait for string (poll interval 20ms)
s = istr.split(SEP)
command = s[0]
if command == PUBLISH:
await client.publish(s[1], s[2], bool(s[3]), int(s[4]))
# If qos == 1 only returns once PUBACK received.
self.send(argformat(STATUS, PUBOK))
elif command == SUBSCRIBE:
await client.subscribe(s[1], int(s[2]))
client.subscriptions[s[1]] = int(s[2]) # re-subscribe after outage
elif command == MEM:
gc.collect()
gc.threshold(gc.mem_free() // 4 + gc.mem_alloc())
self.send(argformat(MEM, gc.mem_free(), gc.mem_alloc()))
elif command == TIME:
t = await client.get_time()
self.send(argformat(TIME, t))
else:
self.send(argformat(STATUS, UNKNOWN, 'Unknown command:', istr))
# Runs when channel has synchronised. No return: Pyboard resets ESP on fail.
# Get parameters from Pyboard. Process them. Connect. Instantiate client. Start
# from_pyboard() task. Wait forever, updating connected status.
def _memory(self):
count = 0
while self.isconnected(): # Ensure just one instance.
await asyncio.sleep(1) # Quick response to outage.
count += 1
count %= 20
if not count:
gc.collect()
print('RAM free {} alloc {}'.format(gc.mem_free(), gc.mem_alloc()))
def cb_status():
datetime = datenow()
chipid = config.get_config('chipid')
macaddr = config.get_config('mac')
address = config.get_config('address')
starttime = config.get_config('starttime')
conf = json.dumps(config.get_config(None))
return '<h2>Device %s</h2>' \
'<p>MacAddr: %s' \
'<p>Address: %s' \
'<p>Free Mem: %d (alloc %d)' \
'<p>Date Time: %s' \
'<p>Start Time: %s' \
'</div>' % (chipid, macaddr, address, gc.mem_free(), gc.mem_alloc(), datetime, starttime)