文章目录
- 分布式训练
- Overview
- DP or DDP
-
- TCP 初始化
-
- ENV 初始化
- 可选后端
- 进程间通信操作
- Template
区分概念:Machine vs. Device. 多机(Machine) 是指在不同主机上训练。每个主机有自己的 ip 。通过向 pytorch 提供 Master 的 ip:port 同步各个机器。多卡 (Device) 是指在同一台机器上有多张显卡。
分布式训练
采用 .to(cuda:%d)
方法指定GPU。eg.
cuda1 = torch.device("cuda:1")
data = data.to(cuda1)
Overview
三种分布式训练方法:
- DDP(Distributed Data Parallel,使用最多)
Single-Program; Multi-data training。会在每一个进程中拷贝一份模型,并且会用不同的数据拟合不同进程中的模型。DDP 会同步不同进程中模型的参数。
适用更通用的训练框架,此时 DDP 无法满足要求。RPC 将自动微分器(autograd) 拓展到其它远程机器上。
进程间通讯。用于收集不同进程(卡)上的数据做整体处理。不同后端支持的操作不同。
DP or DDP
DP
Single Machine, Multi GPU。 改动代码少。但是由于其本质是 Single Process,Multi-Thread,受到 Python GIL(全局解释锁)影响,运行速度受限。
eg.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
model = nn.DataParallel(model)
model.to(device)
for data in data_loader:
input = data.to(device)
output = model(input)
总结:DP 将数据自动分成等大小的若干份(与 GPU 数量有关)。在不同 GPU 上传入模型(多线程)。每个模型运行过程中的 Batch 是 batch_size / num_gpu
。等到每个 GPU 上模型结束后,DP 再收集所有的结果,拼接起来,所以上面 output
的 Batch 仍然是 batch_size
DDP
使用 DDP 时,需要开启多个线程,并在每个线程中创建一个 DDP 对象。由于 DDP 是多进程,所以相较于 DP ,需要先初始化进程组(init_process_group
),这就需要用到 Pytorch 多进程模块。以下是初始化进程的例子:
import torch.multiprocessing as mp
def setup(rank, world_size):
os.environ['MASTER_ADDR'] = 'localhost'
os.environ['MASTER_PORT'] = '12355'
dist.init_process_group("gloo", rank=rank, world_size=world_size)
torch.manual_seed(42)
def cleanup():
dist.destroy_process_group()
def train(rank, world_size):
setup(rank, world_size)
device = torch.device("cuda:%d"%(rank))
model = nn.Linear(10, 10).to(device)
ddp_model = DDP(model, device_ids=[rank])
cleanup()
def run_multi(fn, world_size):
mp.spawn(fn, args=(world_size,), nprocs=world_size, join=True)
if __name__ == '__main__':
run_multi(train, 4)
DDP
创建了当前进程中模型的引用,并将 rank 0 进程模型的 state_dict
复制到其余所有进程内的模型。之后,DDP
在每一个进程内创建Reducer
,负责同步梯度。Reducer
会为每个参数创建 autograd hook
。当参数的梯度就绪时,这些 hooks 会被调用。Reducer
会将参数按组从后往前排列(Bucket),每一组大小由 bucket_cap_mb
指定。- 如果设置
find_unused_parameters
,DDP
会分析前向传播时模型中哪些参数没有用到。反向传播时仅会在模型的一个子图上运行(忽略没有用到的参数)。没有用到的参数会直接被标记为就绪,让 Reducer
同步。 - 反向传播时,当一组中的所有参数梯度全部就绪时,
Reducer
阻塞调用 allreduce
计算所有进程组梯度的均值,然后写入当前 param.grad
。所以当 .backward()
执行完后,所有的 .grad
就已经是同步,可以用 Optimizer 更新。
注意:Construtor, Forward Pass, Backward Pass 都有同步点,保证每个进程都能同时到达这些步骤。采用 dist.barrier()
阻塞同步所有进程。
def average_gradient(model):
size = float(dist.get_world_size())
for param in model.parameters():
dist.all_reduce_multigpu([param.grad.data], op=dist.ReduceOp.SUM)
param.grad.data /= size
初始化的时候,将一个线程中的模型随机初始化并保存,然后其余线程导入这个模型
保存时,只需要保存一个线程中的模型 (一般为 rank 0)
if rank == 0:
torch.save(ddp_model.state_dict(), CHECKPOINT_PATH)
map_location = {'cuda:%d'% 0:'cuda:%d'%rank}
ddp_model.load_state_dict(
torch.load(CHECKPOINT_PATH, map_location=map_location)
)
optimizer.zero_grad()
group : 进程组, 也是一个 world。world_size
所有的进程数。
local_rank : 每台机器上进程编号。global_rank :在所有进程中,该进程的编号。
具体参考这里。
最后在 DDP
构造函数那里卡住。可以参考这里。最后觉得应该是 NCCL 和 CUDA 版本不对。
TCP 初始化
不同主机上多 GPU 训练(TCP 方式):
import torch.distributed as dist
import torch.utils.data.distributed
parser = argparse.ArgumentParser(description='PyTorch distributed training on cifar-10')
parser.add_argument('--rank', default=0,
help='rank of current process')
parser.add_argument('--word_size', default=2,
help="word size")
parser.add_argument('--init_method', default='tcp://127.0.0.1:23456',
help="init-method")
args = parser.parse_args()
if mp.get_start_method(allow_none=True) is None:
mp.set_start_method('spawn')
dist.init_process_group(backend='nccl', init_method=args.init_method, rank=args.rank, world_size=args.word_size)
trainset = torchvision.datasets.CIFAR10(root='./data', train=True, download=download, transform=transform)
train_sampler = torch.utils.data.distributed.DistributedSampler(trainset)
trainloader = torch.utils.data.DataLoader(trainset, batch_size=batch_size, sampler=train_sampler)
net = Net()
net = net.cuda()
net = torch.nn.parallel.DistributedDataParallel(net)
每台主机上可以开启多个进程。但是,若未为每个进程分配合适的 GPU,则同机不同进程可能会共用 GPU,应该坚决避免这种情况。直接用 python 解释器启动各个脚本
Moco TCP 初始化例子
以 Moco 为例展示:
- 主进程启动分进程。使用
torch.multiprocessing.spawn()
mp.spawn(main_worker, nprocs=ngpus_per_node, args=(ngpus_per_node, args))
在每台机器上启动 ngpus_per_node
进程。使用 spawn
的函数要求传入 f(i, *args)
,i
代表第 i 个进程。
- 子进程设置 GPU
def main_worker(gpu, ngpus_per_node, args):
args.gpu = gpu
if args.distributed:
if args.dist_url == "env://" and args.rank == -1:
args.rank = int(os.environ["RANK"])
if args.multiprocessing_distributed:
args.rank = args.rank * ngpus_per_node + gpu
dist.init_process_group(backend=args.dist_backend, init_method=args.dist_url,
world_size=args.world_size, rank=args.rank)
if args.distributed:
if args.gpu is not None:
torch.cuda.set_device(args.gpu)
model.cuda(args.gpu)
args.batch_size = int(args.batch_size / ngpus_per_node)
args.workers = int((args.workers + ngpus_per_node - 1) / ngpus_per_node)
model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[args.gpu])
- 数据划分
分布式训练时,每个进程都会创建数据集。但是每个进程只获取数据集中的一个子集用来训练。
train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset)
train_loader = torch.utils.data.DataLoader(
train_dataset, batch_size=args.batch_size, shuffle=(train_sampler is None),
num_workers=args.workers, pin_memory=True, sampler=train_sampler, drop_last=True)
为了保证能够复现精度,一般会在各个 Epoch 开始时调用 train_sampler.set_epoch(epoch)
- 启动
python main_lincls.py \
-a resnet50 \
--lr 30.0 \
--batch-size 256 \
--pretrained [your checkpoint path]/checkpoint_0199.pth.tar \
--dist-url 'tcp://localhost:10001' --multiprocessing-distributed \
--world-size 1 --rank 0 \
如果两台机器
python main_lincls.py
--dist-url 'tcp://192.168.1.1:10001' --multiprocessing-distributed \
--world-size 2 --rank 0 \
python main_lincls.py
--dist-url 'tcp://192.168.1.1:10001' --multiprocessing-distributed \
--world-size 2 --rank 1 \
ENV 初始化
import torch.distributed as dist
import torch.utils.data.distributed
import argparse
parser = argparse.ArgumentParser()
parser.add_argument("--local_rank", type=int)
args = parser.parse_args()
dist.init_process_group(backend='nccl', init_method='env://')
trainset = torchvision.datasets.CIFAR10(root='./data', train=True, download=download, transform=transform)
train_sampler = torch.utils.data.distributed.DistributedSampler(trainset)
trainloader = torch.utils.data.DataLoader(trainset, batch_size=batch_size, sampler=train_sampler)
net = Net()
device = torch.device('cuda', args.local_rank)
net = net.to(device)
net = torch.nn.parallel.DistributedDataParallel(net, device_ids=[args.local_rank], output_device=args.local_rank)
启动方式
python -m torch.distributed.launch --nproc_per_node=2 --nnodes=3 --node_rank=0 --master_addr="192.168.1.201" --master_port=23456 env_init.py
创建 nnodes 个node, 每个 node 有 nproc_per_node 个进程(一般为 GPU 数量),每个进程独立执行脚本训练。 node rank 确定节点的优先级, 以 0 为主节点,使用其 addr:port 作为 master 的参数 (可以用为局域网内训练), 会自动分配 node 内的各线程优先级 (local_rank)
可选后端
进程间通信操作
multigpu 代表不同进程间,不同 GPU 上有 shape 相同的 Tensor, 可以通过此求和,求平均。
torch.distributed.new_group 可以将各优先级的进程组建成新组,在这些新组中进行后面的组间信息交流。返回一个 group object
Template
在 PCDet 中可以使用 distributed training.
本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)