Spark 分布式实现贝叶斯判别
贝叶斯公式
A是事件Ω中的一个事件,则在A给定的条件下,事件Bi的条件概率如下:
Bi 通常视为A发生的”原因“,P(Bi)称为先验概率(主观概率),表示各种原因发生的可能性大小;P(Bi|A)(i=1,2...)则反映当出现结果A之后,再对各种“原因”概率的新认识,故称后验概率。
一个小案例
大家都知道狼来了的故事,我们就有贝叶斯的思想来解释一下这个故事:
在最开始的时候,大家对于放羊娃的认识不够深刻,主观上认为放羊娃说真话(记为事件B1)和说假话(记为事件B2)的概率相同。即:
再假设狼来了(记为事件A),说谎话喊狼来了时,狼来的概率为1/3,说真话喊狼来了时,狼来的概率是2/3,即:
狼没来的情况下小孩说谎了(在村民们的主观印象上,小孩说谎的概率增加了):
随着小孩说谎的次数增加,村民们对于小孩说谎的主观概率也不断增加,当这个概率增加到一定程度时计算小孩说真话,村民们就不会再相信他。
贝叶斯判别
之前提到的两种判别分析方法都非常简单,实用,但是也存在着一定的缺点:一是判别方法与各个总体出现的概率大小无关,而是与错判后造成的损失无关。贝叶斯判别则考虑了这两种情况:贝叶斯判别假定对样本有一定的认知(先验概率),然后计算得出后验概率并进行统计推断的判别方法。
贝叶斯判别求解过程
由于过程较长,公式比较多,直接上书
代码实现过程
本案例使用的数据为鸢尾花数据集
def main(args: Array[String]): Unit = {
val spark = SparkSession
.builder()
.appName(s"${this.getClass.getSimpleName}")
.master("local[*]")
.getOrCreate()
import spark.implicits._
val sc = spark.sparkContext
val irisData = spark.read
.option("header", true)
.option("inferSchema", true)
.csv("F:\\DataSource\\iris.csv")
val schema = irisData.schema
val fts = schema.filterNot(_.name == "class").map(_.name).toArray
val amountVectorAssembler: VectorAssembler = new VectorAssembler()
.setInputCols(fts)
.setOutputCol("features")
val vec2Array = udf((vec: DenseVector) => vec.toArray)
val irisFeatrus = amountVectorAssembler
.transform(irisData)
.select($"class", vec2Array($"features") as "features")
val p: Long = irisFeatrus.count()
// 计算均值向量的自定义聚合函数(请参考之前的两篇文章)
val ui = spark.udf.register("udafMedian", new meanVector(fts.length))
// 计算样本均值向量
val uiGroup = irisFeatrus
.groupBy($"class")
.agg(ui($"features") as "ui", count($"class") as "len")
val covMatrix = irisFeatrus
.join(uiGroup, "class")
.rdd
.map(row => {
val lable = row.getAs[String]("class")
val len = row.getAs[Long]("len")
val u = densevec(row.getAs[Seq[Double]]("ui").toArray)
val x = densevec(row.getAs[Seq[Double]]("features").toArray)
val denseMatrix = (x - u).toDenseMatrix
lable -> (denseMatrix, u, len)
})
.reduceByKey((d1, d2) => {
// 矩阵合并,均值向量,样本大小
(DenseMatrix.vertcat(d1._1, d2._1), d1._2, d1._3)
})
.mapValues(tp => {
val covm: DenseMatrix[Double] =
(tp._1.t * tp._1).map(_ / (tp._3 - 1)) //协方差矩阵
val qi = math.log(tp._3.toDouble / p) // 先验概率,在此默认为各类样本的频率
(covm, tp._2.toDenseMatrix, qi)
})
val covBroad = sc.broadcast(covMatrix.collect())
val predictudf = udf((seq: Seq[Double]) => {
val dist: Array[(String, Double)] = covBroad.value
.map(tp => {
/**
* 计算判别函数的相关指标
**/
val xi = densevec(seq.toArray).toDenseMatrix
val inCov = inv(tp._2._1)
val lnCov = math.log(det(tp._2._1)) / 2
val xdiff = (xi * inCov * xi.t).data.head / 2
val mdist = (tp._2._2 * inv(tp._2._1) * tp._2._2.t).data.head / 2
val xu = (xi * inCov * tp._2._2.t).data.head
val d = tp._2._3 - lnCov - xdiff - mdist + xu
(tp._1, d)
})
val pm = dist.map(x => math.exp(x._2)).sum
// 计算后验概率
dist.map(tp => {
tp._1 -> math.exp(tp._2) / pm
})
})
irisFeatrus
.withColumn("prediction", predictudf($"features"))
.show(truncate = false)
spark.stop()
}
结果查看:从结果看出,分类效果还是很好的
|class |features |prediction |
+-----------+--------------------+-----------+
|Iris-setosa|[5.1, 3.5, 1.4, 0.2]|Iris-setosa|
|Iris-setosa|[4.9, 3.0, 1.4, 0.2]|Iris-setosa|
|Iris-setosa|[4.7, 3.2, 1.3, 0.2]|Iris-setosa|
由于作者水平有限,在介绍及实现过程中难免有纰漏之处,欢迎细心的朋友指正
参考资料:
《多元统计分析及R语言建模》--王斌会
《概率论与数理统计》 --茆师松