test_subscription_transport.py 文件源码

python
阅读 29 收藏 0 点赞 0 评论 0

项目:graphql-python-subscriptions 作者: hballard 项目源码 文件源码
def test_sends_a_keep_alive_signal_in_the_socket(server_with_keep_alive):

    node_script = '''
        module.paths.push('{0}');
        WebSocket = require('ws');
        const GRAPHQL_SUBSCRIPTIONS = 'graphql-subscriptions';
        const KEEP_ALIVE = 'keepalive';
        const client = new WebSocket('ws://localhost:{1}/socket',
        GRAPHQL_SUBSCRIPTIONS);
        let yieldCount = 0;
        client.onmessage = (message) => {{
            let msg = JSON.parse(message.data)
            if (msg.type === KEEP_ALIVE) {{
                yieldCount += 1;
                if (yieldCount > 1) {{
                let returnMsg = {{'type': msg.type, 'yieldCount': yieldCount}}
                console.log(JSON.stringify(returnMsg))
                client.close();
                }}
            }}
        }};
    '''.format(
        os.path.join(os.path.dirname(__file__), 'node_modules'), TEST_PORT)

    p = subprocess.Popen(
        ['node', '-e', node_script],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT)
    q = queue.Queue()
    t = threading.Thread(target=enqueue_output, args=(p.stdout, q))
    t.daemon = True
    t.start()
    time.sleep(.5)
    while True:
        try:
            _line = q.get_nowait()
            if isinstance(_line, bytes):
                line = _line.decode()
            ret_value = json.loads(line)
        except ValueError:
            pass
        except queue.Empty:
            break
    assert ret_value['type'] == 'keepalive'
    assert ret_value['yieldCount'] > 1
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号