def close(self, code=1000, reason=''):
    """
    Start closing the websocket connection by sending a close frame
    to the connected peer.

    ``code`` is the status code describing why the connection is being
    terminated; ``reason`` is handed to the close frame together with it.

    The ``server_terminated`` attribute is set on the first call. Any
    later call returns immediately, so the closing frame is written at
    most once and calling this method repeatedly is safe.

    Any exception raised while building or writing the frame is logged
    and not propagated.

    .. seealso:: Defined Status Codes http://tools.ietf.org/html/rfc6455#section-7.4.1
    """
    if self.server_terminated:
        # A close frame was already attempted; nothing more to do.
        return

    self.server_terminated = True
    try:
        close_message = self.stream.close(code=code, reason=reason)
        frame_bytes = close_message.single(mask=self.stream.always_mask)
        self._write(frame_bytes)
    except Exception as ex:
        logger.error("Error when terminating the connection: %s", str(ex))
# Example source code for Python html() usages (original page heading: python类html()的实例源码)
def get_ssl_certificate(self, binary_form=False):
    """Return the client's SSL certificate, if any.

    To use client certificates, the HTTPServer must have been constructed
    with cert_reqs set in ssl_options, e.g.::

        server = HTTPServer(app,
            ssl_options=dict(
                certfile="foo.crt",
                keyfile="foo.key",
                cert_reqs=ssl.CERT_REQUIRED,
                ca_certs="cacert.crt"))

    By default, the return value is a dictionary (or None, if no
    client certificate is present). If ``binary_form`` is true, a
    DER-encoded form of the certificate is returned instead. See
    SSLSocket.getpeercert() in the standard library for more
    details.
    http://docs.python.org/library/ssl.html#sslsocket-objects

    ``None`` is also returned when the lookup raises ``ssl.SSLError``.
    """
    try:
        peer_socket = self.connection.stream.socket
        return peer_socket.getpeercert(binary_form=binary_form)
    except ssl.SSLError:
        return None
def get_ssl_certificate(self, binary_form=False):
    """Look up the SSL certificate presented by the client, if any.

    Client certificates are only available when the HTTPServer was
    constructed with cert_reqs set in ssl_options, e.g.::

        server = HTTPServer(app,
            ssl_options=dict(
                certfile="foo.crt",
                keyfile="foo.key",
                cert_reqs=ssl.CERT_REQUIRED,
                ca_certs="cacert.crt"))

    The result is a dictionary by default (or None, if no client
    certificate is present). When ``binary_form`` is true, the
    DER-encoded form of the certificate is returned instead. See
    SSLSocket.getpeercert() in the standard library for more details.
    http://docs.python.org/library/ssl.html#sslsocket-objects

    An ``ssl.SSLError`` raised during the lookup is turned into ``None``.
    """
    certificate = None
    try:
        certificate = self.connection.stream.socket.getpeercert(
            binary_form=binary_form
        )
    except ssl.SSLError:
        # Treated the same as "no certificate".
        certificate = None
    return certificate
