def ssl(sock, keyfile=None, certfile=None):
    """Wrap ``sock`` in an ``SSLObject`` built on an ``SSL.Connection``.

    A new ``SSL.Context`` using ``SSL.SSLv23_METHOD`` is created. The
    certificate file and the private key file are loaded only when the
    corresponding argument is not ``None``. Peer verification is turned off:
    the context is set to ``SSL.VERIFY_NONE`` with a callback that accepts
    everything.

    The timeout of the socket passed in is copied onto the wrapper. The
    handshake is performed immediately only when the socket already has a
    peer; otherwise the wrapper is returned without handshaking.

    Returns the ``SSLObject`` wrapper.
    """
    ctx = SSL.Context(SSL.SSLv23_METHOD)
    if certfile is not None:
        ctx.use_certificate_file(certfile)
    if keyfile is not None:
        ctx.use_privatekey_file(keyfile)
    ctx.set_verify(SSL.VERIFY_NONE, lambda *x: True)

    # The timeout is read from the object the caller handed in, before it
    # is swapped for its underlying ``_sock`` (when it has one).
    saved_timeout = sock.gettimeout()
    raw_sock = getattr(sock, '_sock', sock)

    wrapped = SSLObject(SSL.Connection(ctx, raw_sock))
    wrapped.settimeout(saved_timeout)

    try:
        raw_sock.getpeername()
    except Exception:
        # No peer, so no connection yet: return without handshaking.
        return wrapped
    # Already connected: do the handshake now.
    wrapped.do_handshake()
    return wrapped
# Python VERIFY_NONE example source code
def ssl(sock, keyfile=None, certfile=None):
    """Wrap ``sock`` in an ``SSLObject`` built on an ``SSL.Connection``.

    A new ``SSL.Context`` using ``SSL.SSLv23_METHOD`` is created. The
    certificate file and the private key file are loaded only when the
    corresponding argument is not ``None``. Peer verification is turned off:
    the context is set to ``SSL.VERIFY_NONE`` with a callback that accepts
    everything.

    The timeout of the socket passed in is copied onto the wrapper. The
    handshake is performed immediately only when the socket already has a
    peer; otherwise the wrapper is returned without handshaking.

    Returns the ``SSLObject`` wrapper.
    """
    ctx = SSL.Context(SSL.SSLv23_METHOD)
    if certfile is not None:
        ctx.use_certificate_file(certfile)
    if keyfile is not None:
        ctx.use_privatekey_file(keyfile)
    ctx.set_verify(SSL.VERIFY_NONE, lambda *x: True)

    # The timeout is read from the object the caller handed in, before it
    # is swapped for its underlying ``_sock`` (when it has one).
    saved_timeout = sock.gettimeout()
    raw_sock = getattr(sock, '_sock', sock)

    wrapped = SSLObject(SSL.Connection(ctx, raw_sock))
    wrapped.settimeout(saved_timeout)

    try:
        raw_sock.getpeername()
    except Exception:
        # No peer, so no connection yet: return without handshaking.
        return wrapped
    # Already connected: do the handshake now.
    wrapped.do_handshake()
    return wrapped
def test_start_tls_smtp(self):
    # Exercises a STARTTLS upgrade: plaintext lines are exchanged, both
    # ends switch to TLS, and more lines are exchanged over the new streams.
    def accept_everything(conn, x509, err_num, err_depth, err_code):
        return True

    # The exchange below is a simplified version of RFC 3207 section 5.
    # Not all of it is strictly needed, but realistic back-and-forth
    # traffic helps confirm the buffers are in a sane state afterwards.
    plaintext_exchange = (
        (self.server_send_line, b"220 mail.example.com ready\r\n"),
        (self.client_send_line, b"EHLO mail.example.com\r\n"),
        (self.server_send_line, b"250-mail.example.com welcome\r\n"),
        (self.server_send_line, b"250 STARTTLS\r\n"),
        (self.client_send_line, b"STARTTLS\r\n"),
        (self.server_send_line, b"220 Go ahead\r\n"),
    )
    for send_line, line in plaintext_exchange:
        yield send_line(line)

    # Start both upgrades before waiting on either one.
    client_options = _client_ssl_options(SSL.VERIFY_NONE, accept_everything)
    client_future = self.client_start_tls(client_options)
    server_future = self.server_start_tls(_server_ssl_options())
    self.client_stream = yield client_future
    self.server_stream = yield server_future
    for upgraded in (self.client_stream, self.server_stream):
        self.assertTrue(isinstance(upgraded, MicroProxySSLIOStream))

    # Traffic after the upgrade.
    yield self.client_send_line(b"EHLO mail.example.com\r\n")
    yield self.server_send_line(b"250 mail.example.com welcome\r\n")
