Spark 之PageRank

具体PageRank 算法原理 这里不多讲了,重点说下Spark 实现的过程

  • 1 读入数据点 links sc.textfile ,split reduceByKey, 注意去重
  • 2 初始化Rank
  • 3 循环迭代:
    • 3.1 利用 links join ranks 得到(List(点),rank)
    • 3.2 利用 flatMap 求得每个点真正的rank
    • 3.3 利用reduce(,)聚合结果得到,最终结果
package org.apache.spark.myExamples

import org.apache.spark.sql.SparkSession

/**
  * Created by wjf on 16-8-30.
  */
object TestPageRank {
  def main(args: Array[String]): Unit = {
    val spark =SparkSession.builder().appName("test PageRank").master("local").getOrCreate()

    val iter=10
    val links = spark.read.textFile("data/mllib/pagerank_data.txt").rdd.map { line =>
      val  parts = line.split(" ")
      (parts(0), parts(1))
    }.distinct().groupByKey().cache()

    links.foreach(println)

    var ranks = links.mapValues( v => 1.0)
    ranks.foreach(println)

    for(i <- 1 to iter){
      val contri=links.join(ranks).values.flatMap{ case(urls,rank) =>
        val size=urls.size
        urls.map(url => (url,rank/size))
      }
      ranks=contri.reduceByKey(_ + _).mapValues(0.15 + 0.85*_)
      println("Iteration %d".format(i))
      ranks.foreach(println)
    }
    
  }

}

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。

推荐阅读更多精彩内容