def test_celery__TransactionAwareTask____call____2__cov(
        interaction, eager_celery_app):
    """A ConflictError makes the task abort the transaction and retry.

    Collecting coverage for sub-processes is hard, so this test exists for
    coverage purposes only.
    """
    zope_setup_target = (
        'z3c.celery.celery.TransactionAwareTask.configure_zope')
    # Replace configure_zope with a mock for the duration of the call.
    with mock.patch(zope_setup_target):
        # Wrap the real abort via side_effect: it still runs, but every
        # call is recorded on the mock.
        with mock.patch(
                'transaction.abort',
                side_effect=transaction.abort) as abort_spy:
            # time.sleep is mocked so its calls can be counted below.
            with mock.patch('time.sleep') as sleep_spy:
                zope.security.management.endInteraction()
                expected_error = celery.exceptions.MaxRetriesExceededError
                with pytest.raises(expected_error):
                    conflict_task(_run_asynchronously_=True)
    assert abort_spy.called
    # We have max_retries=1 for this task
    assert sleep_spy.call_count == 2
# Comment list
# Table of contents