def create_dest_sslcontext(insecure=False, trusted_ca_certs="", alpn=None):
    """Create an SSL context, with or without peer verification.

    The context comes from ``create_basic_sslcontext()``.

    insecure: when true, the context is set to ``SSL.VERIFY_NONE``.
        Otherwise CA certificates are loaded and the context requires a
        verified peer certificate
        (``SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT``).
    trusted_ca_certs: location passed to ``load_verify_locations``; when
        empty, ``certifi.where()`` is used instead.
    alpn: protocols passed to ``set_alpn_protos``; applied only when it is
        truthy and ``HAS_ALPN`` is set.

    Returns the configured context.
    """
    context = create_basic_sslcontext()

    if insecure:
        context.set_verify(SSL.VERIFY_NONE, certificate_verify_cb)
    else:
        ca_location = trusted_ca_certs if trusted_ca_certs else certifi.where()
        context.load_verify_locations(ca_location)
        strict_flags = SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT
        context.set_verify(strict_flags, certificate_verify_cb)

    if alpn and HAS_ALPN:
        context.set_alpn_protos(alpn)

    return context
def ssl(sock, keyfile=None, certfile=None):
    """Wrap ``sock`` in an ``SSLObject`` built on an ``SSL.Connection``.

    A new ``SSL.Context`` using ``SSL.SSLv23_METHOD`` is created. The
    certificate file and the private key file are loaded only when the
    corresponding argument is not ``None``. Peer verification is turned off:
    the context is set to ``SSL.VERIFY_NONE`` with a callback that accepts
    everything.

    The timeout of the socket passed in is copied onto the wrapper. The
    handshake is performed immediately only when the socket already has a
    peer; otherwise the wrapper is returned without handshaking.

    Returns the ``SSLObject`` wrapper.
    """
    ctx = SSL.Context(SSL.SSLv23_METHOD)
    if certfile is not None:
        ctx.use_certificate_file(certfile)
    if keyfile is not None:
        ctx.use_privatekey_file(keyfile)
    ctx.set_verify(SSL.VERIFY_NONE, lambda *x: True)

    # The timeout is read from the object the caller handed in, before it
    # is swapped for its underlying ``_sock`` (when it has one).
    saved_timeout = sock.gettimeout()
    raw_sock = getattr(sock, '_sock', sock)

    wrapped = SSLObject(SSL.Connection(ctx, raw_sock))
    wrapped.settimeout(saved_timeout)

    try:
        raw_sock.getpeername()
    except Exception:
        # No peer, so no connection yet: return without handshaking.
        return wrapped
    # Already connected: do the handshake now.
    wrapped.do_handshake()
    return wrapped
