def _get_workflow_from_task(task):
"""
Looks for an instance of `Workflow` linked to the task or a method of
`Workflow` among the done callbacks of the asyncio task. Returns None if
not found.
If the task was triggered from within a workflow it MUST have a `workflow`
attribute that points to it and at least one done callback that is a method
of `Workflow`.
"""
if isinstance(task, TukioTask):
workflow = task.workflow
if workflow:
return workflow
for cb in task._callbacks:
# inspect.getcallargs() gives access to the implicit 'self' arg of
# the bound method but it is marked as deprecated since
# Python 3.5.1 and the new `inspect.Signature` object does NOT do
# the job :((
if inspect.ismethod(cb):
inst = cb.__self__
elif isinstance(cb, functools.partial):
try:
inst = cb.func.__self__
except AttributeError:
continue
else:
continue
if isinstance(inst, Workflow):
return inst
return None
评论列表
文章目录