test_authentication.py 文件源码

python
阅读 20 收藏 0 点赞 0 评论 0

项目:txkube 作者: LeastAuthority 项目源码 文件源码
def create_reactor():
    """
    Build an in-memory test reactor with a pluggable fake name resolver.

    Twisted 17.1.0 and higher requires a reactor which implements
    ``IReactorPluggableNameResolver``.  The reactor returned here is a
    ``MemoryReactorClock`` subclass whose ``nameResolver`` answers lookups
    from the module-level ``HOST_MAP`` instead of performing real DNS.

    :return: A new ``_ResolvingMemoryClockReactor`` instance.
    """

    @implementer(IHostResolution)
    @attr.s
    class Resolution(object):
        # The host name this in-progress resolution refers to.
        name = attr.ib()

    class _FakeResolver(object):
        """
        Resolve host names synchronously by looking them up in ``HOST_MAP``.
        """

        def resolveHostName(self, resolutionReceiver, hostName, portNumber=0,
                            addressTypes=None, transportSemantics='TCP'):
            """
            Report the ``HOST_MAP`` entry for ``hostName`` (if any) to
            ``resolutionReceiver``.

            The signature mirrors ``IHostnameResolver.resolveHostName`` so
            that ``portNumber`` may be omitted or passed positionally;
            previously it had to be supplied as a keyword or a ``KeyError``
            was raised.

            :param resolutionReceiver: Receiver notified of the resolution's
                start, any resolved address, and its completion.
            :param hostName: The name to look up in ``HOST_MAP``.
            :param portNumber: Port to put in the reported address.
            :param addressTypes: Accepted for interface compatibility;
                ignored.
            :param transportSemantics: Accepted for interface compatibility;
                ignored -- reported addresses are always ``'TCP'``.

            :return: The ``Resolution`` object for this lookup.
            """
            resolution = Resolution(name=hostName)

            resolutionReceiver.resolutionBegan(resolution)
            # Unknown hosts produce no address; the resolution still
            # completes so the receiver is never left waiting.
            if hostName in HOST_MAP:
                resolutionReceiver.addressResolved(
                    IPv4Address('TCP', HOST_MAP[hostName], portNumber))
            resolutionReceiver.resolutionComplete()
            return resolution

    @implementer(IReactorPluggableNameResolver)
    class _ResolvingMemoryClockReactor(MemoryReactorClock):
        # Shared by all instances; the fake resolver holds no state.
        nameResolver = _FakeResolver()

    return _ResolvingMemoryClockReactor()
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号