def broadcast(tensor, src, group=group.WORLD):
"""Broadcasts the tensor to the whole group.
``tensor`` must have the same number of elements in all processes
participating in the collective.
Arguments:
tensor (Tensor): Data to be sent if ``src`` is the rank of current
process, and tensor to be used to save received data otherwise.
src (int): Source rank.
group (optional): Group of the collective.
"""
assert torch.distributed._initialized == _INITIALIZED_PG, \
"collective only supported in process-group mode"
return torch._C._dist_broadcast(tensor, src, group)
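# Illustrative usage sketch (not part of the original module). Assumes the
# process group has already been set up via init_process_group() and that at
# least two processes are running. Rank 0 fills a tensor and broadcasts it;
# the tensor size is arbitrary.
def _example_broadcast():
    tensor = torch.zeros(4)
    if get_rank() == 0:
        tensor.fill_(1)          # data only exists on the source rank
    broadcast(tensor, src=0)     # afterwards every rank holds the same values
    return tensor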
def reduce_multigpu(tensor_list, dst, op=reduce_op.SUM, group=group.WORLD):
"""Reduces the tensor data on multiple GPUs across all machines. Each tensor
in tensor_list should reside on a separate GPU
Only the GPU of tensor_list[0] on the process with rank ``dst`` is
going to receive the final result.
Only nccl backend is currently supported
tensors should only be GPU tensors
Arguments:
tensor_list (List[Tensor]): Input and output GPU tensors of the
collective. The function operates in-place.
dst (int): Destination rank
op (optional): One of the values from ``torch.distributed.reduce_op``
enum. Specifies an operation used for element-wise reductions.
group (optional): Group of the collective.
"""
assert torch.distributed._initialized == _INITIALIZED_PG, \
"collective only supported in process-group mode"
warnings.warn("""
================================================================================
WARNING
================================================================================
reduce_multigpu is still experimental. The API will change without
notice and we can't guarantee full correctness and expected performance yet.
We'll announce it once it's ready.
""")
return torch._C._dist_reduce_multigpu(tensor_list, dst, op, group)
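# Illustrative usage sketch (not part of the original module). Reduces one
# tensor per local GPU onto rank 0; assumes the NCCL backend, an initialized
# process group and at least one CUDA device. Sizes are arbitrary.
def _example_reduce_multigpu():
    tensors = []
    for dev in range(torch.cuda.device_count()):
        with torch.cuda.device(dev):
            # one tensor per GPU, all of the same size
            tensors.append(torch.cuda.FloatTensor(4).fill_(1))
    reduce_multigpu(tensors, dst=0)
    # on rank 0, tensors[0] now holds the element-wise sum over all GPUs
    # of all participating processes
    return tensors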
def all_gather(tensor_list, tensor, group=group.WORLD):
"""Gathers tensors from the whole group in a list.
Arguments:
tensor_list (list[Tensor]): Output list. It should contain
correctly-sized tensors to be used for output of the collective.
tensor (Tensor): Tensor to be broadcast from current process.
group (optional): Group of the collective.
"""
assert torch.distributed._initialized == _INITIALIZED_PG, \
"collective only supported in process-group mode"
if _backend != dist_backend.NCCL:
return torch._C._dist_all_gather(tensor_list, tensor, group)
else:
return all_gather_multigpu([tensor_list], [tensor], group)
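# Illustrative usage sketch (not part of the original module). Each rank
# contributes one tensor and receives one tensor per rank. get_world_size()
# is assumed to be defined elsewhere in this module.
def _example_all_gather():
    tensor = torch.zeros(2).fill_(get_rank())              # payload: own rank id
    gathered = [torch.zeros(2) for _ in range(get_world_size())]
    all_gather(gathered, tensor)
    return gathered                                        # gathered[i] came from rank i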
def scatter(tensor, **kwargs):
"""Scatters a list of tensors to all processes in a group.
Each process will receive exactly one tensor and store its data in the
``tensor`` argument.
Arguments:
tensor (Tensor): Output tensor.
src (int): Source rank. Required in all processes except the one that
is sending the data.
scatter_list (list[Tensor]): List of tensors to scatter. Required only
in the process that is sending the data.
group (optional): Group of the collective.
"""
assert torch.distributed._initialized == _INITIALIZED_PG, \
"collective only supported in process-group mode"
my_rank = get_rank()
src = kwargs.pop('src', my_rank)
scatter_list = kwargs.pop('scatter_list', None)
_group = kwargs.pop('group', group.WORLD)
if kwargs:
raise RuntimeError("got unexpected kwargs")
if src == my_rank:
if scatter_list is None:
raise RuntimeError("scatter_list is a required argument in scatter source")
return torch._C._dist_scatter_send(scatter_list, tensor, _group)
else:
if scatter_list:
raise RuntimeError("non-empty can be given only to scatter source")
return torch._C._dist_scatter_recv(tensor, src, _group)
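# Illustrative usage sketch (not part of the original module). Rank 0 scatters
# one chunk per rank; every other rank only passes the output tensor and the
# source rank. get_world_size() is assumed to be defined elsewhere in this module.
def _example_scatter():
    out = torch.zeros(3)
    if get_rank() == 0:
        chunks = [torch.zeros(3).fill_(i) for i in range(get_world_size())]
        scatter(out, src=0, scatter_list=chunks)
    else:
        scatter(out, src=0)
    return out    # rank i ends up with a tensor filled with the value i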
def barrier(group=group.WORLD):
"""Synchronizes all processes.
This collective blocks processes until the whole group enters this function.
Arguments:
group (optional): Group of the collective.
"""
assert torch.distributed._initialized == _INITIALIZED_PG, \
"collective only supported in process-group mode"
return torch._C._dist_barrier(group)
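# Illustrative usage sketch (not part of the original module). barrier() is
# used here to make every rank wait until rank 0 has finished writing a file
# before the other ranks read it; the path is a placeholder.
def _example_barrier(path='state.pt'):
    if get_rank() == 0:
        torch.save(torch.zeros(1), path)   # only rank 0 writes
    barrier()                              # all ranks wait for the write to complete
    return torch.load(path)                # now safe to read everywhere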
def _clear_group_cache(group=group.WORLD):
"""Clear the created distributed group's cached resource
Only nccl backend is currently supported
Cached resource includes NCCL communicators and CUDA events
Arguments:
group (optional): Group of the collective.
"""
return torch._C._dist_clear_group_cache(group)
def _register_stream(stream):
if not _initialized:
raise RuntimeError("torch.distributed needs to be initialized first")
return torch._C._dist_register_stream(stream)
def init_process_group(backend, init_method='env://', **kwargs):
"""Initializes the distributed package.
Arguments:
backend (str): Name of the backend to use. Depending on build-time configuration,
valid values include: ``tcp``, ``mpi``, ``gloo`` and ``nccl``.
init_method (str, optional): URL specifying how to initialize the package.
world_size (int, optional): Number of processes participating in the job.
rank (int, optional): Rank of the current process.
group_name (str, optional): Group name. See description of init methods.
To enable ``backend == mpi``, PyTorch needs to be built from source on a system that
supports MPI.
"""
world_size = kwargs.pop('world_size', -1)
group_name = kwargs.pop('group_name', '')
rank = kwargs.pop('rank', -1)
assert len(kwargs) == 0, "got unexpected keyword arguments: %s" % ",".join(kwargs.keys())
if not is_available():
raise RuntimeError("PyTorch built without distributed support")
global _initialized
if _initialized:
raise RuntimeError("trying to initialize torch.distributed twice!")
# Checking and assigning the distributed backend
global _backend
if backend == "tcp":
_backend = dist_backend.TCP
elif backend == "mpi":
_backend = dist_backend.MPI
elif backend == "gloo":
_backend = dist_backend.GLOO
elif backend == "nccl":
_backend = dist_backend.NCCL
else:
raise RuntimeError("Invalid distributed backend name: " + backend)
torch._C._dist_init_process_group(backend, init_method, world_size,
group_name, rank)
_initialized = _INITIALIZED_PG
if _backend == dist_backend.NCCL:
warnings.warn("""
================================================================================
WARNING
================================================================================
NCCL backend is still experimental. The APIs will change without
notice and we can't guarantee full correctness and expected performance yet.
We'll announce it once it's ready.
""")
atexit.register(destroy_process_group)
if not torch._C._dist_init_extension(False, reduce_op, group):
raise RuntimeError("distributed module initialization failed")
def all_gather_multigpu(output_tensor_lists,
input_tensor_list,
group=group.WORLD):
"""Gathers tensors from the whole group in a list.
Each tensor in tensor_list should reside on a separate GPU
Only nccl backend is currently supported
tensors should only be GPU tensors
Arguments:
output_tensor_lists (List[List[Tensor]]): Output lists. It should
contain correctly-sized tensors on each GPU to be used for output of
the collective.
input_tensor_list (List[Tensor]): List of tensors(on different GPUs) to
be broadcast from current process.
group (optional): Group of the collective.
"""
assert torch.distributed._initialized == _INITIALIZED_PG, \
"collective only supported in process-group mode"
warnings.warn("""
================================================================================
WARNING
================================================================================
all_gather_multigpu is still experimental. The API will change without
notice and we can't guarantee full correctness and expected performance yet.
We'll announce it once it's ready.
""")
flatten_tensor_list = []
for output_tensor_list in output_tensor_lists:
flatten_tensor_list.append(_flatten_dense_tensors(output_tensor_list))
ret = torch._C._dist_all_gather_multigpu(flatten_tensor_list,
input_tensor_list,
group)
for output_tensor_list, flatten_tensor in zip(output_tensor_lists,
flatten_tensor_list):
for tensor, value in zip(output_tensor_list,
_unflatten_dense_tensors(flatten_tensor,
output_tensor_list)):
tensor.copy_(value)
return ret
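# Illustrative usage sketch (not part of the original module). Each process
# contributes one tensor per local GPU and every GPU receives
# world_size * num_gpus result tensors. Assumes the NCCL backend;
# get_world_size() is assumed to be defined elsewhere in this module.
def _example_all_gather_multigpu():
    num_gpus = torch.cuda.device_count()
    total = get_world_size() * num_gpus
    inputs, outputs = [], []
    for dev in range(num_gpus):
        with torch.cuda.device(dev):
            inputs.append(torch.cuda.FloatTensor(4).fill_(get_rank()))
            outputs.append([torch.cuda.FloatTensor(4) for _ in range(total)])
    all_gather_multigpu(outputs, inputs)
    return outputs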