def test_celery__TransactionAwareTask__delay__5(celery_session_worker, zcml):
    """Two tasks can be queued under different principals before one commit.

    The first task is delayed while 'example.user' is logged in; the
    interaction is then ended, 'zope.user' is logged in and the task is
    delayed a second time. A single ``transaction.commit()`` follows both
    ``delay()`` calls, after which each result is compared with its
    expected value.
    """
    authentication = zope.component.getUtility(
        zope.authentication.interfaces.IAuthentication)

    # First task: queued while 'example.user' is the logged-in principal.
    first_user = authentication.getPrincipal('example.user')
    z3c.celery.celery.login_principal(first_user)
    first_result = get_principal_title_task.delay()

    # End the current interaction before logging in the second principal.
    zope.security.management.endInteraction()

    # Second task: queued while 'zope.user' is the logged-in principal.
    second_user = authentication.getPrincipal('zope.user')
    z3c.celery.celery.login_principal(second_user)
    second_result = get_principal_title_task.delay()

    # One commit covers both delay() calls.
    transaction.commit()

    # NOTE(review): the expected strings are presumably the titles of the
    # two principals as configured by the test setup -- confirm there.
    assert first_result.get() == 'Ben Utzer'
    assert second_result.get() == 'User'
评论列表
文章目录