def test_celery__TransactionAwareTask____call____2(
        celery_session_worker, interaction):
    """It aborts the transaction and retries in case of a ConflictError.

    The test queues ``conflict_task``, commits the current transaction and
    then expects fetching the task result to fail with an exception whose
    class is named ``MaxRetriesExceededError``.
    """
    result = conflict_task.delay()
    # NOTE(review): presumably the task is only handed to the worker once
    # the transaction is committed -- confirm against the task base class.
    transaction.commit()
    # Keep only the raising call inside the context manager: any statement
    # placed after it within the block would never be executed.
    with pytest.raises(Exception) as err:
        result.get()
    # The exception is identified by class name instead of by type;
    # presumably the class re-raised by the result backend is not the
    # importable original -- TODO confirm.
    assert 'MaxRetriesExceededError' == err.value.__class__.__name__
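# Scraped page navigation residue (not part of the test module):
# "Comment list" / "Article table of contents"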