SKIL/工作流程/Spark上的分布式训练

Spark上的分布式训练

随着复杂度的增加,深度学习模型可以获得高度的计算密集度。处理它的标准方法是使用更快的硬件(GPU)、代码优化或在分布式计算集群(如Spark)上训练网络。

建立一个Spark集群对数据科学家来说是一个难题,因为它需要大量的内存和集群配置。幸运的是,在SKIL实验中的Zeppelin笔记本提供了一个已经配置好的SparkContext,它可以被用于DL4J中分布式网络训练的Spark包装器所使用。

使用Spark集群

你将需要在后端安装一个Spark集群,并将分布式存储连接到该集群,比如HDFS。如果你已经完成了所有设置,你可以访问Spark解释器为你的笔记本配置它。
在这里,我们将重点介绍主要的工作流程和概念,以使分布式训练在skil内的Spark集群上工作。有关使用DL4J的ApacheSpark分布式训练的详细概述,请访问此处
如果你已经熟悉了这些概念,你可以跳到以scala编写的Zeppelin笔记本的示例部分。

工作流程和概念

下面是分布式训练的主要概念和工作流程。

围绕网络配置的Spark包装器

与DL4J的多层网络MultiLayerNetwork和计算图ComputationGraph类类似,DL4J定义了两个用于在Spark上训练神经网络的类:

  • SparkDl4jMultiLayer, MultiLayerNetwork的包装器
  • SparkComputationGraph, ComputationGraph的包装器

因为这两个类是围绕标准单机器类的包装器,所以网络配置过程(即创建MultiLayerConfigurationComputationGraphConfiguration)在标准和分布式训练中是相同的。但是, Spark分布式与本地训练在两个方面有所不同:如何加载数据,以及如何设置训练(需要一些特定于集群的额外配置)。

TrainingMaster 类

DL4J中的TrainingMaster是一个抽象(接口),允许将多个不同的训练实现与SparkDl4jMultiLayerSparkComputationGraph一起使用。
目前,DL4J有一个实现,即ParameterAveragingTrainingMaster。基本的TrainingMaster如下:

import org.deeplearning4j.spark.impl.paramavg.ParameterAveragingTrainingMaster

//spark训练配置: see http://deeplearning4j.org/spark for
//解释这些配置选项
//指定每个DataSet对象中有多少个示例
val tm = new ParameterAveragingTrainingMaster.Builder(dataSetObjectSize) 
    //平均和重新分布参数的频率
    .averagingFrequency(5) 
    //如何处理异步预取多个小批量。0禁用预取,较大的值在预取时使用更多内存。
    .workerPrefetchNumBatches(2) 
    //每个工作机线程的最小批处理大小:每个工作机线程中用于每个参数更新的示例数
    .batchSizePerWorker(batchSize) 
    .build();
image.gif

可以在此处找到有关TrainingMaster Builder配置的更多信息。

分布式训练工作流程

在SKIL中的Spark集群上训练一个网络的典型工作流程如下:

  1. 从可用的SparkContext(sc)创建一个JavaSparkContext
  2. 加载训练/测试数据并将其转换为RDD(JavaRDD[DataSet])。
  3. 像通常使用MultiLayerNetworkComputationGraph类一样配置训练网络。
  4. 设置TrainingMaster ,以配置如何处理分布式训练。
  5. 包装你的网络配置 SparkDl4jMultiLayer 用于包装 **MultiLayerNetwork **或 SparkComputationGraph 用于包装 ComputationGraph.
  6. 调用 fit 方法用于在你的数据上训练。
  7. 在测试数据上评估你的模型。

示例

1. 初始化JavaSparkContext

你可以从一个提供的SparkContext (sc)中创建一个JavaSparkContext

import org.apache.spark.api.java.JavaRDD
import org.apache.spark.api.java.JavaSparkContext
//从zeppelin的上下文sc创建spark context (sc)
val jsc = JavaSparkContext.fromSparkContext(sc) 
image.gif

2. 加载数据并创建RDD

加载数据并创建JavaRDD[DataSet].

import org.deeplearning4j.datasets.iterator.impl.CifarDataSetIterator
import org.nd4j.linalg.dataset.DataSet

//将数据加载到内存中,然后并行化
//一般来说,这不是一个好方法-但对于本例来说,使用起来很简单
val batchSize = 32

