import org.apache.spark.{Partitioner, SparkConf}
import org.apache.spark.sql.SparkSession
import scala.collection.mutable.ArrayBuffer
class Spark06(number:Int) extends Partitioner{
//number 字段来确定分区数
override def numPartitions: Int = number
//分区规则
override def getPartition(key: Any): Int = {
try {
val str = key.toString.substring(0, 1)
if (str.matches("[A-Z]")) {
0
} else if (str.matches("[a-z]")) {
1
} else if (str.matches("[1-9]")) {
2
} else {
3
}
} catch {
case _ => 3
}
}
}
object Spark06 {
//自定义分区
//分区是什么? 分区就是在shuffle阶段, 用来确定每个key要发送到哪个reduce中
//实现? 继承org.apache.spark.Partitioner 类, 并且重写其中的方法
//案例: 按照key的首字母进行分区,划分规则: 数字,小写字母,大写字母, 其他
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[2]").setAppName("test")
val spark = SparkSession.builder().config(conf).getOrCreate()
spark.sparkContext.setLogLevel("WARN")
//计算逻辑
compute(spark)
spark.stop()
}
def compute(spark:SparkSession):Unit ={
import spark.implicits._
val date =
List(
"ab Ab 3a R ## $a ", " #s s 3g 5g Fa ob Bb ¥la a", "a c 4 88 90 _a "
)
spark.sparkContext.parallelize(date)
.flatMap( line => {
val arr = ArrayBuffer[(String,Int)]()
line.split(" ").foreach( word => arr.+=( (word ,1) ) )
arr
} )
//reduceByKey 是shuffle 算子,在此处设置分区
.reduceByKey( new Spark06(4), _+_ )
//将结果保存在文件中, 才能查看分区效果
.saveAsTextFile("g://result")
}
}
spark分区案例
©著作权归作者所有,转载或内容合作请联系作者
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。
平台声明:文章内容(如有图片或视频亦包括在内)由作者上传并发布,文章内容仅代表作者本人观点,简书系信息发布平台,仅提供信息存储服务。
推荐阅读更多精彩内容
- Spark定制班第1课:通过案例对Spark Streaming透彻理解三板斧之一:解密Spark Streami...
- 大数据学习交流微信群 更多RDD的信息参考:https://www.cnblogs.com/qingyunzong...
- 本文基于Spark 2.1.0版本 新手首先要明白几个配置: spark.default.parallelism:...
- 8.不一定非得每秒处理一次 由于Spark Streaming的原理是micro batch, 因此当batch积...