The details of the PageRank algorithm itself won't be covered here; the focus is on how to implement it in Spark:
- 1 Read in the edge list as `links`: `textFile`, then `split` each line into a (src, dst) pair; use `distinct()` to deduplicate edges and `groupByKey()` to collect each node's outgoing links (a sample of the expected input follows this list)
- 2 Initialize each node's rank to 1.0
- 3 Iterate:
- 3.1 `join` `links` with `ranks` to get (list of out-links, rank) for each node
- 3.2 Use `flatMap` to emit each out-link's contribution, rank / outDegree
- 3.3 Aggregate the contributions with `reduceByKey(_ + _)` and apply the damping formula 0.15 + 0.85 * sum to get the new ranks
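
For reference, the input is a plain text edge list with one space-separated `src dst` pair per line, which is what the `split(" ")` parsing below expects. A minimal illustrative file (not necessarily the exact contents of `data/mllib/pagerank_data.txt`) might look like:

```
1 2
1 3
2 1
3 1
```

The full implementation in Scala: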
```scala
package org.apache.spark.myExamples

import org.apache.spark.sql.SparkSession

/**
  * Created by wjf on 16-8-30.
  */
object TestPageRank {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("test PageRank").master("local").getOrCreate()
    val iter = 10

    // Step 1: read the edge list, deduplicate, and group by source node,
    // giving (src, Iterable[dst]). Cached because it is reused every iteration.
    val links = spark.read.textFile("data/mllib/pagerank_data.txt").rdd.map { line =>
      val parts = line.split(" ")
      (parts(0), parts(1))
    }.distinct().groupByKey().cache()
    links.foreach(println)

    // Step 2: start every node with rank 1.0.
    var ranks = links.mapValues(v => 1.0)
    ranks.foreach(println)

    // Step 3: each node sends rank / outDegree to every out-link, then the
    // contributions are summed per node and damped.
    for (i <- 1 to iter) {
      val contribs = links.join(ranks).values.flatMap { case (urls, rank) =>
        val size = urls.size
        urls.map(url => (url, rank / size))
      }
      ranks = contribs.reduceByKey(_ + _).mapValues(0.15 + 0.85 * _)
      println("Iteration %d".format(i))
      ranks.foreach(println)
    }

    spark.stop()
  }
}
```
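
The loop above always runs a fixed `iter = 10` passes. A common alternative, sketched below, is to iterate until the ranks stop changing. This is not part of the original program: it assumes the `links` and `ranks` RDDs defined above, and `tol` is an illustrative threshold.

```scala
// Sketch: run until the total rank change between iterations drops below
// a tolerance, instead of a fixed number of iterations.
// Assumes `links: RDD[(String, Iterable[String])]` and
// `ranks: RDD[(String, Double)]` (a var) from the program above.
val tol = 1e-4
var delta = Double.MaxValue
while (delta > tol) {
  val contribs = links.join(ranks).values.flatMap { case (urls, rank) =>
    urls.map(url => (url, rank / urls.size))
  }
  val newRanks = contribs.reduceByKey(_ + _).mapValues(0.15 + 0.85 * _)
  // Total absolute change in rank across all nodes in this iteration.
  delta = newRanks.join(ranks).values.map { case (n, o) => math.abs(n - o) }.sum()
  ranks = newRanks
}
```

A fixed iteration count is easier to reason about, while the convergence check avoids under-iterating on large graphs and wasting passes on small ones.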