1. Custom partitioning
import org.apache.spark.Partitioner

/**
 * Created by chh on 2016/5/21.
 * Custom partitioner: partitions URL keys by host name, so that
 * pages from the same domain land in the same partition.
 */
class DomainNamePartitioner(numParts: Int) extends Partitioner {
  override def numPartitions: Int = numParts

  override def getPartition(key: Any): Int = {
    val domain = new java.net.URL(key.toString).getHost()
    val code = domain.hashCode % numPartitions
    if (code < 0) {
      code + numPartitions // make sure the result is non-negative
    } else {
      code
    }
  }

  // Return true only when `other` is also a DomainNamePartitioner
  // with the same number of partitions.
  override def equals(other: Any): Boolean = other match {
    case dnp: DomainNamePartitioner =>
      dnp.numPartitions == numPartitions
    case _ => false
  }
}
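Once defined, the partitioner is plugged into a pair RDD with partitionBy. Below is a minimal usage sketch; the object name DomainPartitionDemo, the RDD name visits, and the sample (url, count) pairs are made up for illustration:

import org.apache.spark.{SparkConf, SparkContext}

object DomainPartitionDemo {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("DomainPartitionDemo").setMaster("local"))
    // Hypothetical (url, hitCount) pairs; keys must parse with java.net.URL.
    val visits = sc.parallelize(List(
      ("http://www.baidu.com/a", 1),
      ("http://www.baidu.com/b", 1),
      ("http://www.hao123.com/", 1)))
    // All pages with the same host now end up in the same partition.
    val byDomain = visits.partitionBy(new DomainNamePartitioner(2))
    println(byDomain.partitioner) // Some(DomainNamePartitioner@...)
    sc.stop()
  }
}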
2. A PageRank example
Steps:
① Initialize each page's rank to 1.0.
② On each iteration, have page p send a contribution of rank(p)/numNeighbors(p) (its out-link count) to every page it links to.
③ Set each page's rank to 0.15 + 0.85 * contributionsReceived.
The full program follows.
import org.apache.spark.{SparkConf, SparkContext}

object Spark {
  /*
  Contents of url.txt (each line: a page followed by one page it links to):
  www.baidu.com www.hao123.com
  www.baidu.com www.2345.com
  www.baidu.com www.zhouyang.com
  www.hao123.com www.baidu.com
  www.hao123.com www.zhouyang.com
  www.zhouyang.com www.baidu.com
  */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("PageRank").setMaster("local")
    val sc = new SparkContext(conf)
    val inputs = sc.textFile("F:\\url.txt")
    // (url, Iterable[url]): each page with the list of pages it links to
    val links = inputs.map { line =>
      val parts = line.split(" ")
      (parts(0), parts(1))
    }.distinct()
      .groupByKey()
      .cache()
    // (url, rank): start every page at 1.0
    var ranks = links.mapValues(_ => 1.0)
    for (i <- 0 until 10) {
      val contribs = links.join(ranks).flatMap {
        case (pageId, (neighbors, rank)) =>
          // send rank / out-degree to every linked page
          neighbors.map(dest => (dest, rank / neighbors.size))
      }
      // sum the received contributions and apply the damping formula
      ranks = contribs.reduceByKey(_ + _).mapValues(v => 0.15 + 0.85 * v)
    }
    ranks.collect().foreach(println)
  }
}
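Because links is joined with ranks on every iteration, it pays to partition links once up front (the theme of section 1): the join can then reuse that partitioning instead of re-shuffling links ten times, and ranks built via mapValues inherits the same partitioner. A sketch of the change, assuming the same setup as above; the choice of 4 partitions is arbitrary:

import org.apache.spark.HashPartitioner

// Partition links once; joins against RDDs with the same partitioner
// avoid shuffling links on each iteration.
val links = inputs.map { line =>
  val parts = line.split(" ")
  (parts(0), parts(1))
}.distinct()
  .groupByKey()
  .partitionBy(new HashPartitioner(4)) // arbitrary partition count
  .cache()
var ranks = links.mapValues(_ => 1.0) // mapValues preserves the partitioner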
/*
Result:
(www.hao123.com,0.3685546839262259)
(www.baidu.com,0.761571325242544)
(www.2345.com,0.3685546839262259)
(www.zhouyang.com,0.5269013026650011)
*/
3. Setting and getting the partitioner
import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

val conf = new SparkConf().setAppName("wordCount").setMaster("local")
val sc = new SparkContext(conf)
val pairs = sc.parallelize(List((1, 1), (2, 2), (3, 3)))
val partitioned = pairs.partitionBy(new HashPartitioner(2))
println(partitioned.partitioner) // Some(org.apache.spark.HashPartitioner@...)
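Note that partitioner is an Option[Partitioner]: it is None until a partitioner has been set. Also, partitionBy returns a new RDD rather than changing pairs in place, so the result should be persisted if it will be reused. A small sketch illustrating both points, continuing from the code above:

println(pairs.partitioner) // None: pairs has no partitioner yet
val persisted = pairs
  .partitionBy(new HashPartitioner(2))
  .persist() // without persist(), each reuse would repartition from scratch
println(persisted.partitioner) // Some(org.apache.spark.HashPartitioner@...)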