def ssl(sock, keyfile=None, certfile=None):
    """Wrap ``sock`` in an ``SSLObject`` built on an ``SSL.Connection``.

    A new ``SSL.Context`` using ``SSL.SSLv23_METHOD`` is created. The
    certificate file and the private key file are loaded only when the
    corresponding argument is not ``None``. Peer verification is turned off:
    the context is set to ``SSL.VERIFY_NONE`` with a callback that accepts
    everything.

    The timeout of the socket passed in is copied onto the wrapper. The
    handshake is performed immediately only when the socket already has a
    peer; otherwise the wrapper is returned without handshaking.

    Returns the ``SSLObject`` wrapper.
    """
    ctx = SSL.Context(SSL.SSLv23_METHOD)
    if certfile is not None:
        ctx.use_certificate_file(certfile)
    if keyfile is not None:
        ctx.use_privatekey_file(keyfile)
    ctx.set_verify(SSL.VERIFY_NONE, lambda *x: True)

    # The timeout is read from the object the caller handed in, before it
    # is swapped for its underlying ``_sock`` (when it has one).
    saved_timeout = sock.gettimeout()
    raw_sock = getattr(sock, '_sock', sock)

    wrapped = SSLObject(SSL.Connection(ctx, raw_sock))
    wrapped.settimeout(saved_timeout)

    try:
        raw_sock.getpeername()
    except Exception:
        # No peer, so no connection yet: return without handshaking.
        return wrapped
    # Already connected: do the handshake now.
    wrapped.do_handshake()
    return wrapped
def _make_client_iostream(self, connection, **kwargs):
    """Return a ``MicroProxySSLIOStream`` around ``connection``.

    The stream's SSL options come from ``_client_ssl_options`` with
    ``SSL.VERIFY_NONE`` and a callback that always returns ``True``. The
    stream is bound to ``self.io_loop``, and any extra keyword arguments are
    forwarded to the stream constructor.
    """
    def accept_everything(conn, x509, err_num, err_depth, err_code):
        return True

    ssl_options = _client_ssl_options(SSL.VERIFY_NONE, accept_everything)
    stream = MicroProxySSLIOStream(
        connection,
        io_loop=self.io_loop,
        ssl_options=ssl_options,
        **kwargs)
    return stream
def getContext(self):
    """Return the base client context with peer verification disabled.

    The context is obtained from ``ssl.ClientContextFactory.getContext`` and
    then set to ``SSL.VERIFY_NONE`` with ``verifyCallback``.
    """
    context = ssl.ClientContextFactory.getContext(self)
    # TODO: switch VERIFY_NONE to VERIFY_PEER once there is a real server
    # with a valid CA-signed certificate. If that does not work,
    # self-signed certificates can be used, provided they are distributed:
    # put the cert.pem file and its location in the config and enable the
    # load_verify_locations call below.
    # Until then certificates are not authenticated, so the connection is
    # exposed to man-in-the-middle attacks.
    context.set_verify(SSL.VERIFY_NONE, verifyCallback)
    #context.load_verify_locations("/path/to/cert.pem")
    return context
def _makeContext(self):
    """Build and return an ``SSL.Context`` configured from this object.

    Steps, in order:

    * create the context for ``self.method`` and attach a fresh
      ``_SSLApplicationData`` as its app data;
    * when both ``self.certificate`` and ``self.privateKey`` are set, load
      them and check that the key matches the certificate;
    * when ``self.verify`` is set, require a verified peer certificate and
      add every certificate in ``self.caCerts`` to the context's store;
      otherwise use ``SSL.VERIFY_NONE``;
    * when ``self.enableSessions`` is set, assign a session id derived from
      the class name and ``_sessionCounter()``.

    The verify callback returns the verdict OpenSSL passes in. Returning
    ``True`` unconditionally would accept certificates that failed
    verification even when ``self.verify`` is set.
    """
    ctx = SSL.Context(self.method)
    ctx.set_app_data(_SSLApplicationData())

    if self.certificate is not None and self.privateKey is not None:
        ctx.use_certificate(self.certificate)
        ctx.use_privatekey(self.privateKey)
        # Sanity check
        ctx.check_privatekey()

    verifyFlags = SSL.VERIFY_NONE
    if self.verify:
        verifyFlags = SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT
        if self.caCerts:
            store = ctx.get_cert_store()
            for cert in self.caCerts:
                store.add_cert(cert)

    def _trackVerificationProblems(conn, cert, errno, depth, preverify_ok):
        # Pass OpenSSL's own verification result through unchanged so a
        # failed check is not silently turned into a success.
        return preverify_ok
    ctx.set_verify(verifyFlags, _trackVerificationProblems)

    if self.enableSessions:
        sessionName = md5.md5("%s-%d" % (reflect.qual(self.__class__), _sessionCounter())).hexdigest()
        ctx.set_session_id(sessionName)

    return ctx
