Updated April 7, 2023
Introduction to PyTorch Distributed
We have the torch. distributed in PyTorch, where researchers and developers can make their computations in parallel mode across each process and clusters in machine learning processes. Here, message passing semantics is leveraged, which helps any process to communicate with other processes through the messages. Different communication backends are used, and the communication doesn’t need to be from the same machine. But, of course, multiple processes should be started in parallel, and coordination tools in the cluster must be enabled.
PyTorch Distributed Overview
There are three main components in the torch. First, distributed as distributed data-parallel training, RPC-based distributed training, and collective communication. Distributed data-parallel training (DDP) is multiple training programs where the model is replicated in each process, and each model will have different input data fed into it. Model replicas are synchronized where DDP does gradient communication, and it helps to fasten the training with the computations of the gradient.
RPC helps in general training structures that cannot be used in DDP. This includes distributed data parallelism, parameter server, and those models where DDP cannot be combined with other training. In addition, remote object lifetime can be managed easily in RPC, and this can be used to extend autograd where machine boundaries can be extended.
Tensors sent across each process in a single group are supported in collective communication. We have collective communication APIs and P2P communication APIs here. Many training scenarios can be handled using DDP and RPC. Here we can manage the averages of the model parameters by making distributed parameter averaging, and here DDP need not be used to communicate with the gradients. Communications can be decoupled easily from each model, and there will be fine control over the communication in both DDP and RPC.
PyTorch Distributed Models
Code:
import os
import sys
import tempfile
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.optim as optim
import torch.multiprocessing as mp
from torch.nn.parallel import DistributedDataParallel as DDP
def setup_model(rank01, world_size):
os.environ['MASTER_ADDR'] = 'localhost'
os.environ['MASTER_PORT'] = '12675'
dist.init_process_group("gloo", rank=rank, world_size=world_size)
def cleanup_model():
dist.destroy_process_group()
class Model(nn.Module):
def __init__(self):
super(Model, self).__init__()
self.net01 = nn.Linear(5, 5)
self.relu = nn.ReLU()
self.net02 = nn.Linear(5, 2)
def forward_pass(self, a):
return self.net02(self.relu(self.net01(a)))
def demo (rank01, world_size):
print(f"Running basic DDP on rank01 {rank01}.")
setup(rank01, world_size)
model = Model().to(rank01)
ddp_model = DDP(model, device_ids=[rank01])
loss_function = nn.MSELoss()
optimize = optim.SGD(ddp_model.parameters(), lr=0.01)
optimize.zero_grad()
out = ddp_model(torch.randn(10, 10))
labels = torch.randn(10, 5).to(rank01)
loss_function(out, labels).backward()
optimize.step()
cleanup()
def run_demo(demo_function, world_size):
mp.spawn(demo_function,
args=(world_size,),
nprocs=world_size,
join=True)
def checkpoint(rank01, world_size):
print(f"Running checkpoint example on rank01 {rank01}.")
setup(rank01, world_size)
model = Model().to(rank01)
ddp_model = DDP(model, device_ids=[rank01])
loss_function = nn.MSELoss()
optimize = optim.SGD(ddp_model.parameters(), lr=0.01)
CHECKPOINT = tempfile.gettempdir() + "https://cdn.educba.com/model.checkpoint"
if rank == 0:
torch.save(ddp_model.state_dict(), CHECKPOINT)
dist.barrier()
map = {'cuda:%d' % 0: 'cuda:%d' % rank}
ddp_model.load_state_dict(
torch.load(CHECKPOINT, map_location=map))
optimize.zero_grad()
out = ddp_model(torch.randn(10, 10))
labels = torch.randn(10, 5).to(rank01)
loss_function = nn.MSELoss()
loss_function(out, labels).backward()
optimize.step()
if rank == 0:
os.remove(CHECKPOINT)
cleanup()
class Model(nn.Module):
def __init__(self, dev0, dev1):
super(Model, self).__init__()
self.dev0 = dev00
self.dev1 = dev01
self.net01 = torch.nn.Linear(5, 5).to(dev00)
self.relu01 = torch.nn.ReLU()
self.net02 = torch.nn.Linear(5, 5).to(dev01)
def forward_pass(self, x):
a = a.to(self.dev00)
a = self.relu(self.net01(a))
a = a.to(self.dev01)
return self.net02(a)
def demo_model(rank01, world_size):
print(f"Running with model parallel example on rank01 {rank01}.")
setup(rank01, world_size)
dev00 = (rank * 2) % world_size
dev01 = (rank * 2 + 1) % world_size
mp_model01 = Model(dev00, dev01)
mp_model = DDP(model)
loss_function = nn.MSELoss()
optimize = optim.SGD(ddp_ model.parameters(), lr=0.01)
optimize.zero_grad()
out = ddp_model(torch.randn(10, 10))
labels = torch.randn(10, 5).to(dev01)
loss_function(outputs, labels).backward_pass()
optimize.step()
cleanup()
PyTorch Distributed Strategy
- In data parallel training, the following points should be noted. First, if training speed doesn’t matter and if the data and model can fit inside one GPU, it is better to use one device training. Single machine multi-GPU can be used where multiple GPUs can be used in a single machine. This helps in fastening the training with fewer code changes in the system. Third, Multimachine and distributed data-parallel can be used if more codes can be written and if the application must be used in various machine boundaries. If errors are present in the model, it is better to use the torch.distributed.elastic, where resources leaving and joining dynamically, will not cause problems.
- We have a data-parallel package in torch with fewer codes and multiple GPUs. Only one line needs to be changed in the code. Model is replicated in all the forward pass, and there is a GIL convention in the code. However, it does not offer better performance.
- RPC has four main categories such as RRef, Distributed Autograd, Distributed Optimizer, and RPC. Given function can be run in a remote worker using RPC, whereas RRef can manage the lifetime of the same. Parameters are updated using Distributed Optimizer using gradients from the autograd engine.
Add the Distributed Initialization
We should initialize the package first so that all the processes will be done together.
torch.distributed.is_available()
We must initialize the process group and the distributed package. Store, rank, and world_size should be stored separately, and the init method should be specified. We should also store the URL parameters and specify the rank and world_size. Following parameters should be needed to initialize the distributed package. Init_method, rank, and world_size parameters are optional, which describes the initialization of the process group, the number of processes in the job, and the rank of the current process.
Build time configurations are needed in the initialization process where parameters such as mpi, gloo, and nccl are used. Each process should have separate access for GPU because it might create deadlocks while using the same GPU for all processes. The timeout parameter is also an optional one where the default value is given as 30 minutes. Connection information is exchanged using the store parameter, and a group name is passed using the group_name parameter. Finally, process group options are captured using pg_options.
Conclusion
Autograd can be enabled in PyTorch Distributed that helps to do collective communications between all the modules and classes. The autograd graph captures functions, and there will not be a deadlock as the functions are managed carefully by autograd. All the functions work perfectly in a backward pass now, and this is the best way to get the work done.
Recommended Articles
We hope that this EDUCBA information on “PyTorch Distributed” was beneficial to you. You can view EDUCBA’s recommended articles for more information.