def _garbage_collect(self):
while True:
await asyncio.sleep_ms(100)
gc.collect()
gc.threshold(gc.mem_free() // 4 + gc.mem_alloc())
# Very basic window class. Cuts a rectangular hole in a screen on which content may be drawn
python类threshold()的实例源码
def _garbage_collect(self):
while True:
await asyncio.sleep_ms(100)
gc.collect()
gc.threshold(gc.mem_free() // 4 + gc.mem_alloc())
# Very basic window class. Cuts a rectangular hole in a screen on which content may be drawn
def _idle_thread(self):
if self.gc_enable and (self.last_gc == 0 or after(self.last_gc) > GCTIME):
gc.collect()
gc.threshold(gc.mem_free() // 4 + gc.mem_alloc())
self.last_gc = ticks_us()
if self.heartbeat is not None and (self.last_heartbeat == 0 or after(self.last_heartbeat) > HBTIME):
if platform == 'pyboard':
self.heartbeat.toggle()
elif platform == 'esp8266':
self.heartbeat(not self.heartbeat())
self.last_heartbeat = ticks_us()
def _garbage_collect():
while True:
await asyncio.sleep_ms(500)
gc.collect()
gc.threshold(gc.mem_free() // 4 + gc.mem_alloc())
def from_pyboard(self):
client = self.client
while True:
istr = await self.await_obj(20) # wait for string (poll interval 20ms)
s = istr.split(SEP)
command = s[0]
if command == PUBLISH:
await client.publish(s[1], s[2], bool(s[3]), int(s[4]))
# If qos == 1 only returns once PUBACK received.
self.send(argformat(STATUS, PUBOK))
elif command == SUBSCRIBE:
await client.subscribe(s[1], int(s[2]))
client.subscriptions[s[1]] = int(s[2]) # re-subscribe after outage
elif command == MEM:
gc.collect()
gc.threshold(gc.mem_free() // 4 + gc.mem_alloc())
self.send(argformat(MEM, gc.mem_free(), gc.mem_alloc()))
elif command == TIME:
t = await client.get_time()
self.send(argformat(TIME, t))
else:
self.send(argformat(STATUS, UNKNOWN, 'Unknown command:', istr))
# Runs when channel has synchronised. No return: Pyboard resets ESP on fail.
# Get parameters from Pyboard. Process them. Connect. Instantiate client. Start
# from_pyboard() task. Wait forever, updating connected status.
def run(self):
trash_counter = 10
trash = None
while True:
gc.collect()
gc.threshold(gc.mem_free() // 4 + gc.mem_alloc())
self.handle_duty_cycle()
trash_counter -= 1
if trash_counter == 0:
self.handle_year_changed()
trash = self.get_trash_tomorrow()
trash_counter = 10
print(trash)
if not self.wlan.isconnected():
print("Reconnecting to {0}...".format(self.essid))
self.wlan.connect(self.essid, self.password)
if trash != None:
if "b" in self.types:
self.set_red("b" in trash)
p = False
if "p" in self.types:
p = p or ("p" in trash)
if "p4" in self.types:
p = p or ("p4" in trash)
self.set_blue(p)
r = False
if "r" in self.types:
r = r or ("r" in trash)
if "r2" in self.types:
r = r or ("r2" in trash)
if "r4" in self.types:
r = r or ("r4" in trash)
self.set_white(r)
if "g" in self.types:
self.set_yellow("g" in trash)
time.sleep(1)