test_executor.py 文件源码

python
阅读 27 收藏 0 点赞 0 评论 0

项目:incubator-ariatosca 作者: apache 项目源码 文件源码
def execute_and_assert(executor, storage=None):
    """Run three mock tasks through ``executor`` and check their outcomes.

    One context is built per scenario: a task expected to succeed, a task
    expected to fail, and a task that receives an ``input`` argument and is
    expected to fail with that input as the exception message.

    The executor may finish the tasks asynchronously, so the checks are
    wrapped in a ``retrying.retry`` loop (fixed 100 ms wait, giving up after
    10 s) instead of being asserted once right after submission.

    :param executor: object exposing ``execute(ctx)``
    :param storage: passed as-is to every ``MockContext``; defaults to None
    """
    expected_value = 'value'

    # Build the contexts in the same order they are later submitted.
    success_kwargs = {'function': _get_function(mock_successful_task)}
    successful_ctx = MockContext(storage, task_kwargs=success_kwargs)

    failure_kwargs = {'function': _get_function(mock_failing_task)}
    failing_ctx = MockContext(storage, task_kwargs=failure_kwargs)

    with_inputs_kwargs = {
        'function': _get_function(mock_task_with_input),
        'arguments': {'input': models.Argument.wrap('input', 'value')},
    }
    task_with_inputs_ctx = MockContext(storage, task_kwargs=with_inputs_kwargs)

    executor.execute(successful_ctx)
    executor.execute(failing_ctx)
    executor.execute(task_with_inputs_ctx)

    @retrying.retry(stop_max_delay=10000, wait_fixed=100)
    def _verify_outcomes():
        # State transitions recorded on each context.
        assert successful_ctx.states == ['start', 'success']
        assert failing_ctx.states == ['start', 'failure']
        assert task_with_inputs_ctx.states == ['start', 'failure']

        # Both failing scenarios must surface a MockException.
        assert isinstance(failing_ctx.exception, MockException)
        input_error = task_with_inputs_ctx.exception
        assert isinstance(input_error, MockException)
        assert input_error.message == expected_value

    _verify_outcomes()
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号