推荐系统之SparkML预测模型构建

1.Spark ML重要概念

1.Spark ML基于什么来处理的?

Spark-Core的核心开发是基于RDD,但是RDD并不是非常的灵活,如果做一个结构化数据处理,还需要转换成DataFrame(在Python当中引出的概念)DataFrame其实就是行对象的RDD加上schema,类似于文本的数据,对这些数据加入schema,做一些结构的转换,可以把它简单地理解为数据库里的一张表,里面有数据,有类型。RDD和DataFrame的关系可以理解为普通文本和表的对应关系。
所以DataFrame处理起来会更加灵活一些,ML就是基于DataFrame进行处理的,来用于学习的一个数据集,所以当我们创建一个DataFrame的时候可以指定一个schem,或者创建一个行对象RDD,可以从已有的RDD转换而来,DF也可以转换成RDD。
DataFrame在ML中有多种类型,除了常见的整型、字符串等,还可以支持图像、特征工程转换的向量Vector。所以DataFrame是Spark ML用来进行训练的数据集。

2.进行机器学习的流程?

前面讲到过,一般机器学习的流程是,先收集数据集,将数据集划分为训练集合测试集,再用训练集训练模型,然后用模型对测试集进行预测。最后通过混淆矩阵,借助于AUC、召回率或准确率来进行模型的评估。
这个流程是可以形成管道的,整体就是个DAG(有向无环图,spark通过DAG来进行调度的)其实整个模型测试的过程就是一个管道,这些管道会有各种各样的组件,每一个步骤便是一个组件,组件可以分成两个类别,第一个是Transformers,用到的函数是transform。它的作用就是进行转换,即DataFrame转成另外一个DataFrame,例如,将原本的数据集拆分成训练集和测试集。对测试集进行预测,预测的过程,也是将原本的测试集(DF)转换成了预测结果集(另外一个DF)。
第二种类型是Estimators,评估器,用到的函数是fit。它的作用是应用在一个DF上生成一个转换器的算法,我们通常的用训练集来训练一个模型(逻辑回归),本质上就是用Estimators的fit方法去做。

2.Spark ML如何工作


上图上部分就是整个管道的操作,下部分就是不同数据类型的变化



上图就是对上一个模型的整个transformer操作了。

3.Spark ML参数和保存

1.参数是所有的转换器和评估器共享一个公共的api,比如正则化参数都是可以共用的
2.参数名Param是一个参数,可以单独去设置某个参数的值
3.ParamMap是一个参数的集合 (parameter, value)
4.传递参数的两种方式:
为实例设置参数
传递ParamMap给fit()或transform()方法
保存和加载管道:可以对模型和管道进行保存

4.Spark ML代码实例

实例1:Estimator, Transformer, and Param

步骤:
准备带标签和特征的数据
创建逻辑回归的评估器
使用setter方法设置参数
使用存储在lr中的参数来训练一个模型
使用paramMap选择指定的参数
准备测试数据
预测结果
代码如下:

import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.mllib.linalg.{Vector,Vectors}
import org.apache.spark.sql.Row

val training = sqlContext.createDataFrame(Seq(
(1.0,Vectors.dense(0.0,1.1,0.1)),
(0.0,Vectors.dense(2.0,1.0,-1.1)),
(0.0,Vectors.dense(2.0,1.3,1.0)),
(1.0,Vectors.dense(0.0,1.2,-0.5))
)).toDF("label","features")

val lr = new LogisticRegression()
//查看构建模型的默认参数
print(lr.explainParams())

打印运行结果如下图:



从上图可以看到正则化参数默认为0.0,最大迭代次数默认为100.

//设置参数
lr.setMaxIter(10).setRegParam(0.01)
val model = lr.fit(training)
//查看model的参数
model.parent.extractParamMap

打印结果如下图:



可以看到,模型的正则化参数值被修改为0.01,最大迭代次数被修改为10.

//重置参数
val paramMap = ParamMap(lr.maxIter -> 20).
put(lr.maxIter,10).
put(lr.regParam -> 0.1, lr.threshold -> 0.55)
val paramMap2 = ParamMap(lr.probabilityCol -> "myProbability")
//参数组合
val paramMapCombined = paramMap ++ paramMap2
val model2 = lr.fit(training,paramMapCombined)
model2.parent.extractParamMap

打印结果如下:



从结果可以看出,被框起来的参数都被修改或者覆盖了。

//测试数据集
val test = sqlContext.createDataFrame(Seq(
(1.0,Vectors.dense(-1.0,1.5,1.3)),
(0.0,Vectors.dense(3.0,2.0,-0.1)),
(1.0,Vectors.dense(0.0,2.2,-1.5))
)).toDF("label","features")
//测试
model.transform(test).collect
//美化打印
model2.transform(test).select("label","features","myProbability","prediction").collect().
foreach{case Row(label:Double,features:Vector,myProbability:Vector,prediction:Double) =>
println(s"($features,$label) -> myProbability=$myProbability,prediction=$prediction")}

模型测试的结果如下:


实例2:管道使用

步骤:
准备训练文档
配置ML管道,包含三个stage:
Tokenizer,HashingTF和lr
安装管道到数据上
保存管道到磁盘
包括安装好的和未安装好的
加载管道
准备测试文档
预测结果

import org.apache.spark.ml.feature.{Tokenizer,HashingTF}
import org.apache.spark.ml.{Pipeline,PipelineModel}

