def create_ssl_context(proto=ssl.PROTOCOL_SSLv23,
                       verify_mode=ssl.CERT_NONE,
                       protocols=None,
                       options=None,
                       ciphers="ALL"):
    """Build an ``ssl.SSLContext`` from symbolic protocol and option names.

    :param proto: ``PROTOCOL_*`` constant passed to the ``ssl.SSLContext``
        constructor.
    :param verify_mode: value assigned to ``context.verify_mode``.
    :param protocols: iterable of ``ssl`` attribute names (``PROTOCOL_*``).
        Defaults to SSLv3, TLSv1, TLSv1.1 and TLSv1.2.  Names the running
        ``ssl`` module does not define contribute nothing.
    :param options: iterable of ``ssl`` attribute names (``OP_*``).  The
        context's options are replaced by the OR of these flags; names the
        running ``ssl`` module does not define contribute nothing.
    :param ciphers: cipher string handed to ``context.set_ciphers``.
    :return: the configured ``ssl.SSLContext``.
    """
    protocols = protocols or ('PROTOCOL_SSLv3', 'PROTOCOL_TLSv1',
                              'PROTOCOL_TLSv1_1', 'PROTOCOL_TLSv1_2')
    options = options or ('OP_CIPHER_SERVER_PREFERENCE', 'OP_SINGLE_DH_USE',
                          'OP_SINGLE_ECDH_USE', 'OP_NO_COMPRESSION')

    context = ssl.SSLContext(proto)
    context.verify_mode = verify_mode

    # OR the requested protocol constants together, exactly as before.
    protocol_mask = 0
    for name in protocols:
        protocol_mask |= getattr(ssl, name, 0)
    try:
        context.protocol = protocol_mask
    except AttributeError:
        # The ssl documentation describes ``SSLContext.protocol`` as
        # read-only; where the assignment is rejected, skip it instead of
        # aborting the whole function.
        pass

    # Replace (not extend) the context's option flags with the requested set.
    option_mask = 0
    for name in options:
        option_mask |= getattr(ssl, name, 0)
    context.options = option_mask

    context.set_ciphers(ciphers)
    return context
# Example source code for the Python class SSLContext()
def _getConnection(self):
    """Return the cached connection, creating it on first use.

    The connection type is chosen from ``self.__url.scheme``: ``http`` gives
    an ``http.client.HTTPConnection`` and ``https`` gives an
    ``http.client.HTTPSConnection``.  The new connection is stored in
    ``self.__connection`` and returned on later calls.

    :return: the ``http.client`` connection object.
    :raises BuildError: if the URL scheme is neither ``http`` nor ``https``.
    """
    if self.__connection is not None:
        return self.__connection

    url = self.__url
    if url.scheme == 'http':
        connection = http.client.HTTPConnection(url.hostname, url.port)
    elif url.scheme == 'https':
        # NOTE(review): no CA certificates or verify_mode are configured on
        # this context here -- confirm that this is intended.
        ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        connection = http.client.HTTPSConnection(url.hostname, url.port,
                                                 context=ctx)
    else:
        # Report the same attribute that was tested above.  The previous
        # ``url.schema`` lookup failed with AttributeError before the
        # intended BuildError could be raised.
        raise BuildError("Unsupported URL scheme: '{}'".format(url.scheme))

    self.__connection = connection
    return connection
def _load_wincerts():
    """Populate the module-level ``_WINCERTS`` with a ``CertFile``.

    The "CA" and "ROOT" stores are added to a new ``CertFile`` and its
    ``close`` method is registered to run at interpreter exit.
    """
    global _WINCERTS
    store = CertFile()
    for store_name in ("CA", "ROOT"):
        store.addstore(store_name)
    # Make sure the cert file is closed when the interpreter shuts down.
    atexit.register(store.close)
    _WINCERTS = store
