def __init__(self, id, frequency, datarate, ssid, password, server, port, ntp_server='pool.ntp.org', ntp_period=3600):
self.id = id
self.server = server
self.port = port
self.frequency = frequency
self.datarate = datarate
self.ssid = ssid
self.password = password
self.ntp_server = ntp_server
self.ntp_period = ntp_period
self.server_ip = None
self.rxnb = 0
self.rxok = 0
self.rxfw = 0
self.dwnb = 0
self.txnb = 0
self.sf = self._dr_to_sf(self.datarate)
self.bw = self._dr_to_bw(self.datarate)
self.stat_alarm = None
self.pull_alarm = None
self.uplink_alarm = None
self.wlan = None
self.sock = None
self.udp_stop = False
self.udp_lock = _thread.allocate_lock()
self.lora = None
self.lora_sock = None
self.rtc = machine.RTC()
python类allocate_lock()的实例源码
def allocate_lock():
"""Dummy implementation of _thread.allocate_lock()."""
return LockType()
def __init__(self, *pipes):
self.active_pipes = set()
self.active_sources = set()
self.active_drains = set()
self.active_sinks = set()
self._add_pipes(*pipes)
self.thread_lock = _thread.allocate_lock()
self.command_lock = _thread.allocate_lock()
self.__fdr,self.__fdw = os.pipe()
self.threadid = None
def allocate_lock():
"""Dummy implementation of _thread.allocate_lock()."""
return LockType()
def allocate_lock():
"""Dummy implementation of _thread.allocate_lock()."""
return LockType()
def allocate_lock():
"""Dummy implementation of _thread.allocate_lock()."""
return LockType()
def test_finalize_runnning_thread(self):
# Issue 1402: the PyGILState_Ensure / _Release functions may be called
# very late on python exit: on deallocation of a running thread for
# example.
import_module("ctypes")
rc, out, err = assert_python_failure("-c", """if 1:
import ctypes, sys, time, _thread
# This lock is used as a simple event variable.
ready = _thread.allocate_lock()
ready.acquire()
# Module globals are cleared before __del__ is run
# So we save the functions in class dict
class C:
ensure = ctypes.pythonapi.PyGILState_Ensure
release = ctypes.pythonapi.PyGILState_Release
def __del__(self):
state = self.ensure()
self.release(state)
def waitingThread():
x = C()
ready.release()
time.sleep(100)
_thread.start_new_thread(waitingThread, ())
ready.acquire() # Be sure the other thread is waiting.
sys.exit(42)
""")
self.assertEqual(rc, 42)
def __init__(self, raw, buffer_size=DEFAULT_BUFFER_SIZE):
"""Create a new buffered reader using the given readable raw IO object.
"""
if not raw.readable():
raise IOError('"raw" argument must be readable.')
_BufferedIOMixin.__init__(self, raw)
if buffer_size <= 0:
raise ValueError("invalid buffer size")
self.buffer_size = buffer_size
self._reset_read_buf()
self._read_lock = Lock()
def __init__(self, raw, buffer_size=DEFAULT_BUFFER_SIZE):
if not raw.writable():
raise IOError('"raw" argument must be writable.')
_BufferedIOMixin.__init__(self, raw)
if buffer_size <= 0:
raise ValueError("invalid buffer size")
self.buffer_size = buffer_size
self._write_buf = bytearray()
self._write_lock = Lock()
def allocate_lock():
"""Dummy implementation of _thread.allocate_lock()."""
return LockType()
def allocate_lock():
"""Dummy implementation of _thread.allocate_lock()."""
return LockType()
def allocate_lock():
"""Dummy implementation of _thread.allocate_lock()."""
return LockType()
def __init__(self):
self.default_mode = BREAK_MODE_UNHANDLED
self.break_on = { }
self.handler_cache = dict(self.BUILT_IN_HANDLERS)
self.handler_lock = thread.allocate_lock()
self.add_exception('exceptions.IndexError', BREAK_MODE_NEVER)
self.add_exception('builtins.IndexError', BREAK_MODE_NEVER)
self.add_exception('exceptions.KeyError', BREAK_MODE_NEVER)
self.add_exception('builtins.KeyError', BREAK_MODE_NEVER)
self.add_exception('exceptions.AttributeError', BREAK_MODE_NEVER)
self.add_exception('builtins.AttributeError', BREAK_MODE_NEVER)
self.add_exception('exceptions.StopIteration', BREAK_MODE_NEVER)
self.add_exception('builtins.StopIteration', BREAK_MODE_NEVER)
self.add_exception('exceptions.GeneratorExit', BREAK_MODE_NEVER)
self.add_exception('builtins.GeneratorExit', BREAK_MODE_NEVER)
def __init__(self, id = None):
if id is not None:
self.id = id
else:
self.id = thread.get_ident()
self._events = {'call' : self.handle_call,
'line' : self.handle_line,
'return' : self.handle_return,
'exception' : self.handle_exception,
'c_call' : self.handle_c_call,
'c_return' : self.handle_c_return,
'c_exception' : self.handle_c_exception,
}
self.cur_frame = None
self.stepping = STEPPING_NONE
self.unblock_work = None
self._block_lock = thread.allocate_lock()
self._block_lock.acquire()
self._block_starting_lock = thread.allocate_lock()
self._is_blocked = False
self._is_working = False
self.stopped_on_line = None
self.detach = False
self.trace_func = self.trace_func # replace self.trace_func w/ a bound method so we don't need to re-create these regularly
self.prev_trace_func = None
self.trace_func_stack = []
self.reported_process_loaded = False
self.django_stepping = None
self.is_sending = False
# stackless changes
if stackless is not None:
self._stackless_attach()
if sys.platform == 'cli':
self.frames = []
def __init__(self):
self.lock = thread.allocate_lock()
def test_finalize_runnning_thread(self):
# Issue 1402: the PyGILState_Ensure / _Release functions may be called
# very late on python exit: on deallocation of a running thread for
# example.
import_module("ctypes")
rc, out, err = assert_python_failure("-c", """if 1:
import ctypes, sys, time, _thread
# This lock is used as a simple event variable.
ready = _thread.allocate_lock()
ready.acquire()
# Module globals are cleared before __del__ is run
# So we save the functions in class dict
class C:
ensure = ctypes.pythonapi.PyGILState_Ensure
release = ctypes.pythonapi.PyGILState_Release
def __del__(self):
state = self.ensure()
self.release(state)
def waitingThread():
x = C()
ready.release()
time.sleep(100)
_thread.start_new_thread(waitingThread, ())
ready.acquire() # Be sure the other thread is waiting.
sys.exit(42)
""")
self.assertEqual(rc, 42)
def test_tstate_lock(self):
# Test an implementation detail of Thread objects.
started = _thread.allocate_lock()
finish = _thread.allocate_lock()
started.acquire()
finish.acquire()
def f():
started.release()
finish.acquire()
time.sleep(0.01)
# The tstate lock is None until the thread is started
t = threading.Thread(target=f)
self.assertIs(t._tstate_lock, None)
t.start()
started.acquire()
self.assertTrue(t.is_alive())
# The tstate lock can't be acquired when the thread is running
# (or suspended).
tstate_lock = t._tstate_lock
self.assertFalse(tstate_lock.acquire(timeout=0), False)
finish.release()
# When the thread ends, the state_lock can be successfully
# acquired.
self.assertTrue(tstate_lock.acquire(timeout=5), False)
# But is_alive() is still True: we hold _tstate_lock now, which
# prevents is_alive() from knowing the thread's end-of-life C code
# is done.
self.assertTrue(t.is_alive())
# Let is_alive() find out the C code is done.
tstate_lock.release()
self.assertFalse(t.is_alive())
# And verify the thread disposed of _tstate_lock.
self.assertTrue(t._tstate_lock is None)
def __init__(self, raw, buffer_size=DEFAULT_BUFFER_SIZE):
"""Create a new buffered reader using the given readable raw IO object.
"""
if not raw.readable():
raise OSError('"raw" argument must be readable.')
_BufferedIOMixin.__init__(self, raw)
if buffer_size <= 0:
raise ValueError("invalid buffer size")
self.buffer_size = buffer_size
self._reset_read_buf()
self._read_lock = Lock()
def __init__(self, raw, buffer_size=DEFAULT_BUFFER_SIZE):
if not raw.writable():
raise OSError('"raw" argument must be writable.')
_BufferedIOMixin.__init__(self, raw)
if buffer_size <= 0:
raise ValueError("invalid buffer size")
self.buffer_size = buffer_size
self._write_buf = bytearray()
self._write_lock = Lock()
def __init__(self):
self.default_mode = BREAK_MODE_UNHANDLED
self.break_on = { }
self.handler_cache = dict(self.BUILT_IN_HANDLERS)
self.handler_lock = thread.allocate_lock()
self.AddException('exceptions.IndexError', BREAK_MODE_NEVER)
self.AddException('exceptions.KeyError', BREAK_MODE_NEVER)
self.AddException('exceptions.AttributeError', BREAK_MODE_NEVER)
self.AddException('exceptions.StopIteration', BREAK_MODE_NEVER)
self.AddException('exceptions.GeneratorExit', BREAK_MODE_NEVER)