Executing Jobs

A job is a computation that runs in the background against connected SKIL resources. Jobs come in two types:

  1. Training
  2. Inference

Running jobs

Once an external resource has been connected to SKIL, you can execute training or inference jobs against it in either of the following two ways:

A. CLI
B. REST endpoints

The skil jobs command manages the workflow for executing jobs through the SKIL CLI. Equivalent REST endpoints are also available.
The job workflow is as follows:

  1. Create a job.
  2. Submit the job created in step 1 to run. The job starts running in the background.
  3. Periodically check the status of the running job.
  4. Download the output file from the completed job.
  5. Delete the completed job from the job list (optional).

Multiple jobs can be created and run with SKIL at the same time. Their execution order depends on the underlying compute resources.

1. Create a job

To create a job, SKIL provides the following:
CLI: skil jobs create <args>
REST endpoint: POST /jobs/<type>, where <type> is the job type, either training or inference.
The formats of the two variants are as follows:

CLI

skil jobs create --type <training/inference> --storageResourceId <storage_resource_id> --computeResourceId <compute_resource_id> --jobArgs <job_arguments> --outputFileName <output_file_name>

REST - cURL

curl -d '{"computeResourceId": <compute_resource_id>, "storageResourceId": <storage_resource_id>, "jobArgs": "<skil_spark_main_args>", "outputFileName": "<output_file_name>"}' -H "Authorization: Bearer <auth_token>" -H "Content-Type: application/json" -X POST http://localhost:9008/jobs/<type>

# <type> => one of ["training", "inference"]

The parameters are as follows:

| Parameter | Details | Type |
| --- | --- | --- |
| computeResourceId | ID of the compute resource. | Long |
| storageResourceId | ID of the storage resource. | Long |
| jobArgs | Job arguments. Example: "jobArgs":"-mo /var/skil/neuralnet.zip -tm /var/skil/parameteraveraging.json -dsp io.skymind.skil.train.spark.MnistProvider --evalType evaluation --numEpochs 1 --outputPath /tmp/output5049941383652659983.zip" | String |
| outputFileName | Name that the output file generated by the job should have. | String |
| type | Job type, one of ["training", "inference"] | String |
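
The same step can also be scripted against the Java/Scala job client used in the notebook example at the end of this page. The following is a minimal sketch rather than a definitive recipe: the server URL, the resource IDs, and the paths inside jobArgs are placeholder values to replace with your own.

import io.skymind.auth.JWTUtil
import io.skymind.jobs.client.JobClient
import io.skymind.jobs.model.JobType

// Placeholder server URL; point this at your SKIL instance.
val jobClient = new JobClient("http://localhost:9008")
jobClient.setAuthToken(JWTUtil.generateSystemToken())

// Placeholder resource IDs and job arguments (the jobArgs string reuses the example above);
// replace them with the IDs of your own connected resources and your own paths.
val computeResourceId = 1L
val storageResourceId = 2L
val jobArgs = "-mo /var/skil/neuralnet.zip -tm /var/skil/parameteraveraging.json " +
  "-dsp io.skymind.skil.train.spark.MnistProvider --evalType evaluation " +
  "--numEpochs 1 --outputPath /tmp/output.zip"

val trainingJob = jobClient.createJob(JobType.TRAINING, computeResourceId, storageResourceId, jobArgs)
println("job created: " + trainingJob)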

2. Run a job
To run a job, use either of the following:
CLI: skil jobs run --id <jobId>
REST endpoint: POST /jobs/<id>/run. The job ID is obtained when the job is created.
Their formats are:

CLI

skil jobs run --id <jobId>

REST - cURL

curl -d '{}' -H "Authorization: Bearer <auth_token>" -H "Content-Type: application/json" -X POST http://localhost:9008/jobs/<id>/run

The parameters are as follows:

| Parameter | Details | Type |
| --- | --- | --- |
| id | Job ID, obtained when the job is created | Long |
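
From the Scala client, a job is started with JobClient.runJob, as the notebook example at the end of this page does. A minimal sketch, assuming the jobClient and trainingJob values from step 1:

// Submit the job created in step 1; the returned run object carries the run ID used for status checks.
val jobRun = jobClient.runJob(trainingJob.getJobId())
println("job started: " + jobRun)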

3. Get job status
Note
Status checks are not currently supported for jobs backed by YARN resources.
To retrieve a job's status, use either of the following:
CLI: skil jobs status --id <jobId>
REST endpoint: POST /jobs/<id>/refresh. The job ID is obtained when the job is created.
Their formats are:

CLI

skil jobs status --id <jobId>

REST - cURL

curl -d '{}' -H "Authorization: Bearer <auth_token>" -H "Content-Type: application/json" -X POST http://localhost:9008/jobs/<id>/refresh

The parameters are as follows:

| Parameter | Details | Type |
| --- | --- | --- |
| id | Job ID, obtained when the job is created | Long |
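
The same check can be made from the Scala client with JobClient.getRunStatus, as in the notebook example at the end of this page. A minimal sketch, assuming the jobClient and jobRun values from the earlier steps; the polling interval here is arbitrary:

// Wait a while, then poll the run status by its run ID.
Thread.sleep(30000)
val status = jobClient.getRunStatus(jobRun.getRunId())
println("current status: " + status)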

4. Get output from a job
Currently, downloading a job's output is supported only through the REST endpoint.
Note
This operation only works once the job's run status is "finished".
The relevant endpoint and its format are as follows:
REST endpoint: POST /jobs/<id>/outputfile. The job ID is obtained when the job is created.

REST - cURL

curl -d '{"localDownloadPath": "<local_download_path>"}' -H "Authorization: Bearer <auth_token>" -H "Content-Type: application/json" -X POST http://localhost:9008/jobs/<id>/outputfile

The parameters are as follows:

| 参数 | 详情 | 类型 |
| localDownloadPath | 要下载输出的本地文件路径。 | String |
| id | 作业ID,创建作业后获得 | Long |
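
Because this step is only exposed over REST, a notebook can call the endpoint directly, for example with the Unirest HTTP client that the notebook example at the end of this page already imports. A minimal sketch, assuming a SKIL server on localhost:9008, the trainingJob from step 1, and a placeholder download path:

import com.mashape.unirest.http.Unirest
import io.skymind.auth.JWTUtil

// Placeholder download path; the job's run status must be "finished" before calling this.
val response = Unirest
  .post("http://localhost:9008/jobs/" + trainingJob.getJobId() + "/outputfile")
  .header("Authorization", "Bearer " + JWTUtil.generateSystemToken())
  .header("Content-Type", "application/json")
  .body("""{"localDownloadPath": "/tmp/skil-jobs/output.zip"}""")
  .asString()
println("download request returned HTTP " + response.getStatus)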

5. Delete a completed job (optional)
To delete a job, use either of the following:
CLI: skil jobs rm --id <jobId>
REST endpoint: DELETE /jobs/<id>. The job ID is obtained when the job is created.
Their formats are:

CLI

skil jobs rm --id <jobId>

REST - cURL

curl -H "Authorization: Bearer <auth_token>" -X DELETE http://localhost:9008/jobs/<id>

The parameters are as follows:

| Parameter | Details | Type |
| --- | --- | --- |
| id | Job ID, obtained when the job is created | Long |
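
The equivalent call in the Scala client is JobClient.deleteJob, which the notebook example at the end of this page uses for cleanup. A minimal sketch, assuming the jobClient and trainingJob values from the earlier steps:

// Remove the finished job from SKIL's job list.
jobClient.deleteJob(trainingJob.getJobId())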

Creating and running a job can also be done from a notebook by executing sample code like the following:

import io.skymind.auth.JWTUtil;
import io.skymind.jobs.client.JobClient;
import io.skymind.jobs.model.JobModel;
import io.skymind.jobs.model.JobRun;
import io.skymind.jobs.model.JobType;
import io.skymind.resource.client.SKILResourceClient;
import io.skymind.resource.model.Resource;
import io.skymind.resource.model.subtypes.ResourceDetails;
import io.skymind.resource.model.subtypes.compute.SparkResourceDetails;
import io.skymind.resource.model.subtypes.storage.HDFSResourceDetails;
import io.skymind.skil.train.spark.MnistKeyedProvider;
import org.apache.spark.storage.StorageLevel;
import org.datavec.api.transform.serde.JsonMappers;
import org.deeplearning4j.nn.api.OptimizationAlgorithm;
import org.deeplearning4j.nn.conf.MultiLayerConfiguration;
import org.deeplearning4j.nn.conf.NeuralNetConfiguration;
import org.deeplearning4j.nn.conf.inputs.InputType;
import org.deeplearning4j.nn.conf.layers.ConvolutionLayer;
import org.deeplearning4j.nn.conf.layers.DenseLayer;
import org.deeplearning4j.nn.conf.layers.OutputLayer;
import org.deeplearning4j.nn.conf.layers.SubsamplingLayer;
import org.deeplearning4j.nn.multilayer.MultiLayerNetwork;
import org.deeplearning4j.nn.weights.WeightInit;
import org.deeplearning4j.spark.impl.paramavg.ParameterAveragingTrainingMaster;
import org.deeplearning4j.util.ModelSerializer;
import org.nd4j.linalg.activations.Activation;
import org.nd4j.linalg.learning.config.Nesterovs;
import org.nd4j.linalg.lossfunctions.LossFunctions;
import com.mashape.unirest.http.Unirest;
import scala.collection.JavaConversions._

import java.io.File;
import java.io.IOException;
import java.util.List;

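      // Builds a ParameterAveragingTrainingMaster configuration, writes it to parameteraveraging.json
      // in the given folder, and returns that file (used as the -tm job argument).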
      def getTrainingMasterConfFile(nameNodePort: Int, folder: File): File = {
        val parameterAveragingTrainingMaster: ParameterAveragingTrainingMaster =
          new ParameterAveragingTrainingMaster.Builder(100)
            .batchSizePerWorker(1000)
            .averagingFrequency(3)
            .exportDirectory("hdfs://localhost:" + nameNodePort + new File(
              folder,
              "exportdir").getAbsolutePath)
            .workerPrefetchNumBatches(10)
            .storageLevel(StorageLevel.DISK_ONLY)
            .build()
        val jsonWriteParamAveraging: File =
          new File(folder, "parameteraveraging.json")
        JsonMappers.getMapper
          .writeValue(jsonWriteParamAveraging, parameterAveragingTrainingMaster)
        jsonWriteParamAveraging
      }

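      // Builds a small LeNet-style CNN configuration for MNIST, initializes the network, serializes it
      // to neuralnet.zip in the given folder, and returns that file (used as the -mo job argument).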
      def getModelFile(folder: File): File = {
        val builder
          : MultiLayerConfiguration.Builder = // Training iterations as above
          new NeuralNetConfiguration.Builder()
            .seed(230)
            .l2(0.0005)
            .weightInit(WeightInit.XAVIER)
            .optimizationAlgo(OptimizationAlgorithm.STOCHASTIC_GRADIENT_DESCENT)
            .updater(Nesterovs.builder().learningRate(0.01).momentum(0.9).build())
            .list()
            .layer(0,
                   new ConvolutionLayer.Builder(5, 5)
                     .nIn(1)
                     .stride(1, 1)
                     .nOut(20)
                     .activation(Activation.IDENTITY)
                     .build())
            .layer(1,
                   new SubsamplingLayer.Builder(SubsamplingLayer.PoolingType.MAX)
                     .kernelSize(2, 2)
                     .stride(2, 2)
                     .build())
            .layer(2,
                   new ConvolutionLayer.Builder(5, 5)
                     .stride(1, 1)
                     .nOut(50)
                     .activation(Activation.IDENTITY)
                     .build())
            .layer(3,
                   new SubsamplingLayer.Builder(SubsamplingLayer.PoolingType.MAX)
                     .kernelSize(2, 2)
                     .stride(2, 2)
                     .build())
            .layer(4,
                   new DenseLayer.Builder()
                     .activation(Activation.RELU)
                     .nOut(500)
                     .build())
            .layer(5,
                   new OutputLayer.Builder(
                     LossFunctions.LossFunction.NEGATIVELOGLIKELIHOOD)
                     .nOut(10)
                     .activation(Activation.SOFTMAX)
                     .build())
            .setInputType( //See note below
              InputType.convolutionalFlat(28, 28, 1))
        val network: MultiLayerNetwork = new MultiLayerNetwork(builder.build())
        network.init()
        val neuralNet: File = new File(folder, "neuralnet.zip")
        ModelSerializer.writeModel(network, neuralNet, true)
        neuralNet
      }

    def main(): Unit = {
        val localSparkHome: String = "/opt/spark"
        val sparkMasterUrl: String = "spark://localhost:7077"
        val hadoopNamenodeHost: String = "localhost"
        val hadoopNamenodePort: Int = 8020
        val SKILURL: String = "http://localhost:9008"
        val JOBS_WORK_DIR: String = "/tmp/skil-jobs"

        val resourceClient: SKILResourceClient = new SKILResourceClient(SKILURL)
        resourceClient.setAuthToken(JWTUtil.generateSystemToken())
        val jobClient: JobClient = new JobClient(SKILURL)
        jobClient.setAuthToken(JWTUtil.generateSystemToken())

        // cleanup all earlier jobs and resources
        val jobs: List[JobModel] = jobClient.getAllJobs
        for (job <- jobs) {
          jobClient.deleteJob(job.getJobId)
        }
        val resources: List[Resource] = resourceClient.getResources
        println(resources)
        for (resource <- resources) {
          resourceClient.deleteResource(resource.getResourceId)
        }

        // Create the compute and storage resources
        val sparkResourceDetails = new SparkResourceDetails(localSparkHome, sparkMasterUrl);
        val hdfsResourceDetails = new HDFSResourceDetails(hadoopNamenodeHost, String.valueOf(hadoopNamenodePort));

        // add the resources to the database
        val computeResource = resourceClient.addResource("SKIL YARN", sparkResourceDetails, "");
        val storageResource = resourceClient.addResource("SKIL HDFS", hdfsResourceDetails, "");

        val jobsFolder = new File(JOBS_WORK_DIR);
        if (!jobsFolder.exists()) jobsFolder.mkdirs();
        val outputPath = new File(jobsFolder.getAbsolutePath(), "output.zip").getAbsolutePath();

        // create a new SKIL training job using these resources
        val jobArgs: Array[String] = Array(
                "--skil.spark.master", "local[*]",
                "--skil.spark.deploy-mode", "client",
                "--skil.spark.conf", "spark.executor.extraJavaOptions=-Dorg.bytedeco.javacpp.maxbytes=6G",
                "--skil.spark.executor-memory", "1g",
                "--skil.spark.total-executor-cores", String.valueOf(1),
                "--skil.spark.conf", "spark.hadoop.fs.defaultFS=hdfs://" + hadoopNamenodeHost + ":" + hadoopNamenodePort,
                "-mo", getModelFile(jobsFolder).getAbsolutePath(),
                "-tm", getTrainingMasterConfFile(hadoopNamenodePort, jobsFolder).getAbsolutePath(),
                "-kdsp",classOf[MnistKeyedProvider].getName,
                "--evalType", "evaluation",
                "--numEpochs", "1",
                "--outputPath", outputPath
        );

        println("jobArgs: " + jobArgs.mkString(" "));
        val testTrainingJob = jobClient.createJob(JobType.TRAINING, computeResource.getResourceId(), storageResource.getResourceId(), jobArgs.mkString(" "));
        println("job created:" + testTrainingJob.toString())
        // run the job

        Unirest.setTimeouts(0, 0);
        var jobEntity = jobClient.runJob(testTrainingJob.getJobId());
        Unirest.setTimeouts(10000, 60000);
        // check the status immediately after submission
        println("jobEntity started running:" + jobEntity.toString());

        // check the status after 90 seconds
        Thread.sleep(90000);
        jobEntity = jobClient.getRunStatus(jobEntity.getRunId());
        println("jobEntity after 90 seconds:" + jobEntity.toString());

        // cleanup
        jobClient.deleteJob(testTrainingJob.getJobId());
        resourceClient.deleteResource(computeResource.getResourceId());
        resourceClient.deleteResource(storageResource.getResourceId());
    }

main()