def test_cancelGetConnectionCancelsEndpointConnect(self):
    """
    Cancelling the C{Deferred} handed back by
    L{HTTPConnectionPool.getConnection} also cancels the C{Deferred} that
    the endpoint returned from its C{connect} method, so the latter fails
    with L{CancelledError}.
    """
    self.assertEqual(self.pool._connections, {})
    pendingConnect = Deferred()

    class NeverConnectingEndpoint:
        def connect(self, factory):
            # Always hand out the same, never-fired Deferred so the test
            # can observe what happens to it.
            return pendingConnect

    connectionDeferred = self.pool.getConnection(
        12345, NeverConnectingEndpoint())
    connectionDeferred.cancel()
    failure = self.failureResultOf(pendingConnect)
    self.assertEqual(failure.type, CancelledError)
# Example source code using the Python class HTTPConnectionPool()
def test_nonPersistent(self):
    """
    When the L{HTTPConnectionPool} is created with C{persistent} set to
    C{False}, the C{Request}s issued through it carry a C{persistent}
    flag of C{False}.

    Elsewhere in the tests for the underlying HTTP code we ensure that
    this will result in the disconnection of the HTTP protocol once the
    request is done, so that the connection will not be returned to the
    pool.
    """
    nonPersistentPool = HTTPConnectionPool(self.reactor, persistent=False)
    agent = client.Agent(self.reactor, pool=nonPersistentPool)

    def useTestCaseAsEndpoint(*args):
        # The test case itself stands in for the endpoint.
        return self

    agent._getEndpoint = useTestCaseAsEndpoint
    agent.request(b"GET", b"http://127.0.0.1")
    firstRequest = self.protocol.requests[0][0]
    self.assertEqual(firstRequest.persistent, False)
def test_onlyRetryIdempotentMethods(self):
    """
    C{_shouldRetry} answers true only for the GET, HEAD, OPTIONS, TRACE
    and DELETE methods; other methods are not retried.
    """
    pool = client.HTTPConnectionPool(None)
    retrying = client._RetryingHTTP11ClientProtocol(None, pool)

    for method in (b"GET", b"HEAD", b"OPTIONS", b"TRACE", b"DELETE"):
        self.assertTrue(
            retrying._shouldRetry(method, RequestNotSent(), None))

    for method in (b"POST", b"MYMETHOD"):
        self.assertFalse(
            retrying._shouldRetry(method, RequestNotSent(), None))

    # This will be covered by a different ticket, since we need support
    # for resettable body producers:
    # self.assertTrue(connection._doRetry("PUT", RequestNotSent(), None))
def test_onlyRetryIfNoResponseReceived(self):
    """
    C{_shouldRetry} answers true only for L{RequestNotSent},
    L{RequestTransmissionFailed} and L{ResponseNeverReceived}; other
    exceptions do not cause a retry.
    """
    pool = client.HTTPConnectionPool(None)
    retrying = client._RetryingHTTP11ClientProtocol(None, pool)

    retryable = (
        RequestNotSent(),
        RequestTransmissionFailed([]),
        ResponseNeverReceived([]),
    )
    for exception in retryable:
        self.assertTrue(retrying._shouldRetry(b"GET", exception, None))

    notRetryable = (
        ResponseFailed([]),
        ConnectionRefusedError(),
    )
    for exception in notRetryable:
        self.assertFalse(retrying._shouldRetry(b"GET", exception, None))
def test_wrappedOnPersistentReturned(self):
    """
    A previously cached connection handed out by
    L{client.HTTPConnectionPool.getConnection} arrives wrapped in a
    L{client._RetryingHTTP11ClientProtocol} whose C{_clientProtocol} is
    the cached protocol.
    """
    pool = client.HTTPConnectionPool(Clock())

    # Seed the cache with one connected protocol:
    cachedProtocol = StubHTTPProtocol()
    cachedProtocol.makeConnection(StringTransport())
    pool._putConnection(123, cachedProtocol)

    def checkWrapped(wrapper):
        self.assertIsInstance(
            wrapper, client._RetryingHTTP11ClientProtocol)
        self.assertIdentical(wrapper._clientProtocol, cachedProtocol)

    # Fetching with the same key should yield the wrapped cached protocol:
    fetched = pool.getConnection(123, DummyEndpoint())
    return fetched.addCallback(checkWrapped)
