def test_celery__TransactionAwareTask__run_in_worker__1(
        celery_session_worker, storage_file, interaction):
    """It handles specific exceptions in a new transaction after abort.

    Queues ``except_with_handler``, commits the current transaction, expects
    fetching the job result to raise, and then checks that the storage
    contains exactly one item keyed ``'data'``.

    ``celery_session_worker`` and ``interaction`` are not referenced in the
    body; presumably they are requested only for their fixture set-up
    (running worker, active interaction) -- confirm against the conftest.
    """
    job = except_with_handler.delay()
    # NOTE(review): presumably the task is only handed to the worker once the
    # transaction commits -- confirm against TransactionAwareTask.
    transaction.commit()
    # The task is expected to fail; the failure surfaces when the result is
    # fetched from the job.
    with pytest.raises(Exception):
        job.get()
    # Look at the storage through `open_zodb_copy` rather than a connection
    # held by this test; the single 'data' entry is presumably what the
    # task's exception handler stored -- verify against except_with_handler.
    with open_zodb_copy(storage_file) as app:
        assert [('data', ('a1', 'a2', 1, 4, u'User'))] == list(app.items())
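# NOTE(review): stray page-navigation text, not Python: "Comment list"
# NOTE(review): stray page-navigation text, not Python: "Table of contents"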