workflow.py 文件源码

python
阅读 23 收藏 0 点赞 0 评论 0

项目:tukio 作者: optiflows 项目源码 文件源码
def _get_workflow_from_task(task):
    """
    Looks for an instance of `Workflow` linked to the task or a method of
    `Workflow` among the done callbacks of the asyncio task. Returns None if
    not found.
    If the task was triggered from within a workflow it MUST have a `workflow`
    attribute that points to it and at least one done callback that is a method
    of `Workflow`.
    """
    if isinstance(task, TukioTask):
        workflow = task.workflow
    if workflow:
        return workflow

    for cb in task._callbacks:
        # inspect.getcallargs() gives access to the implicit 'self' arg of
        # the bound method but it is marked as deprecated since
        # Python 3.5.1 and the new `inspect.Signature` object does NOT do
        # the job :((
        if inspect.ismethod(cb):
            inst = cb.__self__
        elif isinstance(cb, functools.partial):
            try:
                inst = cb.func.__self__
            except AttributeError:
                continue
        else:
            continue
        if isinstance(inst, Workflow):
            return inst
    return None
评论列表
文章目录


问题


面经


文章

微信
公众号

扫码关注公众号