test_asyncore.py 文件源码

python
阅读 59 收藏 0 点赞 0 评论 0

项目:ouroboros 作者: pybee 项目源码 文件源码
def test_quick_connect(self):
        # see: http://bugs.python.org/issue10340
        if self.family in (socket.AF_INET, getattr(socket, "AF_INET6", object())):
            server = BaseServer(self.family, self.addr)
            t = threading.Thread(target=lambda: asyncore.loop(timeout=0.1,
                                                              count=500))
            t.start()
            def cleanup():
                t.join(timeout=TIMEOUT)
                if t.is_alive():
                    self.fail("join() timed out")
            self.addCleanup(cleanup)

            s = socket.socket(self.family, socket.SOCK_STREAM)
            s.settimeout(.2)
            s.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER,
                         struct.pack('ii', 1, 0))
            try:
                s.connect(server.address)
            except OSError:
                pass
            finally:
                s.close()
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号