def _startTLS(self, verify=True, caFile=None, caPath=None, caData=None):
if self.startedTLS:
raise LDAPError('TLS layer already installed')
if verify:
verifyMode = ssl.CERT_REQUIRED
else:
verifyMode = ssl.CERT_NONE
try:
proto = ssl.PROTOCOL_TLS
except AttributeError:
proto = ssl.PROTOCOL_SSLv23
try:
ctx = ssl.SSLContext(proto)
ctx.verify_mode = verifyMode
ctx.check_hostname = False # we do this ourselves
if verify:
ctx.load_default_certs()
if caFile or caPath or caData:
ctx.load_verify_locations(cafile=caFile, capath=caPath, cadata=caData)
self._sock = ctx.wrap_socket(self._sock)
except AttributeError:
# SSLContext wasn't added until 2.7.9
if caPath or caData:
raise RuntimeError('python version >= 2.7.9 required for SSL caPath/caData')
self._sock = ssl.wrap_socket(self._sock, ca_certs=caFile, cert_reqs=verifyMode, ssl_version=proto)
if verify:
# implement a consistent match_hostname according to RFC 4513 sec 3.1.3
cert = self._sock.getpeercert()
certCN = dict([e[0] for e in cert['subject']])['commonName']
if self.host == certCN:
logger.debug('Matched server identity to cert commonName')
else:
valid = False
tried = [certCN]
for type, value in cert.get('subjectAltName', []):
if type == 'DNS' and value.startswith('*.'):
valid = self.host.endswith(value[1:])
else:
valid = (self.host == value)
tried.append(value)
if valid:
logger.debug('Matched server identity to cert {0} subjectAltName'.format(type))
break
if not valid:
raise LDAPConnectionError('Server identity "{0}" does not match any cert names: {1}'.format(self.host, ', '.join(tried)))
else:
logger.debug('Skipping hostname validation')
self.startedTLS = True
logger.debug('Installed TLS layer on #{0}'.format(self.ID))
评论列表
文章目录