def test_transaction_aware_task_success(celery_worker, task_app_request, dbsession, demo_user):
    """Check that a transaction-aware task only takes effect after commit.

    The task is scheduled from inside an open transaction. While that
    transaction is still open the username must be unchanged; after the
    transaction block has exited, the username must have been set by the task.

    NOTE(review): ``celery_worker``, ``task_app_request`` and ``demo_user``
    are not referenced in the body; presumably they provide the running
    worker, the request context and the user row -- confirm against the
    fixture definitions.
    """
    with transaction.manager:
        # Load the user row the task will be asked to modify
        u = dbsession.query(User).first()

        # Remember the primary key so the verification step below looks up
        # the very same row instead of assuming it has id 1
        user_id = u.id

        demotasks.modify_username.apply_async([user_id], tm=transaction.manager)

        # Let the task travel in Celery queue
        time.sleep(2.0)

        # Task should not fire unless we exit the transaction
        assert u.username != "set by celery"

    # Let the transaction commit
    time.sleep(0.5)

    # Task has now fired after transaction was committed
    with transaction.manager:
        u = dbsession.query(User).get(user_id)
        assert u.username == "set by celery"
# Web-page navigation residue (not part of the test): "Comment list" / "Table of contents"