def _getEndpoint(self, scheme, host, port):
    """
    Build a client endpoint for C{host}:C{port}, picking the transport
    from C{scheme}.

    The endpoint always receives this object's C{_bindAddress}; a
    C{timeout} is only passed along when C{_connectTimeout} is set.

    @param scheme: Either C{'http'} (plain TCP) or C{'https'} (TLS, using
        the context factory produced by C{self._wrapContextFactory}).

    @param host: A C{str} naming the host the request will be sent to.

    @param port: An C{int} giving the port to connect to.

    @return: An endpoint which can be used to connect to the given address.

    @raise SchemeNotSupported: If C{scheme} is neither C{'http'} nor
        C{'https'}.
    """
    endpointOptions = {}
    if self._connectTimeout is not None:
        endpointOptions['timeout'] = self._connectTimeout
    endpointOptions['bindAddress'] = self._bindAddress

    if scheme == 'http':
        return TCP4ClientEndpoint(
            self._reactor, host, port, **endpointOptions)

    if scheme == 'https':
        tlsContextFactory = self._wrapContextFactory(host, port)
        return SSL4ClientEndpoint(
            self._reactor, host, port, tlsContextFactory,
            **endpointOptions)

    raise SchemeNotSupported("Unsupported scheme: %r" % (scheme,))
# Example source code using the Python class SSL4ClientEndpoint()
def test_sslPositionalArgs(self):
    """
    An SSL strports description that gives the host and port positionally
    makes L{clientFromString} return an L{SSL4ClientEndpoint} carrying the
    reactor, host, port, timeout and bind address taken from the string.
    """
    fakeReactor = object()
    description = (
        "ssl:example.net:4321:privateKey=%s:"
        "certKey=%s:bindAddress=10.0.0.3:timeout=3:caCertsDir=%s" %
        (escapedPEMPathName, escapedPEMPathName, escapedCAsPathName))

    endpoint = endpoints.clientFromString(fakeReactor, description)

    self.assertIsInstance(endpoint, endpoints.SSL4ClientEndpoint)
    self.assertIs(endpoint._reactor, fakeReactor)
    # Each private attribute must reflect the matching strports field.
    for actual, expected in (
            (endpoint._host, "example.net"),
            (endpoint._port, 4321),
            (endpoint._timeout, 3),
            (endpoint._bindAddress, ("10.0.0.3", 0))):
        self.assertEqual(actual, expected)
def test_sslWithDefaults(self):
    """
    An SSL strports description with only a host and port makes
    L{clientFromString} return an L{SSL4ClientEndpoint} whose context
    factory uses the default method and has no certificate or private key.
    """
    fakeReactor = object()
    endpoint = endpoints.clientFromString(
        fakeReactor, "ssl:example.net:4321")

    self.assertIsInstance(endpoint, endpoints.SSL4ClientEndpoint)
    self.assertIs(endpoint._reactor, fakeReactor)
    for actual, expected in (
            (endpoint._host, "example.net"),
            (endpoint._port, 4321)):
        self.assertEqual(actual, expected)

    contextFactory = endpoint._sslContextFactory
    self.assertEqual(contextFactory.method, SSLv23_METHOD)
    # No client credentials were named in the description.
    self.assertIsNone(contextFactory.certificate)
    self.assertIsNone(contextFactory.privateKey)
def main(reactor):
    """
    Connect to a TLS server on localhost:4321 and wait for the protocol to
    finish.

    Certificates are produced by C{generate_certs.and_generate()} and the
    CA certificate is read from C{ca-private-cert.pem}.  The client
    verifies the server against the hostname C{"the-authority"} and
    presents the generated private certificate.

    NOTE(review): this is a generator (it uses C{yield}), so it is
    presumably wrapped by C{inlineCallbacks} or an equivalent decorator
    outside the visible source -- confirm.

    @param reactor: The reactor used to make the TLS connection.
    """
    pem = generate_certs.and_generate()
    caPem = FilePath(b"ca-private-cert.pem").getContent()
    # Build the endpoint once; the original constructed an identical
    # endpoint twice and threw the first one away.
    clientEndpoint = SSL4ClientEndpoint(
        reactor, u"localhost", 4321,
        optionsForClientTLS(u"the-authority", Certificate.loadPEM(caPem),
                            PrivateCertificate.loadPEM(pem)),
    )
    proto = yield clientEndpoint.connect(Factory.forProtocol(SendAnyData))
    # Wait on the protocol's own deferred before returning.
    yield proto.deferred
