def on_message(self, message):
data = json_decode(message)
subid = data.get('id')
if data.get('type') == 'subscription_start':
self.on_subscribe(subid, data)
elif data.get('type') == 'subscription_end':
self.on_unsubscribe(subid, data)
else:
raise ValueError('Invalid type: {0}'.format(data.get('type')))
python类json_decode()的实例源码
subscription_handler.py 文件源码
项目:react-tornado-graphql-example
作者: yatsu
项目源码
文件源码
阅读 19
收藏 0
点赞 0
评论 0
def response_handler(self, msg):
ident, resp_bytes = msg
resp = json_decode(to_unicode(resp_bytes))
app_log.debug('resp: %s', resp)
subid = self.subscriptions.get('commandExecute')
if subid is not None:
self.write_message(json_encode({
'type': 'subscription_data',
'id': subid,
'payload': {
'data': resp
}
}))
def test_types(self):
headers = {"Cookie": "foo=bar"}
response = self.fetch("/typecheck?foo=bar", headers=headers)
data = json_decode(response.body)
self.assertEqual(data, {})
response = self.fetch("/typecheck", method="POST", body="foo=bar", headers=headers)
data = json_decode(response.body)
self.assertEqual(data, {})
# This is kind of hacky, but run some of the HTTPServer tests through
# WSGIContainer and WSGIApplication to make sure everything survives
# repeated disassembly and reassembly.
def fetch_json(self, *args, **kwargs):
response = self.fetch(*args, **kwargs)
response.rethrow()
return json_decode(response.body)
def test_query_string_encoding(self):
response = self.fetch("/echo?foo=%C3%A9")
data = json_decode(response.body)
self.assertEqual(data, {u("foo"): [u("\u00e9")]})
def test_empty_query_string(self):
response = self.fetch("/echo?foo=&foo=")
data = json_decode(response.body)
self.assertEqual(data, {u("foo"): [u(""), u("")]})
def test_types(self):
headers = {"Cookie": "foo=bar"}
response = self.fetch("/typecheck?foo=bar", headers=headers)
data = json_decode(response.body)
self.assertEqual(data, {})
response = self.fetch("/typecheck", method="POST", body="foo=bar", headers=headers)
data = json_decode(response.body)
self.assertEqual(data, {})
def test_double_slash(self):
# urlparse.urlsplit (which tornado.httpserver used to use
# incorrectly) would parse paths beginning with "//" as
# protocol-relative urls.
response = self.fetch("//doubleslash")
self.assertEqual(200, response.code)
self.assertEqual(json_decode(response.body), {})
def test_uncompressed(self):
response = self.fetch('/', method='POST', body='foo=bar')
self.assertEquals(json_decode(response.body), {u('foo'): [u('bar')]})
def test_gzip(self):
response = self.post_gzip('foo=bar')
self.assertEquals(json_decode(response.body), {u('foo'): [u('bar')]})
def fetch_chunk_sizes(self, **kwargs):
response = self.fetch('/', method='POST', **kwargs)
response.rethrow()
chunks = json_decode(response.body)
self.assertEqual(len(self.BODY), sum(chunks))
for chunk_size in chunks:
self.assertLessEqual(chunk_size, self.CHUNK_SIZE,
'oversized chunk: ' + str(chunks))
self.assertGreater(chunk_size, 0,
'empty chunk: ' + str(chunks))
return chunks
def fetch_json(self, path):
return json_decode(self.fetch(path).body)
def fetch_json(self, *args, **kwargs):
response = self.fetch(*args, **kwargs)
response.rethrow()
return json_decode(response.body)
def test_types(self):
cookie_value = to_unicode(create_signed_value(self.COOKIE_SECRET,
"asdf", "qwer"))
response = self.fetch("/typecheck/asdf?foo=bar",
headers={"Cookie": "asdf=" + cookie_value})
data = json_decode(response.body)
self.assertEqual(data, {})
response = self.fetch("/typecheck/asdf?foo=bar", method="POST",
headers={"Cookie": "asdf=" + cookie_value},
body="foo=bar")
def test_pos(self):
response = self.fetch('/pos/foo')
response.rethrow()
data = json_decode(response.body)
self.assertEqual(data, {'args': ['foo'], 'kwargs': {}})
def test_kw(self):
response = self.fetch('/kw/foo')
response.rethrow()
data = json_decode(response.body)
self.assertEqual(data, {'args': [], 'kwargs': {'path': 'foo'}})
def test_catch_error(self):
response = self.fetch('/')
self.assertEqual(json_decode(response.body),
{'arg_name': 'foo',
'log_message': 'Missing argument foo'})
def test_flow_control_fixed_body(self):
response = self.fetch('/', body='abcdefghijklmnopqrstuvwxyz',
method='POST')
response.rethrow()
self.assertEqual(json_decode(response.body),
dict(methods=['prepare', 'data_received',
'data_received', 'data_received',
'post']))
def test_flow_control_compressed_body(self):
bytesio = BytesIO()
gzip_file = gzip.GzipFile(mode='w', fileobj=bytesio)
gzip_file.write(b'abcdefghijklmnopqrstuvwxyz')
gzip_file.close()
compressed_body = bytesio.getvalue()
response = self.fetch('/', body=compressed_body, method='POST',
headers={'Content-Encoding': 'gzip'})
response.rethrow()
self.assertEqual(json_decode(response.body),
dict(methods=['prepare', 'data_received',
'data_received', 'data_received',
'post']))