# XXX: Possible future work.
# - Support CRL files? Only supported by CPython >= 2.7.9 and >= 3.4
# http://bugs.python.org/issue8813
# - OCSP? Not supported by python at all.
# http://bugs.python.org/issue17123
# - Setting OP_NO_COMPRESSION? The server doesn't yet.
# - Adding an ssl_context keyword argument to MongoClient? This might
# be useful for sites that have unusual requirements rather than
# trying to expose every SSLContext option through a keyword/uri
# parameter.
def test_context(self):
    """Exercise ``ftplib.FTP_TLS`` with an explicit ``context`` argument."""
    self.client.quit()
    tls_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)

    # Passing keyfile and/or certfile together with context must be
    # rejected with ValueError.
    conflicting_kwargs = (
        {'keyfile': CERTFILE},
        {'certfile': CERTFILE},
        {'certfile': CERTFILE, 'keyfile': CERTFILE},
    )
    for extra in conflicting_kwargs:
        self.assertRaises(ValueError, ftplib.FTP_TLS,
                          context=tls_context, **extra)

    self.client = ftplib.FTP_TLS(context=tls_context, timeout=10)
    self.client.connect(self.server.host, self.server.port)
    # The control socket is not an SSLSocket until auth() has been called.
    self.assertNotIsInstance(self.client.sock, ssl.SSLSocket)
    self.client.auth()
    self.assertIs(self.client.sock.context, tls_context)
    self.assertIsInstance(self.client.sock, ssl.SSLSocket)

    # After prot_p() the data socket must use the same context.
    self.client.prot_p()
    with self.client.transfercmd('list') as data_sock:
        self.assertIs(data_sock.context, tls_context)
        self.assertIsInstance(data_sock, ssl.SSLSocket)
def load_ssl_context(cert_file, pkey_file=None, protocol=None):
    """Create an SSL context and load a certificate chain into it.

    The arguments mirror the API of :py:class:`ssl.SSLContext`.

    :param cert_file: Path of the certificate to use.
    :param pkey_file: Path of the private key to use. If not given, the key
                      will be obtained from the certificate file.
    :param protocol: One of the ``PROTOCOL_*`` constants in the stdlib ``ssl``
                     module. Defaults to ``PROTOCOL_SSLv23``.
    :return: the ``_SSLContext`` with the certificate chain loaded.
    """
    chosen_protocol = ssl.PROTOCOL_SSLv23 if protocol is None else protocol
    context = _SSLContext(chosen_protocol)
    context.load_cert_chain(cert_file, pkey_file)
    return context
def upgradetotls(self):
    """
    Upgrade to a TLS wrapped connection using in-memory BIOs.

    Loads the certificate/key file named by the platform option
    ``STAGECERTIFICATEFILE``, wraps the receive/send memory BIOs as a
    server-side TLS object stored in ``self.conn`` and retries the
    handshake until it completes.

    :return: None
    """
    # TODO: newer TLS version?
    # noinspection PyUnresolvedReferences
    tls_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    # TODO: PLATFORM STAGECERTIFICATEFILE is not the correct name for this value, move to handler or set a different
    # variable in TRANSPORT with the same initial value?
    raw_path = self.handler.platform.options['STAGECERTIFICATEFILE']['Value']
    certkeyfile = sanatizefilename(raw_path)
    # The same file supplies both the certificate and the private key.
    tls_context.load_cert_chain(certfile=certkeyfile, keyfile=certkeyfile)
    self.conn = tls_context.wrap_bio(self.recvdataqueue.memorybio,
                                     self.senddataqueue.memorybio,
                                     server_side=True)
    print_message("Waiting for connection and TLS handshake...")
    handshake_done = False
    while not handshake_done:
        try:
            self.conn.do_handshake()
        except (ssl.SSLWantReadError, ssl.SSLSyscallError):
            # Handshake not finished yet; try again.
            continue
        handshake_done = True
    print_message("Upgrade to TLS done")