val training = sqlContext.createDataFrame(Seq(
(0L, "a b c d e spark", 1.0),
(1L, "b d", 0.0),
(2L, "spark f g h", 1.0),
(3L, "hadoop mapreduce", 0.0)
)).toDF("id","text","label")

val tokenizer = new Tokenizer().
setInputCol("text").
setOutputCol("words")

val hashingTF = new HashingTF().
setNumFeatures(1000).
setInputCol(tokenizer.getOutputCol).
setOutputCol("features")

val lr = new LogisticRegression().
setMaxIter(10).setRegParam(0.01)

val pipeline = new Pipeline().
setStages(Array(tokenizer,hashingTF,lr))

val model = pipeline.fit(training)

pipeline.save("./sparkML-LRpipeline")
model.save("./sparkML-LRmodel")

val model2 = PipelineModel.load("./sparkML-LRmodel")

val test = sqlContext.createDataFrame(Seq(
(4L, "spark h d e"),
(5L, "a f c"),
(6L, "mapreduce spark"),
(7L, "apache hadoop")
)).toDF("id","text")

model2.transform(test).select("id","text","probability","prediction").collect().
foreach{case Row(id:Long,text:String,probability:Vector,prediction:Double) =>
println(s"($id,$text) -> probability=$probability,prediction=$prediction")}

执行以上两行save代码后,会在当前目录下生成两个文件,这就是保存的模型文件,如下图:


最后模型对测试集的预测打印结果如下:


实例3:通过交叉验证来模型调优

在模型训练过程中,会有一些参数可以选择,比如正则化参数,迭代次数等,但我们自己设定的参数值是不是最优的呢,肯定不是的,那如何选择最优的参数,只能凭借经验来选择可信赖的比较简单的模型,但也不一定是最优参数。所以就需要对模型进行调优了。我们可以给出一些可选的参数,比如迭代次数,给10/100/1000,正则化给0.1/0.01/0.001等,让模型自己去组合选择出最优的参数搭配。
完成参数的选择,我们用到的类为pipeline,基于整体的管道进行调优,而不是给lr模型或者分词单独进行调优。实践如下:
步骤:
准备训练数据
配置ML管道,包含三个stage:
Tokenizer,HashingTF和lr
使用ParamGridBuilder 构造一个参数网格
使用CrossValidator来选择模型和参数
CrossValidator需要一个Estimator(pipeline),一个评估器参数集合,和一个Evaluator(lr就选择二分类评估器)
运行交叉校验,选择最好的参数集
准备测试数据
预测结果

实例4:通过训练校验分类来调优模型

上一个实例的交叉验证是把数据分成多份,每一份把参数组合,对模型计算评分一次。这种方式只需要把每一组参数计算一次就可以了。
而校验分类是自动把数据分成训练集和校验集,例如80%的数据作为训练集,20%的数据作为每一组参数的校验集,每一组参数计算一次。这种方式的使用必须依赖大量数据进行训练,如果训练数据不够,那么所生成的结果是不可信的。这种情况就更适合前面一种交叉验证方式了,对于交叉验证来说,如果数据少一些,没关系,每一组参数会进行多次校验,如果fold=3,会校验3次。所以即使数据量比较少,但是因为计算了多次,3次的结果足够进行评估,选出最优的参数。
在这个实例中,我们使用线性回归进行处理,线性回归主要是用来进行预测的回归算法,这一次用的数据集相对来说大一点。
步骤:
准备训练和测试数据
使用ParamGridBuilder 构造一个参数网格
使用TrainValidationSplit来选择模型和参数
CrossValidator需要一个Estimator,一个评估器参数集合,和一个Evaluator
运行训练校验分离,选择最好的参数
在测试数据上做预测,模型是参数组合中执行最好的一个
数据集如下图:


进入spark-shell前先将数据预处理一下,替换掉所有的"+"符号
sed -i 's/+//g' sample_linear_regression_data.txt

代码如下:

//加载文件
val data = sqlContext.read.format("libsvm").load("./sample_linear_regression_data.txt")
//划分数据集
val Array(training, test) = data.randomSplit(Array(0.9, 0.1), seed=12345)
//创建模型
val lr = new LinearRegression()
//使用ParamGridBuilder 构造一个参数网格
val paramGrid = new ParamGridBuilder().
addGrid(lr.elasticNetParam, Array(0.0,0.5,1.0)).
addGrid(lr.fitIntercept).
addGrid(lr.regParam, Array(0.1,0.01)).
build()
//使用TrainValidationSplit来选择模型和参数
val trainValidationSplit = new TrainValidationSplit().
setEstimator(lr).
setEstimatorParamMaps(paramGrid).
setEvaluator(new RegressionEvaluator).
setTrainRatio(0.8)
//运行交叉校验,选择最好的参数集
  
//预测结果
model.transform(test).select("features","label","prediction").show()

预测结果如下图:


最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 219,869评论 6 508
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 93,716评论 3 396
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 166,223评论 0 357
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 59,047评论 1 295
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 68,089评论 6 395
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 51,839评论 1 308
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 40,516评论 3 420
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 39,410评论 0 276
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 45,920评论 1 319
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 38,052评论 3 340
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 40,179评论 1 352
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 35,868评论 5 346
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 41,522评论 3 331
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 32,070评论 0 22
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 33,186评论 1 272
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 48,487评论 3 375
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 45,162评论 2 356

推荐阅读更多精彩内容