HQL调优

HQL调优

YARN资源调优

重点关注以下四个参数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
<!--表示一个NodeMamager节点分配给Container容器使用的内存。
该参数的配置取决于NodeManager所在节点的总内存容量和该节点运行的其它服务器数量。
比如单节点内存128G,可以分配64G甚至更高,但是不要全部占用。
如果有三个节点,配置完成之后YARN页面Memory Total为64G*3。-->
<property>
<name>yarn.nodemanager.resource.memory-mb</name>
<value>64*1024</value>
</property>

<!--表示一个NodeManager节点分配给Container容器使用的CPU核数。
该参数的配置取决于NodeManager所在节点的总CPU核数和该节点运行的其它服务。
比如单节点CPU核数32,可以分配16甚至更高,但是不要全部占用。
如果有三个节点,配置完成之后yarn页面VCores Total为16*3。-->
<property>
<name>yarn.nodemanager.resource.cpu-vcores</name>
<value>16</value>
</property>

<!--表示单个Container能够使用的最大内存。-->
<property>
<name>yarn.scheduler.maximum-allocation-mb</name>
<value>4*1024</value>
</property>

<!--表示单个Container能够使用的最小内存。-->
<property>
<name>yarn.scheduler.mininum-allocation-mb</name>
<value>4*1024</value>
</property>

MapReduce资源调优

在实际的生产环境中,Hive一般基于Spark引擎,很少基于MapReduce来进行资源调优,这里仅作参考。
MapReduce资源调优主要包括Map Task的内存和CPU,以及Reduce Task的内存和CPU。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
<!--单个Map Task能够申请的Container容器内存大小,默认1024。
该值不能超出YARN参数yarn.scheduler.maximum-allocation-mb和yarn.scheduler.mininum-allocation-mb的范围。-->
<property>
<name>mapreduce.map.memory.mb</name>
<value>1024</value>
</property>

<!--单个Map Task能够申请的Container容器CPU核数。默认为1。一般无需调整。-->
<property>
<name>mapreduce.map.cpu.vcores</name>
<value>1</value>
</property>

<!--单个Reduce Task能够申请的Container容器内存大小,默认1024。
该值不能超出YARN参数yarn.scheduler.maximum-allocation-mb和yarn.scheduler.mininum-allocation-mb的范围。-->
<property>
<name>mapreduce.reduce.memory.mb</name>
<value>1024</value>
</property>

<!--单个Reduce Task能够申请的Container容器CPU核数。默认为1。一般无需调整。-->
<property>
<name>mapreduce.reduce.cpu.vcores</name>
<value>1</value>
</property>

注意:上面四个参数可以基于SQL进行单独配置。

1
set mapreduce.map.memory.mb = 1024

SQL优化

查看SQL执行计划

使用Explain查看SQL执行计划。
执行计划由一系列Stage组成。这一系列Stage具有依赖关系,每个Stage对应一个MapReduce Job或者一个文件系统操作(例如Fetch Operator操作)。如果某个Stage对应一个MapReduce Job,那么其Map端和Reduce端的计算逻辑分别由Map Operator Tree和Reduce Operator Tree进行描述。Operator Tree由一系列的Operator组成,一个Operator代表在Map或Reduce阶段的一个单一的逻辑操作。

Hive中常见的Operator操作有:

  • TableScan:表扫描操作,通常map端第一个操作肯定是表扫描操作。
  • Select Operator:选取操作
  • Group By Operator:分组聚合操作
  • Reduce Output Operator:输出到Reduce操作
  • Filter Operator:过滤操作
  • Join Operator:Join操作
  • File Output Operator:文件输出操作
  • Fetch Operator:客户端获取数据操作

基本语法

格式:EXPLAIN [FORMATTED|EXTENDED|DEPENDENCY] QUERY-SQL
其中,可选项表示:

  • FORMATTED:将执行计划以JSON字符串的形式输出。
  • EXTENDED:输出执行计划中的额外信息,通常是读写的文件名等信息。
  • DEPENDENCY:输出执行计划读取的表及分区。

分组聚合优化

