def test_bulk_not_stats_only(self):
return_value = [succeed([ITEM_SUCCESS,
ITEM_SUCCESS,
ITEM_FAILED,
ITEM_FAILED,
ITEM_FAILED])]
self.bulk_utility.streaming_bulk = MagicMock(return_value=return_value)
success, errors = yield self.bulk_utility.bulk(None, stats_only=False)
self.assertEqual(success, 2)
self.assertEqual([ERROR_MSG] * 3, errors)
评论列表
文章目录