Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Doris查询原理 #82

Open
levy5307 opened this issue Mar 12, 2024 · 0 comments
Open

Doris查询原理 #82

levy5307 opened this issue Mar 12, 2024 · 0 comments

Comments

@levy5307
Copy link
Owner

https://levy5307.github.io/blog/doris-query/

同大多数数据库一样,Doris的查询主要分为查询接收、Parse(词法分析和语法分析)、Analyze(语义分析)、Rewrite(查询重写)、逻辑计划生成(单机执行计划)、分布式执行计划生成、Schedule、查询计划执行等步骤。其中查询计划执行由BE负责,其他均由FE负责。

查询接收

在Doris中,AcceptListener负责监听mysql连接。当一个新连接到来时AcceptListener会创建一个ConnectProcessor类型对象。ConnectProcessor周期性的获取该连接上的请求,针对不同的command执行不同的操作,包括use database操作(COM_INIT_DB)、查询操作(COM_QUERY)、quit(COM_QUIT)等

Parse

ConnectProcessor::handleQuery主要处理查询操作,随后使用Java CUP Parser对输入的字符串进行词法和语法分析。词法分析主要将输入的字符串解析成一系列token(例如select、from等),而语法分析则根据词法分析生成的token,生成一颗抽象语法树(Abstract Syntax Tree)。在Doris中抽象语法树用类StatementBase表示,StatementBase是一个虚类,它有多个不同的子类,例如SelectStmt、InsertStmt等,分别用于表示查询请求、写入请求等等。

其中,SelectStmt类中包含selectList、fromClause、groupByClause、havingClause等等,具体如下所示:

public class SelectStmt extends QueryStmt {
protected SelectList selectList;
private final ArrayList colLabels; // lower case column labels
protected final FromClause fromClause;
protected GroupByClause groupByClause;
private List originalExpr;
private Expr havingClause; // original having clause
protected Expr whereClause;
// havingClause with aliases and agg output resolved
private Expr havingPred;

// set if we have any kind of aggregation operation, include SELECT DISTINCT
private AggregateInfo aggInfo;
// set if we have analytic function
private AnalyticInfo analyticInfo;
// substitutes all exprs in this select block to reference base tables
// directly
private ExprSubstitutionMap baseTblSmap = new ExprSubstitutionMap();

private ValueList valueList;

// if we have grouping extensions like cube or rollup or grouping sets
private GroupingInfo groupingInfo;

// having clause which has been analyzed
// For example: select k1, sum(k2) a from t group by k1 having a>1;
// this parameter: sum(t.k2) > 1
private Expr havingClauseAfterAnaylzed;

// SQL string of this SelectStmt before inline-view expression substitution.
// Set in analyze().
protected String sqlString;

// Table alias generator used during query rewriting.
private TableAliasGenerator tableAliasGenerator = null;

// Members that need to be reset to origin
private SelectList originSelectList;

}

Analyze

当通过Parse阶段得到AST后,随后会根据parsedStmt创建一个StmtExecutor。该StmtExecutor首先为该查询分配一个16位的随机queryId,并通过analyzeVariablesInStmt函数获取select查询中的Optimizer Hints。

随后通过Analyzer来进行语义分析。注意这里的Analyzer类其实并不是语义解析器,正确的名字应该叫AnalyzerContext,其保存的是语义解析所需要的各种上下文及状态信息。真正的语义解析是在具体的StatementBase子类里的,最终通过多态来获取不同执行语句的不同语义分析实现。

对于SelectStmt,语义分析阶段所做的主要工作有:

检查cluster name是否为空,若是则抛出AnalysisException异常


对于含有order by但是order by字段为空的查询,将order by移除掉。


对于含有limit的查询:


  
    当offset大于0且不含有order by时报错。
  
  
    当limit = 0时,设置hasEmptyResultSet为true,该变量表示该查询结果一定返回空
  
  
    …
  



对于含有with的查询,依次analyze该查询中的所有的view(即子查询)