def test_dontRetryIfRetryAutomaticallyFalse(self):
    """
    With L{HTTPConnectionPool.retryAutomatically} set to C{False}, a
    cached connection is handed back as is, without the retrying wrapper.
    """
    pool = client.HTTPConnectionPool(Clock())
    pool.retryAutomatically = False

    # Seed the cache with one connected protocol:
    cachedProtocol = StubHTTPProtocol()
    cachedProtocol.makeConnection(StringTransport())
    pool._putConnection(123, cachedProtocol)

    def checkUnwrapped(fetchedProtocol):
        self.assertIdentical(fetchedProtocol, cachedProtocol)

    # Fetching with the same key should yield the very same object:
    fetched = pool.getConnection(123, DummyEndpoint())
    return fetched.addCallback(checkUnwrapped)
def inject_pool_to_treq(params):
    """
    Create an ``HTTPConnectionPool`` and store it in ``params["pool"]``.

    The ``"persistent"`` entry is popped from ``params`` (defaulting to
    ``True``) and passed to the pool constructor.  Every key listed in
    ``TREQ_POOL_DEFAULT_PARAMS`` is likewise popped from ``params`` --
    falling back to the default from that mapping -- and set as an
    attribute on the pool.

    :param params: mutable mapping of request parameters; modified in place.
    """
    persistent = params.pop("persistent", True)
    pool = HTTPConnectionPool(reactor, persistent)
    params["pool"] = pool
    for attr_name, fallback in TREQ_POOL_DEFAULT_PARAMS.items():
        setattr(pool, attr_name, params.pop(attr_name, fallback))
def __init__(self, reactor, url, pool=None, timeout=None, connect_timeout=None):
    """
    Store the etcd URL and timeout, and set up the connection pool and
    the Twisted Web agent used for requests.

    :param reactor: Twisted reactor to use.
    :type reactor: class
    :param url: etcd URL, eg `http://localhost:2379`
    :type url: str
    :param pool: Twisted Web agent connection pool. If not given, a
        persistent ``HTTPConnectionPool`` is created.
    :type pool: HTTPConnectionPool or None
    :param timeout: If given, a global request timeout used for all
        requests to etcd.
    :type timeout: float or None
    :param connect_timeout: If given, a global connection timeout used when
        opening a new HTTP connection to etcd.
    :type connect_timeout: float or None
    :raises TypeError: if ``url`` is not a text (unicode) string.
    """
    # isinstance() instead of an exact type() comparison, so subclasses of
    # the text type are accepted as well.
    if not isinstance(url, six.text_type):
        raise TypeError('url must be of type unicode, was {}'.format(type(url)))
    self._url = url
    self._timeout = timeout
    self._pool = pool or HTTPConnectionPool(reactor, persistent=True)
    # Switch off the pool factory's ``noisy`` flag (presumably to silence
    # per-connection log messages -- confirm against Twisted).
    self._pool._factory.noisy = False
    self._agent = Agent(reactor, connectTimeout=connect_timeout, pool=self._pool)
def setUp(self):
    """
    Create a fake reactor and an L{HTTPConnectionPool} on top of it that
    uses C{DummyFactory} and has automatic retries switched off.
    """
    self.fakeReactor = self.createReactor()
    pool = HTTPConnectionPool(self.fakeReactor)
    pool._factory = DummyFactory
    # The retry code path is tested in HTTPConnectionPoolRetryTests:
    pool.retryAutomatically = False
    self.pool = pool
