Spark SQL调优

Spark SQL调优

查看执行计划

语法格式:

1
EXPLAIN [SIMPLE|EXTENDED|CODEGEN|FORMATTED] query-sql

其中可选项:

  • SIMPLE:只展示物理执行计划
  • EXTENDED:展示逻辑和物理执行计划
  • CODEGEN:展示可执行Java代码
  • FORMATTED:展示格式化的物理执行计划

执行计划主要包括四部分:

  • Parsed Logical Plan:解析后的逻辑执行计划,这一步会检验SQL语法是否有问题,不检查表名、列名。
  • Analyzed Logical Plan:分析后的逻辑执行计划,这一步通过访问Spark中的Catalog存储库来解析验证语义、列名、类型、表名等。
  • Optimized Logical Plan:优化后的逻辑执行计划,这一步Spark中的Catalyst优化器会根据各种规则对SQL进行优化。
  • Physical Plan:物理执行计划。根据优化后的逻辑执行计划生成。

运算符说明:

  • HashAggregate:表示数据聚合。一般HashAggregate是成对出现,第一个HashAggregate是将执行节点本地的数据进行局部聚合,另一个HashAggregate是将各个分区的数据进行进一步聚合。
  • Exchange:其实就是Shuffle。表示需要在集群上移动数据。很多时候HashAggregate会以Exchange分割开来。
  • Project:SQL中的投影操作,就是选择列(例如select name,age等)
  • BroadcastHashJoin:表示通过基于广播方式进行HashJoin。
  • LocalTableScan:全表扫描本地的表。

资源调优

  • executor-cores:每个executor的最大核数。例如单节点32核,设置为4,那么该节点最多可以启8个executor。
  • num-executors:executor的总执行数(节点 x 单节点的executors数)。例如上面如果有三个节点,那么num-executors= 8 * 3
  • executor-memory:每个executor的内存数。该参数=yarn-nodemanager.resource.memory-mb/每个节点的excutor数量。

并行度优化

1
2
--Shuffle Reduce阶段默认的并行度。默认值为200。此参数只能控制Spark SQL分区个数,不能控制RDD分区个数。
set spark.sql.shuffle.partitions=600;

基于RBO优化(Catalyst 优化器完成)

RBO:基于规则的优化。
主要包括三方面:

  • 谓词下推(Predicate Pushdown)指的是将过滤条件尽可能前移,减少下游数据的处理量。
  • 列裁剪(Column Pruning)指的是扫描数据源的时候,只读取那些与查询相关的字段。
  • 常量替换(Constant Folding)指的是Catalyst优化器自动替换SQL中的常量。例如将age=(10+20)替换成age=30

基于CBO优化(默认关闭)

CBO:基于代价的优化。

1
2
3
4
5
6
--开启CBO,开启CBO后,CBO优化器可以基于表和列的统计信息,进行一系列的估算,最终选择出最优的查询计划。
spark.sql.cbo.enabled=true;
--使用CBO来自动调整连续的inner join顺序。默认false。
spark.sql.cbo.joinReorder.enabled=true;
--使用CBO来自动调整连续的inner join的表的个数阈值。默认12。
spark.sql.cbo.joinReorder.dp.threshold=6;

Spark3.0 AQE(默认开启)

AQE(Adaptive Query Execution),即自适应查询执行。AQE是Spark SQL的一种动态优化机制。在运行时,每当Shuffle Map阶段执行完成完毕,AQE都会结合这个阶段的统计信息,来完成对原始查询语句的运行时优化。
主要包括以下三方便优化:

  • 动态合并分区
  • 动态切换Join策略
  • 动态优化Join倾斜

相关参数如下:

1
2
3
4
-- AQE总开关。默认开启。
spark.sql.adaptive.enabled=true
-- 是否开启倾斜join检测,如果开启了会将倾斜的分区拆成多个分区。
spark.sql.adaptive.skewJoin.enabled=true

Spark3.0 DPP(默认开启)

DPP(Dynamic Partition Pruning),即动态分区裁剪。DPP的核心思路是先将Join一侧作为子查询计算出来,再将其所有分区用到Join另一侧作为表过滤条件,从而实现对分区的动态裁剪。
相关参数如下:

1
2
--默认开启
spark.sql.optimizer.dynamicPartitionPruning.enabled = true;

Spark3.0 Hint增强

在 spark2.4 的时候就有了Hint功能,不过只有 broadcasthash join 的 Hint,Spark3.0增加了sort merge join,shuffle_hash join,shuffle_replicate nested loop join。

1
2
3
4
5
-- BROADCAST
select /*+ BROADCAST(school) */ *
from test_student student
left join test_school school
on student.id=school.id

数据倾斜

数据倾斜是指在Spark SQL执行过程中,大多数Task任务运行速度很快,但是有少部分Task运行极其缓慢,甚至出现OOM。
解决数据倾斜主要依赖调整SQL。

过滤掉NULL值

如果SQL语句Join字段中出现NULL值,可能会造成笛卡尔积,从而导致出现数据倾斜。

1
2
--过滤掉null值
SELECT * FROM t1 where t1.id is not null

拆分大Key

通过调整SQL来将大Key拆分。例如:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
--原SQL
select * from table_a
join table_b
on table_a.id = table_b.id

--调整之后的SQL
select * from (
--打散操作,rand() 随机数范围默认为0到1,*2转换为int之后必然为0或者1。打散的越多,b表需要扩容的越多。
select concat(id,’_’,cast(rand()*2 as int)) id
from table_a
) ta
Join(
--扩容操作,保证a表和b表能关联上。
select concat(id,’_’,0) id from table_b
union all
select concat(id,’_’,1) id from table_b

) tb
on ta.id =tb.id

小表广播

1
2
## 设置阈值,默认10M。-1表示禁用广播。
spark.sql.autoBroadcastJoinThreshold=104857600
打赏
  • 版权声明: 本博客所有文章除特别声明外,著作权归作者所有。转载请注明出处!
  • Copyrights © 2015-2023 henrrywan

请我喝杯咖啡吧~

支付宝
微信