test_subscription_manager.py 文件源码

python
阅读 40 收藏 0 点赞 0 评论 0

项目:graphql-python-subscriptions 作者: hballard 项目源码 文件源码
def test_can_subscribe_to_more_than_one_trigger(sub_mgr):
    """Subscribe with the ``multiTrigger`` query and publish on three channels.

    The subscription callback counts every non-empty payload whose
    ``testFilterMulti`` field equals ``'good_filter'`` and kills the pubsub
    greenlet once two such payloads have been counted.  The publishing
    handler joins that greenlet before unsubscribing, so the test only
    completes after the greenlet has finished.

    Args:
        sub_mgr: Subscription manager under test (presumably a pytest
            fixture defined elsewhere -- confirm against the test module).
    """
    # Mutable holder so the nested callback can update the counter.
    state = {'trigger_count': 0}

    query = 'subscription multiTrigger($filterBoolean: Boolean,\
            $uga: String){testFilterMulti(filterBoolean: $filterBoolean,\
            a: $uga, b: 66)}'

    def callback(err, payload):
        # Any reported error aborts the run immediately.
        if err:
            sys.exit(err)

        if payload is not None:
            try:
                assert payload.data.get('testFilterMulti') == 'good_filter'
            except AssertionError as failure:
                # Turn a failed check into a process exit.
                sys.exit(failure)
            else:
                state['trigger_count'] += 1

        # Two counted payloads: stop the pubsub greenlet so join() returns.
        if state['trigger_count'] == 2:
            sub_mgr.pubsub.greenlet.kill()

    def publish_then_unsubscribe(sub_id):
        messages = (
            ('not_a_trigger', False),
            ('trigger_1', True),
            ('trigger_2', True),
        )
        for trigger_name, filter_flag in messages:
            sub_mgr.publish(trigger_name, {'filterBoolean': filter_flag})
        # Wait for the greenlet to finish (the callback kills it) before
        # dropping the subscription.
        sub_mgr.pubsub.greenlet.join()
        sub_mgr.unsubscribe(sub_id)

    variables = {'filterBoolean': True, 'uga': 'UGA'}
    subscribed = sub_mgr.subscribe(
        query, 'multiTrigger', callback, variables, {}, None, None)
    subscribed.then(publish_then_unsubscribe).get()
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号