Spark MLlib机器学习开发指南(3)--Pipelines

Spark MLlib机器学习开发指南(3)--Pipelines

翻译自官方文档
如有问题,欢迎留言指正,转载请注明出处

在这个章节,我们介绍管道Pipelines的概念。ML Pipelines提供了一套构建在DataFrame之上的统一的高级API,帮助用户创建和调试实际的机器学习管道。

目录

  • 管道(Pipeline)主要概念
    • DataFrame
    • Pipeline组件
      • 转换器(Transformers)
      • 估计器
    • 管道(Pipeline)组件属性
    • 管道(Pipeline)
      • 怎样工作
      • 细节
      • 参数
    • 保存和加载Pipelines
  • 代码示例
    • 估计器,转换器和参数示例
    • Pipeline示例
    • 模型选择(超参调试)

管道主要概念

MLlib标准的机器学习算法API,使得更容易将多个算法组合成单个管道或工作流。
本节覆盖了Pipelines API关键概念介绍,其中管道(Pipelines)概念主要受到scikit-learn项目的启发

  • DataFrame: 该ML API使用Spark SQL的DataFrame作为ML数据集,可以容纳各种数据类型。例如,DataFrame可以具有存储文本,特征向量,真实标签和预测值
  • Transformer: 转换器是一种可以将一个DataFrame转换为另一个DataFrame的算法。
    例如,ML模型是一个Transformer,它将具有特征的DataFrame转换成具有预测值的DataFrame。
  • Estimator: 估计器是一种可以用DataFrame配合来生成Transformer的算法。
    例如,学习算法是在DataFrame上训练并产生模型的估计器。
  • Pipeline: 管道将多个转换器和估计器连在一起,被指定为一个ML工作流程。
  • Parameter: 所有转换器和估计器现在共享用于指定参数的通用API。

DataFrame

机器学习可以应用于各种各样的数据类型,例如向量,文本,图像和结构化数据。
该API采用Spark SQL的DataFrame来支持各种数据类型。

DataFrame支持许多基本和结构化类型;请参阅Spark SQL数据类型参考,以获取受支持类型的列表。除了Spark SQL指南中列出的类型之外,DataFrame还可以使用ML Vector类型。

可以从常规的RDD隐式或显式创建DataFrame。有关示例,请参阅下面的代码示例和Spark SQL编程指南

DataFrame中的列可以被命名。下面的代码示例使用如“text,” “features,” 和“label.”这样的名称。

Pipeline 组件

Transformers (转换器)

转换器是一种包含特征转换和学习模型的抽象。技​​术上,转换器实现了一个transform()方法,它通过追加一个或多个列来将一个DataFrame转换成另一个。例如:

  • 一个特征变换器可能需要一个DataFrame,读取一列(例如文本),将其映射到一个新的列(例如特征向量)中,并输出一个追加了映射列的新DataFrame
  • 学习模型可能需要一个DataFrame,读取包含特征向量的列,预测每个特征向量的标签,并输出一个新的DataFrame,并追加预测的标签作为新的列。

Estimators (估计器)

一个估计器抽象了学习算法的概念,或者fit(拟合)或训练数据的任何算法。
技术实现上,一个估计器实现一个方法fit(),它接受一个DataFrame并产生一个Model,Model它是一个Transformer。例如,诸如Logistic回归的学习算法是一个估计器,并调用fit()训练一个LogisticRegressionModel,它是一个Model,模型是一个Transformer。

管道组件的属性

转换器(Transformer)的transform和估计器的fit()方法都是无状态的。在将来,可能通过新的概念支持有状态的算法

转换器(Transformer)或者估计器的每个实例都有唯一id,这对指定参数非常有用(下面将会讨论到)

管道(Pipeline)

在机器学习中,通常通过一系列算法来处理和学习数据。例如,一个简单的文本处理工作流可能包含以下几个阶段

  • 切分每个文档的文本到词
  • 将每个文档的词转成数值化特征向量
  • 使用特征向量和标签学习一个预测模型

MLlib表示为像管道一样的工作流,该管道由要按特定顺序运行的一系列流水线阶段(转换器和估计器)组成。
我们将使用这个简单的工作流作为本节中的运行示例。

怎样工作

一个管道包含一系列阶段,每个阶段包含转换器或者估计器。这些阶段按顺序运行,输入的DataFrame在通过每个阶段时被转换。在转换器阶段,transform()方法在DataFrame上被调用。在估计器阶段,fit()方法被调用产生一个估计器(它成为PipelineModel的一部分,或者拟合Pipeline),并且在DataFrame上面调用估计器的transform方法。

下面演示简单的文本文档工作流,下图为训练时使用管道

ML Pipeline Example