val iterTrain = new CifarDataSetIterator(batchSize, 50000, true)
val iterTest = new CifarDataSetIterator(batchSize, 10000, false)

val trainDataList = new java.util.ArrayList[DataSet]
val testDataList = new java.util.ArrayList[DataSet]

while (iterTrain.hasNext()) {
    trainDataList.add(iterTrain.next())
}
while (iterTest.hasNext()) {
    testDataList.add(iterTest.next())
}

val trainData = jsc.parallelize(trainDataList)
val testData = jsc.parallelize(testDataList)
image.gif

3. 配置网络

网络配置将与标准设置相同。

import org.deeplearning4j.nn.conf.NeuralNetConfiguration
import org.deeplearning4j.nn.api.OptimizationAlgorithm
import org.nd4j.linalg.activations.Activation
import org.deeplearning4j.nn.weights.WeightInit
import org.nd4j.linalg.learning.config.Adam
import org.nd4j.linalg.lossfunctions.LossFunctions
import org.deeplearning4j.nn.conf.inputs.InputType
import org.deeplearning4j.nn.conf._
import org.deeplearning4j.nn.conf.layers._

//----------------------------------
//创建网络配置并进行网络训练
val conf = new NeuralNetConfiguration.Builder()
            .seed(123)
            .cacheMode(CacheMode.DEVICE)
            .updater(new Adam(1e-2))
            .biasUpdater(new Adam(1e-2*2))
            //归一化以防止梯度消失或爆炸
            .gradientNormalization(GradientNormalization.RenormalizeL2PerLayer) 
            .optimizationAlgo(OptimizationAlgorithm.STOCHASTIC_GRADIENT_DESCENT)
            .l1(1e-4)
            .l2(5 * 1e-4)
            .list()
            .layer(0, new ConvolutionLayer.Builder(Array[Int](4, 4), Array[Int](1, 1), Array[Int](0, 0)).name("cnn1").convolutionMode(ConvolutionMode.Same)
                .nIn(3).nOut(64).weightInit(WeightInit.XAVIER_UNIFORM).activation(Activation.RELU)//.learningRateDecayPolicy(LearningRatePolicy.Step)
                .biasInit(1e-2).build())
            .layer(1, new ConvolutionLayer.Builder(Array[Int](4, 4), Array[Int](1, 1), Array[Int](0, 0)).name("cnn2").convolutionMode(ConvolutionMode.Same)
                .nOut(64).weightInit(WeightInit.XAVIER_UNIFORM).activation(Activation.RELU)
                .biasInit(1e-2).build())
            .layer(2, new SubsamplingLayer.Builder(PoolingType.MAX, Array[Int](2, 2)).name("maxpool2").build())
            .layer(3, new ConvolutionLayer.Builder(Array[Int](4, 4), Array[Int](1, 1), Array[Int](0, 0)).name("cnn3").convolutionMode(ConvolutionMode.Same)
                .nOut(96).weightInit(WeightInit.XAVIER_UNIFORM).activation(Activation.RELU)
                .biasInit(1e-2).build())
            .layer(4, new ConvolutionLayer.Builder(Array[Int](4, 4), Array[Int](1, 1), Array[Int](0, 0)).name("cnn4").convolutionMode(ConvolutionMode.Same)
                .nOut(96).weightInit(WeightInit.XAVIER_UNIFORM).activation(Activation.RELU)
                .biasInit(1e-2).build())
            .layer(5, new ConvolutionLayer.Builder(Array[Int](3, 3), Array[Int](1, 1), Array[Int](0, 0)).name("cnn5").convolutionMode(ConvolutionMode.Same)
                .nOut(128).weightInit(WeightInit.XAVIER_UNIFORM).activation(Activation.RELU)
                .biasInit(1e-2).build())
            .layer(6, new ConvolutionLayer.Builder(Array[Int](3, 3), Array[Int](1, 1), Array[Int](0, 0)).name("cnn6").convolutionMode(ConvolutionMode.Same)
                .nOut(128).weightInit(WeightInit.XAVIER_UNIFORM).activation(Activation.RELU)
                .biasInit(1e-2).build())
            .layer(7, new ConvolutionLayer.Builder(Array[Int](2, 2), Array[Int](1, 1), Array[Int](0, 0)).name("cnn7").convolutionMode(ConvolutionMode.Same)
                .nOut(256).weightInit(WeightInit.XAVIER_UNIFORM).activation(Activation.RELU)
                .biasInit(1e-2).build())
            .layer(8, new ConvolutionLayer.Builder(Array[Int](2, 2), Array[Int](1, 1), Array[Int](0, 0)).name("cnn8").convolutionMode(ConvolutionMode.Same)
                .nOut(256).weightInit(WeightInit.XAVIER_UNIFORM).activation(Activation.RELU)
                .biasInit(1e-2).build())
            .layer(9, new SubsamplingLayer.Builder(PoolingType.MAX, Array[Int](2, 2)).name("maxpool8").build())
            .layer(10, new DenseLayer.Builder().name("ffn1").nOut(1024).updater(new Adam(1e-3)).biasInit(1e-3).biasUpdater(new Adam(1e-3*2)).build())
            .layer(11,new DropoutLayer.Builder().name("dropout1").dropOut(0.2).build())
            .layer(12, new DenseLayer.Builder().name("ffn2").nOut(1024).biasInit(1e-2).build())
            .layer(13,new DropoutLayer.Builder().name("dropout2").dropOut(0.2).build())
            .layer(14, new OutputLayer.Builder(LossFunctions.LossFunction.NEGATIVELOGLIKELIHOOD)
                .name("output")
                .nOut(10)
                .activation(Activation.SOFTMAX)
                .build())
            .backprop(true)
            .pretrain(false)
            .setInputType(InputType.convolutional(32, 32, 3))
            .build();
