Pytorch 分布式训练

2023-05-16

文章目录

  • 分布式训练
    • Overview
    • DP or DDP
      • DP
      • DDP
    • TCP 初始化
      • Moco TCP 初始化例子
    • ENV 初始化
    • 可选后端
    • 进程间通信操作
    • Template

区分概念:Machine vs. Device. 多机(Machine) 是指在不同主机上训练。每个主机有自己的 ip 。通过向 pytorch 提供 Master 的 ip:port 同步各个机器。多卡 (Device) 是指在同一台机器上有多张显卡。

分布式训练

采用 .to(cuda:%d) 方法指定GPU。eg.

cuda1 = torch.device("cuda:1")
data = data.to(cuda1)

Overview

三种分布式训练方法:

  • DDP(Distributed Data Parallel,使用最多)

Single-Program; Multi-data training。会在每一个进程中拷贝一份模型,并且会用不同的数据拟合不同进程中的模型。DDP 会同步不同进程中模型的参数。

  • RPC

适用更通用的训练框架,此时 DDP 无法满足要求。RPC 将自动微分器(autograd) 拓展到其它远程机器上。

  • Collective Communication

进程间通讯。用于收集不同进程(卡)上的数据做整体处理。不同后端支持的操作不同。

DP or DDP

DP

  • torch.nn.DataParallel

Single Machine, Multi GPU。 改动代码少。但是由于其本质是 Single Process,Multi-Thread,受到 Python GIL(全局解释锁)影响,运行速度受限。

eg.

# 先预先选定一个 GPU
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu") 

# ... 构建模型
model = nn.DataParallel(model)
model.to(device)

# Train Loop
for data in data_loader:
	input = data.to(device)
	output = model(input)

总结:DP 将数据自动分成等大小的若干份(与 GPU 数量有关)。在不同 GPU 上传入模型(多线程)。每个模型运行过程中的 Batch 是 batch_size / num_gpu。等到每个 GPU 上模型结束后,DP 再收集所有的结果,拼接起来,所以上面 output 的 Batch 仍然是 batch_size

DDP

使用 DDP 时,需要开启多个线程,并在每个线程中创建一个 DDP 对象。由于 DDP 是多进程,所以相较于 DP ,需要先初始化进程组(init_process_group),这就需要用到 Pytorch 多进程模块。以下是初始化进程的例子:

import torch.multiprocessing as mp

# setup environment
def setup(rank, world_size):
    os.environ['MASTER_ADDR'] = 'localhost'
    os.environ['MASTER_PORT'] = '12355'
    dist.init_process_group("gloo", rank=rank, world_size=world_size)
    torch.manual_seed(42)
    
# clean up environment
def cleanup():
    dist.destroy_process_group()
    
# 具体的训练过程, world_size 是总共的线程数
def train(rank, world_size):
    setup(rank, world_size)
    device = torch.device("cuda:%d"%(rank)) # 一般每个进程占用一块 GPU
    
    # build a simple model
    model = nn.Linear(10, 10).to(device)
    # construct DDP model
    ddp_model = DDP(model, device_ids=[rank])

	# ... Forward / Backward / Update Optimizer / Save etc
 
    cleanup()
    
# main 中调用
def run_multi(fn, world_size):
    mp.spawn(fn, args=(world_size,), nprocs=world_size, join=True)

if __name__ == '__main__':
   run_multi(train, 4)
  1. DDP 创建了当前进程中模型的引用,并将 rank 0 进程模型的 state_dict 复制到其余所有进程内的模型。之后,DDP在每一个进程内创建Reducer,负责同步梯度。Reducer 会为每个参数创建 autograd hook。当参数的梯度就绪时,这些 hooks 会被调用。Reducer 会将参数按组从后往前排列(Bucket),每一组大小由 bucket_cap_mb 指定。
  2. 如果设置 find_unused_parametersDDP 会分析前向传播时模型中哪些参数没有用到。反向传播时仅会在模型的一个子图上运行(忽略没有用到的参数)。没有用到的参数会直接被标记为就绪,让 Reducer 同步。
  3. 反向传播时,当一组中的所有参数梯度全部就绪时,Reducer 阻塞调用 allreduce 计算所有进程组梯度的均值,然后写入当前 param.grad。所以当 .backward() 执行完后,所有的 .grad 就已经是同步,可以用 Optimizer 更新。

