greenio_test.py 文件源码

python
阅读 22 收藏 0 点赞 0 评论 0

项目:deb-python-eventlet 作者: openstack 项目源码 文件源码
def test_blocking_accept_mark_as_reopened(self):
        evt_hub = get_hub()
        with mock.patch.object(evt_hub, "mark_as_reopened") as patched_mark_as_reopened:
            def connect_once(listener):
                # delete/overwrite the original conn
                # object, only keeping the file object around
                # closing the file object should close everything
                client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                client.connect(('127.0.0.1', listener.getsockname()[1]))
                client.close()

            server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
            server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
            server.bind(('127.0.0.1', 0))
            server.listen(50)
            acceptlet = eventlet.spawn(connect_once, server)
            conn, addr = server.accept()
            conn.sendall(b'hello\n')
            connfileno = conn.fileno()
            conn.close()
            assert patched_mark_as_reopened.called
            assert patched_mark_as_reopened.call_count == 3, "3 fds were opened, but the hub was " \
                                                             "only notified {call_count} times" \
                .format(call_count=patched_mark_as_reopened.call_count)
            args, kwargs = patched_mark_as_reopened.call_args
            assert args == (connfileno,), "Expected mark_as_reopened to be called " \
                                          "with {expected_fileno}, but it was called " \
                                          "with {fileno}".format(expected_fileno=connfileno,
                                                                 fileno=args[0])
        server.close()
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号