test_celery.py 文件源码

python
阅读 20 收藏 0 点赞 0 评论 0

项目:z3c.celery 作者: ZeitOnline 项目源码 文件源码
def test_celery__TransactionAwareTask____call____2__cov(
        interaction, eager_celery_app):
    """It aborts the transaction and retries in case of a ConflictError.

    As it is hard to collect coverage for sub-processes we use this test for
    coverage only.
    """
    configure_zope = 'z3c.celery.celery.TransactionAwareTask.configure_zope'
    with mock.patch(configure_zope), \
            mock.patch('transaction.abort',
                       side_effect=transaction.abort) as abort, \
            mock.patch('time.sleep') as sleep:

        zope.security.management.endInteraction()
        with pytest.raises(celery.exceptions.MaxRetriesExceededError):
            conflict_task(_run_asynchronously_=True)

    assert abort.called
    assert 2 == sleep.call_count  # We have max_retries=1 for this task
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号