def __init__(self, max_window=4, timeout=8, proxy='', ssl_ciphers=None, max_retry=2):
    """Set up the HTTP utility: base SSL configuration, limits and timing caches.

    The base class is initialised with ``GC.LINK_OPENSSL``, the ``cacerts``
    path under ``cert_dir`` and the given ``ssl_ciphers``. The remaining
    arguments are stored as attributes of the same name.
    """
    # Background reading on SSL handshakes and cipher selection:
    # http://docs.python.org/dev/library/ssl.html
    # http://blog.ivanristic.com/2009/07/examples-of-the-information-collected-from-ssl-handshakes.html
    # http://src.chromium.org/svn/trunk/src/net/third_party/nss/ssl/sslenum.c
    # http://www.openssl.org/docs/apps/ciphers.html
    # openssl s_server -accept 443 -key CA.crt -cert CA.crt
    # set_ciphers as Modern Browsers
    ca_certs_path = os.path.join(cert_dir, 'cacerts')
    BaseHTTPUtil.__init__(self, GC.LINK_OPENSSL, ca_certs_path, ssl_ciphers)

    self.max_window, self.max_retry = max_window, max_retry
    self.timeout, self.proxy = timeout, proxy

    # Connection-time caches are smaller when ``self.gws`` is truthy.
    cache_capacity = 512 if self.gws else 4096
    self.tcp_connection_time = LRUCache(cache_capacity)
    self.ssl_connection_time = LRUCache(cache_capacity)

    if self.gws and GC.GAE_ENABLEPROXY:
        self.gws_front_connection_time = LRUCache(128)
        self.gws_front_connection_time.set('ip', LRUCache(128), noexpire=True)
        # Route SSL connection creation through the proxy-aware variant.
        self.create_ssl_connection = self.create_gws_connection_withproxy
    # Disabled generic-proxy hooks, kept for reference:
    #if self.proxy:
    #    dns_resolve = self.__dns_resolve_withproxy
    #    self.create_connection = self.__create_connection_withproxy
    #    self.create_ssl_connection = self.__create_ssl_connection_withproxy
def set_cert_credentials_provider(self, cert_credentials_provider):
    """Configure TLS on the underlying paho client from a credentials provider.

    When ``self._use_wss`` is set, only the CA path from the provider is
    used. Otherwise the client certificate and private key paths are
    passed as well. Both modes require a server certificate
    (``ssl.CERT_REQUIRED``) and use ``ssl.PROTOCOL_SSLv23``.
    """
    # Historical issue from the Yun SDK: the AR9331 embedded Linux ships only
    # Python 2.7.3, where TLSv1_2 is not even an option. SSLv23 is a
    # workaround that selects the highest TLS version shared by the client
    # and the service. If the user installs OpenSSL v1.0.1+, this option works
    # fine for mutual auth.
    # Note that TLSv1.2 cannot be forced for mutual auth in Python 2.7.3, and
    # TLS support in Python only starts from Python 2.7.
    # See also: https://docs.python.org/2/library/ssl.html#ssl.PROTOCOL_SSLv23
    websocket_mode = self._use_wss
    tls_options = {
        'ca_certs': cert_credentials_provider.get_ca_path(),
        'cert_reqs': ssl.CERT_REQUIRED,
        'tls_version': ssl.PROTOCOL_SSLv23,
    }
    if not websocket_mode:
        tls_options['certfile'] = cert_credentials_provider.get_cert_path()
        tls_options['keyfile'] = cert_credentials_provider.get_key_path()
    self._paho_client.tls_set(**tls_options)
def index():
    '''
    Index page listing the available endpoints.

    Builds a small HTML page with links to ``/listCert`` and ``/help`` on
    ``https://<lhost>:<lport>`` and returns it as a response whose
    ``Server`` header is overridden.
    '''
    # The host/port concatenation used to be written *inside* the
    # triple-quoted literal, so it was never evaluated and the links were
    # emitted with the text `"+lhost+":"+str(lport)+"` in them. Build the
    # base URL explicitly and substitute it into the page instead.
    base_url = 'https://' + lhost + ':' + str(lport)
    htmlpage = '''
<html>
<HEAD>
<link rel="stylesheet" href="https://maxcdn.bootstrapcdn.com/bootstrap/3.3.7/css/bootstrap.min.css" integrity="sha384-BVYiiSIFeK1dGmJRAkycuHAHRg32OmUcww7on3RYdg4Va+PmSTsz/K68vbdEjh4u" crossorigin="anonymous">
</HEAD>
<BODY>
<div class='container'>
<h1>
Welcome to flask tests, here are links to test func calls:
</h1>
<p>
<a href='{base_url}/listCert'> List of certificate in the pki database </a>
</p>
<p>
<a href='{base_url}/help'> PKI documentation</a>
</p>
</div>
</BODY>
</html>
'''.format(base_url=base_url)
    # Wrap the page in a response object so the headers can be modified.
    resp = make_response(htmlpage)
    resp.headers['Server'] = 'PyKI amaibach API'
    return resp
