def _create_ssl_ctx(self, sslp):
    """Return an ``ssl.SSLContext`` built from the ``sslp`` settings.

    :param sslp: Either a ready-made ``ssl.SSLContext``, which is returned
        unchanged, or a mapping that may carry the keys ``ca``, ``capath``,
        ``check_hostname``, ``cert``, ``key`` and ``cipher``.
    :return: The context to use for the TLS connection.

    When neither ``ca`` nor ``capath`` is supplied, certificate verification
    and hostname checking are both switched off.
    """
    if isinstance(sslp, ssl.SSLContext):
        # The caller already supplied a finished context; hand it back as is.
        return sslp

    ca_file = sslp.get('ca')
    ca_dir = sslp.get('capath')
    have_ca = ca_file is not None or ca_dir is not None

    context = ssl.create_default_context(cafile=ca_file, capath=ca_dir)
    # check_hostname is settled before verify_mode: the ssl module rejects
    # CERT_NONE while hostname checking is still enabled.
    if have_ca:
        context.check_hostname = sslp.get('check_hostname', True)
        context.verify_mode = ssl.CERT_REQUIRED
    else:
        context.check_hostname = False
        context.verify_mode = ssl.CERT_NONE

    if 'cert' in sslp:
        context.load_cert_chain(sslp['cert'], keyfile=sslp.get('key'))
    if 'cipher' in sslp:
        context.set_ciphers(sslp['cipher'])

    # Refuse the legacy SSL protocol versions.
    context.options |= ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3
    return context
# Example source code for Python's SSLContext() class
def _create_ssl_ctx(self, sslp):
    """Return an ``ssl.SSLContext`` built from the ``sslp`` settings.

    :param sslp: Either a ready-made ``ssl.SSLContext``, which is returned
        unchanged, or a mapping that may carry the keys ``ca``, ``capath``,
        ``check_hostname``, ``cert``, ``key`` and ``cipher``.
    :return: The context to use for the TLS connection.

    When neither ``ca`` nor ``capath`` is supplied, certificate verification
    and hostname checking are both switched off.
    """
    if isinstance(sslp, ssl.SSLContext):
        # The caller already supplied a finished context; hand it back as is.
        return sslp

    ca_file = sslp.get('ca')
    ca_dir = sslp.get('capath')
    have_ca = ca_file is not None or ca_dir is not None

    context = ssl.create_default_context(cafile=ca_file, capath=ca_dir)
    # check_hostname is settled before verify_mode: the ssl module rejects
    # CERT_NONE while hostname checking is still enabled.
    if have_ca:
        context.check_hostname = sslp.get('check_hostname', True)
        context.verify_mode = ssl.CERT_REQUIRED
    else:
        context.check_hostname = False
        context.verify_mode = ssl.CERT_NONE

    if 'cert' in sslp:
        context.load_cert_chain(sslp['cert'], keyfile=sslp.get('key'))
    if 'cipher' in sslp:
        context.set_ciphers(sslp['cipher'])

    # Refuse the legacy SSL protocol versions.
    context.options |= ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3
    return context
def get_ssl_context(self, protocol=None):
    """
    Return an SSL context loaded with the certificate file that was
    specified when this object was created.

    Arguments:
    - protocol: an optional ``ssl.PROTOCOL_*`` constant. When omitted, the
      version-negotiating protocol (``ssl.PROTOCOL_SSLv23``) is used with
      SSLv2 and SSLv3 disabled.
    Returns:
    - The SSL context
    Raises:
    - Whatever ``SSLContext.load_cert_chain`` raises when the stored
      certificate file or password cannot be used.
    """
    use_default = protocol is None
    if use_default:
        # SSLv2 is insecure, and ssl.PROTOCOL_SSLv2 is missing from builds
        # whose OpenSSL lacks it, so negotiate the best shared version.
        protocol = ssl.PROTOCOL_SSLv23

    ssl_context = ssl.SSLContext(protocol)
    if use_default:
        # Keep the legacy SSL versions out of the negotiation on older
        # interpreters that do not disable them by default.
        ssl_context.options |= ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3

    # Load the stored certificate (and key), decrypting with the password.
    ssl_context.load_cert_chain(self._cert_filename,
                                password=self._password)
    return ssl_context