def upgradetotls(self):
    """
    Upgrade to a TLS wrapped connection by wrapping the existing socket.

    Loads the certificate/key file named by the platform option
    ``STAGECERTIFICATEFILE`` and replaces ``self.conn`` with a server-side
    TLS wrapper around it.

    :return: None
    """
    print_debug(DEBUG_MODULE, "upgrading to TLS context")
    # TODO: newer TLS version?
    # noinspection PyUnresolvedReferences
    tls_context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    # TODO: PLATFORM STAGECERTIFICATEFILE is not the correct name for this value, move to handler or set a different
    # variable in TRANSPORT with the same initial value?
    raw_path = self.handler.platform.options['STAGECERTIFICATEFILE']['Value']
    certkeyfile = sanatizefilename(raw_path)
    # The same file supplies both the certificate and the private key.
    tls_context.load_cert_chain(certfile=certkeyfile, keyfile=certkeyfile)
    self.conn = tls_context.wrap_socket(self.conn, server_side=True)
    print_message("Upgrade to TLS done")
def load_ssl_context(cert_file, pkey_file=None, protocol=None):
    """Create an SSL context and load a certificate chain into it.

    The arguments mirror the API of :py:class:`ssl.SSLContext`.

    :param cert_file: Path of the certificate to use.
    :param pkey_file: Path of the private key to use. If not given, the key
                      will be obtained from the certificate file.
    :param protocol: One of the ``PROTOCOL_*`` constants in the stdlib ``ssl``
                     module. Defaults to ``PROTOCOL_SSLv23``.
    :return: the ``_SSLContext`` with the certificate chain loaded.
    """
    chosen_protocol = ssl.PROTOCOL_SSLv23 if protocol is None else protocol
    context = _SSLContext(chosen_protocol)
    context.load_cert_chain(cert_file, pkey_file)
    return context
def __init__(self, *args, **kwargs):
    """Initialise the stream and, if already connected, queue the handshake.

    The ``ssl_options`` keyword argument may either be an
    `ssl.SSLContext` object or a dictionary of keywords arguments
    for `ssl.wrap_socket`
    """
    # ssl_options is removed from kwargs before the base class sees them.
    self._ssl_options = kwargs.pop('ssl_options', _client_ssl_defaults)
    super(SSLIOStream, self).__init__(*args, **kwargs)

    # Handshake bookkeeping.
    self._ssl_accepting = True
    self._handshake_reading = False
    self._handshake_writing = False
    self._ssl_connect_callback = None
    self._server_hostname = None

    # If the socket is already connected, attempt to start the handshake.
    try:
        self.socket.getpeername()
    except socket.error:
        # getpeername() failed, so nothing is started now.
        return

    # Indirectly start the handshake, which will run on the next
    # IOLoop iteration and then the real IO state will be set in
    # _handle_events.
    self._add_io_state(self.io_loop.WRITE)
def __init__(self, *args, **kwargs):
    """Initialise the stream and, if already connected, queue the handshake.

    The ``ssl_options`` keyword argument may either be an
    `ssl.SSLContext` object or a dictionary of keywords arguments
    for `ssl.wrap_socket`
    """
    # ssl_options is removed from kwargs before the base class sees them.
    self._ssl_options = kwargs.pop('ssl_options', _client_ssl_defaults)
    super(SSLIOStream, self).__init__(*args, **kwargs)

    # Handshake bookkeeping.
    self._ssl_accepting = True
    self._handshake_reading = False
    self._handshake_writing = False
    self._ssl_connect_callback = None
    self._server_hostname = None

    # If the socket is already connected, attempt to start the handshake.
    try:
        self.socket.getpeername()
    except socket.error:
        # getpeername() failed, so nothing is started now.
        return

    # Indirectly start the handshake, which will run on the next
    # IOLoop iteration and then the real IO state will be set in
    # _handle_events.
    self._add_io_state(self.io_loop.WRITE)