image.gif

4. 配置TrainingMaster

使用 ParameterAveragingTrainingMaster.Builder 来配置TrainingMaster

import org.deeplearning4j.spark.impl.paramavg.ParameterAveragingTrainingMaster

//Spark训练配置: 查看 http://deeplearning4j.org/spark for
//这些配置选项的解释
val tm = new ParameterAveragingTrainingMaster.Builder(batchSize)
    .averagingFrequency(5)
    .workerPrefetchNumBatches(2)
    .batchSizePerWorker(batchSize)
    .build();
image.gif

5. 包装为SparkDl4jMultiLayer

因为网络是一个MultiLayerNetwork配置,所以它被包装在SparkDl4jMultiLayer中。

import org.deeplearning4j.spark.impl.multilayer.SparkDl4jMultiLayer

//创建Spark网络
val sparkNet = new SparkDl4jMultiLayer(jsc, conf, tm)
image.gif

6. 训练网络

在训练数据上调用SparkDl4jMultiLayer#fit。

var x = 0

//执行训练:
for(x <- 0 to 50 ) {
    sparkNet.fit(trainData)
    println("Completed Epoch {}", x)
}
image.gif

7. 评估网络

在训练模型完成后在测试集上调用SparkDl4jMultiLayer#evaluate

//执行评估(分布式)
val evaluation = sparkNet.evaluate(testData)
println("***** Evaluation *****")
println(evaluation.stats())
//删除临时训练文件,现在已经完成了
tm.deleteTempFiles(jsc)
println("***** Example Complete *****")
image.gif

Keras模型配置的分布式训练
要在Keras模型配置上运行分布式训练,需要从Keras模型配置创建一个MultiLayerConfigurationComputationGraphConfiguration。查看此文档了解更多信息,特别是此部分
在scala中,对于“MultiLayerConfiguration”,可以按以下方式进行:

import org.deeplearning4j.nn.modelimport.keras.KerasModelImport

val modelConfig = KerasModelImport.importKerasSequentialConfiguration("PATH TO YOUR JSON FILE")
image.gif

对于 "ComputationGraphConfiguration":

import org.deeplearning4j.nn.modelimport.keras.KerasModelImport

val computationGraphConfig = KerasModelImport.importKerasModelConfiguration("PATH TO YOUR JSON FILE")
image.gif

之后,可以按照这里的步骤操作,跳过步骤3。

下一步做什么?

Javadocs

以下是这里解释的重要类的javadocs:

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 216,651评论 6 501
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 92,468评论 3 392
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 162,931评论 0 353
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 58,218评论 1 292
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 67,234评论 6 388
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,198评论 1 299
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,084评论 3 418
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 38,926评论 0 274
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,341评论 1 311
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 37,563评论 2 333
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 39,731评论 1 348
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,430评论 5 343
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,036评论 3 326
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 31,676评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,829评论 1 269
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 47,743评论 2 368
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 44,629评论 2 354

推荐阅读更多精彩内容