一文了解函数式查询优化器Spark SQL Catalyst
记录一下个人对sparkSql的catalyst这个函数式的可扩展的查询优化器的理解,目录如下:
Spark SQL的核心是Catalyst优化器,是以一种新颖的方式利用Scala的的模式匹配和quasiquotes机制来构建的可扩展查询优化器。
sparkSql pipeline
sparkSql的catalyst优化器是整个sparkSql pipeline的中间核心部分,其执行策略主要两方向,
- 基于规则优化/Rule Based Optimizer/RBO
- nestedLoopsJoin,P,Q双表两个大循环, O(M*N)
- sortMergeJoin是P,Q双表排序后互相游标
- broadcastHashJoin,PQ双表中小表放入内存hash表,大表遍历O(1)方式取小表内容
- 一种经验式、启发式优化思路
- 对于核心优化算子join有点力不从心,如两张表执行join,到底使用broadcaseHashJoin还是sortMergeJoin,目前sparkSql是通过手工设定参数来确定的,如果一个表的数据量小于某个阈值(默认10M?)就使用broadcastHashJoin
- 基于代价优化/Cost Based Optimizer/CBO
- 针对每个join评估当前两张表使用每种join策略的代价,根据代价估算确定一种代价最小的方案
- 不同physical plans输入到代价模型(目前是统计),调整join顺序,减少中间shuffle数据集大小,达到最优输出
- Parser,利用ANTLR将sparkSql字符串解析为抽象语法树AST,称为unresolved logical plan/ULP
- Analyzer,借助于数据元数据catalog将ULP解析为logical plan/LP
- Optimizer,根据各种RBO,CBO优化策略得到optimized logical plan/OLP,主要是对Logical Plan进行剪枝,合并等操作,进而删除掉一些无用计算,或对一些计算的多个步骤进行合并
Optimizer是catalyst工作最后阶段了,后面生成physical plan以及执行,主要是由sparkSql来完成。
- SparkPlanner
- 优化后的逻辑执行计划OLP依然是逻辑的,并不能被spark系统理解,此时需要将OLP转换成physical plan
- 从逻辑计划/OLP生成一个或多个物理执行计划,基于成本模型cost model从中选择一个
- Code generation
- 生成Java bytecode然后在每一台机器上执行,形成RDD graph/DAG
将sparkSql字符串切分成一个一个token,再根据一定语义规则解析为一个抽象语法树/AST。Parser模块目前基本都使用第三方类库ANTLR来实现,比如Hive,presto,sparkSql等。
parser切词
Spark 1.x版本使用的是Scala原生的Parser Combinator构建词法和语法分析器,而Spark 2.x版本使用的是第三方语法解析器工具ANTLR4。
Spark2.x SQL语句的解析采用的是ANTLR4,ANTLR4根据语法文件SqlBase.g4自动解析生成两个Java类:词法解析器SqlBaseLexer和语法解析器SqlBaseParser。
SqlBaseLexer和SqlBaseParser都是使用ANTLR4自动生成的Java类。使用这两个解析器将SQL字符串语句解析成了ANTLR4的ParseTree语法树结构。然后在parsePlan过程中,使用AstBuilder.scala将ParseTree转换成catalyst表达式逻辑计划LogicalPlan。
通过解析后ULP有了基本骨架,但是系统对表的字段信息是不知道的。如sum,select,join,where还有score,people都表示什么含义,此时需要基本的来表达这些token。最重要的元数据信息就是,
- 表的schema信息,主要包括表的基本定义(表名、列名、数据类型)、表的数据格式(json、text、parquet、压缩格式等)、表的物理位置
- 基本函数信息,主要是指类信息
Analyzer会再次遍历整个AST,对树上的每个节点进行数据类型绑定以及函数绑定,比如people词素会根据元数据表信息解析为包含age、id以及name三列的表,people.age会被解析为数据类型为int的变量,sum会被解析为特定的聚合函数,
词义注入
Optimizer是catalyst的核心,分为RBO和CBO两种。 RBO的优化策略就是对语法树进行一次遍历,模式匹配能够满足特定规则的节点,再进行相应的等价转换,即将一棵树等价地转换为另一棵树。SQL中经典的常见优化规则有,
- 谓词下推(predicate pushdown)
- 常量累加(constant folding)
- 列值裁剪(column pruning)
- Limits合并(combine limits)
由下往上走,从join后再filter优化为filter再join
从`100+80`优化为`180`,避免每一条record都需要执行一次`100+80`的操作
剪裁不需要的字段,特别是嵌套里面的不需要字段。如只需people.age,不需要people.address,那么可以将address字段丢弃
至此,OLP已经得到了比较完善的优化,然而此时OLP依然没有办法真正执行,它们只是逻辑上可行,实际上spark并不知道如何去执行这个OLP。
- 比如join只是一个抽象概念,代表两个表根据相同的id进行合并,然而具体怎么实现这个合并,逻辑执行计划并没有说明
optimized logical plan -> physical plan
此时就需要将左边的OLP转换为physical plan物理执行计划,将逻辑上可行的执行计划变为spark可以真正执行的计划。
- 比如join算子,spark根据不同场景为该算子制定了不同的算法策略,有broadcastHashJoin、shuffleHashJoin以及sortMergeJoin,物理执行计划实际上就是在这些具体实现中挑选一个耗时最小的算法实现,这个过程涉及到cost model/CBO
CBO off
CBO on
CBO中常见的优化是,以便尽量减少中间shuffle数据集大小,达到最优输出。
- WholeStageCodegen,将多个operators合并成一个java函数,从而提高执行速度
- Project,投影/只取所需列
- Exchange,stage间隔,产生了shuffle