Hive中未经优化的分组聚合是通过一个MapReduce Job实现的。Map端负责读取数据,并按照分组字段分区,通过Shuffle将数据发往Reduce端,各组数据在Reduce端完成最终的聚合。
Hive对分组聚合的优化主要围绕着减少Shuffle数据量进行,具体做法是使用map-side聚合。所谓map-side聚合,就是在Map端维护一个Hash Table,利用其完成部分的聚合,然后将部分聚合的结果,按照分组字段分区发送至reduce端,完成最终的聚合。map-side聚合能够有效减少Shuffle的数据量,提高分组聚合运算的效率。
相关参数如下:

1
2
3
4
5
6
7
8
9
10
11
12
--启用map-side聚合操作(默认开启)
set hive.map.aggr = true

--用于检测源表数据是否适合进行map-sdie聚合。
--检测方法:先对若干条数据(下面的参数决定)进行map-side聚合,若聚合后的条数和聚合前的条数比值小于该值(比如聚合后4条,聚合前10条,比值0.4),则认为该表适合进行map-side操作,否则认为该表不适合,后续数据便不再进行map-side操作。
set hive.map.aggr.hash.min.reduction = 0.5

--用于检测源表是否适合map-side聚合的条数。
set hive.map.groupby.mapaggr.checkinterval=10000;

--map-side所用的hash table占用map task堆内存的最大比例,若超出该值则会对Hash Table进行一次flush。
set hive.map.aggr.hash.force.flush.memory.threshold = 0.9

Join优化

Hive中拥有多种Join算法。

Common Join

Hive中最稳定的Join算法,通过一个MapReduce Job完成一个Join操作。Map端负责读取Join操作所需表的数据,并按照关联字段进行分区,通过Shuffle将其发送到Reduce端,相同key的数据在Reduce端完成最终的Join操作。
注意:SQL语句中的Join操作和执行计划中的Common Join任务并非是一对一的关系。一个SQL语句中相邻的且关联字段相同的多个Join操作可以合并为一个Common Join操作。
例如:

1
2
and a.id = b.id
and b.id =c.id

Map Join

Map Join算法可以通过两个只有Map阶段的Job完成一个Join操作。其适用场景为大表Join小表。若某Join操作满足要求,则第一个Job会读取小表数据,将其制作为Hash Table,并上传至HDFS,第二个Job会先从HDFS中读取小表数据,并缓存到Map Task的内存中,然后扫描大表数据,这样在Map端即可完成关联操作。

Map Join有两种触发方式:一种是用户在SQL语句中增加Hint提示,另外一种是Hive优化器根据参与Join表的数据量大小,自动触发。

Hint提示(已过时)
1
2
3
4
select /* + mapjoin(ta) */ ta.id,tb,id 
from table_a ta
join table_b tb
on ta.id = tb.id
自动触发

Hive在编译SQL语句阶段,起初所有的Join操作均采用Common Join算法实现。之后在物理优化阶段,Hive会根据每个Common Join任务所需表的大小判断该Common Join是否能够转换成Map Join任务。如果能够,变自动转换成Map Join任务。

1
2
3
4
5
6
7
8
9
10
11
--启动Map Join自动转换
set hive.auto.convert.join = true;

--一个Common Join Operator转为Map Join Operator的判断条件。
set hive.mapjoin.smalltable.filesize= 25000;

--开启无条件转Map Join
set hive.auto.convert.join.noconditionaltask=true;

--无条件转Map Join时小表之和阈值
set hive.auto.convert.join.noconditionaltask.size = 1000000;

Bucket Map Join

Bucket Map Join是对Map Join算法的改进。
Map Join只适用于大表Join小表,而Bucket Map Join可以用于大表Join大表的场景。
它的核心思想是:如果能保证参与Join的表均为分桶表,且关联字段均为分桶字段,且其中一张表的分桶数量是另外一张表的分桶数量的整数倍,就能保证参与Join的两张表的分桶之间具有明确的关联关系,那么就可以在两表的分桶间进行Map Join操作。这样一来,第二个Job的Map端就无需再缓存小表的全表数据,而只需要缓存其所需的分桶即可。

Bucket Map Join不支持自动转换。用户需要增加Hint提示并配置相关参数方可使用。

