doris-查询原理

2023-11-19


目录

一.查询简介

二.查询流程

1.Query 接收

2.Query Parse

3.Query Analyze

4.Query Rewrite

5.Plan

5.1 Query 单机Plan

5.2 Query 分布式Plan

6.Query Schedule

7.Query Execute

OlapScanNode

AggregationNode

PartitionedAggregationNode

ExchangeNode

HashJoinNode

三.总结

一.查询简介

Doris 的查询和大多数数据库一样,需要经过 Parse,Analyze,Optimize,Plan, Schedule, Execute 等过程。 在 Doris 中,FE 负责查询的 Parse/解析,Analyze/分析,Optimize/优化,Plan/生成计划, Schedule/调度,BE 负责查询的 Execute/执行。Doris 查询的分布式执行是MPP 架构,相比于 Kylin 和 Druid 的 Scatter-Gather 架构,可以很好地支持 Shuffle 操作,所以对大数据集的 Join 和聚合,处理效率会更高。 Doris 的单机查询执行模型是 Batch 模式的Volcano 模型,相比于 tuple one time 的 Volcano 模型,解释执行的开销更低,CPU 利用效率更高。

二.查询流程

1.Query 接收

Doris 兼容 Mysql 协议,用户可以通过 Mysql 客户端和绝大多数兼容 Mysql 协议的 BI 工具向 Doris 发送 查询。

Doris 的 MysqlServer 接收用户的请求,每个请求都会封装成一个 ConnectContext, ConnectScheduler 会维护一个线程池,每个 ConnectContext 会在一个线程中由 ConnectProcessor 进程处理。 ConnectProcessor 负责查询的处理,审计和返回查询结果给客户端。

2.Query Parse

ConnectProcessor 会首先进行查询 SQL 的 Parse。

Doris 使用的 Parse 是 Java CUP Parser,语法规则 定义的文件在 sql_parser.cup。

Query Parse 的输入是 SQL 的 String 字符串,Query Parse 的输出是 Abstract Syntax Tree,每个节点都是一个 ParseNode 。

下面是 Doris 中 SqlParser 用法的示例,这个示例中 parse 的结果 StatementBase 就是一个 SelectStmt。

代码块

 
String originStmt = "select lo_shipmode, sum(lo_revenue) from lineorder group by lo_shipmode;"; SqlScanner input = new SqlScanner(new StringReader(originStmt), ctx.getSessionVariable().getSqlMode()); SqlParser parser = new SqlParser(input); StatementBase statementBase = SqlParserUtils.getFirstStmt(parser);

一个 SelectStmt 由 SelectList, FromClause, wherePredicate, GroupByClause, havingPredicate, OrderByElement, LimitElement 组成,对应一个 SQL 的常见组成。

3.Query Analyze

ConnectProcessor 进行 SQL Parse 得到 AST 后,由 StmtExecutor 具体负责查询的执行。StmtExecutor 会首先对 AST 进行语法和语义分析。 大概会做下面的一些事情 (每个 ParseNode 的 analyze 方法的实现):

  • 检查并绑定 Cluster, Database, Table, Column 等元信息

  • SQL 的合法性检查:窗口函数不能 DISTINCT,HLL 和 Bitmap 列不能 sum, count, where 中不能有 grouping 操作等

  • SQL 重写:比如将 select * 扩展成 select 所有列,count distinct 查询重写等

  • Table 和 Column 的别名处理

  • Tuple, Slot, 表达式分配唯一的 ID

  • 函数参数的合法性检测

  • 表达式替换

  • 类型检查,类型转换(BIGINT 和 DECIMAL 比较,BIGINT 类型需要 Cast 成 DECIMAL)

下面是 Doris 中对 AST Analyze 的示例:

代码块

 
ConnectContext ctx = new ConnectContext(channel); Analyzer analyzer = new Analyzer(ctx.getCatalog(), ctx); statementBase.analyze(analyzer);

4.Query Rewrite

StmtExecutor 在对 AST 进行语法和语义分析后,会让 ExprRewriter 根据 ExprRewriteRule 进行一次 Rewrite。目前 Doris 的重写规则比较简单,主要是进行了常量表达式的化简和谓词的简单处理。 常量表达式的化简是指 1 + 1 + 1 重写成 3,1 > 2 重写成 Flase 等。

如果重写后,有部分节点被成功改写,比如, 1 > 2 被改写成 Flase,那么就会再触发一次语法和语义分析的过程。

对于有子查询的 SQL,StmtRewriter 会进行重写,比如将 where in, where exists 重写成 semi join, where not in, where not exists 重写成 anti join。

5.Plan

5.1 Query 单机Plan

AST 经过语法和语义分析后,会首先生成单机的执行 Plan。

代码块

 
Planner planner = new Planner(); planner.plan(parsedStmt, analyzer, tQueryOptions);

单机 Plan 由 SingleNodePlanner 执行,输入是 AST,输出是单机物理执行 Plan, Plan 中每个节点是一个 PlanNode。

SingleNodePlanner 核心任务就是根据 AST 生成 OlapScanNode, AggregationNode,
HashJoinNode, SortNode, UnionNode 等。

Doris 在生成单机 Plan 的时候主要进行了以下工作或优化:

  • Slot 物化:指确定一个表达式对应的列需要 Scan 和计算,比如聚合节点的聚合函数表达式和 Group By 表达式需要进行物化

  • 投影下推:BE 在 Scan 时只会 Scan 必须读取的列

  • 谓词下推:在满足语义正确的前提下将过滤条件尽可能下推到 Scan 节点

  • 分区,分桶裁剪:比如建表时按照 UserId 分桶,每个分区 100 个分桶,那么当不包含 or 的 Filter 条件包含 UserId ==xxx 时,Doris 就只会将查询发送 100 个分桶中的一个发送给 BE,可以大大减少不必要的数据读取

  • Join Reorder:对于 Inner Join, Doris 会根据行数调整表的顺序,将大表放在前面

  • Sort + Limit 优化成 TopN

  • MaterializedView 选择:会根据查询需要的列,过滤,排序和 Join 的列,行数,列数等因素选择最佳的 MaterializedView

5.2 Query 分布式Plan

FRAGMENT内部是pipeline,FRAGMENT之间是shuffle/exchange

explain看到的执行 数据是从下面一步步‘推’到最上层

有了单机的 Plan 之后,DistributedPlanner 就会根据单机的 PlanNode 树,生成 PlanFragment 树。分布式化的目标是最小化数据移动和最大化本地 Scan。

前面示例 SQL select lo_shipmode,sum(lo_revenue) from lineorder group by lo_shipmode;生成的分布式 Plan 如下:

代码块

SQL

 
+---------------------------------------------------------------------------------+
| PLAN FRAGMENT 0                                                                 
|  OUTPUT EXPRS:<slot 2> | <slot 3>                                               
|   PARTITION: UNPARTITIONED                                                      
|   RESULT SINK                                                                   
|   4:EXCHANGE                                                                    
|      tuple ids: 1                                                               
| PLAN FRAGMENT 1                                                                 
|  OUTPUT EXPRS:                                                                  
|   PARTITION: HASH_PARTITIONED: <slot 2>                                         
|   STREAM DATA SINK                                                            
|     EXCHANGE ID: 04                                                             
|     UNPARTITIONED                                                               
|   3:AGGREGATE (merge finalize)                                                  
|   |  output: sum(<slot 3>)                                                      
|   |  group by: <slot 2>                                                         
|   |  tuple ids: 1                                                                                                                                    |
|   2:EXCHANGE                                                                    
|      tuple ids: 1                                                               |                                                                    
| PLAN FRAGMENT 2                                                                 
|  OUTPUT EXPRS:                                                                  
|   PARTITION: RANDOM                                                             |                                                                            
|   STREAM DATA SINK                                                              
|     EXCHANGE ID: 02                                                             
|     HASH_PARTITIONED: <slot 2>                                                  
|   1:AGGREGATE (update serialize)                                                
|   |  STREAMING                                                                  
|   |  output: sum(`lo_revenue`)                                                  
|   |  group by: `lo_shipmode`                                                    
|   |  tuple ids: 1                                                               
|   0:OlapScanNode                                                                
|      TABLE: lineorder                                                           
|      PREAGGREGATION: ON                                                         
|      partitions=1/1                                                             
|      rollup: lineorder                                                          
|      tabletRatio=40/40                                                          
|      cardinality=119994608                                                      
|      avgRowSize=26.643219                                                       
|      numNodes=1                                                                 
|      tuple ids: 0                                                               |
+---------------------------------------------------------------------------------+

1 个 PlanFragment 封装了在一台机器上对同一数据集的操作逻辑。

每个 PlanFragment 包含至少一个 PlanNode,上面的 PLAN FRAGMENT 2 就包含了 OlapScanNode 和 AggregationNode。

Plan 分布式化的方法是增加 ExchangeNode,执行计划树会以 ExchangeNode 为边界拆分为 PlanFragment。ExchangeNode 的作用是实现不同 BE 之间的数据交换,类型 Spark 和 MR 中的 Shuffle。

各个 Fragment 的数据流转和最终的结果发送依赖:DataSink。比如 DataStreamSink 会将一个 Fragment 的数据发送到另一个 Fragment 的 ExchangeNode,ResultSink 会将查询的结果集发送到 FE。

每个 PlanFragment 可以在每个 BE 节点生成 1 个或多个执行实例,不同执行实例处理不同的数据集,通过并发来提升查询性能。

DistributedPlanner 中最主要的工作是决定 Join 的分布式执行策略:Shuffle Join,Broadcast Join,Colocate Join,和增加 Aggregation 的 Merge 阶段。

决定 Join 的分布式执行策略的逻辑如下:

  1. 如果两种表示 Colocate Join 表,且 Join 的 Key 和分桶的 Key 一致,且两张表没有正在数据 balance,就会执行 Colocate Join

  2. 如果 Join 的右表比较少,集群节点数较少,计算出的 Broadcast Join 成本较低,就会选择 Broadcast Join,否则就会选择 Shuffle Join。

6.Query Schedule

在生成查询的分布式 Plan 之后,Coordinator 会负责 PlanFragment 的执行实例生成,PlanFragment 的调度,每个 BE 执行状态的管理,查询结果的接收。

有了分布式 Plan 之后,我们需要解决下面的问题:

  • 哪个 BE 执行哪个 PlanFragment

  • 每个 Tablet 选择哪个副本去查询

  • 如何进行多实例并发

前面提到,Doris 会先进行分区,分桶裁剪,得到需要访问的 Tablet 列表,然后对于每个 Tablet,Doris 会先选择版本匹配的,健康的,所在的 BE 状态正常的副本进行查询,然后在最终决定每个 Tablet 选择哪个副本查询的时候,是随机的方式,不过 Doris 会尽可能保证每个 BE 的请求均衡。假如我们有 10 个 BE,10 个 tablet,最终调度的结果理论上就是每个 BE 负责 1 个 tablet 的 Scan。具体逻辑在computeScanRangeAssignmentByScheduler。

当包含 Scan 的 PlanFragment 由哪些 BE 节点执行确定后,其他的 PlanFragment 实例也会在 Scan 的 BE 节点上执行,不过具体选择哪个 BE 是随机选取的。

当每个 PlanFragment 实例的 BE 节点确定后,每个 DataSink 的目标 BE 节点自然也就确定了。

多实例并发执行的话,是数据并行的方式,假如我们有 10 个 tablet,并行度设置为 5 的话,那么 Scan 所在的 PlanFragment,每个 BE 上我们可以生成 5 个执行实例,每个执行实例会分别 Scan 2 个 tablet。

当我们知道每个 PlanFragment 需要生成多少个执行实例,每个执行实例在哪个 BE 执行后,FE 就会将 PlanFragment 执行相关的参数通过 Thrift 的方式发送给 BE。


以下是BE的执行

7.Query Execute

Doris 的查询执行模式 Volcano 模式,不过做了 Batch 的优化,不同的 operator 之间以 RowBatch 的方式传输数据。

BE 的 BackendService 会接收 FE 的 查询请求,让 FragmentMgr 进行处理。
FragmentMgr::exec_plan_fragment 会启动一个线程由 PlanFragmentExecutor 具体执行一个 plan fragment。PlanFragmentExecutor 会根据 plan fragment 创建一个 ExecNode 树,FE 每个 PlanNode 都会对应 ExecNode 的一个子类。 

PlanFragmentExecutor::get_next_internal 会驱动整个 ExecNode 树的执行,会自顶向下调用每个 ExecNode 的 get_next 方法,最终数据会从 ScanNode 节点产生,向上层节点传递,每个节点都会按照自己的逻辑处理 RowBatch。 PlanFragmentExecutor 在拿到每个 RowBatch 后,如果是中间结果,就会将数据传输给其他 BE 节点,如果是最终结果,就会将数据传输给 FE 节点。

下面我们看下一些重要的 ExecNode 的处理逻辑

OlapScanNode

OlapScanNode 在拿到 FE 的请求参数后,会首先将下推到 Scan 节点的谓词表达式转为存储层的数据结构 TCondition,然后为了提高 Scan 的并发,会将 Scan 的范围切分的更细,然每个 Scan 线程可以只 Scan 部分数据,Scan 范围切分完之后,就会通过线程池启动多个线程,让每个 OlapScanner 线程 Scan 特定部分的数据。

一个 OlapScanner 会绑定一个 Tablet,OlapScanner 会根据 Scan 的 Tablet 和,Version,构造好 RowsetReader。RowsetReader 会逐行返回数据,每行数据是一个 RowCursor, OlapScanner 会先将 RowCursor 转为 Tuple, 因为在查询计算层,都是基于 Tuple 进行计算的,然后会用没有下推到存储层的 Filter, 对 Tuple 再进行一次过滤,最后会把 Tuple 加入 RowBatch 中,如果 RowBatch 满了,就会加入到 RowBatch 的 Queue 中,OlapScanNode 线程会不断从这个 Queue 中读取数据。

OlapScanNode 针对大查询做了一个优化,因为 OlapScanner 的线程池是整个 BE 进程共享的,所以可能出现一个大查询占用了所有 OlapScanner 线程,导致小查询迟迟无法 Scan。 OlapScanNode 会根据 Scan Range 的个数,Scan 的数据量确定每个 Scan 的优先级,Scan Range 个数和 Scan 数据量越小,优先级越高,不过也会定期提高大查询的优先级,避免大查询完全饿死。

AggregationNode

在 open 阶段,会消费子节点所有数据。 对于没有 group by 的聚合查询,AggregationNode::process_row_batch_no_grouping 会对每个 RowBatch 进行循环,对每个 Tuple 调用聚合函数。 如果是 Update 阶段,会调用每个聚合函数的 update function,如果 Merge 阶段,会调用每个聚合函数的 merge function。
对于有 group by 的聚合查询,就需要使用到 HashTable, HashTable 的 key 是 group by,HashTable 的 Value 是聚合函数的状态值。处理过程就是对 group by 字段相同的聚合函数的状态值不断调用聚合函数进行更新。

在 get_next 阶段,会遍历 HashTable,用 Tuple 填充 RowBatch,然后向上层返回。

PartitionedAggregationNode

核心逻辑和 AggregationNode 相同,不过做了下面几点优化:

  1. 使用了 PartitionedHashTable

  2. 支持 Spill Disk

  3. 支持 Prefetch

  4. 支持 streaming preaggregation, 对于有 group by 的聚合查询,在 update 阶段,将聚合的过程从 open 阶段移到 get_next 阶段,每接受一批 RowBatch,就会进行聚合,并输出。

ExchangeNode

前面提到,Plan 分布式化的方法是增加 ExchangeNode,DataStreamSink 会将一个 Fragment 的数据发送到另一个 Fragment 的 ExchangeNode。

具体来讲, PlanFragmentExecutor::get_next_internal 获取到一个 Rowbatch 的数据后,就会调用 DataStreamSender::send 发送数据,DataStreamSender 会为每个 BE 维护一个 Channel,每个 Channel 的数据满一个 RowBatch,就会通过 BRPC 发送到目标 BE 节点。

如果 DataStreamSender 发送的数据是有序的,DataStreamRecvr 就需要为每个 Sender 维护一个单独的队列,最后再 merge,如果是无序的,DataStreamRecvr 只需要为所有 Sender 维护一个队列。

每个 ExchangeNode 会持有一个 DataStreamSender,来获取其他 BE 节点发送来的数据。

HashJoinNode

HashJoin的核心步骤是两步,分为 Build 和 Probe 两个阶段。
Build 阶段:根据 Inner 表的数据构造 hash table。
Probe 阶段:对于 Outer 表的每一行数据, 先根据join key 查找 hash table,然后再根据Join 的其他谓词进行过滤,获取Join成功的行。

如何确定 Inner 表 和 Outer 表?

  • Left Outer Join:左表是 Outer 表,右表是 Inner 表

  • Right Outer Join:跟 Left Outer Join 相反,右表是 Outer 表,左表是 Inner 表

  • Inner Join:优化器估算出的较大表是 Outer 表,较小的表是 Inner 表

  • Semi Join、Anti Semi Join、Left Outer Semi Join 或 Anti Left Outer Semi Join:左表是 Outer 表,右表是 Inner 表。

NULL 值的问题:因为NULL 和 NULL 不等,所以:

  • 在用 Inner 表建 NULL 值的时候会忽略掉 Join Key 中有 NULL 的数据

  • 当 Outer 表中某行数据的 Join Key 中有 NULL 值的时候我们不会去查哈希表

  • 对于LEFT_OUTER_JOIN, hash table 不需要存储 Null, 但是对于RIGHT_OUTER_JOIN,hash table 需要存储 Null。

三.总结

本文介绍了 Doris 查询处理的整个流程。目前 Doris 的查询优化器和查询执行器都有很大的优化空间,我们在今年计划对查询优化器和查询执行器进行一次大的重构,预期会进一步提升 Doris 的查询性能。

本文内容由网友自发贡献,版权归原作者所有,本站不承担相应法律责任。如您发现有涉嫌抄袭侵权的内容,请联系:hwhale#tublm.com(使用前将#替换为@)

doris-查询原理 的相关文章

  • Doris集成Spark读写的简单示例

    Doris集成Spark读写的简单示例 文章目录 Doris集成Spark读写的简单示例0 写在前面1 Spark Doris Connector介绍2 基本示例2 1 提前准备表和数据2 2 新建项目2 3 使用SQL方式进行读写2 3
  • doris 常用操作收集

    1 bit map索引 原理 用户可以在建表时指定在某些列上创建Bitmap索引 也可以在运行时通过 ALTER TABLE TODO 命令新增Bitmap索引 Bitmap索引是一中特殊的数据库索引技术 其索引使用bit数组 或称bitm
  • Doris--基础--4.2--数据导入--Insert Into

    Doris 基础 4 2 数据导入 Insert Into 1 介绍 类似Mysql中的insert语句 方式 通过 insert into table select 的方式从Doris的表中读取数据并导入到另一张表中 通过 insert
  • doris前缀索引、doris bloom filter索引、doris bitmap索引原理及适应场景

    索引用于帮助快速过滤或查找数据 目前 Doris 主要支持两类索引 内建的智能索引 包括前缀索引和ZoneMap索引 用户创建的二级索引 包括Bloom Filter索引和Bitmap倒排索引 其中ZoneMap索引是在列存格式上 对每一列
  • Doris--基础--11--动态分区

    Doris 基础 11 动态分区 1 介绍 对表级别的分区实现生命周期管理 TTL 减少用户的使用负担 1 1 功能 动态添加分区 动态删除分区 1 2 原理 在某些使用场景下 用户会将表按照天进行分区划分数据 在没有动态分区功能的时候 用
  • 自建minio实现doris的快速备份与恢复

    一 概述 doris支持通过腾讯云bos 阿里云oss hdfs实现备份与恢复 但是我们公司doris部署在线下机房 如采用oss bos 大数据备份与恢复比较慢 会占用一定的带宽 如采用hdfs 担心小文件太多影响现有的hadoop集群
  • doris tips

    1 schema表格式字段长度 如果是数字 字母这种的长度等于hive sql里面 length variable 的长度 如果是中文要占3 4个Char 2 表增加分区 可以通过脚本自己构造多个sql 语句 类似 ALTER TABLE
  • Clickhouse表引擎-日志系列

    1 表引擎的介绍 Clickhouse的表引擎类似MySQL的表引擎 表引擎决定了如何存储表的数据 主要包含如下特性 数据的存储方式和位置 写到哪里以及从哪里读取数据 支持哪些查询以及如何支持 并发数据访问 索引的使用 如果存在 是否可以执
  • Doris数据模型

    目录 基本概念 Aggregate 模型 示例1 导入数据聚合 示例2 保留明细数据 示例3 导入数据与已有数据聚合 Unique 模型 读时合并 与聚合模型相同的实现方式 写时合并 Duplicate 模型 聚合模型的局限性 Unique
  • 什么是OLAP

    问题导读 1 为什么会出现OLAP应用 2 OLAP的度过了哪些发展历史 3 OLAP的基本内容有哪些 4 OLAP常见操作有哪些 OLAP Online AnalyticalProcessing 是一种数据处理技术 专门设计用于支持复杂的
  • doris-查询原理

    目录 一 查询简介 二 查询流程 1 Query 接收 2 Query Parse 3 Query Analyze 4 Query Rewrite 5 Plan 5 1 Query 单机Plan 5 2 Query 分布式Plan 6 Qu
  • 通过过滤度量值在 MDX 中定义计算成员

    我需要在 MDX 中定义一个计算成员 这是 SAS OLAP 但我很感谢使用不同 OLAP 实现的人员提供的答案 新度量的值应通过应用附加过滤条件根据现有度量计算得出 我想通过一个例子会更清楚 现有衡量标准 总流量 现有维度 方向 入 或
  • 使用VBA选择和取消选择多个切片器项目(OLAP数据)

    我正在编写一个仅选择所需切片器项目的脚本 我尝试使用 SlicerItems Selected True False用于选择和取消选择 但我使用的是 OLAP 数据源 在这种情况下 Selected是只读的 切片器项目的格式为 YYYYWW
  • 在报告中为 icCube 事件分配值

    我正在使用 icCube 5 0 报告 我想将事件的值分配给 icCube MDX 函数 UserName 返回的值 有点像 eventname UserName 最终 eventname reportParm 将被发送到启动的报告以在过滤
  • Microsoft SQL Server 分析服务 OLAP 多维数据集

    我试图找到一种工具来提高应用程序报告的性能 我听说了 OLAP Reporting Services 它被描述为完成这项工作的绝佳组合 无论如何 我没有找到使 OLAP 多维数据集保持最新的方法 因为原始数据库中的数据可能会更改 这是一个交
  • 如何计算 OLAP 多维数据集的可能大小

    有谁知道一种用于获取基于星型模式数据仓库的 OLAP 多维数据集的粗略大小的方法 基于维度的数量 维度表中的记录数量和事实记录的数量 最后是聚合或不同记录的数量等 我正在查看的数据库有一个超过 200 亿行的事实表和一些包含 2000 万
  • OLAP 处理时出错

    我是 OLAP 新手 并且弄清楚了如何制作立方体并处理它 然而 当我玩得太多时 我最终遇到了这个错误 OLAP存储引擎中的错误 找不到属性键 表 dbo v MYEntities 列 uniqueId 值 2548 OLAP 中的错误 存储
  • 基于不同 ID 的 SSAS 聚合

    我希望将不同 ID 值的默认聚合从 SUM 更改为 SUM 这是当前的行为 ID Amount 1 10 1 10 2 20 3 30 3 30 Sum Total 90 默认情况下 我会收到 90 美元 我希望对不同的 id 进行求和并得
  • SQL Express 上的 OLAP

    我想知道是否有任何桌面 OLAP 解决方案可以使用 SQL Express 因此不需要 Analysis Services 我的任务是找到一种方法让我们的客户能够制作 临时 报告 但其中绝大多数都是在 Sql Express 上 在以前的工
  • 在 LINUX 上使用 Python 连接到 OLAP 多维数据集

    我知道如何在 Windows 上使用 Python 连接到 MS OLAP 多维数据集 嗯 至少有一种方法 通常我使用 win32py 包并调用 COM 对象进行连接 import win32com client connection wi

随机推荐