def __init__(self, *args, **kwargs):
    """Initialise the stream and, if already connected, queue the handshake.

    The ``ssl_options`` keyword argument may either be an
    `ssl.SSLContext` object or a dictionary of keywords arguments
    for `ssl.wrap_socket`
    """
    # ssl_options is removed from kwargs before the base class sees them.
    self._ssl_options = kwargs.pop('ssl_options', _client_ssl_defaults)
    super(SSLIOStream, self).__init__(*args, **kwargs)

    # Handshake bookkeeping.
    self._ssl_accepting = True
    self._handshake_reading = False
    self._handshake_writing = False
    self._ssl_connect_callback = None
    self._server_hostname = None

    # If the socket is already connected, attempt to start the handshake.
    try:
        self.socket.getpeername()
    except socket.error:
        # getpeername() failed, so nothing is started now.
        return

    # Indirectly start the handshake, which will run on the next
    # IOLoop iteration and then the real IO state will be set in
    # _handle_events.
    self._add_io_state(self.io_loop.WRITE)
def __init__(self, ip, port, repository: Repository):
    """Store connection settings and try to build an SSL context.

    :param ip: stored as ``self._ip``.
    :param port: stored as ``self._port``.
    :param repository: stored as ``self._repository``.

    The SSL context is built from the ``SSL_CERTIFICATE`` and ``SSL_KEY``
    entries of ``./config.json``.  If that fails for any ordinary reason,
    ``self._ssl_context`` is set to ``None`` and the failure is logged.
    """
    self._ip = ip
    self._port = port
    self._loop = asyncio.get_event_loop()
    self._client_protocols = {}
    self._service_protocols = {}
    self._repository = repository
    self._tcp_pingers = {}
    self._http_pingers = {}
    self.logger = logging.getLogger()
    try:
        config = json_file_to_dict('./config.json')
        ssl_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        ssl_context.load_cert_chain(config['SSL_CERTIFICATE'],
                                    config['SSL_KEY'])
    except Exception:
        # Best effort: catch ``Exception`` rather than using a bare
        # ``except`` so KeyboardInterrupt/SystemExit still propagate, and
        # record why the context could not be built.
        self.logger.info(
            "Could not set up SSL context from ./config.json; "
            "_ssl_context set to None", exc_info=True)
        ssl_context = None
    self._ssl_context = ssl_context
def _do_ssl_handshake(self):
    """Run a client-side TLS handshake through in-memory BIOs.

    Each time the handshake needs more data, the pending outgoing bytes
    are sent in a ``TDS_PRELOGIN`` message and the payload of the
    response packet is fed into the incoming BIO.

    :return: tuple ``(sslobj, incoming, outgoing)``.
    """
    incoming = ssl.MemoryBIO()
    outgoing = ssl.MemoryBIO()
    sslobj = ssl.SSLContext().wrap_bio(incoming, outgoing, server_side=False)

    while True:
        try:
            sslobj.do_handshake()
        except ssl.SSLWantReadError:
            # Send what the TLS layer produced and feed the reply back in.
            self._send_message(TDS_PRELOGIN, outgoing.read())
            tag, _, _, payload = self._read_response_packet()
            assert tag == TDS_PRELOGIN
            incoming.write(payload)
            continue
        # do_handshake() returned without raising: handshake complete.
        break

    return sslobj, incoming, outgoing
def load_ssl_context(cert_file, pkey_file=None, protocol=None):
    """Create an SSL context and load a certificate chain into it.

    The arguments mirror the API of :py:class:`ssl.SSLContext`.

    :param cert_file: Path of the certificate to use.
    :param pkey_file: Path of the private key to use. If not given, the key
                      will be obtained from the certificate file.
    :param protocol: One of the ``PROTOCOL_*`` constants in the stdlib ``ssl``
                     module. Defaults to ``PROTOCOL_SSLv23``.
    :return: the ``_SSLContext`` with the certificate chain loaded.
    """
    chosen_protocol = ssl.PROTOCOL_SSLv23 if protocol is None else protocol
    context = _SSLContext(chosen_protocol)
    context.load_cert_chain(cert_file, pkey_file)
    return context