增加Hint提示
1
2
3
4
select /* + mapjoin(ta) */ ta.id,tb,id 
from table_a ta
join table_b tb
on ta.id = tb.id
参数配置
1
2
3
4
5
6
--关闭CBO优化,CBO优化会导致Hint提示被忽略
set hive.cbo.enable=false;
--打开hint提示
set hive.ignore.mapjoin.hint = false;
--启用Bucket Map Join优化功能
set hive.optimize.bucketmapjoin= true;

Sort Merge Bucket Map Join

Sort Merge Bucket Map Join基于Bucket Map Join。
Sort Merge Bucket Map Join要求参与Join的表均为分桶表,且保证分桶内的数据是有序的,且分桶字段、排序字段和关联字段均为相同字段,且其中一张表的分桶数量是另外一张表的分桶数量的整数倍。
Sort Merge Bucket Map Join桶Bucket Map Join一样,同样是利用两表各分桶之间的关联关系在分桶之间进行关联关系。不同的是分桶之间Join操作的实现原理。Sort Merge Bucket两个分桶间的Join实现原理为Sort Merge Join算法,而Bucket Map Join两个分桶之间的Join实现原理为Hash Join算法。
Hash Join和Sort Merge Join均为关系型数据库中常见的Join实现算法。Hash Join的原理相对简单,就是对参与Join的一张表构建Hash Table,然后扫描另外一张表,进行逐行匹配。Sort Merge Join需要在两张按照关联字段排好序的表中进行。

1
2
3
4
5
6
7
8
9
--启动Sort Merge Bucket Map Join
set hive.optimize.bucketmapjoin.sortmerge= true;
--使用自动转换
set hive.auto.convert.sortmerge.join=true;

-- 查看hive相关参数
set -v(查看所有的参数)
set 查看与hadoop不一样的参数
set 具体参数

数据倾斜优化

数据倾斜通常是指参与计算的数据分布不均,即某个Key或者某些Key的数据量远超其他Key,导致在shuffle阶段,大量相同Key的数据被发往同一个Reduce,进而导致该reduce所需的时间远超其他reduce,成为整个任务的瓶颈。
Hive中的数据倾斜常出现在分组聚合和Join操作的场景中。

分组聚合数据倾斜

mpa-side聚合优化

开启map-side聚合后,数据会在map端完成部分聚合操作,这样一来,即便原始数据是倾斜的,经过map端的初步聚合后,发往reduce的数据也就不再倾斜了。理想情况下,map端能完全屏蔽数据倾斜问题。

1
2
3
4
5
6
7
8
9
10
11
--启用map-side聚合操作(默认开启)
set hive.map.aggr = true

--用于检测源表数据是否适合进行map-sdie聚合。检测方法:先对若干条数据(下面的参数决定)进行map-side聚合,若聚合后的条数和聚合前的条数比值小于该值(比如聚合后4条,聚合前10条,比值0.4),则认为该表适合进行map-side操作,否则认为该表不适合,后续数据便不再进行map-side操作。
set hive.map.aggr.hash.min.reduction = 0.5

--用于检测源表是否适合map-side聚合的条数。
set hive.map.groupby.mapaggr.checkinterval=10000;

--map-side所用的hash table占用map task堆内存的最大比例,若超出该值则会对Hash Table进行一次flush。
set hive.map.aggr.hash.force.flush.memory.threshold = 0.9
Skew-GroupBy优化

Skew-GroupBy的原理是启动两个MR任务,第一个MR按照随机数分区,将数据分散发送到Reduce端,完成部分聚合操作,第二个MR按照分组字段分区,完成最终的聚合操作。
相关参数如下:

1
2
--开启skew-GroupBy数据倾斜优化
set hive.groupby.skewindata= true;

注意:map-side优化虽然使用场景更广,但是由于需要维护Hash Table,极端情况下可以会占用大量内存,这种情况下,skew-GroupBy优化的优势就体现出来了。

Join操作数据倾斜

Join操作默认使用Common Join算法,也就是通过一个MapReduce Job完成计算。Map端负责读取Join操作所需表的数据,并按照关联字段进行分区,通过Shuffle,将其发送到Reduce端,相同Key的数据在Reduce端完成最终的Join操作。
如果关联字段的值分布不均,就会导致大量相同的key进入同一Reduce,从而导致数据倾斜问题。
由Join导致的数据倾斜问题,有以下三种解决方案:

