def test_celery__TransactionAwareTask__run_in_worker__2(
    celery_session_worker,
    storage_file,
    interaction,
):
    """A dedicated exception aborts the transaction without failing the job.

    The job must still report its regular result, while the copy of the
    storage must not contain the ``'flub'`` key afterwards.
    """
    async_result = success_but_abort_transaction.delay()
    # Commit before waiting on the result
    # (presumably the job is only dispatched on commit -- TODO confirm).
    transaction.commit()
    outcome = async_result.get()
    assert outcome == 'done'

    # Inspect a copy of the storage: the job's transaction was aborted, so
    # 'flub' (presumably what the task tried to store) must be absent.
    with open_zodb_copy(storage_file) as app:
        assert 'flub' not in app
# NOTE: web-page residue, not part of the test module:
# "Comment list" / "Article table of contents"