def load_ssl_context(cert_file, pkey_file=None, protocol=None):
    """Create an SSL context and load a certificate chain into it.

    The arguments mirror the API of :py:class:`ssl.SSLContext`.

    :param cert_file: Path of the certificate to use.
    :param pkey_file: Path of the private key to use. If not given, the key
                      will be obtained from the certificate file.
    :param protocol: One of the ``PROTOCOL_*`` constants in the stdlib ``ssl``
                     module. Defaults to ``PROTOCOL_SSLv23``.
    :return: the ``_SSLContext`` with the certificate chain loaded.
    """
    chosen_protocol = ssl.PROTOCOL_SSLv23 if protocol is None else protocol
    context = _SSLContext(chosen_protocol)
    context.load_cert_chain(cert_file, pkey_file)
    return context
def load_ssl_context(cert_file, pkey_file=None, protocol=None):
    """Create an SSL context and load a certificate chain into it.

    The arguments mirror the API of :py:class:`ssl.SSLContext`.

    :param cert_file: Path of the certificate to use.
    :param pkey_file: Path of the private key to use. If not given, the key
                      will be obtained from the certificate file.
    :param protocol: One of the ``PROTOCOL_*`` constants in the stdlib ``ssl``
                     module. Defaults to ``PROTOCOL_SSLv23``.
    :return: the ``_SSLContext`` with the certificate chain loaded.
    """
    chosen_protocol = ssl.PROTOCOL_SSLv23 if protocol is None else protocol
    context = _SSLContext(chosen_protocol)
    context.load_cert_chain(cert_file, pkey_file)
    return context
def testHttpsContext(self):
    """Check the TLS context attached to an httplib2 HTTPS connection."""
    client = httplib2.Http(ca_certs=self.ca_certs_path)

    # Establish connection to local server
    client.request('https://localhost:%d/' % (self.port))

    # Verify that connection uses a TLS context with the correct hostname
    conn = client.connections['https:localhost:%d' % self.port]
    self.assertIsInstance(conn.sock, ssl.SSLSocket)
    self.assertTrue(hasattr(conn.sock, 'context'))

    # Only read .context after the hasattr assertion above has passed.
    tls_context = conn.sock.context
    self.assertIsInstance(tls_context, ssl.SSLContext)
    self.assertTrue(tls_context.check_hostname)
    self.assertEqual(conn.sock.server_hostname, 'localhost')
    self.assertEqual(tls_context.verify_mode, ssl.CERT_REQUIRED)
    self.assertEqual(tls_context.protocol, ssl.PROTOCOL_SSLv23)
def test_ssl_hostname_mismatch_repeat(self):
    """Request a mismatching-hostname URL twice and print each error."""
    # https://github.com/httplib2/httplib2/issues/5
    # FIXME(temoto): as of 2017-01-05 this is only a reference code, not useful test.
    # Because it doesn't provoke described error on my machine.
    # Instead `SSLContext.wrap_socket` raises `ssl.CertificateError`
    # which was also added to original patch.
    # url host is intentionally different, we provoke ssl hostname mismatch error
    url = 'https://127.0.0.1:%d/' % (self.port,)
    http = httplib2.Http(ca_certs=self.ca_certs_path, proxy_info=None)

    # Two identical attempts, one after the other.
    for _attempt in range(2):
        try:
            http.request(url)
            # NOTE: AssertionError is an Exception subclass, so the handler
            # below catches this too and prints it.
            assert False, 'expected certificate hostname mismatch error'
        except Exception as e:
            print('%s errno=%s' % (repr(e), getattr(e, 'errno', None)))