对于analyze from从句,依次analyze从句中的所有表(包括BaseTable、LateralView、LocalView)。对于BaseTable,如果该表没有指定database,则指定为默认database。


对于select list:


  
    如果是select *,则将*扩展成所有列
  
  
    判断select从句中是否包含子查询,如果是则抛出异常
  
  
    依次解析select list中的所有expr及其child expr(如果有)
  
  
    …
  



对于where从句:


  
    查看where从句中是否有group操作,如果有则抛出异常
  
  
    查看where从句中是否有聚合操作,如果有则抛出异常
  
  
    检查where从句是否返回bool类型或者null,否则抛异常
  
  
    依次解析where从句中的所有child expr(如果有)
  
  
    …

Rewrite

当执行完词法分析后,要根据具体的ExprRewriteRule进行查询重写,例如:

FoldConstantsRule,通过对expr求值并进行替换:1 + 1 + 1 –> 3,toupper('abc') –> 'ABC'、cast('2016-11-09' as timestamp) –> TIMESTAMP '2016-11-09 00:00:00'。


NormalizeBinaryPredicatesRule,规范化二进制谓词,使slot位于左侧,例如:5 > id –> id < 5


BetweenPredicates,将between谓词转换成conjunctive/disjunctive谓词,例如: BETWEEN X AND Y –> A >= X AND A <= Y、 NOT BETWEEN X AND Y –> A < X OR A > Y

另外对于存在子查询的查询,会依次进行from从句子查询重写、where从句子查询重写(rewriteWhereClauseSubqueries)以及have从句中子查询重写(rewriteHavingClauseSubqueries)。例如,对于have从句子查询的重写,会先将having子句用where进行等价重写,再将where子句等价重写(例如使用in重写)

当某些重写发生时,需要重新执行语义分析。

单节点执行计划

逻辑计划生成和物理计划生成都是由Planner来完成的。在Planner中,SingleNodePlanner通过AST生成单节点执行计划(PlanNode构成的算子树)。例如:

select A.a, B.b from A join B where A.a = B.b
生成的算子树如下:

在生成单节点执行计划时,主要做了以下优化:

Slot物化。slot物化是指标记出哪些列在表达式中涉及到,即哪些列需要被读取以及计算。


谓词下推。将谓词下推到Scan节点。


当没有开启排序落盘时(enable_spilling=true),将sort+limit修改成top n。


投影下推。保证在Scan时只读取必须的列。


Join reorder。对于 Inner Join, Doris 会根据行数调整表的顺序,将大表放在前面。


分区,分桶裁剪:根据过滤条件中的信息,确定需要扫描哪些分区,哪些桶的tablet。(在OlapScanNode::init时做分区裁剪,OlapScanNode::finalize做分桶裁剪)


MaterializedView选择:会根据查询需要的列,过滤,排序和Join的列,行数,列数等因素选择最佳的物化视图。

分布式计划生成

在Planner中,通过DistributedPlanner执行生成分布式执行计划。这里需要根据分布式环境,将单机的PlanNode树拆分成分布式PlanFragment树。该步骤的主要目标是最大化并行度和数据本地化。主要方法是将能够并行执行的节点拆分出去单独建立一个PlanFragment,用ExchangeNode代替被拆分出去的节点,用来接收数据。拆分出去的节点增加一个DataSinkNode,用来将计算之后的数据传送到ExchangeNode中,做进一步的处理。

这一步主要采用自底向上的方法来遍历整个PlanNode树(通过createPlanFragments函数递归实现)。如上述例子中:

对于ScanNode,则直接创建一个对应的PlanFragment。上述例子中一共有两个ScanNode,则创建两个PlanFragment。


