def test_instantiation(self):
    """
    Constructing an `OpenSSL.tsafe.Connection` does not raise.
    """
    # Smoke test only: success means the constructor call below completes
    # without an exception.  A better approach would be to refactor the
    # other Connection tests so they also run against this class.
    tls_context = Context(TLSv1_METHOD)
    Connection(tls_context, None)
# Example source code for Python's TLSv1_METHOD()
def go():
    """
    Load the private key from 'pkey.pem' in an endless loop through a
    context that has a passphrase callback installed.

    This function never returns.
    """
    def passphrase_cb(_a, _b, _c):
        # Print the next value from the externally defined ``count``
        # iterator, then hand back the passphrase.
        print(next(count))
        return "foobar"

    context = Context(TLSv1_METHOD)
    context.set_passwd_cb(passphrase_cb)
    while True:
        context.use_privatekey_file('pkey.pem')
def go():
    """
    Repeatedly connect a client and a server L{Connection} that share one
    context with a verify callback installed, pumping both sides until the
    callback has fired, then start over with a fresh socket pair.

    This function never returns.
    """
    listener = socket()
    listener.bind(('', 0))
    listener.listen(1)

    # Mutated in place (never rebound) so the closure below and the loop
    # further down always refer to the same list object.
    called = []

    def verify_cb(*args):
        # Print the next value from the externally defined ``count``
        # iterator and record that the callback ran.
        print(next(count))
        called.append(None)
        return 1

    context = Context(TLSv1_METHOD)
    context.set_verify(VERIFY_PEER, verify_cb)
    certificate = load_certificate(FILETYPE_PEM, cleartextCertificatePEM)
    context.use_certificate(certificate)
    private_key = load_privatekey(FILETYPE_PEM, cleartextPrivateKeyPEM)
    context.use_privatekey(private_key)

    while True:
        client = socket()
        client.setblocking(False)
        client.connect_ex(listener.getsockname())

        clientSSL = Connection(context, client)
        clientSSL.set_connect_state()

        server = listener.accept()[0]
        server.setblocking(False)

        serverSSL = Connection(context, server)
        serverSSL.set_accept_state()

        # Reset the marker, then drive both ends until the callback fires.
        del called[:]
        while not called:
            for endpoint in (clientSSL, serverSSL):
                try:
                    endpoint.send('foo')
                except WantReadError:
                    # This side needs more data from its peer; keep pumping.
                    pass
def main():
    """
    Wrap a connected client/server socket pair in L{Connection} objects and
    run the external ``send`` and ``recv`` functions against each of them
    in four threads, waiting for all of the threads to finish.
    """
    listener = socket()
    listener.bind(('', 0))
    listener.listen(5)

    client = socket()
    # The connect is issued in non-blocking mode (presumably so it cannot
    # stall before accept() runs below); blocking mode is restored after.
    client.setblocking(False)
    client.connect_ex(listener.getsockname())
    client.setblocking(True)
    server, _ = listener.accept()

    def new_context():
        # Each side gets its own, identically configured context.
        ctx = Context(TLSv1_METHOD)
        ctx.set_cipher_list('ALL:ADH')
        ctx.load_tmp_dh('dhparam.pem')
        return ctx

    sslClient = Connection(new_context(), client)
    sslClient.set_connect_state()

    sslServer = Connection(new_context(), server)
    sslServer.set_accept_state()

    # Order: send/client, send/server, recv/client, recv/server.
    workers = [
        Thread(target=action, args=(connection,))
        for action in (send, recv)
        for connection in (sslClient, sslServer)
    ]
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
def printcert(host, port, hostname):
    """
    Connect to C{host}:C{port} over TLS and print, in PEM form, the
    certificate that the external ``walkchain`` helper returns for the
    peer's certificate chain.

    @param host: Address to connect to; also used as the TLS server name
        when C{hostname} is empty.
    @param port: Port to connect to.
    @param hostname: Server name to send in the TLS handshake, or a false
        value to fall back to C{host}.
    """
    con = Connection(Context(TLSv1_METHOD), socket(AF_INET, SOCK_STREAM))
    try:
        con.connect((host, port))
        con.set_tlsext_host_name(hostname or host)
        con.do_handshake()
        # Grab the chain while the connection is still open rather than
        # after it has been shut down and closed.
        chain = con.get_peer_cert_chain()
        con.shutdown()
    finally:
        # Always release the socket, even when connecting or the handshake
        # fails part-way through.
        con.close()
    print(dump_certificate(FILETYPE_PEM, walkchain(chain)))
def main():
    """
    Serve the HTTP site over SSL on HTTP_PORT, bound to a fixed interface,
    using the certificate chain and private key under /etc/ssl/ihc, and run
    the reactor.
    """
    site = webserver.Site(HTTPServer())

    tls_context = Context(TLSv1_METHOD)
    tls_context.use_certificate_chain_file("/etc/ssl/ihc/crt")
    tls_context.use_privatekey_file("/etc/ssl/ihc/key")

    factory = ContextFactory(tls_context)
    reactor.listenSSL(HTTP_PORT, site, factory, interface='192.168.102.130')
    reactor.run()
