ML Pipelines provide a uniform set of high-level APIs built on top of DataFrames that help users create and tune practical machine learning pipelines.
Main concepts in Pipelines
MLlib standardizes the APIs for machine learning algorithms, making it easier to combine multiple algorithms into a single Pipeline, or workflow.
- DataFrame: The ML API uses DataFrame from Spark SQL as the ML dataset.
- Transformer: A Transformer is an algorithm that transforms one DataFrame into another. For example, an ML model is a Transformer that turns a DataFrame of features into a DataFrame of predictions.
- Estimator: An Estimator is an algorithm that can be fit on a DataFrame to produce a Transformer. For example, a learning algorithm is an Estimator that trains on a DataFrame and produces a model.
- Pipeline: A Pipeline chains multiple Estimators and Transformers together to form a complete workflow.
- Parameter: All Estimators and Transformers share a common API for specifying parameters.
DataFrame
Machine learning can be applied to a wide variety of data types, such as vectors, text, images, and structured data; all of these can be represented as a DataFrame.
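As a minimal sketch (assuming a SparkSession named spark and spark-mllib on the classpath, and illustrative column names of my choosing), a single DataFrame can hold raw text, numeric labels, and feature vectors side by side as typed columns:

import org.apache.spark.ml.linalg.Vectors

// A DataFrame mixing a text column, a label column, and a vector column.
// The column names ("text", "label", "features") are illustrative, not required by the API.
val df = spark.createDataFrame(Seq(
  ("spark is fast", 1.0, Vectors.dense(0.0, 1.1, 0.1)),
  ("hadoop mapreduce", 0.0, Vectors.dense(2.0, 1.0, -1.0))
)).toDF("text", "label", "features")

df.printSchema() // text: string, label: double, features: vector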
Pipeline components
Transformers
A Transformer is an abstraction over feature transformers and learned models. A Transformer generally implements a transform() method that converts one DataFrame into another, usually by appending one or more columns to the original DataFrame; a short sketch follows the list below.
- A feature transformer takes a DataFrame, reads a column (e.g., text), maps it into a new column (e.g., feature vectors), and outputs a new DataFrame with the mapped column appended.
- A learning model takes a DataFrame, reads the column containing feature vectors, predicts a label for each feature vector, and outputs a new DataFrame with the predicted labels appended as a column.
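As a rough sketch of the first case (assuming a SparkSession named spark), Tokenizer is a feature transformer: its transform() reads the text column and returns a new DataFrame with a words column appended, leaving the input DataFrame unchanged:

import org.apache.spark.ml.feature.Tokenizer

val docs = spark.createDataFrame(Seq(
  (0L, "a b c d e spark"),
  (1L, "hadoop mapreduce")
)).toDF("id", "text")

// transform() maps the "text" column to a new "words" column and appends it.
val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
val tokenized = tokenizer.transform(docs)
tokenized.select("text", "words").show(truncate = false)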
Estimators
An Estimator is an abstraction over learning algorithms, or any algorithm that fits or trains on data. An Estimator generally implements a fit() method that accepts a DataFrame and produces a Model, which is a Transformer. For example, LogisticRegression is an Estimator: calling fit() trains a LogisticRegressionModel, which is a Model and therefore a Transformer.
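A minimal sketch of that handoff (assuming a DataFrame named training with "label" and "features" columns, as prepared in the first full example under Code examples below): fit() is called on the Estimator, and transform() on the Model it returns:

import org.apache.spark.ml.classification.LogisticRegression

val lr = new LogisticRegression().setMaxIter(10)  // an Estimator
val model = lr.fit(training)                      // fit() returns a LogisticRegressionModel
model.transform(training).select("features", "prediction").show() // the Model is a Transformer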
Properties of pipeline components
Transformer.transform() and Estimator.fit() are currently both stateless.
Each instance of a Transformer or Estimator has a unique ID, which is useful when specifying parameters during tuning.
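For instance (a small sketch, not from the original text): two separately constructed instances of the same algorithm carry different uids, which is what allows parameters to be addressed to one specific instance:

import org.apache.spark.ml.classification.LogisticRegression

val lrA = new LogisticRegression()
val lrB = new LogisticRegression()
// Each instance gets its own unique ID, e.g. "logreg_a1b2c3d4e5f6".
println(lrA.uid)
println(lrB.uid) // different from lrA.uid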
Pipeline
In machine learning, it is common to run a sequence of algorithms to process and learn from data. For example, a simple text-processing workflow might include the following stages:
- Split each document's text into words.
- Convert the words into numerical feature vectors.
- Learn a prediction model from the feature vectors and labels.
MLlib represents such a workflow as a Pipeline, which consists of a sequence of PipelineStages (Transformers and Estimators) run in a specific order.
How it works
Each stage of a Pipeline is either a Transformer or an Estimator. The stages are run in order, and the input DataFrame is transformed as it passes through each one. For Transformer stages, transform() is called on the DataFrame; for Estimator stages, fit() is called to produce a Transformer (which becomes part of the PipelineModel, or fitted Pipeline).
Figure: the simple text-processing workflow as a Pipeline at training time.
Figure: the simple text-processing workflow as a Pipeline at test time.
Code examples
Example: Estimator, Transformer, and Param
// $example on$
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.sql.Row
// $example off$
import org.apache.spark.sql.SparkSession
object EstimatorTransformerParamExample {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .appName("EstimatorTransformerParamExample")
      .master("local[16]")
      .getOrCreate()

    // $example on$
    // Prepare training data from a list of (label, features) tuples.
    val training = spark.createDataFrame(Seq(
      (1.0, Vectors.dense(0.0, 1.1, 0.1)),
      (0.0, Vectors.dense(2.0, 1.0, -1.0)),
      (0.0, Vectors.dense(2.0, 1.3, 1.0)),
      (1.0, Vectors.dense(0.0, 1.2, -0.5))
    )).toDF("label", "features")

    // Create a LogisticRegression instance. This instance is an Estimator.
    val lr = new LogisticRegression()
    // Print out the parameters, documentation, and any default values.
    println(s"LogisticRegression parameters:\n ${lr.explainParams()}\n")

    // We may set parameters using setter methods.
    lr.setMaxIter(10)
      .setRegParam(0.01)

    // Learn a LogisticRegression model. This uses the parameters stored in lr.
    val model1 = lr.fit(training)
    // Since model1 is a Model (i.e., a Transformer produced by an Estimator),
    // we can view the parameters it used during fit().
    // This prints the parameter (name: value) pairs, where names are unique IDs for this
    // LogisticRegression instance.
    println(s"Model 1 was fit using parameters: ${model1.parent.extractParamMap}")

    // We may alternatively specify parameters using a ParamMap,
    // which supports several methods for specifying parameters.
    val paramMap = ParamMap(lr.maxIter -> 20)
      .put(lr.maxIter, 30) // Specify 1 Param. This overwrites the original maxIter.
      .put(lr.regParam -> 0.1, lr.threshold -> 0.55) // Specify multiple Params.

    // One can also combine ParamMaps.
    val paramMap2 = ParamMap(lr.probabilityCol -> "myProbability") // Change output column name.
    val paramMapCombined = paramMap ++ paramMap2

    // Now learn a new model using the paramMapCombined parameters.
    // paramMapCombined overrides all parameters set earlier via lr.set* methods.
    val model2 = lr.fit(training, paramMapCombined)
    println(s"Model 2 was fit using parameters: ${model2.parent.extractParamMap}")

    // Prepare test data.
    val test = spark.createDataFrame(Seq(
      (1.0, Vectors.dense(-1.0, 1.5, 1.3)),
      (0.0, Vectors.dense(3.0, 2.0, -0.1)),
      (1.0, Vectors.dense(0.0, 2.2, -1.5))
    )).toDF("label", "features")

    // Make predictions on test data using the Transformer.transform() method.
    // LogisticRegression.transform will only use the 'features' column.
    // Note that model2.transform() outputs a 'myProbability' column instead of the usual
    // 'probability' column since we renamed the lr.probabilityCol parameter previously.
    model2.transform(test)
      .select("features", "label", "myProbability", "prediction")
      .collect()
      .foreach { case Row(features: Vector, label: Double, prob: Vector, prediction: Double) =>
        println(s"($features, $label) -> prob=$prob, prediction=$prediction")
      }
    // $example off$

    spark.stop()
  }
}
Example: Pipeline
// $example on$
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row
// $example off$
import org.apache.spark.sql.SparkSession
object PipelineExample {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession
      .builder
      .appName("PipelineExample")
      .master("local[16]")
      .getOrCreate()

    // $example on$
    // Prepare training documents from a list of (id, text, label) tuples.
    val training = spark.createDataFrame(Seq(
      (0L, "a b c d e spark", 1.0),
      (1L, "b d", 0.0),
      (2L, "spark f g h", 1.0),
      (3L, "hadoop mapreduce", 0.0)
    )).toDF("id", "text", "label")

    // Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
    val tokenizer = new Tokenizer()
      .setInputCol("text")
      .setOutputCol("words")
    val hashingTF = new HashingTF()
      .setNumFeatures(1000)
      .setInputCol(tokenizer.getOutputCol)
      .setOutputCol("features")
    val lr = new LogisticRegression()
      .setMaxIter(10)
      .setRegParam(0.001)
    val pipeline = new Pipeline()
      .setStages(Array(tokenizer, hashingTF, lr))

    // Fit the pipeline to training documents.
    val model = pipeline.fit(training)

    // Now we can optionally save the fitted pipeline to disk
    model.write.overwrite().save("/tmp/spark-logistic-regression-model")

    // We can also save this unfit pipeline to disk
    pipeline.write.overwrite().save("/tmp/unfit-lr-model")

    // And load it back in during production
    val sameModel = PipelineModel.load("/tmp/spark-logistic-regression-model")

    // Prepare test documents, which are unlabeled (id, text) tuples.
    val test = spark.createDataFrame(Seq(
      (4L, "spark i j k"),
      (5L, "l m n"),
      (6L, "spark hadoop spark"),
      (7L, "apache hadoop")
    )).toDF("id", "text")

    // Make predictions on test documents.
    model.transform(test)
      .select("id", "text", "probability", "prediction")
      .collect()
      .foreach { case Row(id: Long, text: String, prob: Vector, prediction: Double) =>
        println(s"($id, $text) --> prob=$prob, prediction=$prediction")
      }
    // $example off$

    spark.stop()
  }
}