def _makeContext(self):
    """Build and return an ``SSL.Context`` configured from this object.

    Steps, in order:

    * create the context for ``self.method`` and attach a fresh
      ``_SSLApplicationData`` as its app data;
    * when both ``self.certificate`` and ``self.privateKey`` are set, load
      them and check that the key matches the certificate;
    * when ``self.verify`` is set, require a verified peer certificate and
      add every certificate in ``self.caCerts`` to the context's store;
      otherwise use ``SSL.VERIFY_NONE``;
    * when ``self.enableSessions`` is set, assign a session id derived from
      the class name and ``_sessionCounter()``.

    The verify callback returns the verdict OpenSSL passes in. Returning
    ``True`` unconditionally would accept certificates that failed
    verification even when ``self.verify`` is set.
    """
    ctx = SSL.Context(self.method)
    ctx.set_app_data(_SSLApplicationData())

    if self.certificate is not None and self.privateKey is not None:
        ctx.use_certificate(self.certificate)
        ctx.use_privatekey(self.privateKey)
        # Sanity check
        ctx.check_privatekey()

    verifyFlags = SSL.VERIFY_NONE
    if self.verify:
        verifyFlags = SSL.VERIFY_PEER | SSL.VERIFY_FAIL_IF_NO_PEER_CERT
        if self.caCerts:
            store = ctx.get_cert_store()
            for cert in self.caCerts:
                store.add_cert(cert)

    def _trackVerificationProblems(conn, cert, errno, depth, preverify_ok):
        # Pass OpenSSL's own verification result through unchanged so a
        # failed check is not silently turned into a success.
        return preverify_ok
    ctx.set_verify(verifyFlags, _trackVerificationProblems)

    if self.enableSessions:
        sessionName = md5.md5("%s-%d" % (reflect.qual(self.__class__), _sessionCounter())).hexdigest()
        ctx.set_session_id(sessionName)

    return ctx
def make_ssl_context(key_file=None, cert_file=None, pem_file=None, ca_dir=None,
                     verify_peer=False, url=None, method=SSL.TLSv1_METHOD,
                     key_file_passphrase=None):
    """
    Build an ``SSL.Context`` from certificate, key and CA locations.

    key_file / cert_file: private key and certificate files. When no key
        file is given, the key is read from the certificate file.
    key_file_passphrase: when given, installed as the answer of the
        context's passphrase callback.
    pem_file / ca_dir: passed to ``load_verify_locations`` when either is
        set.
    verify_peer: when true, verification depth is set to 9 and the peer is
        verified, through ``set_peer_verification_for_url_hostname`` when
        ``url`` is given and with ``SSL.VERIFY_PEER`` otherwise. When false
        the context uses ``SSL.VERIFY_NONE``.
    method: method constant handed to ``SSL.Context``.

    Returns the configured context.
    """
    context = SSL.Context(method)

    if cert_file:
        context.use_certificate_file(cert_file)

    if key_file_passphrase:
        def _passphrase_cb(max_passphrase_len, set_prompt, userdata):
            return key_file_passphrase
        context.set_passwd_cb(_passphrase_cb)

    # The key file defaults to the certificate file if that is present.
    private_key_source = key_file or cert_file
    if private_key_source:
        context.use_privatekey_file(private_key_source)

    if pem_file or ca_dir:
        context.load_verify_locations(pem_file, ca_dir)

    def _callback(conn, x509, errnum, errdepth, preverify_ok):
        """Default certification verification callback.
        Performs no checks and returns the status passed in.
        """
        return preverify_ok

    if not verify_peer:
        context.set_verify(SSL.VERIFY_NONE, _callback)
        return context

    context.set_verify_depth(9)
    if url:
        set_peer_verification_for_url_hostname(context, url)
    else:
        context.set_verify(SSL.VERIFY_PEER, _callback)
    return context
