1 数据倾斜
- 绝大部分任务都很快完成,只有一个或者少数几个任务执行的很慢甚至最终执行失败,这样的现象为数据倾斜现象。
- 将数据倾斜分为单表携带了 GroupBy 字段的查询和两表(或者多表)Join 的查询。
1.1 单表数据倾斜优化
1.1.1 Map 端进行聚合 - GroupBy 操作同时聚合函数为 count 或者 sum
set hive.map.aggr = true;
set hive.groupby.mapaggr.checkinterval = 100000;
set hive.groupby.skewindata = true;
# 当选项设定为 true,生成的查询计划会有两个 MR Job。
1.1.2 增加 Reduce 数量(多个 Key 同时导致数据倾斜)
方法1:
set hive.exec.reducers.bytes.per.reducer = 256000000
set hive.exec.reducers.max = 1009
计算 reducer 数的公式
N=min(参数 2,总输入数据量/参数 1)(参数 2 指的是上面的 1009,参数 1 值得是 256M)
方法2:
set mapreduce.job.reduces = 15;
1.2 多表Join 数据倾斜优化
1.2.1 拆分倾斜key
# join 的键对应的记录条数超过这个值则会进行分拆,值根据具体数据量设置
set hive.skewjoin.key=100000;
# 如果是 join 过程出现倾斜应该设置为 true
set hive.optimize.skewjoin=false;
如果开启了,在 Join 过程中 Hive 会将计数超过阈值 hive.skewjoin.key(默认 100000)的倾斜 key 对应的行临时写进文件中,然后再启动另一个 job 做 map join 生成结果。通hive.skewjoin.mapjoin.map.tasks 参数还可以控制第二个 job 的 mapper 数量,默认 10000。
set hive.skewjoin.mapjoin.map.tasks=10000;
1.2.2 MapJoin
- MapJoin 是将 Join 双方比较小的表直接分发到各个 Map 进程的内存中,在 Map 进程中进行 Join 操作,这样就不用进行 Reduce 步骤,从而提高了速度。
set hive.auto.convert.join=true; #默认为 true
set hive.mapjoin.smalltable.filesize=25000000;
2 大数据量性能优化
2.1 分区表
- Hive 中的分区就是分目录,把一个大的数据集根据业务需要分割成小的数据集。
- 分区字段不能是表中已经存在的数据,可以将分区字段看作表的伪列。
- 分区表加载数据时,必须指定分区
- 在查询时通过 WHERE 子句中的表达式选择查询所需要的指定的分区
- 创建二级分区
hive (default)> create table dept_partition2(
deptno int,
dname string,
loc string)
partitioned by (day string, hour string)
row format delimited fields terminated by '\t';
- 动态分区:导入数据,数据库自动会根据分区字段的值,将数据插入到相应的分区中
# 开启动态分区功能(默认 true,开启)
set hive.exec.dynamic.partition=true;
# 设置为非严格模式(动态分区的模式,默认 strict,表示必须指定至少一个分区为静态分区,nonstrict 模式表示允许所有的分区字段都可以使用动态分区。)
set hive.exec.dynamic.partition.mode=nonstrict;
2.2 分桶表
- 分桶是将数据集分解成更容易管理的若干部分的另一个技术。分区针对的是数据的存储路径,分桶针对的是数据文件。
create table stu_buck(id int, name string)
clustered by(id)
into 4 buckets
row format delimited fields terminated by '\t';
3 HQL查询性能优化
3.1 文件存储格式与压缩算法
综合压缩速度、压缩比率、列式存储、行式存储,在实际的项目开发当中,hive 表的数据存储格式一般选择:orc 或 parquet。压缩方式一般选择 snappy,lzo。
create table log_parquet_snappy(
track_time string,
url string,
session_id string,
referer string,
ip string,
end_user_id string,
city_id string
)
row format delimited fields terminated by '\t'
stored as parquet
tblproperties("parquet.compression"="SNAPPY");
3.2 多重模式
- 一堆 SQL 都是从同一个表进行扫描,做不同的逻辑。有可优化的地方:如果有 n 条 SQL,每个 SQL 执行都会扫描一次这张表。
insert int t_ptn partition(city=A). select id,name,sex, age from student
where city= A;
insert int t_ptn partition(city=B). select id,name,sex, age from student
where city= B;
insert int t_ptn partition(city=c). select id,name,sex, age from student
where city= c;
修改为:
from student
insert int t_ptn partition(city=A) select id,name,sex, age where city= A
insert int t_ptn partition(city=B) select id,name,sex, age where city= B
3.3 vectorization : 矢量计算的技术
- 在计算类似scan, filter, aggregation的时候,vectorization技术以设置批处理的增量大小为 1024 行单次来达到比单条记录单次获得更高的效率。
set hive.vectorized.execution.enabled = true;
set hive.vectorized.execution.reduce.enabled = true;
3.4 in/exists 高效用法left semi join
select a.id, a.name from a where a.id in (select b.id from b);
select a.id, a.name from a where exists (select id from b where a.id =
b.id);
-- left semi join 实现
select a.id, a.name from a left semi join b on a.id = b.id;
3.5 开启CBO成本优化器
- Hive 在提供最终执行前,优化每个查询的执行逻辑和物理执行计划。根据查询成本执行进一步的优化,从而产生潜在的不同决策:如何排序连接,执行哪种类型的连接,并行度等等
set hive.cbo.enable=true;
set hive.compute.query.using.stats=true;
set hive.stats.fetch.column.stats=true;
set hive.stats.fetch.partition.stats=true;
3.6 谓词下推
- 将 SQL 语句中的 where 谓词逻辑都尽可能提前执行,减少下游处理的数据量
set hive.optimize.ppd = true; #谓词下推,默认是 true
3.7 大表join- Sort Merge Bucket Join
set hive.optimize.bucketmapjoin = true;
set hive.optimize.bucketmapjoin.sortedmerge = true;
set hive.input.format=org.apache.hadoop.hive.ql.io.BucketizedHiveInputFormat;
# 分桶表join
insert overwrite table jointable
select b.id, b.t, b.uid, b.keyword, b.url_rank, b.click_num, b.click_url
from bigtable_buck1 s
join bigtable_buck2 b on b.id = s.id;
4 Hive Job性能优化
4.1 hive map端优化
4.1.1 复杂文件增加 Map 数
- computeSliteSize(Math.max(minSize,Math.min(maxSize,blocksize)))=blocksize=128M 公式,调整 maxSize 最大值。让 maxSize 最大值低于 blocksize 就可以增加 map 的个数。
- set mapreduce.input.fileinputformat.split.maxsize=100;
-
4.1.2 小文件合并
- 合并文件的大小,默认 256M set hive.merge.size.per.task = 268435456;
当输出文件的平均大小小于该值时,启动一个独立的 map-reduce 任务进行文件 merge
set hive.merge.smallfiles.avgsize = 16777216; - 在 map 执行前合并小文件
set hive.input.format= org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
- 在 map-only 任务结束时合并小文件,默认 true
set hive.merge.mapfiles = true;
- 在 map-reduce 任务结束时合并小文件,默认 false
set hive.merge.mapredfiles = true;
4.1 合理设置reduce数
- 在设置 reduce 个数的时候也需要考虑这两个原则:处理大数据量利用合适的 reduce 数;
使单个 reduce 任务处理数据量大小要合适;
# 每个 Reduce 处理的数据量默认是 256MB
set hive.exec.reducers.bytes.per.reducer = 256000000
# 每个任务最大的 reduce 数,默认为 1009
set hive.exec.reducers.max = 1009
# 计算 reducer 数的公式
N=min(参数 2,总输入数据量/参数 1)(参数 2 指的是上面的 1009,参数 1 值得是 256M)
# 设置每个 job 的 Reduce 个数
set mapreduce.job.reduces = 15;
4.3 并行执行
- Hive 会将一个查询转化成一个或者多个阶段。这样的阶段可以是 MapReduce 阶段、抽样阶段、合并阶段、limit 阶段。或者 Hive 执行过程中可能需要的其他阶段。默认情况下,Hive 一次只会执行一个阶段。
set hive.exec.parallel=true; //打开任务并行执行,默认为 false
set hive.exec.parallel.thread.number=16; //同一个 sql 允许最大并行度,默认为 8
4.4 本地模式
- Hive 可以通过本地模式在单台机器上处理所有的任务。对于小数据集,执行时间可以明显被缩短
set hive.exec.mode.local.auto=true;
# 设置 local mr 的最大输入数据量,当输入数据量小于这个值时采用 local mr 的方式,默认为 134217728,即 128M
set hive.exec.mode.local.auto.inputbytes.max=50000000;
# 设置 local mr 的最大输入文件个数,当输入文件个数小于这个值时采用 local mr 的方式,默认为 4
set hive.exec.mode.local.auto.input.files.max=10;
5 spark运行参数调优
spark.executor.cores
- 该参数表示每个 Executor 可利用的 CPU 核心数。其值不宜设定过大,因为 Hive 的底层以 HDFS 存储,而 HDFS 有时对高并发写入处理不太好,容易造成 race condition。根据经验实践,设定在 3~6 之间比较合理
- 由于一个 Executor 需要一个 YARN Container 来运行,所以还需保证 spark.executor.cores
的值不能大于单个 Container 能申请到的最大核心数,即 yarn.scheduler.maximum-allocationvcores 的值。
spark.executor.memory/spark.yarn.executor.memoryOverhead
- 这两个参数分别表示每个 Executor 可利用的堆内内存量和堆外内存量
- Hive 官方提供了一个计算 Executor 总内存量的经验公式,如下:
yarn.nodemanager.resource.memory-mb*(spark.executor.cores/yarn.nodemanager.resource.cpu-vcores)
spark.executor.instances
- 若我们一共有 10 台 32C/128G 的节点,并按照上述配置(即每个节
点承载 7 个 Executor),那么理论上讲我们可以将 spark.executor.instances 设为 70,以使集群资源最大化利用。但是实际上一般都会适当设小一些(推荐是理论值的一半左右,比如 40),因为 Driver 也要占用资源,并且一个 YARN 集群往往还要承载除了 Hive on Spark 之外的其他业务
spark.dynamicAllocation.enabled
- 推荐将 spark.dynamicAllocation.enabled 参数设为 true,以启用 Executor 动态分配。
set hive.execution.engine=spark;
set spark.executor.memory=11.2g;
set spark.yarn.executor.memoryOverhead=2.8g;
set spark.executor.cores=4;
set spark.executor.instances=40;
set spark.dynamicAllocation.enabled=true;
set spark.serializer=org.apache.spark.serializer.KryoSerializer;
spark.driver.cores
- 该参数表示每个 Driver 可利用的 CPU 核心数。绝大多数情况下设为 1 都够用。
spark.driver.memory/spark.driver.memoryOverhead
- 这两个参数分别表示每个 Driver 可利用的堆内内存量和堆外内存量。根据资源富余程度和作业的大小,一般是将总量控制在 512MB~4GB 之间,并且沿用 Executor 内存的“二八分配方式”。例如,spark.driver.memory 可以设为约 819MB,spark.driver.memoryOverhead 设为
约 205MB,加起来正好 1G