聚类
Spark的机器学习库分成两类,一类是针对RDD的,在org.apache.spark.mllib
包下,另一类则是针对DataFrame的,在org.apache.spark.ml
包下。
本次实践从最简单的数据入手,找出一个地点的所有发生重量,然后使用KMeans算法聚类分析,先上RDD版本的。
需要引入的库是
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors
实际调用的代码是
val model = KMeans.train(weightRDD, 3, 20)
val cost = model.computeCost(weightRDD)
val lines = mutable.ArrayBuffer.empty[String]
model.clusterCenters.foreach(v => {
lines.append(v.toString.replace("[", "").replace("]", ""))
})
KMeans.train()
接受三个参数,含义依次是:RDD[Vector]
类型的训练数据,聚类的中心点个数,计算的迭代次数。通过之前的分析,由于大部分地点发生的重量都有1到2个密集出现的数值范围,所以姑且先把中心点个数定为3。官方给出的例子里面的计算迭代次数是20,实践中试过100,50,20三种取值,发现cost
结果几乎相同,可以理解为已经达到最优解,所以判断20次足够。model.clusterCenters
得到的是一组Vector
就是第一个参数中所有Vector
的几个中心位置,维数与传入的Vector
相同。本例中,传入的Vector
很无耻的只有1维,所以传出的结果也是1维的:
十九区 5977.1371889452075 ArrayBuffer(11.102791095890407, 4.854385150812066, 17.634308681672035)
结果依次是地点名称,各个点到中心点的距离之和以及三个中心点数值。
效果检验
聚类的效果如何?需要写个小算法检验一下,对于训练数据集中每个重量,找到其和多个中心点之间的最小距离,和一个阈值比对,如果小于阈值即合法。阈值的选取应该是有讲究的,目前为了简化问题,选择一个定值。本例中,阈值为2kg。
之前已经把聚类的最新结果写入数据库中,表结构大概如下:
读取聚类中心点的代码如下:
val teamCluserMap = spark.read.jdbc(mysqlHelper.DB_URL_W, "team_weight_centers",
Array("org_id = " + orgId), mysqlHelper.PROPERTIES)
.select("team_name", "centers").rdd.map(item => {
val centers = item.getAs[String]("centers").split(",").map(_ toDouble);
(item.getAs[String]("team_name"), centers)
}) collect() toMap
这样就得到了一个Map<String,Array<Double>>
用来存放各个地点的中心点数据。
比对的时候代码如下:
val judgeRDD = weightDataSet.filter(item => item._1 == teamName).map(item => {
val centers = teamCluserMap.get(teamNameShort).get
var dis = Double.MaxValue
for (c <- centers) {
dis = Math.min(Math.abs(item._2 - c), dis)
}
if (dis < 2) ("Y", 1)
else ("N", 1)
}).rdd.reduceByKey((A, B) => A + B)
var yCount=0
var nCount=0
for(item<-judgeRDD.collect()) {
println(teamNameShort+"\t"+item._1+"\t"+item._2)
if(item._1 == "Y") yCount+=item._2
if(item._1 == "N") nCount+=item._2
}
println("\t\t"+yCount.toDouble/(yCount.toDouble+nCount.toDouble))
将输出结果贴到excel中可以了。
优化聚类结果
现在可以迭代聚类-检验过程,进行参数调优了。
在保持阈值不变、聚类迭代次数不变的情况下,分别尝试3、4、5个中心点的时候,匹配率明显上升中,还是以上文中的地点为例:
对于定值的阈值而言,显然中心点数越多,覆盖率越大。但这个不符合实际,我们需要引入一些变化,采用每个样本点到中心点的平均距离作为阈值看起来是一个合理的方法:中心点越多,平均距离就越小,阈值也越小;中心点越多,平均距离就越大,阈值也就越大。
聚类的算法变化如下:
val cost = model.computeCost(weightRDD)
变成了
val cost = model.computeCost(weightRDD)/weightRDD.count()
这样重新运行检验时,样本上文中地点的5个中心点的覆盖率为0.785822021116138,4个中心点覆盖率为0.950980392156862,3个中心点的覆盖率则为0.981900452488687,更进一步的,2个中心点的覆盖率达到了0.993966817496229,可想而知1个中心点的覆盖率应该能达到1,但显然没有意义。
所以我们需要对每个地点进行训练,调整参数使其覆盖率在0.95~0.96之间,这样才能得到比较符合实际的聚类。
新的ml库
一开始我们说过,新的ml库是基于DataFrame
的,而且Spark的官方文档上说到3.0的时候会取消基于RDD
的机器学习库,所以使用新库是大势所趋。
在引入库的时候就要注意了:
import org.apache.spark.ml.clustering.KMeans
import org.apache.spark.ml.linalg.Vectors
同时聚类算法的定义变成了:
val kmeans = new KMeans().setK(3).setSeed(1L)
中心点还是3个,随机数种子是1,如果不指定,源码指示会给出参数类的hashcode作为默认值:
final val seed: LongParam = new LongParam(this, "seed", "random seed")
setDefault(seed, this.getClass.getName.hashCode.toLong)
适应算法需要的DataFrame
需要包含名为“features”的列,列的每行记录都是一个Vector
,这样的数据集才能被算法识别。所以我们需要先创建类型是Vector
的列,再将其命名为“features”。
val weightDataset = medicalWasteDataFrame.map(record => {
(rfidCardMap.get(record.getAs[String]("team_id")) toString,
Vectors.dense(record.getAs[Double]("mw_weight")))
}) cache()
以上得到的是Dataset[(String,Vector)]
val weightDataSetFilted = weightDataset
//隐藏了一些敏感的条件过滤语句...
.rdd.toDF("team_name","features").cache()
以上先将DataSet
转成RDD
,再从RDD
转成DataFrame
,这样我们就有了算法可用的训练数据集了。
val model = kmeans.fit(weightDataSetFilted)
其他的就和之前的写法没有区别。