def __init__(self, target):
    """Prepare an HTTP or HTTPS connection object for ``target``.

    :param target: URL of the form ``protocol://host[:port][/path]``.
    :raises ValueError: If ``target`` contains no ``://`` separator.

    Sets ``self.target``, ``self.path`` (always starting with ``/``),
    ``self.session`` (the connection object) and ``self.lastresult``.
    """
    self.target = target
    proto, separator, remainder = target.partition('://')
    if not separator:
        raise ValueError(
            'target must look like protocol://host[:port]/path, got %r'
            % (target,))

    # Keep "host:port" together: the connection classes parse the port
    # themselves, so a non-default port is honoured instead of dropped,
    # and a target without a port or without a path no longer crashes.
    netloc, _, path = remainder.partition('/')
    self.path = '/' + path

    if proto.lower() == 'https':
        try:
            # Create unverified (insecure) context
            uv_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        except AttributeError:
            # ssl.SSLContext does not exist on old Python 2.7 releases
            self.session = HTTPSConnection(netloc)
        else:
            self.session = HTTPSConnection(netloc, context=uv_context)
    else:
        self.session = HTTPConnection(netloc)
    self.lastresult = None
def load_ssl_context(cert_file, pkey_file=None, protocol=None):
    """Create an SSL context from a certificate and optional private key.

    The parameters mirror the API of :py:class:`ssl.SSLContext`.

    :param cert_file: Path of the certificate to use.
    :param pkey_file: Path of the private key to use. If not given, the key
                      will be obtained from the certificate file.
    :param protocol: One of the ``PROTOCOL_*`` constants in the stdlib ``ssl``
                     module. Defaults to ``PROTOCOL_SSLv23``.
    :return: The context with the certificate chain loaded.
    """
    chosen_protocol = ssl.PROTOCOL_SSLv23 if protocol is None else protocol
    context = _SSLContext(chosen_protocol)
    context.load_cert_chain(cert_file, pkey_file)
    return context
def load_ssl_context(cert_file, pkey_file=None, protocol=None):
    """Create an SSL context from a certificate and optional private key.

    The parameters mirror the API of :py:class:`ssl.SSLContext`.

    :param cert_file: Path of the certificate to use.
    :param pkey_file: Path of the private key to use. If not given, the key
                      will be obtained from the certificate file.
    :param protocol: One of the ``PROTOCOL_*`` constants in the stdlib ``ssl``
                     module. Defaults to ``PROTOCOL_SSLv23``.
    :return: The context with the certificate chain loaded.
    """
    chosen_protocol = ssl.PROTOCOL_SSLv23 if protocol is None else protocol
    context = _SSLContext(chosen_protocol)
    context.load_cert_chain(cert_file, pkey_file)
    return context
def load_ssl_context(cert_file, pkey_file=None, protocol=None):
    """Create an SSL context from a certificate and optional private key.

    The parameters mirror the API of :py:class:`ssl.SSLContext`.

    :param cert_file: Path of the certificate to use.
    :param pkey_file: Path of the private key to use. If not given, the key
                      will be obtained from the certificate file.
    :param protocol: One of the ``PROTOCOL_*`` constants in the stdlib ``ssl``
                     module. Defaults to ``PROTOCOL_SSLv23``.
    :return: The context with the certificate chain loaded.
    """
    chosen_protocol = ssl.PROTOCOL_SSLv23 if protocol is None else protocol
    context = _SSLContext(chosen_protocol)
    context.load_cert_chain(cert_file, pkey_file)
    return context
