基于 SparkGraphx 实现适用于位置信息的DBScan聚类

基于 SparkGraphx 实现的 DBScan聚类

关于DBScan算法的详细介绍请参见维基百科

https://en.wikipedia.org/wiki/DBSCAN

Graphx 实现Dbscan 图解


图解

1.上图中蓝色的点代表我们需要聚类的样本点,假设我们将DBScan的两个参数:距离 (Eps)设为1,最小集群点数(minPts)设为 4,则根据聚类规则,上图的A、B部分则会分别被聚为一类,C、D部分则会被视为离群点。

2.而Graphx的作用就是将两个距离满足条件的点连成边,然后再将这些边连成一个个的连通图,最后再计算各个图内的点数是否满足设定的最小集群点数。根据聚类规则我们就可以完成聚类,抽象出来就如上图所示。

3.代码实现过程如下
本文所使用的是经纬度数据,因此在使用距离计算的时候,用的是经纬度距离的计算方法(球面距离),在实现过程中也使用了Geohash算法(相关介绍有很多,这篇帖子就很好)进行了相关优化。

  /**
      * 参数校验
      */
    if (args.length != 4) {
      println(
        """
          |参数:
          |dbinput   输入路径
          |eps       邻域半径
          |minpts    最小密集点数
          |dboutput  输出路径
        """.stripMargin)
      System.exit(3)
    }
    val Array(dbinput, eps, minpts, output) = args

    val spark = SparkSession.builder()
      .appName(s"${this.getClass.getSimpleName}")
      .master("local[*]")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.shuffle.consolidateFiles", "true")
      .config("spark.io.compression.codec", "snappy")
      .getOrCreate()

    import spark.implicits._

  //  加载数据
  val dbdata = spark.read.option("inferSchema", true).csv(dbinput)


  // 计算经纬度距离
    def distanceBetweenPoints(lon1: Double, lon2: Double, lat1: Double, lat2: Double): Double = {
      require(lon1 >= -180 && lon1 <= 180)
      require(lon2 >= -180 && lon2 <= 180)
      require(lat1 >= -90 && lat1 <= 90)
      require(lat2 >= -90 && lat2 <= 90)
      val R = 6371009d // average radius of the earth in metres
      val dLat = toRadians(lat2 - lat1)
      val dLng = toRadians(lon2 - lon1)
      val latA = toRadians(lat1)
      val latB = toRadians(lat2)
      // The actual haversine formula. a and c are well known value names in the formula.
      val a = sin(dLat / 2) * sin(dLat / 2) +
        sin(dLng / 2) * sin(dLng / 2) * cos(latA) * cos(latB)
      val c = 2 * atan2(sqrt(a), sqrt(1 - a))
      // 默认返回千米
      (R * c) / 1000D
    }

    // 经纬度距离 sparksql udf 
    val lonLatDistance = udf((lon1: Double, lon2: Double, lat1: Double, lat2: Double) => {
      distanceBetweenPoints(lon1, lon2, lat1, lat2)
    })

此部分是结合GeoHash算法做的一点优化,主要是根据dbscan的距离参数预先对数据进行分组,笔者水平有限,只想到了这个数据分区的方法。


GeoHash Code 精度对照
   // 根据geohash算法对经纬度数据做分区
    val scope = udf((lon: Double, lat: Double) => {
      // geohash
      val geohash = GeoHash.encodeHash(lat, lon,
        // 计算geohash的最优分区位数
        MLUtils.geoLength(eps.toDouble))
      val neighbours: Array[String] = GeoHash.neighbours(geohash).toArray().map(_.toString)
      Seq(geohash) ++ neighbours
    })
//  将原始的经纬度数据按照相同的分组进行 join聚合
  val localbase = dbdata
      .toDF("lon", "lat")
      .where($"lon".isNotNull and $"lat".isNotNull)
      .withColumn("id", hash($"lon", $"lat"))

    val ll = localbase
      .withColumn("scopes", scope($"lon", $"lat"))
      .withColumn("scope", explode($"scopes"))
      .drop("scopes").cache()

    val ll2 = ll.toDF("lon2", "lat2", "id2", "scope")

    val data = ll.join(ll2, "scope").where($"id" =!= $"id2")
      .withColumn("distance", lonLatDistance($"lon", $"lon2", $"lat", $"lat2"))
 

    //构建边Edge[Int]
    val lv: RDD[(VertexId, VertexId)] = data
      .filter($"distance" <= eps.toDouble)  // 筛选出满足距离条件的点
      .select($"id", $"id2").rdd
      .map(row => {
        val id = row.getAs[Int]("id").toLong
        val id2 = row.getAs[Int]("id2").toLong
        (id, id2)
      })

    val le = lv.map { ids => Edge(ids._1, ids._2, 0) } // 根据点构建边

    // 构建图
    val graph = Graph(lv, le)
    val gcc = graph.connectedComponents().vertices
    val joined = gcc.join(lv)
      .map(tp => {
        (tp._2._1, Seq(tp._2._2))
      }).reduceByKey(_ ++ _)   // 聚合每个联通图的点
      .map(tp => {
        (tp._2.distinct, tp._2.distinct.length)
      }).filter(_._2 >= minpts.toInt)   // 筛选出满足最小聚类点数的连通图

    val clust = joined.toDF("clu", "ct")
      .withColumn("cluid", hash($"clu"))
      .withColumn("id", explode($"clu"))   

    val dbres: DataFrame = localbase.join(clust, Seq("id"), "left")
      .na.fill(0).drop("clu", "ct")  // 离群点的聚类id以0标识

    // 保存聚类结果
    dbres.repartition(1).write.option("header", true)
      .mode("overwrite")
      .csv(output) 

在本案例中,eps设为30km,minPts设为 5,聚类结果的可视化如下 ,红圈的就是两个簇类,其余的都是离群点


聚类结果可视化

本案例的数据链接 https://pan.baidu.com/s/1EaA7oGAmiJ2m4oXPLppsdg

用此方法实现的DBScan聚类在大数据集上运行效率较低,还有很多可以优化的地方,也有很多可以扩展的地方,如有不当之处,欢迎指正

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 204,793评论 6 478
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 87,567评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 151,342评论 0 338
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,825评论 1 277
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,814评论 5 368
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,680评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 38,033评论 3 399
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,687评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 42,175评论 1 300
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,668评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,775评论 1 332
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,419评论 4 321
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 39,020评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,978评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,206评论 1 260
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 45,092评论 2 351
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,510评论 2 343

推荐阅读更多精彩内容