def subEndpoint(self, reactor, host, port, contextFactory):
    """
    Build the endpoint used to connect to one address returned by
    L{getaddrinfo}.

    @param reactor: the reactor to connect with
    @type reactor: L{IReactorTCP}

    @param host: The IP address of the host to connect to, in presentation
        format.
    @type host: L{str}

    @param port: The numeric port number to connect to.
    @type port: L{int}

    @param contextFactory: The OpenSSL context factory for a TLS
        connection, or L{None} for a plain TCP connection.

    @return: an L{SSL4ClientEndpoint} when a context factory is supplied,
        otherwise a L{TCP4ClientEndpoint}, for the given host and port.
    @rtype: L{IStreamClientEndpoint}
    """
    if contextFactory is not None:
        return SSL4ClientEndpoint(reactor, host, port, contextFactory)
    return TCP4ClientEndpoint(reactor, host, port)
def createClientEndpoint(self, reactor, clientFactory, **connectArgs):
    """
    Create an L{SSL4ClientEndpoint} and return the values needed to verify
    its behaviour.

    @param reactor: A fake L{IReactorSSL} that L{SSL4ClientEndpoint} can
        call L{IReactorSSL.connectSSL} on.

    @param clientFactory: The thing that we expect to be passed to our
        L{IStreamClientEndpoint.connect} implementation.

    @param connectArgs: Optional keyword arguments to
        L{IReactorSSL.connectSSL}, forwarded to the endpoint.

    @return: A three-tuple of the endpoint, a tuple C{(host, port,
        clientFactory, contextFactory, timeout, bindAddress)} in which a
        missing C{timeout} is reported as C{30} and a missing
        C{bindAddress} as L{None}, and the L{IPv4Address} being
        connected to.
    """
    address = IPv4Address("TCP", "localhost", 80)

    # ``**connectArgs`` is always a dict (possibly empty), so no None
    # check is needed before using it.
    endpoint = endpoints.SSL4ClientEndpoint(
        reactor,
        address.host,
        address.port,
        self.clientSSLContext,
        **connectArgs)
    expectedArgs = (
        address.host,
        address.port,
        clientFactory,
        self.clientSSLContext,
        connectArgs.get('timeout', 30),
        connectArgs.get('bindAddress', None),
    )
    return (endpoint, expectedArgs, address)
