test_celery.py 文件源码

python
阅读 25 收藏 0 点赞 0 评论 0

项目:z3c.celery 作者: ZeitOnline 项目源码 文件源码
def test_celery__TransactionAwareTask____call____1__cov(
        interaction, eager_celery_app):
    """It aborts the transaction in case of an error during task execution.

    As it is hard to collect coverage for subprocesses we use this test for
    coverage only.
    """
    task_call = 'celery.Task.__call__'
    configure_zope = 'z3c.celery.celery.TransactionAwareTask.configure_zope'
    with mock.patch(configure_zope), \
            mock.patch(task_call, side_effect=RuntimeError) as task_call, \
            mock.patch('transaction.abort') as abort:

        zope.security.management.endInteraction()
        with pytest.raises(RuntimeError):
            # We want to simulate a run in worker. The RuntimeError is raised
            # by the mock
            eager_task(_run_asynchronously_=True)

    assert task_call.called
    assert abort.called
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号