def _configure_ssl(self, properties):
    """Create and attach the SSL layer described by *properties*.

    Recognised keys in *properties* (a mapping):

    - ``x-ssl-server`` / ``x-server``: truthy selects server mode,
      otherwise client mode is used.
    - ``x-ssl-identity``: indexable with three items that are handed to
      ``SSLDomain.set_credentials`` in order (presumably certificate,
      key and password -- confirm against the proton API).
    - ``x-ssl-ca-file``: CA database used to verify the peer.
    - ``x-ssl``: when truthy and no CA file was given, fall back to the
      interpreter's default CA file if the ``ssl`` module can report one.
    - ``x-ssl-peer-name`` / ``hostname``: expected peer host name.
    - ``x-ssl-verify-mode``: key into ``self._VERIFY_MODES``; defaults to
      the strictest mode the supplied CA file / host name allow.
    - ``x-ssl-allow-cleartext``: server mode only; truthy also admits
      unsecured clients.

    Returns the ``proton.SSL`` object bound to ``self._pn_transport``, or
    ``None`` when *properties* is empty or holds none of the keys listed
    in ``self._SSL_PROPS``.

    Raises ``proton.SSLException`` when the verify mode is unknown or when
    the chosen mode lacks the CA file / peer name it requires.
    """
    # Nothing to configure unless at least one SSL property is present.
    # intersection() accepts any iterable, so the mapping's keys can be
    # passed directly.
    if not properties or not self._SSL_PROPS.intersection(properties):
        return None

    mode = proton.SSLDomain.MODE_CLIENT
    if properties.get('x-ssl-server', properties.get('x-server')):
        mode = proton.SSLDomain.MODE_SERVER

    identity = properties.get('x-ssl-identity')
    ca_file = properties.get('x-ssl-ca-file')
    # get_default_verify_paths() is missing on older interpreters, hence
    # the hasattr() guard; its cafile attribute may itself be None.
    if (not ca_file and properties.get('x-ssl') and
            hasattr(ssl, 'get_default_verify_paths')):
        ca_file = ssl.get_default_verify_paths().cafile
    hostname = properties.get('x-ssl-peer-name',
                              properties.get('hostname'))

    # default to most secure level of certificate validation
    if not ca_file:
        vdefault = 'no-verify'
    elif not hostname:
        vdefault = 'verify-cert'
    else:
        vdefault = 'verify-peer'

    vmode = properties.get('x-ssl-verify-mode', vdefault)
    try:
        vmode = self._VERIFY_MODES[vmode]
    except KeyError:
        # vmode still holds the caller-supplied name here because the
        # assignment above did not complete.
        raise proton.SSLException("bad value for x-ssl-verify-mode: '%s'" %
                                  vmode)

    # Reject modes whose prerequisites were not supplied.
    if vmode == proton.SSLDomain.VERIFY_PEER_NAME:
        if not hostname or not ca_file:
            raise proton.SSLException("verify-peer needs x-ssl-peer-name"
                                      " and x-ssl-ca-file")
    elif vmode == proton.SSLDomain.VERIFY_PEER:
        if not ca_file:
            raise proton.SSLException("verify-cert needs x-ssl-ca-file")

    # This will throw proton.SSLUnavailable if SSL support is not installed
    domain = proton.SSLDomain(mode)
    if identity:
        # our identity:
        domain.set_credentials(identity[0], identity[1], identity[2])
    if ca_file:
        # how we verify peers:
        domain.set_trusted_ca_db(ca_file)
    # NOTE(review): applied unconditionally (ca_file may be None) so that
    # the selected verify mode is always set on the domain -- confirm this
    # placement against the upstream code.
    domain.set_peer_authentication(vmode, ca_file)
    if mode == proton.SSLDomain.MODE_SERVER:
        if properties.get('x-ssl-allow-cleartext'):
            domain.allow_unsecured_client()

    pn_ssl = proton.SSL(self._pn_transport, domain)
    if hostname:
        pn_ssl.peer_hostname = hostname
    LOG.debug("SSL configured for connection %s", self._name)
    return pn_ssl
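# Web-page residue, not part of the module: "Comment list"
# Web-page residue, not part of the module: "Table of contents"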