def make_HTTPS_handler(params, **kwargs):
    """Build a ``YoutubeDLHTTPSHandler`` with a suitable TLS context.

    :param params: Options mapping; ``nocheckcertificate`` (default
        ``False``) turns certificate verification off.
    :param kwargs: Extra keyword arguments forwarded to the handler.
    :return: The constructed ``YoutubeDLHTTPSHandler``.

    The construction strategy depends on what the running interpreter's
    ``ssl`` module offers.
    """
    skip_verification = params.get('nocheckcertificate', False)

    if hasattr(ssl, 'create_default_context'):  # Python >= 3.4 or 2.7.9
        default_ctx = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
        if skip_verification:
            default_ctx.check_hostname = False
            default_ctx.verify_mode = ssl.CERT_NONE
        try:
            return YoutubeDLHTTPSHandler(
                params, context=default_ctx, **kwargs)
        except TypeError:
            # Python 2.7.8
            # (create_default_context present but HTTPSHandler has no context=)
            pass

    if sys.version_info < (3, 2):
        # No usable SSLContext here: let the handler use its own defaults.
        return YoutubeDLHTTPSHandler(params, **kwargs)

    # Python < 3.4: assemble the context by hand.
    legacy_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    if skip_verification:
        legacy_ctx.verify_mode = ssl.CERT_NONE
    else:
        legacy_ctx.verify_mode = ssl.CERT_REQUIRED
    legacy_ctx.set_default_verify_paths()
    return YoutubeDLHTTPSHandler(params, context=legacy_ctx, **kwargs)
def load_ssl_context(cert_file, pkey_file=None, protocol=None):
    """Create an SSL context from a certificate and optional private key.

    The parameters mirror the API of :py:class:`ssl.SSLContext`.

    :param cert_file: Path of the certificate to use.
    :param pkey_file: Path of the private key to use. If not given, the key
                      will be obtained from the certificate file.
    :param protocol: One of the ``PROTOCOL_*`` constants in the stdlib ``ssl``
                     module. Defaults to ``PROTOCOL_SSLv23``.
    :return: The context with the certificate chain loaded.
    """
    chosen_protocol = ssl.PROTOCOL_SSLv23 if protocol is None else protocol
    context = _SSLContext(chosen_protocol)
    context.load_cert_chain(cert_file, pkey_file)
    return context
def __init__(self, host, port=None, key_file=None, cert_file=None,
             timeout=None, proxy_info=None,
             ca_certs=None, disable_ssl_certificate_validation=False):
    """Set up an HTTPS connection, optionally with certificate validation.

    :param host: Host to connect to.
    :param port: Port to connect to, or ``None`` for the default.
    :param key_file: Private key file for the client certificate.
    :param cert_file: Client certificate file.
    :param timeout: Timeout passed to the base connection.
    :param proxy_info: Stored on ``self.proxy_info``.
    :param ca_certs: CA bundle used to verify the peer; falls back to the
        module-level ``CA_CERTS`` when ``None``.
    :param disable_ssl_certificate_validation: When true, no verifying
        context is built here and ``context=None`` is passed on.
    :raises CertificateValidationUnsupportedInPython31: If validation is
        requested but the ``ssl`` module has no ``SSLContext``.
    """
    self.proxy_info = proxy_info
    context = None
    if ca_certs is None:
        ca_certs = CA_CERTS
    if (cert_file or ca_certs) and not disable_ssl_certificate_validation:
        if not hasattr(ssl, 'SSLContext'):
            raise CertificateValidationUnsupportedInPython31()
        # Negotiate the highest protocol version both sides support rather
        # than pinning TLS 1.0, which many current peers refuse; the legacy
        # SSL versions are excluded explicitly for older interpreters.
        context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        context.options |= ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3
        context.verify_mode = ssl.CERT_REQUIRED
        if cert_file:
            context.load_cert_chain(cert_file, key_file)
        if ca_certs:
            context.load_verify_locations(ca_certs)
    http.client.HTTPSConnection.__init__(
        self, host, port=port, key_file=key_file,
        cert_file=cert_file, timeout=timeout, context=context,
        check_hostname=True)