def __init__(self,
             url,
             realm=None,
             extra=None,
             serializers=None,
             ssl=None,
             proxy=None,
             headers=None):
    """
    Store the connection settings for a WAMP application runner.

    :param url: The WebSocket URL of the WAMP router to connect to (e.g. `ws://somehost.com:8090/somepath`)
    :type url: str

    :param realm: The WAMP realm to join the application session to.
    :type realm: str

    :param extra: Optional extra configuration to forward to the application component.
        A falsy value (``None`` or an empty dict) is stored as a new empty dict.
    :type extra: dict

    :param serializers: A list of WAMP serializers to use (or None for default serializers).
       Serializers must implement :class:`autobahn.wamp.interfaces.ISerializer`.
    :type serializers: list

    :param ssl: (Optional). If specified this should be an
        instance suitable to pass as ``sslContextFactory`` to
        :class:`twisted.internet.endpoints.SSL4ClientEndpoint` such
        as :class:`twisted.internet.ssl.CertificateOptions`. Leaving
        it as ``None`` will use the result of calling Twisted's
        :meth:`twisted.internet.ssl.platformTrust` which tries to use
        your distribution's CA certificates.
    :type ssl: :class:`twisted.internet.ssl.CertificateOptions`

    :param proxy: Explicit proxy server to use; a dict with ``host`` and ``port`` keys
    :type proxy: dict or None

    :param headers: Additional headers to send (only applies to WAMP-over-WebSocket).
    :type headers: dict

    :raises AssertionError: If ``url``, ``realm``, ``extra``, ``headers``
        or ``proxy`` has the wrong type (checks are skipped when Python
        runs with ``-O``).
    """
    # isinstance() rather than an exact type() comparison, so subclasses
    # (e.g. an OrderedDict passed as headers) are accepted as well.
    assert isinstance(url, six.text_type)
    assert realm is None or isinstance(realm, six.text_type)
    assert extra is None or isinstance(extra, dict)
    assert headers is None or isinstance(headers, dict)
    assert proxy is None or isinstance(proxy, dict)

    self.url = url
    self.realm = realm
    self.extra = extra or dict()
    self.serializers = serializers
    self.ssl = ssl
    self.proxy = proxy
    self.headers = headers

    # this is for auto-reconnection when Twisted ClientService is avail
    self._client_service = None

    # total number of successful connections
    self._connect_successes = 0
def test_ssl(self):
    """
    An SSL strports description using keyword fields makes
    L{clientFromString} return an L{SSL4ClientEndpoint} configured from
    the string: connection parameters, certificate, private key, and the
    CA certificates found in C{caCertsDir}.
    """
    fakeReactor = object()
    description = (
        "ssl:host=example.net:port=4321:privateKey=%s:"
        "certKey=%s:bindAddress=10.0.0.3:timeout=3:caCertsDir=%s" %
        (escapedPEMPathName, escapedPEMPathName, escapedCAsPathName))
    endpoint = endpoints.clientFromString(fakeReactor, description)

    # Connection parameters.
    self.assertIsInstance(endpoint, endpoints.SSL4ClientEndpoint)
    self.assertIs(endpoint._reactor, fakeReactor)
    self.assertEqual(endpoint._host, "example.net")
    self.assertEqual(endpoint._port, 4321)
    self.assertEqual(endpoint._timeout, 3)
    self.assertEqual(endpoint._bindAddress, ("10.0.0.3", 0))

    # TLS options.
    tlsOptions = endpoint._sslContextFactory
    self.assertIsInstance(tlsOptions, CertificateOptions)
    self.assertEqual(tlsOptions.method, SSLv23_METHOD)
    self.assertTrue(tlsOptions._options & OP_NO_SSLv3)
    context = tlsOptions.getContext()
    self.assertIsInstance(context, ContextType)

    # Client certificate and private key.
    self.assertEqual(Certificate(tlsOptions.certificate), testCertificate)
    rebuiltPrivateCert = PrivateCertificate(tlsOptions.certificate)
    rebuiltPrivateCert._setPrivateKey(KeyPair(tlsOptions.privateKey))
    self.assertEqual(rebuiltPrivateCert, testPrivateCertificate)

    # CA certificates: only the ``.pem`` files are expected to be loaded.
    candidatePaths = [casPath.child("thing1.pem"),
                      casPath.child("thing2.pem")]
    expectedCerts = [
        Certificate.loadPEM(path.getContent())
        for path in candidatePaths
        if path.basename().lower().endswith('.pem')
    ]

    recordedCerts = []

    class RecordingStore(object):
        # Stand-in certificate store that records every added cert.
        def add_cert(self, cert):
            recordedCerts.append(cert)

    class RecordingContext(object):
        # Stand-in context handing out a fresh recording store.
        def get_cert_store(self):
            return RecordingStore()

    tlsOptions.trustRoot._addCACertsToContext(RecordingContext())

    def digestOf(cert):
        return cert.digest()

    # Compare order-independently by sorting both sides on digest.
    self.assertEqual(
        sorted([Certificate(raw) for raw in recordedCerts], key=digestOf),
        sorted(expectedCerts, key=digestOf)
    )