def _makeContext(self):
    """Build and return an ``SSL.Context`` configured from this object.

    The context is created for ``self.method`` with a fresh
    ``_SSLApplicationData`` as app data. The certificate and private key are
    loaded and cross-checked when both are set. The verification flags,
    trusted CA certificates, verification depth, extra options and session
    id are each applied according to the matching attribute on ``self``.

    The verify callback records an ``OpenSSLVerifyError`` in the app data's
    ``problems`` list for every failed check and returns OpenSSL's own
    verdict unchanged.
    """
    context = SSL.Context(self.method)
    context.set_app_data(_SSLApplicationData())

    has_identity = (self.certificate is not None
                    and self.privateKey is not None)
    if has_identity:
        context.use_certificate(self.certificate)
        context.use_privatekey(self.privateKey)
        # Make sure the key actually belongs to the certificate.
        context.check_privatekey()

    if self.verify:
        flags = SSL.VERIFY_PEER
        if self.requireCertificate:
            flags = flags | SSL.VERIFY_FAIL_IF_NO_PEER_CERT
        if self.verifyOnce:
            flags = flags | SSL.VERIFY_CLIENT_ONCE
        if self.caCerts:
            trusted = context.get_cert_store()
            for ca_cert in self.caCerts:
                trusted.add_cert(ca_cert)
    else:
        flags = SSL.VERIFY_NONE

    def _trackVerificationProblems(conn, cert, errno, depth, preverify_ok):
        # preverify_ok is the answer OpenSSL's default verifier would have
        # given, had it been allowed to run on its own.
        if not preverify_ok:
            problem = OpenSSLVerifyError(cert, errno, depth)
            context.get_app_data().problems.append(problem)
        return preverify_ok

    context.set_verify(flags, _trackVerificationProblems)

    if self.verifyDepth is not None:
        context.set_verify_depth(self.verifyDepth)

    if self.enableSingleUseKeys:
        context.set_options(SSL.OP_SINGLE_DH_USE)

    if self.fixBrokenPeers:
        context.set_options(self._OP_ALL)

    if self.enableSessions:
        session_seed = "%s-%d" % (reflect.qual(self.__class__), _sessionCounter())
        context.set_session_id(md5.md5(session_seed).hexdigest())

    return context
def _do_ssl_handshake(self):
    """Attempt the SSL handshake on ``self.socket`` and react to the result.

    * ``SSL.WantReadError`` / ``SSL.WantWriteError``: set the matching
      ``_handshake_reading`` / ``_handshake_writing`` flag and return.
    * ``SSL.SysCallError``: close the stream when the error number is
      EBADF, ENOTCONN or EPERM; otherwise re-raise.
    * any other ``SSL.Error``: log a warning with the file descriptor and
      peer, then close the stream.
    * ``AttributeError``: close the stream.
    * success: clear ``_ssl_accepting``. When the context's verify mode is
      not ``SSL.VERIFY_NONE`` and a server hostname is known, check the
      hostname; a ``VerificationError`` is logged and closes the stream.
      Otherwise run the connect callback.
    """
    try:
        self._handshake_reading = False
        self._handshake_writing = False
        self.socket.do_handshake()
    except SSL.WantReadError:
        self._handshake_reading = True
        return
    except SSL.WantWriteError:
        self._handshake_writing = True
        return
    except SSL.SysCallError as e:
        # Read the error number through ``args``: subscripting the
        # exception object itself (``e[0]``) fails on Python 3.
        err_num = abs(e.args[0])
        if err_num in (errno.EBADF, errno.ENOTCONN, errno.EPERM):
            return self.close(exc_info=True)
        raise
    except SSL.Error as err:
        try:
            peer = self.socket.getpeername()
        except Exception:
            peer = '(not connected)'
        logger.warning("SSL Error on %s %s: %s",
                       self.socket.fileno(), peer, err)
        return self.close(exc_info=True)
    except AttributeError:
        return self.close(exc_info=True)
    else:
        self._ssl_accepting = False
        verify_mode = self.socket.get_context().get_verify_mode()
        # The hostname check only runs when verification is enabled and a
        # hostname is known.
        if (verify_mode != SSL.VERIFY_NONE and
                self._server_hostname is not None):
            try:
                verify_hostname(self.socket, self._server_hostname)
            except VerificationError as e:
                logger.warning("Invalid SSL certificate: {0}".format(e))
                self.close(exc_info=True)
                return
        self._run_ssl_connect_callback()
