去年总结了一些 tensorflow 1.x 分布式训练的一些知识(tensorflow 1.x 的分布式训练)。最近总结了一些 tf2.x 的分布式训练相关知识。
tf2.x 分布式训练策略
TensorFlow 的 tf.distribute 模块下包含有一系列分布式训练策略,它们都是基于数据并行模式实现的。有些策略目前还在 experimental 模块下,表示它们是实验性质的策略,未来可能会发生变动。
训练分布式模型,只需要将原先的模型代码置于distribution Strategy的Scope()下,即可。
import tensorflow as tf
data_train, _ = tf.keras.datasets.mnist.load_data()
dataset = tf.data.Dataset.from_tensor_slices(data_train) # 该处可接收numpy数组
dataset = dataset.shuffle(buffer_size=60000) # 该处要大于data_train的长度
dataset = dataset.batch(32)
mirrored_strategy = tf.distribution.MirroredStrategy()
# mirrored_strategy策略: 在每一个gpu上训练一个模型,每次更新时需要汇总所有gpu上的梯度。
with mirrored_strategy.scope():
model = tf.keras.Sequential([...])
# tf 2.0中,所有的optimizer都在tf.keras.optimizer下
model.compile(optimizer=tf.keras.optimizer.adam(lr=...)),
loss = "sparse_categorical_crossentropy",
metrics = ['accuracy'])
model.fit(dataset, epoch=5)
单机多卡训练
Mirrored
MirroredStrategy 是一种单机的同步的分布式训练策略。它支持在一台机器的多个 GPU 之间进行分布式训练,它会在每个 GPU 上创建一个模型副本,模型中的每个变量 (Variables) 都会进行镜像复制并放置到相应的 GPU 上,这些变量被称作镜像变量 (MirroredVariable)。
MirroredStrategy 策略通过 AllReduce 算法使得所有镜像变量在每个 GPU 之间保持同步更新, AllReduce 算法默认使用英伟达的 NcclAllReduce ,也可以通过 cross_device_ops 参数修改为其他的 AllReduce 算法,如 HierarchicalCopyAllReduce 。
MirroredStrategy 策略会自动使用所有能被 TensorFlow 发现的 GPU 来做分布式训练,如果只想使用部分的 GPU 则可以通过 devices 参数来指定。
MirroredStrategy 实例的创建代码如下所示:
mirrored_strategy = tf.distribute.MirroredStrategy(
devices=["/gpu:0", "/gpu:1"],
cross_device_ops=tf.distribute.HierarchicalCopyAllReduce(),
)
如果 TensorFlow 没有发现 GPU 则默认会退化为使用 CPU 来进行训练。 MirroredStrategy 的典型使用场景为单机多 GPU 。
MirroredStrategy 的步骤如下:
-
训练开始前,该策略在所有 N 个计算设备上均各复制一份完整的模型;
-
每次训练传入一个批次的数据时,将数据分成 N 份,分别传入 N 个计算设备(即数据并行);
-
N 个计算设备使用本地变量(镜像变量)分别计算自己所获得的部分数据的梯度;
-
使用分布式计算的 All-reduce 操作,在计算设备间高效交换梯度数据并进行求和,使得最终每个设备都有了所有设备的梯度之和;
-
使用梯度求和的结果更新本地变量(镜像变量);
-
当所有设备均更新本地变量后,进行下一轮训练(即该并行策略是同步的)。
CentralStorage
CentralStorageStrategy 也是一种单机的同步的分布式训练策略。但与 MirroredStrategy 策略不同的是,它会将模型的所有变量保存在 CPU 内存上,而不是通过镜像复制的方式保存在每个 GPU 上,所有的计算操作则会在每个 GPU 上以同样的方式执行。
如果机器只有一个 GPU , 那么所有的变量和计算操作都会放在该 GPU 上。在对 CPU 上的变量进行更新前,该策略会先将所有 GPU 副本的上的变量梯度进行聚合,然后应用到 CPU 变量更新中。
CentralStorageStrategy 实例的创建代码如下所示:
central_storage_strategy = tf.distribute.experimental.CentralStorageStrategy()
CentralStorageStrategy 策略在 CPU 与 GPU 通信代价远低于 GPU 与 GPU 之间的通信代价时,较为适用,基本上很少会有这种情况出现。
多机训练策略
MultiWorkerMirroredStrategy
MultiWorkerMirroredStrategy 策略因为要涉及到多个 worker 节点之间的通信交互,因此每个 worker 节点需要提前获知集群中各节点配置信息以便在变量更新时使用。
TensorFlow 中定义集群配置信息的标准方式是使用 TF_CONFIG 环境变量来实现的,该环境变量定义了集群中所有节点的配置信息,包括所有 worker 节点的网络地址,当前 worker 节点的索引 (index) 以及当前 worker 节点的角色 (type)。
示例如下:
os.environ['TF_CONFIG'] = json.dumps({
'cluster': {
'worker': ["localhost:20000", "localhost:20001"]
},
'task': {'type': 'worker', 'index': 0}
})
TF_CONFIG 由 cluster 和 task 两部分组成:
cluster 说明了整个多机集群的结构和每台机器的网络地址(IP + 端口号)。对于每一台机器,cluster 的值都是相同的;
task 说明了当前机器的角色。例如, {'type': 'worker', 'index': 0} 说明当前机器是 cluster 中的第 0 个 worker(即 localhost:20000 )。每一台机器的 task 值都需要针对当前主机进行分别的设置。
以上内容设置完成后,在所有的机器上逐个运行训练代码即可。先运行的代码在尚未与其他主机连接时会进入监听状态,待整个集群的连接建立完毕后,所有的机器即会同时开始训练。
MultiWorkerMirroredStrategy 策略与 MirroredStrategy 策略很相似,可以理解为是 MirroredStrategy 策略的多机的同步的分布式训练版本,它也会在每一台机器上创建所有变量的副本。
多个 worker 节点之间使用 AllReduce 算法来保持模型变量的同步更新, TensorFlow 里将这一操作称为 CollectiveOps。 CollectiveOps 会在 TensorFlow 模型运行时自动根据硬件,网络拓扑以及张量的大小来自动选择合适的 AllReduce 算法来进行网络通信以完成变量更新。
MultiWorkerMirroredStrategy 策略目前有两种可供选择的 CollectiveOps 。 一种为 CollectiveCommunication.RING ,它使用 gRPC 作为通信层实现了基于环的 AllReduce 操作。 另一种为 CollectiveCommunication.NCCL, 它使用了英伟达的 NCCL 库来实现 AllReduce 操作。在实际使用中,可以基于自己的运行环境选择合适的 CollectiveOps,或者使用 CollectiveCommunication.AUTO 交由 TensorFlow 运行时自行选择。
MultiWorkerMirroredStrategy 实例的创建代码如下所示:
multiworker_strategy = tf.distribute.experimental.MultiWorkerMirroredStrategy(
tf.distribute.experimental.CollectiveCommunication.RING)
如果所有 worker 节点都不包含 GPU ,则该策略会退化为使用 CPU 在多个 worker 节点间进行分布式训练。如果集群中的 worker 节点数量只有一个则该策略会退化为 MirroredStrategy 策略。
ParameterServerStrategy
ParameterServerStrategy 是一种多机的异步的分布式训练策略。所以它也需要提前指定 TF_CONFIG 环境变量信息,与 MultiWorkerMirroredStrategy 策略不同的是集群中的节点不全是 worker ,有一部分节点会被指定为 ps 用来存储变量信息。模型的每一个变量都会存储在一个 ps 节点上,所有的计算操作会在所有的 worker 节点上以同样的方式执行。 ParameterServerStrategy 实例的创建代码如下所示:
ps_strategy = tf.distribute.experimental.ParameterServerStrategy()
分布式集群定义
一个典型的 TF_CONFIG 环境变量的值如下所示:
{
"cluster": {
"chief": ["host1:port"],
"worker": ["host2:port", "host3:port"],
"ps": ["host4:port"],
"evaluator": ["host5:port"]
},
"task": {
"type": "worker",
"index": 0
}
}
chief 节点的作用和 worker 节点大致相同,不过它还会做一些额外的工作,比如保存检查点文件 (checkpoints) 以及为 Tensorboard 记录日志文件等,如果不指定 cheif 节点,则默认会以 worker 列表中的第一个节点作为 chief 节点; worker 节点用来执行训练操作; ps 节点用来存储变量,只有在使用 ParameterServerStrategy 训练策略时才需要指定; evaluator 用来执行交叉验证操作,一般也是在使用 ParameterServerStrategy 策略时才会指定。
注意所有节点的 TF_CONFIG 环境变量中的 cluster 信息都是相同的,不同的地方在于 task 部分,而且所有角色 (task type) 的 index 必须从 0 开始,因为 TensorFlow 会根据该 index 从 cluster 下相应角色的列表中读取节点信息。
TF_CONFIG 环境变量可以写入到系统的环境变量中,但前提是该物理节点上只会同时启动一个集群节点实例,在大多数情况下,我们会在 python 程序中通过 os.environ["TF_CONFIG"] 来指定集群的信息以实现按需创建,TensorFlow 运行时会自动解析其中的信息并启动训练任务。
TF 集群分布式训练的难点
集群分布式训练的难点在于每个节点的 TF_CONFIG 环境变量的构建,因为我们不能在每次训练时都去手动指定 ip 和端口(还需确定该端口是否被占用),一两个节点还可以忍受,可如果同时运行多个训练任务,并且每个任务都会使用几十个集群节点,那么手动构造这个环境变量的工作量是巨大的。
我们需要找到一种自动构建 TF_CONFIG 环境变量的方法,一些分布式训练框架可以为我们排忧解难。比如阿里的 x-deeplearning。