def test_celery__TransactionAwareTask____call____1__cov(
interaction, eager_celery_app):
"""It aborts the transaction in case of an error during task execution.
As it is hard to collect coverage for subprocesses we use this test for
coverage only.
"""
task_call = 'celery.Task.__call__'
configure_zope = 'z3c.celery.celery.TransactionAwareTask.configure_zope'
with mock.patch(configure_zope), \
mock.patch(task_call, side_effect=RuntimeError) as task_call, \
mock.patch('transaction.abort') as abort:
zope.security.management.endInteraction()
with pytest.raises(RuntimeError):
# We want to simulate a run in worker. The RuntimeError is raised
# by the mock
eager_task(_run_asynchronously_=True)
assert task_call.called
assert abort.called
评论列表
文章目录