def _makeContext(self):
    """Build and return an ``SSL.Context`` configured from this object.

    The context is created for ``self.method`` with a fresh
    ``_SSLApplicationData`` as app data. The certificate and private key are
    loaded and cross-checked when both are set. The verification flags,
    trusted CA certificates, verification depth, extra options and session
    id are each applied according to the matching attribute on ``self``.

    The verify callback records an ``OpenSSLVerifyError`` in the app data's
    ``problems`` list for every failed check and returns OpenSSL's own
    verdict unchanged.
    """
    context = SSL.Context(self.method)
    context.set_app_data(_SSLApplicationData())

    has_identity = (self.certificate is not None
                    and self.privateKey is not None)
    if has_identity:
        context.use_certificate(self.certificate)
        context.use_privatekey(self.privateKey)
        # Make sure the key actually belongs to the certificate.
        context.check_privatekey()

    if self.verify:
        flags = SSL.VERIFY_PEER
        if self.requireCertificate:
            flags = flags | SSL.VERIFY_FAIL_IF_NO_PEER_CERT
        if self.verifyOnce:
            flags = flags | SSL.VERIFY_CLIENT_ONCE
        if self.caCerts:
            trusted = context.get_cert_store()
            for ca_cert in self.caCerts:
                trusted.add_cert(ca_cert)
    else:
        flags = SSL.VERIFY_NONE

    def _trackVerificationProblems(conn, cert, errno, depth, preverify_ok):
        # preverify_ok is the answer OpenSSL's default verifier would have
        # given, had it been allowed to run on its own.
        if not preverify_ok:
            problem = OpenSSLVerifyError(cert, errno, depth)
            context.get_app_data().problems.append(problem)
        return preverify_ok

    context.set_verify(flags, _trackVerificationProblems)

    if self.verifyDepth is not None:
        context.set_verify_depth(self.verifyDepth)

    if self.enableSingleUseKeys:
        context.set_options(SSL.OP_SINGLE_DH_USE)

    if self.fixBrokenPeers:
        context.set_options(self._OP_ALL)

    if self.enableSessions:
        session_seed = "%s-%d" % (reflect.qual(self.__class__), _sessionCounter())
        context.set_session_id(md5.md5(session_seed).hexdigest())

    return context
