def test_passes_through_websocket_request_to_on_subscribe(server):
node_script = '''
module.paths.push('{0}')
WebSocket = require('ws')
const SubscriptionClient =
require('subscriptions-transport-ws').SubscriptionClient
const client = new SubscriptionClient('ws://localhost:{1}/socket')
client.subscribe({{
query: `subscription context {{
context
}}`,
variables: {{}},
}}, (error, result) => {{
if (error) {{
console.log(JSON.stringify(error));
}}
}}
);
'''.format(
os.path.join(os.path.dirname(__file__), 'node_modules'), TEST_PORT)
try:
subprocess.check_output(
['node', '-e', node_script], stderr=subprocess.STDOUT, timeout=.2)
except:
while True:
mock = server.get_nowait()
if mock.name == 'on_subscribe':
mock.assert_called_once()
mock.assert_called_with_contains('websocket')
break
test_subscription_transport.py 文件源码
python
阅读 33
收藏 0
点赞 0
评论 0
评论列表
文章目录