def help():
    '''
    Documentation page.

    Reads ``help.html`` from ``curScriptDir`` and returns its content as a
    response whose ``Server`` header is overridden.
    '''
    # Use a context manager so the file is closed even if read() raises.
    with open(curScriptDir + "/help.html", 'rt') as info:
        helpContent = info.read()
    # Wrap the content in a response object so the headers can be modified.
    resp = make_response(helpContent)
    resp.headers['Server'] = 'PyKI amaibach API'
    return resp
def closed(self, code, reason=None):
    """
    Hook invoked once the websocket stream and connection are finally closed.

    ``code`` is the status set by the other endpoint and ``reason`` is a
    human-readable message. This implementation does nothing.

    .. seealso:: Defined Status Codes http://tools.ietf.org/html/rfc6455#section-7.4.1
    """
    return None
def get_cluster_version(dcos_url):
    """Given a cluster url, return its version string, such as 1.7, 1.8,
    1.8.8, 1.9, 1.9-dev, etc.

    Fetches ``<dcos_url>/dcos-metadata/dcos-version.json`` and returns the
    value of its ``version`` key. The body is decoded with the charset
    announced in the response headers, falling back to UTF-8.

    Certificate validation is deliberately not configured on the SSL
    context, because the test clusters have certificates that will not
    validate.
    """
    version_url = "%s/%s" % (dcos_url, "dcos-metadata/dcos-version.json")
    # Since our test clusters have weird certs that won't validate, turn off
    # validation
    try:
        # this will handle a variety of versions, including some 2.7.x
        # (haven't investigated which) and all recent 3.x
        noverify_context = ssl.SSLContext()
    except (TypeError, AttributeError):
        # TypeError: the constructor requires an explicit protocol argument.
        # AttributeError: the ssl module has no SSLContext at all.
        # (A bare ``except`` here would also swallow KeyboardInterrupt.)
        if hasattr(ssl, 'PROTOCOL_TLS'):
            # 2.7.13
            noverify_context = ssl.SSLContext(ssl.PROTOCOL_TLS)
        elif hasattr(ssl, 'OP_NO_SSLv2'):
            # 2.7.9-2.7.12
            logger.warning("Old python; Asking for something better than sslv2 or 3")
            noverify_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
            # explicitly switch these off. This is recommended for ancient
            # versions as per https://docs.python.org/2/library/ssl.html#protocol-versions
            noverify_context.options |= ssl.OP_NO_SSLv2
            noverify_context.options |= ssl.OP_NO_SSLv3
        else:
            # before 2.7.9
            # NOTE(review): if SSLContext is missing entirely this call
            # presumably fails as well -- confirm whether this branch is
            # reachable on any supported interpreter.
            logger.error("*VERY* old python. Very weak/unsafe encryption ahoy.")
            noverify_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)

    response = urllib.request.urlopen(version_url, context=noverify_context)
    try:
        encoding = 'utf-8'  # default
        try:
            # python3
            provided_encoding = response.headers.get_content_charset()
        except AttributeError:
            # Header object without get_content_charset(): keep the default.
            provided_encoding = None
        if provided_encoding:
            encoding = provided_encoding
        json_s = response.read().decode(encoding)
    finally:
        # Always release the underlying connection / file handle.
        response.close()

    return json.loads(json_s)['version']
def create_ssl_context(**kwargs):
    """
    Build an SSL context via ``ssl.create_default_context``.

    https://docs.python.org/3/library/ssl.html#context-creation

    All keyword arguments are forwarded unchanged to
    ``create_default_context``, and the resulting context is returned.
    """
    return ssl.create_default_context(**kwargs)
