Spark_性能调优及Spark3.0新特性

1.spark_shuffle_分区数

spark_sql aqe 优化
SparkSQL中,基于SQL分析或者DSL分析,执行Job时,如果产生Shuffle,默认分区数:200
【实际生产环境中必须合理设置shuffle时分区数目】
> spark.sql.shuffle.partitions
【代码中设置】

spark = SparkSession.builder \
          .appName("RetailAnalysis") \
          .master("local[4]") \
          .config("spark.sql.shuffle.partitions", "4") \
          .getOrCreate()

注意:
在Spark 3之前,每个应用都需要通过测试,合理设置分区数目。

【从Spark 3 开始,SparkSQL底层实现自适应调整性能优化】
当Job运行时,自动依据Shuffle时数据量,合理设置Shuffle时分区数。

【官方文档】
https://spark.apache.org/docs/3.1.2/sql-performance-tuning.html#coalescing-post-shuffle-partitions

【参数】
参数1:表示是否启用自适应调整机制
spark.sql.adaptive.enabled=true
参数2:表示Shuffle后分区数目
spark.sql.adaptive.coalescePartitions.enabled=true
其他参数:
最小分区数目
spark.sql.adaptive.coalescePartitions.minPartitionNum
初始化分区数目
spark.sql.adaptive.coalescePartitions.initialPartitionNum
每个分区数据处理数据量
spark.sql.adaptive.advisoryPartitionSizeInBytes=64m

2_spark项目调优方案

问题: 问题:Hive、Spark做了哪些优化?/ 项目中做了哪些优化?
我将项目内的spark调优分为两个

pyspark 开发调优

1.使用高性能的rdd算子
如 mappartition 和 foreachpartition

2.使用窄依赖算子替代宽依赖的算子
如 降低分区 使用coalesce代替repartition

3.适当增加和减少分区
数据量大,分区少,调大分区:读取数据
数量量小,分区多,降低分区:聚合之后

4.适当使用 缓存机制 及时释放内存
RDD或者DataFrame或者TmpView使用多次:先缓存persist
后续代码中不再需要时,就释放缓存
5.尽量使用map端聚合的算子
列如 reducebykey agg..bykey 代替 groupbykey
6.当大表join小表时 使用广播join

  • Sql优化 遵循谓词下推思想

将需要过滤的数据或者无用的数据在进行复杂操作之前提前进行过滤
列如

  • 需要过滤:where/having:能用where就不用having ,where是分组前 having 是分组后,where前不能加聚合,having 后边可以加聚合

  • row_nubmer实现去重:减少计算的数据量

举例
无用数据:订单表1000万,商品表100万,需求将两张表进行join
假设已经 卖出去60w 种商品

  • 订单表:tb_order:oid、pid
  • 商品表:tb_goods:pid、pname
    分析 商品表内的一百万条数据 在订单表中不一定全有, 只有卖出去的部分才需要join
    所以 1_找出所有已经卖出的商品单号 表A 60w
    select distinct pid from tb_order
    2_筛选出卖出商品的详细信息 表B 60w
    select
    pid,pname
    from tb_goods B join A on A.pid = B.pid
    3_step3:用订单表和B进行关联
select
    a.oid,
    a.pid,
    B.pname
from tb_order a 
join B on a.pid = B.pid
  • 结构优化 文件存储类型、分区结构化

分区表:采用静态分区

select count(*) from table1 where dt = '2021-11-09'; --走分区裁剪过滤查询


-- 不支持join中动态分区裁剪
select
 *
from table1 a join table2 b on a.dt = b.dt 
and a.dt = '2021-11-09' and b.dt = '2021-11-09';
  • 合适文件类型:parquet、orc【索引】
  • 合适压缩类型:snappy、lz4、lzo

Spark 3.0新特性

  • 动态分区裁剪(Dynamic Partition Pruning)

    • 默认的分区裁剪只有在单表查询过滤时才有效

    • 开启动态分区裁剪:自动在Join时对两边表的数据根据条件进行查询过滤,将过滤后的结果再进行join

      spark.sql.optimizer.dynamicPartitionPruning.enabled=true
      