def __init__(self, host, port=None, key_file=None, cert_file=None,
             timeout=None, proxy_info=None,
             ca_certs=None, disable_ssl_certificate_validation=False):
    """Set up the HTTPS connection with an optional explicit SSL context.

    A TLSv1 ``ssl.SSLContext`` is built when a client certificate or CA
    bundle is in use (``ca_certs`` falls back to ``CA_CERTS`` when None);
    otherwise ``context=None`` is passed to the base class.

    :raises CertificateValidationUnsupportedInPython31: if a context is
        needed but the ``ssl`` module has no ``SSLContext``.
    """
    # TODO: implement proxy_info
    self.proxy_info = proxy_info

    if ca_certs is None:
        ca_certs = CA_CERTS

    context = None
    if cert_file or ca_certs:
        if not hasattr(ssl, 'SSLContext'):
            raise CertificateValidationUnsupportedInPython31()
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        context.verify_mode = (ssl.CERT_NONE
                               if disable_ssl_certificate_validation
                               else ssl.CERT_REQUIRED)
        if cert_file:
            context.load_cert_chain(cert_file, key_file)
        if ca_certs:
            context.load_verify_locations(ca_certs)

    # Hostname checking is the inverse of the "disable validation" flag.
    verify_hostname = disable_ssl_certificate_validation ^ True
    http.client.HTTPSConnection.__init__(
        self, host, port=port, key_file=key_file,
        cert_file=cert_file, timeout=timeout, context=context,
        check_hostname=verify_hostname)
def _create_transport_context(server_side, server_hostname):
    """Return a default client-side ``ssl.SSLContext``.

    :param server_side: must be falsy; server side needs an explicit context.
    :param server_hostname: when falsy, ``check_hostname`` is turned off on
        the context made by ``ssl.create_default_context``.
    :raises ValueError: if *server_side* is true.
    """
    if server_side:
        raise ValueError('Server side SSL needs a valid SSLContext')

    # Client side may pass ssl=True to use a default
    # context; in that case the sslcontext passed is None.
    # The default is secure for client connections.
    if not hasattr(ssl, 'create_default_context'):
        # Fallback for Python 3.3.
        legacy_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        legacy_context.options |= ssl.OP_NO_SSLv2
        legacy_context.options |= ssl.OP_NO_SSLv3
        legacy_context.set_default_verify_paths()
        legacy_context.verify_mode = ssl.CERT_REQUIRED
        return legacy_context

    # Python 3.4+: use up-to-date strong settings.
    default_context = ssl.create_default_context()
    if not server_hostname:
        default_context.check_hostname = False
    return default_context
def __init__(self, context, server_side, server_hostname=None):
    """
    Set up the initial, unwrapped state.

    The *context* argument specifies the ssl.SSLContext to use.
    The *server_side* argument indicates whether this is a server side or
    client side transport.
    The optional *server_hostname* argument can be used to specify the
    hostname you are connecting to. You may only specify this parameter if
    the _ssl module supports Server Name Indication (SNI).
    """
    # Parameters supplied by the caller.
    self._context, self._server_side = context, server_side
    self._server_hostname = server_hostname

    # Starts unwrapped, with a fresh pair of memory BIOs and no SSL object.
    self._state = _UNWRAPPED
    self._incoming, self._outgoing = ssl.MemoryBIO(), ssl.MemoryBIO()
    self._sslobj = None
    self._need_ssldata = False

    # Callbacks start unset.
    self._handshake_cb = None
    self._shutdown_cb = None
def start(loop, host, port):
    """Create the server (with TLS when ``args.tls`` is set) and wait for it.

    The created server is stored in the module-level ``server``.
    """
    global server
    sslctx = None
    if args.tls:
        import ssl
        # TODO: take cert/key from args as well.
        tests_dir = os.path.join(os.path.dirname(__file__), '..', 'tests')
        cert_path = os.path.join(tests_dir, 'ssl_cert.pem')
        key_path = os.path.join(tests_dir, 'ssl_key.pem')
        sslctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        sslctx.options |= ssl.OP_NO_SSLv2
        sslctx.load_cert_chain(certfile=cert_path, keyfile=key_path)
    server = yield from loop.create_server(Service, host, port, ssl=sslctx)
    banner = 'serving TLS' if sslctx else 'serving'
    dprint(banner, [s.getsockname() for s in server.sockets])
    yield from server.wait_closed()
