def test_bulk_verbose_output(self):
output = [ITEM_SUCCESS,
ITEM_SUCCESS,
ITEM_FAILED,
ITEM_FAILED,
ITEM_FAILED]
return_value = [succeed(output)]
self.bulk_utility.streaming_bulk = MagicMock(return_value=return_value)
items = yield self.bulk_utility.bulk(None, verbose=True)
self.assertEqual([ITEM_SUCCESS] * 2,
[x for x in items if x == ITEM_SUCCESS])
self.assertEqual([ITEM_FAILED] * 3,
[x for x in items if x == ITEM_FAILED])
评论列表
文章目录