def connect(self):
sock = socket.create_connection((self.host, self.port), self.timeout)
if getattr(self, '_tunnel_host', False):
self.sock = sock
self._tunnel()
if not hasattr(ssl, 'SSLContext'):
# For 2.x
if self.ca_certs:
cert_reqs = ssl.CERT_REQUIRED
else:
cert_reqs = ssl.CERT_NONE
self.sock = ssl.wrap_socket(sock, self.key_file, self.cert_file,
cert_reqs=cert_reqs,
ssl_version=ssl.PROTOCOL_SSLv23,
ca_certs=self.ca_certs)
else:
context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
context.options |= ssl.OP_NO_SSLv2
if self.cert_file:
context.load_cert_chain(self.cert_file, self.key_file)
kwargs = {}
if self.ca_certs:
context.verify_mode = ssl.CERT_REQUIRED
context.load_verify_locations(cafile=self.ca_certs)
if getattr(ssl, 'HAS_SNI', False):
kwargs['server_hostname'] = self.host
self.sock = context.wrap_socket(sock, **kwargs)
if self.ca_certs and self.check_domain:
try:
match_hostname(self.sock.getpeercert(), self.host)
logger.debug('Host verified: %s', self.host)
except CertificateError:
self.sock.shutdown(socket.SHUT_RDWR)
self.sock.close()
raise
评论列表
文章目录