注意:Construtor, Forward Pass, Backward Pass 都有同步点,保证每个进程都能同时到达这些步骤。采用 dist.barrier() 阻塞同步所有进程。

  • 手动进行进程间通信
# 还有一个进程间通信的方法
def average_gradient(model):
    size = float(dist.get_world_size())
    for param in model.parameters():
        dist.all_reduce_multigpu([param.grad.data], op=dist.ReduceOp.SUM)  # 将所有进程中 Tensor 求和并存储于个进程中
        param.grad.data /= size
  • save and load

初始化的时候,将一个线程中的模型随机初始化并保存,然后其余线程导入这个模型
保存时,只需要保存一个线程中的模型 (一般为 rank 0)

# 仅保存 rank 0 的模型
if rank == 0:
	torch.save(ddp_model.state_dict(), CHECKPOINT_PATH)

# 将 rank 0 映射到当前进程的 GPU
map_location = {'cuda:%d'% 0:'cuda:%d'%rank}
ddp_model.load_state_dict(
	torch.load(CHECKPOINT_PATH, map_location=map_location)
)

optimizer.zero_grad()
  • 多机训练

group : 进程组, 也是一个 world。world_size 所有的进程数。

local_rank : 每台机器上进程编号。global_rank :在所有进程中,该进程的编号。

具体参考这里。

最后在 DDP 构造函数那里卡住。可以参考这里。最后觉得应该是 NCCL 和 CUDA 版本不对。

TCP 初始化

不同主机上多 GPU 训练(TCP 方式):

import torch.distributed as dist
import torch.utils.data.distributed

# ......
parser = argparse.ArgumentParser(description='PyTorch distributed training on cifar-10')
parser.add_argument('--rank', default=0,
                    help='rank of current process')
parser.add_argument('--word_size', default=2,
                    help="word size")
parser.add_argument('--init_method', default='tcp://127.0.0.1:23456',
                    help="init-method")
args = parser.parse_args()

# 初始化组中第rank个进程, icp 方法下所有 ip:port 必须与主进程保持一致
if mp.get_start_method(allow_none=True) is None:
    mp.set_start_method('spawn')
dist.init_process_group(backend='nccl', init_method=args.init_method, rank=args.rank, world_size=args.word_size)

# the sampler process, DS 将数据集划分为几个互不相交的子集
trainset = torchvision.datasets.CIFAR10(root='./data', train=True, download=download, transform=transform)
train_sampler = torch.utils.data.distributed.DistributedSampler(trainset)
trainloader = torch.utils.data.DataLoader(trainset, batch_size=batch_size, sampler=train_sampler)

# ......
net = Net()
net = net.cuda()
net = torch.nn.parallel.DistributedDataParallel(net)

每台主机上可以开启多个进程。但是,若未为每个进程分配合适的 GPU,则同机不同进程可能会共用 GPU,应该坚决避免这种情况。直接用 python 解释器启动各个脚本

Moco TCP 初始化例子

以 Moco 为例展示:

  1. 主进程启动分进程。使用 torch.multiprocessing.spawn()
# Use torch.multiprocessing.spawn to launch distributed processes: the
# main_worker process function
mp.spawn(main_worker, nprocs=ngpus_per_node, args=(ngpus_per_node, args))

在每台机器上启动 ngpus_per_node 进程。使用 spawn 的函数要求传入 f(i, *args)i 代表第 i 个进程。

  1. 子进程设置 GPU
