def test_task_cancelled(mocked_plugin, tpool, configure_model, freeze_time):
    """Check the recorded state of a task cancelled right after submission.

    The plugin's ``alive`` check is stubbed to answer ``True`` on its first
    call and ``False`` on every later call. The task is submitted, cancelled
    by id, and re-read after a fixed pause; the test then verifies the
    executor fields, the cancellation timestamp, the absence of completion
    and failure timestamps, that the pool's global stop event is not set,
    and that the task id is no longer present in ``tpool.data``.
    """
    task = create_task()

    # One-shot supply of "alive" answers: the single True is consumed by the
    # first call, after which next() falls back to False.
    alive_answers = iter([True])

    def alive_once(*args, **kwargs):
        return next(alive_answers, False)

    mocked_plugin.alive.side_effect = alive_once

    tpool.submit(task)
    tpool.cancel(task._id)

    # Fixed pause before re-reading the task
    # (presumably lets the pool process the cancellation -- confirm).
    time.sleep(2)
    task.refresh()

    # Executor bookkeeping; pid 100 is presumably supplied by a fixture
    # (not visible here -- confirm).
    assert task.executor_host == platform.node()
    assert task.executor_pid == 100

    # Only the cancellation timestamp is set, matching the frozen clock.
    assert task.time_cancelled == int(freeze_time.return_value)
    assert not task.time_completed
    assert not task.time_failed

    # Cancelling one task does not set the pool-wide stop event, and the
    # task id is gone from the pool's data.
    assert not tpool.global_stop_event.is_set()
    assert task._id not in tpool.data
评论列表
文章目录