def test_can_subscribe_to_more_than_one_trigger(sub_mgr):
    """Check that a single subscription is fed by more than one trigger.

    The test subscribes with the ``multiTrigger`` operation, then publishes
    on ``not_a_trigger``, ``trigger_1`` and ``trigger_2``. The callback
    counts every non-``None`` payload whose ``testFilterMulti`` field equals
    ``'good_filter'`` and kills the pubsub greenlet once two payloads have
    been counted.

    Args:
        sub_mgr: subscription manager fixture. Presumably its setup maps
            ``testFilterMulti`` onto both ``trigger_*`` channels -- confirm
            against the fixture definition.
    """
    # A dict serves as a mutable cell so the nested callback can update the
    # counter without needing ``nonlocal``.
    seen = {'trigger_count': 0}

    # Adjacent literals are joined with nothing in between, which yields
    # exactly the same text as the backslash-continued literal did.
    query = (
        'subscription multiTrigger($filterBoolean: Boolean,'
        '$uga: String){testFilterMulti(filterBoolean: $filterBoolean,'
        'a: $uga, b: 66)}'
    )

    def on_event(err, payload):
        """Validate one delivered payload and stop the pubsub loop after two."""
        # Failures are surfaced through sys.exit; presumably because this
        # runs outside the test's own call stack -- TODO confirm.
        if err:
            sys.exit(err)

        if payload is not None:
            try:
                assert payload.data.get('testFilterMulti') == 'good_filter'
            except AssertionError as failure:
                sys.exit(failure)
            else:
                seen['trigger_count'] += 1

        if seen['trigger_count'] == 2:
            sub_mgr.pubsub.greenlet.kill()

    def fire_triggers_then_unsubscribe(sub_id):
        """Publish on all three channels, wait for the greenlet, unsubscribe."""
        messages = (
            ('not_a_trigger', {'filterBoolean': False}),
            ('trigger_1', {'filterBoolean': True}),
            ('trigger_2', {'filterBoolean': True}),
        )
        for channel, body in messages:
            sub_mgr.publish(channel, body)
        # Wait here until the pubsub greenlet has ended (the callback kills
        # it after the second counted payload) before unsubscribing.
        sub_mgr.pubsub.greenlet.join()
        sub_mgr.unsubscribe(sub_id)

    variables = {'filterBoolean': True, 'uga': 'UGA'}
    subscribed = sub_mgr.subscribe(
        query, 'multiTrigger', on_event, variables, {}, None, None)
    subscribed.then(fire_triggers_then_unsubscribe).get()
test_subscription_manager.py 文件源码
python
阅读 26
收藏 0
点赞 0
评论 0
评论列表
文章目录