python类Clock()的实例源码

test_sse_protocol.py 文件源码 项目:marathon-acme 作者: praekeltfoundation 项目源码 文件源码 阅读 17 收藏 0 点赞 0 评论 0
def test_lost_connection_cancels_timeout(self):
        """
        Losing the connection should cancel the pending timeout, so that
        advancing the clock past it has no effect on the transport.
        """
        timeout_seconds = 5
        fake_reactor = Clock()
        proto = make_protocol(timeout=timeout_seconds, reactor=fake_reactor)
        proto.connectionMade()

        # Nothing has asked the transport to disconnect yet.
        assert not proto.transport.disconnecting

        # Drop the connection before the timeout has elapsed.
        proto.connectionLost()

        # Move the fake clock forward by the whole timeout period; the
        # transport must still not be disconnecting.
        fake_reactor.advance(timeout_seconds)
        assert not proto.transport.disconnecting
test_task.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def testDelayedStart(self):
        """
        Start a looping call with C{now=False} and pump the fake clock.

        Checks that the callable ran exactly twice over the pumped timings,
        that after C{lc.stop()} the deferred returned by C{start} has handed
        the looping call itself to its callback, and that nothing is left
        scheduled on the clock.
        """
        timings = [0.05, 0.1, 0.1]

        clock = task.Clock()

        L = []
        lc = TestableLoopingCall(clock, L.append, None)
        d = lc.start(0.1, now=False)

        # Record whatever the deferred fires with.
        theResult = []
        d.addCallback(theResult.append)

        clock.pump(timings)

        # assertEqual/assertFalse replace the deprecated assertEquals/failIf
        # aliases.
        self.assertEqual(len(L), 2,
                         "got %d iterations, not 2" % (len(L),))
        lc.stop()
        self.assertIdentical(theResult[0], lc)

        self.assertFalse(clock.calls)
test_task.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def testWaitDeferred(self):
        """
        Tests if the callable isn't scheduled again before the returned
        deferred has fired.
        """
        timings = [0.2, 0.8]
        clock = task.Clock()

        def foo():
            # The deferred is fired by the fake clock one second after the
            # call, and its callback stops the looping call.
            d = defer.Deferred()
            d.addCallback(lambda _: lc.stop())
            clock.callLater(1, d.callback, None)
            return d

        lc = TestableLoopingCall(clock, foo)
        # The deferred returned by start() is not used by this test.
        lc.start(0.2)
        clock.pump(timings)
        # assertFalse replaces the deprecated failIf alias.
        self.assertFalse(clock.calls)
test_task.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def testFailurePropagation(self):
        """
        Tests if the failure of the errback of the deferred returned by the
        callable is propagated to the lc errback.

        To make sure this test does not hang trial when LoopingCall does not
        wait for the callable's deferred, it also checks there are no
        calls in the clock's callLater queue.
        """
        timings = [0.3]
        clock = task.Clock()

        def foo():
            # The returned deferred is errbacked by the fake clock 0.3
            # seconds after the call.
            d = defer.Deferred()
            clock.callLater(0.3, d.errback, TestException())
            return d

        lc = TestableLoopingCall(clock, foo)
        d = lc.start(1)
        self.assertFailure(d, TestException)

        clock.pump(timings)
        # assertFalse replaces the deprecated failIf alias.
        self.assertFalse(clock.calls)
        return d