上图中,顶行代表一个具有三个阶段的管道。
前两个(Tokenizer和HashingTF)是转换器(蓝色),第三个(Logistic回归)是一个估计器(红色)。
底行表示流经管道的数据,其中柱面表示DataFrames。
在原始DataFrame上调用Pipeline.fit()方法,它具有原始文本文档和标签。
Tokenizer.transform()方法将原始文本文档分割成单词,向DataFrame添加带有单词的新列。
HashingTF.transform()方法将单词列转换为特征向量,将这些向量的新列添加到DataFrame。
现在,由于LogisticRegression是一个估计器,所以Pipeline首先调用LogisticRegression.fit()来生成LogisticRegressionModel。
如果流水线有更多的估计器,那么在将DataFrame传递到下一个阶段之前,它会在DataFrame上调用LogisticRegressionModel的transform()方法。

管道是一个估计器。
因此,在Pipeline的fit()方法运行之后,它会生成一个PipelineModel,它是一个Transformer。
此管道模型在测试时使用;
下图说明了这种用法。


ML PipelineModel Example

在上图中,PipelineModel具有与原始管道相同的步骤,但原始流水线中的所有估计器都已成为转换器。
当在测试数据集上调用PipelineModel的transform()方法时,数据按顺序通过拟合的管道传递。
每个阶段的transform()方法更新数据集并将其传递到下一个阶段。

管道和管道模型有助于确保训练和测试数据通过相同的功能处理步骤。

细节

DAG管道:管道的阶段被指定为有序数组。这里给出的示例全部用于线性管道,即其中每个阶段使用由前一阶段产生管道的数据。
只要数据流图形成有向无环图(DAG),就可以创建非线性流水线。DAG是基于每个阶段的输入和输出列名(通常指定为参数)隐含地指定的。如果管道形成DAG,则必须以拓扑顺序指定阶段。

运行时检查:由于管道可以对具有不同类型的DataFrames进行操作,因此不能使用编译时类型检查。
管道和管道模型是在运行时检查,而不是在实际运行管道之前进行检查。类型检查是使用DataFrame schema完成的,这是DataFrame中列的数据类型的描述。

唯一管道阶段:管道的阶段应该是唯一的实例。例如,同一个实例myHashingTF不应该被插入管道两次,因为流水线阶段必须有唯一的ID。然而,不同的实例myHashingTF1和myHashingTF2(HashingTF类型)都可以放入同一个管道中,因为使用不同的ID创建不同的实例

参数

MLlib的估计器和转换器使用统一的API来指定参数

Param是具有独立文档的命名参数,ParamMap是一组(参数,值)对。

这里有2种给算法指定参数的方法:

1.设置一个实例的参数。例如,如果lr是LogisticRegression的一个实例,可以调用lr.setMaxIter(10)使lr.fit()最多使用10次迭代。该API类似于spark.mllib软件包中使用的API。
2.将ParamMap传递给fit()或transform()方法。任何在ParamMap中的指定参数将覆盖先前通过setter方法指定的参数。

参数属于估计器和转换器的特定实例。例如,如果我们有两个LogisticRegression实例lr1和lr2,那么我们可以使用指定的maxIter参数构建ParamMap:ParamMap(lr1.maxIter - > 10,lr2.maxIter - > 20)。如果在管道中存在两个算法,指定不同的maxIter参数的这将非常有用。

保存和加载Pipelines

通常情况下,将模型或管道保存到磁盘以备以后使用是值得的。在Spark 1.6中,模型导入/导出功能已添加到Pipeline API中。
大多数基本ML模型的转换器都得到很好的支持。请参阅具体的算法的API文档,以查看保存和加载功能是否被支持。

Code examples

本节将给出说明上述功能的示例代码。有关更多信息,请参阅API文档(ScalaJavaPython)。

示例:估计器,转换器和参数
有关API的详细信息,请参阅估计器Scala文档转换器Scala文档Params Scala文档

import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.sql.Row

// Prepare training data from a list of (label, features) tuples.
val training = spark.createDataFrame(Seq(
  (1.0, Vectors.dense(0.0, 1.1, 0.1)),
  (0.0, Vectors.dense(2.0, 1.0, -1.0)),
  (0.0, Vectors.dense(2.0, 1.3, 1.0)),
  (1.0, Vectors.dense(0.0, 1.2, -0.5))
)).toDF("label", "features")

// Create a LogisticRegression instance. This instance is an Estimator.
val lr = new LogisticRegression()
// Print out the parameters, documentation, and any default values.
println("LogisticRegression parameters:\n" + lr.explainParams() + "\n")

// We may set parameters using setter methods.
lr.setMaxIter(10)
  .setRegParam(0.01)