Map Join优化

使用Map Join算法,Join操作仅在Map端就能完成,没有Shuffle操作,没有Reduce阶段,自然不会产生Reduce端的数据倾斜。该方案适用于大表Join小表时发生数据倾斜的场景。
相关参数如下:

1
2
3
4
5
6
7
8
9
10
11
--启动Map Join自动转换
set hive.auto.convert.join = true;

--一个Common Join Operator转为Map Join Operator的判断条件。
set hive.mapjoin.smalltable.filesize= 25000;

--开启无条件转Map Join
set hive.auto.convert.join.noconditionaltask=true;

--无条件转Map Join时小表之和阈值
set hive.auto.convert.join.noconditionaltask.size = 1000000;
Skew Join优化

该方案适用于大表join大表时发生数据倾斜的场景。
Skew Join的原理是为倾斜的大Key单独启动一个Map Join任务进行计算,其余Key进行正常的Common Join。
相关参数如下:

1
2
3
4
5
--启动skew join优化
set hive.optimize.skewjoin = true;

--触发skew join的阈值,若某个key的行数超过该参数值,则触发
set hive.skewjoin.key = 100000;

这种方案对参与Join的源表大小没有要求,但是对两表中倾斜的Key的数据量有要求,要求一张表中的倾斜Key的数据量比较小,这样才方便走Map Join。

调整SQL语句

若参与Join的两表均为大表,其中一张表的数据是倾斜的,此时可以通过以下方式对SQL语句进行相应的调整:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
--原SQL
select * from table_a
join table_b
on table_a.id = table_b.id

--调整之后的SQL
--打散操作,rand() 随机数范围默认为0到1,*2转换为int之后必然为0或者1。打散的越多,b表需要扩容的越多。
select * from (
select concat(id,’_’,cast(rand()*2 as int)) id
from table_a
) ta
Join(
--扩容操作,保证a表和b表能关联上。
select concat(id,’_’,0) id from table_b
union all
select concat(id,’_’,1) id from table_b

) tb
on ta.id =tb.id

并行度优化

Hive的计算任务由MapReduce完成,因此并行度的调整分为Map端和Reduce端。

Map端并行度优化

Map端并行度,也就是Map的个数,是由输入文件的切片数决定的。一般情况下,Map端的并行度无需手动调整。以下特殊情况可以考虑Map端并行度。

查询表中存在大量小文件

按照Hadoop默认的切片策略,一个小文件会单独启动一个Map Task负责计算。如存在大量小文件,则会启动大量Map Task,造成资源浪费。用户可以使用Hive提供的CombineHiveInputFormat,将多个小文件合并为一个切片,相关参数如下:

1
2
--默认开启
set hive.input.format=apache.hadoop.hive.al.io.CombineHiveInputFormat
Map端有复杂的查询逻辑

若SQL语句中有正则替换,JSON解析等复杂耗时的查询逻辑时,Map端的计算会相对慢一下,在资源充足的情况下,可以考虑增大Map端的并行度。
相关参数如下:

1
set mapreduce.input.fileinputformat.split.maxsize = 256000000;

Reduce端并行度优化

Reduce端并行度,也就是Reduce的个数。可以由用户自己决定,也可以由Hive自行根据MapReduce Job输入的文件大小进行估算。
相关参数如下:

1
2
3
4
5
6
--指定Reduce端并行度,默认值为-1,表示用户未指定。
set mapreduce.job.reduces;
--Reduce端并行度最大值
set hive.exec.reducers.max;
--单个Reduce Task计算的数据量。用于估算Reduce并行度。
set hive.exec.reducers.bytes.per.reducer;

小文件合并优化

Map端输入文件合并优化

合并Map端输入的小文件,是指将多个小文件划分到一个切片中,进而由一个Map Task去处理。
相关参数如下:

1
set hive.input.format=apache.hadoop.hive.al.io.CombineHiveInputFormat

Reduce端输出文件合并优化

合并Reduce端输出的小文件,是指将多个小文件合并成大文件,减少HDFS小文件数量。其原理是根据计算任务输出文件的平均大小进行判断,若符合条件,则单独启动一个额外的任务进行合并。
相关参数如下:

