python类_RLock()的实例源码

threadable.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 75 收藏 0 点赞 0 评论 0
def init(with_threads=1):
    """Initialize threading.

    Don't bother calling this.  If it needs to happen, it will happen.
    """
    global threaded, _synchLockCreator, XLock

    if with_threads:
        if not threaded:
            if threadmodule is not None:
                threaded = True

                class XLock(threadingmodule._RLock, object):
                    def __reduce__(self):
                        return (unpickle_lock, ())

                _synchLockCreator = XLock()
            else:
                raise RuntimeError("Cannot initialize threading, platform lacks thread support")
    else:
        if threaded:
            raise RuntimeError("Cannot uninitialize threads")
        else:
            pass
threadable.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def init(with_threads=1):
    """Initialize threading.

    Don't bother calling this.  If it needs to happen, it will happen.
    """
    global threaded, _synchLockCreator, XLock

    if with_threads:
        if not threaded:
            if threadmodule is not None:
                threaded = True

                class XLock(threadingmodule._RLock, object):
                    def __reduce__(self):
                        return (unpickle_lock, ())

                _synchLockCreator = XLock()
            else:
                raise RuntimeError("Cannot initialize threading, platform lacks thread support")
    else:
        if threaded:
            raise RuntimeError("Cannot uninitialize threads")
        else:
            pass
threadable.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def init(with_threads=1):
    """Initialize threading.

    Don't bother calling this.  If it needs to happen, it will happen.
    """
    global threaded, _synchLockCreator, XLock

    if with_threads:
        if not threaded:
            if threadingmodule is not None:
                threaded = True

                class XLock(threadingmodule._RLock, object):
                    def __reduce__(self):
                        return (unpickle_lock, ())

                _synchLockCreator = XLock()
            else:
                raise RuntimeError("Cannot initialize threading, platform lacks thread support")
    else:
        if threaded:
            raise RuntimeError("Cannot uninitialize threads")
        else:
            pass
compat.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def __init__(self, *args):
            from OpenSSL import SSL as _ssl
            self._ssl_conn = apply(_ssl.Connection, args)
            from threading import _RLock
            self._lock = _RLock()
compat.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def __init__(self, *args):
            from OpenSSL import SSL as _ssl
            self._ssl_conn = apply(_ssl.Connection, args)
            from threading import _RLock
            self._lock = _RLock()
nrlock.py 文件源码 项目:solaris-ips 作者: oracle 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def __init__(self):
                threading._RLock.__init__(self)
nrlock.py 文件源码 项目:solaris-ips 作者: oracle 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def acquire(self, blocking=1):
                if self._is_owned():
                        raise NRLockException("Recursive NRLock acquire")
                return threading._RLock.acquire(self, blocking)
nrlock.py 文件源码 项目:solaris-ips 作者: oracle 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def release(self):
                try:
                        threading._RLock.release(self)
                except RuntimeError:
                        errbuf = "Release of unacquired lock\n"
                        errbuf += self._debug_lock_release()
                        raise NRLockException(errbuf)


问题


面经


文章

微信
公众号

扫码关注公众号