def test_rejects_unparsable_message(server):
node_script = '''
module.paths.push('{0}');
WebSocket = require('ws');
const GRAPHQL_SUBSCRIPTIONS = 'graphql-subscriptions';
const client = new WebSocket('ws://localhost:{1}/socket',
GRAPHQL_SUBSCRIPTIONS);
client.onmessage = (message) => {{
let msg = JSON.parse(message.data)
console.log(JSON.stringify({{[msg.type]: msg}}))
client.close();
}};
client.onopen = () => {{
client.send('HI');
}}
'''.format(
os.path.join(os.path.dirname(__file__), 'node_modules'), TEST_PORT)
p = subprocess.Popen(
['node', '-e', node_script],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
q = queue.Queue()
t = threading.Thread(target=enqueue_output, args=(p.stdout, q))
t.daemon = True
t.start()
time.sleep(.2)
ret_values = {}
while True:
try:
_line = q.get_nowait()
if isinstance(_line, bytes):
line = _line.decode()
line = json.loads(line)
ret_values[list(line.keys())[0]] = line[list(line.keys())[0]]
except ValueError:
pass
except queue.Empty:
break
assert ret_values['subscription_fail']
assert len(ret_values['subscription_fail']['payload']['errors']) > 0
test_subscription_transport.py 文件源码
python
阅读 32
收藏 0
点赞 0
评论 0
评论列表
文章目录