def main_worker(gpu, ngpus_per_node, args):
    args.gpu = gpu # 依据进程号设置 GPU

	if args.distributed:
		# ENV 启动
        if args.dist_url == "env://" and args.rank == -1:
            args.rank = int(os.environ["RANK"])
        # TCP 多机启动,计算当前总进程号
        if args.multiprocessing_distributed:
            # For multiprocessing distributed training, rank needs to be the
            # global rank among all the processes
            args.rank = args.rank * ngpus_per_node + gpu
        # 初始化进程组
        dist.init_process_group(backend=args.dist_backend, init_method=args.dist_url,
                                world_size=args.world_size, rank=args.rank)
   	if args.distributed:
        # For multiprocessing distributed, DistributedDataParallel constructor
        # should always set the single device scope, otherwise,
        # DistributedDataParallel will use all available devices.
        if args.gpu is not None:
            torch.cuda.set_device(args.gpu)
            model.cuda(args.gpu)
            # When using a single GPU per process and per
            # DistributedDataParallel, we need to divide the batch size
            # ourselves based on the total number of GPUs we have
            args.batch_size = int(args.batch_size / ngpus_per_node)
            args.workers = int((args.workers + ngpus_per_node - 1) / ngpus_per_node)
            # 将当前模型分配到对应 GPU id 上。
            model = torch.nn.parallel.DistributedDataParallel(model, device_ids=[args.gpu])
  1. 数据划分

分布式训练时,每个进程都会创建数据集。但是每个进程只获取数据集中的一个子集用来训练。

# Sampler 会自动获取进程组信息,详细见 Pytorch 文档。
train_sampler = torch.utils.data.distributed.DistributedSampler(train_dataset)
# 总的 Batch Size 是各卡 Batch Size 之和。
train_loader = torch.utils.data.DataLoader(
        train_dataset, batch_size=args.batch_size, shuffle=(train_sampler is None),
        num_workers=args.workers, pin_memory=True, sampler=train_sampler, drop_last=True)

为了保证能够复现精度,一般会在各个 Epoch 开始时调用 train_sampler.set_epoch(epoch)

  1. 启动
python main_lincls.py \
  -a resnet50 \
  --lr 30.0 \
  --batch-size 256 \
  --pretrained [your checkpoint path]/checkpoint_0199.pth.tar \
  # Master [ip]:[port]
  --dist-url 'tcp://localhost:10001' --multiprocessing-distributed \
  # world-size -- 所有机器; Rank 当前机器编号
  --world-size 1 --rank 0 \

如果两台机器

# Machine 1
python main_lincls.py
  # Master [ip]:[port]
  --dist-url 'tcp://192.168.1.1:10001' --multiprocessing-distributed \
  # world-size -- 所有机器; Rank 当前机器编号
  --world-size 2 --rank 0 \
# Machine 2
python main_lincls.py
  # Master [ip]:[port]
  --dist-url 'tcp://192.168.1.1:10001' --multiprocessing-distributed \
  # world-size -- 所有机器; Rank 当前机器编号
  --world-size 2 --rank 1 \

ENV 初始化

import torch.distributed as dist
import torch.utils.data.distributed

# ......
import argparse
parser = argparse.ArgumentParser()
# 注意这个参数,必须要以这种形式指定,即使代码中不使用。因为 launch 工具默认传递该参数
parser.add_argument("--local_rank", type=int)
args = parser.parse_args()

# ......
dist.init_process_group(backend='nccl', init_method='env://')

# ......
trainset = torchvision.datasets.CIFAR10(root='./data', train=True, download=download, transform=transform)
train_sampler = torch.utils.data.distributed.DistributedSampler(trainset)
trainloader = torch.utils.data.DataLoader(trainset, batch_size=batch_size, sampler=train_sampler)

# ......
# 根据 local_rank,配置当前进程使用的 GPU
net = Net()
device = torch.device('cuda', args.local_rank)
net = net.to(device)
net = torch.nn.parallel.DistributedDataParallel(net, device_ids=[args.local_rank], output_device=args.local_rank)

启动方式

python -m torch.distributed.launch --nproc_per_node=2 --nnodes=3 --node_rank=0 --master_addr="192.168.1.201" --master_port=23456 env_init.py

