def test_retry_loop_does_not_retry_task(mock_retrying_executor):
    """Run ``retry_loop`` on a '-retry1' event with retrying mocked off.

    The event is built with ``is_terminal=True`` and both ``retry`` and
    ``retry_pred`` are mocks returning ``False``.  After the loop runs,
    exactly one item must be on ``dest_queue`` and ``task_retries`` must
    be empty.
    """
    executor = mock_retrying_executor
    base_event = _get_mock_event(is_terminal=True)

    executor.stopping = True
    executor._is_current_attempt = mock.Mock(return_value=True)
    executor.retry = mock.Mock(return_value=False)
    executor.retry_pred = mock.Mock(return_value=False)
    # Seed the retry table with the value 1 for the original task id; the
    # result of ``set`` is assigned back onto the executor.
    executor.task_retries = executor.task_retries.set(base_event.task_id, 1)

    # The event actually queued carries the task id with a '-retry1' suffix.
    retried_event = base_event.set('task_id', base_event.task_id + '-retry1')
    executor.src_queue = Queue()
    executor.src_queue.put(retried_event)

    executor.retry_loop()

    assert executor.dest_queue.qsize() == 1
    assert len(executor.task_retries) == 0
# (page residue) Comment list
# (page residue) Table of contents