// Learn a LogisticRegression model. This uses the parameters stored in lr.
val model1 = lr.fit(training)
// Since model1 is a Model (i.e., a Transformer produced by an Estimator),
// we can view the parameters it used during fit().
// This prints the parameter (name: value) pairs, where names are unique IDs for this
// LogisticRegression instance.
println("Model 1 was fit using parameters: " + model1.parent.extractParamMap)

// We may alternatively specify parameters using a ParamMap,
// which supports several methods for specifying parameters.
val paramMap = ParamMap(lr.maxIter -> 20)
  .put(lr.maxIter, 30)  // Specify 1 Param. This overwrites the original maxIter.
  .put(lr.regParam -> 0.1, lr.threshold -> 0.55)  // Specify multiple Params.

// One can also combine ParamMaps.
val paramMap2 = ParamMap(lr.probabilityCol -> "myProbability")  // Change output column name.
val paramMapCombined = paramMap ++ paramMap2

// Now learn a new model using the paramMapCombined parameters.
// paramMapCombined overrides all parameters set earlier via lr.set* methods.
val model2 = lr.fit(training, paramMapCombined)
println("Model 2 was fit using parameters: " + model2.parent.extractParamMap)

// Prepare test data.
val test = spark.createDataFrame(Seq(
  (1.0, Vectors.dense(-1.0, 1.5, 1.3)),
  (0.0, Vectors.dense(3.0, 2.0, -0.1)),
  (1.0, Vectors.dense(0.0, 2.2, -1.5))
)).toDF("label", "features")

// Make predictions on test data using the Transformer.transform() method.
// LogisticRegression.transform will only use the 'features' column.
// Note that model2.transform() outputs a 'myProbability' column instead of the usual
// 'probability' column since we renamed the lr.probabilityCol parameter previously.
model2.transform(test)
  .select("features", "label", "myProbability", "prediction")
  .collect()
  .foreach { case Row(features: Vector, label: Double, prob: Vector, prediction: Double) =>
    println(s"($features, $label) -> prob=$prob, prediction=$prediction")
  }

完整代码在Spark仓库代码的examples/src/main/scala/org/apache/spark/examples/ml/EstimatorTransformerParamExample.scala

示例:管道(Pipeline)
该示例遵循上图中所示的简单文本文档处理管道。
有关API的详细信息,请参阅管道Scala文档

import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row

// Prepare training documents from a list of (id, text, label) tuples.
val training = spark.createDataFrame(Seq(
  (0L, "a b c d e spark", 1.0),
  (1L, "b d", 0.0),
  (2L, "spark f g h", 1.0),
  (3L, "hadoop mapreduce", 0.0)
)).toDF("id", "text", "label")

// Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
val tokenizer = new Tokenizer()
  .setInputCol("text")
  .setOutputCol("words")
val hashingTF = new HashingTF()
  .setNumFeatures(1000)
  .setInputCol(tokenizer.getOutputCol)
  .setOutputCol("features")
val lr = new LogisticRegression()
  .setMaxIter(10)
  .setRegParam(0.001)
val pipeline = new Pipeline()
  .setStages(Array(tokenizer, hashingTF, lr))

// Fit the pipeline to training documents.
val model = pipeline.fit(training)

// Now we can optionally save the fitted pipeline to disk
model.write.overwrite().save("/tmp/spark-logistic-regression-model")

// We can also save this unfit pipeline to disk
pipeline.write.overwrite().save("/tmp/unfit-lr-model")

// And load it back in during production
val sameModel = PipelineModel.load("/tmp/spark-logistic-regression-model")

// Prepare test documents, which are unlabeled (id, text) tuples.
val test = spark.createDataFrame(Seq(
  (4L, "spark i j k"),
  (5L, "l m n"),
  (6L, "spark hadoop spark"),
  (7L, "apache hadoop")
)).toDF("id", "text")

// Make predictions on test documents.
model.transform(test)
  .select("id", "text", "probability", "prediction")
  .collect()
  .foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>
    println(s"($id, $text) --> prob=$prob, prediction=$prediction")
  }

详细代码位于Spark仓库的examples/src/main/scala/org/apache/spark/examples/ml/PipelineExample.scala

模型选择(超参调试)

使用ML管道的一大优点是超参数调试。有关自动模型选择的更多信息,请参阅ML调试指南

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 212,080评论 6 493
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,422评论 3 385
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 157,630评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,554评论 1 284
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 65,662评论 6 386
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 49,856评论 1 290
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,014评论 3 408
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,752评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,212评论 1 303
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,541评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,687评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,347评论 4 331
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,973评论 3 315
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,777评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,006评论 1 266
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,406评论 2 360
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,576评论 2 349

推荐阅读更多精彩内容