def test_manual_transaction(celery_worker, task_app_request, dbsession, demo_user):
    """Manual transaction lifecycles within task work.

    Schedules ``demotasks.modify_username_manual_transaction`` from inside an
    open ``transaction.manager`` block and checks two things:

    * while that block is still open, the first user's username is not
      ``"set by celery"``;
    * when the user is re-read in a fresh ``transaction.manager`` block
      afterwards, the username is ``"set by celery"``.

    ``celery_worker``, ``task_app_request`` and ``demo_user`` are not
    referenced in the body; presumably they are pytest fixtures requested for
    their setup side effects (worker process, request context, an existing
    user row) -- confirm against the fixture definitions.
    """
    with transaction.manager:
        # Look up the user whose id is handed to the task
        u = dbsession.query(User).first()

        # NOTE(review): passing ``tm`` presumably ties the task dispatch to
        # this transaction's commit -- verify against the task implementation.
        demotasks.modify_username_manual_transaction.apply_async(
            kwargs={"user_id": u.id},
            tm=transaction.manager,
        )

        # Let the task travel in Celery queue
        time.sleep(1.0)

        # Task should not fire unless we exit the transaction
        assert u.username != "set by celery"

    # Let the transaction commit
    time.sleep(0.5)

    # Task has now fired after transaction was committed;
    # re-read the user in a new transaction to observe the change
    with transaction.manager:
        u = dbsession.query(User).first()
        assert u.username == "set by celery"
评论列表
文章目录