Trino源码学习-执行计划生成
上篇分析了trino提交查询部分的源码,本篇来分析下,构建执行计划部分的源码。
DDL执行
DDL执行通过QueryExecution的子类DataDefinitionExecution实现。
1 | public void start() |
DDL执行较为简单,通过内部的DataDefinitionTask执行,一般都是通过Metadata接口进行操作。Metadata提供了对元数据的操作API,其实现基于Connector的ConnectorMetadata实现提供。对于部分Task(例如createTable),也会调用Analyzer进行分析。
flowchart LR Metadata --> MetadataManager MetadataManager --> CatalogMetadata CatalogMetadata --> ConnectorMetadata
sql执行
sql执行是通过QueryExecution的子类SqlQueryExecution实现的。
sql执行前的语法树分析
在SqlQueryExecution的构造器中会通过Analyzer分析语法树。
1 | // io.trino.execution.SqlQueryExecution |
下面来关注下analyzer中做了什么。
1 | public Analysis analyze(Statement statement, QueryType queryType) |
statementRewrite.Rewrite接口的rewrite方法会重写部分语句(例如将show tables命令,改为从元信息表information_schema.tables中查询)。在Rewrite接口的每个实现类中,都有AstVisitor的子类。Rewrite接口的Rewrite方法实际上是通过遍历语法树的visitor实现的。
classDiagram
class AstVisitor
DescribeInputRewrite..>AstVisitor
ShowQueriesRewrite ..>AstVisitor
DescribeOutputRewrite ..>AstVisitor
ExplainRewrite..>AstVisitor
ShowStatsRewrite..>AstVisitor
class Rewrite{
<<Interface>>
+ rewrite(AnalyzerFactory analyzerFactory, Session session, Statement node, List[Expression] parameters, Map[NodeRef[Parameter], Expression] parameterLookup,WarningCollector warningCollector): Statement
}
DescribeInputRewrite --|>Rewrite
ShowQueriesRewrite --|>Rewrite
DescribeOutputRewrite --|>Rewrite
ExplainRewrite--|>Rewrite
ShowStatsRewrite--|>Rewrite重写完的Statement将通过StatementAnalyzer进一步分析。在StatementAnalyzer分析中会用到Metadata。
StatementAnalyzer对每个Statement实现子类分析后会得到Scope.
1 | public class Scope |
对于Select和Show 语句,返回的是结果视图结构,对于insert,delete和create table as select语句返回的字段只有一列(语句操作的行数)。
此外在StatementAnalyzer中还会调用AggregationAnalyzer和ExpressionAnalyzer的方法。
- AggregationAnalyzer会分析表达式和group的关系
- ExpressionAnalyzer会返回表达式的返回值类型。
sql执行计划入口
Sql查询的入口是start方法
1 | // io.trino.execution.SqlQueryExecution |
生成sql执行计划
SqlQueryExecution通过planQuery(),生成Query的执行计划。
1 | // io.trino.execution.SqlQueryExecution |
逻辑计划
LogicalPlanner类会根据分析后的SQL语句,生成逻辑执行计划Plan。逻辑执行计划是一个有向图,图中的每个节点都是一个PlanNode。
1 | public abstract class PlanNode |
可以看到,每个planNode都有输入和输出,如果将输入和输出的planNode分别一一对应连接起来就构成了一个有向图。planNode的所有实现类都在io.trino.sql.planner.plan包下。这里就不一一赘述了。
下面来看下逻辑计划是如何生成的。
1 | public Plan plan(Analysis analysis, Stage stage, boolean collectPlanStatistics) |
生成逻辑计划
1 | // io.trino.sql.planner.LogicalPlanner |
对于上面的分支,我们主要分析下Query对应的createRelationPlan分支。
1 | // io.trino.sql.planner.LogicalPlanner |
RelationPlanner继承自AstVisitor,覆写了下面几个方法:
- visitTable:
- visitTableFunctionInvocation
- visitAliasedRelation
- visitPatternRecognitionRelation
- visitSampledRelation
- visitLateral
- visitJoin
- visitTableSubquery
- visitQuery:使用QueryPlanner分析
- visitQuerySpecification:使用QueryPlanner分析
- visitValues
- visitUnnest
- visitUnion
- visitIntersect
- visitExcept
- visitSubqueryExpression
除了最后的SubqueryExpression(里面包含了Query节点),其他类型都是Relation的子类。
以简单SELECT * FROM system.runtime.nodes为例:
该查询会通过RelationPlanner.visitTable方法处理,生成如下逻辑计划:
flowchart TD
TableScanNode --> OutputNode对于带join的查询:
1 | select |
会生成如下逻辑计划:
flowchart LR
ProjectNode1[ProjectNode]-->OutputNode
ProjectNode2[ProjectNode] --> ProjectNode1
ProjectNode3[ProjectNode] --> ProjectNode2
ProjectNode4[ProjectNode] --> ProjectNode3
FilterNode --> ProjectNode4
JoinNode --> FilterNode
ProjectNodeL1[ProjectNode]-->|left|JoinNode
ProjectNodeL2[ProjectNode] --> ProjectNodeL1
TableScanNodeL[TableScanNode] -->ProjectNodeL2
ProjectNodeR1[ProjectNode]-->|right|JoinNode
ProjectNodeR2[ProjectNode] --> ProjectNodeR1
TableScanNodeR[TableScanNode] -->ProjectNodeR2逻辑计划优化
在生成逻辑计划后,会遍历所有的PlanOptimizer来优化逻辑执行计划。
1 | public interface PlanOptimizer |
Trino支持的优化器从类型上来看有两种,Rule-Based和Cost-Based。当前Trino的Cost-Based优化器支持并不全面.本篇主要介绍Rule-Based优化器的架构,对于Cost-Based优化器将在后面的文章中介绍。
Rule-Based的优化器从实现上分为两种:
- io.trino.sql.planner.plan.SimplePlanRewriter: PlanOptimizer内置一个SimplePlanRewriter,SimplePlanRewriter继承自PlanVisitor,通过一次遍历(大多数情况下是一次遍历)重写Plan(例如PredicatePushDown 谓词下推)。
- io.trino.sql.planner.iterative.Rule: PlanOptimizer是IterativeOptimizer,支持传入多个Rule。Rule中包含Pattern和查询match上Pattern后的重写逻辑(例如PruneProjectColumns 删除无用投影字段 )。
IterativeOptimizer 驱动rule。IterativeOptimizer内部存储了Rule列表。
- 通过递归的方式(类似深度优先遍历)去驱动Rule
- 先优化自己,然后再优化孩子节点
- 如果孩子节点发生了变化,会再次尝试对自身进行优化。
- 如果节点不再发生变化则返回。
- 支持超时检测。
IterativeOptimizer的驱动时序图如下:
sequenceDiagram
IterativeOptimizer#exploreGroup-->>IterativeOptimizer#exploreNode: 优化自己
IterativeOptimizer#exploreNode->>checkTimeoutNotExhausted:检测超时
checkTimeoutNotExhausted-->>IterativeOptimizer#exploreNode: 未超时
loop each rule
IterativeOptimizer#exploreNode-->Rule: transform plan
Rule-->>IterativeOptimizer#exploreNode: optimized
end
IterativeOptimizer#exploreNode-->> IterativeOptimizer#exploreGroup: 自己优化完成
break Children not change or self not change
IterativeOptimizer#exploreGroup -->> IterativeOptimizer#exploreChildren: 优化孩子节点
loop each child Node
IterativeOptimizer#exploreChildren-->>IterativeOptimizer#exploreGroup: 遍历优化child节点
IterativeOptimizer#exploreGroup-->>IterativeOptimizer#exploreChildren: child节点优化完成
end
IterativeOptimizer#exploreChildren -->> IterativeOptimizer#exploreGroup: 所有孩子节点优化完成
IterativeOptimizer#exploreGroup -->>IterativeOptimizer#exploreNode: 优化自己
end接下来我们看看Rule的实现:
1 | public interface Rule<T> |
- pattern是一个链表结构,previous指针指向 上一个Pattern。
- pattern使用accept进行匹配,匹配时的入参是 Node和Captures,返回的参数是Node和Captures
- Node 是指plan节点。
- Captures是一个链表,内部包含了每个pattern节点捕获的信息和一个尾指针。
flowchart
subgraph input
nodeI(PlanNode)
CapturesI(Captures)
end
input -->|accpet|pattern
subgraph pattern
a(pattern A) --> null
b(pattern B) -->|previous|a
c(pattern C) -->|previous|b
end
pattern -->Match
subgraph Match
CapturesO(Captures.NIL) -->|tail| CapturesOa(Captures A)
CapturesOa -->|tail| CapturesOb(Captures B)
CapturesOb -->|tail| CapturesOc(Captures C)
end对于上面带join的查询:
1 | select |
优化后的语法树节点如下:
flowchart
JoinNode[JoinNode,DistributionType=partitioned]-->OutputNode
ExchangeNode1[ExchangeNode,scope=remote,type=repartition] -->|left|JoinNode
ExchangeNode2[ExchangeNode,scope=local,type=repartition] -->|right|JoinNode
ProjectNode1[ProjectNode]-->ExchangeNode1
FilterNode1[FilterNode]-->ProjectNode1
TableScanNode1[TableScanNode]-->FilterNode1
ExchangeNode3[ExchangeNode,scope=remote,type=repartition]-->ExchangeNode2
ProjectNode2[ProjectNode]-->ExchangeNode3
TableScanNode2[TableScanNode]-->ProjectNode2值得注意的是Exchange节点是通过AddExchanges等优化规则加入语法树节点的,后续将通过Exchange节点拆分执行计划。
Trino目前支持的Join有两种,partitioned(Hash join)和replicated(broadcast join)
逻辑计划分段
执行计划分段的实现方法是PlanFragmenter#createSubPlans。PlanFragmenter会将PlanNode树构建为SubPlan树。
1 | //io.trino.sql.planner.SubPlan |
PlanFragmenter中内置了一个Fragmenter,Fragmenter是SimplePlanRewriter的实现类。主要的片段拆分逻辑依靠ExchangeNode。
1 | public PlanNode visitExchange(ExchangeNode exchange, RewriteContext<FragmentProperties> context) |
对于上面带join的查询:
1 | select |
生成的Subplan结构如下:
flowchart
subgraph subPlan0
subgraph planFragment0
OutputNode0(OutputNode)-->|source|JoinNode0(JoinNode)
JoinNode0-->|left source|RemoteSourceNode0(RemoteSourceNode)
JoinNode0-->|right source|ExchangeNode0(ExchangeNode0)
ExchangeNode0-->|source|RemoteSourceNode01(RemoteSourceNode)
end
end
subgraph subPlan1
subgraph planFragment1
ProjectNode1(ProjectNode)-->|source|FilterNode1(FilterNode)
FilterNode1-->|source|TableScanNode1(TableScanNode,system.runtime.tasks)
end
end
subgraph subPlan2
subgraph planFragment2
ProjectNode2(ProjectNode)-->|source|TableScanNode2(TableScanNode,system.runtime.nodes)
end
end
RemoteSourceNode0-.->ProjectNode1
RemoteSourceNode01-.->ProjectNode2详细执行计划可以通过explain关键字返回:
1 | Fragment 0 [HASH] |
PartitioningHandle
PartitioningHandle定义了执行计划中的分区状况。例如:
- 在上文中介绍的AddExchanges优化规则中,会设置不同分区的ExchangeNode。
- 在QueryPlaner中visitMerge时,会在MergeWriterNode中设置MergePartitioningHandle。
- 在上文介绍的Fragmenter中,会在访问不同planNode时,设置上下文的PartitioningHandle,然后在buildFragment时,将PartitioningHandle设置到PlanFragment中。例如在处理TableScan时, TableScan会从Connector中获取ConnectorPartitioningHandle的实现类。
1 |
|
SystemPartitioningHandle
SystemPartitioningHandle是Trino系统默认的分区方式。有5种内置分区类型:
- SINGLE: 在单个节点上执行,通常是用于汇总结果。
- FIXED: 将数据分散到固定的多个节点上执行。
- SOURCE: 一般是用于从数据源读取表
- COORDINATOR_ONLY: 一般只在COORDINATOR上执行。
- ARBITRARY: 表示无限制,动态扩展的
在执行计划中会有类似的输出 Fragment 0 [HASH],描述Fragment的分区方式,FIXED和ARBITRARY方式会打印使用的函数。
1 | public final class SystemPartitioningHandle |
系统默认组合如下:
| Function\Partitioning | SINGLE | COORDINATOR_ONLY |
|---|---|---|
| SINGLE | SINGLE_DISTRIBUTION | COORDINATOR_DISTRIBUTION |
| Function\Partitioning | FIXED |
|---|---|
| HASH | FIXED_HASH_DISTRIBUTION |
| ROUND_ROBIN | FIXED_ARBITRARY_DISTRIBUTION |
| BROADCAST | FIXED_BROADCAST_DISTRIBUTION |
| UNKNOWN | FIXED_PASSTHROUGH_DISTRIBUTION |
| Function\Partitioning | ARBITRARY |
|---|---|
| ROUND_ROBIN | SCALED_WRITER_ROUND_ROBIN_DISTRIBUTION |
| UNKNOWN | ARBITRARY_DISTRIBUTION |
| Function\Partitioning | SOURCE |
|---|---|
| UNKNOWN | SOURCE_DISTRIBUTION |