def connect(self):
sock = socket.create_connection((self.host, self.port), self.timeout)
if getattr(self, '_tunnel_host', False):
self.sock = sock
self._tunnel()
if not hasattr(ssl, 'SSLContext'):
# For 2.x
if self.ca_certs:
cert_reqs = ssl.CERT_REQUIRED
else:
cert_reqs = ssl.CERT_NONE
self.sock = ssl.wrap_socket(sock, self.key_file, self.cert_file,
cert_reqs=cert_reqs,
ssl_version=ssl.PROTOCOL_SSLv23,
ca_certs=self.ca_certs)
else:
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
context.options |= ssl.OP_NO_SSLv2
if self.cert_file:
context.load_cert_chain(self.cert_file, self.key_file)
kwargs = {}
if self.ca_certs:
context.verify_mode = ssl.CERT_REQUIRED
context.load_verify_locations(cafile=self.ca_certs)
if getattr(ssl, 'HAS_SNI', False):
kwargs['server_hostname'] = self.host
self.sock = context.wrap_socket(sock, **kwargs)
if self.ca_certs and self.check_domain:
try:
match_hostname(self.sock.getpeercert(), self.host)
logger.debug('Host verified: %s', self.host)
except CertificateError:
self.sock.shutdown(socket.SHUT_RDWR)
self.sock.close()
raise
python类OP_NO_SSLv2()的实例源码
def connect(self):
sock = socket.create_connection((self.host, self.port), self.timeout)
if getattr(self, '_tunnel_host', False):
self.sock = sock
self._tunnel()
if not hasattr(ssl, 'SSLContext'):
# For 2.x
if self.ca_certs:
cert_reqs = ssl.CERT_REQUIRED
else:
cert_reqs = ssl.CERT_NONE
self.sock = ssl.wrap_socket(sock, self.key_file, self.cert_file,
cert_reqs=cert_reqs,
ssl_version=ssl.PROTOCOL_SSLv23,
ca_certs=self.ca_certs)
else:
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
context.options |= ssl.OP_NO_SSLv2
if self.cert_file:
context.load_cert_chain(self.cert_file, self.key_file)
kwargs = {}
if self.ca_certs:
context.verify_mode = ssl.CERT_REQUIRED
context.load_verify_locations(cafile=self.ca_certs)
if getattr(ssl, 'HAS_SNI', False):
kwargs['server_hostname'] = self.host
self.sock = context.wrap_socket(sock, **kwargs)
if self.ca_certs and self.check_domain:
try:
match_hostname(self.sock.getpeercert(), self.host)
logger.debug('Host verified: %s', self.host)
except CertificateError:
self.sock.shutdown(socket.SHUT_RDWR)
self.sock.close()
raise
def connect(self):
sock = socket.create_connection((self.host, self.port), self.timeout)
if getattr(self, '_tunnel_host', False):
self.sock = sock
self._tunnel()
if not hasattr(ssl, 'SSLContext'):
# For 2.x
if self.ca_certs:
cert_reqs = ssl.CERT_REQUIRED
else:
cert_reqs = ssl.CERT_NONE
self.sock = ssl.wrap_socket(sock, self.key_file, self.cert_file,
cert_reqs=cert_reqs,
ssl_version=ssl.PROTOCOL_SSLv23,
ca_certs=self.ca_certs)
else:
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
context.options |= ssl.OP_NO_SSLv2
if self.cert_file:
context.load_cert_chain(self.cert_file, self.key_file)
kwargs = {}
if self.ca_certs:
context.verify_mode = ssl.CERT_REQUIRED
context.load_verify_locations(cafile=self.ca_certs)
if getattr(ssl, 'HAS_SNI', False):
kwargs['server_hostname'] = self.host
self.sock = context.wrap_socket(sock, **kwargs)
if self.ca_certs and self.check_domain:
try:
match_hostname(self.sock.getpeercert(), self.host)
logger.debug('Host verified: %s', self.host)
except CertificateError:
self.sock.shutdown(socket.SHUT_RDWR)
self.sock.close()
raise
def connect(self):
sock = socket.create_connection((self.host, self.port), self.timeout)
if getattr(self, '_tunnel_host', False):
self.sock = sock
self._tunnel()
if not hasattr(ssl, 'SSLContext'):
# For 2.x
if self.ca_certs:
cert_reqs = ssl.CERT_REQUIRED
else:
cert_reqs = ssl.CERT_NONE
self.sock = ssl.wrap_socket(sock, self.key_file, self.cert_file,
cert_reqs=cert_reqs,
ssl_version=ssl.PROTOCOL_SSLv23,
ca_certs=self.ca_certs)
else: # pragma: no cover
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
context.options |= ssl.OP_NO_SSLv2
if self.cert_file:
context.load_cert_chain(self.cert_file, self.key_file)
kwargs = {}
if self.ca_certs:
context.verify_mode = ssl.CERT_REQUIRED
context.load_verify_locations(cafile=self.ca_certs)
if getattr(ssl, 'HAS_SNI', False):
kwargs['server_hostname'] = self.host
self.sock = context.wrap_socket(sock, **kwargs)
if self.ca_certs and self.check_domain:
try:
match_hostname(self.sock.getpeercert(), self.host)
logger.debug('Host verified: %s', self.host)
except CertificateError: # pragma: no cover
self.sock.shutdown(socket.SHUT_RDWR)
self.sock.close()
raise
def connect(self):
sock = socket.create_connection((self.host, self.port), self.timeout)
if getattr(self, '_tunnel_host', False):
self.sock = sock
self._tunnel()
if not hasattr(ssl, 'SSLContext'):
# For 2.x
if self.ca_certs:
cert_reqs = ssl.CERT_REQUIRED
else:
cert_reqs = ssl.CERT_NONE
self.sock = ssl.wrap_socket(sock, self.key_file, self.cert_file,
cert_reqs=cert_reqs,
ssl_version=ssl.PROTOCOL_SSLv23,
ca_certs=self.ca_certs)
else: # pragma: no cover
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
context.options |= ssl.OP_NO_SSLv2
if self.cert_file:
context.load_cert_chain(self.cert_file, self.key_file)
kwargs = {}
if self.ca_certs:
context.verify_mode = ssl.CERT_REQUIRED
context.load_verify_locations(cafile=self.ca_certs)
if getattr(ssl, 'HAS_SNI', False):
kwargs['server_hostname'] = self.host
self.sock = context.wrap_socket(sock, **kwargs)
if self.ca_certs and self.check_domain:
try:
match_hostname(self.sock.getpeercert(), self.host)
logger.debug('Host verified: %s', self.host)
except CertificateError: # pragma: no cover
self.sock.shutdown(socket.SHUT_RDWR)
self.sock.close()
raise
def main():
asyncio.set_event_loop(None)
if args.iocp:
from asyncio.windows_events import ProactorEventLoop
loop = ProactorEventLoop()
else:
loop = asyncio.new_event_loop()
sslctx = None
if args.tls:
import ssl
# TODO: take cert/key from args as well.
# - ????, ??????
#
# here = os.path.join(os.path.dirname(__file__), '..', 'tests')
here = os.path.join(os.path.dirname(__file__), '..', '..', 'tests')
sslctx = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
sslctx.options |= ssl.OP_NO_SSLv2
sslctx.load_cert_chain(
certfile=os.path.join(here, 'ssl_cert.pem'),
keyfile=os.path.join(here, 'ssl_key.pem'))
cache = Cache(loop)
task = asyncio.streams.start_server(cache.handle_client,
args.host, args.port,
ssl=sslctx, loop=loop)
svr = loop.run_until_complete(task)
for sock in svr.sockets:
logging.info('socket %s', sock.getsockname())
try:
loop.run_forever()
finally:
loop.close()
def _create_ssl_context(self, certfile, keyfile=None):
sslcontext = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
sslcontext.options |= ssl.OP_NO_SSLv2
sslcontext.load_cert_chain(certfile, keyfile)
return sslcontext
def test_create_server_ssl_verify_failed(self):
proto = MyProto(loop=self.loop)
server, host, port = self._make_ssl_server(
lambda: proto, SIGNED_CERTFILE)
sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
sslcontext_client.options |= ssl.OP_NO_SSLv2
sslcontext_client.verify_mode = ssl.CERT_REQUIRED
if hasattr(sslcontext_client, 'check_hostname'):
sslcontext_client.check_hostname = True
# no CA loaded
f_c = self.loop.create_connection(MyProto, host, port,
ssl=sslcontext_client)
with mock.patch.object(self.loop, 'call_exception_handler'):
with test_utils.disable_logger():
with self.assertRaisesRegex(ssl.SSLError,
'certificate verify failed '):
self.loop.run_until_complete(f_c)
# execute the loop to log the connection error
test_utils.run_briefly(self.loop)
# close connection
self.assertIsNone(proto.transport)
server.close()
def test_create_server_ssl_match_failed(self):
proto = MyProto(loop=self.loop)
server, host, port = self._make_ssl_server(
lambda: proto, SIGNED_CERTFILE)
sslcontext_client = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
sslcontext_client.options |= ssl.OP_NO_SSLv2
sslcontext_client.verify_mode = ssl.CERT_REQUIRED
sslcontext_client.load_verify_locations(
cafile=SIGNING_CA)
if hasattr(sslcontext_client, 'check_hostname'):
sslcontext_client.check_hostname = True
# incorrect server_hostname
f_c = self.loop.create_connection(MyProto, host, port,
ssl=sslcontext_client)
with mock.patch.object(self.loop, 'call_exception_handler'):
with test_utils.disable_logger():
with self.assertRaisesRegex(
ssl.CertificateError,
"hostname '127.0.0.1' doesn't match 'localhost'"):
self.loop.run_until_complete(f_c)
# close connection
proto.transport.close()
server.close()
def connect(self):
sock = socket.create_connection((self.host, self.port), self.timeout)
if getattr(self, '_tunnel_host', False):
self.sock = sock
self._tunnel()
if not hasattr(ssl, 'SSLContext'):
# For 2.x
if self.ca_certs:
cert_reqs = ssl.CERT_REQUIRED
else:
cert_reqs = ssl.CERT_NONE
self.sock = ssl.wrap_socket(sock, self.key_file, self.cert_file,
cert_reqs=cert_reqs,
ssl_version=ssl.PROTOCOL_SSLv23,
ca_certs=self.ca_certs)
else: # pragma: no cover
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
context.options |= ssl.OP_NO_SSLv2
if self.cert_file:
context.load_cert_chain(self.cert_file, self.key_file)
kwargs = {}
if self.ca_certs:
context.verify_mode = ssl.CERT_REQUIRED
context.load_verify_locations(cafile=self.ca_certs)
if getattr(ssl, 'HAS_SNI', False):
kwargs['server_hostname'] = self.host
self.sock = context.wrap_socket(sock, **kwargs)
if self.ca_certs and self.check_domain:
try:
match_hostname(self.sock.getpeercert(), self.host)
logger.debug('Host verified: %s', self.host)
except CertificateError: # pragma: no cover
self.sock.shutdown(socket.SHUT_RDWR)
self.sock.close()
raise
def connect(self):
sock = socket.create_connection((self.host, self.port), self.timeout)
if getattr(self, '_tunnel_host', False):
self.sock = sock
self._tunnel()
if not hasattr(ssl, 'SSLContext'):
# For 2.x
if self.ca_certs:
cert_reqs = ssl.CERT_REQUIRED
else:
cert_reqs = ssl.CERT_NONE
self.sock = ssl.wrap_socket(sock, self.key_file, self.cert_file,
cert_reqs=cert_reqs,
ssl_version=ssl.PROTOCOL_SSLv23,
ca_certs=self.ca_certs)
else: # pragma: no cover
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
context.options |= ssl.OP_NO_SSLv2
if self.cert_file:
context.load_cert_chain(self.cert_file, self.key_file)
kwargs = {}
if self.ca_certs:
context.verify_mode = ssl.CERT_REQUIRED
context.load_verify_locations(cafile=self.ca_certs)
if getattr(ssl, 'HAS_SNI', False):
kwargs['server_hostname'] = self.host
self.sock = context.wrap_socket(sock, **kwargs)
if self.ca_certs and self.check_domain:
try:
match_hostname(self.sock.getpeercert(), self.host)
logger.debug('Host verified: %s', self.host)
except CertificateError: # pragma: no cover
self.sock.shutdown(socket.SHUT_RDWR)
self.sock.close()
raise
def connect(self):
sock = socket.create_connection((self.host, self.port), self.timeout)
if getattr(self, '_tunnel_host', False):
self.sock = sock
self._tunnel()
if not hasattr(ssl, 'SSLContext'):
# For 2.x
if self.ca_certs:
cert_reqs = ssl.CERT_REQUIRED
else:
cert_reqs = ssl.CERT_NONE
self.sock = ssl.wrap_socket(sock, self.key_file, self.cert_file,
cert_reqs=cert_reqs,
ssl_version=ssl.PROTOCOL_SSLv23,
ca_certs=self.ca_certs)
else: # pragma: no cover
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
context.options |= ssl.OP_NO_SSLv2
if self.cert_file:
context.load_cert_chain(self.cert_file, self.key_file)
kwargs = {}
if self.ca_certs:
context.verify_mode = ssl.CERT_REQUIRED
context.load_verify_locations(cafile=self.ca_certs)
if getattr(ssl, 'HAS_SNI', False):
kwargs['server_hostname'] = self.host
self.sock = context.wrap_socket(sock, **kwargs)
if self.ca_certs and self.check_domain:
try:
match_hostname(self.sock.getpeercert(), self.host)
logger.debug('Host verified: %s', self.host)
except CertificateError: # pragma: no cover
self.sock.shutdown(socket.SHUT_RDWR)
self.sock.close()
raise
def connect(self):
"""Overrides HTTPSConnection.connect to specify TLS version"""
# Standard implementation from HTTPSConnection, which is not
# designed for extension, unfortunately
if sys.version_info >= (2, 7):
sock = socket.create_connection((self.host, self.port),
self.timeout, self.source_address)
elif sys.version_info >= (2, 6):
sock = socket.create_connection((self.host, self.port),
self.timeout)
else:
sock = socket.create_connection((self.host, self.port))
if getattr(self, '_tunnel_host', None):
self.sock = sock
self._tunnel()
if hasattr(ssl, 'SSLContext'):
# Since python 2.7.9, tls 1.1 and 1.2 are supported via
# SSLContext
ssl_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
ssl_context.options |= ssl.OP_NO_SSLv2
ssl_context.options |= ssl.OP_NO_SSLv3
if self.key_file and self.cert_file:
ssl_context.load_cert_chain(keyfile=self.key_file,
certfile=self.cert_file)
self.sock = ssl_context.wrap_socket(sock)
else:
# This is the only difference; default wrap_socket uses SSLv23
self.sock = ssl.wrap_socket(sock, self.key_file, self.cert_file,
ssl_version=ssl.PROTOCOL_TLSv1)
def connect(self):
sock = socket.create_connection((self.host, self.port), self.timeout)
if getattr(self, '_tunnel_host', False):
self.sock = sock
self._tunnel()
if not hasattr(ssl, 'SSLContext'):
# For 2.x
if self.ca_certs:
cert_reqs = ssl.CERT_REQUIRED
else:
cert_reqs = ssl.CERT_NONE
self.sock = ssl.wrap_socket(sock, self.key_file, self.cert_file,
cert_reqs=cert_reqs,
ssl_version=ssl.PROTOCOL_SSLv23,
ca_certs=self.ca_certs)
else: # pragma: no cover
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
context.options |= ssl.OP_NO_SSLv2
if self.cert_file:
context.load_cert_chain(self.cert_file, self.key_file)
kwargs = {}
if self.ca_certs:
context.verify_mode = ssl.CERT_REQUIRED
context.load_verify_locations(cafile=self.ca_certs)
if getattr(ssl, 'HAS_SNI', False):
kwargs['server_hostname'] = self.host
self.sock = context.wrap_socket(sock, **kwargs)
if self.ca_certs and self.check_domain:
try:
match_hostname(self.sock.getpeercert(), self.host)
logger.debug('Host verified: %s', self.host)
except CertificateError: # pragma: no cover
self.sock.shutdown(socket.SHUT_RDWR)
self.sock.close()
raise
def connect(self):
sock = socket.create_connection((self.host, self.port), self.timeout)
if getattr(self, '_tunnel_host', False):
self.sock = sock
self._tunnel()
if not hasattr(ssl, 'SSLContext'):
# For 2.x
if self.ca_certs:
cert_reqs = ssl.CERT_REQUIRED
else:
cert_reqs = ssl.CERT_NONE
self.sock = ssl.wrap_socket(sock, self.key_file, self.cert_file,
cert_reqs=cert_reqs,
ssl_version=ssl.PROTOCOL_SSLv23,
ca_certs=self.ca_certs)
else: # pragma: no cover
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
context.options |= ssl.OP_NO_SSLv2
if self.cert_file:
context.load_cert_chain(self.cert_file, self.key_file)
kwargs = {}
if self.ca_certs:
context.verify_mode = ssl.CERT_REQUIRED
context.load_verify_locations(cafile=self.ca_certs)
if getattr(ssl, 'HAS_SNI', False):
kwargs['server_hostname'] = self.host
self.sock = context.wrap_socket(sock, **kwargs)
if self.ca_certs and self.check_domain:
try:
match_hostname(self.sock.getpeercert(), self.host)
logger.debug('Host verified: %s', self.host)
except CertificateError: # pragma: no cover
self.sock.shutdown(socket.SHUT_RDWR)
self.sock.close()
raise
def connect(self):
sock = socket.create_connection((self.host, self.port), self.timeout)
if getattr(self, '_tunnel_host', False):
self.sock = sock
self._tunnel()
if not hasattr(ssl, 'SSLContext'):
# For 2.x
if self.ca_certs:
cert_reqs = ssl.CERT_REQUIRED
else:
cert_reqs = ssl.CERT_NONE
self.sock = ssl.wrap_socket(sock, self.key_file, self.cert_file,
cert_reqs=cert_reqs,
ssl_version=ssl.PROTOCOL_SSLv23,
ca_certs=self.ca_certs)
else:
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
context.options |= ssl.OP_NO_SSLv2
if self.cert_file:
context.load_cert_chain(self.cert_file, self.key_file)
kwargs = {}
if self.ca_certs:
context.verify_mode = ssl.CERT_REQUIRED
context.load_verify_locations(cafile=self.ca_certs)
if getattr(ssl, 'HAS_SNI', False):
kwargs['server_hostname'] = self.host
self.sock = context.wrap_socket(sock, **kwargs)
if self.ca_certs and self.check_domain:
try:
match_hostname(self.sock.getpeercert(), self.host)
logger.debug('Host verified: %s', self.host)
except CertificateError:
self.sock.shutdown(socket.SHUT_RDWR)
self.sock.close()
raise
def connect(self):
sock = socket.create_connection((self.host, self.port), self.timeout)
if getattr(self, '_tunnel_host', False):
self.sock = sock
self._tunnel()
if not hasattr(ssl, 'SSLContext'):
# For 2.x
if self.ca_certs:
cert_reqs = ssl.CERT_REQUIRED
else:
cert_reqs = ssl.CERT_NONE
self.sock = ssl.wrap_socket(sock, self.key_file, self.cert_file,
cert_reqs=cert_reqs,
ssl_version=ssl.PROTOCOL_SSLv23,
ca_certs=self.ca_certs)
else: # pragma: no cover
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
context.options |= ssl.OP_NO_SSLv2
if self.cert_file:
context.load_cert_chain(self.cert_file, self.key_file)
kwargs = {}
if self.ca_certs:
context.verify_mode = ssl.CERT_REQUIRED
context.load_verify_locations(cafile=self.ca_certs)
if getattr(ssl, 'HAS_SNI', False):
kwargs['server_hostname'] = self.host
self.sock = context.wrap_socket(sock, **kwargs)
if self.ca_certs and self.check_domain:
try:
match_hostname(self.sock.getpeercert(), self.host)
logger.debug('Host verified: %s', self.host)
except CertificateError: # pragma: no cover
self.sock.shutdown(socket.SHUT_RDWR)
self.sock.close()
raise
def connect(self):
sock = socket.create_connection((self.host, self.port), self.timeout)
if getattr(self, '_tunnel_host', False):
self.sock = sock
self._tunnel()
if not hasattr(ssl, 'SSLContext'):
# For 2.x
if self.ca_certs:
cert_reqs = ssl.CERT_REQUIRED
else:
cert_reqs = ssl.CERT_NONE
self.sock = ssl.wrap_socket(sock, self.key_file, self.cert_file,
cert_reqs=cert_reqs,
ssl_version=ssl.PROTOCOL_SSLv23,
ca_certs=self.ca_certs)
else: # pragma: no cover
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
context.options |= ssl.OP_NO_SSLv2
if self.cert_file:
context.load_cert_chain(self.cert_file, self.key_file)
kwargs = {}
if self.ca_certs:
context.verify_mode = ssl.CERT_REQUIRED
context.load_verify_locations(cafile=self.ca_certs)
if getattr(ssl, 'HAS_SNI', False):
kwargs['server_hostname'] = self.host
self.sock = context.wrap_socket(sock, **kwargs)
if self.ca_certs and self.check_domain:
try:
match_hostname(self.sock.getpeercert(), self.host)
logger.debug('Host verified: %s', self.host)
except CertificateError: # pragma: no cover
self.sock.shutdown(socket.SHUT_RDWR)
self.sock.close()
raise
def connect(self):
sock = socket.create_connection((self.host, self.port), self.timeout)
if getattr(self, '_tunnel_host', False):
self.sock = sock
self._tunnel()
if not hasattr(ssl, 'SSLContext'):
# For 2.x
if self.ca_certs:
cert_reqs = ssl.CERT_REQUIRED
else:
cert_reqs = ssl.CERT_NONE
self.sock = ssl.wrap_socket(sock, self.key_file, self.cert_file,
cert_reqs=cert_reqs,
ssl_version=ssl.PROTOCOL_SSLv23,
ca_certs=self.ca_certs)
else:
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
context.options |= ssl.OP_NO_SSLv2
if self.cert_file:
context.load_cert_chain(self.cert_file, self.key_file)
kwargs = {}
if self.ca_certs:
context.verify_mode = ssl.CERT_REQUIRED
context.load_verify_locations(cafile=self.ca_certs)
if getattr(ssl, 'HAS_SNI', False):
kwargs['server_hostname'] = self.host
self.sock = context.wrap_socket(sock, **kwargs)
if self.ca_certs and self.check_domain:
try:
match_hostname(self.sock.getpeercert(), self.host)
logger.debug('Host verified: %s', self.host)
except CertificateError:
self.sock.shutdown(socket.SHUT_RDWR)
self.sock.close()
raise
def connect(self):
sock = socket.create_connection((self.host, self.port), self.timeout)
if getattr(self, '_tunnel_host', False):
self.sock = sock
self._tunnel()
if not hasattr(ssl, 'SSLContext'):
# For 2.x
if self.ca_certs:
cert_reqs = ssl.CERT_REQUIRED
else:
cert_reqs = ssl.CERT_NONE
self.sock = ssl.wrap_socket(sock, self.key_file, self.cert_file,
cert_reqs=cert_reqs,
ssl_version=ssl.PROTOCOL_SSLv23,
ca_certs=self.ca_certs)
else: # pragma: no cover
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
context.options |= ssl.OP_NO_SSLv2
if self.cert_file:
context.load_cert_chain(self.cert_file, self.key_file)
kwargs = {}
if self.ca_certs:
context.verify_mode = ssl.CERT_REQUIRED
context.load_verify_locations(cafile=self.ca_certs)
if getattr(ssl, 'HAS_SNI', False):
kwargs['server_hostname'] = self.host
self.sock = context.wrap_socket(sock, **kwargs)
if self.ca_certs and self.check_domain:
try:
match_hostname(self.sock.getpeercert(), self.host)
logger.debug('Host verified: %s', self.host)
except CertificateError: # pragma: no cover
self.sock.shutdown(socket.SHUT_RDWR)
self.sock.close()
raise