def test_use_filter_functions_properly(sub_mgr):
    """Exercise the ``Filter1`` subscription with a boolean filter variable.

    Subscribes with ``filterBoolean`` set to ``True``. Once the subscribe
    promise resolves, publishes two ``filter_1`` messages (``filterBoolean``
    ``False``, then ``True``), waits on the pubsub greenlet and unsubscribes.

    The subscription callback terminates the process through ``sys.exit``
    when it receives an error or when the delivered payload does not carry
    ``'good_filter'`` under ``testFilter``.

    NOTE(review): presumably the ``False`` message is meant to be rejected
    by the filter and only the ``True`` one delivered -- confirm against the
    schema/setup functions behind the ``sub_mgr`` fixture.
    """
    # Adjacent literals concatenate into the single-line query text.
    query = (
        'subscription Filter1($filterBoolean: Boolean) {'
        'testFilter(filterBoolean: $filterBoolean)}'
    )
    variables = {'filterBoolean': True}

    def callback(err, payload):
        # An error handed to the callback aborts the run immediately.
        if err:
            sys.exit(err)
        # A missing payload is accepted without any further checks.
        if payload is None:
            return
        try:
            assert payload.data.get('testFilter') == 'good_filter'
            # Expected payload seen: stop the pubsub greenlet so the
            # join() in the publisher below can return.
            sub_mgr.pubsub.greenlet.kill()
        except AssertionError as failure:
            # Surface the failed assertion through the process exit status.
            sys.exit(failure)

    def publish_then_unsubscribe(subscription_id):
        # Publish the non-matching value first, then the matching one.
        for flag in (False, True):
            sub_mgr.publish('filter_1', {'filterBoolean': flag})
        # Block until the callback kills the greenlet (or it ends on its own).
        sub_mgr.pubsub.greenlet.join()
        sub_mgr.unsubscribe(subscription_id)

    subscribed = sub_mgr.subscribe(
        query, 'Filter1', callback, variables, {}, None, None)
    # Resolve the chained promise so the publish/unsubscribe step has run
    # before the test returns.
    subscribed.then(publish_then_unsubscribe).get()
test_subscription_manager.py 文件源码
python
阅读 30
收藏 0
点赞 0
评论 0
评论列表
文章目录