def test_create_unix_server_ssl_verified(self):
    """Connect to an SSL unix server with a verifying client context."""
    proto = MyProto(loop=self.loop)
    server, path = self._make_ssl_unix_server(
        lambda: proto, SIGNED_CERTFILE)

    # Client context that requires a certificate signed by SIGNING_CA.
    client_ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    client_ctx.options |= ssl.OP_NO_SSLv2
    client_ctx.verify_mode = ssl.CERT_REQUIRED
    client_ctx.load_verify_locations(cafile=SIGNING_CA)
    if hasattr(client_ctx, 'check_hostname'):
        client_ctx.check_hostname = True

    # Connection succeeds with correct CA and server hostname.
    connecting = self.loop.create_unix_connection(
        MyProto, path, ssl=client_ctx, server_hostname='localhost')
    client, _client_proto = self.loop.run_until_complete(connecting)

    # close connection
    proto.transport.close()
    client.close()
    server.close()
    self.loop.run_until_complete(proto.done)
def test_create_server_ssl_verified(self):
    """Connect to an SSL TCP server with a verifying client context."""
    proto = MyProto(loop=self.loop)
    server, host, port = self._make_ssl_server(
        lambda: proto, SIGNED_CERTFILE)

    # Client context that requires a certificate signed by SIGNING_CA.
    client_ctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    client_ctx.options |= ssl.OP_NO_SSLv2
    client_ctx.verify_mode = ssl.CERT_REQUIRED
    client_ctx.load_verify_locations(cafile=SIGNING_CA)
    if hasattr(client_ctx, 'check_hostname'):
        client_ctx.check_hostname = True

    # Connection succeeds with correct CA and server hostname.
    connecting = self.loop.create_connection(
        MyProto, host, port, ssl=client_ctx, server_hostname='localhost')
    client, _client_proto = self.loop.run_until_complete(connecting)

    # close connection
    proto.transport.close()
    client.close()
    server.close()
    self.loop.run_until_complete(proto.done)
def make_HTTPS_handler(params, **kwargs):
    """Build a ``YoutubeDLHTTPSHandler`` with a suitable SSL context.

    :param params: mapping; the ``nocheckcertificate`` key (default False)
        disables certificate verification on the context.
    :param kwargs: forwarded to ``YoutubeDLHTTPSHandler``.
    :return: the constructed ``YoutubeDLHTTPSHandler``.
    """
    skip_verification = params.get('nocheckcertificate', False)

    if hasattr(ssl, 'create_default_context'):  # Python >= 3.4 or 2.7.9
        context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
        if skip_verification:
            context.check_hostname = False
            context.verify_mode = ssl.CERT_NONE
        try:
            return YoutubeDLHTTPSHandler(params, context=context, **kwargs)
        except TypeError:
            # Python 2.7.8
            # (create_default_context present but HTTPSHandler has no context=)
            pass

    if sys.version_info < (3, 2):
        return YoutubeDLHTTPSHandler(params, **kwargs)

    # Python < 3.4
    context = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    if skip_verification:
        context.verify_mode = ssl.CERT_NONE
    else:
        context.verify_mode = ssl.CERT_REQUIRED
    context.set_default_verify_paths()
    return YoutubeDLHTTPSHandler(params, context=context, **kwargs)
def load_ssl_context(cert_file, pkey_file=None, protocol=None):
    """Create an SSL context and load a certificate chain into it.

    The arguments mirror the API of :py:class:`ssl.SSLContext`.

    :param cert_file: Path of the certificate to use.
    :param pkey_file: Path of the private key to use. If not given, the key
                      will be obtained from the certificate file.
    :param protocol: One of the ``PROTOCOL_*`` constants in the stdlib ``ssl``
                     module. Defaults to ``PROTOCOL_SSLv23``.
    :return: the ``_SSLContext`` with the certificate chain loaded.
    """
    chosen_protocol = ssl.PROTOCOL_SSLv23 if protocol is None else protocol
    context = _SSLContext(chosen_protocol)
    context.load_cert_chain(cert_file, pkey_file)
    return context
