Python source code examples for distributed()

Example source: __init__.py (project: pytorch, author: pytorch)
def broadcast(tensor, src, group=group.WORLD):
    """Broadcasts the tensor to the whole group.

    ``tensor`` must have the same number of elements in all processes
    participating in the collective.

    Arguments:
        tensor (Tensor): Data to be sent if ``src`` is the rank of current
            process, and tensor to be used to save received data otherwise.
        src (int): Source rank.
        group (optional): Group of the collective.
    """
    assert torch.distributed._initialized == _INITIALIZED_PG, \
        "collective only supported in process-group mode"
    return torch._C._dist_broadcast(tensor, src, group)
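
A minimal usage sketch for ``broadcast``, assuming ``init_process_group`` has already been called on every rank and the world size is at least two; the tensor shape and values are illustrative:

import torch
import torch.distributed as dist

# rank 0 sends its tensor; every other rank overwrites its tensor with the received data
t = torch.ones(4) if dist.get_rank() == 0 else torch.zeros(4)
dist.broadcast(t, src=0)
# after the call, every rank holds a tensor of ones
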
Example source: __init__.py (project: pytorch, author: pytorch)
def reduce_multigpu(tensor_list, dst, op=reduce_op.SUM, group=group.WORLD):
    """Reduces the tensor data on multiple GPUs across all machines. Each tensor
    in ``tensor_list`` should reside on a separate GPU.

    Only the GPU of tensor_list[0] on the process with rank ``dst`` is
    going to receive the final result.

    Only the ``nccl`` backend is currently supported and all tensors
    must be GPU tensors.

    Arguments:
        tensor_list (List[Tensor]): Input and output GPU tensors of the
            collective. The function operates in-place.
        dst (int): Destination rank
        op (optional): One of the values from ``torch.distributed.reduce_op``
            enum.  Specifies an operation used for element-wise reductions.
        group (optional): Group of the collective.
    """
    assert torch.distributed._initialized == _INITIALIZED_PG, \
        "collective only supported in process-group mode"

    warnings.warn("""
    ================================================================================
                                        WARNING
    ================================================================================
    reduce_multigpu is still experimental. The API will change without
    notice and we can't guarantee full correctness and expected performance yet.
    We'll announce it once it's ready.
    """)

    return torch._C._dist_reduce_multigpu(tensor_list, dst, op, group)
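
A hedged usage sketch for ``reduce_multigpu``, assuming the ``nccl`` backend has been initialized and that each process drives two GPUs (both assumptions, not requirements of the snippet above):

import torch
import torch.distributed as dist

# one tensor per local GPU; all tensors must be GPU tensors
tensors = [torch.ones(4).cuda(i) for i in range(2)]
dist.reduce_multigpu(tensors, dst=0)
# on rank 0, tensors[0] now holds the element-wise sum over every GPU of every rank
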
Example source: __init__.py (project: pytorch, author: pytorch)
def all_gather(tensor_list, tensor, group=group.WORLD):
    """Gathers tensors from the whole group in a list.

    Arguments:
        tensor_list (list[Tensor]): Output list. It should contain
            correctly-sized tensors to be used for output of the collective.
        tensor (Tensor): Tensor to be broadcast from current process.
        group (optional): Group of the collective.
    """
    assert torch.distributed._initialized == _INITIALIZED_PG, \
        "collective only supported in process-group mode"
    if _backend != dist_backend.NCCL:
        return torch._C._dist_all_gather(tensor_list, tensor, group)
    else:
        return all_gather_multigpu([tensor_list], [tensor], group)
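
A minimal sketch of calling ``all_gather``, assuming an initialized process group; shapes and values are illustrative:

import torch
import torch.distributed as dist

rank, world = dist.get_rank(), dist.get_world_size()
t = torch.ones(2) * rank                       # this rank's contribution
out = [torch.zeros(2) for _ in range(world)]   # correctly-sized output slots
dist.all_gather(out, t)
# out[i] now equals rank i's tensor on every process
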
Example source: __init__.py (project: pytorch, author: pytorch)
def scatter(tensor, **kwargs):
    """Scatters a list of tensors to all processes in a group.

    Each process will receive exactly one tensor and store its data in the
    ``tensor`` argument.

    Arguments:
        tensor (Tensor): Output tensor.
        src (int): Source rank. Required in all processes except the one that
            is sending the data.
        scatter_list (list[Tensor]): List of tensors to scatter. Required only
            in the process that is sending the data.
        group (optional): Group of the collective.
    """
    assert torch.distributed._initialized == _INITIALIZED_PG, \
        "collective only supported in process-group mode"
    my_rank = get_rank()
    src = kwargs.pop('src', my_rank)
    scatter_list = kwargs.pop('scatter_list', None)
    _group = kwargs.pop('group', group.WORLD)
    if kwargs:
        raise RuntimeError("got unexpected kwargs")
    if src == my_rank:
        if scatter_list is None:
            raise RuntimeError("scatter_list is a required argument in scatter source")
        return torch._C._dist_scatter_send(scatter_list, tensor, _group)
    else:
        if scatter_list:
            raise RuntimeError("non-empty can be given only to scatter source")
        return torch._C._dist_scatter_recv(tensor, src, _group)
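
A minimal sketch of calling ``scatter``, assuming an initialized process group with rank 0 as the source; the chunk contents are illustrative:

import torch
import torch.distributed as dist

rank = dist.get_rank()
out = torch.zeros(3)
if rank == 0:
    # the source builds one chunk per rank and may omit ``src`` (it defaults to its own rank)
    chunks = [torch.ones(3) * i for i in range(dist.get_world_size())]
    dist.scatter(out, scatter_list=chunks)
else:
    dist.scatter(out, src=0)
# out on rank i now holds chunks[i]
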
Example source: __init__.py (project: pytorch, author: pytorch)
def barrier(group=group.WORLD):
    """Synchronizes all processes.

    This collective blocks processes until the whole group enters this function.

    Arguments:
        group (optional): Group of the collective.
    """
    assert torch.distributed._initialized == _INITIALIZED_PG, \
        "collective only supported in process-group mode"
    return torch._C._dist_barrier(group)
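
A minimal sketch of ``barrier``, assuming an initialized process group; the prints only illustrate the ordering guarantee:

import torch.distributed as dist

print('rank %d reached the barrier' % dist.get_rank())
dist.barrier()
# this line runs only after every rank in the group has entered barrier()
print('rank %d passed the barrier' % dist.get_rank())
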
Example source: __init__.py (project: pytorch, author: pytorch)
def _clear_group_cache(group=group.WORLD):
    """Clear the created distributed group's cached resource

    Only nccl backend is currently supported

    Cached resource includes NCCL communicators and CUDA events

    Arguments:
        group (optional): Group of the collective.
    """
    return torch._C._dist_clear_group_cache(group)
Example source: __init__.py (project: pytorch, author: pytorch)
def _register_stream(stream):
    if not _initialized:
        raise RuntimeError("torch.distributed needs to be initialized first")
    return torch._C._dist_register_stream(stream)
Example source: __init__.py (project: pytorch, author: pytorch)
def init_process_group(backend, init_method='env://', **kwargs):
    """Initializes the distributed package.

    Arguments:
        backend (str): Name of the backend to use. Depending on build-time
            configuration, valid values include ``tcp``, ``gloo``, ``mpi`` and ``nccl``.
        init_method (str, optional): URL specifying how to initialize the package.
        world_size (int, optional): Number of processes participating in the job.
        rank (int, optional): Rank of the current process.
        group_name (str, optional): Group name. See description of init methods.

    To enable ``backend == mpi``, PyTorch needs to be built from source on a
    system that supports MPI.

    """
    world_size = kwargs.pop('world_size', -1)
    group_name = kwargs.pop('group_name', '')
    rank = kwargs.pop('rank', -1)
    assert len(kwargs) == 0, "got unexpected keyword arguments: %s" % ",".join(kwargs.keys())

    if not is_available():
        raise RuntimeError("PyTorch built without distributed support")

    global _initialized
    if _initialized:
        raise RuntimeError("trying to initialize torch.distributed twice!")

    # Checking and assigning the distributed backend
    global _backend

    if backend == "tcp":
        _backend = dist_backend.TCP
    elif backend == "mpi":
        _backend = dist_backend.MPI
    elif backend == "gloo":
        _backend = dist_backend.GLOO
    elif backend == "nccl":
        _backend = dist_backend.NCCL
    else:
        raise RuntimeError("Invalid distributed backend name: " + backend)

    torch._C._dist_init_process_group(backend, init_method, world_size,
                                      group_name, rank)
    _initialized = _INITIALIZED_PG

    if _backend == dist_backend.NCCL:
        warnings.warn("""
        ================================================================================
                                            WARNING
        ================================================================================
        NCCL backend is still experimental. The APIs will change without
        notice and we can't guarantee full correctness and expected performance yet.
        We'll announce it once it's ready.
        """)
        atexit.register(destroy_process_group)

    if not torch._C._dist_init_extension(False, reduce_op, group):
        raise RuntimeError("distributed module initialization failed")
Example source: __init__.py (project: pytorch, author: pytorch)
def all_gather_multigpu(output_tensor_lists,
                        input_tensor_list,
                        group=group.WORLD):
    """Gathers tensors from the whole group in a list.
    Each tensor in tensor_list should reside on a separate GPU

    Only nccl backend is currently supported
    tensors should only be GPU tensors

    Arguments:
        output_tensor_lists (List[List[Tensor]]): Output lists. It should
            contain correctly-sized tensors on each GPU to be used for output of
            the collective.
        input_tensor_list (List[Tensor]): List of tensors (on different GPUs) to
            be broadcast from current process.
        group (optional): Group of the collective.
    """
    assert torch.distributed._initialized == _INITIALIZED_PG, \
        "collective only supported in process-group mode"

    warnings.warn("""
    ================================================================================
                                        WARNING
    ================================================================================
    all_gather_multigpu is still experimental. The API will change without
    notice and we can't guarantee full correctness and expected performance yet.
    We'll announce it once it's ready.
    """)

    flatten_tensor_list = []
    for output_tensor_list in output_tensor_lists:
        flatten_tensor_list.append(_flatten_dense_tensors(output_tensor_list))

    ret = torch._C._dist_all_gather_multigpu(flatten_tensor_list,
                                             input_tensor_list,
                                             group)

    for output_tensor_list, flatten_tensor in zip(output_tensor_lists,
                                                  flatten_tensor_list):
        for tensor, value in zip(output_tensor_list,
                                 _unflatten_dense_tensors(flatten_tensor,
                                                          output_tensor_list)):
            tensor.copy_(value)

    return ret
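
A hedged sketch of calling ``all_gather_multigpu``, assuming the ``nccl`` backend is initialized and each process drives two GPUs; the output layout shown (one slot per input tensor across the whole job, replicated on every GPU) is an assumption based on the flatten/unflatten logic above:

import torch
import torch.distributed as dist

n_gpus = 2
rank, world = dist.get_rank(), dist.get_world_size()
inputs = [torch.ones(4).cuda(i) * rank for i in range(n_gpus)]
# one correctly-sized output slot per input tensor in the whole job, on each local GPU
outputs = [[torch.zeros(4).cuda(i) for _ in range(world * n_gpus)]
           for i in range(n_gpus)]
dist.all_gather_multigpu(outputs, inputs)
# outputs[i] on every rank now holds every process's input tensors, on GPU i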

