test_subscription_transport.py 文件源码

python
阅读 31 收藏 0 点赞 0 评论 0

项目:graphql-python-subscriptions 作者: hballard 项目源码 文件源码
def test_correctly_sets_the_context_in_on_subscribe(server):
    """Check that the context passed by the client reaches the subscription.

    A Node.js ``subscriptions-transport-ws`` client is spawned, subscribes
    with ``context: 'testContext'`` and prints the first result it receives
    as a JSON line of the form ``{"client": {"result": ...}}``.  The test
    then triggers a publish over HTTP and asserts that the ``context`` field
    echoed back in that result equals ``'testContext'``.

    Args:
        server: unused directly in the body; presumably the pytest fixture
            that starts the server listening on ``TEST_PORT`` -- confirm
            against the fixture definition.
    """
    node_script = '''
        module.paths.push('{0}')
        WebSocket = require('ws')
        const SubscriptionClient =
        require('subscriptions-transport-ws').SubscriptionClient
        const CTX = 'testContext';
        const client = new SubscriptionClient('ws://localhost:{1}/socket')
        client.subscribe({{
            query: `subscription context {{
                context
            }}`,
            variables: {{}},
            context: CTX,
            }}, (error, result) => {{
                client.unsubscribeAll();
                if (error) {{
                  console.log(JSON.stringify(error));
                }}
                if (result) {{
                  console.log(JSON.stringify({{
                    client: {{
                      result: result,
                    }}
                  }}));
                }} else {{
                  // pass
                }}
            }}
        );
    '''.format(
        os.path.join(os.path.dirname(__file__), 'node_modules'), TEST_PORT)

    p = subprocess.Popen(
        ['node', '-e', node_script],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
        stderr=subprocess.STDOUT)
    try:
        # Give the Node client time to connect and register its subscription
        # before the event is published.
        time.sleep(.2)
        requests.post(
            'http://localhost:{0}/publish'.format(TEST_PORT),
            json=['context', {}])

        # Drain the child's stdout on a background thread so that reading
        # never blocks the test itself.
        q = queue.Queue()
        t = threading.Thread(target=enqueue_output, args=(p.stdout, q))
        t.daemon = True
        t.start()
        time.sleep(.2)

        ret_values = {}
        while True:
            try:
                raw_line = q.get_nowait()
            except queue.Empty:
                break
            try:
                # Accept both bytes and text lines; the original only bound
                # ``line`` for bytes and failed (or reused a stale value)
                # otherwise.
                if isinstance(raw_line, bytes):
                    text = raw_line.decode()
                else:
                    text = raw_line
                message = json.loads(text)
            except ValueError:
                # Not JSON (e.g. Node warnings on stderr) -- ignore.
                continue
            if not isinstance(message, dict) or not message:
                # Only non-empty JSON objects carry a keyed payload.
                continue
            # Keep the payload stored under the message's first key.
            first_key = next(iter(message))
            ret_values[first_key] = message[first_key]
    finally:
        # The client may keep its websocket open, so stop and reap the child
        # explicitly instead of leaking a Node process past the test.
        p.kill()
        p.wait()

    client = ret_values['client']
    assert client['result']['context'] == 'testContext'
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号