def test_getReturnsNewIfCacheEmpty(self):
    """
    With an empty cache, L{HTTPConnectionPool.getConnection} produces a
    new connection, and that connection is not stored in the pool.
    """
    self.assertEqual(self.pool._connections, {})

    def verifyFresh(protocol):
        self.assertIsInstance(protocol, StubHTTPProtocol)
        # The new connection is not stored in the pool:
        self.assertNotIn(protocol, self.pool._connections.values())

    uncachedKey = 12245
    fetched = self.pool.getConnection(uncachedKey, DummyEndpoint())
    return fetched.addCallback(verifyFresh)
def test_getUsesQuiescentCallback(self):
    """
    The C{Deferred} returned by L{HTTPConnectionPool.getConnection} fires
    with an L{HTTP11ClientProtocol} that has the correct quiescent
    callback attached.  Invoking that callback puts the protocol back in
    the cache under the right key, so a later request for the same key
    yields the same protocol.
    """
    class StringEndpoint(object):
        def connect(self, factory):
            built = factory.buildProtocol(None)
            built.makeConnection(StringTransport())
            return succeed(built)

    pool = HTTPConnectionPool(self.fakeReactor, True)
    pool.retryAutomatically = False
    key = "a key"

    def fetchConnection():
        # The endpoint connects synchronously, so the result is available
        # as soon as the callback has been added.
        fired = []
        pool.getConnection(key, StringEndpoint()).addCallback(fired.append)
        return fired[0]

    protocol = fetchConnection()
    self.assertIsInstance(protocol, HTTP11ClientProtocol)

    # Now that we have a protocol instance, let's try to put it back in
    # the pool:
    protocol._state = "QUIESCENT"
    protocol._quiescentCallback(protocol)

    # If we try to retrieve a connection to the same destination again, we
    # should get the same protocol, because it should've been added back
    # to the pool:
    self.assertIdentical(fetchConnection(), protocol)
def test_closeCachedConnections(self):
    """
    L{HTTPConnectionPool.closeCachedConnections} disconnects every cached
    connection and empties the cache.  The C{Deferred} it returns fires
    only once all of those connections have been lost.
    """
    cached = []
    for host in (b"example.com", b"www2.example.com"):
        protocol = HTTP11ClientProtocol()
        protocol.makeConnection(StringTransport())
        self.pool._putConnection(("http", host, 80), protocol)
        cached.append(protocol)

    allClosed = self.pool.closeCachedConnections()

    # Connections have begun disconnecting:
    for protocol in cached:
        self.assertEqual(protocol.transport.disconnecting, True)
    self.assertEqual(self.pool._connections, {})

    # All timeouts were cancelled and removed:
    for delayedCall in self.fakeReactor.getDelayedCalls():
        self.assertEqual(delayedCall.cancelled, True)
    self.assertEqual(self.pool._timeouts, {})

    # Returned Deferred fires when all connections have been closed:
    fired = []
    allClosed.addCallback(fired.append)
    self.assertEqual(fired, [])
    cached[0].connectionLost(Failure(ConnectionDone()))
    self.assertEqual(fired, [])
    cached[1].connectionLost(Failure(ConnectionDone()))
    self.assertEqual(fired, [None])
def test_persistent(self):
    """
    When the L{HTTPConnectionPool} is created without overriding
    C{persistent} (it defaults to C{True}), the C{Request}s issued
    through it carry a C{persistent} flag of C{True}.
    """
    defaultPool = HTTPConnectionPool(self.reactor)
    agent = client.Agent(self.reactor, pool=defaultPool)

    def useTestCaseAsEndpoint(*args):
        # The test case itself stands in for the endpoint.
        return self

    agent._getEndpoint = useTestCaseAsEndpoint
    agent.request(b"GET", b"http://127.0.0.1")
    firstRequest = self.protocol.requests[0][0]
    self.assertEqual(firstRequest.persistent, True)
