test_asyncore.py 文件源码

python
阅读 22 收藏 0 点赞 0 评论 0

项目:oil 作者: oilshell 项目源码 文件源码
def test_send(self):
        evt = threading.Event()
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.settimeout(3)
        port = test_support.bind_port(sock)

        cap = StringIO()
        args = (evt, cap, sock)
        t = threading.Thread(target=capture_server, args=args)
        t.start()
        try:
            # wait a little longer for the server to initialize (it sometimes
            # refuses connections on slow machines without this wait)
            time.sleep(0.2)

            data = "Suppose there isn't a 16-ton weight?"
            d = dispatcherwithsend_noread()
            d.create_socket(socket.AF_INET, socket.SOCK_STREAM)
            d.connect((HOST, port))

            # give time for socket to connect
            time.sleep(0.1)

            d.send(data)
            d.send(data)
            d.send('\n')

            n = 1000
            while d.out_buffer and n > 0:
                asyncore.poll()
                n -= 1

            evt.wait()

            self.assertEqual(cap.getvalue(), data*2)
        finally:
            t.join()
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号