def load_ssl_context(cert_file, pkey_file=None, protocol=None):
    """Create an SSL context and load a certificate chain into it.

    The arguments mirror the API of :py:class:`ssl.SSLContext`.

    :param cert_file: Path of the certificate to use.
    :param pkey_file: Path of the private key to use. If not given, the key
                      will be obtained from the certificate file.
    :param protocol: One of the ``PROTOCOL_*`` constants in the stdlib ``ssl``
                     module. Defaults to ``PROTOCOL_SSLv23``.
    :return: the ``_SSLContext`` with the certificate chain loaded.
    """
    chosen_protocol = ssl.PROTOCOL_SSLv23 if protocol is None else protocol
    context = _SSLContext(chosen_protocol)
    context.load_cert_chain(cert_file, pkey_file)
    return context
def ssl_wrap_socket(cls, s, ssl_options, server_hostname=None, **kwargs):
    """Wrap socket *s* for SSL and return the wrapped socket.

    ``ssl_options`` may be either a dictionary (as accepted by
    `ssl_options_to_context`) or an `ssl.SSLContext` object.
    Additional keyword arguments are passed to ``wrap_socket``
    (either the `~ssl.SSLContext` method or the `ssl` module function
    as appropriate).
    """
    context = ssl_options_to_context(ssl_options)

    have_context = (hasattr(ssl, 'SSLContext')
                    and isinstance(context, ssl.SSLContext))
    if not have_context:
        # ``context`` is used as a mapping of keyword arguments here, with
        # the caller's kwargs layered on top.
        return ssl.wrap_socket(s, **dict(context, **kwargs))

    if server_hostname is not None and getattr(ssl, 'HAS_SNI'):
        # Python doesn't have server-side SNI support so we can't
        # really unittest this, but it can be manually tested with
        # python3.2 -m tornado.httpclient https://sni.velox.ch
        return context.wrap_socket(s, server_hostname=server_hostname,
                                   **kwargs)
    return context.wrap_socket(s, **kwargs)
def connect_to_vc(vchost, user, pwd):
    """Connect via ``SmartConnect`` and return the retrieved content.

    :param vchost: ``"host"`` or ``"host:port"``; the port defaults to 443
        when no ``:`` is present.
    :param user: user name passed to ``SmartConnect``.
    :param pwd: password passed to ``SmartConnect``.
    :return: result of ``RetrieveContent()`` on the service instance.
    :raises ValueError: if the text after the last ``:`` is not an integer.
    """
    # Split on the LAST colon so extra colons in the host part no longer
    # break tuple unpacking, and always hand SmartConnect an integer port
    # (previously a str when parsed here, but the int 443 by default).
    host, separator, port_text = vchost.rpartition(':')
    if separator:
        port = int(port_text)
    else:
        host = vchost
        port = 443

    connect_kwargs = {'host': host, 'port': port, 'user': user, 'pwd': pwd}

    # Disabling SSL certificate verification
    if hasattr(ssl, 'SSLContext'):
        context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        context.verify_mode = ssl.CERT_NONE
        connect_kwargs['sslContext'] = context

    service_instance = SmartConnect(**connect_kwargs)
    return service_instance.RetrieveContent()
def load_ssl_context(cert_file, pkey_file=None, protocol=None):
    """Create an SSL context and load a certificate chain into it.

    The arguments mirror the API of :py:class:`ssl.SSLContext`.

    :param cert_file: Path of the certificate to use.
    :param pkey_file: Path of the private key to use. If not given, the key
                      will be obtained from the certificate file.
    :param protocol: One of the ``PROTOCOL_*`` constants in the stdlib ``ssl``
                     module. Defaults to ``PROTOCOL_SSLv23``.
    :return: the ``_SSLContext`` with the certificate chain loaded.
    """
    chosen_protocol = ssl.PROTOCOL_SSLv23 if protocol is None else protocol
    context = _SSLContext(chosen_protocol)
    context.load_cert_chain(cert_file, pkey_file)
    return context