模型并行
将模型部署到很多设备上(设备可能分布在不同机器上)运行,由于模型分割开的各个部分之间有相互依赖关系,因此计算效率不高。所以在模型大小不算太大的情况下一般不使用模型并行。
数据并行
相比较模型并行,数据并行方式能够支持更大的训练规模,提供更好的扩展性,因此数据并行是深度学习最常采用的分布式训练策略。
in-graph replication和between-graph replication 都用于数据并行。
所谓 replication,指的是各个task,replication的对象是模型。
在使用in-graph replication方式时,只有一个client进程(可以在参与训练的CPU或GPU上任选一个task来运行这个client,参与计算的其它tasks不运行这个client)来创建模型(即tf.Graph)及模型的参数(那些tf.Variables,比如权重W和偏置b)。由于参数(W和b)是共享的,该client指定把参数放在/job:ps,即parameter server上(比如 /job:ps/task:0/cpu:0)。模型的计算部分(前向传播,后向传播,loss和梯度计算,等等)也由该client进程定义好,然后client进程把这个计算部分分配到各个GPU device上(这个过程就相当于在各个GPU中复制模型),分配的方式类似函数调用,但每次调用都指定了设备(即 /job:worker/task:0/gpu:0,/job:worker/task:1/gpu:0,等等)。调用时,模型的参数(即W和b)被当作函数的参数输入给不同tasks(通常运行在不同GPU上)运行的模型,以保证这些参数确实是共享的。
如果用between-graph replication方式,则每个task都运行自己的client进程用于创建模型和参数,并将参数pin到parameter server上(比如 /job:ps/task:0/cpu:0),然后各自独立地执行该模型。注意,每个task创建的模型必须一模一样,这很容易做到,因为只要每个task里的这部分代码都一样就行了。问题是,这些task各自创建并pin到parameter server上的模型参数是同样的吗?问这个问题是因为我们现在跑的是数据并行,而模型的参数及其更新都必须由parameter server统一处理。回答是,只要各task使用同样的parameter server设备名(比如都用 /job:ps/task:0/cpu:0)和同样的变量名(那些tf.Variable定义的变量,比如权重和偏置变量), 那么在默认的情况下,它们被分配在parameter server的相同的存储里。
由于in-graph replication的性能不好,现在基本上只使用between-graph replication了。
参数更新方式
数据并行参数更新方式可以是同步的(synchronous),也可以是异步的(asynchronous)。
百度的综述Distributed Hierarchical GPU Parameter Server for Massive Scale Deep Learning Ads Systems 介绍了三种同步模式:
-
BSP(bulk sync parallel),严格对所有worker的更新进行同步
-
SSP(stale sync parallel),对快worker 进行同步
-
ASP(async parallel), 不同步gradient
后两种方式虽然提升了训练效率,但是降低了模型性能
XDL使用的是ASP。在tensorflow中异步训练是默认的并行训练模式。
异步训练
异步训练中,各个设备完成一个mini-batch训练之后,不需要等待其它节点,直接去更新模型的参数。异步训练总体会训练速度会快很多,但是异步训练的一个很严重的问题是梯度失效问题(stale gradients),刚开始所有设备采用相同的参数来训练,但是异步情况下,某个设备完成一步训练后,可能发现模型参数已经被其它设备更新过了,此时这个设备计算出的梯度就过期了。由于梯度失效问题,异步训练可能陷入次优解。
同步训练
所谓同步指的是所有的设备都是采用相同的模型参数来训练,等待所有设备的mini-batch训练完成后,收集它们的梯度后执行模型的一次参数更新。
Tensorflow提供了tf.train.SyncReplicasOptimizer类用于执行同步训练。把异步训练改造成同步训练只需要两步:
在原来的Optimizer上封装SyncReplicasOptimizer,将参数更新改为同步模式;
optimizer = tf.train.SyncReplicasOptimizer(optimizer, replicas_to_aggregate=num_workers)
在MonitoredTrainingSession或者EstimatorSpec的hook中增加sync_replicas_hook:
sync_replicas_hook = optimizer.make_session_run_hook(is_chief, num_tokens=0)
同步训练需要各个设备的计算能力要均衡,而且要求集群的通信也要均衡,慢worker会拖慢整体进度。
tensorflow 1 分布式架构
2017年2月百度在PaddlePaddle平台上首次引入了ring-allreduce的架构,随后将其提交到tensorflow的contrib package中。同年8月,Uber为tensorflow平台开源了一个更加易用和高效的ring allreduce分布式训练库Horovod。
最后,tensorflow官方终于也在1.11版本中支持了allreduce的分布式训练策略CollectiveAllReduceStrategy,其跟estimator配合使用非常方便,只需要构造tf.estimator.RunConfig 对象时传入CollectiveAllReduceStrategy参数即可。
关于 ring-allreduce 之前总结在 分布式架构:ring all-reduce算法。
使用 TensorFlow Estimator API 来编写分布式训练代码
要让tensorflow分布式运行,首先我们需要定义一个由参与分布式计算的机器组成的集群,如下:
cluster = {'chief': ['host0:2222'], 'ps': ['host1:2222', 'host2:2222'], 'worker': ['host3:2222', 'host4:2222', 'host5:2222']}
集群中一般有多个worker,需要指定其中一个worker为主节点(cheif),chief节点会执行一些额外的工作,比如模型导出之类的。在PS分布式架构环境中,还需要定义ps节点。
要运行分布式Estimator模型,只需要设置好TF_CONFIG环境变量即可,可参考如下代码:
# Example of non-chief node:
os.environ['TF_CONFIG'] = json.dumps( {'cluster': cluster, 'task': {'type': 'worker', 'index': 1}})
# Example of chief node:
os.environ['TF_CONFIG'] = json.dumps( {'cluster': cluster, 'task': {'type': 'chief', 'index': 0}})
# Example of evaluator node (evaluator is not part of training cluster)
os.environ['TF_CONFIG'] = json.dumps( {'cluster': cluster, 'task': {'type': 'evaluator', 'index': 0}})
定义好上述环境变量后,调用tf.estimator.train_and_evaluate即可开始分布式训练和评估,其他部分的代码跟开发单机的程序是一样的,可以参考下面的资料:
构建分布式Tensorflow模型系列:Estimator - 知乎