def test_method(self):
    """
    L{Context} accepts each of L{SSLv2_METHOD}, L{SSLv3_METHOD},
    L{SSLv23_METHOD} and L{TLSv1_METHOD}, and rejects a string with
    L{TypeError} and the integer 10 with L{ValueError}.
    """
    supported = (SSLv2_METHOD, SSLv3_METHOD, SSLv23_METHOD, TLSv1_METHOD)
    for method in supported:
        Context(method)

    rejected = ((TypeError, ""), (ValueError, 10))
    for expected_error, bad_method in rejected:
        self.assertRaises(expected_error, Context, bad_method)
def test_type(self):
    """
    L{Context} is the very same object as L{ContextType}, and instances of
    it can be created by calling it.
    """
    constructor_args = (TLSv1_METHOD,)
    self.assertIdentical(Context, ContextType)
    self.assertConsistentType(Context, 'Context', *constructor_args)
def test_use_privatekey(self):
    """
    L{Context.use_privatekey} accepts an L{OpenSSL.crypto.PKey} instance
    and raises L{TypeError} for a string.
    """
    rsa_key = PKey()
    rsa_key.generate_key(TYPE_RSA, 128)

    context = Context(TLSv1_METHOD)
    context.use_privatekey(rsa_key)

    self.assertRaises(TypeError, context.use_privatekey, "")
def test_set_info_callback(self):
    """
    A callable passed to L{Context.set_info_callback} gets invoked when
    information about an SSL connection using that context is available.
    """
    (server, client) = socket_pair()

    clientSSL = Connection(Context(TLSv1_METHOD), client)
    clientSSL.set_connect_state()

    notifications = []

    def record(conn, where, ret):
        notifications.append((conn, where, ret))

    serverContext = Context(TLSv1_METHOD)
    serverContext.set_info_callback(record)
    certificate = load_certificate(FILETYPE_PEM, cleartextCertificatePEM)
    serverContext.use_certificate(certificate)
    privateKey = load_privatekey(FILETYPE_PEM, cleartextPrivateKeyPEM)
    serverContext.use_privatekey(privateKey)

    serverSSL = Connection(serverContext, server)
    serverSSL.set_accept_state()

    # Alternate handshake attempts on both ends until the callback fires.
    while not notifications:
        for endpoint in (clientSSL, serverSSL):
            try:
                endpoint.do_handshake()
            except WantReadError:
                pass

    # Weak check: only verifies that the callback ran at least once.
    self.assertTrue(notifications)
def test_load_verify_invalid_file(self):
    """
    Passing a cafile that does not exist to
    L{Context.load_verify_locations} raises L{Error}.
    """
    missing_cafile = self.mktemp()
    context = Context(TLSv1_METHOD)
    self.assertRaises(Error, context.load_verify_locations, missing_cafile)
def test_set_default_verify_paths_signature(self):
    """
    L{Context.set_default_verify_paths} raises L{TypeError} when it is
    called with any argument at all.
    """
    context = Context(TLSv1_METHOD)
    for unexpected in (None, 1, ""):
        self.assertRaises(
            TypeError, context.set_default_verify_paths, unexpected)
def test_add_extra_chain_cert_invalid_cert(self):
    """
    L{Context.add_extra_chain_cert} raises L{TypeError} when called with
    no arguments, with more than one argument, or with a single argument
    that is not an L{X509} instance.
    """
    context = Context(TLSv1_METHOD)
    bad_argument_lists = (
        (),
        (object(),),
        (object(), object()),
    )
    for bad_args in bad_argument_lists:
        self.assertRaises(
            TypeError, context.add_extra_chain_cert, *bad_args)
def test_add_extra_chain_cert(self):
    """
    An L{X509} instance can be handed to L{Context.add_extra_chain_cert}
    to be added to the certificate chain.
    """
    certificate = load_certificate(FILETYPE_PEM, cleartextCertificatePEM)
    context = Context(TLSv1_METHOD)
    context.add_extra_chain_cert(certificate)
    # XXX Nothing is asserted about the resulting behavior because doing so
    # would be really hard.  See #477521.
def test_type(self):
    """
    L{Connection} is the very same object as L{ConnectionType}, and
    instances of it can be created by calling it.
    """
    self.assertIdentical(Connection, ConnectionType)
    constructor_args = (Context(TLSv1_METHOD), None)
    self.assertConsistentType(Connection, 'Connection', *constructor_args)
def test_set_client_ca_list_errors(self):
    """
    L{Context.set_client_ca_list} raises L{TypeError} for an argument that
    is not a list, or for a list holding anything other than X509Names.
    An empty list is accepted and C{None} is returned.
    """
    context = Context(TLSv1_METHOD)
    for bogus in ("spam", ["spam"]):
        self.assertRaises(TypeError, context.set_client_ca_list, bogus)
    result = context.set_client_ca_list([])
    self.assertIdentical(result, None)