def connect(self, keepAliveInterval=30):
    """Configure TLS, connect the paho client and wait for the connect result.

    :param keepAliveInterval: keep-alive interval handed to the paho
        client's ``connect``; must be an integer.
    :returns: ``True`` once the connect result code is 0.
    :raises TypeError: if ``keepAliveInterval`` is ``None`` or not an int.
    :raises connectTimeoutException: if the result code is still unset
        (``sys.maxsize``) after ``self._connectdisconnectTimeout`` seconds.
    :raises connectError: if a non-zero result code is reported.
    """
    if keepAliveInterval is None:
        self._log.error("connect: None type inputs detected.")
        raise TypeError("None type inputs detected.")
    if not isinstance(keepAliveInterval, int):
        self._log.error("connect: Wrong input type detected. KeepAliveInterval must be an integer.")
        raise TypeError("Non-integer type inputs detected.")

    # TLS configuration
    if self._useWebsocket:
        # Historical issue from the Yun SDK: the AR9331 embedded Linux ships
        # only Python 2.7.3, where TLSv1_2 is not even an option. SSLv23 is a
        # workaround that selects the highest TLS version shared by the
        # client and the service. If the user installs OpenSSL v1.0.1+, this
        # option works fine for mutual auth.
        # Note that TLSv1.2 cannot be forced for mutual auth in Python 2.7.3,
        # and TLS support in Python only starts from Python 2.7.
        # See also: https://docs.python.org/2/library/ssl.html#ssl.PROTOCOL_SSLv23
        self._pahoClient.tls_set(ca_certs=self._cafile, cert_reqs=ssl.CERT_REQUIRED, tls_version=ssl.PROTOCOL_SSLv23)
        self._log.info("Connection type: Websocket")
    else:
        self._pahoClient.tls_set(self._cafile, self._cert, self._key, ssl.CERT_REQUIRED, ssl.PROTOCOL_SSLv23)  # May raise
        self._log.info("Connection type: TLSv1.2 Mutual Authentication")

    # Connect
    self._pahoClient.connect(self._host, self._port, keepAliveInterval)  # May raise
    self._pahoClient.loop_start()

    # Poll in 10 ms ticks until a result code shows up or the timeout
    # elapses. The bound is checked with `<` rather than `!=`: with a
    # fractional (e.g. 0.125 s -> 12.5 ticks) or negative timeout the counter
    # never equals the bound and the loop would otherwise spin forever.
    # presumably _connectResultCode is updated by the client's network
    # thread started above -- verify against the connect callback.
    TenmsCount = 0
    while TenmsCount < self._connectdisconnectTimeout * 100 and self._connectResultCode == sys.maxsize:
        TenmsCount += 1
        time.sleep(0.01)

    if self._connectResultCode == sys.maxsize:
        self._log.error("Connect timeout.")
        self._pahoClient.loop_stop()
        raise connectTimeoutException()
    if self._connectResultCode != 0:
        self._log.error("A connect error happened: " + str(self._connectResultCode))
        self._pahoClient.loop_stop()
        raise connectError(self._connectResultCode)

    self._log.info("Connected to AWS IoT.")
    self._log.debug("Connect time consumption: " + str(float(TenmsCount) * 10) + "ms.")
    return True
def _get_from_pending(self):
    """
    Drain whatever the SSL socket still holds in its internal buffer.

    The SSL socket object provides the same interface as the plain
    socket interface but behaves differently. When data is sent over
    an SSL connection, more data may be read from the wire than was
    requested by the ws4py websocket object.

    In that case the data has indeed been read from the underlying
    real socket, but not by the application, which will wait for
    another trigger from the manager's polling mechanism as if more
    data were still on the wire. That trigger only fires when the
    other peer sends new data, so the already-read data would be
    handled with some delay.

    Because of this, we rely on a non-public method to ask the
    internal SSL socket buffer whether it still has data pending.

    Some people in the Python community
    `discourage <https://bugs.python.org/issue21430>`_
    this use of the ``pending()`` method because it is not the right
    way of dealing with such a use case. They advise
    `this approach <https://docs.python.org/dev/library/ssl.html#notes-on-non-blocking-sockets>`_
    instead. Unfortunately, that only applies when the application
    directly controls the poller, which is not the case with the
    WebSocket abstraction here.

    We therefore rely on this `technique <http://stackoverflow.com/questions/3187565/select-and-ssl-in-python>`_
    which seems to be valid anyway.

    This is a bit of a shame because we have to process more data
    than was initially wanted.

    Returns the concatenation of everything received, or ``b""`` when
    nothing was pending.
    """
    received = []
    while True:
        waiting = self.sock.pending()
        if not waiting:
            break
        received.append(self.sock.recv(waiting))
    return b"".join(received)