def make_ssl_context(key_file=None, cert_file=None, pem_file=None, ca_dir=None,
                     verify_peer=False, url=None, method=SSL.TLSv1_METHOD,
                     key_file_passphrase=None):
    """
    Build an ``SSL.Context`` from certificate, key and CA locations.

    key_file / cert_file: private key and certificate files. When no key
        file is given, the key is read from the certificate file.
    key_file_passphrase: when given, installed as the answer of the
        context's passphrase callback.
    pem_file / ca_dir: passed to ``load_verify_locations`` when either is
        set.
    verify_peer: when true, verification depth is set to 9 and the peer is
        verified, through ``set_peer_verification_for_url_hostname`` when
        ``url`` is given and with ``SSL.VERIFY_PEER`` otherwise. When false
        the context uses ``SSL.VERIFY_NONE``.
    method: method constant handed to ``SSL.Context``.

    Returns the configured context.
    """
    context = SSL.Context(method)

    if cert_file:
        context.use_certificate_file(cert_file)

    if key_file_passphrase:
        def _passphrase_cb(max_passphrase_len, set_prompt, userdata):
            return key_file_passphrase
        context.set_passwd_cb(_passphrase_cb)

    # The key file defaults to the certificate file if that is present.
    private_key_source = key_file or cert_file
    if private_key_source:
        context.use_privatekey_file(private_key_source)

    if pem_file or ca_dir:
        context.load_verify_locations(pem_file, ca_dir)

    def _callback(conn, x509, errnum, errdepth, preverify_ok):
        """Default certification verification callback.
        Performs no checks and returns the status passed in.
        """
        return preverify_ok

    if not verify_peer:
        context.set_verify(SSL.VERIFY_NONE, _callback)
        return context

    context.set_verify_depth(9)
    if url:
        set_peer_verification_for_url_hostname(context, url)
    else:
        context.set_verify(SSL.VERIFY_PEER, _callback)
    return context
def make_ssl_context(key_file=None, cert_file=None, pem_file=None, ca_dir=None,
                     verify_peer=False, url=None, method=SSL.TLSv1_METHOD,
                     key_file_passphrase=None):
    """
    Build an ``SSL.Context`` from certificate, key and CA locations.

    key_file / cert_file: private key and certificate files. When no key
        file is given, the key is read from the certificate file.
    key_file_passphrase: when given, installed as the answer of the
        context's passphrase callback.
    pem_file / ca_dir: passed to ``load_verify_locations`` when either is
        set.
    verify_peer: when true, verification depth is set to 9 and the peer is
        verified, through ``set_peer_verification_for_url_hostname`` when
        ``url`` is given and with ``SSL.VERIFY_PEER`` otherwise. When false
        the context uses ``SSL.VERIFY_NONE``.
    method: method constant handed to ``SSL.Context``.

    Returns the configured context.
    """
    context = SSL.Context(method)

    if cert_file:
        context.use_certificate_file(cert_file)

    if key_file_passphrase:
        def _passphrase_cb(max_passphrase_len, set_prompt, userdata):
            return key_file_passphrase
        context.set_passwd_cb(_passphrase_cb)

    # The key file defaults to the certificate file if that is present.
    private_key_source = key_file or cert_file
    if private_key_source:
        context.use_privatekey_file(private_key_source)

    if pem_file or ca_dir:
        context.load_verify_locations(pem_file, ca_dir)

    def _callback(conn, x509, errnum, errdepth, preverify_ok):
        """Default certification verification callback.
        Performs no checks and returns the status passed in.
        """
        return preverify_ok

    if not verify_peer:
        context.set_verify(SSL.VERIFY_NONE, _callback)
        return context

    context.set_verify_depth(9)
    if url:
        set_peer_verification_for_url_hostname(context, url)
    else:
        context.set_verify(SSL.VERIFY_PEER, _callback)
    return context