def start_connection(self) -> None:
    """Open the FTPS connection described by ``self.settings``.

    Builds a TLS context (non-verifying when the ``no_certificate_check``
    setting is truthy, the default verifying context otherwise), creates
    the ``FTP_TLS`` client on ``self.ftps`` and then calls ``prot_p()``
    on it.
    """
    ftps_settings = self.settings.ftps

    if not ftps_settings['no_certificate_check']:
        context = ssl.create_default_context()
    else:
        # Certificate checking explicitly disabled by configuration.
        context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
        context.verify_mode = ssl.CERT_NONE
        context.check_hostname = False

    self.ftps = FTP_TLS(
        host=ftps_settings['address'],
        user=ftps_settings['user'],
        passwd=ftps_settings['passwd'],
        context=context,
        source_address=ftps_settings['source_address'],
        timeout=self.settings.timeout_timer,
    )
    # Hath downloads
    self.ftps.prot_p()
def serve(self):
    """Initialise the server and run the websocket endpoint until interrupted.

    Calls ``self.initialize()``, starts the challenge thread, creates the
    websocket server (with TLS when ``self.ssl_on`` is truthy) and runs the
    asyncio event loop forever. On ``KeyboardInterrupt`` a message is
    printed and the event loop is closed.
    """
    self.initialize()
    self.challenge_thread = ChallengeThread.ChallengeThread(self)
    self.challenge_thread.start()

    # Only pass ``ssl=`` when TLS is actually enabled.
    tls_options = {}
    if self.ssl_on:
        ssl_context = ssl.SSLContext(protocol=ssl.PROTOCOL_SSLv23)
        ssl_context.load_cert_chain(self.ssl_cert)
        tls_options['ssl'] = ssl_context

    self.server_socket = websockets.serve(
        self.handle_connection, self.listen_address, self.port,
        **tls_options)

    try:
        asyncio.get_event_loop().run_until_complete(self.server_socket)
        asyncio.get_event_loop().run_forever()
    except KeyboardInterrupt:
        print("Closing the server")
        asyncio.get_event_loop().close()
def __init__(self, address, username='root', password='ca$hc0w', port=443, sslContext=None):
    """Store the connection settings and initialise the empty caches.

    :param address: Address of the server to talk to.
    :param username: Login name.
    :param password: Login password.
    :param port: Port of the server.
    :param sslContext: TLS context to use. When falsy, a context with
        certificate verification disabled is created.
    """
    self.address = address
    self.username = username
    self.password = password
    self.port = port

    if sslContext:
        self.sslContext = sslContext
    else:
        # No context supplied: fall back to one that does not verify the
        # peer certificate.
        self.sslContext = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        self.sslContext.verify_mode = ssl.CERT_NONE

    self._apiType = ''
    # Caches, all empty at construction time.
    self._vms = {}
    self._datacenters = {}
    self._hosts = {}
    self._datastores = {}
def __init__(self, host, port=None, key_file=None, cert_file=None,
             strict=_strict_sentinel, timeout=socket._GLOBAL_DEFAULT_TIMEOUT,
             source_address=None, **_3to2kwargs):
    """Initialise the HTTPS connection and its TLS context.

    ``context`` and ``check_hostname`` are accepted through
    ``**_3to2kwargs`` and both default to ``None``.

    :raises ValueError: If ``check_hostname`` is truthy while the context
        does not verify certificates.
    """
    # Pull the keyword-only options out of the catch-all mapping.
    check_hostname = _3to2kwargs.pop('check_hostname', None)
    context = _3to2kwargs.pop('context', None)

    super(HTTPSConnection, self).__init__(host, port, strict, timeout,
                                          source_address)
    self.key_file = key_file
    self.cert_file = cert_file

    if context is None:
        # Some reasonable defaults
        context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        context.options |= ssl.OP_NO_SSLv2

    verifies_peer = context.verify_mode != ssl.CERT_NONE
    if check_hostname is None:
        # Unspecified: check hostnames exactly when certificates are verified.
        check_hostname = verifies_peer
    elif check_hostname and not verifies_peer:
        raise ValueError("check_hostname needs a SSL context with "
                         "either CERT_OPTIONAL or CERT_REQUIRED")

    if key_file or cert_file:
        context.load_cert_chain(cert_file, key_file)

    self._context = context
    self._check_hostname = check_hostname