def connect(self, keepAliveInterval=30):
    """Set up TLS on the paho client, connect, and block until the result is known.

    :param keepAliveInterval: keep-alive interval passed to the paho
        client's ``connect``; must be an integer.
    :returns: ``True`` when the connect result code is 0.
    :raises TypeError: if ``keepAliveInterval`` is ``None`` or not an int.
    :raises connectTimeoutException: if the result code is still
        ``sys.maxsize`` after ``self._connectdisconnectTimeout`` seconds.
    :raises connectError: if the reported result code is non-zero.
    """
    if keepAliveInterval is None:
        self._log.error("connect: None type inputs detected.")
        raise TypeError("None type inputs detected.")
    if not isinstance(keepAliveInterval, int):
        self._log.error("connect: Wrong input type detected. KeepAliveInterval must be an integer.")
        raise TypeError("Non-integer type inputs detected.")

    # TLS configuration
    if self._useWebsocket:
        # Historical issue from the Yun SDK: the AR9331 embedded Linux ships
        # only Python 2.7.3, where TLSv1_2 is not even an option. SSLv23 is a
        # workaround that selects the highest TLS version shared by the
        # client and the service. If the user installs OpenSSL v1.0.1+, this
        # option works fine for mutual auth.
        # Note that TLSv1.2 cannot be forced for mutual auth in Python 2.7.3,
        # and TLS support in Python only starts from Python 2.7.
        # See also: https://docs.python.org/2/library/ssl.html#ssl.PROTOCOL_SSLv23
        self._pahoClient.tls_set(ca_certs=self._cafile, cert_reqs=ssl.CERT_REQUIRED, tls_version=ssl.PROTOCOL_SSLv23)
        self._log.info("Connection type: Websocket")
    else:
        self._pahoClient.tls_set(self._cafile, self._cert, self._key, ssl.CERT_REQUIRED, ssl.PROTOCOL_SSLv23)  # May raise
        self._log.info("Connection type: TLSv1.2 Mutual Authentication")

    # Connect
    self._pahoClient.connect(self._host, self._port, keepAliveInterval)  # May raise
    self._pahoClient.loop_start()

    # Wait in 10 ms steps for a result code. Comparing with `<` instead of
    # `!=` guarantees termination: a timeout whose tick count is not a whole
    # number (or is negative) is never *equal* to the counter, which made
    # the loop unbounded when no result ever arrived.
    # presumably _connectResultCode is set from the client's background
    # loop started above -- verify against the connect callback.
    TenmsCount = 0
    while TenmsCount < self._connectdisconnectTimeout * 100 and self._connectResultCode == sys.maxsize:
        TenmsCount += 1
        time.sleep(0.01)

    if self._connectResultCode == sys.maxsize:
        self._log.error("Connect timeout.")
        self._pahoClient.loop_stop()
        raise connectTimeoutException()
    if self._connectResultCode != 0:
        self._log.error("A connect error happened: " + str(self._connectResultCode))
        self._pahoClient.loop_stop()
        raise connectError(self._connectResultCode)

    self._log.info("Connected to AWS IoT.")
    self._log.debug("Connect time consumption: " + str(float(TenmsCount) * 10) + "ms.")
    return True