对于HashJoinNode,需要决定Hash Join的分布式执行策略,即Shuffle Join,Broadcast Join,Colocate Join:


  
    如果使用colocate join,由于join操作都在本地,就不需要拆分。设置HashJoinNode与leftFragment共用一个PlanFragment,并删除掉rightFragment。
  
  
    如果使用bucket shuffle join,需要将右表的数据发送给左表。则和left child组成同一个PlanFragment(不单独创建新的,只是和上一条中为left child创建的PlanFragment组成同一个),在该PlanFragment中,对right child使用ExchangeNode来代替。同时对于right child,创建一个DataSinkNode,将数据sink到前述ExchangeNode。
  
  
    如果使用broadcast join,需要将右表的数据发送给左表。具体逻辑同上。
  
  
    如果使用hash partition join,左表和右边的数据都要切分,需要将左右节点都拆分出去,分别创建left ExchangeNode, right ExchangeNode,HashJoinNode指定左右节点为left ExchangeNode和right ExchangeNode。需要为HashJoinNode单独创建一个PlanFragment,指定RootPlanNode为这个HashJoinNode。最后指定leftFragment, rightFragment的数据发送目的地为left ExchangeNode, right ExchangeNode。
  



对于SelectNode,同样创建一个对应的PlanFragment

此三个PlanFragment则对应上图中的三个虚线框中的Fragment。

Schedule

这一步是根据分布式逻辑计划,创建分布式物理计划。主要解决以下问题:

哪个BE执行哪个PlanFragment


每个Tablet选择哪个副本去查询


如何进行多实例并发

Doris根据ScanNode所对应的表,经过分区分桶裁剪之后,可以得到需要访问的Tablet列表。对于Tablet列表,Doris会过滤掉unrecoverable的(该replica随后会被删除)、缺少version的、状态不正常的、以及过滤掉compaction过慢的副本。然后通过Round-Robin的方式在be节点中选择各个tablet的副本,以保证BE之间的负载均衡。

另外,需要处理实例的并发问题。当leftMostNode是ScanNode时,则需要根据fragment的并发度来设置并发(parallelExecNum)。假如需要scan 10个tablet,并行度设置为5的话,那么Scan所在的 PlanFragment,每个BE上可以生成5个执行实例,每个执行实例会分别Scan 2个tablet。当该节点不是ScanNode时,则其肯定是ExchangeNode,则需要根据exchangeInstanceParallel设置并发度。

FInstanceExecParam创建完成后,会分配一个唯一的instancdID,方便追踪信息。如果FragmentExecParams中包含有ExchangeNode,需要计算有多少senders,以便知道需要接受多少个发送方的数据。最后FragmentExecParams确定destinations,并把目的地址填充上去。

最后,获取第一个fragment,由执行该fragment的节点作为Receiver,接收查询结果。并将这些fragment发送给BE执行。

查询计划执行

查询计划由BE负责执行,其执行引擎采用Batch模式的Volcano模型,相对于Tuple模式的Volcano,执行效率更高。

be中的FragmentMgr提供了rpc接口,用于处理这些fragment(FragmentMgr::exec_plan_fragment),在接收到请求后,会先通过查询计划生成对应的算子树(PlanFragmentExecutor::_plan)并对所有算子执行init操作(ExecNode::create_tree)及prepare操作(_plan->prepare)。

随后,创建一个线程用户执行查询计划(FragmentMgr::_exec_actual)。该线程中,先对算子树执行open操作,然后通过PlanFragmentExecutor::get_next_internal驱动整个算子树的执行。该方法自顶向下调用每个算子的get_next方法。最终数据会从ScanNode节点产生,向上层节点传递,每个节点都会按照自己的逻辑处理RowBatch。PlanFragmentExecutor在拿到每个RowBatch后,如果是中间结果,就会将数据传输给其他BE节点,如果是最终结果,就会将数据传输给FE节点(通过DataSink::send多态实现)

参考文档

https://doris.apache.org/zh-CN/blog/principle-of-Doris-SQL-parsing/#23-%E9%80%BB%E8%BE%91%E8%AE%A1%E5%88%92

https://blog.bcmeng.com/post/apache-doris-query.html#doris-query-%E6%89%A7%E8%A1%8C

https://www.oomspot.com/post/apachedorischaxunyuanlijiexi

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

1 participant