def make_https_server(case, certfile=CERTFILE, host=HOST, handler_class=None):
    """Start an HTTPS server thread and register its shutdown on ``case``.

    :param case: Object providing ``addCleanup``; the server is stopped and
        joined through the cleanup registered on it.
    :param certfile: File loaded into the server's SSL context.
    :param host: Host passed to ``HTTPSServerThread``.
    :param handler_class: Handler class passed to ``HTTPSServerThread``.
    :return: The started server thread.
    """
    # we assume the certfile contains both private key and certificate
    tls_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
    tls_context.load_cert_chain(certfile)
    https_thread = HTTPSServerThread(tls_context, host, handler_class)

    # Block until the thread signals the event handed to start().
    started = threading.Event()
    https_thread.start(started)
    started.wait()

    def _say(message):
        # Progress output only in verbose mode.
        if support.verbose:
            sys.stdout.write(message)

    def cleanup():
        _say('stopping HTTPS server\n')
        https_thread.stop()
        _say('joining HTTPS thread\n')
        https_thread.join()

    case.addCleanup(cleanup)
    return https_thread
def urlopen(url, data=None, timeout=socket._GLOBAL_DEFAULT_TIMEOUT, **_3to2kwargs):
    """Open ``url`` and return the opener's response.

    ``cafile``, ``capath`` (both default ``None``) and ``cadefault``
    (default ``False``) are accepted through ``**_3to2kwargs``. When any of
    them is truthy, a one-off opener with a certificate-verifying HTTPS
    handler is used; otherwise the shared module-level opener is used and
    created on first need.

    :raises ValueError: If CA options are given but SSL support is missing.
    """
    global _opener

    cadefault = _3to2kwargs.pop('cadefault', False)
    capath = _3to2kwargs.pop('capath', None)
    cafile = _3to2kwargs.pop('cafile', None)

    if cafile or capath or cadefault:
        if not _have_ssl:
            raise ValueError('SSL support not available')
        context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
        context.options |= ssl.OP_NO_SSLv2
        context.verify_mode = ssl.CERT_REQUIRED
        if cafile or capath:
            context.load_verify_locations(cafile, capath)
        else:
            context.set_default_verify_paths()
        # This opener is built per call and never cached in ``_opener``.
        opener = build_opener(
            HTTPSHandler(context=context, check_hostname=True))
    else:
        if _opener is None:
            _opener = build_opener()
        opener = _opener

    return opener.open(url, data, timeout)
def make_HTTPS_handler(params, **kwargs):
    """Build a ``YoutubeDLHTTPSHandler`` with a suitable TLS context.

    :param params: Options mapping; ``nocheckcertificate`` (default
        ``False``) turns certificate verification off.
    :param kwargs: Extra keyword arguments forwarded to the handler.
    :return: The constructed ``YoutubeDLHTTPSHandler``.

    The construction strategy depends on what the running interpreter's
    ``ssl`` module offers.
    """
    skip_verification = params.get('nocheckcertificate', False)

    if hasattr(ssl, 'create_default_context'):  # Python >= 3.4 or 2.7.9
        default_ctx = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
        if skip_verification:
            default_ctx.check_hostname = False
            default_ctx.verify_mode = ssl.CERT_NONE
        try:
            return YoutubeDLHTTPSHandler(
                params, context=default_ctx, **kwargs)
        except TypeError:
            # Python 2.7.8
            # (create_default_context present but HTTPSHandler has no context=)
            pass

    if sys.version_info < (3, 2):
        # No usable SSLContext here: let the handler use its own defaults.
        return YoutubeDLHTTPSHandler(params, **kwargs)

    # Python < 3.4: assemble the context by hand.
    legacy_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
    if skip_verification:
        legacy_ctx.verify_mode = ssl.CERT_NONE
    else:
        legacy_ctx.verify_mode = ssl.CERT_REQUIRED
    legacy_ctx.set_default_verify_paths()
    return YoutubeDLHTTPSHandler(params, context=legacy_ctx, **kwargs)