开启动态分区裁剪之后,过滤条件都会先执行 不论是on 还是 where 还是 主副表的条件都会执行

  • 自适应查询执行(Adaptive Query Execution)

    • 基于CBO优化器引擎:实现最小代价的数据处理

    • 自动根据统计信息设置Reducer【ShuffleRead】的数量来避免内存和I/O资源的浪费

    • 自动选择更优的join策略来提高连接查询性能

    • 自动优化join数据来避免不平衡查询造成的数据倾斜,将数据倾斜的数据自动重分区

      spark.sql.adaptive.enabled=true
      

    cbo 将 catalyst 优化器优化后的逻辑计划 转换为物理计划

  • 加速器感知调度(Accelerator-aware Scheduling)(不重要)
    • 添加原生的 GPU 调度支持,该方案填补了 Spark 在 GPU 资源的任务调度方面的空白
    • 有机地融合了大数据处理和 AI 应用,扩展了 Spark 在深度学习、信号处理和各大数据应用的应用场景

项目总结问题

问题1: 数据导出不满足原子性

场景:使用sqoop将报表数据导出到mysql中时,有一部分map task 失败了,导致数据只导出了一部分
需求,我们希望如果 要失败都失败,mysql里面一点数据也没有,要成功都成功
解决 :在sqoop导入的时候添加一个参数 --staging-table
原理 :sqoop将这张表的数据导入到一张临时表中,如果成功了,就将数据从临时表中放到目标表中,如果失败了就删除目标表

问题2 数据量不一致的情况

场景 hive表中的数据量于数据源oracle中的表数据量不一致
原因, sqoop在导入hive中 如果以textfile格式存储的话,会默认将\t\n等特殊字符当作 换行符生成文本
解决 ,不以textfile格式进行存储 (avro orc格式)

问题3 spark的数据倾斜

现象 :部分task 执行时间过长
定位:从4040监控中看到某个Stage中的Task运行时间远高于别的Task

rdd.groupByKey.reduceByKey.sortByKey.join

解决

  • 提高Shuffle过程中的并行度:增加分区个数

  • 选用带有分区内聚合的算子:带有Map端聚合的算子,减少Shuffle Read的数据量

  • 将小表数据进行广播,实现广播Join:类似于Map Join

  • 采样抽取倾斜的数据,单独Join,最后Union合并:类似于Hive中Skew Join

  • 增加随机前缀:增加随机的前缀,实现随机分区

  • 自定义分区器:默认的是HashPartition

  • 小表扩大N倍,大表增加1 ~ N的随机前缀再做Join

问题4小文件问题

  • 每个Task会产生一个结果文件
  • Task个数根据分区个数来决定
  • 分区多,每个分区的数据少
  • 调整分区个数:coalesce
    问题4:ThriftServer资源不足,GC问题
    本质 ThriftServer本身是一个spark 应用 项目中所有SQL计算都是提交给ThriftServer,计算结果都会返回给 Driver
    会导致 Driver 内存不足
    解决 给Driver 分配更多的内存
start-thriftserver.sh \--name sparksql-thrift-server \--master yarn \--deploy-mode client \--driver-memory 1g \--hiveconf hive.server2.thrift.http.port=10001 \--num-executors 3 \--executor-memory 1g \--conf spark.sql.shuffle.partitions=2

问题5:ThriftServer单点故障问题

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 213,254评论 6 492
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 90,875评论 3 387
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 158,682评论 0 348
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 56,896评论 1 285
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 66,015评论 6 385
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 50,152评论 1 291
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 39,208评论 3 412
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 37,962评论 0 268
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 44,388评论 1 304
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 36,700评论 2 327
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 38,867评论 1 341
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 34,551评论 4 335
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 40,186评论 3 317
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 30,901评论 0 21
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 32,142评论 1 267
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 46,689评论 2 362
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 43,757评论 2 351

推荐阅读更多精彩内容