python类close_all()的实例源码

test_asyncore.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def tearDown(self):
        asyncore.close_all()
test_asyncore.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def tearDown(self):
        asyncore.close_all()
test_smtpd.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def tearDown(self):
        asyncore.close_all()
        asyncore.socket = smtpd.socket = socket
test_smtpd.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def tearDown(self):
        asyncore.close_all()
        asyncore.socket = smtpd.socket = socket
        smtpd.DEBUGSTREAM = self.old_debugstream
test_smtpd.py 文件源码 项目:web_ctp 作者: molebot 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def tearDown(self):
        asyncore.close_all()
        asyncore.socket = smtpd.socket = socket
        smtpd.DEBUGSTREAM = self.old_debugstream
exasol.py 文件源码 项目:python-exasol 作者: EXASOL 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def run(self):
        try:
            while not self.finished:
                asyncore.loop(timeout = 1, count = 1)
        finally:
            self.serv.close()
            del self.serv
            asyncore.close_all()
test_poplib.py 文件源码 项目:pefile.pypy 作者: cloudtracer 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def run(self):
        self.active = True
        self.__flag.set()
        while self.active and asyncore.socket_map:
            self.active_lock.acquire()
            asyncore.loop(timeout=0.1, count=1)
            self.active_lock.release()
        asyncore.close_all(ignore_all=True)
test_ftplib.py 文件源码 项目:pefile.pypy 作者: cloudtracer 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def run(self):
        self.active = True
        self.__flag.set()
        while self.active and asyncore.socket_map:
            self.active_lock.acquire()
            asyncore.loop(timeout=0.1, count=1)
            self.active_lock.release()
        asyncore.close_all(ignore_all=True)
test_smtplib.py 文件源码 项目:pefile.pypy 作者: cloudtracer 项目源码 文件源码 阅读 28 收藏 0 点赞 0 评论 0
def debugging_server(serv, serv_evt, client_evt):
    serv_evt.set()

    try:
        if hasattr(select, 'poll'):
            poll_fun = asyncore.poll2
        else:
            poll_fun = asyncore.poll

        n = 1000
        while asyncore.socket_map and n > 0:
            poll_fun(0.01, asyncore.socket_map)

            # when the client conversation is finished, it will
            # set client_evt, and it's then ok to kill the server
            if client_evt.is_set():
                serv.close()
                break

            n -= 1

    except socket.timeout:
        pass
    finally:
        if not client_evt.is_set():
            # allow some time for the client to read the result
            time.sleep(0.5)
            serv.close()
        asyncore.close_all()
        serv_evt.set()
test_asyncore.py 文件源码 项目:pefile.pypy 作者: cloudtracer 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def tearDown(self):
        asyncore.close_all()
test_asyncore.py 文件源码 项目:pefile.pypy 作者: cloudtracer 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def tearDown(self):
        asyncore.close_all()
test_poplib.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 16 收藏 0 点赞 0 评论 0
def run(self):
        self.active = True
        self.__flag.set()
        while self.active and asyncore.socket_map:
            self.active_lock.acquire()
            asyncore.loop(timeout=0.1, count=1)
            self.active_lock.release()
        asyncore.close_all(ignore_all=True)
test_ftplib.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def run(self):
        self.active = True
        self.__flag.set()
        while self.active and asyncore.socket_map:
            self.active_lock.acquire()
            asyncore.loop(timeout=0.1, count=1)
            self.active_lock.release()
        asyncore.close_all(ignore_all=True)
test_smtplib.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 23 收藏 0 点赞 0 评论 0
def debugging_server(serv, serv_evt, client_evt):
    serv_evt.set()

    try:
        if hasattr(select, 'poll'):
            poll_fun = asyncore.poll2
        else:
            poll_fun = asyncore.poll

        n = 1000
        while asyncore.socket_map and n > 0:
            poll_fun(0.01, asyncore.socket_map)

            # when the client conversation is finished, it will
            # set client_evt, and it's then ok to kill the server
            if client_evt.is_set():
                serv.close()
                break

            n -= 1

    except socket.timeout:
        pass
    finally:
        if not client_evt.is_set():
            # allow some time for the client to read the result
            time.sleep(0.5)
            serv.close()
        asyncore.close_all()
        serv_evt.set()
test_os.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def run(self):
            self._active = True
            self.__flag.set()
            while self._active and asyncore.socket_map:
                self._active_lock.acquire()
                asyncore.loop(timeout=0.001, count=1)
                self._active_lock.release()
            asyncore.close_all()
test_asyncore.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def tearDown(self):
        asyncore.close_all()
test_asyncore.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def tearDown(self):
        asyncore.close_all()
test_smtpd.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def tearDown(self):
        asyncore.close_all()
        asyncore.socket = smtpd.socket = socket
test_smtpd.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def tearDown(self):
        asyncore.close_all()
        asyncore.socket = smtpd.socket = socket
        smtpd.DEBUGSTREAM = self.old_debugstream
test_smtpd.py 文件源码 项目:ouroboros 作者: pybee 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def tearDown(self):
        asyncore.close_all()
        asyncore.socket = smtpd.socket = socket
        smtpd.DEBUGSTREAM = self.old_debugstream


问题


面经


文章

微信
公众号

扫码关注公众号