1
2
3
4
5
6
7
8
--开启合并map only任务输出的小文件
set hive.merge.mapfiles = true;
--开启合并Map reduce任务输出的小文件
set hive.merge.mapredfiles=true;
--合并后的文件大小
set hive.merge.size.per.task = 256000000;
--触发小文件合并任务的阈值。若某计算任务输出的文件平均大小低于该值,则触发合并。
set hive.merge.smallfiles.avgsize = 16000000;

CBO优化

CBO指的是Cost Based Optimizer,即基于计算成本的优化。Hive会计算同一SQL语句的不同执行计划的计算成本,并选出成本最低的执行计划。目前CBO在Hive的MR引擎下主要用于Join的优化,例如多表Join的Join顺序。
相关参数如下:

1
2
--是否启用CBO优化
set hive.cbo.enable=true;

谓词下推

谓词下推(predicate pushdown)是指尽量将过滤操作前移,以减少后续步骤的计算量。
相关参数如下:

1
2
--是否启动谓词下推优化
set hive.optimize.ppd= true;

注意:CBO优化也会完成一部分谓词下推优化工作,因为在执行计划中谓词越靠前,整个计算的计算成本就会越低。

矢量化查询

Hive的矢量化查询优化,依赖于CPU的矢量化计算。Hive的矢量化查询可以极大的提高一些典型的查询场景(例如Scan、Filters、Aggregates、Joins)下的CPU使用效率。
相关参数如下:

1
set hive.vectorized.execution.enabled=true;

注意:如果执行计划中出现了Execution mode:vectorized字样,表明使用了矢量化计算。

Fetch抓取

Fetch抓取指定是Hive对某些情况的查询可以不必使用MapReduce计算。例如

1
select * from table;

在这种情况下,Hive可以简单地读取Table表对应的存储目录下的文件,然后输出结果到控制台。
相关参数如下:

1
2
3
4
5
6
--是否在特定场景转换为fetch任务。
--参数说明:
--设置为none表示不转换
--设置为minimal表示支持select *,分区字段过滤,limit等
--设置为more表示支持select 任意字段,包括函数,过滤和limit等。
set hive.fetch.task.conversion = more;

本地模式

大多数的Hadoop Job是需要Hadoop提供完整的可扩展性来处理大数据集的,不过有时Hive的输入数据量非常小,在这种情况下,为查询触发查询任务消耗的时间可能比实际Job执行的时间要多得多。对于大多数这种情况,Hive可以通过本地模式在单台机器上处理所有的任务。对于小数据集,执行时间可以明显被缩短。
相关参数如下:

1
2
3
4
5
6
--开启自动转换为本地模式
set hive.exec.model.local.auto=true;
--设置Local MapReduce的最大输入数据量。当输入数据量小于这个值时采用Local MapReduce的方式,默认128M
set hive.exec.model.local.auto.inputbytes.max = 50000000;
--设置Local MapReduce的最大输入文件个数。当输入文件个数小于这个值时采用Local MapReduce的方式,默认为4
set hive.exec.model.local.auto.input.files.max=10;

并行执行

Hive会将一个SQL语句转换成一个或多个Stage,每个Stage对应一个MapReduce Job。默认情况下,Hive同时只会执行一个Stage。但是某些情况下一个SQL可能包含多个Stage,并且多个Stage并非完全互相依赖,也就是说有些Stage是可以并行执行的。
相关参数如下:

1
2
3
4
--启用并行执行优化
set hive.exec.parallel=true;
--同一个SQL允许最大并行度,默认为8
set hive.exec.parallel.thread.number=8;

严格模式

Hive可以设置某些参数防止危险操作。
例如:

1
2
3
4
5
6
--如果为true,对于分区表,除非where语句中含有分区字段过滤条件来限制范围,否则不允许执行。
set hive.strict.checks.no.partitions.filter = true;
--如果为true,对于使用了order by语句的查询,要求必须使用limit语句。
set hive.strict.checks.orderby.no.limit = true;
--如果为true,会限制笛卡尔积的查询。
set hive.strict.checks.caresian.product = true;
打赏
  • 版权声明: 本博客所有文章除特别声明外,著作权归作者所有。转载请注明出处!
  • Copyrights © 2015-2023 henrrywan

请我喝杯咖啡吧~

支付宝
微信