def __init__(self, ssl_keyfile=None, ssl_certfile=None, ssl_cert_reqs=None,
ssl_ca_certs=None, **kwargs):
if not ssl_available:
raise RedisError("Python wasn't built with SSL support")
super(SSLConnection, self).__init__(**kwargs)
self.keyfile = ssl_keyfile
self.certfile = ssl_certfile
if ssl_cert_reqs is None:
ssl_cert_reqs = ssl.CERT_NONE
elif isinstance(ssl_cert_reqs, basestring):
CERT_REQS = {
'none': ssl.CERT_NONE,
'optional': ssl.CERT_OPTIONAL,
'required': ssl.CERT_REQUIRED
}
if ssl_cert_reqs not in CERT_REQS:
raise RedisError(
"Invalid SSL Certificate Requirements Flag: %s" %
ssl_cert_reqs)
ssl_cert_reqs = CERT_REQS[ssl_cert_reqs]
self.cert_reqs = ssl_cert_reqs
self.ca_certs = ssl_ca_certs
python类CERT_OPTIONAL的实例源码
def __init__(self, ssl_keyfile=None, ssl_certfile=None, ssl_cert_reqs=None,
ssl_ca_certs=None, **kwargs):
if not ssl_available:
raise RedisError("Python wasn't built with SSL support")
super(SSLConnection, self).__init__(**kwargs)
self.keyfile = ssl_keyfile
self.certfile = ssl_certfile
if ssl_cert_reqs is None:
ssl_cert_reqs = ssl.CERT_NONE
elif isinstance(ssl_cert_reqs, basestring):
CERT_REQS = {
'none': ssl.CERT_NONE,
'optional': ssl.CERT_OPTIONAL,
'required': ssl.CERT_REQUIRED
}
if ssl_cert_reqs not in CERT_REQS:
raise RedisError(
"Invalid SSL Certificate Requirements Flag: %s" %
ssl_cert_reqs)
ssl_cert_reqs = CERT_REQS[ssl_cert_reqs]
self.cert_reqs = ssl_cert_reqs
self.ca_certs = ssl_ca_certs
def __init__(self, ssl_keyfile=None, ssl_certfile=None, ssl_cert_reqs=None,
ssl_ca_certs=None, **kwargs):
if not ssl_available:
raise RedisError("Python wasn't built with SSL support")
super(SSLConnection, self).__init__(**kwargs)
self.keyfile = ssl_keyfile
self.certfile = ssl_certfile
if ssl_cert_reqs is None:
ssl_cert_reqs = ssl.CERT_NONE
elif isinstance(ssl_cert_reqs, basestring):
CERT_REQS = {
'none': ssl.CERT_NONE,
'optional': ssl.CERT_OPTIONAL,
'required': ssl.CERT_REQUIRED
}
if ssl_cert_reqs not in CERT_REQS:
raise RedisError(
"Invalid SSL Certificate Requirements Flag: %s" %
ssl_cert_reqs)
ssl_cert_reqs = CERT_REQS[ssl_cert_reqs]
self.cert_reqs = ssl_cert_reqs
self.ca_certs = ssl_ca_certs
def _verify_cert(self, peer_cert):
"""Returns True if peercert is valid according to the configured
validation mode and hostname.
The ssl handshake already tested the certificate for a valid
CA signature; the only thing that remains is to check
the hostname.
"""
verify_mode = self.ssl_options.verify_mode
assert verify_mode in (ssl.CERT_NONE, ssl.CERT_REQUIRED, ssl.CERT_OPTIONAL)
if verify_mode == ssl.CERT_NONE or self.host is None:
return True
if peer_cert is None and verify_mode == ssl.CERT_REQUIRED:
logger.warning("No SSL certificate given")
return False
try:
ssl_match_hostname(peer_cert, self.host)
except SSLCertificateError:
logger.warning("Invalid SSL certificate", exc_info=True)
return False
else:
return True
def wrap_socket(self, sock):
try:
if self.clientcert_req:
ca_certs = self.interface[4]
cert_reqs = ssl.CERT_OPTIONAL
sock = ssl.wrap_socket(sock,
keyfile=self.interface[2],
certfile=self.interface[3],
server_side=True,
cert_reqs=cert_reqs,
ca_certs=ca_certs,
ssl_version=ssl.PROTOCOL_SSLv23)
else:
sock = ssl.wrap_socket(sock,
keyfile=self.interface[2],
certfile=self.interface[3],
server_side=True,
ssl_version=ssl.PROTOCOL_SSLv23)
except SSLError:
# Generally this happens when an HTTP request is received on a
# secure socket. We don't do anything because it will be detected
# by Worker and dealt with appropriately.
pass
return sock
def wrap_socket(self, sock):
try:
if self.clientcert_req:
ca_certs = self.interface[4]
cert_reqs = ssl.CERT_OPTIONAL
sock = ssl.wrap_socket(sock,
keyfile=self.interface[2],
certfile=self.interface[3],
server_side=True,
cert_reqs=cert_reqs,
ca_certs=ca_certs,
ssl_version=ssl.PROTOCOL_SSLv23)
else:
sock = ssl.wrap_socket(sock,
keyfile=self.interface[2],
certfile=self.interface[3],
server_side=True,
ssl_version=ssl.PROTOCOL_SSLv23)
except SSLError:
# Generally this happens when an HTTP request is received on a
# secure socket. We don't do anything because it will be detected
# by Worker and dealt with appropriately.
pass
return sock
def create_secure_socket(self, cert_dir, host='localhost', port=8001):
context = ssl.SSLContext() # Defaults to SSL/TLS support with PROTOCOL_TLS (best for now for compatibility)
context.verify_mode = ssl.CERT_OPTIONAL # ssl.CERT_REQUIRED is more secure
context.check_hostname = False # Hostname verification on certs (Dont want for now)
context.load_default_certs(purpose=ssl.Purpose.CLIENT_AUTH) # Load the public CA certs for the server socket (need CLIENT_AUTH param)
self.generate_server_self_cert(cert_dir)
context.load_cert_chain(join(cert_dir, "KnowledgeManagement.crt"), keyfile=join(cert_dir, "KnowledgeManagement.key"))
# Create the secure socket that will be listening for connections.
# Note that the SSL handshake is NOT performed upon connection so the server can securely transfer the cert
# if required.
self.server_proc = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self.server_proc.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
secureSocket = context.wrap_socket(self.server_proc, server_side=True)
secureSocket.bind((host, port))
print("SSL Server started on {}:{}".format(host, port))
#self.server_negotiate_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
#self.server_negotiate_sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
#self.server_negotiate_sock.bind((host, port+1))
self.is_listening=True
return secureSocket
def wrap_socket(self, sock):
try:
if self.clientcert_req:
ca_certs = self.interface[4]
cert_reqs = ssl.CERT_OPTIONAL
sock = ssl.wrap_socket(sock,
keyfile=self.interface[2],
certfile=self.interface[3],
server_side=True,
cert_reqs=cert_reqs,
ca_certs=ca_certs,
ssl_version=ssl.PROTOCOL_SSLv23)
else:
sock = ssl.wrap_socket(sock,
keyfile=self.interface[2],
certfile=self.interface[3],
server_side=True,
ssl_version=ssl.PROTOCOL_SSLv23)
except SSLError:
# Generally this happens when an HTTP request is received on a
# secure socket. We don't do anything because it will be detected
# by Worker and dealt with appropriately.
pass
return sock
def __init__(self, ssl_keyfile=None, ssl_certfile=None, ssl_cert_reqs=None,
ssl_ca_certs=None, **kwargs):
if not ssl_available:
raise RedisError("Python wasn't built with SSL support")
super(SSLConnection, self).__init__(**kwargs)
self.keyfile = ssl_keyfile
self.certfile = ssl_certfile
if ssl_cert_reqs is None:
ssl_cert_reqs = ssl.CERT_NONE
elif isinstance(ssl_cert_reqs, basestring):
CERT_REQS = {
'none': ssl.CERT_NONE,
'optional': ssl.CERT_OPTIONAL,
'required': ssl.CERT_REQUIRED
}
if ssl_cert_reqs not in CERT_REQS:
raise RedisError(
"Invalid SSL Certificate Requirements Flag: %s" %
ssl_cert_reqs)
ssl_cert_reqs = CERT_REQS[ssl_cert_reqs]
self.cert_reqs = ssl_cert_reqs
self.ca_certs = ssl_ca_certs
def __init__(self, ssl_keyfile=None, ssl_certfile=None, ssl_cert_reqs=None,
ssl_ca_certs=None, **kwargs):
if not ssl_available:
raise RedisError("Python wasn't built with SSL support")
super(SSLConnection, self).__init__(**kwargs)
self.keyfile = ssl_keyfile
self.certfile = ssl_certfile
if ssl_cert_reqs is None:
ssl_cert_reqs = ssl.CERT_NONE
elif isinstance(ssl_cert_reqs, basestring):
CERT_REQS = {
'none': ssl.CERT_NONE,
'optional': ssl.CERT_OPTIONAL,
'required': ssl.CERT_REQUIRED
}
if ssl_cert_reqs not in CERT_REQS:
raise RedisError(
"Invalid SSL Certificate Requirements Flag: %s" %
ssl_cert_reqs)
ssl_cert_reqs = CERT_REQS[ssl_cert_reqs]
self.cert_reqs = ssl_cert_reqs
self.ca_certs = ssl_ca_certs
def wrap_socket(self, sock):
try:
if self.clientcert_req:
ca_certs = self.interface[4]
cert_reqs = ssl.CERT_OPTIONAL
sock = ssl.wrap_socket(sock,
keyfile=self.interface[2],
certfile=self.interface[3],
server_side=True,
cert_reqs=cert_reqs,
ca_certs=ca_certs,
ssl_version=ssl.PROTOCOL_SSLv23)
else:
sock = ssl.wrap_socket(sock,
keyfile=self.interface[2],
certfile=self.interface[3],
server_side=True,
ssl_version=ssl.PROTOCOL_SSLv23)
except SSLError:
# Generally this happens when an HTTP request is received on a
# secure socket. We don't do anything because it will be detected
# by Worker and dealt with appropriately.
pass
return sock
def run(self, handler):
context = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
context.load_cert_chain(handler.certificate_file, handler.private_key_file)
context.load_verify_locations(cafile=handler.certificate_file)
context.load_verify_locations(cafile=handler.crl_file)
context.options &= ssl.OP_NO_SSLv3
context.options &= ssl.OP_NO_SSLv2
context.verify_flags |= ssl.VERIFY_CRL_CHECK_LEAF
context.verify_mode = ssl.CERT_OPTIONAL
self.options['ssl_context'] = context
logger.info('Starting server on host %s port %d.', self.host, self.port)
server = pywsgi.WSGIServer(
(self.host, self.port),
handler,
ssl_context=context,
handler_class=RequestHandler,
log=logger,
error_log=logger,
)
server.serve_forever()
def _connect(self):
if self.server_tls:
raise Exception("TBD")
print(self.client.tls_set(self.server_tls.server_cert, cert_reqs=ssl.CERT_OPTIONAL))
print(self.client.connect(self.host, self.port))
else:
self.client.connect(self.host, self.port)
self.client.subscribe(self.topics)
def on_connect(client, userdata, flags, rc):
print("Connected with result code "+str(rc))
self.client.on_connect = on_connect
def on_publish(client, userdata, mid):
print("Successfully published mid %d" % mid)
self.client.on_publish = on_publish
def wrap_socket(self, sock):
try:
if self.clientcert_req:
ca_certs = self.interface[4]
cert_reqs = ssl.CERT_OPTIONAL
sock = ssl.wrap_socket(sock,
keyfile=self.interface[2],
certfile=self.interface[3],
server_side=True,
cert_reqs=cert_reqs,
ca_certs=ca_certs,
ssl_version=ssl.PROTOCOL_SSLv23)
else:
sock = ssl.wrap_socket(sock,
keyfile=self.interface[2],
certfile=self.interface[3],
server_side=True,
ssl_version=ssl.PROTOCOL_SSLv23)
except SSLError:
# Generally this happens when an HTTP request is received on a
# secure socket. We don't do anything because it will be detected
# by Worker and dealt with appropriately.
pass
return sock
def wrap_socket(self, sock):
try:
if self.clientcert_req:
ca_certs = self.interface[4]
cert_reqs = ssl.CERT_OPTIONAL
sock = ssl.wrap_socket(sock,
keyfile=self.interface[2],
certfile=self.interface[3],
server_side=True,
cert_reqs=cert_reqs,
ca_certs=ca_certs,
ssl_version=ssl.PROTOCOL_SSLv23)
else:
sock = ssl.wrap_socket(sock,
keyfile=self.interface[2],
certfile=self.interface[3],
server_side=True,
ssl_version=ssl.PROTOCOL_SSLv23)
except SSLError:
# Generally this happens when an HTTP request is received on a
# secure socket. We don't do anything because it will be detected
# by Worker and dealt with appropriately.
pass
return sock
def wrap_socket(self, sock):
try:
if self.clientcert_req:
ca_certs = self.interface[4]
cert_reqs = ssl.CERT_OPTIONAL
sock = ssl.wrap_socket(sock,
keyfile=self.interface[2],
certfile=self.interface[3],
server_side=True,
cert_reqs=cert_reqs,
ca_certs=ca_certs,
ssl_version=ssl.PROTOCOL_SSLv23)
else:
sock = ssl.wrap_socket(sock,
keyfile=self.interface[2],
certfile=self.interface[3],
server_side=True,
ssl_version=ssl.PROTOCOL_SSLv23)
except SSLError:
# Generally this happens when an HTTP request is received on a
# secure socket. We don't do anything because it will be detected
# by Worker and dealt with appropriately.
pass
return sock
def __init__(self, ssl_keyfile=None, ssl_certfile=None, ssl_cert_reqs=None,
ssl_ca_certs=None, **kwargs):
if not ssl_available:
raise RedisError("Python wasn't built with SSL support")
super(SSLConnection, self).__init__(**kwargs)
self.keyfile = ssl_keyfile
self.certfile = ssl_certfile
if ssl_cert_reqs is None:
ssl_cert_reqs = ssl.CERT_NONE
elif isinstance(ssl_cert_reqs, basestring):
CERT_REQS = {
'none': ssl.CERT_NONE,
'optional': ssl.CERT_OPTIONAL,
'required': ssl.CERT_REQUIRED
}
if ssl_cert_reqs not in CERT_REQS:
raise RedisError(
"Invalid SSL Certificate Requirements Flag: %s" %
ssl_cert_reqs)
ssl_cert_reqs = CERT_REQS[ssl_cert_reqs]
self.cert_reqs = ssl_cert_reqs
self.ca_certs = ssl_ca_certs
def __init__(self, keyfile=None, certfile=None,
cert_reqs=None, ca_certs=None):
self.keyfile = keyfile
self.certfile = certfile
if cert_reqs is None:
self.cert_reqs = ssl.CERT_NONE
elif isinstance(cert_reqs, str):
CERT_REQS = {
'none': ssl.CERT_NONE,
'optional': ssl.CERT_OPTIONAL,
'required': ssl.CERT_REQUIRED
}
if cert_reqs not in CERT_REQS:
raise RedisError(
"Invalid SSL Certificate Requirements Flag: %s" %
cert_reqs)
self.cert_reqs = CERT_REQS[cert_reqs]
self.ca_certs = ca_certs
self.context = None
def test_cert_reqs_options(self):
import ssl
with pytest.raises(TypeError) as e:
pool = aredis.ConnectionPool.from_url(
'rediss://?ssl_cert_reqs=none&ssl_keyfile=test')
assert e.message == 'certfile should be a valid filesystem path'
assert pool.get_connection().ssl_context.verify_mode == ssl.CERT_NONE
with pytest.raises(TypeError) as e:
pool = aredis.ConnectionPool.from_url(
'rediss://?ssl_cert_reqs=optional&ssl_keyfile=test')
assert e.message == 'certfile should be a valid filesystem path'
assert pool.get_connection().ssl_context.verify_mode == ssl.CERT_OPTIONAL
with pytest.raises(TypeError) as e:
pool = aredis.ConnectionPool.from_url(
'rediss://?ssl_cert_reqs=required&ssl_keyfile=test')
assert e.message == 'certfile should be a valid filesystem path'
assert pool.get_connection().ssl_context.verify_mode == ssl.CERT_REQUIRED
def __init__(self, ssl_keyfile=None, ssl_certfile=None, ssl_cert_reqs=None,
ssl_ca_certs=None, **kwargs):
if not ssl_available:
raise RedisError("Python wasn't built with SSL support")
super(SSLConnection, self).__init__(**kwargs)
self.keyfile = ssl_keyfile
self.certfile = ssl_certfile
if ssl_cert_reqs is None:
ssl_cert_reqs = ssl.CERT_NONE
elif isinstance(ssl_cert_reqs, basestring):
CERT_REQS = {
'none': ssl.CERT_NONE,
'optional': ssl.CERT_OPTIONAL,
'required': ssl.CERT_REQUIRED
}
if ssl_cert_reqs not in CERT_REQS:
raise RedisError(
"Invalid SSL Certificate Requirements Flag: %s" %
ssl_cert_reqs)
ssl_cert_reqs = CERT_REQS[ssl_cert_reqs]
self.cert_reqs = ssl_cert_reqs
self.ca_certs = ssl_ca_certs
def wrap_socket(self, sock):
try:
if self.clientcert_req:
ca_certs = self.interface[4]
cert_reqs = ssl.CERT_OPTIONAL
sock = ssl.wrap_socket(sock,
keyfile=self.interface[2],
certfile=self.interface[3],
server_side=True,
cert_reqs=cert_reqs,
ca_certs=ca_certs,
ssl_version=ssl.PROTOCOL_SSLv23)
else:
sock = ssl.wrap_socket(sock,
keyfile=self.interface[2],
certfile=self.interface[3],
server_side=True,
ssl_version=ssl.PROTOCOL_SSLv23)
except SSLError:
# Generally this happens when an HTTP request is received on a
# secure socket. We don't do anything because it will be detected
# by Worker and dealt with appropriately.
pass
return sock
def validate_cert_reqs(option, value):
"""Validate the cert reqs are valid. It must be None or one of the
three values ``ssl.CERT_NONE``, ``ssl.CERT_OPTIONAL`` or
``ssl.CERT_REQUIRED``.
"""
if value is None:
return value
elif isinstance(value, string_type) and hasattr(ssl, value):
value = getattr(ssl, value)
if value in (ssl.CERT_NONE, ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
return value
raise ValueError("The value of %s must be one of: "
"`ssl.CERT_NONE`, `ssl.CERT_OPTIONAL` or "
"`ssl.CERT_REQUIRED" % (option,))
def __get_verify_mode(self):
"""Whether to try to verify other peers' certificates and how to
behave if verification fails. This attribute must be one of
ssl.CERT_NONE, ssl.CERT_OPTIONAL or ssl.CERT_REQUIRED.
"""
return self._verify_mode
def validate_cert_reqs(option, value):
"""Validate the cert reqs are valid. It must be None or one of the three
values ``ssl.CERT_NONE``, ``ssl.CERT_OPTIONAL`` or ``ssl.CERT_REQUIRED``"""
if value is None:
return value
if HAS_SSL:
if value in (ssl.CERT_NONE, ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
return value
raise ConfigurationError("The value of %s must be one of: "
"`ssl.CERT_NONE`, `ssl.CERT_OPTIONAL` or "
"`ssl.CERT_REQUIRED" % (option,))
else:
raise ConfigurationError("The value of %s is set but can't be "
"validated. The ssl module is not available"
% (option,))
def _verify_cert(self, peercert):
"""Returns True if peercert is valid according to the configured
validation mode and hostname.
The ssl handshake already tested the certificate for a valid
CA signature; the only thing that remains is to check
the hostname.
"""
if isinstance(self._ssl_options, dict):
verify_mode = self._ssl_options.get('cert_reqs', ssl.CERT_NONE)
elif isinstance(self._ssl_options, ssl.SSLContext):
verify_mode = self._ssl_options.verify_mode
assert verify_mode in (ssl.CERT_NONE, ssl.CERT_REQUIRED, ssl.CERT_OPTIONAL)
if verify_mode == ssl.CERT_NONE or self._server_hostname is None:
return True
cert = self.socket.getpeercert()
if cert is None and verify_mode == ssl.CERT_REQUIRED:
gen_log.warning("No SSL certificate given")
return False
try:
ssl_match_hostname(peercert, self._server_hostname)
except SSLCertificateError as e:
gen_log.warning("Invalid SSL certificate: %s" % e)
return False
else:
return True
def validate_cert_reqs(option, value):
"""Validate the cert reqs are valid. It must be None or one of the three
values ``ssl.CERT_NONE``, ``ssl.CERT_OPTIONAL`` or ``ssl.CERT_REQUIRED``"""
if value is None:
return value
if HAS_SSL:
if value in (ssl.CERT_NONE, ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
return value
raise ConfigurationError("The value of %s must be one of: "
"`ssl.CERT_NONE`, `ssl.CERT_OPTIONAL` or "
"`ssl.CERT_REQUIRED" % (option,))
else:
raise ConfigurationError("The value of %s is set but can't be "
"validated. The ssl module is not available"
% (option,))
def validate_cert_reqs(option, value):
"""Validate the cert reqs are valid. It must be None or one of the three
values ``ssl.CERT_NONE``, ``ssl.CERT_OPTIONAL`` or ``ssl.CERT_REQUIRED``"""
if value is None:
return value
if HAS_SSL:
if value in (ssl.CERT_NONE, ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
return value
raise ConfigurationError("The value of %s must be one of: "
"`ssl.CERT_NONE`, `ssl.CERT_OPTIONAL` or "
"`ssl.CERT_REQUIRED" % (option,))
else:
raise ConfigurationError("The value of %s is set but can't be "
"validated. The ssl module is not available"
% (option,))
def _verify_cert(self, peercert):
"""Returns True if peercert is valid according to the configured
validation mode and hostname.
The ssl handshake already tested the certificate for a valid
CA signature; the only thing that remains is to check
the hostname.
"""
if isinstance(self._ssl_options, dict):
verify_mode = self._ssl_options.get('cert_reqs', ssl.CERT_NONE)
elif isinstance(self._ssl_options, ssl.SSLContext):
verify_mode = self._ssl_options.verify_mode
assert verify_mode in (ssl.CERT_NONE, ssl.CERT_REQUIRED, ssl.CERT_OPTIONAL)
if verify_mode == ssl.CERT_NONE or self._server_hostname is None:
return True
cert = self.socket.getpeercert()
if cert is None and verify_mode == ssl.CERT_REQUIRED:
gen_log.warning("No SSL certificate given")
return False
try:
ssl_match_hostname(peercert, self._server_hostname)
except SSLCertificateError as e:
gen_log.warning("Invalid SSL certificate: %s" % e)
return False
else:
return True
def validate_cert_reqs(option, value):
"""Validate the cert reqs are valid. It must be None or one of the three
values ``ssl.CERT_NONE``, ``ssl.CERT_OPTIONAL`` or ``ssl.CERT_REQUIRED``"""
if value is None:
return value
if HAS_SSL:
if value in (ssl.CERT_NONE, ssl.CERT_OPTIONAL, ssl.CERT_REQUIRED):
return value
raise ConfigurationError("The value of %s must be one of: "
"`ssl.CERT_NONE`, `ssl.CERT_OPTIONAL` or "
"`ssl.CERT_REQUIRED" % (option,))
else:
raise ConfigurationError("The value of %s is set but can't be "
"validated. The ssl module is not available"
% (option,))
def set_cert(self, key_file=None, cert_file=None,
cert_reqs='CERT_NONE', ca_certs=None):
ssl_req_scheme = {
'CERT_NONE': ssl.CERT_NONE,
'CERT_OPTIONAL': ssl.CERT_OPTIONAL,
'CERT_REQUIRED': ssl.CERT_REQUIRED
}
self.key_file = key_file
self.cert_file = cert_file
self.cert_reqs = ssl_req_scheme.get(cert_reqs) or ssl.CERT_NONE
self.ca_certs = ca_certs