1.Spark ML重要概念
1.Spark ML基于什么来处理的?
Spark-Core的核心开发是基于RDD,但是RDD并不是非常的灵活,如果做一个结构化数据处理,还需要转换成DataFrame(在Python当中引出的概念)DataFrame其实就是行对象的RDD加上schema,类似于文本的数据,对这些数据加入schema,做一些结构的转换,可以把它简单地理解为数据库里的一张表,里面有数据,有类型。RDD和DataFrame的关系可以理解为普通文本和表的对应关系。
所以DataFrame处理起来会更加灵活一些,ML就是基于DataFrame进行处理的,来用于学习的一个数据集,所以当我们创建一个DataFrame的时候可以指定一个schem,或者创建一个行对象RDD,可以从已有的RDD转换而来,DF也可以转换成RDD。
DataFrame在ML中有多种类型,除了常见的整型、字符串等,还可以支持图像、特征工程转换的向量Vector。所以DataFrame是Spark ML用来进行训练的数据集。
2.进行机器学习的流程?
前面讲到过,一般机器学习的流程是,先收集数据集,将数据集划分为训练集合测试集,再用训练集训练模型,然后用模型对测试集进行预测。最后通过混淆矩阵,借助于AUC、召回率或准确率来进行模型的评估。
这个流程是可以形成管道的,整体就是个DAG(有向无环图,spark通过DAG来进行调度的)其实整个模型测试的过程就是一个管道,这些管道会有各种各样的组件,每一个步骤便是一个组件,组件可以分成两个类别,第一个是Transformers,用到的函数是transform。它的作用就是进行转换,即DataFrame转成另外一个DataFrame,例如,将原本的数据集拆分成训练集和测试集。对测试集进行预测,预测的过程,也是将原本的测试集(DF)转换成了预测结果集(另外一个DF)。
第二种类型是Estimators,评估器,用到的函数是fit。它的作用是应用在一个DF上生成一个转换器的算法,我们通常的用训练集来训练一个模型(逻辑回归),本质上就是用Estimators的fit方法去做。
2.Spark ML如何工作
上图上部分就是整个管道的操作,下部分就是不同数据类型的变化
上图就是对上一个模型的整个transformer操作了。
3.Spark ML参数和保存
1.参数是所有的转换器和评估器共享一个公共的api,比如正则化参数都是可以共用的
2.参数名Param是一个参数,可以单独去设置某个参数的值
3.ParamMap是一个参数的集合 (parameter, value)
4.传递参数的两种方式:
为实例设置参数
传递ParamMap给fit()或transform()方法
保存和加载管道:可以对模型和管道进行保存
4.Spark ML代码实例
实例1:Estimator, Transformer, and Param
步骤:
准备带标签和特征的数据
创建逻辑回归的评估器
使用setter方法设置参数
使用存储在lr中的参数来训练一个模型
使用paramMap选择指定的参数
准备测试数据
预测结果
代码如下:
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.mllib.linalg.{Vector,Vectors}
import org.apache.spark.sql.Row
val training = sqlContext.createDataFrame(Seq(
(1.0,Vectors.dense(0.0,1.1,0.1)),
(0.0,Vectors.dense(2.0,1.0,-1.1)),
(0.0,Vectors.dense(2.0,1.3,1.0)),
(1.0,Vectors.dense(0.0,1.2,-0.5))
)).toDF("label","features")
val lr = new LogisticRegression()
//查看构建模型的默认参数
print(lr.explainParams())
打印运行结果如下图:
从上图可以看到正则化参数默认为0.0,最大迭代次数默认为100.
//设置参数
lr.setMaxIter(10).setRegParam(0.01)
val model = lr.fit(training)
//查看model的参数
model.parent.extractParamMap
打印结果如下图:
可以看到,模型的正则化参数值被修改为0.01,最大迭代次数被修改为10.
//重置参数
val paramMap = ParamMap(lr.maxIter -> 20).
put(lr.maxIter,10).
put(lr.regParam -> 0.1, lr.threshold -> 0.55)
val paramMap2 = ParamMap(lr.probabilityCol -> "myProbability")
//参数组合
val paramMapCombined = paramMap ++ paramMap2
val model2 = lr.fit(training,paramMapCombined)
model2.parent.extractParamMap
打印结果如下:
从结果可以看出,被框起来的参数都被修改或者覆盖了。
//测试数据集
val test = sqlContext.createDataFrame(Seq(
(1.0,Vectors.dense(-1.0,1.5,1.3)),
(0.0,Vectors.dense(3.0,2.0,-0.1)),
(1.0,Vectors.dense(0.0,2.2,-1.5))
)).toDF("label","features")
//测试
model.transform(test).collect
//美化打印
model2.transform(test).select("label","features","myProbability","prediction").collect().
foreach{case Row(label:Double,features:Vector,myProbability:Vector,prediction:Double) =>
println(s"($features,$label) -> myProbability=$myProbability,prediction=$prediction")}
模型测试的结果如下:
实例2:管道使用
步骤:
准备训练文档
配置ML管道,包含三个stage:
Tokenizer,HashingTF和lr
安装管道到数据上
保存管道到磁盘
包括安装好的和未安装好的
加载管道
准备测试文档
预测结果
import org.apache.spark.ml.feature.{Tokenizer,HashingTF}
import org.apache.spark.ml.{Pipeline,PipelineModel}
val training = sqlContext.createDataFrame(Seq(
(0L, "a b c d e spark", 1.0),
(1L, "b d", 0.0),
(2L, "spark f g h", 1.0),
(3L, "hadoop mapreduce", 0.0)
)).toDF("id","text","label")
val tokenizer = new Tokenizer().
setInputCol("text").
setOutputCol("words")
val hashingTF = new HashingTF().
setNumFeatures(1000).
setInputCol(tokenizer.getOutputCol).
setOutputCol("features")
val lr = new LogisticRegression().
setMaxIter(10).setRegParam(0.01)
val pipeline = new Pipeline().
setStages(Array(tokenizer,hashingTF,lr))
val model = pipeline.fit(training)
pipeline.save("./sparkML-LRpipeline")
model.save("./sparkML-LRmodel")
val model2 = PipelineModel.load("./sparkML-LRmodel")
val test = sqlContext.createDataFrame(Seq(
(4L, "spark h d e"),
(5L, "a f c"),
(6L, "mapreduce spark"),
(7L, "apache hadoop")
)).toDF("id","text")
model2.transform(test).select("id","text","probability","prediction").collect().
foreach{case Row(id:Long,text:String,probability:Vector,prediction:Double) =>
println(s"($id,$text) -> probability=$probability,prediction=$prediction")}
执行以上两行save代码后,会在当前目录下生成两个文件,这就是保存的模型文件,如下图:
最后模型对测试集的预测打印结果如下:
实例3:通过交叉验证来模型调优
在模型训练过程中,会有一些参数可以选择,比如正则化参数,迭代次数等,但我们自己设定的参数值是不是最优的呢,肯定不是的,那如何选择最优的参数,只能凭借经验来选择可信赖的比较简单的模型,但也不一定是最优参数。所以就需要对模型进行调优了。我们可以给出一些可选的参数,比如迭代次数,给10/100/1000,正则化给0.1/0.01/0.001等,让模型自己去组合选择出最优的参数搭配。
完成参数的选择,我们用到的类为pipeline,基于整体的管道进行调优,而不是给lr模型或者分词单独进行调优。实践如下:
步骤:
准备训练数据
配置ML管道,包含三个stage:
Tokenizer,HashingTF和lr
使用ParamGridBuilder 构造一个参数网格
使用CrossValidator来选择模型和参数
CrossValidator需要一个Estimator(pipeline),一个评估器参数集合,和一个Evaluator(lr就选择二分类评估器)
运行交叉校验,选择最好的参数集
准备测试数据
预测结果
实例4:通过训练校验分类来调优模型
上一个实例的交叉验证是把数据分成多份,每一份把参数组合,对模型计算评分一次。这种方式只需要把每一组参数计算一次就可以了。
而校验分类是自动把数据分成训练集和校验集,例如80%的数据作为训练集,20%的数据作为每一组参数的校验集,每一组参数计算一次。这种方式的使用必须依赖大量数据进行训练,如果训练数据不够,那么所生成的结果是不可信的。这种情况就更适合前面一种交叉验证方式了,对于交叉验证来说,如果数据少一些,没关系,每一组参数会进行多次校验,如果fold=3,会校验3次。所以即使数据量比较少,但是因为计算了多次,3次的结果足够进行评估,选出最优的参数。
在这个实例中,我们使用线性回归进行处理,线性回归主要是用来进行预测的回归算法,这一次用的数据集相对来说大一点。
步骤:
准备训练和测试数据
使用ParamGridBuilder 构造一个参数网格
使用TrainValidationSplit来选择模型和参数
CrossValidator需要一个Estimator,一个评估器参数集合,和一个Evaluator
运行训练校验分离,选择最好的参数
在测试数据上做预测,模型是参数组合中执行最好的一个
数据集如下图:
进入spark-shell前先将数据预处理一下,替换掉所有的"+"符号
sed -i 's/+//g' sample_linear_regression_data.txt
代码如下:
//加载文件
val data = sqlContext.read.format("libsvm").load("./sample_linear_regression_data.txt")
//划分数据集
val Array(training, test) = data.randomSplit(Array(0.9, 0.1), seed=12345)
//创建模型
val lr = new LinearRegression()
//使用ParamGridBuilder 构造一个参数网格
val paramGrid = new ParamGridBuilder().
addGrid(lr.elasticNetParam, Array(0.0,0.5,1.0)).
addGrid(lr.fitIntercept).
addGrid(lr.regParam, Array(0.1,0.01)).
build()
//使用TrainValidationSplit来选择模型和参数
val trainValidationSplit = new TrainValidationSplit().
setEstimator(lr).
setEstimatorParamMaps(paramGrid).
setEvaluator(new RegressionEvaluator).
setTrainRatio(0.8)
//运行交叉校验,选择最好的参数集
//预测结果
model.transform(test).select("features","label","prediction").show()
预测结果如下图: