1 Motivation

The PipelineX execution engine is an experimental feature in Doris 2.1.0. It is expected to address four major issues of the current Doris pipeline engine:
1. In terms of execution concurrency, Doris is currently constrained by two factors: the parameters set by FE and the number of buckets. This concurrency strategy prevents the execution engine from fully utilizing machine resources.
2. In terms of execution logic, Doris currently has some fixed additional overhead. For example, expressions common to all instances are initialized once per instance, because the instances are independent of each other.
3. In terms of scheduling logic, the current pipeline scheduler puts all blocked tasks into a blocking queue, and a blocking-queue scheduler polls that queue, extracts the tasks that have become executable, and places them in the runnable queue. As a result, one CPU core is always occupied with scheduling instead of execution while queries run.
4. In terms of profile, the pipeline engine currently cannot provide users with concise and clear metrics.

2 Goals

1. In terms of execution concurrency, PipelineX introduces a local exchange optimization to fully utilize CPU resources and to distribute data evenly across tasks, minimizing data skew. In addition, PipelineX is no longer constrained by the number of tablets.
2. In terms of execution logic, the pipeline tasks of the same pipeline share all of its shared state, which eliminates repeated initialization overhead for things such as expressions and some const variables.
3. In terms of scheduling logic, the blocking conditions of all pipeline tasks are encapsulated in Dependency objects, and a task enters the runnable queue when triggered by an external event (such as RPC completion), which eliminates the overhead of the polling thread.
4. In terms of profile, provide users with simple, easy-to-understand metrics.

3 Core Design

3.1 Abstraction

3.1.1 Inheritance Relationship



Figure 1 Operator inheritance relationship

Figure 2 LocalState inheritance relationship

3.1.2 Combination



Figure 3 Composition of the core classes

3.2 Execution Design

3.2.1 Aggregate Query Execution

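On a cluster with 2 BEs, the execution model is as follows:



Figure 4 Multi-BE execution model (aggregate query)


Core changes:

  1. Each execution thread (thread 1, thread 2) runs its own pipeline task, and a pipeline task holds only some runtime state (the local state). Global information is held by the single pipeline object shared by multiple tasks (the global state).

  2. On a single BE, data distribution is done by the local shuffle node, which keeps the data balanced across the pipeline tasks. The execution model on a single BE is as follows: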



Figure 5 Single-BE execution model (aggregate query)
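
The single-BE model above can be illustrated with a small sketch. It assumes a hash-based exchanger and a SUM aggregation; the function names and the CRC32 hash are hypothetical choices for illustration, not the Doris implementation (the actual exchanger rules are covered in Further Reading 1). Rows scanned from any number of tablets are repartitioned by grouping key across a task count chosen independently of the tablet count.

```python
from collections import defaultdict
from zlib import crc32


def local_shuffle(rows, key_index, num_tasks):
    """Repartition rows across num_tasks by hashing the grouping key.

    Hypothetical stand-in for a local shuffle node: every row with the
    same key lands in the same partition.
    """
    partitions = [[] for _ in range(num_tasks)]
    for row in rows:
        key = str(row[key_index]).encode("utf-8")
        partitions[crc32(key) % num_tasks].append(row)
    return partitions


def aggregate_partition(rows, key_index, value_index):
    """SUM(value) GROUP BY key over the rows owned by one pipeline task."""
    sums = defaultdict(int)
    for row in rows:
        sums[row[key_index]] += row[value_index]
    return dict(sums)


def run_aggregation(scanned_by_tablet, num_tasks):
    """Scan output of all tablets -> local shuffle -> per-task aggregation."""
    rows = [row for tablet in scanned_by_tablet for row in tablet]
    merged = {}
    for part in local_shuffle(rows, 0, num_tasks):
        # Keys are disjoint across partitions, so a plain update is enough.
        merged.update(aggregate_partition(part, 0, 1))
    return merged
```

In this sketch two tablets can feed four tasks: `run_aggregation([[("a", 1), ("b", 2)], [("a", 3), ("c", 4)]], 4)` groups both `"a"` rows into the same task regardless of which tablet they came from.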

3.2.2 Join Query Execution



Figure 6 Single-BE execution model (join query)

3.2.3 Goals


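Local shuffle is introduced mainly to improve concurrency on a single machine:

  1. It reduces data skew in some cases (see Further Reading 1 for the detailed design).

  2. Execution concurrency is no longer constrained by the number of tablets in the storage layer (see Further Reading 4 for the detailed design).

  3. Concurrency can be adjusted dynamically at runtime.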

3.3 Execution Process


The execution processes of pipeline and PipelineX compare as follows:



Figure 7 Comparison of the pipeline and PipelineX execution processes


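In the prepare phase, pipeline concurrently starts multiple threads to initialize the state of different instances. PipelineX instead reuses the shared state: step 3 of the pipeline execution process is split into steps 3 and 5 of the PipelineX execution process, so the heavier global state is initialized only once and the lighter local state is initialized serially.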
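
As a rough illustration of this split, the sketch below initializes a shared expression once on a pipeline object and then creates lightweight tasks serially. The class and method names are hypothetical, and Python's `compile` only stands in for the heavier expression and const initialization; this is not the Doris class layout.

```python
class Pipeline:
    """Holds the global state shared by every task of this pipeline."""

    def __init__(self, expr_source):
        self.expr_source = expr_source
        self.compiled_expr = None
        self.global_init_count = 0

    def prepare(self):
        # Heavy step, done once per pipeline (stand-in for expression
        # and const-variable initialization).
        self.compiled_expr = compile(self.expr_source, "<expr>", "eval")
        self.global_init_count += 1


class PipelineTask:
    """Holds only the lightweight local state of one task."""

    def __init__(self, pipeline, task_id):
        self.pipeline = pipeline      # shared, not copied
        self.task_id = task_id
        self.rows_processed = 0       # local state

    def execute(self, rows):
        expr = self.pipeline.compiled_expr
        out = [eval(expr, {}, {"x": row}) for row in rows]
        self.rows_processed += len(rows)
        return out


def build_tasks(expr_source, parallelism):
    """Initialize global state once, then create local states serially."""
    pipeline = Pipeline(expr_source)
    pipeline.prepare()
    tasks = [PipelineTask(pipeline, i) for i in range(parallelism)]
    return pipeline, tasks
```

With `build_tasks("x * 2 + 1", 4)`, the expression is compiled once even though four tasks evaluate it, and each task keeps its own `rows_processed` counter.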

3.3.1 Goals


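The execution process is reworked mainly to reduce the extra overhead of initialization:

  1. The overhead of pipeline starting multiple threads to initialize multiple instances at the same time.

  2. The overhead of global const variables being initialized repeatedly, once in each instance.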

3.4 PipelineX Scheduler


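In pipeline scheduling, ready tasks are kept in the ready queue waiting to be scheduled, and blocked tasks are kept in the blocking queue waiting for their execution conditions to be met. The current Doris pipeline needs an extra CPU core to poll the blocking queue and move each task whose execution conditions are met into the ready queue. In PipelineX, all blocking conditions are encapsulated in dependencies, and the transition of a task between blocked and ready is driven entirely by event notification (see Further Reading 2 for the detailed design). For example, the arrival of RPC data causes an ExchangeSourceOperator to meet its execution condition and enter the ready queue.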



Figure 8 Comparison of the pipeline and PipelineX scheduling models
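
The event-driven model can be sketched as follows. This is a minimal, single-threaded illustration under assumed names (`Dependency`, `Task`, `submit` are hypothetical here and are not the Doris classes; a real engine would also need synchronization). The point it shows is that a blocked task is parked on its dependency and is pushed into the ready queue by the event itself, so nothing polls.

```python
from collections import deque


class Dependency:
    """Wraps one blocking condition, e.g. "RPC data has arrived"."""

    def __init__(self, name):
        self.name = name
        self.ready = False
        self._blocked = []

    def block(self, task, ready_queue):
        """Park the task if the condition is not met; return True if parked."""
        if self.ready:
            return False
        self._blocked.append((task, ready_queue))
        return True

    def set_ready(self):
        """Called by the external event; wakes every parked task."""
        self.ready = True
        blocked, self._blocked = self._blocked, []
        for task, ready_queue in blocked:
            ready_queue.append(task)


class Task:
    def __init__(self, name, dependency):
        self.name = name
        self.dependency = dependency


def submit(task, ready_queue):
    """Enqueue the task now if runnable, otherwise leave it parked."""
    if not task.dependency.block(task, ready_queue):
        ready_queue.append(task)
```

For example, a task submitted with an unready dependency stays out of the ready queue until `set_ready()` is called, which corresponds to RPC data arriving for an exchange source.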

3.4.1 Goals


The extra overhead of the polling thread is eliminated.

3.5 PipelineX Profile


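For operator profiles, PipelineX has cleaned up the metrics, removing unreasonable ones and adding necessary ones. In addition, thanks to the reworked scheduling model, all blocking in PipelineX is encapsulated in dependencies, so the time spent waiting for each dependency to become ready is added to the profile. The wait-for-dependency times show directly where time is spent.
Here are a few examples.
Scan operator: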

OLAP_SCAN_OPERATOR (id=504):(Active: 17.606ms, % non-child: 0.00%)

    - WaitForDependencyTime: 0ns

        - WaitForData: 14.311ms

        - WaitForEos: 0ns

        - WaitForScannerDone: 0ns

    - WaitForPendingFinishDependency: 16.748ms

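Here, the total active time of OLAP_SCAN_OPERATOR is 17.606ms (including both the time spent waiting for the scanner to read data and the execution time), of which 14.311ms was spent blocked waiting for the scanner to scan data. OLAP_SCAN_OPERATOR was also blocked for 16.748ms by PendingFinish (not included in the active time).
Exchange source operator: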

EXCHANGE_OPERATOR (id=517):(Active: 575.846us, % non-child: 0.00%)

    - WaitForDependencyTime:

        - WaitForData: 56.122ms

    - WaitForPendingFinishDependency: 0ns

The Active time of EXCHANGE_OPERATOR is 575.846us, and the time spent waiting for upstream data is 56.122ms.
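
When comparing wait times across operators, it can help to normalize them to a single unit. The helper below is a hypothetical sketch, not part of Doris. It assumes metric lines have exactly the `- Name: <number><unit>` shape shown in the examples above, with a single unit of ns, us or ms; lines without a value are skipped.

```python
import re

# Conversion factors to milliseconds for the units seen in the examples.
_UNIT_TO_MS = {"ns": 1e-6, "us": 1e-3, "ms": 1.0}

# Matches lines such as "    - WaitForData: 56.122ms".
_METRIC = re.compile(r"-\s*(\w+):\s*([\d.]+)(ns|us|ms)\s*$")


def parse_wait_metrics(profile_text):
    """Return {metric name: milliseconds} for every valued metric line."""
    metrics = {}
    for line in profile_text.splitlines():
        match = _METRIC.search(line)
        if match:
            name, value, unit = match.groups()
            metrics[name] = float(value) * _UNIT_TO_MS[unit]
    return metrics
```

Applied to the exchange example above, this yields `WaitForData` and `WaitForPendingFinishDependency` in milliseconds, while the operator header line and the valueless `WaitForDependencyTime:` line are ignored.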

4 User Interface


Add 3 SessionVariable knobs.
enable_pipeline_x_engine

enable_local_shuffle

ignore_storage_data_distribution
Whether to ignore the storage data distribution. If turned on, execution concurrency is not restricted by the number of tablets. Please refer to Further Reading 4 for more details.
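
As a usage sketch, the helper below builds the `SET` statements for these knobs. It assumes all three are boolean session variables that accept `true`/`false`; the helper name is hypothetical, and executing the statements over a client connection is left out.

```python
# The three knob names come from this document.
PIPELINE_X_KNOBS = (
    "enable_pipeline_x_engine",
    "enable_local_shuffle",
    "ignore_storage_data_distribution",
)


def build_set_statements(**knobs):
    """Return one SET statement per knob, e.g. enable_local_shuffle=True."""
    statements = []
    for name, enabled in knobs.items():
        if name not in PIPELINE_X_KNOBS:
            raise ValueError(f"unknown PipelineX knob: {name}")
        # Assumption: each knob is a boolean session variable.
        value = "true" if enabled else "false"
        statements.append(f"SET {name} = {value};")
    return statements
```

For example, `build_set_statements(enable_pipeline_x_engine=True, enable_local_shuffle=True)` returns two statements that can be run in a session before the query.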

A new HTTP API: http://{host}:{web_server_port}/api/running_pipeline_tasks
Please refer to Further Reading 3 for more details.
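
A minimal sketch of calling this endpoint from a script follows. The function names are hypothetical, the host and port are placeholders to be filled in for a given deployment, and the response format is not specified here, so the body is returned as raw text.

```python
from urllib.request import urlopen


def running_pipeline_tasks_url(host, web_server_port):
    """Build the endpoint URL from the template given in this section."""
    return f"http://{host}:{web_server_port}/api/running_pipeline_tasks"


def fetch_running_pipeline_tasks(host, web_server_port, timeout=5.0):
    """GET the endpoint and return the response body as text.

    The response format is not described in this document, so no
    parsing is attempted.
    """
    url = running_pipeline_tasks_url(host, web_server_port)
    with urlopen(url, timeout=timeout) as response:
        return response.read().decode("utf-8", errors="replace")
```

Calling `fetch_running_pipeline_tasks(host, port)` against a running node returns whatever the endpoint reports; see Further Reading 3 for how to interpret it.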

Further Reading

  1. LocalExchanger Rules
  2. PipelineX Dependency Design
  3. PipelineX Debug Manual
  4. PipelineX Parallel Execution Design