本系列介绍分布式优化器,分为三篇文章,分别是基石篇,DP/DDP/Horovod 之中数据并行的优化器,PyTorch 分布式优化器,按照深度递进。本文介绍PyTorch 分布式优化器和PipeDream之中的优化器,主要涉及模型并行(流水线并行)。
PyTorch分布式其他文章如下:
深度学习利器之自动微分(1)
深度学习利器之自动微分(2)
[源码解析]深度学习利器之自动微分(3) --- 示例解读
[源码解析]PyTorch如何实现前向传播(1) --- 基础类(上)
[源码解析]PyTorch如何实现前向传播(2) --- 基础类(下)
[源码解析] PyTorch如何实现前向传播(3) --- 具体实现
[源码解析] Pytorch 如何实现后向传播 (1)---- 调用引擎
[源码解析] Pytorch 如何实现后向传播 (2)---- 引擎静态结构
[源码解析] Pytorch 如何实现后向传播 (3)---- 引擎动态逻辑
[源码解析] PyTorch 如何实现后向传播 (4)---- 具体算法
[源码解析] PyTorch 分布式(1)------历史和概述
[源码解析] PyTorch 分布式(2) ----- DataParallel(上)
[源码解析] PyTorch 分布式(3) ----- DataParallel(下)
[源码解析] PyTorch 分布式(4)------分布式应用基础概念
[源码解析] PyTorch分布式(5) ------ DistributedDataParallel 总述&如何使用
[源码解析] PyTorch分布式(6) ---DistributedDataParallel -- 初始化&store
[源码解析] PyTorch 分布式(7) ----- DistributedDataParallel 之进程组
[源码解析] PyTorch 分布式(8) -------- DistributedDataParallel之论文篇
[源码解析] PyTorch 分布式(9) ----- DistributedDataParallel 之初始化
[源码解析] PyTorch 分布式(10)------DistributedDataParallel 之 Reducer静态架构
[源码解析] PyTorch 分布式(11) ----- DistributedDataParallel 之 构建Reducer和Join操作
[源码解析] PyTorch 分布式(12) ----- DistributedDataParallel 之 前向传播
[源码解析] PyTorch 分布式(13) ----- DistributedDataParallel 之 反向传播
[源码解析] PyTorch 分布式 Autograd (1) ---- 设计
[源码解析] PyTorch 分布式 Autograd (2) ---- RPC基础
[源码解析] PyTorch 分布式 Autograd (3) ---- 上下文相关
[源码解析] PyTorch 分布式 Autograd (4) ---- 如何切入引擎
[源码解析] PyTorch 分布式 Autograd (5) ---- 引擎(上)
[源码解析] PyTorch 分布式 Autograd (6) ---- 引擎(下)
[源码解析] PyTorch分布式优化器(1)----基石篇
[源码解析] PyTorch分布式优化器(2)----数据并行优化器
为了更好的说明,本文代码会依据具体情况来进行相应精简。
之前无论是 DP, DDP,或者 Horovod,实质上的都是处理数据并行,比如 DDP 将相同的模型复制到所有 GPU,其中每个 GPU 使用输入数据的不同分区。虽然它可以显着加速训练过程,但它不适用于模型太大而无法放入单个 GPU 的某些用例。于是人们引入了模型并行(model parallel)。
与此对应,优化器也需要做不同的修改以适应模型并行的需求。为了更好的分析,本文首先介绍单机模型并行,然后介绍PyTorch分布式优化器。
下面文字翻译自 https://pytorch.org/tutorials/intermediate/model_parallel_tutorial.html ,加入了一些自己的思考和理解。
模型并行被广泛用于分布式训练。与相比,模型并行将单个模型拆分到不同的 GPU 上,而不是在每个 GPU 上复制整个模型(具体来说,假设一个模型 包含 10 层,当使用,每个 GPU 将拥有这 10 层的全部副本,而当在两个 GPU 上使用模型并行时,每个 GPU 可以托管 5 层)。
模型并行的高级思想是将模型的不同子网络放置在不同的设备上,并相应地实现该方法以便跨设备移动中间输出。由于单个设备上只有模型的一部分在运行,因此一组设备可以共同服务于一个更大的模型。
在这篇文章中,我们不会尝试构建巨大的模型并将它们压缩到有限数量的 GPU 中。相反,这篇文章侧重于展示模型并行的想法。读者可以将这些想法应用到实际应用中。
让我们从一个包含两个线性层的玩具模型开始。要在两个 GPU 上运行这个模型,只需将每个线性层放在不同的 GPU 上,并相应地移动输入和中间输出以匹配层设备。
的代码看起来与在单个 GPU 上的实现方式非常相似。只是修改了两个部分:网络构造部分和forward部分。
- 方法使用了两个语句用来在适当的设备上放置线性层,这样就把整个网络拆分成两个部分,然后就可以分别运行在不同的GPU之上。
- forward 方法使用了两个语句用来在适当的设备上放置张量,这样可以把一个layer的输出结果通过tensor.to的语义拷贝到另一个layer所在的GPU上。
这是模型中唯一需要更改的地方。和会可以应付这种情况,它们自动接管梯度,仿佛模型是一个GPU之上。在调用损失函数时,您只需要确保标签与网络的输出在同一设备上。
这里最重要的是 labels = torch.randn(20, 5).to('cuda:1'),这保证了标签在 cuda:1'。
回忆一下之前forward的代码:self.net2(x.to('cuda:1'))。这两行代码确保标签与输出在同一设备 cuda:1' 上。
初始化之后如下:
forward 操作和设定label之后如下,现在输出和label都在GPU 1 之上:
还可以通过更改几行代码把一个现有的单 GPU 模块转换到在多个 GPU 上运行。下面的代码展示了如何分解 到两个 GPU之上。基本想法是继承现有模块,并在构建过程中将层拆分为两个 GPU。然后,重载方法以便把两个子网络拼接起来,具体是通过相应地移动中间输出来完成。
上述实现解决了模型太大而无法放入单个 GPU 的情况下的问题。但是,您可能已经注意到,即使您的模型适合这种情况,它也许会比在单个 GPU 上运行要慢。这是因为,在任何时候,两个 GPU 中只有一个在工作,而另一个坐在那里什么也不做。在 和 之中需要把中间输出从拷贝到 ,这将进一步引起性能恶化。
让我们运行一个实验,以更从一个可以量化地角度来了解执行时间。在这个实验中,我们通过运行随机输入和标签来训练和现有 。训练后,模型不会产生任何有用的预测,但我们可以对执行时间有一个合理的了解。
上述方法使用用作损失函数,使用作为优化器。它模仿 图像的训练,这些图像被组织成 3 个批次,每批次包含 120 个图像。然后,我们使用来运行 10 次,并且用标准差来绘制执行时间。
结果表明,模型并行需要的执行时间比但GPU实现需要的时间长 。所以我们可以得出结论,在 GPU 之间来回复制张量大约有 7% 的开销。
2.3.1 目前状况
我们总结一下目前状况:
- 虽然有多块GPU,但是在整个执行过程中的每一个时刻,只有一个GPU在计算,其他GPU处于空闲状态。
- 另外还有中间计算结果在GPU之间的拷贝工作,这也使得性能恶化。
因此我们需要针对这两个问题进行针对性处理:
- 让所有 GPU 都动起来。
- 减少拷贝传输时间。
2.3.2 解决方案
两个问题解决方案如下:
让所有 GPU 都动起来的一种选择是加入流水线机制:将每个批次进一步划分,组成一个分割(split )管道,这样当一个分割到达第二个子网络时,可以将接下来的分割送入第一个子网络。这样,两个连续的分割(split )就可以在两个 GPU 上同时运行。
为什么可以做到这一点?这是因为 CUDA 的异步并行执行逻辑。
- CUDA 的一些操作是异步的,比如:核发射,设备间数据拷贝,主机和设备内拷贝小存储块等等。
- 几乎所有具有计算能力1.1及更高计算能力的CUDA设备都支持并发复制和核执行,即数据拷贝和数值计算可以并行。
- 一些计算能力2.x的设备可并发执行多个内核。
- 在一些计算能力2.x的设备上,两个方向的拷贝可以并行(GPU到CPU,CPU到GPU)。
如何减少拷贝传输时间?这个可以使用一些硬件和软件的结合来增加带宽减少延迟,比如:
- 硬件层面包括:单机内部的PCIe、NVlink、NVSwitch;多机之间的RDMA网络(IB或RoCE)。
- 软件堆栈包括:GPUDirect的一系列技术:P2P(Peer-to-Peer),RDMA,Async,Storage等。
PyTorch使用了NCCL库(基于CUDA计算)。
在接下来的实验中,我们进一步将每个"120 个图像批次" 分成 "20 个图像分割(split)"。由于 PyTorch 异步启动 CUDA 操作,因此实现不需要产生多个线程来实现并发。
请注意,设备到设备张量复制操作会在源设备和目标设备上的当前流上进行同步。如果创建多个流,则必须确保复制操作正确同步。在完成复制操作之前写入源张量或读取/写入目标张量可能会导致未定义的行为。上述实现仅在源设备和目标设备上使用默认流,因此没有必要强制执行额外的同步操作。
实验结果表明,把流水线输入加入到 ResNet50 的模型并行之后,训练过程加快了大约。虽然它离理想的 100% 加速还很远。由于我们在流水线并行实现中引入了一个新参数,因此尚不清楚此新参数如何影响整体训练时间。直观地说,使用小的会导致许多微小的 CUDA 核启动,而使用大结果会导致在第一次和最后一次拆分期间产生相对较长的空闲时间。两者都不是最优的。这个特定实验可能有一个最佳配置。让我们尝试通过使用几个不同的值运行实验来找到它。
结果表明,设置为 12 实现了最快的训练速度,从而导致加速。我们仍有机会进一步加快训练进程。例如,目前所有上的操作都放在其默认流上。这意味着下一个拆分的计算不能与上一个拆分的复制操作重叠。但是,由于 prev 和 next 拆分(split)是不同的张量,因此将一个张量的计算与另一个张量的拷贝重叠起来是没有问题的。这种实现需要在两个GPU上使用多个流,并且不同的子网结构需要不同的流管理策略。由于没有一个适用于所有模型并行用例的通用的多流解决方案,我们不会在本教程中讨论它。
这篇文章展示了几个性能测量。在您自己的机器上运行相同的代码时,您可能会看到不同的性能结果,因为结果取决于底层硬件和软件。要为您的环境获得最佳性能,正确的方法是首先生成结果曲线,并根据曲线来确定最佳分割大小,然后将该分割大小应用到管道输入之上。
我们已经了解了单机之上的模型并行,接下来就要看模型跨越多个服务器的分布式模型并行训练。
我们先设想一下如果自己实现分布式优化器,应该如何处理。
假如模型分为三个部分,有三个主机可以训练。
我们会显式的把这三个部分分别部署到三个主机之上,在三个主机之上都有一套自己的训练代码,每个训练代码之中都有自己的本地优化器负责优化本地子模型的参数。
但是这样有几个问题需要我们解决:
- 如何划分模型到不同机器上?如何把代码分割到不同机器上?
- 如何跨机器把前向传播,后向传播连接在一起?
- 各个机器之间是同步运行还是异步运行?
- 如果是同步,如何让整个系统用同一个步骤运行?
- 如何把这些优化器结合在一起?还是优化器各做各的,彼此没有任何联系?
- 如何尽力让用户少修改代码?
- 如何能让开发者感觉就是开发本地版本代码?
经过思考就会发现,这里面错综复杂。如果我们自己基于 PyTorch 来实现,你会发现这可能最终结果是一个 PipeDream。于是我们看看 PyTorch 如何处理。
PyTorch 使用 RPC 来解决这些问题。
3.2.1 四大天王
前文我们提到了,PyTorch的分布式框架使用了四大天王:
- **远程过程调用 (RPC) ** 使用给定的参数在指定的worker上运行函数并获取返回值或创建对返回值的引用。有三个主要的 API: (同步)、 (异步)和 (异步并返回对远程返回值的引用)。
- 如果用户代码在没有返回值的情况下无法继续,请使用同步 API。
- 否则,使用异步 API 获取 Future,并在调用者需要返回值时等待 Future。
- API适用如下情况:需要在远程创建某些内容但从不需要将其获取给调用者。
- 远程引用 (RRef) 是指向本地或远程对象的分布式共享指针,就是本地或者跨机器的变量引用。
- **Distributed Autograd **将所有参与前向传播 worker的本地 autograd 引擎缝合在一起,并在后向传播期间自动联系它们以计算梯度。在进行前向传递如果需要跨越多台机器时,这尤其有用,例如分布式模型并行训练、参数服务器训练等。 有了这个特性,用户代码不再需要担心如何跨 RPC 边界发送梯度和应该以什么顺序启动本地 autograd 引擎,如果前向传递中有嵌套和相互依赖的 RPC 调用,这可能会变得非常复杂。
- 分布优化器的构造需要一个 (例如,,等)和一个RRefs的参数列表。即,在每个不同的Ref所有者之上创建一个 实例,然后运行相应更新参数。当用户进行分布式前向和后向传播时,参数和梯度将分散在多个 worker 中,因此需要对每个相关 worker 进行优化。Distributed Optimizer 将所有这些本地优化器合而为一,并提供了简洁的构造函数和API。
3.2.2 逻辑关系
我们使用官方图示,可以看到 PyTorch 分布式包的内部架构和逻辑关系。分布式优化器基于另外三者之上。
我们会在后续结合代码进行讲解如何使用。
首先说明一下,为了清晰的分析,我们后续忽略所有 script 相关部分。
的使用方法如下:
- 获取要优化的远程参数列表 ()。 这些也可以是包装在本地RRef中的本地参数。
- 将 类作为本地优化器来运行所有的RRef owner。
- 分布式优化器在每个 worker 节点上创建其本地优化器的实例,并持有这些本地优化器的 RRef。
- 当调用 时,分布式优化器使用 RPC 在适当的远程 worker 上远程执行所有本地优化器。 必须获得一个分布式autograd 作为输入,本地优化器将把梯度保存在相关的context之中。
- 如果多个并发的分布式优化器同时更新工作器上的相同参数,则这些更新将通过锁序列化。
看起来有点抽象,我们需要一步一步分析。
综上所述,以下是使用分布式 autograd 和分布式优化器的简单端到端示例。 如果将代码放入名为“ dist_autograd_simple.py”的文件中,则可以使用命令运行该代码:
DistributedOptimizer 得到了分散在 workers 之上参数的远端引用,然后对于这些参数在本地运行优化器。
对于单个worker来说,如果它接受到来自相同或不同客户端的的并发调用,则这些调用将会在这个worker之上串行进行,因为每个worker的优化器一次只能处理一组梯度。
DistributedOptimizer 的定义其实看不到啥东西,这是因为 Python 的语言特性,我们没办法在统一地方看到类的成员变量,但是有一个 functional_optim_map 值得我们关注。 这里是把每个内置优化器又配置了一个对应的新优化器,比如 optim.Adagrad 对应的是 _FunctionalAdagrad,我们就选择一个新优化器看看。
4.3.1_FunctionalSGD
optim.SGD 对应的是 _FunctionalSGD。其代码位于 torch/distributed/optim/functional_sgd.py。具体是定义一个与TorchScript兼容的函数式SGD优化器,PyTorch 将以函数的方式使用这些优化器。在更新参数时,PyTorch 不使用 param.grad,而是显式地允许分布式优化器将梯度传递给 step 函数。注意:此优化器应该仅由分布式优化器内部使用,而不是向用户公开。
4.4.1 初始化
这部分代码主要对应了:分布式优化器在每个 worker 节点上创建其本地Optimizer的实例,并将持有这些本地优化器的 RRef。具体结合我们之前示例代码来看,params_rref 就是需要优化的参数列表,每个会对应一个优化器,就是 DistributedOptimizer 生成了所有节点上的优化器,以 rpc.RRef(_LocalOptimizer) 形式保存在 self.remote_optimizers 之中。
4.4.2 生成优化器 _LocalOptimizer
是生成了 。
_LocalOptimizer 是本地优化器,其运行在远端worker节点之上,master 拥有这些优化器的代理。
4.4.3 等待完成
用 _wait_for_all 等待异步完成。
对应的逻辑如下:
- ref1, ref2 是远端待优化的参数,都是 torch.rand((3, 3))。
- optim_rref1,optim_rref2 分别是 Node 2,Node 3上本地优化器的 rref。
DistributedOptimizer 在优化时候,会遍历保存的优化器,逐一调用 _local_optimizer_step。
为什么可以在Node 1 之上统一调用这些远端优化器?因为最后更新所有参数完毕之后,才能调用下一轮前向传播,所以可以统一调用然后等待都完成。
4.5.1 本地优化
_local_optimizer_step 就是得到 _LocalOptimizer,然后调用其 step。
_LocalOptimizer 的 step 首先获取分布式梯度,然后用这个梯度进行参数优化。
4.5.2 获取分布式梯度
get_gradients 的 Python 代码其实没有意义。
其对应 C++ 的位于 torch/csrc/jit/runtime/register_distributed_ops.cpp。是调用了上下文的函数。
C++世界的 getGradients 代码如下:
在 torch/csrc/distributed/autograd/context/context.h之中有:
所以我们逻辑拓展如下:
- DistributedOptimizer 调用 optim_rref1 和 optim_rref2 的 step 方法在远端 worker 之上进行运行,优化。
- Worker 1 和 worker 2 之上的 分别获得对本地 进行优化。
- 优化结果在 之中的累积。
这样,整个模型的各个子模型就在各个 Node 之上以统一的步骤进行训练/优化。
最后,我们来看看 PipeDream,看看它是怎么实现分布式优化器的,我们探寻的思路是:
- 因为PipeDream是在每个worker之上启动全部代码,所以每个本地优化器如何确定自己要优化的参数?
- 优化时候如何更新参数?
我们先提前说一下:
- 每个node的module不同,所以每个优化器的待优化参数是本地module的参数。
- 每个node优化自己负责的部分module。
我们需要从头梳理。
5.1.1 main 方法
来到 runtime/translation/main_with_runtime.py。这里首先构建一个 StageRuntime,然后用 StageRuntime 的参数来构建优化器。
5.1.2 构建runtime
StageRuntime 的 initialize 函数会构建 module,这里通过本 node 的stage 来构建自己的 modules。
我们从前面文章中摘录。
stage_to_module_map 就是设置 stage 到 modules 的关系,目的是为了得到本stage所对应的modules。
本stage(数值为 3)对应的是 index 为 3,4 的两个 module,就是下面的 3 ,3.
具体代码是:
比如模型被分配到两个node之上,每个node两个layers,这里 Node 2有一个DDP数据并行。
每个 Node 的模型参数就是不同的,Node 1 的待优化参数是 Layer 1,Layer 2 的参数;Node 2 的待优化参数是 Layer 3,Layer 4 的参数。
5.1.3 SGDWithWeightStashing
然后用 runtime 的 master_parameters 和 model_parameters 构建本地优化器 SGDWithWeightStashing。
OptimizerWithWeightStashing 是 SGDWithWeightStashing 的基类。
基类 OptimizerWithWeightStashing 会生成一个原生优化器,赋值在 base_optimizer。
逻辑拓展如下,每个优化器使用自己 Node 的参数进行优化。
5.2.2 整体优化
整体是异步运行,也就是异步优化。
5.2.2 优化器优化
优化直接使用 SGDWithWeightStashing 的 step 方法。其最后也是 class OptimizerWithWeightStashing(torch.optim.Optimizer) 的 step 方法。
具体如下:
至此,分布式优化器系列完成,在后续分析ZeRO时候,我们还会介绍 PyTorch ZeroRedundancyOptimizer,估计要等待几周之后了。我们从下一篇开始,介绍 PyTorch 分布式 的几个官方文档应用例子,以此来把 PyTorch 分布式整个逻辑串联起来看看在实际之中应该如何应用,敬请期待。
torch.optim.optimizer源码阅读和灵活使用
pytorch源码阅读(二)optimizer原理
pytorch 优化器(optim)不同参数组,不同学习率设置的操作
Pytorch——momentum动量
各种优化方法总结比较(sgd/momentum/Nesterov/adagrad/adadelta)
【优化器】优化器算法及PyTorch实现(一):永不磨灭的SGD
以optim.SGD为例介绍pytorch优化器
Pytorch学习笔记08----优化器算法Optimizer详解(SGD、Adam)
pytorch中使用torch.optim优化神经网络以及优化器的选择 - pytorch中文网
pytorch优化器详解:SGD
聊聊GPU通信那些事
https://developer.nvidia.com/gpudirect
https://www.nvidia.cn/data-center/magnum-io/