摘要:Spark SQL
启动Spark SQL应用
#! /bin/bash
cd /home/test_gp/SparkSQLExample
nohup spark2-submit \
--class com.example.SparkSQLExample.SparkSQLExampleMain \
--master yarn \
--num-executors 20 \
--executor-cores 3 \
--executor-memory 10g \
--conf spark.yarn.executor.memoryOverhead=12288 \
--conf spark.dynamicAllocation.enabled=false \
--conf spark.network.timeout=300 \
--conf spark.sql.shuffle.partitions=20 \
SparkSQLExample-1.0-SNAPSHOT.jar \
hdfs:///tmp/SparkSQLExample.properties > logs/detail.log 2>&1 &
跟踪日志到Spark WebUI的链接tracking URL
查看jobs
查看任务一共被分为4个job,以及每个job在scala代码中出发的行数
,还包括提交时间,执行时间,每个job得到stage数量
以及成功的stage
数量,每个job所有的task数量
和成功的task数量
。
查看stages
点击每个job可以看待当下job的所有stage,也可以直接点击stages面板,显示一共有19个stage。可以查看每个stage的task数量
和成功的task数量
,Input
从hadoop或者spark storage读取的数据大小,Output
写入hadoop的数据大小,shuffle read
每个stage读取的数据大小,shuffle write
每个stage写入磁盘的数据大小供于未来某个stage读取。
一般而言task数量(partition)等于spark.sql.shuffle.partitions
数量,当spark从hive,kudu读取数据时,task数量和数据表的分区数
保持一致。
如下图spark sql task数为226,hive在hdfs上的分区也是226
如下图spark sql task数为255,kudu的tablet也是255
查看executors
点击executors面板,可以看到driver所在机器以及其他20个executor所在机器和端口号,以及executor端的日志
一般executors数量和num-executors
数量一致,如果开启了spark.dynamicAllocation.enabled
,spark会根据task数量动态调整executors数量,num-executors只是executors数量的初始值
查看SQL的执行计划
点击SQL面板查看执行计划
-
scan
:从hive,kudu读取数据
-
Filter
:过滤操作,包括去除缺失值
,条件筛选
等 -
Project
:映射,选择需要的列 -
HashAggregate
:聚合,本例中HashAggregate+Exchange+HashAggregate
用来distinct去重
-
wholestagecodegen
:全阶段代码生成
,用来将多个处理逻辑整合到单个代码模块中,是Spark的新的SQL代码生成模型
-
Exchange
:数据重分区 -
Sort
:数据根据某个key排序 -
SortMergeJoin
:大表和大表join
的策略,本例中Exchange+Sort+SortMergeJoin
用来做大表和大表join
SortMergeJoin
-
shuffle阶段
:将两张大表根据join key进行重新分区,对应执行计划中的Exchange
-
sort阶段
:对单个分区节点的两表数据,分别进行排序,对应执行计划中的Sort
-
merge阶段
:对排好序的两张分区表数据执行join操作。遍历两张表的有序序列,从第一行开始,碰到相同join key就merge输出,否则取更小一边
的下一个
进行对碰