def test_task_failed_exit(mocked_plugin, tpool, configure_model, freeze_time):
    """Submit a task whose plugin exits with ``os.EX_SOFTWARE`` and check it is marked failed.

    The mocked plugin answers ``alive()`` with True on the first call and
    False on every later call, and exposes ``os.EX_SOFTWARE`` as its return
    code. After the task is submitted to ``tpool`` and refreshed, the test
    expects the executor host/pid and the failure timestamp to be set, the
    cancelled/completed timestamps to be unset, the pool's global stop event
    to be clear, and the task to be absent from ``tpool.data``.

    ``configure_model`` is not referenced in the body; presumably it is
    requested only for its fixture side effects -- confirm against conftest.
    """
    task = create_task()

    # True only until the first alive() call has been answered.
    already_polled = False

    def alive_once(*args, **kwargs):
        # Answer True exactly once, then False for every subsequent call.
        nonlocal already_polled
        first_call = not already_polled
        already_polled = True
        return first_call

    mocked_plugin.alive.side_effect = alive_once
    mocked_plugin.returncode = os.EX_SOFTWARE

    tpool.submit(task)
    # Fixed wait before re-reading the task; presumably the pool handles the
    # task in the background -- confirm against the tpool fixture.
    time.sleep(2)
    task.refresh()

    assert task.executor_host == platform.node()
    # NOTE(review): 100 is presumably the pid reported by mocked_plugin -- confirm.
    assert task.executor_pid == 100
    assert task.time_failed == int(freeze_time.return_value)
    assert not task.time_cancelled
    assert not task.time_completed
    assert not tpool.global_stop_event.is_set()
    assert task._id not in tpool.data
评论列表
文章目录