def test_sends_a_keep_alive_signal_in_the_socket(server_with_keep_alive):
node_script = '''
module.paths.push('{0}');
WebSocket = require('ws');
const GRAPHQL_SUBSCRIPTIONS = 'graphql-subscriptions';
const KEEP_ALIVE = 'keepalive';
const client = new WebSocket('ws://localhost:{1}/socket',
GRAPHQL_SUBSCRIPTIONS);
let yieldCount = 0;
client.onmessage = (message) => {{
let msg = JSON.parse(message.data)
if (msg.type === KEEP_ALIVE) {{
yieldCount += 1;
if (yieldCount > 1) {{
let returnMsg = {{'type': msg.type, 'yieldCount': yieldCount}}
console.log(JSON.stringify(returnMsg))
client.close();
}}
}}
}};
'''.format(
os.path.join(os.path.dirname(__file__), 'node_modules'), TEST_PORT)
p = subprocess.Popen(
['node', '-e', node_script],
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT)
q = queue.Queue()
t = threading.Thread(target=enqueue_output, args=(p.stdout, q))
t.daemon = True
t.start()
time.sleep(.5)
while True:
try:
_line = q.get_nowait()
if isinstance(_line, bytes):
line = _line.decode()
ret_value = json.loads(line)
except ValueError:
pass
except queue.Empty:
break
assert ret_value['type'] == 'keepalive'
assert ret_value['yieldCount'] > 1
test_subscription_transport.py 文件源码
python
阅读 29
收藏 0
点赞 0
评论 0
评论列表
文章目录