def response_handler(self, msg):
ident, resp_bytes = msg
resp = json_decode(to_unicode(resp_bytes))
app_log.debug('resp: %s', resp)
subid = self.subscriptions.get('commandExecute')
if subid is not None:
self.write_message(json_encode({
'type': 'subscription_data',
'id': subid,
'payload': {
'data': resp
}
}))
python类json_encode()的实例源码
def generate(self, **kwargs):
"""Generate this template with the given arguments."""
namespace = {
"escape": escape.xhtml_escape,
"xhtml_escape": escape.xhtml_escape,
"url_escape": escape.url_escape,
"json_encode": escape.json_encode,
"squeeze": escape.squeeze,
"linkify": escape.linkify,
"datetime": datetime,
"_tt_utf8": escape.utf8, # for internal use
"_tt_string_types": (unicode_type, bytes),
# __name__ and __loader__ allow the traceback mechanism to find
# the generated source code.
"__name__": self.name.replace('.', '_'),
"__loader__": ObjectDict(get_source=lambda name: self.code),
}
namespace.update(self.namespace)
namespace.update(kwargs)
exec_in(self.compiled, namespace)
execute = namespace["_tt_execute"]
# Clear the traceback module's cache of source data now that
# we've generated a new template (mainly for this module's
# unittests, where different tests reuse the same name).
linecache.clearcache()
return execute()
def finish(self):
response_body = utf8(json_encode(self.chunk_lengths))
self.connection.write_headers(
ResponseStartLine('HTTP/1.1', 200, 'OK'),
HTTPHeaders({'Content-Length': str(len(response_body))}))
self.connection.write(response_body)
self.connection.finish()
def test_json_encode(self):
# json deals with strings, not bytes. On python 2 byte strings will
# convert automatically if they are utf8; on python 3 byte strings
# are not allowed.
self.assertEqual(json_decode(json_encode(u("\u00e9"))), u("\u00e9"))
if bytes is str:
self.assertEqual(json_decode(json_encode(utf8(u("\u00e9")))), u("\u00e9"))
self.assertRaises(UnicodeDecodeError, json_encode, b"\xe9")
def write(self, chunk):
"""Writes the given chunk to the output buffer.
To write the output to the network, use the flush() method below.
If the given chunk is a dictionary, we write it as JSON and set
the Content-Type of the response to be ``application/json``.
(if you want to send JSON as a different ``Content-Type``, call
set_header *after* calling write()).
Note that lists are not converted to JSON because of a potential
cross-site security vulnerability. All JSON output should be
wrapped in a dictionary. More details at
http://haacked.com/archive/2009/06/25/json-hijacking.aspx/ and
https://github.com/facebook/tornado/issues/1009
"""
if self._finished:
raise RuntimeError("Cannot write() after finish()")
if not isinstance(chunk, (bytes, unicode_type, dict)):
message = "write() only accepts bytes, unicode, and dict objects"
if isinstance(chunk, list):
message += ". Lists not accepted for security reasons; see http://www.tornadoweb.org/en/stable/web.html#tornado.web.RequestHandler.write"
raise TypeError(message)
if isinstance(chunk, dict):
chunk = escape.json_encode(chunk)
self.set_header("Content-Type", "application/json; charset=UTF-8")
chunk = utf8(chunk)
self._write_buffer.append(chunk)
def generate(self, **kwargs):
"""Generate this template with the given arguments."""
namespace = {
"escape": escape.xhtml_escape,
"xhtml_escape": escape.xhtml_escape,
"url_escape": escape.url_escape,
"json_encode": escape.json_encode,
"squeeze": escape.squeeze,
"linkify": escape.linkify,
"datetime": datetime,
"_tt_utf8": escape.utf8, # for internal use
"_tt_string_types": (unicode_type, bytes),
# __name__ and __loader__ allow the traceback mechanism to find
# the generated source code.
"__name__": self.name.replace('.', '_'),
"__loader__": ObjectDict(get_source=lambda name: self.code),
}
namespace.update(self.namespace)
namespace.update(kwargs)
exec_in(self.compiled, namespace)
execute = namespace["_tt_execute"]
# Clear the traceback module's cache of source data now that
# we've generated a new template (mainly for this module's
# unittests, where different tests reuse the same name).
linecache.clearcache()
return execute()
def test_json_encode(self):
# json deals with strings, not bytes. On python 2 byte strings will
# convert automatically if they are utf8; on python 3 byte strings
# are not allowed.
self.assertEqual(json_decode(json_encode(u("\u00e9"))), u("\u00e9"))
if bytes is str:
self.assertEqual(json_decode(json_encode(utf8(u("\u00e9")))), u("\u00e9"))
self.assertRaises(UnicodeDecodeError, json_encode, b"\xe9")
def write(self, chunk):
"""Writes the given chunk to the output buffer.
To write the output to the network, use the flush() method below.
If the given chunk is a dictionary, we write it as JSON and set
the Content-Type of the response to be ``application/json``.
(if you want to send JSON as a different ``Content-Type``, call
set_header *after* calling write()).
Note that lists are not converted to JSON because of a potential
cross-site security vulnerability. All JSON output should be
wrapped in a dictionary. More details at
http://haacked.com/archive/2009/06/25/json-hijacking.aspx/ and
https://github.com/facebook/tornado/issues/1009
"""
if self._finished:
raise RuntimeError("Cannot write() after finish()")
if not isinstance(chunk, (bytes, unicode_type, dict)):
message = "write() only accepts bytes, unicode, and dict objects"
if isinstance(chunk, list):
message += ". Lists not accepted for security reasons; see http://www.tornadoweb.org/en/stable/web.html#tornado.web.RequestHandler.write"
raise TypeError(message)
if isinstance(chunk, dict):
chunk = escape.json_encode(chunk)
self.set_header("Content-Type", "application/json; charset=UTF-8")
chunk = utf8(chunk)
self._write_buffer.append(chunk)
def generate(self, **kwargs):
"""Generate this template with the given arguments."""
namespace = {
"escape": escape.xhtml_escape,
"xhtml_escape": escape.xhtml_escape,
"url_escape": escape.url_escape,
"json_encode": escape.json_encode,
"squeeze": escape.squeeze,
"linkify": escape.linkify,
"datetime": datetime,
"_tt_utf8": escape.utf8, # for internal use
"_tt_string_types": (unicode_type, bytes),
# __name__ and __loader__ allow the traceback mechanism to find
# the generated source code.
"__name__": self.name.replace('.', '_'),
"__loader__": ObjectDict(get_source=lambda name: self.code),
}
namespace.update(self.namespace)
namespace.update(kwargs)
exec_in(self.compiled, namespace)
execute = namespace["_tt_execute"]
# Clear the traceback module's cache of source data now that
# we've generated a new template (mainly for this module's
# unittests, where different tests reuse the same name).
linecache.clearcache()
return execute()
def finish(self):
response_body = utf8(json_encode(self.chunk_lengths))
self.connection.write_headers(
ResponseStartLine('HTTP/1.1', 200, 'OK'),
HTTPHeaders({'Content-Length': str(len(response_body))}))
self.connection.write(response_body)
self.connection.finish()
def test_json_encode(self):
# json deals with strings, not bytes. On python 2 byte strings will
# convert automatically if they are utf8; on python 3 byte strings
# are not allowed.
self.assertEqual(json_decode(json_encode(u("\u00e9"))), u("\u00e9"))
if bytes is str:
self.assertEqual(json_decode(json_encode(utf8(u("\u00e9")))), u("\u00e9"))
self.assertRaises(UnicodeDecodeError, json_encode, b"\xe9")
def finish(self, chunk=None):
if chunk is not None:
chunk = json_encode(chunk)
super(RESTfulHandler, self).finish(chunk)
def respond_json(method):
@wraps(method)
def call(self, *args, **kwargs):
data = method(self, *args, **kwargs)
if data:
self.set_header('Content-Type', 'application/json')
self.write(json_encode(data))
return call
def respond_json_async(method):
@wraps(method)
async def call(self, *args, **kwargs):
future = method(self, *args, **kwargs)
if future:
data = await future
if data:
self.set_header('Content-Type', 'application/json')
self.write(json_encode(data))
return call
def get(self):
with self._connect() as connection:
items = sources(connection)
self.write(json_encode(items))
def get(self, url):
with self._connect() as connection:
try:
item = source(connection, url)
self.write(json_encode(item))
except IndexError:
self.set_status(404, "Can't find '%s'" % url)
def get(self):
with self._connect() as connection:
items = sources(connection)
self.write(json_encode(items))
def get(self, url):
with self._connect() as connection:
try:
item = source(connection, url)
self.write(json_encode(item))
except IndexError:
self.set_status(404, "Can't find '%s'" % url)
def reply(self, data):
self.set_header('Content-Type', 'application/json; charset=UTF-8')
self.finish(json_encode(data))
test_monitors.py 文件源码
项目:simple-environment-monitor-system
作者: diegorubin
项目源码
文件源码
阅读 24
收藏 0
点赞 0
评论 0
def test_create(self):
body = json_encode({
'type': 'TextMonitor', 'label': 'server',
'url': SERVICE_URL,
'data': {'expected': 'LIVE'}
})
response = self.fetch('/api/monitors', method='POST', body=body)
self.assertEqual(response.code, 200)
self.assertEqual(len(Monitor().all()), 1)