创建 nnodes 个node, 每个 node 有 nproc_per_node 个进程(一般为 GPU 数量),每个进程独立执行脚本训练。 node rank 确定节点的优先级, 以 0 为主节点,使用其 addr:port 作为 master 的参数 (可以用为局域网内训练), 会自动分配 node 内的各线程优先级 (local_rank)

可选后端

进程间通信操作

multigpu 代表不同进程间,不同 GPU 上有 shape 相同的 Tensor, 可以通过此求和,求平均。

torch.distributed.new_group 可以将各优先级的进程组建成新组,在这些新组中进行后面的组间信息交流。返回一个 group object

Template

在 PCDet 中可以使用 distributed training.

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

Pytorch 分布式训练 的相关文章

  • Linux 网络编程——UDP编程

    一 概述 UDP 是 User Datagram Protocol 的简称 xff0c 中文名是用户数据报协议 xff0c 是一个简单的面向数据报的运输层协议 xff0c 在网络中用于处理数据包 xff0c 是一种无连接的协议 UDP 不提
  • 给指定的寄存器地址:0x0001eea7 ,赋值

    coretexM0平台上给指定的寄存器地址 xff1a 0x0001eea7 赋值100 怎么实现 xff1f xff08 volatile char xff09 0x0001eea7 61 100 xff1b 常见错误1 xff1a xf
  • 常见cmake命令总结

    常见cmake命令总结 cmake常见命令 cmake minimum required 指定CMake的最小版本要求 cmake minimum required VERSION 2 8 project 定义工程名称 project PR
  • 利用Qt Phonon框架制作音视频播放器

    Phonon严格来说其实非为Qt的library xff0c Phonon原本就是KDE 4的开放源代码多媒体API xff0c 後来与Qt合并与开发 xff0c 所以简单来说就是Qt使用Phonon这个多媒体框架来提供一般影音多媒体档案的
  • 主设备号和次设备号

    Linux的设备管理是和文件系统紧密结合的 xff0c 各种设备都以文件的形式存放在 dev目录下 xff0c 称为设备文件 应用程序可以打开 关闭和读写这些设备文件 xff0c 完成对设备的操作 xff0c 就像操作普通的数据文件一样 为
  • Makefile中的wildcard用法

    在Makefile规则中 xff0c 通配符会被自动展开 但在变量的定义和函数引用时 xff0c 通配符将失效 这种情况下如果需要通配符有效 xff0c 就需要使用函数 wildcard xff0c 它的用法是 xff1a wildcard
  • GPIO

    一 什么是GPIO xff1f 首先应该理解什么是GPIO GPIO xff0c 英文全称为General Purpose IO ports xff0c 也就是通用IO口 在嵌入式系统中常常有数量众多 xff0c 但是结构却比较简单的外部设
  • UART

    一 S3C2410内置的UART控制器 S3C2410内部具有3个独立的UART控制器 xff0c 每个控制器都可以工作在Interrupt xff08 中断 xff09 模式或DMA xff08 直接内存访问 xff09 模式 xff0c
  • vivi源代码最为详细分析(二)

    现在进入bootloader之vivi分析的第二阶段 xff0c 这部分使用C语言实现 xff0c 部分代码采取内嵌汇编的方式 这里需要用到GNU GCC内嵌汇编 的知识 xff0c 这部分基础还没有具备 xff0c 需要学习 下面先按照流
  • vivi源代码最为详细分析(三)

    step 5 xff1a MTD设备初始化 关于什么是MTD xff0c 为什么要使用MTD xff0c MTD技术的架构是什么 xff0c 等等 xff0c 可以参考 Linux MTD源代码分析 xff08 作者 xff1a Jim Z
  • 【笔记】docker-compose.yml 文件更改后重新启动加载更改后的内容

    docker compose yml 文件更改后想使之立刻生效 xff0c 但是不想手动删除已经建立的 container 等信息可以运行下面命令 重新创建容器实现修改 docker compose up force recreate d
  • bootloader开发阶段总结以及可能会碰到的问题

    到今天 xff0c vivi源代码基本分析完毕 对bootloader有了更深层的认识 在此期间 xff0c 仔细阅读了毛德操 胡希明先生编著的 嵌入式系统 采用公开源代码和StrongARM XScale处理器 第七章 xff1a 嵌入式
  • 用模式一实现用户登录功能

    login jsp为显示登录表单和处理登录请求的页面 xff0c 登录成功后跳转到logonSuccess jsp页面 xff0c 登录失败时重新显示登录表单和失败的帮助信息 xff0c 以便用户重新登录 xff1b 如果已登录用户再次访问
  • UDP通信 (C语言实现)

    直接看代码吧 v乛 乛 udp server c 文件信息 文 件 名 udp server c 创 建 人 文件创建日期 年 月 日 描 述 UDP 回射服务器程序
  • jni中使用extern "C"的原因

    首先 cplusplus这个宏是微软自定义宏 xff0c 大小是个整数 xff1a cplusplus This macro is defined when the C 43 43 compiler is in use You can us
  • VmWare虚拟机设置ubuntu和windows之间的共享文件夹

    一般在进行编程作业的时候 xff0c 我们会采用 开发在Windows中编辑源代码 xff0c 在linux中编译 执行源代码 这往往需要需要将在Windows下编辑好的源代码上传到linux系统种进行编译 怎么来进行上传呢 xff1f 其
  • C++的最后一道坎|百万年薪的程序员

    导语 C 43 43 的起源可以追溯到 40 年前 xff0c 但它仍然是当今使用最广泛的编程语言之一 xff0c C 43 43 发明人Bjarne Stroustrup 一开始没想到 C 43 43 会获得如此大的成功 xff0c 他说
  • Modbus 协议

    1 主站 xff1a 可以进行读写操作 从站 xff1a 只能被动响应主站操作 2 一个 Modbus 网络只有一个主站 xff0c 可以多个从站 xff08 主站不用连在两端 xff09 485 通讯 1 接线 最多 254 个站 xff
  • 数据结构对齐

    xfeff xfeff 对齐的算法 xff1a 由于各个平台和编译器的不同 xff0c 现以本人使用的gcc version 3 2 2编译器 xff08 32位x86平台 xff09 为例子 xff0c 来讨论编译器对struct数据结构
  • 关于示波器测485串口波特率的使用方法总结

    之前没有用过示波器 xff0c 更不知道怎么来测试串口的波特率 xff0c 结果遇到一客户说我们产品的波特率达不到9600 xff0c 只有9100 xff0c 为了验证这一说法 xff0c 我们只能自己测试一下 说明 xff1a 产品通过

