def test_no_req_ids(self, *args):
in_flight = 3
get_holders = self.make_get_holders(1)
max_connection = Mock(spec=Connection, host='localhost',
lock=Lock(),
max_request_id=in_flight - 1, in_flight=in_flight,
is_idle=True, is_defunct=False, is_closed=False)
holder = get_holders.return_value[0]
holder.get_connections.return_value.append(max_connection)
self.run_heartbeat(get_holders)
holder.get_connections.assert_has_calls([call()] * get_holders.call_count)
self.assertEqual(max_connection.in_flight, in_flight)
self.assertEqual(max_connection.send_msg.call_count, 0)
self.assertEqual(max_connection.send_msg.call_count, 0)
max_connection.defunct.assert_has_calls([call(ANY)] * get_holders.call_count)
holder.return_connection.assert_has_calls(
[call(max_connection)] * get_holders.call_count)
test_connection.py 文件源码
python
阅读 29
收藏 0
点赞 0
评论 0
评论列表
文章目录