python类allocate_lock()的实例源码

nanogateway.py 文件源码 项目:pycom-libraries 作者: pycom 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def __init__(self, id, frequency, datarate, ssid, password, server, port, ntp_server='pool.ntp.org', ntp_period=3600):
        self.id = id
        self.server = server
        self.port = port

        self.frequency = frequency
        self.datarate = datarate

        self.ssid = ssid
        self.password = password

        self.ntp_server = ntp_server
        self.ntp_period = ntp_period

        self.server_ip = None

        self.rxnb = 0
        self.rxok = 0
        self.rxfw = 0
        self.dwnb = 0
        self.txnb = 0

        self.sf = self._dr_to_sf(self.datarate)
        self.bw = self._dr_to_bw(self.datarate)

        self.stat_alarm = None
        self.pull_alarm = None
        self.uplink_alarm = None

        self.wlan = None
        self.sock = None
        self.udp_stop = False
        self.udp_lock = _thread.allocate_lock()

        self.lora = None
        self.lora_sock = None

        self.rtc = machine.RTC()
_dummy_thread.py 文件源码 项目:news-for-good 作者: thecodinghub 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def allocate_lock():
    """Dummy implementation of _thread.allocate_lock()."""
    return LockType()
pipetool.py 文件源码 项目:trex-http-proxy 作者: alwye 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def __init__(self, *pipes):
        self.active_pipes = set()
        self.active_sources = set()
        self.active_drains = set()
        self.active_sinks = set()
        self._add_pipes(*pipes)
        self.thread_lock = _thread.allocate_lock()
        self.command_lock = _thread.allocate_lock()
        self.__fdr,self.__fdw = os.pipe()
        self.threadid = None
_dummy_thread32.py 文件源码 项目:yt 作者: yt-project 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def allocate_lock():
    """Dummy implementation of _thread.allocate_lock()."""
    return LockType()
_dummy_thread.py 文件源码 项目:Tencent_Cartoon_Download 作者: Fretice 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def allocate_lock():
    """Dummy implementation of _thread.allocate_lock()."""
    return LockType()
_dummy_thread.py 文件源码 项目:fieldsight-kobocat 作者: awemulya 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def allocate_lock():
    """Dummy implementation of _thread.allocate_lock()."""
    return LockType()
test_threading.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_finalize_runnning_thread(self):
        # Issue 1402: the PyGILState_Ensure / _Release functions may be called
        # very late on python exit: on deallocation of a running thread for
        # example.
        import_module("ctypes")

        rc, out, err = assert_python_failure("-c", """if 1:
            import ctypes, sys, time, _thread

            # This lock is used as a simple event variable.
            ready = _thread.allocate_lock()
            ready.acquire()

            # Module globals are cleared before __del__ is run
            # So we save the functions in class dict
            class C:
                ensure = ctypes.pythonapi.PyGILState_Ensure
                release = ctypes.pythonapi.PyGILState_Release
                def __del__(self):
                    state = self.ensure()
                    self.release(state)

            def waitingThread():
                x = C()
                ready.release()
                time.sleep(100)

            _thread.start_new_thread(waitingThread, ())
            ready.acquire()  # Be sure the other thread is waiting.
            sys.exit(42)
            """)
        self.assertEqual(rc, 42)
_io.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def __init__(self, raw, buffer_size=DEFAULT_BUFFER_SIZE):
        """Create a new buffered reader using the given readable raw IO object.
        """
        if not raw.readable():
            raise IOError('"raw" argument must be readable.')

        _BufferedIOMixin.__init__(self, raw)
        if buffer_size <= 0:
            raise ValueError("invalid buffer size")
        self.buffer_size = buffer_size
        self._reset_read_buf()
        self._read_lock = Lock()
_io.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def __init__(self, raw, buffer_size=DEFAULT_BUFFER_SIZE):
        if not raw.writable():
            raise IOError('"raw" argument must be writable.')

        _BufferedIOMixin.__init__(self, raw)
        if buffer_size <= 0:
            raise ValueError("invalid buffer size")
        self.buffer_size = buffer_size
        self._write_buf = bytearray()
        self._write_lock = Lock()
_dummy_thread.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def allocate_lock():
    """Dummy implementation of _thread.allocate_lock()."""
    return LockType()
_thread.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def allocate_lock():
    """Dummy implementation of _thread.allocate_lock()."""
    return LockType()
_dummy_thread.py 文件源码 项目:CloudPrint 作者: William-An 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def allocate_lock():
    """Dummy implementation of _thread.allocate_lock()."""
    return LockType()
visualstudio_py_debugger.py 文件源码 项目:HomeAutomation 作者: gs2671 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def __init__(self):
        self.default_mode = BREAK_MODE_UNHANDLED
        self.break_on = { }
        self.handler_cache = dict(self.BUILT_IN_HANDLERS)
        self.handler_lock = thread.allocate_lock()
        self.add_exception('exceptions.IndexError', BREAK_MODE_NEVER)
        self.add_exception('builtins.IndexError', BREAK_MODE_NEVER)
        self.add_exception('exceptions.KeyError', BREAK_MODE_NEVER)
        self.add_exception('builtins.KeyError', BREAK_MODE_NEVER)
        self.add_exception('exceptions.AttributeError', BREAK_MODE_NEVER)
        self.add_exception('builtins.AttributeError', BREAK_MODE_NEVER)
        self.add_exception('exceptions.StopIteration', BREAK_MODE_NEVER)
        self.add_exception('builtins.StopIteration', BREAK_MODE_NEVER)
        self.add_exception('exceptions.GeneratorExit', BREAK_MODE_NEVER)
        self.add_exception('builtins.GeneratorExit', BREAK_MODE_NEVER)