test_protocols.py 文件源码 项目:hostapd-mana 作者: adde88 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def testPausing(self):
        """
        Test pause inside data receiving. It uses fake clock to see if
        pausing/resuming work.

        The pause buffer is fed to the protocol in chunks of every size from
        1 to 9; the received lines are compared with C{pause_output1} before
        the clock is advanced and with C{pause_output2} afterwards.
        """
        for packet_size in range(1, 10):
            t = StringIOWithoutClosing()
            clock = task.Clock()
            a = LineTester(clock)
            a.makeConnection(protocol.FileWrapper(t))
            # Floor division: range() requires an int, and "/" produces a
            # float on Python 3.
            for i in range(len(self.pause_buf) // packet_size + 1):
                s = self.pause_buf[i*packet_size:(i+1)*packet_size]
                a.dataReceived(s)
            # assertEqual replaces the deprecated failUnlessEqual alias.
            self.assertEqual(self.pause_output1, a.received)
            clock.advance(0)
            self.assertEqual(self.pause_output2, a.received)
test_bindcache.py 文件源码 项目:privacyidea-ldap-proxy 作者: NetKnights-GmbH 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_manual_removal(self):
        """
        Remove entries from a L{BindCache} by hand and check, after each
        removal, which entries are still reported as cached.
        """
        bind_cache = BindCache()
        fake_clock = task.Clock()
        bind_cache.callLater = fake_clock.callLater

        first = (DN, APP, PASSWORD)
        second = (DN_OTHER, APP_OTHER, PASSWORD_OTHER)

        bind_cache.add_to_cache(*first)
        bind_cache.add_to_cache(*second)
        self.assertTrue(bind_cache.is_cached(*first))
        self.assertTrue(bind_cache.is_cached(*second))

        # Dropping the first entry leaves the second one in place.
        bind_cache.remove_from_cache(*first)
        self.assertFalse(bind_cache.is_cached(*first))
        self.assertTrue(bind_cache.is_cached(*second))

        # Dropping the second entry leaves neither cached.
        bind_cache.remove_from_cache(*second)
        self.assertFalse(bind_cache.is_cached(*first))
        self.assertFalse(bind_cache.is_cached(*second))

        # Removing the second entry once more changes nothing.
        bind_cache.remove_from_cache(*second)
        self.assertFalse(bind_cache.is_cached(*first))
        self.assertFalse(bind_cache.is_cached(*second))
test_appcache.py 文件源码 项目:privacyidea-ldap-proxy 作者: NetKnights-GmbH 项目源码 文件源码 阅读 19 收藏 0 点赞 0 评论 0
def test_manual_removal(self):
        """
        Remove and overwrite entries of an L{AppCache} by hand and check the
        marker reported for each DN after every step.
        """
        app_cache = AppCache(TIMEOUT)
        fake_clock = task.Clock()
        app_cache.callLater = fake_clock.callLater
        marker_of = app_cache.get_cached_marker

        app_cache.add_to_cache(DN1, MARKER1)
        app_cache.add_to_cache(DN2, MARKER2)
        self.assertEqual(marker_of(DN1), MARKER1)
        self.assertEqual(marker_of(DN2), MARKER2)

        # Dropping (DN2, MARKER2) leaves DN1 untouched.
        app_cache.remove_from_cache(DN2, MARKER2)
        self.assertEqual(marker_of(DN1), MARKER1)
        self.assertEqual(marker_of(DN2), None)

        # Adding DN1 again replaces MARKER1 with MARKER2.
        app_cache.add_to_cache(DN1, MARKER2)
        self.assertEqual(marker_of(DN1), MARKER2)

        # Removing with the old marker MARKER1 has no effect.
        app_cache.remove_from_cache(DN1, MARKER1)
        self.assertEqual(marker_of(DN1), MARKER2)

        # Removing (DN1_OTHERCASE, MARKER2) has no effect either.
        app_cache.remove_from_cache(DN1_OTHERCASE, MARKER2)
        self.assertEqual(marker_of(DN1), MARKER2)

        # Removing (DN1, MARKER2) finally empties the cache.
        app_cache.remove_from_cache(DN1, MARKER2)
        self.assertEqual(marker_of(DN1), None)
        self.assertEqual(marker_of(DN2), None)
test_task.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 27 收藏 0 点赞 0 评论 0
def testDelayedStart(self):
        """
        Start a looping call with C{now=False} and pump the fake clock.

        Checks that the callable ran exactly twice over the pumped timings,
        that after C{lc.stop()} the deferred returned by C{start} has handed
        the looping call itself to its callback, and that nothing is left
        scheduled on the clock.
        """
        timings = [0.05, 0.1, 0.1]

        clock = task.Clock()

        L = []
        lc = TestableLoopingCall(clock, L.append, None)
        d = lc.start(0.1, now=False)

        # Record whatever the deferred fires with.
        theResult = []
        d.addCallback(theResult.append)

        clock.pump(timings)

        # assertEqual/assertFalse replace the deprecated assertEquals/failIf
        # aliases.
        self.assertEqual(len(L), 2,
                         "got %d iterations, not 2" % (len(L),))
        lc.stop()
        self.assertIdentical(theResult[0], lc)

        self.assertFalse(clock.calls)
test_task.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def testWaitDeferred(self):
        """
        Tests if the callable isn't scheduled again before the returned
        deferred has fired.
        """
        timings = [0.2, 0.8]
        clock = task.Clock()

        def foo():
            # The deferred is fired by the fake clock one second after the
            # call, and its callback stops the looping call.
            d = defer.Deferred()
            d.addCallback(lambda _: lc.stop())
            clock.callLater(1, d.callback, None)
            return d

        lc = TestableLoopingCall(clock, foo)
        # The deferred returned by start() is not used by this test.
        lc.start(0.2)
        clock.pump(timings)
        # assertFalse replaces the deprecated failIf alias.
        self.assertFalse(clock.calls)
test_task.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 35 收藏 0 点赞 0 评论 0
def testFailurePropagation(self):
        """
        Tests if the failure of the errback of the deferred returned by the
        callable is propagated to the lc errback.

        To make sure this test does not hang trial when LoopingCall does not
        wait for the callable's deferred, it also checks there are no
        calls in the clock's callLater queue.
        """
        timings = [0.3]
        clock = task.Clock()

        def foo():
            # The returned deferred is errbacked by the fake clock 0.3
            # seconds after the call.
            d = defer.Deferred()
            clock.callLater(0.3, d.errback, TestException())
            return d

        lc = TestableLoopingCall(clock, foo)
        d = lc.start(1)
        self.assertFailure(d, TestException)

        clock.pump(timings)
        # assertFalse replaces the deprecated failIf alias.
        self.assertFalse(clock.calls)
        return d
test_protocols.py 文件源码 项目:sslstrip-hsts-openwrt 作者: adde88 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def testPausing(self):
        """
        Test pause inside data receiving. It uses fake clock to see if
        pausing/resuming work.

        The pause buffer is fed to the protocol in chunks of every size from
        1 to 9; the received lines are compared with C{pause_output1} before
        the clock is advanced and with C{pause_output2} afterwards.
        """
        for packet_size in range(1, 10):
            t = StringIOWithoutClosing()
            clock = task.Clock()
            a = LineTester(clock)
            a.makeConnection(protocol.FileWrapper(t))
            # Floor division: range() requires an int, and "/" produces a
            # float on Python 3.
            for i in range(len(self.pause_buf) // packet_size + 1):
                s = self.pause_buf[i*packet_size:(i+1)*packet_size]
                a.dataReceived(s)
            # assertEqual replaces the deprecated failUnlessEqual alias.
            self.assertEqual(self.pause_output1, a.received)
            clock.advance(0)
            self.assertEqual(self.pause_output2, a.received)
testutil.py 文件源码 项目:deb-python-txaio 作者: openstack 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def replace_loop(new_loop):
    """
    Temporarily set ``txaio.config.loop`` to ``new_loop``.

    This is written as the generator body of a context manager: it yields
    ``new_loop`` while the replacement is in effect and puts the previous
    loop back afterwards, including when the managed block raises. It's up
    to you to ensure you pass an event_loop or a reactor instance depending
    upon asyncio/Twisted.

    NOTE(review): no ``contextlib.contextmanager`` decorator is visible in
    this excerpt; the ``with`` usage below requires one -- confirm it is
    applied where this function is defined.

    Use like so:

    .. sourcecode:: python

        from twisted.internet import task
        with replace_loop(task.Clock()) as fake_reactor:
            f = txaio.call_later(5, foo)
            fake_reactor.advance(10)
            # ...etc

    :param new_loop: the loop/reactor to install for the duration.
    :returns: yields ``new_loop`` itself.
    """

    # setup
    orig = txaio.config.loop
    txaio.config.loop = new_loop

    try:
        yield new_loop
    finally:
        # cleanup -- in a ``finally`` so the original loop is restored even
        # if the code using the replacement raises
        txaio.config.loop = orig
test_call_later.py 文件源码 项目:deb-python-txaio 作者: openstack 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_call_later_tx(framework_tx):
    '''
    Schedule a call with txaio.call_later on a fake Twisted clock and check
    that it runs, with the given arguments, only after the clock advances.
    '''

    from twisted.internet.task import Clock

    recorded = []

    def foo(*args, **kw):
        # Remember how we were invoked.
        recorded.append((args, kw))

    fake_clock = Clock()
    with replace_loop(fake_clock) as fake_loop:
        delay = txaio.call_later(1, foo, 5, 6, 7, foo="bar")

        # Nothing runs until time passes, and the handle is cancellable.
        assert len(recorded) == 0
        assert hasattr(delay, 'cancel')

        fake_loop.advance(2)

        assert len(recorded) == 1
        positional, keywords = recorded[0][0], recorded[0][1]
        assert positional == (5, 6, 7)
        assert keywords == dict(foo="bar")
test_all.py 文件源码 项目:loopix 作者: UCL-InfoSec 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_client_make_real_stream():
    """
    Drive the first client's make_real_stream on a fake clock: one packet
    is written per invocation, the delayed calls all point back at
    make_real_stream, and a buffered message is what gets sent next.
    """
    alice = env.clients[0]
    alice.transport.written = []
    alice.reactor = task.Clock()

    alice.make_real_stream()
    assert len(alice.transport.written) == 1
    alice.reactor.advance(100)
    assert len(alice.transport.written) == 2

    # Every call still pending on the fake clock is make_real_stream.
    for pending in alice.reactor.getDelayedCalls():
        assert pending.func == alice.make_real_stream

    # Check whether a real message is sent if buffer non-empty
    receiver = env.pubs_clients[1]
    path = [alice.provider] + env.pubs_mixes + [receiver.provider]
    test_packet = 'ABCDefgHIJKlmnOPRstUWxyz'
    alice.output_buffer.put(test_packet)
    alice.make_real_stream()

    last_written = alice.transport.written[-1]
    packet, addr = last_written
    assert petlib.pack.decode(packet) == 'ABCDefgHIJKlmnOPRstUWxyz'
    assert addr == (alice.provider.host, alice.provider.port)
test_internet.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_clientConnectionFailed(self):
        """
        After a connection attempt fails, the service has no protocol to
        offer and makes a second attempt once enough time has passed.
        """
        fakeClock = Clock()
        queue, svc = self.makeReconnector(clock=fakeClock,
                                          fireImmediately=False)
        self.assertEqual(len(queue.connectQueue), 1)

        # Fail the first (and so far only) attempt.
        queue.connectQueue[0].errback(Failure(Exception()))
        pending = svc.whenConnected()
        self.assertNoResult(pending)

        def trapCancelled(failure):
            # Service shutdown during tear-down fails every waiting
            # connection with CancelledError; don't let that fail the test.
            return failure.trap(CancelledError)
        pending.addErrback(trapCancelled)

        fakeClock.advance(AT_LEAST_ONE_ATTEMPT)
        self.assertEqual(len(queue.connectQueue), 2)
test_internet.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 30 收藏 0 点赞 0 评论 0
def test_clientConnectionLost(self):
        """
        After an established connection is lost, the service retries and
        C{whenConnected} hands out the protocol of the new connection.
        """
        fakeClock = Clock()
        queue, svc = self.makeReconnector(fireImmediately=False,
                                          clock=fakeClock)
        self.assertEqual(len(queue.connectQueue), 1)

        # Complete the first attempt; no further attempt is queued.
        queue.connectQueue[0].callback(None)
        self.assertEqual(len(queue.connectQueue), 1)
        firstProtocol = self.successResultOf(svc.whenConnected())
        self.assertIdentical(firstProtocol, queue.applicationProtocols[0])

        # Lose that connection and give the service time to retry.
        queue.constructedProtocols[0].connectionLost(Failure(Exception()))
        fakeClock.advance(AT_LEAST_ONE_ATTEMPT)
        self.assertEqual(len(queue.connectQueue), 2)

        # Complete the second attempt.
        queue.connectQueue[1].callback(None)
        secondProtocol = self.successResultOf(svc.whenConnected())
        self.assertIdentical(secondProtocol, queue.applicationProtocols[1])
test_internet.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_whenConnectedLater(self):
        """
        L{Deferred}s obtained from L{ClientService.whenConnected} before the
        connection exists have no result until it is established, and then
        all fire with the same protocol.
        """
        fakeClock = Clock()
        queue, svc = self.makeReconnector(clock=fakeClock,
                                          fireImmediately=False)
        first = svc.whenConnected()
        second = svc.whenConnected()
        for waiting in (first, second):
            self.assertNoResult(waiting)

        # Complete the pending connection attempt.
        queue.connectQueue[0].callback(None)

        firstResult = self.successResultOf(first)
        secondResult = self.successResultOf(second)
        self.assertIdentical(firstResult, secondResult)
        self.assertIdentical(firstResult, queue.applicationProtocols[0])
test_internet.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 22 收藏 0 点赞 0 评论 0
def test_whenConnectedErrbacksOnStopService(self):
        """
        If L{ClientService.stopService} is called between connection
        attempts, the L{Deferred}s from L{ClientService.whenConnected}
        fail with L{CancelledError}, whether they were requested before the
        failed attempt or after the stop.
        """
        fakeClock = Clock()
        queue, svc = self.makeReconnector(clock=fakeClock,
                                          fireImmediately=False)
        requestedBefore = svc.whenConnected()

        # The connection attempt fails, leaving the service waiting to
        # reconnect.
        queue.connectQueue[0].errback(Exception("no connection"))

        svc.stopService()
        requestedAfter = svc.whenConnected()

        for waiting in (requestedBefore, requestedAfter):
            failure = self.failureResultOf(waiting)
            self.assertIsInstance(failure.value, CancelledError)
test_internet.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_stopServiceWhileDisconnecting(self):
        """
        When L{ClientService.stopService} is called a second time while
        the first stop is still disconnecting, both returned L{Deferred}s
        fire once the connection has actually been lost.
        """
        fakeClock = Clock()
        queue, svc = self.makeReconnector(clock=fakeClock,
                                          fireImmediately=False)
        # Establish the connection.
        queue.connectQueue[0].callback(None)

        # First stop starts the disconnection; the second arrives while it
        # is still in progress.
        stopDeferreds = [svc.stopService()]
        stopDeferreds.append(svc.stopService())

        # Now the connection really goes away.
        queue.constructedProtocols[0].connectionLost(
            Failure(IndentationError()))

        for stopped in stopDeferreds:
            self.successResultOf(stopped)
test_internet.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 34 收藏 0 点赞 0 评论 0
def test_stopServiceWhileRestarting(self):
        """
        When the service is stopped, started again and stopped once more
        before the connection is lost, both L{Deferred}s returned by
        L{ClientService.stopService} fire once the disconnection completes.
        """
        fakeClock = Clock()
        queue, svc = self.makeReconnector(clock=fakeClock,
                                          fireImmediately=False)
        # Establish the connection.
        queue.connectQueue[0].callback(None)

        # Stop, restart, then stop again before anything is disconnected.
        stopDeferreds = [svc.stopService()]
        svc.startService()
        stopDeferreds.append(svc.stopService())

        # Now the connection really goes away.
        queue.constructedProtocols[0].connectionLost(
            Failure(IndentationError()))

        for stopped in stopDeferreds:
            self.successResultOf(stopped)
test_cftp.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 26 收藏 0 点赞 0 评论 0
def test_printProgressBarNoProgress(self):
        """
        With no bytes transferred and no time elapsed,
        L{StdioClient._printProgressBar} writes a progress line reporting
        0 bytes transferred.
        """
        self.setKnownConsoleSize(10, 34)
        fakeClock = Clock()
        self.client.reactor = fakeClock

        underlying = BytesIO(b"x")
        underlying.name = b"sample"
        fileWrapper = cftp.FileWrapper(underlying)

        # Start time equals "now" on the fake clock, so no time has passed.
        self.client._printProgressBar(fileWrapper, fakeClock.seconds())

        # The expected line differs between Python 3 and Python 2.
        expected = (b"\rb'sample'  0% 0.0B 0.0Bps 00:00 " if _PY3
                    else "\rsample  0% 0.0B 0.0Bps 00:00 ")
        self.assertEqual(self.client.transport.value(), expected)
test_userauth.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 18 收藏 0 点赞 0 评论 0
def test_failedPasswordAuthentication(self):
        """
        When provided with invalid authentication details, the server should
        respond by sending a MSG_USERAUTH_FAILURE message which states whether
        the authentication was partially successful, and provides other, open
        options for authentication.

        See RFC 4252, Section 5.1.
        """
        # packet = username, next_service, authentication type, FALSE, password
        # The FALSE flag is written as a bytes literal: chr(0) is a text
        # string on Python 3 and cannot be joined with bytes.
        packet = b''.join([NS(b'foo'), NS(b'none'), NS(b'password'), b'\x00',
                           NS(b'bar')])
        self.authServer.clock = task.Clock()
        d = self.authServer.ssh_USERAUTH_REQUEST(packet)
        # No packet has been sent before the fake clock is advanced.
        self.assertEqual(self.authServer.transport.packets, [])
        self.authServer.clock.advance(2)
        return d.addCallback(self._checkFailed)
test_userauth.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 21 收藏 0 点赞 0 评论 0
def test_loginTimeout(self):
        """
        Test that the login times out.

        After the fake clock is advanced by eleven hours, the transport must
        have recorded a single MSG_DISCONNECT packet carrying the
        DISCONNECT_NO_MORE_AUTH_METHODS_AVAILABLE code and the message
        "you took too long", and must have lost its connection.
        """
        timeoutAuthServer = userauth.SSHUserAuthServer()
        timeoutAuthServer.clock = task.Clock()
        timeoutAuthServer.transport = FakeTransport(self.portal)
        timeoutAuthServer.serviceStarted()
        timeoutAuthServer.clock.advance(11 * 60 * 60)
        timeoutAuthServer.serviceStopped()
        # Build the single code byte via bytearray: chr() returns a text
        # string on Python 3 and cannot be concatenated with bytes. This
        # spelling yields the same byte string on Python 2.
        reasonCode = bytes(bytearray(
            [transport.DISCONNECT_NO_MORE_AUTH_METHODS_AVAILABLE]))
        self.assertEqual(timeoutAuthServer.transport.packets,
                [(transport.MSG_DISCONNECT,
                b'\x00' * 3 +
                reasonCode +
                NS(b"you took too long") + NS(b''))])
        self.assertTrue(timeoutAuthServer.transport.lostConnection)
test_http.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 36 收藏 0 点赞 0 评论 0
def test_requestBodyTimeout(self):
        """
        Every piece of request body delivered to L{HTTPChannel} restarts
        its timeout.
        """
        fakeClock = Clock()
        stringTransport = StringTransport()
        channel = http.HTTPChannel()
        channel.timeOut = 100
        channel.callLater = fakeClock.callLater
        channel.makeConnection(stringTransport)
        channel.dataReceived(
            b'POST / HTTP/1.0\r\nContent-Length: 2\r\n\r\n')

        # Twice: wait until just before the timeout, check the connection
        # is still up, then deliver one of the two body bytes.
        for _ in range(2):
            fakeClock.advance(99)
            self.assertFalse(stringTransport.disconnecting)
            channel.dataReceived(b'x')

        self.assertEqual(len(channel.requests), 1)
test_http.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_requestBodyDefaultTimeout(self):
        """
        Without an explicit timeout, L{HTTPChannel} times out after 60
        seconds.
        """
        fakeClock = Clock()
        stringTransport = StringTransport()
        channel = http.HTTPFactory().buildProtocol(None)

        # Reaching into _genericHTTPChannelProtocol like this breaks its
        # abstraction layer, but there is no other way to inject the fake
        # clock: policies.TimeoutMixin doesn't accept a reactor on the
        # object. See https://twistedmatrix.com/trac/ticket/8488
        channel._channel.callLater = fakeClock.callLater
        channel.makeConnection(stringTransport)
        channel.dataReceived(
            b'POST / HTTP/1.0\r\nContent-Length: 2\r\n\r\n')

        # One second short of the default: still connected.
        fakeClock.advance(59)
        self.assertFalse(stringTransport.disconnecting)

        # The final second triggers the disconnect.
        fakeClock.advance(1)
        self.assertTrue(stringTransport.disconnecting)
test_http.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 29 收藏 0 点赞 0 评论 0
def test_requestBodyTimeoutFromFactory(self):
        """
        An L{HTTPChannel} built by an L{HTTPFactory} also times out when
        the request body is not delivered to it in time.
        """
        fakeClock = Clock()
        httpFactory = http.HTTPFactory(timeout=100, reactor=fakeClock)
        httpFactory.startFactory()
        channel = httpFactory.buildProtocol(None)
        stringTransport = StringTransport()

        # The factory's timeout must have been passed on to the protocol.
        self.assertEqual(channel.timeOut, 100)

        # Reaching into _genericHTTPChannelProtocol like this breaks its
        # abstraction layer, but there is no other way to inject the fake
        # clock: policies.TimeoutMixin doesn't accept a reactor on the
        # object. See https://twistedmatrix.com/trac/ticket/8488
        channel._channel.callLater = fakeClock.callLater
        channel.makeConnection(stringTransport)
        channel.dataReceived(
            b'POST / HTTP/1.0\r\nContent-Length: 2\r\n\r\n')

        # Just short of the timeout: still connected.
        fakeClock.advance(99)
        self.assertFalse(stringTransport.disconnecting)

        # Past the timeout: the transport is disconnecting.
        fakeClock.advance(2)
        self.assertTrue(stringTransport.disconnecting)
test_http.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 24 收藏 0 点赞 0 评论 0
def test_HTTPChannelUnregistersSelfWhenTimingOut(self):
        """
        When L{HTTPChannel} times out a connection it stops being the
        transport's registered producer.
        """
        fakeClock = Clock()
        stringTransport = StringTransport()
        httpChannel = http.HTTPChannel()

        # Route the channel's scheduling through the fake clock.
        httpChannel.timeOut = 100
        httpChannel.callLater = fakeClock.callLater
        httpChannel.makeConnection(stringTransport)

        # One tick before the timeout the channel is still registered as a
        # streaming producer.
        fakeClock.advance(99)
        self.assertIs(stringTransport.producer, httpChannel)
        self.assertIs(stringTransport.streaming, True)

        # The last tick fires the timeout and clears the registration.
        fakeClock.advance(1)
        self.assertIs(stringTransport.producer, None)
        self.assertIs(stringTransport.streaming, None)
test_agent.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 31 收藏 0 点赞 0 评论 0
def test_dontRetryIfRetryAutomaticallyFalse(self):
        """
        With L{HTTPConnectionPool.retryAutomatically} set to C{False}, a
        cached connection is handed back as-is rather than wrapped with
        retrying logic.
        """
        connectionPool = client.HTTPConnectionPool(Clock())
        connectionPool.retryAutomatically = False

        # Seed the pool's cache with one connected protocol.
        cached = StubHTTPProtocol()
        cached.makeConnection(StringTransport())
        connectionPool._putConnection(123, cached)

        def checkUnwrapped(connection):
            # The very same object must come back out of the pool.
            self.assertIdentical(connection, cached)

        # Ask the pool for a connection under the same key.
        fetching = connectionPool.getConnection(123, DummyEndpoint())
        return fetching.addCallback(checkUnwrapped)
test_web.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 25 收藏 0 点赞 0 评论 0
def _xforwardedforTest(self, header):
        """
        Assert that L{proxiedLogFormatter} logs a request carrying the given
        I{X-Forwarded-For} header exactly as L{combinedLogFormatter} would
        have logged it, except that the client address is 172.16.1.2
        instead of the normal value.

        @param header: An I{X-Forwarded-For} header with left-most address of
            172.16.1.2.
        """
        clock = Clock()
        clock.advance(1234567890)
        timestamp = http.datetimeToLogString(clock.seconds())

        factory = http.HTTPFactory(reactor=clock)
        request = DummyRequestForLogTest(factory)

        # Build the expected line before the header is set, then swap in
        # the forwarded address.
        combinedLine = http.combinedLogFormatter(timestamp, request)
        expected = combinedLine.replace(u"1.2.3.4", u"172.16.1.2")

        request.requestHeaders.setRawHeaders(b"x-forwarded-for", [header])
        proxiedLine = http.proxiedLogFormatter(timestamp, request)

        self.assertEqual(expected, proxiedLine)
test_http2.py 文件源码 项目:zenchmarks 作者: squeaky-pl 项目源码 文件源码 阅读 20 收藏 0 点赞 0 评论 0
def test_initiallySchedulesOneDataCall(self):
        """
        Creating a H2Connection schedules exactly one delayed call on its
        reactor, to be run as soon as the reactor has time.
        """
        fakeReactor = task.Clock()
        connection = H2Connection(fakeReactor)

        pending = fakeReactor.getDelayedCalls()
        self.assertEqual(len(pending), 1)
        scheduled = pending[0]

        # The call is due right now but has not run yet, and it targets the
        # connection's data-sending method with no arguments.
        self.assertTrue(scheduled.active())
        self.assertEqual(scheduled.time, 0)
        self.assertEqual(scheduled.func, connection._sendPrioritisedData)
        self.assertEqual(scheduled.args, ())
        self.assertEqual(scheduled.kw, {})


问题


面经


文章

微信
公众号

扫码关注公众号