def connect(self):
sock = socket.create_connection((self.host, self.port), self.timeout)
if getattr(self, '_tunnel_host', False):
self.sock = sock
self._tunnel()
if not hasattr(ssl, 'SSLContext'):
# For 2.x
if self.ca_certs:
cert_reqs = ssl.CERT_REQUIRED
else:
cert_reqs = ssl.CERT_NONE
self.sock = ssl.wrap_socket(sock, self.key_file, self.cert_file,
cert_reqs=cert_reqs,
ssl_version=ssl.PROTOCOL_SSLv23,
ca_certs=self.ca_certs)
else: # pragma: no cover
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
context.options |= ssl.OP_NO_SSLv2
if self.cert_file:
context.load_cert_chain(self.cert_file, self.key_file)
kwargs = {}
if self.ca_certs:
context.verify_mode = ssl.CERT_REQUIRED
context.load_verify_locations(cafile=self.ca_certs)
if getattr(ssl, 'HAS_SNI', False):
kwargs['server_hostname'] = self.host
self.sock = context.wrap_socket(sock, **kwargs)
if self.ca_certs and self.check_domain:
try:
match_hostname(self.sock.getpeercert(), self.host)
logger.debug('Host verified: %s', self.host)
except CertificateError: # pragma: no cover
self.sock.shutdown(socket.SHUT_RDWR)
self.sock.close()
raise
python类PROTOCOL_SSLv23()的实例源码
def resolve_ssl_version(candidate):
"""
like resolve_cert_reqs
"""
if candidate is None:
return PROTOCOL_SSLv23
if isinstance(candidate, str):
res = getattr(ssl, candidate, None)
if res is None:
res = getattr(ssl, 'PROTOCOL_' + candidate)
return res
return candidate
def resolve_ssl_version(candidate):
"""
like resolve_cert_reqs
"""
if candidate is None:
return PROTOCOL_SSLv23
if isinstance(candidate, str):
res = getattr(ssl, candidate, None)
if res is None:
res = getattr(ssl, 'PROTOCOL_' + candidate)
return res
return candidate
def connect(self):
sock = socket.create_connection((self.host, self.port), self.timeout)
if getattr(self, '_tunnel_host', False):
self.sock = sock
self._tunnel()
if not hasattr(ssl, 'SSLContext'):
# For 2.x
if self.ca_certs:
cert_reqs = ssl.CERT_REQUIRED
else:
cert_reqs = ssl.CERT_NONE
self.sock = ssl.wrap_socket(sock, self.key_file, self.cert_file,
cert_reqs=cert_reqs,
ssl_version=ssl.PROTOCOL_SSLv23,
ca_certs=self.ca_certs)
else:
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
context.options |= ssl.OP_NO_SSLv2
if self.cert_file:
context.load_cert_chain(self.cert_file, self.key_file)
kwargs = {}
if self.ca_certs:
context.verify_mode = ssl.CERT_REQUIRED
context.load_verify_locations(cafile=self.ca_certs)
if getattr(ssl, 'HAS_SNI', False):
kwargs['server_hostname'] = self.host
self.sock = context.wrap_socket(sock, **kwargs)
if self.ca_certs and self.check_domain:
try:
match_hostname(self.sock.getpeercert(), self.host)
logger.debug('Host verified: %s', self.host)
except CertificateError:
self.sock.shutdown(socket.SHUT_RDWR)
self.sock.close()
raise
def resolve_ssl_version(candidate):
"""
like resolve_cert_reqs
"""
if candidate is None:
return PROTOCOL_SSLv23
if isinstance(candidate, str):
res = getattr(ssl, candidate, None)
if res is None:
res = getattr(ssl, 'PROTOCOL_' + candidate)
return res
return candidate
def ssl_wrap_socket(self):
# Allow sending of keep-alive messages - seems to prevent some servers
# from closing SSL, leading to deadlocks.
self.sock.setsockopt(socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1)
try:
import ssl
if self.ca_certs is not None:
cert_reqs = ssl.CERT_REQUIRED
else:
cert_reqs = ssl.CERT_NONE
if self.ssl_version == "tls1":
ssl_version = ssl.PROTOCOL_TLSv1
elif self.ssl_version == "ssl2":
ssl_version = ssl.PROTOCOL_SSLv2
elif self.ssl_version == "ssl3":
ssl_version = ssl.PROTOCOL_SSLv3
elif self.ssl_version == "ssl23" or self.ssl_version is None:
ssl_version = ssl.PROTOCOL_SSLv23
else:
raise socket.sslerror("Invalid SSL version requested: %s", self.ssl_version)
self.sock = ssl.wrap_socket(self.sock, self.keyfile, self.certfile, ca_certs=self.ca_certs, cert_reqs=cert_reqs, ssl_version=ssl_version)
ssl_exc = ssl.SSLError
self.read_fd = self.sock.fileno()
except ImportError:
# No ssl module, and socket.ssl has no fileno(), and does not allow certificate verification
raise socket.sslerror("imaplib2 SSL mode does not work without ssl module")
if self.cert_verify_cb is not None:
cert_err = self.cert_verify_cb(self.sock.getpeercert(), self.host)
if cert_err:
raise ssl_exc(cert_err)
def connect(self):
sock = socket.create_connection((self.host, self.port), self.timeout)
if getattr(self, '_tunnel_host', False):
self.sock = sock
self._tunnel()
if not hasattr(ssl, 'SSLContext'):
# For 2.x
if self.ca_certs:
cert_reqs = ssl.CERT_REQUIRED
else:
cert_reqs = ssl.CERT_NONE
self.sock = ssl.wrap_socket(sock, self.key_file, self.cert_file,
cert_reqs=cert_reqs,
ssl_version=ssl.PROTOCOL_SSLv23,
ca_certs=self.ca_certs)
else:
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
context.options |= ssl.OP_NO_SSLv2
if self.cert_file:
context.load_cert_chain(self.cert_file, self.key_file)
kwargs = {}
if self.ca_certs:
context.verify_mode = ssl.CERT_REQUIRED
context.load_verify_locations(cafile=self.ca_certs)
if getattr(ssl, 'HAS_SNI', False):
kwargs['server_hostname'] = self.host
self.sock = context.wrap_socket(sock, **kwargs)
if self.ca_certs and self.check_domain:
try:
match_hostname(self.sock.getpeercert(), self.host)
logger.debug('Host verified: %s', self.host)
except CertificateError:
self.sock.shutdown(socket.SHUT_RDWR)
self.sock.close()
raise
def resolve_ssl_version(candidate):
"""
like resolve_cert_reqs
"""
if candidate is None:
return PROTOCOL_SSLv23
if isinstance(candidate, str):
res = getattr(ssl, candidate, None)
if res is None:
res = getattr(ssl, 'PROTOCOL_' + candidate)
return res
return candidate
def ssl_options_to_context(ssl_options):
"""Try to convert an ``ssl_options`` dictionary to an
`~ssl.SSLContext` object.
The ``ssl_options`` dictionary contains keywords to be passed to
`ssl.wrap_socket`. In Python 2.7.9+, `ssl.SSLContext` objects can
be used instead. This function converts the dict form to its
`~ssl.SSLContext` equivalent, and may be used when a component which
accepts both forms needs to upgrade to the `~ssl.SSLContext` version
to use features like SNI or NPN.
"""
if isinstance(ssl_options, dict):
assert all(k in _SSL_CONTEXT_KEYWORDS for k in ssl_options), ssl_options
if (not hasattr(ssl, 'SSLContext') or
isinstance(ssl_options, ssl.SSLContext)):
return ssl_options
context = ssl.SSLContext(
ssl_options.get('ssl_version', ssl.PROTOCOL_SSLv23))
if 'certfile' in ssl_options:
context.load_cert_chain(ssl_options['certfile'], ssl_options.get('keyfile', None))
if 'cert_reqs' in ssl_options:
context.verify_mode = ssl_options['cert_reqs']
if 'ca_certs' in ssl_options:
context.load_verify_locations(ssl_options['ca_certs'])
if 'ciphers' in ssl_options:
context.set_ciphers(ssl_options['ciphers'])
if hasattr(ssl, 'OP_NO_COMPRESSION'):
# Disable TLS compression to avoid CRIME and related attacks.
# This constant wasn't added until python 3.3.
context.options |= ssl.OP_NO_COMPRESSION
return context
def resolve_ssl_version(candidate):
"""
like resolve_cert_reqs
"""
if candidate is None:
return PROTOCOL_SSLv23
if isinstance(candidate, str):
res = getattr(ssl, candidate, None)
if res is None:
res = getattr(ssl, 'PROTOCOL_' + candidate)
return res
return candidate
def connect(self):
sock = socket.create_connection((self.host, self.port), self.timeout)
if getattr(self, '_tunnel_host', False):
self.sock = sock
self._tunnel()
if not hasattr(ssl, 'SSLContext'):
# For 2.x
if self.ca_certs:
cert_reqs = ssl.CERT_REQUIRED
else:
cert_reqs = ssl.CERT_NONE
self.sock = ssl.wrap_socket(sock, self.key_file, self.cert_file,
cert_reqs=cert_reqs,
ssl_version=ssl.PROTOCOL_SSLv23,
ca_certs=self.ca_certs)
else:
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
context.options |= ssl.OP_NO_SSLv2
if self.cert_file:
context.load_cert_chain(self.cert_file, self.key_file)
kwargs = {}
if self.ca_certs:
context.verify_mode = ssl.CERT_REQUIRED
context.load_verify_locations(cafile=self.ca_certs)
if getattr(ssl, 'HAS_SNI', False):
kwargs['server_hostname'] = self.host
self.sock = context.wrap_socket(sock, **kwargs)
if self.ca_certs and self.check_domain:
try:
match_hostname(self.sock.getpeercert(), self.host)
logger.debug('Host verified: %s', self.host)
except CertificateError:
self.sock.shutdown(socket.SHUT_RDWR)
self.sock.close()
raise
def resolve_ssl_version(candidate):
"""
like resolve_cert_reqs
"""
if candidate is None:
return PROTOCOL_SSLv23
if isinstance(candidate, str):
res = getattr(ssl, candidate, None)
if res is None:
res = getattr(ssl, 'PROTOCOL_' + candidate)
return res
return candidate
def ssl_options_to_context(ssl_options):
"""Try to convert an ``ssl_options`` dictionary to an
`~ssl.SSLContext` object.
The ``ssl_options`` dictionary contains keywords to be passed to
`ssl.wrap_socket`. In Python 2.7.9+, `ssl.SSLContext` objects can
be used instead. This function converts the dict form to its
`~ssl.SSLContext` equivalent, and may be used when a component which
accepts both forms needs to upgrade to the `~ssl.SSLContext` version
to use features like SNI or NPN.
"""
if isinstance(ssl_options, dict):
assert all(k in _SSL_CONTEXT_KEYWORDS for k in ssl_options), ssl_options
if (not hasattr(ssl, 'SSLContext') or
isinstance(ssl_options, ssl.SSLContext)):
return ssl_options
context = ssl.SSLContext(
ssl_options.get('ssl_version', ssl.PROTOCOL_SSLv23))
if 'certfile' in ssl_options:
context.load_cert_chain(ssl_options['certfile'], ssl_options.get('keyfile', None))
if 'cert_reqs' in ssl_options:
context.verify_mode = ssl_options['cert_reqs']
if 'ca_certs' in ssl_options:
context.load_verify_locations(ssl_options['ca_certs'])
if 'ciphers' in ssl_options:
context.set_ciphers(ssl_options['ciphers'])
if hasattr(ssl, 'OP_NO_COMPRESSION'):
# Disable TLS compression to avoid CRIME and related attacks.
# This constant wasn't added until python 3.3.
context.options |= ssl.OP_NO_COMPRESSION
return context
def resolve_ssl_version(candidate):
"""
like resolve_cert_reqs
"""
if candidate is None:
return PROTOCOL_SSLv23
if isinstance(candidate, str):
res = getattr(ssl, candidate, None)
if res is None:
res = getattr(ssl, 'PROTOCOL_' + candidate)
return res
return candidate
def connect(self):
sock = socket.create_connection((self.host, self.port), self.timeout)
if getattr(self, '_tunnel_host', False):
self.sock = sock
self._tunnel()
if not hasattr(ssl, 'SSLContext'):
# For 2.x
if self.ca_certs:
cert_reqs = ssl.CERT_REQUIRED
else:
cert_reqs = ssl.CERT_NONE
self.sock = ssl.wrap_socket(sock, self.key_file, self.cert_file,
cert_reqs=cert_reqs,
ssl_version=ssl.PROTOCOL_SSLv23,
ca_certs=self.ca_certs)
else:
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
context.options |= ssl.OP_NO_SSLv2
if self.cert_file:
context.load_cert_chain(self.cert_file, self.key_file)
kwargs = {}
if self.ca_certs:
context.verify_mode = ssl.CERT_REQUIRED
context.load_verify_locations(cafile=self.ca_certs)
if getattr(ssl, 'HAS_SNI', False):
kwargs['server_hostname'] = self.host
self.sock = context.wrap_socket(sock, **kwargs)
if self.ca_certs and self.check_domain:
try:
match_hostname(self.sock.getpeercert(), self.host)
logger.debug('Host verified: %s', self.host)
except CertificateError:
self.sock.shutdown(socket.SHUT_RDWR)
self.sock.close()
raise
def resolve_ssl_version(candidate):
"""
like resolve_cert_reqs
"""
if candidate is None:
return PROTOCOL_SSLv23
if isinstance(candidate, str):
res = getattr(ssl, candidate, None)
if res is None:
res = getattr(ssl, 'PROTOCOL_' + candidate)
return res
return candidate
def ssl_options_to_context(ssl_options):
"""Try to convert an ``ssl_options`` dictionary to an
`~ssl.SSLContext` object.
The ``ssl_options`` dictionary contains keywords to be passed to
`ssl.wrap_socket`. In Python 2.7.9+, `ssl.SSLContext` objects can
be used instead. This function converts the dict form to its
`~ssl.SSLContext` equivalent, and may be used when a component which
accepts both forms needs to upgrade to the `~ssl.SSLContext` version
to use features like SNI or NPN.
"""
if isinstance(ssl_options, dict):
assert all(k in _SSL_CONTEXT_KEYWORDS for k in ssl_options), ssl_options
if (not hasattr(ssl, 'SSLContext') or
isinstance(ssl_options, ssl.SSLContext)):
return ssl_options
context = ssl.SSLContext(
ssl_options.get('ssl_version', ssl.PROTOCOL_SSLv23))
if 'certfile' in ssl_options:
context.load_cert_chain(ssl_options['certfile'], ssl_options.get('keyfile', None))
if 'cert_reqs' in ssl_options:
context.verify_mode = ssl_options['cert_reqs']
if 'ca_certs' in ssl_options:
context.load_verify_locations(ssl_options['ca_certs'])
if 'ciphers' in ssl_options:
context.set_ciphers(ssl_options['ciphers'])
if hasattr(ssl, 'OP_NO_COMPRESSION'):
# Disable TLS compression to avoid CRIME and related attacks.
# This constant wasn't added until python 3.3.
context.options |= ssl.OP_NO_COMPRESSION
return context
def connect(self):
sock = socket.create_connection((self.host, self.port), self.timeout)
if getattr(self, '_tunnel_host', False):
self.sock = sock
self._tunnel()
if not hasattr(ssl, 'SSLContext'):
# For 2.x
if self.ca_certs:
cert_reqs = ssl.CERT_REQUIRED
else:
cert_reqs = ssl.CERT_NONE
self.sock = ssl.wrap_socket(sock, self.key_file, self.cert_file,
cert_reqs=cert_reqs,
ssl_version=ssl.PROTOCOL_SSLv23,
ca_certs=self.ca_certs)
else:
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
context.options |= ssl.OP_NO_SSLv2
if self.cert_file:
context.load_cert_chain(self.cert_file, self.key_file)
kwargs = {}
if self.ca_certs:
context.verify_mode = ssl.CERT_REQUIRED
context.load_verify_locations(cafile=self.ca_certs)
if getattr(ssl, 'HAS_SNI', False):
kwargs['server_hostname'] = self.host
self.sock = context.wrap_socket(sock, **kwargs)
if self.ca_certs and self.check_domain:
try:
match_hostname(self.sock.getpeercert(), self.host)
logger.debug('Host verified: %s', self.host)
except CertificateError:
self.sock.shutdown(socket.SHUT_RDWR)
self.sock.close()
raise
def resolve_ssl_version(candidate):
"""
like resolve_cert_reqs
"""
if candidate is None:
return PROTOCOL_SSLv23
if isinstance(candidate, str):
res = getattr(ssl, candidate, None)
if res is None:
res = getattr(ssl, 'PROTOCOL_' + candidate)
return res
return candidate
def connect(self):
sock = socket.create_connection((self.host, self.port), self.timeout)
if getattr(self, '_tunnel_host', False):
self.sock = sock
self._tunnel()
if not hasattr(ssl, 'SSLContext'):
# For 2.x
if self.ca_certs:
cert_reqs = ssl.CERT_REQUIRED
else:
cert_reqs = ssl.CERT_NONE
self.sock = ssl.wrap_socket(sock, self.key_file, self.cert_file,
cert_reqs=cert_reqs,
ssl_version=ssl.PROTOCOL_SSLv23,
ca_certs=self.ca_certs)
else: # pragma: no cover
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
context.options |= ssl.OP_NO_SSLv2
if self.cert_file:
context.load_cert_chain(self.cert_file, self.key_file)
kwargs = {}
if self.ca_certs:
context.verify_mode = ssl.CERT_REQUIRED
context.load_verify_locations(cafile=self.ca_certs)
if getattr(ssl, 'HAS_SNI', False):
kwargs['server_hostname'] = self.host
self.sock = context.wrap_socket(sock, **kwargs)
if self.ca_certs and self.check_domain:
try:
match_hostname(self.sock.getpeercert(), self.host)
logger.debug('Host verified: %s', self.host)
except CertificateError: # pragma: no cover
self.sock.shutdown(socket.SHUT_RDWR)
self.sock.close()
raise