def setup_client(self):
    """Create and configure the MQTT client if none exists yet.

    :return: ``True`` when a new client was created and configured,
        ``False`` when a client already exists or paho-mqtt is unavailable.
    """
    if self.client is not None:
        return False

    if not HAVE_MQTT:
        print("Please install paho-mqtt 'pip install paho-mqtt' "
              "to use this library")
        return False

    self.client = mqtt.Client(
        client_id=self.blid, clean_session=self.clean,
        protocol=mqtt.MQTTv311)

    # Assign event callbacks
    for event in ('message', 'connect', 'publish', 'subscribe',
                  'disconnect'):
        setattr(self.client, 'on_' + event, getattr(self, 'on_' + event))

    # Uncomment to enable debug messages
    # client.on_log = self.on_log

    # set TLS, self.cert_name is required by paho-mqtt, even if the
    # certificate is not used...
    # but v1.3 changes all this, so have to do the following:
    self.log.info("Seting TLS")
    try:
        self.client.tls_set(
            self.cert_name, cert_reqs=ssl.CERT_NONE,
            tls_version=ssl.PROTOCOL_TLSv1)
    except ValueError:   # try V1.3 version
        self.log.warn("TLS Setting failed - trying 1.3 version")
        # Reset the client's context, then install a non-verifying one.
        self.client._ssl_context = None
        fallback_ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1)
        fallback_ctx.verify_mode = ssl.CERT_NONE
        fallback_ctx.load_default_certs()
        self.client.tls_set_context(fallback_ctx)

    # disables peer verification
    self.client.tls_insecure_set(True)
    self.client.username_pw_set(self.blid, self.password)
    return True
def wrap_socket(self, sock):
    """Wrap a plain socket for SSL/TLS (see `ssl.wrap_socket`)

    The wrapping callable is built on first use and cached on
    ``self._wrap_socket``.

    :param sock: Plain socket
    :type sock: :class:`socket.socket`
    :return: Whatever the cached wrapping callable returns for ``sock``.
    """
    if self._wrap_socket is None:
        if not hasattr(ssl, 'SSLContext'):
            # Interpreter without SSLContext: use the legacy wrapper.
            self._wrap_socket = self._legacy_wrap_socket()
        else:
            context = ssl.create_default_context(cafile=self.cafile)
            context.check_hostname = False
            if self.certfile is not None:
                context.load_cert_chain(certfile=self.certfile,
                                        keyfile=self.keyfile,
                                        password=self.password)
            self._wrap_socket = context.wrap_socket
    return self._wrap_socket(sock)
def load_ssl_context(cert_file, pkey_file=None, protocol=None):
    """Create an SSL context from a certificate and optional private key.

    The parameters mirror the API of :py:class:`ssl.SSLContext`.

    :param cert_file: Path of the certificate to use.
    :param pkey_file: Path of the private key to use. If not given, the key
                      will be obtained from the certificate file.
    :param protocol: One of the ``PROTOCOL_*`` constants in the stdlib ``ssl``
                     module. Defaults to ``PROTOCOL_SSLv23``.
    :return: The context with the certificate chain loaded.
    """
    chosen_protocol = ssl.PROTOCOL_SSLv23 if protocol is None else protocol
    context = _SSLContext(chosen_protocol)
    context.load_cert_chain(cert_file, pkey_file)
    return context