def test_add_client_ca_errors(self):
    """
    L{Context.add_client_ca} raises L{TypeError} when called with no
    arguments, with a non-X509 object, or with more than one argument.
    """
    context = Context(TLSv1_METHOD)
    cacert = load_certificate(FILETYPE_PEM, root_cert_pem)
    bad_argument_lists = (
        (),
        ("spam",),
        (cacert, cacert),
    )
    for bad_args in bad_argument_lists:
        self.assertRaises(TypeError, context.add_client_ca, *bad_args)
def __init__(self, *args, **kw):
    """
    Delegate to C{ssl.DefaultOpenSSLContextFactory.__init__} with the
    C{sslmethod} keyword always set to C{SSL.TLSv1_METHOD}, overriding any
    value supplied by the caller.
    """
    kw.update(sslmethod=SSL.TLSv1_METHOD)
    ssl.DefaultOpenSSLContextFactory.__init__(self, *args, **kw)
def __init__(self, privateKey=None, certificate=None, method=None, verify=False, caCerts=None,
             enableSessions=True):
    """
    Create an OpenSSL context SSL connection context factory.

    @param privateKey: A PKey object holding the private key.  Must be
        given together with C{certificate}, or not at all.
    @param certificate: An X509 object holding the certificate.  Must be
        given together with C{privateKey}, or not at all.
    @param method: The SSL protocol to use, one of SSLv23_METHOD,
        SSLv2_METHOD, SSLv3_METHOD, TLSv1_METHOD.  When C{None}, no
        instance attribute is set here, so any class-level C{method}
        default applies (presumably TLSv1_METHOD -- confirm on the class).
    @param verify: If True, verify certificates received from the peer and
        fail the handshake if verification fails.  Otherwise, allow
        anonymous sessions and sessions with certificates which fail
        validation.  By default this is False.
    @param caCerts: List of certificate authority certificates to send to
        the client when requesting a certificate.  Must be non-empty when
        C{verify} is True.  Since verify is False by default, this is None
        by default.
    @param enableSessions: If True, set a session ID on each context.  This
        allows a shortened handshake to be used when a known client
        reconnects.

    @raise AssertionError: If only one of C{privateKey} and C{certificate}
        is given, or if C{verify} is true while C{caCerts} is empty or
        None.
    """
    # Raised explicitly instead of via ``assert`` so the validation still
    # runs under ``python -O``; the exception type is kept for callers.
    if (privateKey is None) != (certificate is None):
        raise AssertionError("Specify neither or both of privateKey and certificate")
    if verify and not caCerts:
        raise AssertionError("Specify client CA certificate information if and only if enabling certificate verification")

    self.privateKey = privateKey
    self.certificate = certificate
    if method is not None:
        self.method = method
    self.verify = verify
    self.caCerts = caCerts
    self.enableSessions = enableSessions
def __init__(self, privateKey=None, certificate=None, method=None, verify=False, caCerts=None,
             enableSessions=True):
    """
    Create an OpenSSL context SSL connection context factory.

    @param privateKey: A PKey object holding the private key.  Must be
        given together with C{certificate}, or not at all.
    @param certificate: An X509 object holding the certificate.  Must be
        given together with C{privateKey}, or not at all.
    @param method: The SSL protocol to use, one of SSLv23_METHOD,
        SSLv2_METHOD, SSLv3_METHOD, TLSv1_METHOD.  When C{None}, no
        instance attribute is set here, so any class-level C{method}
        default applies (presumably TLSv1_METHOD -- confirm on the class).
    @param verify: If True, verify certificates received from the peer and
        fail the handshake if verification fails.  Otherwise, allow
        anonymous sessions and sessions with certificates which fail
        validation.  By default this is False.
    @param caCerts: List of certificate authority certificates to send to
        the client when requesting a certificate.  Must be non-empty when
        C{verify} is True.  Since verify is False by default, this is None
        by default.
    @param enableSessions: If True, set a session ID on each context.  This
        allows a shortened handshake to be used when a known client
        reconnects.

    @raise AssertionError: If only one of C{privateKey} and C{certificate}
        is given, or if C{verify} is true while C{caCerts} is empty or
        None.
    """
    # Raised explicitly instead of via ``assert`` so the validation still
    # runs under ``python -O``; the exception type is kept for callers.
    if (privateKey is None) != (certificate is None):
        raise AssertionError("Specify neither or both of privateKey and certificate")
    if verify and not caCerts:
        raise AssertionError("Specify client CA certificate information if and only if enabling certificate verification")

    self.privateKey = privateKey
    self.certificate = certificate
    if method is not None:
        self.method = method
    self.verify = verify
    self.caCerts = caCerts
    self.enableSessions = enableSessions