def test_should_call_unsubscribe_when_client_closes_cxn(server_with_mocks):
node_script = '''
module.paths.push('{0}')
WebSocket = require('ws')
const SubscriptionClient =
require('subscriptions-transport-ws').SubscriptionClient
const client = new SubscriptionClient('ws://localhost:{1}/socket')
client.subscribe({{
query: `subscription useInfo($id: String) {{
user(id: $id) {{
id
name
}}
}}`,
operationName: 'useInfo',
variables: {{
id: 3,
}},
}}, function (error, result) {{
// nothing
}}
)
setTimeout(() => {{
client.client.close()
}}, 500)
'''.format(
os.path.join(os.path.dirname(__file__), 'node_modules'), TEST_PORT)
try:
subprocess.check_output(
['node', '-e', node_script], stderr=subprocess.STDOUT, timeout=1)
except:
while True:
mock = server_with_mocks.get_nowait()
if mock.name == 'on_unsubscribe':
mock.assert_called_once()
break
test_subscription_transport.py 文件源码
python
阅读 35
收藏 0
点赞 0
评论 0
评论列表
文章目录