visualstudio_py_debugger.py 文件源码 项目:HomeAutomation 作者: gs2671 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __init__(self, id = None):
        if id is not None:
            self.id = id 
        else:
            self.id = thread.get_ident()
        self._events = {'call' : self.handle_call, 
                        'line' : self.handle_line, 
                        'return' : self.handle_return, 
                        'exception' : self.handle_exception,
                        'c_call' : self.handle_c_call,
                        'c_return' : self.handle_c_return,
                        'c_exception' : self.handle_c_exception,
                       }
        self.cur_frame = None
        self.stepping = STEPPING_NONE
        self.unblock_work = None
        self._block_lock = thread.allocate_lock()
        self._block_lock.acquire()
        self._block_starting_lock = thread.allocate_lock()
        self._is_blocked = False
        self._is_working = False
        self.stopped_on_line = None
        self.detach = False
        self.trace_func = self.trace_func # replace self.trace_func w/ a bound method so we don't need to re-create these regularly
        self.prev_trace_func = None
        self.trace_func_stack = []
        self.reported_process_loaded = False
        self.django_stepping = None
        self.is_sending = False

        # stackless changes
        if stackless is not None:
            self._stackless_attach()

        if sys.platform == 'cli':
            self.frames = []
visualstudio_py_repl.py 文件源码 项目:HomeAutomation 作者: gs2671 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def __init__(self):
        self.lock = thread.allocate_lock()
test_threading.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_finalize_runnning_thread(self):
        # Issue 1402: the PyGILState_Ensure / _Release functions may be called
        # very late on python exit: on deallocation of a running thread for
        # example.
        import_module("ctypes")

        rc, out, err = assert_python_failure("-c", """if 1:
            import ctypes, sys, time, _thread

            # This lock is used as a simple event variable.
            ready = _thread.allocate_lock()
            ready.acquire()

            # Module globals are cleared before __del__ is run
            # So we save the functions in class dict
            class C:
                ensure = ctypes.pythonapi.PyGILState_Ensure
                release = ctypes.pythonapi.PyGILState_Release
                def __del__(self):
                    state = self.ensure()
                    self.release(state)

            def waitingThread():
                x = C()
                ready.release()
                time.sleep(100)

            _thread.start_new_thread(waitingThread, ())
            ready.acquire()  # Be sure the other thread is waiting.
            sys.exit(42)
            """)
        self.assertEqual(rc, 42)
test_threading.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_tstate_lock(self):
        # Test an implementation detail of Thread objects.
        started = _thread.allocate_lock()
        finish = _thread.allocate_lock()
        started.acquire()
        finish.acquire()
        def f():
            started.release()
            finish.acquire()
            time.sleep(0.01)
        # The tstate lock is None until the thread is started
        t = threading.Thread(target=f)
        self.assertIs(t._tstate_lock, None)
        t.start()
        started.acquire()
        self.assertTrue(t.is_alive())
        # The tstate lock can't be acquired when the thread is running
        # (or suspended).
        tstate_lock = t._tstate_lock
        self.assertFalse(tstate_lock.acquire(timeout=0), False)
        finish.release()
        # When the thread ends, the state_lock can be successfully
        # acquired.
        self.assertTrue(tstate_lock.acquire(timeout=5), False)
        # But is_alive() is still True:  we hold _tstate_lock now, which
        # prevents is_alive() from knowing the thread's end-of-life C code
        # is done.
        self.assertTrue(t.is_alive())
        # Let is_alive() find out the C code is done.
        tstate_lock.release()
        self.assertFalse(t.is_alive())
        # And verify the thread disposed of _tstate_lock.
        self.assertTrue(t._tstate_lock is None)
_pyio.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __init__(self, raw, buffer_size=DEFAULT_BUFFER_SIZE):
        """Create a new buffered reader using the given readable raw IO object.
        """
        if not raw.readable():
            raise OSError('"raw" argument must be readable.')

        _BufferedIOMixin.__init__(self, raw)
        if buffer_size <= 0:
            raise ValueError("invalid buffer size")
        self.buffer_size = buffer_size
        self._reset_read_buf()
        self._read_lock = Lock()
_pyio.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def __init__(self, raw, buffer_size=DEFAULT_BUFFER_SIZE):
        if not raw.writable():
            raise OSError('"raw" argument must be writable.')

        _BufferedIOMixin.__init__(self, raw)
        if buffer_size <= 0:
            raise ValueError("invalid buffer size")
        self.buffer_size = buffer_size
        self._write_buf = bytearray()
        self._write_lock = Lock()
visualstudio_py_debugger.py 文件源码 项目:AutoDiff 作者: icewall 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def __init__(self):
        self.default_mode = BREAK_MODE_UNHANDLED
        self.break_on = { }
        self.handler_cache = dict(self.BUILT_IN_HANDLERS)
        self.handler_lock = thread.allocate_lock()
        self.AddException('exceptions.IndexError', BREAK_MODE_NEVER)
        self.AddException('exceptions.KeyError', BREAK_MODE_NEVER)
        self.AddException('exceptions.AttributeError', BREAK_MODE_NEVER)
        self.AddException('exceptions.StopIteration', BREAK_MODE_NEVER)
        self.AddException('exceptions.GeneratorExit', BREAK_MODE_NEVER)


问题


面经


文章

微信
公众号

扫码关注公众号