Intel BigDL DistriOptimizer Internals
Training a model with Intel's deep-learning library BigDL in distributed mode is straightforward. The user only needs to provide the model to be trained, the training set (an RDD[Sample] or a DataSet), the loss function (Criterion), and the batchSize (not required when the training set is a DataSet), choose an optimization method (SGD by default), create an Optimizer instance via val optimizer = Optimizer(...), and finally call optimizer.optimize(). Depending on the use case, validation can also be run during training (by providing a validation set, validationMethod, batchSize, etc.), and the training process can be visualized (through TrainSummary and ValidationSummary).
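For orientation, a minimal end-to-end sketch of this user-facing API might look as follows (the model, the RDD names and the hyperparameter values below are placeholders rather than a specific BigDL example):
import com.intel.analytics.bigdl.nn.ClassNLLCriterion
import com.intel.analytics.bigdl.optim.{Optimizer, SGD, Top1Accuracy, Trigger}
// assumes `model`, `trainSamples: RDD[Sample[Float]]` and `testSamples` already exist
val optimizer = Optimizer(
  model = model,
  sampleRDD = trainSamples,
  criterion = new ClassNLLCriterion[Float](),
  batchSize = 128
)
optimizer
  .setOptimMethod(new SGD[Float](learningRate = 0.01))  // SGD is also the default
  .setValidation(Trigger.everyEpoch, testSamples, Array(new Top1Accuracy[Float]), 128)
  .setEndWhen(Trigger.maxEpoch(10))
val trainedModel = optimizer.optimize()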
The BigDL distributed training process
Training a model in BigDL consists of the following main steps:
- Create an Optimizer instance and configure the stop condition, the optimization method, whether visualization is needed, and so on.
- Call the Optimizer instance's optimize() method to start training.
  - Initialize the training parameters.
  - For each iteration:
    - On the driver, initialize the accumulators, metrics, etc.
    - On each executor, fetch the latest parameters and split the randomly drawn batch into _subModelNumber mini-batches.
    - Compute the gradients from the current model and mini-batch.
    - Aggregate the gradients.
    - Update the parameters using the gradients.
    - Depending on the triggers, run validation, save checkpoints and summaries, etc.
  - Fetch the trained parameters and return the trained model.
Of these steps, the most important is step 2, the call to optimize(); the rest of this article walks through it in detail.
Creating the Optimizer instance
When an Optimizer instance is created, BigDL returns a concrete Optimizer implementation based on the types of the arguments. BigDL currently provides two implementations, DistriOptimizer and LocalOptimizer, of which DistriOptimizer is by far the more commonly used; the discussion below covers only DistriOptimizer.
// create optimizer
val optimizer = Optimizer(
model = model,
sampleRDD = trainSamples,
criterion = new ClassNLLCriterion[Float](),
batchSize = param.batchSize
)
The mapping between the arguments passed to the Optimizer factory and the Optimizer implementation that is returned is, in simplified pseudocode:
object Optimizer {
  def apply(model, sampleRDD, criterion, batchSize, featurePaddingParam = null, labelPaddingParam = null) =
    new DistriOptimizer[T](model, (DataSet.rdd(sampleRDD) -> SampleToMiniBatch(batchSize, featurePaddingParam, labelPaddingParam)).asInstanceOf[DistributedDataSet[MiniBatch[T]]], criterion).asInstanceOf[Optimizer[T, MiniBatch[T]]]
  def apply(model, sampleRDD, criterion, batchSize, miniBatchImpl) =
    new DistriOptimizer[T](model, (DataSet.rdd(sampleRDD) -> SampleToMiniBatch(miniBatchImpl, batchSize, None)).asInstanceOf[DistributedDataSet[MiniBatch[T]]], criterion).asInstanceOf[Optimizer[T, MiniBatch[T]]]
  def apply(model, dataset, criterion) = dataset match {
    case d: DistributedDataSet[_] =>
      new DistriOptimizer[T](model, dataset.asInstanceOf[DistributedDataSet[MiniBatch[T]]], criterion).asInstanceOf[Optimizer[T, D]]
    case d: LocalDataSet[_] =>
      new LocalOptimizer[T](model, dataset.asInstanceOf[LocalDataSet[MiniBatch[T]]], criterion).asInstanceOf[Optimizer[T, D]]
  }
}
Model initialization
The parameter-initialization phase has to accomplish the following:
- clear each optimMethod's history and initialize some (hyper)parameters;
- build the mapping between optimMethods and sub-models, returning the model's weights and biases in the form subModuleName -> (storageOffset, length, AllReduceParameter);
- convert the training set to the required type and cache it in memory (prepareInput());
- initialize the model parameters and build an RDD[Cache] that matches the partitions of the training set;
- optionally, create the directory in which checkpoints will be saved.
The most important parts are generating the RDD[Cache] that corresponds to the training set and initializing the model parameters. Note in particular that the model replicas on the same worker share the same parameters (weights and biases), i.e. the same underlying storage, while their gradients are not shared.
- Clear the history and set the (hyper)parameters.
// clear history
optimMethods.values.foreach { optimMethod =>
optimMethod.clearHistory()
}
// set HyperParameters
if (optimMethods.size == 1) {
optimMethods.head._2.loadFromTable(state)
}
- Return the model parameters in the form subModuleName -> AllReduceParameter.
AllReduceParameter represents a parameter stored in the block manager. In distributed mode, every worker synchronizes its parameters from the block manager, so the block manager effectively acts as a parameter server. The tensor is split into partitionNum chunks, and each chunk is stored on its corresponding node (Spark executor). Gradients are stored across nodes in the same way, so gradient aggregation and parameter updates can proceed independently.
// The main methods defined on AllReduceParameter:
def getWeights(localParameter: Tensor[T]): FutureResult[Int]
def aggregateGradientPartition(avgNumbers: Int): Unit
def putGradients(parameter: Tensor[T]): Unit
def sendWeightPartition(): Unit
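To make the chunking concrete, the small helper below (illustrative only; chunkRanges is not part of BigDL) computes the 1-based (start, length) range of each node's slice of a length-n parameter, using the same taskSize/extraSize arithmetic that appears throughout DistriOptimizer:
// Illustrative: how a flat parameter of n elements is split into partitionNum contiguous slices.
def chunkRanges(n: Int, partitionNum: Int): Seq[(Int, Int)] = {
  val taskSize = n / partitionNum    // base slice length
  val extraSize = n % partitionNum   // the first `extraSize` slices get one extra element
  (0 until partitionNum).map { pid =>
    val start = pid * taskSize + math.min(pid, extraSize) + 1  // 1-based, as Tensor.narrow expects
    val length = taskSize + (if (pid < extraSize) 1 else 0)
    (start, length)
  }
}
// e.g. chunkRanges(10, 4) == Seq((1, 3), (4, 3), (7, 2), (9, 2))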
The code that builds the subModuleName -> AllReduceParameter mapping is as follows:
val parameters = if (optimMethods.size != 1) {
val p = optimMethods.map{case (subModuleName, optimMethods) =>
val subModule = model(subModuleName)
val subModuleWeights = subModule.get.getParameters()._1
(subModuleName, subModuleWeights)
}
val sortedWeights = p.values.toArray.sortWith((a, b) => a.storageOffset() < b.storageOffset())
val compactWeights = Module.isCompact(sortedWeights)
p.map{case (subModuleName, weights) =>
(subModuleName, AllReduceParameter.newParameter[T](
partitionNum, weights.nElement(), weights.storageOffset()))
}
} else if (optimMethods.contains(model.getName())) {
Map(model.getName() -> AllReduceParameter.newParameter[T](
partitionNum, modelParameters._1.nElement()))
} else {
throw new IllegalArgumentException(...)
}
- Convert the training set to the required type and cache it in memory.
if (!dataset.asInstanceOf[DistributedDataSet[MiniBatch[T]]].isCached){
DistriOptimizer.logger.info("caching training rdd ...")
DistriOptimizer.prepareInput(this.dataset,this.validationDataSet)
}
- Initialize the model parameters and build the RDD[Cache].
In DistriOptimizer, parameter partitioning, parameter initialization and the construction of the RDD[Cache] are all done by calling DistriOptimizer.initThreadModels(...). The relevant code is:
// Init engine and cache models, weights, gradients, criterions, state tables and validation methods on worker nodes.
val broadcast = sc.broadcast((criterion, state, validationMethods, optimMethod))
// Notes: All models returned by modelBroadcast.value() share the same weight&bias, while gradWeight&gradBias is unshared.
val modelBroadcast = ModelBroadcast[T]().broadcast(sc, model)
Then the following is executed on every worker:
val partitionId = TaskContext.getPartitionId
val (broadcastCriterion, broadcastState, broadcastMethod,broadcastOptim) = broadcast.value
Next a singleton check is performed (// true if current execution is a singleton on the JVM), and the per-core cached models are created:
val cached = (0 until _subModelNumber).map { _ =>
val localModel = modelBroadcast.value(true)
// differentiate partition models from each other by partition ID
setModelId(localModel, partitionId)
val localCriterion = broadcastCriterion.cloneCriterion()
val localState = broadcastState.clone()
val localMethod =
if (broadcastMethod.isDefined) Some(broadcastMethod.get.map(_.clone())) else None
val (weights, grads) = localModel.getParameters()
(localModel, weights, grads, localCriterion, localState, localMethod)
}.toArray
// initialize the parameter partitions by calling AllReduceParameter.init(...)
val weights = cached.head._2
parameters.foreach(v =>
v._2.init(weights.narrow(1, v._2.paramOffset, v._2.size))
)
val models = ... .persist()  // an RDD[Cache[T]], persisted with StorageLevel.MEMORY_ONLY
Here Cache is a case class with the following structure:
// models cached in memory use the following format
/**
* Optimizer cache some metadata on each executor
*
* @param localModels cached models
* @param modelWeights weights of the cached models
* @param modelGradients gradients of the cached models
* @param localCriterions cached criterion
* @param localStates cached state
* @param moduleTimeList module running time
* @param localMethods cached validation methods
* @param optimMethods cached optim methods
* @tparam T Tensor element type
*/
case class Cache[T](
localModels: Array[Module[T]],
modelWeights: Array[Tensor[T]],
modelGradients: Array[Tensor[T]],
localCriterions: Array[Criterion[T]],
localStates: Array[Table],
var moduleTimeList: Array[Long] = null,
localMethods: Array[Option[Array[ValidationMethod[T]]]],
optimMethods: Map[String, OptimMethod[T]]
)
The most important work in this step is initializing and storing the model parameters. During training proper, dataRDD is zipped with models (the RDD[Cache[T]]) via zipPartitions, so that the gradient computation runs on every executor.
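To see this pairing in isolation, here is a minimal, self-contained Spark sketch of the zipPartitions pattern (plain Scala types only, no BigDL; it assumes an existing SparkContext sc, e.g. inside spark-shell):
// `state` plays the role of models: RDD[Cache[T]]; `data` plays the role of dataRDD.
val data = sc.parallelize(1 to 8, numSlices = 2)             // "mini-batch" records
val state = sc.parallelize(Seq(10.0, 20.0), numSlices = 2)   // one "Cache" per partition
// Each task sees its partition's records together with its partition's cached state,
// just as DistriOptimizer pairs dataRDD with models (RDD[Cache[T]]).
val partialResults = data.zipPartitions(state, preservesPartitioning = true) {
  (records, stateIter) =>
    val localState = stateIter.next()
    Iterator.single(records.map(_ * localState).sum)         // stand-in for forward/backward
}
println(partialResults.reduce(_ + _))                        // driver-side reduce, like lossSum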
(Figure: structure of the RDD[Cache[T]] after parameter initialization.)
- Optionally, create the directory in which checkpoints are stored.
if (checkpointPath.isDefined) {
val file = checkpointPath.get + "/" +
new SimpleDateFormat("yyyyMMdd_HHmmss").format(Calendar.getInstance().getTime())
new File(file).mkdir()
checkpointPath = Some(file)
}
Distributed training
The basic unit of training in BigDL is the iteration. The driver first initializes bookkeeping about the training progress, such as how many iterations and epochs have been completed and what the current loss is; this information is stored as Tables in optimMethod.state and driverState.
Distributed training is itself fairly involved, and to let users follow the training process in real time BigDL collects various kinds of runtime information, which also has to be initialized before training starts.
Once all of this is in place, training proper begins. For each iteration, the accumulators (lossSum and recordsSum) and some other required state are initialized first, and then dataRDD.zipPartitions(models, preservesPartitioning = true){ (data, modelIter) => ... } is called to run the actual computation on each executor.
Synchronizing the latest parameters to the workers
Note: all models in cached share the same storage for weights, so we only need to copy the weights from the parameter server into the first model's weights.
val weightsResults = parameters.values.map(p =>
p.getWeights(cached.modelWeights.head.narrow(1, p.paramOffset,p.size))
).toArray
Splitting the batch into mini-batches
For each task, BigDL draws a random batch from the current partition and splits it into several mini-batches. The number of mini-batches equals the number of map tasks the executor can run in parallel, i.e. the executor's core count (_subModelNumber).
val cached = modelIter.next()
val batch = data.next()
...
var b = 0
while (b < _subModelNumber) {
miniBatchBuffer(b) = batch.slice(b * stackSize + 1, stackSize)
b += 1
}
Gradient computation (model forward-backward job)
Once parameter synchronization and mini-batch splitting are done, forward and backward are invoked to compute the gradients.
The code is as follows:
val trainingThreads = Engine.default.invokeAndWait2((0 until _subModelNumber).map(i => () => {
val trainStart = System.nanoTime()
val localModel = cached.localModels(i)
localModel.training()
val localCriterion = cached.localCriterions(i)
val input = miniBatchBuffer(i).getInput()
val target = miniBatchBuffer(i).getTarget()
val output = localModel.forward(input)
lossArray(i) = ev.toType[Double](localCriterion.forward(output, target))
val errors = localCriterion.backward(output, target)
localModel.backward(input, errors)
cached.moduleTimeList(i + pre) = System.nanoTime() - trainStart + weightSyncTime
i
}))
Once the gradients for this partition have been computed, BigDL sums the gradients of the _subModelNumber map tasks into one and stores the result in the BlockManager, so that the other workers can pull it later when aggregating gradients and updating parameters.
This gradient aggregation takes place within a single executor.
// collect the gradients of the threads that finished
val finishedGradients = finishedThreads.map(cached.modelGradients(_))
// gradient aggregation (sum)
if (finishedThreads.nonEmpty) {
parameters.values.foreach { p =>
time = System.nanoTime()
val pOffset = p.paramOffset
val pLength = p.size
val taskSize = pLength / _subModelNumber
val extraTask = pLength % _subModelNumber
// Aggregate multi-model's gradient to the first model's gradient
val parallelNum = if (taskSize == 0) extraTask else _subModelNumber
Engine.default.invokeAndWait((0 until parallelNum).map(tid => () => {
val offset = pOffset + tid * taskSize + math.min(tid, extraTask)
val length = taskSize + (if (tid < extraTask) 1 else 0)
var i = 1
while (i < finishedGradients.length) {
finishedGradients(0).narrow(1, offset, length)
.add(finishedGradients(i).narrow(1, offset, length))
i += 1
}
}))
// store the aggregated gradient in the BlockManager
// Put first finished model's gradient who aggregated
// all other models' gradient to AllReduceParameter
p.putGradients(finishedGradients(0).narrow(1, pOffset, pLength))
} else {
// zero gradient in BlockManager when no thread finished.
cached.modelGradients(0).zero()
parameters.values.foreach { p =>
p.putGradients(cached.modelGradients(0).narrow(1, p.paramOffset, p.size)) }
}
Parameter update (parameter synchronization job)
Aggregate gradients
When merging the gradients, BigDL splits the gradient produced by each task into n parts (n being the number of tasks) and launches n tasks to aggregate them; each task handles only its own slice, e.g. task 1 only aggregates the first slice of every local gradient.
This gradient aggregation takes place across executors:
// enough records were processed for this batch, so update the model
val value = lossSum.value / numFinishedModelUpdates
driverState("numFinishedModel") = numFinishedModelUpdates
// isGradientUpdated is flag to mark whether gradient is updated. May changed in the future.
driverState("isGradientUpdated") = false
// parameterProcesser like L2NormClippingProcessor may aggregate gradient,
// and change the value of isGradientUpdated in driverState.
parameters.foreach { p =>
parameterProcessers.foreach(_.collectGlobalData(models, p._2, metrics, driverState))
}
val isGradientUpdated = driverState[Boolean]("isGradientUpdated")
val stateBroadcast = sc.broadcast(driverState)
...
}.reduce(_ + _) // the gradient-computation Spark job
The code that actually aggregates the gradients is as follows:
/**
* Retrieve gradients for the slice of the model that this node is responsible for from all the
* other nodes. A new thread is created for each separate node. The gradients are then summed
* and then stored in decompressed form in `gradientPartition`.
* @param avgNumbers average numbers.
*/
def aggregateGradientPartition(avgNumbers: Int): Unit = {
require(partitionId < partitionNum, s"This parameter was created with $partitionNum " +
s"partitions. It cannot be used on RDDs with > $partitionNum partitions.")
// fetch the corresponding gradient slices from the different nodes
val params = new Array[CompressedTensor[T]](partitionNum)
val sgThreads = (0 until partitionNum).map { pid =>
new Callable[Int] {
override def call(): Int = {
try {
val blockId = getGradientBlockId(pid, partitionId)
val tmp = BlockManagerWrapper.getLocalOrRemoteBytes(blockId).get
params(pid) = SerializerInstance.create(tmp)
BlockManagerWrapper.unlock(blockId)
pid
} catch {
case t: Throwable =>
logger.error("Error: " + ExceptionUtils.getStackTrace(t))
throw t
}
}
}
}
syncPool.invokeAll(sgThreads.asJava)
// sum the gradient slices
val length = taskSize + (if (partitionId < extraSize) 1 else 0)
val poolSize = Engine.default.getPoolSize
val innerTaskSize = length / poolSize
val innerExtraSize = length % poolSize
val availableTask = if (innerTaskSize == 0) innerExtraSize else poolSize
computePool.invokeAll((0 until availableTask).map(tid =>
new Callable[Int] {
override def call(): Int = {
val innerStart = tid * innerTaskSize + math.min(innerExtraSize, tid)
val innerLength = innerTaskSize + (if (tid < innerExtraSize) 1 else 0)
params.reduce { (l, r) =>
l.add(r.bytes(innerStart, innerLength), innerStart, innerLength)
}
tid
}
}
).asJava)
params.head.deCompress(gradientPartition)
gradientPartition.div(ev.fromType(avgNumbers))
}
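For orientation, the four AllReduceParameter methods quoted in this article fit together as a two-phase all-reduce over the BlockManager, summarized here as comments:
// reduce-scatter: every task publishes its locally summed gradient via putGradients(...);
//                 task p then calls aggregateGradientPartition(m), fetching shard p from every
//                 node, summing the copies and dividing by m (the number of finished models).
// all-gather:     task p runs optimMethod.optimize(...) on shard p, publishes the new weights
//                 via sendWeightPartition(), and at the start of the next iteration every task
//                 reassembles the full weight vector with getWeights(...).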
Once all the gradients have been aggregated, they are first run through the parameterProcessers; BigDL currently provides ConstantClippingProcessor and L2NormClippingProcessor, and note that ConstantClippingProcessor does not change the value of driverState("isGradientUpdated").
After this, the parameters themselves are updated and the optimMethods' bookkeeping is refreshed. The code is as follows:
models.mapPartitions { modelIter =>
val modelCache = modelIter.next()
// if the gradients have not been aggregated yet, do that first before updating the parameters
if (!isGradientUpdated) {
val getG = System.nanoTime()
// the cross-executor gradient aggregation happens here
parameters.values.foreach(_.aggregateGradientPartition(numFinishedModelUpdates))
driverMetrics.add("aggregrateGradientParition average executor",
System.nanoTime() - getG)
}
parameters.foreach { p =>
parameterProcessers.foreach(_.processParameters(p._2, modelCache, driverState))
}
// update the parameters
modelCache.optimMethods.foreach { case (name, optimMethod) =>
// update the state information in optimMethod.state
// compute the new weights
...
val p = parameters(name)
optimMethod.optimize(_ => (ev.fromType(value),p.gradientPartition),p.weightPartition)
// store the updated weights back in the corresponding BlockManager
p.sendWeightPartition()
}
Iterator.empty
}.count() // the parameter-update Spark job
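For intuition about the update step itself: optimMethod.optimize is handed a feval that simply returns the already-computed loss together with gradientPartition, and it updates weightPartition in place. A minimal sketch of what the plain SGD case amounts to (illustrative only; BigDL's SGD also supports momentum, weight decay, learning-rate schedules, etc.):
// Illustrative: a vanilla SGD step on one weight shard, written over plain arrays.
def sgdStep(weights: Array[Float], gradients: Array[Float], learningRate: Float): Unit = {
  var i = 0
  while (i < weights.length) {
    weights(i) -= learningRate * gradients(i)  // w := w - lr * g
    i += 1
  }
}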
Updating the hyperparameters
After the parameters have been updated, the hyperparameters still need to be updated and some training information for this iteration is printed.
// update the hyperparameters
optimMethods.foreach{ v =>
v._2.updateHyperParameter()
}
// print intermediate training information: epoch, iteration, neval, learningRate, Loss, Throughput, records, etc.
In addition, BigDL periodically recomputes the threshold used to drop exceptionally slow sub-models (stragglers); once the threshold has been recomputed, the timing information stored on each node is cleared.
if (dropPercentage > 0.0 && iteration > warmupIterationNum &&
iteration % computeThresholdbatchSize == 0) {
val moduleTimeList = models.mapPartitions { iter =>
iter.next().moduleTimeList.iterator}.collect()
val k = (dropPercentage * computeThresholdbatchSize * driverSubModelNum).toInt
if (k > dropModelNumBatch) {
threshold = Util.kthLargest(moduleTimeList, 0, moduleTimeList.length-1,
k - dropModelNumBatch)
} else {
threshold = (threshold * 1.01).toLong
}
logger.info("threshold: " + threshold)
// clear moduleTimeList in each node
models.mapPartitions { iter =>
val timeList = iter.next.moduleTimeList
var i = 0
while (i < timeList.length) {
timeList(i) = 0
i += 1
}
Iterator.empty
}.count()
dropModelNumBatch = 0
}
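As an illustrative numeric example of the threshold computation above (assuming driverSubModelNum is the total number of model replicas, i.e. partitions times cores per executor; the numbers are made up):
// 4 executors x 8 cores => driverSubModelNum = 32
// dropPercentage = 0.04, computeThresholdbatchSize = 100
// every 100 iterations: k = (0.04 * 100 * 32).toInt = 128
// threshold becomes the (k - dropModelNumBatch)-th largest recorded forward/backward time,
// and sub-models slower than `threshold` may be dropped in later iterations.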
Shuffling the training set
When the number of records processed reaches the total size of the training set, BigDL considers the epoch finished; it then shuffles the whole training set and redistributes records across the partitions.
// shuffle the data once an epoch has completed
if (recordsProcessedThisEpoch >= numSamples) {
...
dataset.shuffle()
dataRDD = dataset.data(train = true)
recordsProcessedThisEpoch = 0
}
Note: because BigDL draws a random batch from each partition in every iteration, the epoch may not actually have covered every sample by the time recordsProcessedThisEpoch >= numSamples.
Validation
When the trigger fires, BigDL validates the model using the validation DataSet and ValidationMethods configured when the Optimizer was created, and then records the validation summary.
// set before calling optimizer.optimize()
optimizer.setValidation(
trigger = Trigger.everyEpoch,
sampleRDD = testSamples,
vMethods = Array(new Top1Accuracy, new Top5Accuracy, new Loss),
batchSize = param.batchSize
)
// decide whether validation should run
val trigger = validationTrigger.get
if (!trigger(state)) {
return
}
// validate
val results = ZippedPartitionsWithLocalityRDD(models, validateRDD)((modelIter, dataIter) => {
val cached = modelIter.next()
val vMethodsArr = cached.localMethods
val workingModels = cached.localModels
workingModels.foreach(_.evaluate())
dataIter.map(batch => {
...
Engine.default.invokeAndWait(
(0 until parallelism).map(b =>
...
// validation Summary
if(validationSummary.isDefined) {
results.foreach { r =>
val result = r._1.result
validationSummary.get.addScalar(r._2.toString(), result._1,
state[Int]("neval") - 1
)
}
}
TrainSummary
BigDL can visualize training with TensorBoard. By default it collects Loss, LearningRate and Threshold; additional information can be enabled through TrainSummary. The officially supported tags are LearningRate, Loss, Throughput and Parameters (the latter covering weight, gradWeight, bias, gradBias and some running statistics, e.g. runningMean and runningVar in BatchNormalization). Users can also implement their own TrainSummary and ValidationSummary by extending the abstract class Summary.
// set before calling optimizer.optimize()
val trainSummary = TrainSummary(param.summaryPath.get,param.appname)
val validationSummary = ValidationSummary(param.summaryPath.get,param.appname)
trainSummary.setSummaryTrigger("LearningRate",Trigger.everyEpoch)
trainSummary.setSummaryTrigger("Parameters",Trigger.everyEpoch)
optimizer.setTrainSummary(trainSummary)
optimizer.setValidationSummary(validationSummary)
...
val parametersTrigger = trainSummary.getSummaryTrigger("Parameters")
if (parametersTrigger.isDefined && parametersTrigger.get(driverState)) {
// Parallelize to create Histogram
....
val scalarTrigger = trainSummary.getScalarTriggers()
// Not parallelizable, because driverState is changing each iteration.
scalarTrigger.foreach { v =>
if (v._2(driverState)) {
trainSummary.addScalar(
v._1, driverState[Float](v._1), currentIteration)
checkpoint
According to a user-defined trigger, BigDL can save checkpoints during training; if training is interrupted, it can be resumed from a checkpoint.
// set before calling optimizer.optimize()
// save checkpoint
if(param.checkpoint.isDefined){
optimizer.setCheckpoint(param.checkpoint.get,Trigger.severalIteration(param.checkpointIteration))
}
if(param.overWriteCheckpoint){
optimizer.overWriteCheckpoint()
}
...
cacheTrigger.foreach { trigger =>
cachePath.foreach { path =>
if (trigger(state)) {
saveModel(getModel(models, parameters, trainingModel), cachePath, isOverWrite, s".${state[Int]("neval")}")
optimMethods.foreach{case (name, optimMethod) =>
saveOptimMethod(optimMethod, cachePath, isOverWrite, s"-$name.${state[Int]("neval")}") }
}
}
}
Fetch parameters and return trainedModel
When training finishes, BigDL fetches the model's current parameters to the driver, copies them into trainingModel, and returns the trained model. This is done by calling DistriOptimizer.getModel(models, parameters, trainingModel).
// Fetch current model parameters to driver, and copy to trainingModel.
val partitionNum = models.partitions.length
val extraState = models.map(_.localModels.head.getExtraParameter()).first()
trainingModel.setExtraParameter(extraState)
// make sure gradient is as the same length as weight
val parameterArray = trainingModel.parameters()
(0 until parameterArray._2.length).foreach(i =>
parameterArray._2(i).resizeAs(parameterArray._1(i))
)
val (parameter, gradientParameter) = trainingModel.getParameters()
parameters.foreach { case (moduleName, p) =>
val currentModule = trainingModel(moduleName)
require(currentModule.isDefined, s"Couldn't find $moduleName in $trainingModel")
val (weights, gradients) = models.mapPartitions(iter => {
val cached = iter.next()
val curPartitionId = TaskContext.getPartitionId()
Iterator.single((Map(curPartitionId -> p.weightPartition),
Map(curPartitionId -> p.gradientPartition)))
}).reduce((a, b) => (a._1 ++ b._1, a._2 ++ b._2))
val taskSize = p.size / partitionNum
require(taskSize != 0, "parameter length should not less than partition number")
val extraSize = p.size % partitionNum
(0 until partitionNum).map(pid => {
val start = p.paramOffset + pid * taskSize + math.min(pid, extraSize)
val length = taskSize + (if (pid < extraSize) 1 else 0)
parameter.narrow(1, start, length).copy(weights(pid))
gradientParameter.narrow(1, start,length).copy(gradients(pid))
})
}
trainingModel
That completes BigDL's distributed training process; from the user's point of view, all of it is triggered by a single call:
val trainedModel = optimizer.optimize()
Reference:
[1] BigDL: A Distributed Deep Learning Framework for Big Data