def make_ssl_context(key_file=None, cert_file=None, pem_file=None, ca_dir=None,
                     verify_peer=False, url=None, method=SSL.TLSv1_METHOD,
                     key_file_passphrase=None):
    """
    Build an ``SSL.Context`` from certificate, key and CA locations.

    key_file / cert_file: private key and certificate files. When no key
        file is given, the key is read from the certificate file.
    key_file_passphrase: when given, installed as the answer of the
        context's passphrase callback.
    pem_file / ca_dir: passed to ``load_verify_locations`` when either is
        set.
    verify_peer: when true, verification depth is set to 9 and the peer is
        verified, through ``set_peer_verification_for_url_hostname`` when
        ``url`` is given and with ``SSL.VERIFY_PEER`` otherwise. When false
        the context uses ``SSL.VERIFY_NONE``.
    method: method constant handed to ``SSL.Context``.

    Returns the configured context.
    """
    context = SSL.Context(method)

    if cert_file:
        context.use_certificate_file(cert_file)

    if key_file_passphrase:
        def _passphrase_cb(max_passphrase_len, set_prompt, userdata):
            return key_file_passphrase
        context.set_passwd_cb(_passphrase_cb)

    # The key file defaults to the certificate file if that is present.
    private_key_source = key_file or cert_file
    if private_key_source:
        context.use_privatekey_file(private_key_source)

    if pem_file or ca_dir:
        context.load_verify_locations(pem_file, ca_dir)

    def _callback(conn, x509, errnum, errdepth, preverify_ok):
        """Default certification verification callback.
        Performs no checks and returns the status passed in.
        """
        return preverify_ok

    if not verify_peer:
        context.set_verify(SSL.VERIFY_NONE, _callback)
        return context

    context.set_verify_depth(9)
    if url:
        set_peer_verification_for_url_hostname(context, url)
    else:
        context.set_verify(SSL.VERIFY_PEER, _callback)
    return context
def _makeContext(self):
    """Build and return an SSL context configured from this object.

    The context comes from ``self._contextFactory(self.method)`` and
    receives ``self._options`` and ``self._mode``. After that, in order:

    * when both a certificate and a private key are set, load them plus
      every certificate in ``self.extraCertChain``, then check the key;
    * compute the verification flags from ``self.verify``,
      ``self.requireCertificate`` and ``self.verifyOnce``, and let
      ``self.trustRoot`` add its CA certificates when verifying;
    * install a verify callback that returns OpenSSL's own verdict;
    * apply the verification depth, session id, DH parameters and cipher
      list;
    * add the EC key on a best-effort basis;
    * set the acceptable protocols when any are configured.
    """
    ctx = self._contextFactory(self.method)
    ctx.set_options(self._options)
    ctx.set_mode(self._mode)

    if self.certificate is not None and self.privateKey is not None:
        ctx.use_certificate(self.certificate)
        ctx.use_privatekey(self.privateKey)
        for extraCert in self.extraCertChain:
            ctx.add_extra_chain_cert(extraCert)
        # Sanity check
        ctx.check_privatekey()

    verifyFlags = SSL.VERIFY_NONE
    if self.verify:
        verifyFlags = SSL.VERIFY_PEER
        if self.requireCertificate:
            verifyFlags |= SSL.VERIFY_FAIL_IF_NO_PEER_CERT
        if self.verifyOnce:
            verifyFlags |= SSL.VERIFY_CLIENT_ONCE
        self.trustRoot._addCACertsToContext(ctx)

    # It'd be nice if pyOpenSSL let us pass None here for this behavior (as
    # the underlying OpenSSL API call allows NULL to be passed).  It
    # doesn't, so we'll supply a function which does the same thing.
    def _verifyCallback(conn, cert, errno, depth, preverify_ok):
        return preverify_ok
    ctx.set_verify(verifyFlags, _verifyCallback)
    if self.verifyDepth is not None:
        ctx.set_verify_depth(self.verifyDepth)

    if self.enableSessions:
        name = "%s-%d" % (reflect.qual(self.__class__), _sessionCounter())
        sessionName = md5(networkString(name)).hexdigest()
        ctx.set_session_id(sessionName.encode('ascii'))

    if self.dhParameters:
        ctx.load_tmp_dh(self.dhParameters._dhFile.path)
    ctx.set_cipher_list(self._cipherString.encode('ascii'))

    if self._ecCurve is not None:
        try:
            self._ecCurve.addECKeyToContext(ctx)
        except Exception:
            # ECDHE support is best effort only. Catch Exception rather
            # than BaseException so KeyboardInterrupt and SystemExit still
            # propagate.
            pass

    if self._acceptableProtocols:
        # Try to set NPN and ALPN. _acceptableProtocols cannot be set by
        # the constructor unless at least one mechanism is supported.
        _setAcceptableProtocols(ctx, self._acceptableProtocols)

    return ctx