def test_endpointFactoryDefaultPool(self):
    """
    L{Agent.usingEndpointFactory} called without a pool builds a default
    L{HTTPConnectionPool} that is not persistent and shares the agent's
    reactor.
    """
    agent = client.Agent.usingEndpointFactory(
        self.reactor, StubEndpointFactory())
    defaultPool = agent._pool
    observed = (
        defaultPool.__class__,
        defaultPool.persistent,
        defaultPool._reactor,
    )
    expected = (HTTPConnectionPool, False, agent._reactor)
    self.assertEqual(observed, expected)
def test_retryIfFailedDueToNonCancelException(self):
    """
    C{_shouldRetry} returns C{True} for a request that failed with
    L{ResponseNeverReceived} caused by some arbitrary exception,
    indicating the request should be retried.
    """
    pool = client.HTTPConnectionPool(None)
    retrying = client._RetryingHTTP11ClientProtocol(None, pool)
    neverReceived = ResponseNeverReceived([Failure(Exception())])
    self.assertTrue(retrying._shouldRetry(b"GET", neverReceived, None))
def test_notWrappedOnNewReturned(self):
    """
    A brand new connection produced by
    L{client.HTTPConnectionPool.getConnection} is handed back as is,
    i.e. its class is exactly L{HTTP11ClientProtocol}.
    """
    pool = client.HTTPConnectionPool(None)

    def checkExactClass(protocol):
        # Don't want to use isinstance since potentially the wrapper might
        # subclass it at some point:
        self.assertIdentical(protocol.__class__, HTTP11ClientProtocol)

    fetched = pool.getConnection(123, DummyEndpoint())
    return fetched.addCallback(checkExactClass)
def test_onlyRetryWithoutBody(self):
    """
    L{_RetryingHTTP11ClientProtocol} retries a query only when it has no
    body.

    This is an implementation restriction; if the restriction is fixed,
    this test should be removed and PUT added to list of methods that
    support retries.
    """
    pool = client.HTTPConnectionPool(None)
    retrying = client._RetryingHTTP11ClientProtocol(None, pool)

    withoutBody = retrying._shouldRetry(b"GET", RequestNotSent(), None)
    self.assertTrue(withoutBody)

    withBody = retrying._shouldRetry(b"GET", RequestNotSent(), object())
    self.assertFalse(withBody)
def __init__(self, *args, **kw):
    """
    Initialise the parent agent with the given arguments, then replace
    its pool with an ``HTTPConnectionPool`` built with ``False`` as the
    second (persistence) argument.
    """
    super(TrueHeadersAgent, self).__init__(*args, **kw)
    # Overrides whatever pool the parent initialiser set up.
    replacement_pool = HTTPConnectionPool(reactor, False)
    self._pool = replacement_pool
def _default_client(jws_client, reactor, key, alg):
"""
Make a client if we didn't get one.
"""
if jws_client is None:
pool = HTTPConnectionPool(reactor)
agent = Agent(reactor, pool=pool)
jws_client = JWSClient(HTTPClient(agent=agent), key, alg)
return jws_client
def _get_agent():
    """
    Build the HTTP agent used for requests.

    Uses a Twisted ``Agent`` backed by a persistent ``HTTPConnectionPool``
    when that class can be imported; on ``ImportError`` falls back to
    ``ZenAgent`` from ``_zenclient``.
    """
    ctx_factory = MyWebClientContextFactory()
    try:
        # HTTPConnectionPool has been present since Twisted version 12.1
        from twisted.web.client import HTTPConnectionPool
        conn_pool = HTTPConnectionPool(reactor, persistent=True)
        conn_pool.maxPersistentPerHost = _MAX_PERSISTENT_PER_HOST
        conn_pool.cachedConnectionTimeout = _CACHED_CONNECTION_TIMEOUT
        return Agent(reactor, ctx_factory,
                     connectTimeout=_CONNECT_TIMEOUT, pool=conn_pool)
    except ImportError:
        from _zenclient import ZenAgent
        return ZenAgent(reactor, ctx_factory, persistent=True,
                        maxConnectionsPerHostName=1)