test_taskpool.py 文件源码

python
阅读 25 收藏 0 点赞 0 评论 0

项目:ceph-lcm 作者: Mirantis 项目源码 文件源码
def test_task_failed_exit(mocked_plugin, tpool, configure_model, freeze_time):
    tsk = create_task()

    polled = {"a": False}

    def side_effect(*args, **kwargs):
        if polled["a"]:
            return False
        polled["a"] = True
        return True

    mocked_plugin.alive.side_effect = side_effect
    mocked_plugin.returncode = os.EX_SOFTWARE

    tpool.submit(tsk)
    time.sleep(2)
    tsk.refresh()
    assert tsk.executor_host == platform.node()
    assert tsk.executor_pid == 100
    assert tsk.time_failed == int(freeze_time.return_value)
    assert not tsk.time_cancelled
    assert not tsk.time_completed
    assert not tpool.global_stop_event.is_set()
    assert tsk._id not in tpool.data
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号