def loop(self):
    """Accept client connections forever, wrapping each one in TLS.

    Each accepted socket is wrapped server-side with ``server.crt`` /
    ``server.key`` and stored in ``self.clients["hosts"]`` as a ``Host``,
    keyed by the stringified value of ``self.clients["serial"]``, which is
    then incremented. The loop never returns normally, so it is best run
    on its own thread.

    Raises:
        Any error raised by ``accept()`` (the socket error the original
        code intercepted) propagates unchanged, keeping its errno,
        message and traceback.
    """
    # Prefer the explicit TLS 1.2 constant; fall back to the generic
    # PROTOCOL_TLS where it is missing. Resolved once because it cannot
    # change between iterations.
    try:
        tls_version = ssl.PROTOCOL_TLSv1_2
    except AttributeError:
        tls_version = ssl.PROTOCOL_TLS

    while True:
        # No try/except here on purpose: re-raising the bare exception
        # class would throw away the details of the real failure.
        csock, (ipaddr, port) = self.connection["sock"].accept()
        self._log("L", "New connection from %s:%s" % (str(ipaddr),
                                                      str(port)))
        csock = ssl.wrap_socket(csock, server_side=True,
                                certfile="server.crt",
                                keyfile="server.key",
                                ssl_version=tls_version)
        serial = self.clients["serial"]
        self.clients["hosts"][str(serial)] = Host(csock, ipaddr, port,
                                                  serial)
        self.clients["serial"] += 1
# Example source code using Python's PROTOCOL_TLS
def ssl_context():
    """Build an SSL context carrying the certificate chain from
    ``/etc/ssl/certs/znc.pem``.

    On interpreters whose ``ssl`` module has no ``PROTOCOL_TLS``, the
    attribute is added to the module as an alias of ``PROTOCOL_SSLv23``
    before the context is created.

    Returns:
        ssl.SSLContext: context created with ``ssl.PROTOCOL_TLS`` and the
        certificate chain loaded.
    """
    import ssl

    # Feature-detect instead of comparing version numbers: a
    # "< (2, 7, 13)" check skips early Python 3 releases that also lack
    # the constant.
    if not hasattr(ssl, "PROTOCOL_TLS"):
        ssl.PROTOCOL_TLS = ssl.PROTOCOL_SSLv23
    context = ssl.SSLContext(ssl.PROTOCOL_TLS)
    context.load_cert_chain('/etc/ssl/certs/znc.pem')
    return context
def _startTLS(self, verify=True, caFile=None, caPath=None, caData=None):
    """Wrap ``self._sock`` in TLS and optionally verify the server identity.

    Args:
        verify: when true, require a server certificate and match
            ``self.host`` against the certificate's commonName and
            subjectAltName entries; when false, skip both.
        caFile: passed as ``cafile`` to ``load_verify_locations`` (or as
            ``ca_certs`` to ``ssl.wrap_socket`` on the fallback path).
        caPath: passed as ``capath`` to ``load_verify_locations``.
        caData: passed as ``cadata`` to ``load_verify_locations``.

    Raises:
        LDAPError: the TLS layer was already installed.
        RuntimeError: ``caPath``/``caData`` were given but the
            ``SSLContext`` path is unavailable.
        LDAPConnectionError: ``self.host`` matches none of the names in
            the server certificate.
    """
    if self.startedTLS:
        raise LDAPError('TLS layer already installed')

    verifyMode = ssl.CERT_REQUIRED if verify else ssl.CERT_NONE

    try:
        proto = ssl.PROTOCOL_TLS
    except AttributeError:
        proto = ssl.PROTOCOL_SSLv23

    try:
        ctx = ssl.SSLContext(proto)
        ctx.verify_mode = verifyMode
        ctx.check_hostname = False  # we do this ourselves
        if verify:
            ctx.load_default_certs()
        if caFile or caPath or caData:
            ctx.load_verify_locations(cafile=caFile, capath=caPath, cadata=caData)
        self._sock = ctx.wrap_socket(self._sock)
    except AttributeError:
        # SSLContext wasn't added until 2.7.9
        if caPath or caData:
            raise RuntimeError('python version >= 2.7.9 required for SSL caPath/caData')
        self._sock = ssl.wrap_socket(self._sock, ca_certs=caFile, cert_reqs=verifyMode, ssl_version=proto)

    if verify:
        # implement a consistent match_hostname according to RFC 4513 sec 3.1.3
        cert = self._sock.getpeercert()
        # A certificate may carry no commonName (names only in
        # subjectAltName); treat that as "no CN" instead of a KeyError.
        certCN = dict(rdn[0] for rdn in cert.get('subject', ())).get('commonName')
        if certCN is not None and self.host == certCN:
            logger.debug('Matched server identity to cert commonName')
        else:
            valid = False
            tried = [] if certCN is None else [certCN]
            for sanType, sanValue in cert.get('subjectAltName', []):
                if sanType == 'DNS' and sanValue.startswith('*.'):
                    # The wildcard stands for exactly one left-most label:
                    # "*.example.com" matches "a.example.com" but neither
                    # "example.com" nor "a.b.example.com".
                    label, dot, parent = self.host.partition('.')
                    valid = bool(label and dot) and parent == sanValue[2:]
                else:
                    valid = (self.host == sanValue)
                tried.append(sanValue)
                if valid:
                    logger.debug('Matched server identity to cert {0} subjectAltName'.format(sanType))
                    break
            if not valid:
                raise LDAPConnectionError('Server identity "{0}" does not match any cert names: {1}'.format(self.host, ', '.join(tried)))
    else:
        logger.debug('Skipping hostname validation')

    self.startedTLS = True
    logger.debug('Installed TLS layer on #{0}'.format(self.ID))