def simpleVerifyHostname(connection, hostname):
"""
Check only the common name in the certificate presented by the peer and
only for an exact match.
This is to provide I{something} in the way of hostname verification to
users who haven't installed C{service_identity}. This check is overly
strict, relies on a deprecated TLS feature (you're supposed to ignore the
commonName if the subjectAlternativeName extensions are present, I
believe), and lots of valid certificates will fail.
@param connection: the OpenSSL connection to verify.
@type connection: L{OpenSSL.SSL.Connection}
@param hostname: The hostname expected by the user.
@type hostname: L{unicode}
@raise twisted.internet.ssl.VerificationError: if the common name and
hostname don't match.
"""
commonName = connection.get_peer_certificate().get_subject().commonName
if commonName != hostname:
raise SimpleVerificationError(repr(commonName) + "!=" +
repr(hostname))
python类Connection()的实例源码
def clientConnectionForTLS(self, tlsProtocol):
"""
Create a TLS connection for a client.
@note: This will call C{set_app_data} on its connection. If you're
delegating to this implementation of this method, don't ever call
C{set_app_data} or C{set_info_callback} on the returned connection,
or you'll break the implementation of various features of this
class.
@param tlsProtocol: the TLS protocol initiating the connection.
@type tlsProtocol: L{twisted.protocols.tls.TLSMemoryBIOProtocol}
@return: the configured client connection.
@rtype: L{OpenSSL.SSL.Connection}
"""
context = self._ctx
connection = SSL.Connection(context, None)
connection.set_app_data(tlsProtocol)
return connection
def _identityVerifyingInfoCallback(self, connection, where, ret):
"""
U{info_callback
<http://pythonhosted.org/pyOpenSSL/api/ssl.html#OpenSSL.SSL.Context.set_info_callback>
} for pyOpenSSL that verifies the hostname in the presented certificate
matches the one passed to this L{ClientTLSOptions}.
@param connection: the connection which is handshaking.
@type connection: L{OpenSSL.SSL.Connection}
@param where: flags indicating progress through a TLS handshake.
@type where: L{int}
@param ret: ignored
@type ret: ignored
"""
if where & SSL.SSL_CB_HANDSHAKE_START:
connection.set_tlsext_host_name(self._hostnameBytes)
elif where & SSL.SSL_CB_HANDSHAKE_DONE:
try:
verifyHostname(connection, self._hostnameASCII)
except VerificationError:
f = Failure()
transport = connection.get_app_data()
transport.failVerification(f)
def makeConnection(self, transport):
"""
Connect this wrapper to the given transport and initialize the
necessary L{OpenSSL.SSL.Connection} with a memory BIO.
"""
self._tlsConnection = self.factory._createConnection(self)
self._appSendBuffer = []
# Add interfaces provided by the transport we are wrapping:
for interface in providedBy(transport):
directlyProvides(self, interface)
# Intentionally skip ProtocolWrapper.makeConnection - it might call
# wrappedProtocol.makeConnection, which we want to make conditional.
Protocol.makeConnection(self, transport)
self.factory.registerProtocol(self)
if self._connectWrapped:
# Now that the TLS layer is initialized, notify the application of
# the connection.
ProtocolWrapper.makeConnection(self, transport)
# Now that we ourselves have a transport (initialized by the
# ProtocolWrapper.makeConnection call above), kick off the TLS
# handshake.
self._checkHandshakeStatus()
def _checkHandshakeStatus(self):
"""
Ask OpenSSL to proceed with a handshake in progress.
Initially, this just sends the ClientHello; after some bytes have been
stuffed in to the C{Connection} object by C{dataReceived}, it will then
respond to any C{Certificate} or C{KeyExchange} messages.
"""
# The connection might already be aborted (eg. by a callback during
# connection setup), so don't even bother trying to handshake in that
# case.
if self._aborted:
return
try:
self._tlsConnection.do_handshake()
except WantReadError:
self._flushSendBIO()
except Error:
self._tlsShutdownFinished(Failure())
else:
self._handshakeDone = True
if IHandshakeListener.providedBy(self.wrappedProtocol):
self.wrappedProtocol.handshakeCompleted()
def _unbufferPendingWrites(self):
"""
Un-buffer all waiting writes in L{TLSMemoryBIOProtocol._appSendBuffer}.
"""
pendingWrites, self._appSendBuffer = self._appSendBuffer, []
for eachWrite in pendingWrites:
self._write(eachWrite)
if self._appSendBuffer:
# If OpenSSL ran out of buffer space in the Connection on our way
# through the loop earlier and re-buffered any of our outgoing
# writes, then we're done; don't consider any future work.
return
if self._producer is not None:
# If we have a registered producer, let it know that we have some
# more buffer space.
self._producer.resumeProducing()
return
if self.disconnecting:
# Finally, if we have no further buffered data, no producer wants
# to send us more data in the future, and the application told us
# to end the stream, initiate a TLS shutdown.
self._shutdownTLS()
def clientConnectionForTLS(self, protocol):
"""
Construct an OpenSSL server connection from the wrapped old-style
context factory.
@note: Since old-style context factories don't distinguish between
clients and servers, this is exactly the same as
L{_ContextFactoryToConnectionFactory.serverConnectionForTLS}.
@param protocol: The protocol initiating a TLS connection.
@type protocol: L{TLSMemoryBIOProtocol}
@return: a connection
@rtype: L{OpenSSL.SSL.Connection}
"""
return self._connectionForTLS(protocol)
def _applyProtocolNegotiation(self, connection):
"""
Applies ALPN/NPN protocol neogitation to the connection, if the factory
supports it.
@param connection: The OpenSSL connection object to have ALPN/NPN added
to it.
@type connection: L{OpenSSL.SSL.Connection}
@return: Nothing
@rtype: L{None}
"""
if IProtocolNegotiationFactory.providedBy(self.wrappedFactory):
protocols = self.wrappedFactory.acceptableProtocols()
context = connection.get_context()
_setAcceptableProtocols(context, protocols)
return
def _createConnection(self, tlsProtocol):
"""
Create an OpenSSL connection and set it up good.
@param tlsProtocol: The protocol which is establishing the connection.
@type tlsProtocol: L{TLSMemoryBIOProtocol}
@return: an OpenSSL connection object for C{tlsProtocol} to use
@rtype: L{OpenSSL.SSL.Connection}
"""
connectionCreator = self._connectionCreator
if self._creatorInterface is IOpenSSLClientConnectionCreator:
connection = connectionCreator.clientConnectionForTLS(tlsProtocol)
self._applyProtocolNegotiation(connection)
connection.set_connect_state()
else:
connection = connectionCreator.serverConnectionForTLS(tlsProtocol)
self._applyProtocolNegotiation(connection)
connection.set_accept_state()
return connection
def simple_response(self, status, msg=""):
"""Write a simple response back to the client."""
status = str(status)
buf = ["%s %s\r\n" % (self.environ['ACTUAL_SERVER_PROTOCOL'], status),
"Content-Length: %s\r\n" % len(msg),
"Content-Type: text/plain\r\n"]
if status[:3] == "413" and self.response_protocol == 'HTTP/1.1':
# Request Entity Too Large
self.close_connection = True
buf.append("Connection: close\r\n")
buf.append("\r\n")
if msg:
buf.append(msg)
try:
self.wfile.sendall("".join(buf))
except socket.error, x:
if x.args[0] not in socket_errors_to_ignore:
raise
def _send_loop(self, send_method, data, *args):
if self.act_non_blocking:
return send_method(data, *args)
_timeout_exc = socket_timeout('timed out')
while True:
try:
return send_method(data, *args)
except socket.error as e:
eno = get_errno(e)
if eno == errno.ENOTCONN or eno not in SOCKET_BLOCKING:
raise
try:
self._trampoline(self.fd, write=True, timeout=self.gettimeout(),
timeout_exc=_timeout_exc)
except IOClosed:
raise socket.error(errno.ECONNRESET, 'Connection closed by another thread')
def shutdown_safe(sock):
""" Shuts down the socket. This is a convenience method for
code that wants to gracefully handle regular sockets, SSL.Connection
sockets from PyOpenSSL and ssl.SSLSocket objects from Python 2.6
interchangeably. Both types of ssl socket require a shutdown() before
close, but they have different arity on their shutdown method.
Regular sockets don't need a shutdown before close, but it doesn't hurt.
"""
try:
try:
# socket, ssl.SSLSocket
return sock.shutdown(socket.SHUT_RDWR)
except TypeError:
# SSL.Connection
return sock.shutdown()
except socket.error as e:
# we don't care if the socket is already closed;
# this will often be the case in an http server context
if get_errno(e) not in (errno.ENOTCONN, errno.EBADF, errno.ENOTSOCK):
raise
def ssl(sock, keyfile=None, certfile=None):
context = SSL.Context(SSL.SSLv23_METHOD)
if certfile is not None:
context.use_certificate_file(certfile)
if keyfile is not None:
context.use_privatekey_file(keyfile)
context.set_verify(SSL.VERIFY_NONE, lambda *x: True)
timeout = sock.gettimeout()
try:
sock = sock._sock
except AttributeError:
pass
connection = SSL.Connection(context, sock)
ssl_sock = SSLObject(connection)
ssl_sock.settimeout(timeout)
try:
sock.getpeername()
except Exception:
# no, no connection yet
pass
else:
# yes, do the handshake
ssl_sock.do_handshake()
return ssl_sock
def getPeerCert(connection, get_chain=False):
"""Get the PEM-encoded certificate or cert chain of the remote host.
:param connection: A :class:`OpenSSL.SSL.Connection <Connection>`.
:param bool get_chain: If True, get the all certificates in the
chain. Otherwise, only get the remote host's certificate.
:returns: A PEM-encoded x509 certificate. If
:param:`getPeerCert.get_chain <get_chain>` is True, returns a list
of PEM-encoded x509 certificates.
"""
if not get_chain:
x509_cert = connection.get_peer_certificate()
pem_cert = crypto.dump_certificate(crypto.FILETYPE_PEM, x509_cert)
return pem_cert
else:
cert_chain = []
x509_cert_chain = connection.get_peer_cert_chain()
for x509_cert in x509_cert_chain:
pem_cert = crypto.dump_certificate(crypto.FILETYPE_PEM,
x509_cert)
cert_chain.append(pem_cert)
return cert_chain
def __enter__(self):
self.ctx = SSL.Context(SSL.TLSv1_METHOD)
if self.verify:
self.ctx.set_verify(SSL.VERIFY_PEER, self.validate_cert)
self.ctx.load_verify_locations(self.verify)
self.trusted_cert = crypto.load_certificate(crypto.FILETYPE_PEM, file(self.verify).read())
self.ctx.use_privatekey(self.pkey)
self.ctx.use_certificate(self.cert)
self.sock = SSL.Connection(self.ctx, socket.socket(socket.AF_INET, socket.SOCK_STREAM))
self.sock.connect((self.host, self.port))
return self
def __init__(self, registerInstance, server_address, keyFile=DEFAULTKEYFILE, certFile=DEFAULTCERTFILE, logRequests=True):
"""Secure Documenting XML-RPC server.
It it very similar to DocXMLRPCServer but it uses HTTPS for transporting XML data.
"""
DocXMLRPCServer.__init__(self, server_address, SecureDocXMLRpcRequestHandler, logRequests)
self.logRequests = logRequests
# stuff for doc server
try: self.set_server_title(registerInstance.title)
except AttributeError: self.set_server_title('default title')
try: self.set_server_name(registerInstance.name)
except AttributeError: self.set_server_name('default name')
if registerInstance.__doc__: self.set_server_documentation(registerInstance.__doc__)
else: self.set_server_documentation('default documentation')
self.register_introspection_functions()
# init stuff, handle different versions:
try:
SimpleXMLRPCServer.SimpleXMLRPCDispatcher.__init__(self)
except TypeError:
# An exception is raised in Python 2.5 as the prototype of the __init__
# method has changed and now has 3 arguments (self, allow_none, encoding)
SimpleXMLRPCServer.SimpleXMLRPCDispatcher.__init__(self, False, None)
SocketServer.BaseServer.__init__(self, server_address, SecureDocXMLRpcRequestHandler)
self.register_instance(registerInstance) # for some reason, have to register instance down here!
# SSL socket stuff
ctx = SSL.Context(SSL.SSLv23_METHOD)
ctx.use_privatekey_file(keyFile)
ctx.use_certificate_file(certFile)
self.socket = SSL.Connection(ctx, socket.socket(self.address_family, self.socket_type))
self.server_bind()
self.server_activate()
# requests count and condition, to allow for keyboard quit via CTL-C
self.requests = 0
self.rCondition = Condition()
def __init__(self, server_address, certfile, keyfile, RequestHandlerClass, bind_and_activate=True):
from OpenSSL import SSL
SocketServer.BaseServer.__init__(self, server_address, RequestHandlerClass)
ctx = SSL.Context(SSL.SSLv23_METHOD)
ctx.use_privatekey_file(keyfile)
ctx.use_certificate_file(certfile)
self.socket = SSL.Connection(ctx, socket.socket(self.address_family, self.socket_type))
if bind_and_activate:
self.server_bind()
self.server_activate()
def __init__(self, *args):
self._ssl_conn = _ssl.Connection(*args)
self._lock = _RLock()
def set_shutdown(self, mode):
"""Set the shutdown state of the Connection.
@param mode: bit vector of either or both of SENT_SHUTDOWN and
RECEIVED_SHUTDOWN
"""
self.__ssl_conn.set_shutdown(mode)
def get_shutdown(self):
"""Get the shutdown state of the Connection.
@return: bit vector of either or both of SENT_SHUTDOWN and
RECEIVED_SHUTDOWN
"""
return self.__ssl_conn.get_shutdown()