随机推荐

  • C语言对寄存器封装

    一 封装外设 用C语言代码把外设地址映射用宏定义封装 span class hljs comment 外设基地址 span span class hljs preprocessor define PERIPH BASE unsigned i
  • JavaJDK下载安装与环境配置(Windows 10 超详细的图文版教程 )

    前言 xff1a 对于很多初学者来说 xff0c 我想可能很多人都会遇到JDK环境变量的配置问题 明明就是按照度娘上的教程去一步步配置的 xff0c 但还是会有很多的人出现配置不成功的问题 所以今天在这里分享一下windows 10 系统下
  • win32 获取窗口句柄的方法

    第一种方法是根据窗口句柄值获取窗口句柄 使用spy 43 43 获取指定窗口的窗口句柄值 xff0c 因为句柄值是16进制数 xff0c 所以前面加0x 然后进行强制转换为HWND类型 HWND hWnd 61 HWND 0x0028072
  • 当设置display:inline;时li的宽度无效的解决方法

    若制作导航栏时 xff0c 使用列表li 的定义时 xff0c 若想加上一个背景图 xff0c 这时候若定义li的一个属性为 li display inline width 83px height 30px xff0c 则浏览器会无视后面的
  • js文本框或者按钮鼠标悬停提示说明文字

    html页面中很多元素会用到文本提示 xff0c 当鼠标悬停之后显示一段说明文字 显示说明性文字 function tips id str t 61 getTop document getElementById id document ge
  • localstorage兼容ie8以下浏览器的问题

    最近在做一个网站 xff0c 由于希望尽可能减小服务器的压力 xff0c 也想提高网站的运转速度 xff0c 就想尽可能少的在服务器上读写数据以及下载重复数据 xff0c 需要重复使用的数据 xff0c 就储存在本地 xff0c 能在本地进
  • HTTP请求返回状态码中301与302的状态码区别

    一 xff0e 官方说法 301 xff0c 302 都是HTTP状态的编码 xff0c 都代表着某个URL发生了转移 xff0c 不同之处在于 xff1a 301 redirect 301 代表永久性转移 Permanently Move
  • java防止 csrf 攻击 --- 采用 spring .

    CSRF xff08 Cross site request forgery xff09 xff0c 中文名称 xff1a 跨站请求伪造 xff0c 也被称为 xff1a one click attack session riding xff
  • Gson解析数组多类型元素

    why used gson Gson is a Java library that can be used to convert Java Objects into their JSON representation It can also
  • js中获取时间new Date()详细介绍

    1 var myDate 61 new Date Date 返回当日的日期和时间 getDate 从 Date 对象返回一个月中的某一天 1 31 getDay 从 Date 对象返回一周中的某一天 0 6 getMonth 从 Date
  • 如何让一个行内元素(如一张图片)在div中居中

    xff08 1 xff09 第一种 xff1a 用vertical align lt div class 61 34 method1 34 gt lt span class 61 34 tiptop 34 gt lt span gt lt
  • 释放webpack tree-shaking潜力之webpack-deep-scope-analysis-plugin

    在上周末广州举办的 feday 中 webpack 的核心开发者 Sean 在介绍 webpack 插件系统原理时 隆重介绍了一个中国学生于 Google 夏令营 在导师 Tobias 带领下写的一个 webpack 插件 https gi
  • iframe跨域通信的通用解决方案

    此方案已有新版本 请查看 iframe跨域通信的通用解决方案 第二弹 xff08 终极解决方案 xff09 本文章可做技术学习供继续交流 一 背景 在这个Web页面越来越丰富的时代 xff0c 页面通过iframe嵌入其他的页面也越来越常见
  • C++实现轻量级RPC分布式网络通信框架

    前言 xff1a 2022 4 14更新 xff1a 在我重新回顾这篇文章的时候 xff0c 我觉得里面内容有点乱 xff0c 主要还是因为RPC里面涉及到很多概念和知识点 本来代码内容就已经挺抽象了 xff0c 还要结合各种概念 xff0
  • 浅谈JavaScript设计模式

    创建型模式 xff1a 该模式处理的是用于创建对象的各种机制 工厂方法抽象工厂建造者原型单例 结构型模式 xff1a 考虑的是对象的组成以及对象彼此之间的关系 适配器桥接组合装饰器外观享元代理 行为型模式 xff1a 关注的是对象之间的依赖
  • Ardupilot-SITL仿真模拟调试

    1 配置SITL仿真调试 span class token punctuation span span class token operator span waf configure span class token operator sp
  • PX4——Range Finder 篇

    Range Finder 此处选用的是 Benewake 下的 Lidar 参数设置 General Configuration 除了官方的参数设置外 xff0c 我在 EKF2 中还找到了 EKF2 RNG AID 参数 xff0c 用来
  • STM32 时钟 定时器基础

    STM32 Clock Tree 时钟源 HSI xff1a xff08 High Speed Internal xff09 内部的 RC 震荡电路产生时钟信号 HSE xff1a xff08 High Speed External xff
  • Albumentation

    文章目录 AlbumentationClassificationSegmentationDetectionKeyPoints Augmentation Albumentation 所有实现的变换 变换以及支持的类型 此处 Classific
  • Pytorch 分布式训练

    文章目录 分布式训练OverviewDP or DDPDPDDP TCP 初始化Moco TCP 初始化例子 ENV 初始化可选后端进程间通信操作Template 区分概念 xff1a Machine vs Device 多机 Machin