def test_rejects_client_that_does_not_specifiy_a_supported_protocol(server):
    """Check that a client offering no sub-protocol gets its socket closed.

    A node ``ws`` client connects to ``ws://localhost:TEST_PORT/socket``
    without passing any sub-protocol and prints the close code it receives
    as JSON on stdout. The first code printed must be 1002 (protocol error)
    or 1006 (abnormal closure).

    Args:
        server: pytest fixture; not used directly here — presumably it
            starts the server under test on ``TEST_PORT`` (confirm against
            the fixture definition).
    """
    # Doubled braces are literal JS braces escaped for ``str.format``.
    node_script = '''
module.paths.push('{0}')
WebSocket = require('ws')
const client = new WebSocket('ws://localhost:{1}/socket')
client.on('close', (code) => {{
console.log(JSON.stringify(code))
}}
);
'''.format(
        os.path.join(os.path.dirname(__file__), 'node_modules'), TEST_PORT)
    p = subprocess.Popen(
        ['node', '-e', node_script],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT)
    try:
        q = queue.Queue()
        # enqueue_output is defined elsewhere in this file; presumably it
        # forwards each line read from the pipe into the queue.
        t = threading.Thread(target=enqueue_output, args=(p.stdout, q))
        t.daemon = True
        t.start()
        # Give the client time to connect and receive the close frame.
        time.sleep(.2)

        ret_values = []
        while True:
            try:
                raw = q.get_nowait()
            except queue.Empty:
                break
            try:
                # Decode only when needed; the original left ``line`` unbound
                # (or stale from a previous iteration) for non-bytes input.
                text = raw.decode() if isinstance(raw, bytes) else raw
                ret_values.append(json.loads(text))
            except ValueError:
                # stderr is merged into stdout, so non-JSON (or undecodable)
                # diagnostic lines can show up here; skip them.
                pass

        assert ret_values, 'node client did not report a close code'
        # ``x == 1002 or 1006`` is always truthy because of the bare 1006;
        # a membership test is what was intended.
        assert ret_values[0] in (1002, 1006)
    finally:
        # Do not leave the node process running if it has not exited yet.
        if p.poll() is None:
            p.kill()
        p.wait()
test_subscription_transport.py 文件源码
python
阅读 30
收藏 0
点赞 0
评论 0
评论列表
文章目录