def test_cluster_proxy_pool():
    """Check that a two-slot ``ClusterRpcProxyPool`` hands out and takes back proxies.

    ``ClusterRpcProxy`` is patched, so no broker connection is made; the
    clients handed out by the pool are mocks.  The test follows
    ``pool.queue.qsize()`` through nested checkouts:

    * after ``start()`` the queue holds 2 entries;
    * each ``with pool.next()`` block takes one entry for its duration;
    * with both entries checked out, ``pool.next(timeout=1)`` is expected to
      raise ``queue_six.Empty``;
    * leaving each ``with`` block puts its entry back.
    """
    with patch('django_nameko.rpc.ClusterRpcProxy') as FakeClusterRpcProxy:
        pool = rpc.ClusterRpcProxyPool({}, pool_size=2)
        pool.start()
        # Stop the pool even when an assertion below fails, so a broken run
        # does not leave a started pool behind for later tests.
        try:
            assert pool.queue.qsize() == 2

            with pool.next() as client:
                assert pool.queue.qsize() == 1

                # A call made through the client must show up on the patched
                # class as ClusterRpcProxy(...).start().foo.bar().
                client.foo.bar()
                assert call().start().foo.bar() in FakeClusterRpcProxy.mock_calls

                with pool.next():
                    assert pool.queue.qsize() == 0

                    # Nothing left to hand out: a third checkout must give up
                    # once the timeout expires instead of blocking forever.
                    tools.assert_raises(queue_six.Empty, pool.next, timeout=1)

                # Inner block exited: its entry is back in the queue.
                assert pool.queue.qsize() == 1

            # Outer block exited: the queue is full again.
            assert pool.queue.qsize() == 2
        finally:
            pool.stop()
# Comment list
# Article table of contents