深入理解 Apache Calcite-Catalog
Calcite 支持管理底层数据库的元数据信息和根据元数据优化查询 SQL。但是 Calcite 不直接存储底层系统中的元数据信息,用户需要先将元数据注册到 Calcite Catalog 中,才可以使用Calcite提供的能力。
Catalog
在 Caclite 中,Catalog 主要用来定义 SQL 查询过程中所需要的元数据和命名空间,主要包含以下内容:
- Schema: 表和函数的命名空间。也支持任意级别的嵌套子schema。
- Table: 代表一个数据集,其中所有字段的类型由RelDataType定义。
- RelDataType: 代表一个数据集中的所有字段。简单的说,描述了表中一行数据的类型。
- Function: 代表注册在 Schema 中的一个函数,接收参数并返回类型。
- Lattice: 晶格,用于定义多维数据模型和自动化管理物化视图(Materialized View)的核心概念。
Catalog的基础实现是抽象类 CalciteSchema。CalciteSchema 有 CachingCalciteSchema 和 SimpleCalciteSchema 两个子类,他们的区别主要是是否查找、函数和类型。
classDiagram
class CalciteSchema {
- CalciteSchema parent
+ Schema schema
+ Sring name
# NameMap~CalciteSchema~ subSchemaMap
+ add(String name, Schema schema) CalciteSchema
+ add(String tableName, Table table) TableEntry
+ add(String name, RelProtoDataType type) TypeEntry
- add(String name, Function function) FunctionEntry
+ getSubSchema(String schemaName, boolean caseSensitive) CalciteSchema
+ getTable(String tableName, boolean caseSensitive) TableEntry
+ getType(String name, boolean caseSensitive) TypeEntry
+ getFunctions(String name, boolean caseSensitive) Collection~Function~
+ isCacheEnabled() boolean
+ setCache(boolean cache)
}
CalciteSchema o-- Schema
<<Abstract>> CalciteSchema
class CachingCalciteSchema
CalciteSchema <|--CachingCalciteSchema
note for CachingCalciteSchema "支持查询缓存的Catalog 实现"
class SimpleCalciteSchema
CalciteSchema <|--SimpleCalciteSchema
note for SimpleCalciteSchema "简单的Catalog 实现"CalciteSchema 实际上是 Schema的 Wrapper,两者间是 1:1 的关系。由于 Schema 支持嵌套,CalciteSchema也支持了嵌套关系:
- Parent 字段就指向了父节点
- schema 字段是内部的 Schema
entry
CalciteSchema 使用下面的字段存储 name 和 entry(table,type,function,lattice) 的映射关系,支持通过 add*()方法添加 ,get*()查找和remove*() 删除。
注意, 上述的修改方法不会影响内部的 Schema。
1 | public abstract class CalciteSchema { |
NameMap<V>内部有一个NavigableMap<String,V>(继承自SortedMap),支持返回子视图,支持范围查询。NameMultimap<V>内部是一个NameMap<List<V>>。NameSet内部是一个NameMap<Object>,其中 value 都是默认值Object DUMMY = new Object();。
Entry的子类有4类
classDiagram
class Entry{
+ CalciteSchema schema
+ String name
+ path() List~String~
}
class TableEntry{
+ getTable() Table
}
<<Abstract>> TableEntry
class TableEntryImpl
TableEntry <|.. TableEntryImpl
Entry <|-- TableEntry
class TypeEntry{
+ getType() RelProtoDataType
}
<<Abstract>> TypeEntry
class TypeEntryImpl
TypeEntry <|.. TypeEntryImpl
Entry <|-- TypeEntry
class FunctionEntry{
+ getFunction() Function
+ isMaterialization() boolean
}
<<Abstract>> FunctionEntry
class FunctionEntryImpl
FunctionEntry <|.. FunctionEntryImpl
Entry <|-- FunctionEntry
class LatticeEntry{
+ getLattice() Lattice
+ getStartTable() TableEntry
}
<<Abstract>> LatticeEntry
class LatticeEntryImpl
LatticeEntry <|.. LatticeEntryImpl
Entry <|-- LatticeEntryLookup
Lookup 代表 Catalog 中的一次(大小写敏感/不敏感)查询。
1 | public interface Lookup<T> { |
Lookup 有以下实现:
classDiagram
class Lookup~T~
<<Interface>> Lookup
class IgnoreCaseLookup~T~
<<Abstract>> IgnoreCaseLookup
Lookup <|.. IgnoreCaseLookup
class CompatibilityLookup~T~
IgnoreCaseLookup <|-- CompatibilityLookup
class ConcatLookup~T~
Lookup <|.. ConcatLookup
class EmptyLookup~T~
Lookup <|.. EmptyLookup
class LoadingCacheLookup~T~{
- Lookup~T~ delegate
- LoadingCache~String, T~ cache
- LoadingCache~String, Named~ cacheIgnoreCase
}
Lookup <|.. LoadingCacheLookup
class NameMapLookup~T~
Lookup <|.. NameMapLookup
class SnapshotLookup~T~{
- Lookup~T~ delegate
- LazyReference~Lookup~ cachedDelegate
- boolean enabled
}
Lookup <|.. SnapshotLookup
class TransformingLookup~S,T~{
- Lookup~S~ delegate
- BiFunction~S, String, T~ transform
}- IgnoreCaseLookup: 是忽略大小写的 Lookup
- CompatibilityLookup: 支持外部注入的 Lookup 逻辑
- EmptyLookup: 空 Lookup
- ConcatLookup: 多个 Lookup 结果合并(数组顺序是优先级)
- LoadingCacheLookup: 支持缓存的 Lookup
- NameMapLookup: 基于 NameMap的 LookUp
- SnapshotLookup: 存储 Lookup 结果的查询快照
- TransformingLookup: 支持结果变换的 Lookup。
schema
schema是表和函数的命名空间。也支持任意级别的嵌套子schema。大多数数据库的实现是有层级限制的,例如,大部分的 JDBC 仅支持:
- 一级 Schema
- 二级 database+catalog
1 | public interface Schema { |
下图是 calcite-core 包中的Schema类图。
classDiagram class Schema <<Interface>> Schema class AbstractSchema Schema <|.. AbstractSchema class ReflectiveSchema AbstractSchema <|-- ReflectiveSchema class MetadataSchema AbstractSchema <|-- MetadataSchema class CloneSchema AbstractSchema <|-- CloneSchema class JdbcBaseSchema <<Abstract>> JdbcBaseSchema Schema <|.. JdbcBaseSchema class JdbcCatalogSchema JdbcBaseSchema <|-- JdbcCatalogSchema class JdbcSchema JdbcBaseSchema <|-- JdbcSchema class DelegatingSchema Schema <|.. DelegatingSchema class SchemaPlus <<Interface>> SchemaPlus class SchemaPlusImpl Schema <|.. SchemaPlusImpl class MySchemaPlus Schema <|.. MySchemaPlus class SemiMutableSchema <<Interface>> SemiMutableSchema
- AbstractSchema:
- ReflectiveSchema: 通过反射对象获取Java类的schema.
- MetadataSchema: 适配avatica Meta的schema
- CloneSchema: 支持在内存中从JDBC schema拷贝tables.
- JdbcBaseSchema: 从JDBC数据源中获取Schema信息。
- JdbcSchema: 由JDBC数据源支持的Schema的实现。
- JdbcCatalogSchema: 基于JDBC catalog的 Schema 实现。
- DelegatingSchema: 代理Schema,简单的包装器。
- SchemaPlus: Schema接口的扩展。是Calcite包装schema实现的包装器。
- SchemaPlusImpl: SchemaPlus接口的实现类。将CalciteSchema(wrapper)适配到SchemaPlus接口。
- MySchemaPlus: 执行计划中使用。
- SemiMutableSchema: 支持新增物化视图的schema(在calcite 中未被使用)。
AbstractSchema 对 Schema 接口的方法进行了实现,并提供了可重写的 getTableMap、getFunctionMultimap 和 getSubSchemaMap 方法,用于向 Schema 中注册表、函数和子 ReflectiveSchema 和 JdbcCatalogSchema 都是继承了 AbstractSchema 完成 Schema 注册。
在calcite的其他包中还有其他的实现,例如 CsvSchema,FileSchema等。
SchemaPlus
SchemaPlus 是对 Schema接口的扩展,用户自定义的schema不需要实现这个接口,但是schema在系统实际传递中,会被这个接口包装。换句话说,就是用户只需要创建schema,而在Calcite系统的其他地方使用schema时,是通过SchemaPlus来访问的。Calcite 内部提供了 SchemaPlus 的生成方法,例如:CalciteSchema#plus() 方法。
1 | public interface SchemaPlus extends Schema { |
Table
Table代表一个数据集,其中所有字段的类型由RelDataType定义。Table的典型创建方法是Calcite从schema中调用getTable方法。一个表不需要知道自己的name。这是为了表的重用(有多个名字或在多个schema中),这个概念和UNIX文件系统中的Inode有相似之处。
1 | public interface Table { |
下图是 calcite-core 包中的部分Table类图。
classDiagram
%% 核心接口:Table
class Table {
<<interface>>
+ getRowType(RelDataTypeFactory) RelDataType
+ getStatistic() Statistic
+ getJdbcTableType() Schema.TableType
}
%% Table 的主要直接子接口
class ScannableTable {
<<interface>>
+scan(DataContext root) Enumerable~Object[]~
}
class FilterableTable {
<<interface>>
+scan(DataContext root, List~RexNode~ filters) Enumerable~Object[]~
}
class ProjectableFilterableTable {
<<interface>>
+scan(DataContext root, List~RexNode~ filters, int[] projects) Enumerable~Object[]~
}
class TranslatableTable {
<<interface>>
+toRel(RelOptTable.ToRelContext context, RelOptTable table) RelNode
}
class StreamableTable {
<<interface>>
+stream() Table
}
class QueryableTable{
<<interface>>
+ asQueryable(QueryProvider queryProvider, SchemaPlus schema, String tableName) Queryable~T~
}
QueryableTable --|> Table
%% 抽象实现类
class AbstractQueryableTable {
<<abstract>>
#AbstractQueryableTable(String elementType)
+getRowType(...) RelDataType
+getElementType() String
}
class AbstractTable {
<<abstract>>
+AbstractTable()
+getRowType(...) RelDataType
+getStatistic() Statistic
+getJdbcTableType() Schema.TableType
}
%% 实现关系
Table <|-- ScannableTable
Table <|-- FilterableTable
Table <|-- ProjectableFilterableTable
Table <|-- TranslatableTable
Table <|-- StreamableTable
%% 抽象类实现接口
AbstractTable ..|> Table
AbstractQueryableTable ..|> QueryableTable
AbstractQueryableTable --|> AbstractTable
%% 抽象类的常见具体用途
class ViewTable {
+ViewTable(...)
}
class MaterializedViewTable {
+MaterializedViewTable(...)
}
class JdbcTable
AbstractQueryableTable <|-- JdbcTable
%% 具体实现继承抽象类
ViewTable --|> AbstractTable
MaterializedViewTable --|> AbstractTable| 类/接口 | 角色与核心作用 |
|---|---|
| Table (接口) | 最顶层的表抽象。定义了获取行类型(RowType)、统计信息(Statistic)和表类型(JdbcTableType)等元数据的基本方法。 |
| ScannableTable (接口) | 可扫描表。表示可以直接枚举所有行的最基础表类型。scan方法返回一个Enumerable对象。 |
| FilterableTable (接口) | 可过滤表。在ScannableTable基础上增强,允许在扫描时将过滤条件下推,减少数据读取量。 |
| ProjectableFilterableTable (接口) | 可投影和过滤的表。进一步允许将列选择(projects)条件下推,是最强大的基础表类型之一。 |
| TranslatableTable (接口) | 可翻译的表。这是Calcite优化的核心。实现此接口的表可以将自己“翻译”成一个关系代数表达式(RelNode),让优化器能对其进行完整的规则优化(如谓词下推、聚合计算等)。很多高级适配器(如JDBC、Cassandra)都实现此接口。 |
| StreamableTable (接口) | 可流的表。表示可以产生流式数据的表。 |
| AbstractTable (抽象类) | 基础的骨架实现。提供了getRowType, getStatistic等方法的简单默认实现。自定义简单表时通常直接继承此类。ViewTable和MaterializedViewTable就继承自它。 |
| AbstractQueryableTable (抽象类) | 用于实现可查询的表,与Calcite的查询能力集成。 |
| ViewTable(类) | 用于视图处理,通过将视图定义语句 viewSql 转化为 AST 及关系代数,并在 toRel 处理过程中将原有的视图查询语句展开,变换为对原始表的查询,从而实现视图语义。 |
| MaterializedViewTable (类) | 物化视图表 |
RelDataType
RelDataType 代表了关系表达式返回的数据行类型或者标量表达式的类型,Calcite 支持了所有的 SQL 数据类型,也包括结构和数组类型。RelDataType 接口中的主要方法如下:
1 | public interface RelDataType { |
RelDataTypeField 代表了struct类型中的一个字段.
1 | public interface RelDataTypeField extends Map.Entry<String, RelDataType> { |
Function
Calcite 对函数的定义是:接受参数并返回结果的命名表达式。函数通过 Schema 进行注册,可以通过 Schema#getFunctions获取函数,然后根据参数类型获取对应的函数。下面是 Function 接口声明:
1 | public interface Function { |
Function 接口提供了 getParameters 获取函数参数的方法,Function 接口有 ScalarFunction、AggregateFunction、TableFunction 和 TableMarco 等几个主要的子接口。
- ScalarFunction 对应标量函数,也就是函数返回的结果为一个标量- AggregateFunction 对应聚合函数,会将多个值聚合计算为一个标量返回。
- TableFunction 和 TableMacro 都对应了表函数,会返回一个表,他们的区别是:
- TableMacro 会在编译期间进行调用,编译期展开表达式允许 Calcite 实现更加强大的查询优化,例如我们可以对视图在编译期进行展开。
- 相比于 TableMacro,TableFunction 则需要在执行阶段才能知道表的结果。
以上介绍了 Calcite Catalog 体系中涉及到的类及其主要作用,下面将介绍Calcite 如何注入Catalog。
Catalog 注入
以上篇Apache Calcite Quick Start中的quick start demo为例。
加载Jdbc Driver
calcite-core包中的META-INF/services文件夹中有文件java.sql.Driver
1 | org.apache.calcite.jdbc.Driver |
JVM 启动时,会通过 ServiceLoader加载Driver服务。
1 | public class Driver extends UnregisteredDriver { |
CalciteConnection
当我们使用Connection connection = DriverManager.getConnection("jdbc:calcite:", info);获取连接时,DriverManager会使用上面注册的Driver 生成连接,最终生成的连接是CalciteConnectionImpl。
1 | // org.apache.calcite.avatica.UnregisteredDriver |
CalciteConnectionImpl在初始化时会主动创建 rootSchema。
1 | // CalciteConnectionImpl#<init> |
CalciteSchema.createRootSchema方法会根据参数传递的标识决定是否创建 metadata Schema,metadata Schema 会注册 COLUMNS、TABLES 等系统表以提供相关的查询。
注入Catalog
注入 Catalog 有两种方式:
- 编程API
- Json配置文件
编程 API
上面的 Demo 就是通过编程 API 方式注入 Catalog。
1 | // 获取Calcite的根Schema |
Json 配置文件
官方CSV Catalog就是使用 Json 配置方式注入 Catalog。下面是CSV Catalog 配置文件的一个例子。
1 | { |
然后将配置文件路径放到 创建连接时的Properties属性中:
1 | Properties info = new Properties(); |
org.apache.calcite.jdbc.Driver在创建完 Connection 后,会通过钩子函数注入 Catalog:
1 |
|