

Discussion thread: TBD
Vote thread: TBD
JIRA: TBD
Release: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

In data analysis scenarios, join is a very common operation. Users can efficiently obtain information by joining two or more tables. However, join is a resource- and time-intensive operation, especially when the input data volume is huge: executing a join involves scanning the underlying tables, shuffling, and combining the rows. If we can reduce the amount of data that reaches the join as much as possible, we can improve query performance on the one hand, and reduce resource consumption (network/IO/CPU, etc.) on the other hand, which means we can support more queries with the same resources.

Runtime filter is a common optimization to improve join performance. It dynamically generates filter conditions for certain join queries at runtime to reduce the amount of scanned or shuffled data, avoid unnecessary I/O and network transmission, and speed up the query. Its working principle is to first build a filter (e.g. a bloom filter) based on the data of the small table side (build side), and then pass this filter to the large table side (probe side) to filter out irrelevant data there. This reduces the data reaching the join and improves performance.

In this FLIP, we propose to introduce runtime filter for Flink batch jobs.

Public Interfaces

We intend to introduce the following new configuration parameters.

Key: table.optimizer.runtime-filter.enabled
Type: Boolean
Default Value: false
Description: A flag to enable or disable the runtime filter.

Key: table.optimizer.runtime-filter.max-build-data-size
Type: MemorySize
Default Value: 10 MB
Description: Data volume threshold of the runtime filter build side. Estimated data volume needs to be under this value to try to inject runtime filter.

Key: table.optimizer.runtime-filter.min-probe-data-size
Type: MemorySize
Default Value: 10 GB
Description: Data volume threshold of the runtime filter probe side. Estimated data volume needs to be over this value to try to inject runtime filter.

Key: table.optimizer.runtime-filter.min-filter-ratio
Type: Double
Default Value: 0.5
Description: Filter ratio threshold of the runtime filter. Estimated filter ratio needs to be over this value to try to inject runtime filter.
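
For example, assuming the options land as proposed (the keys above are part of this proposal and may still change), they could be set through the table configuration. A minimal sketch using the Table API:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class RuntimeFilterConfigExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inBatchMode());

        // Runtime filter is off by default; it must be enabled explicitly.
        tEnv.getConfig().getConfiguration()
                .setString("table.optimizer.runtime-filter.enabled", "true");

        // Optionally tune the injection thresholds (values shown are the proposed defaults).
        tEnv.getConfig().getConfiguration()
                .setString("table.optimizer.runtime-filter.max-build-data-size", "10 MB");
        tEnv.getConfig().getConfiguration()
                .setString("table.optimizer.runtime-filter.min-probe-data-size", "10 GB");
        tEnv.getConfig().getConfiguration()
                .setString("table.optimizer.runtime-filter.min-filter-ratio", "0.5");
    }
}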

Proposed Changes

The Whole Workflow for Runtime Filter

The changes for the runtime filter mainly involve two parts: the planner part and the runtime part.

In the planner part, the planner will try to inject a runtime filter for eligible joins during the optimization phase. It will add Projection(Calc), RuntimeFilterBuilder and RuntimeFilter nodes into the physical plan (Figure 1 is an example).

In the runtime part, we need to add two new operators, RuntimeFilterBuilderOperator and RuntimeFilterOperator. The scheduler will schedule the job vertices in topological order: first the build side, then the RuntimeFilterBuilder, and then the RuntimeFilter (the built filter is sent from the RuntimeFilterBuilder to the RuntimeFilter via the data edge).


CREATE TABLE dim (
  x INT,
  y INT,
  z BIGINT);

CREATE TABLE fact (
  a INT,
  b BIGINT,
  c INT,
  d VARCHAR,
  e BIGINT);

SELECT * FROM fact, dim WHERE x = a AND z = 2;

Next, we use the above query (where dim is the small table and fact is the large table) as an example to show the whole workflow of the runtime filter (Figure 1 shows the planner changes, Figure 2 shows the whole workflow); a short sketch of how to inspect the resulting plan follows the steps below:

  1. The parser parses a given query and converts it to an AST (Abstract Syntax Tree) plan. The optimizer detects the runtime filter pattern and tries to inject the runtime filter; the optimized physical plan is then converted to an ExecNode plan. The ExecNode plan is converted to a StreamGraph and then to a JobGraph.
  2. The client submits the generated JobGraph to the job manager. Then, the scheduler will schedule the job vertices for execution according to the topological order.
  3. Schedule the TableSource(dim) first.
  4. After the TableSource(dim) is finished, the scheduler will schedule the RuntimeFilterBuilder to build the bloom filter.
  5. After the RuntimeFilterBuilder is finished, the scheduler will schedule the TableSource(fact) with its chained RuntimeFilter, which uses the built bloom filter to filter the data.
  6. Finally, the Join will be scheduled.
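
To make this concrete, once runtime filter is enabled (see the configuration sketch above), the injected nodes can be observed by explaining the query. The snippet below reuses the hypothetical tEnv from that sketch; the exact plan output is not reproduced here:

// Assumes `tEnv` is the batch TableEnvironment configured above and that the
// `dim` and `fact` tables are registered with real connectors.
String plan = tEnv.explainSql("SELECT * FROM fact, dim WHERE x = a AND z = 2");

// Per Figure 1, the optimized plan is expected to contain the injected
// Calc and RuntimeFilterBuilder nodes on the dim side and a RuntimeFilter
// on the fact (probe) side, feeding the Join.
System.out.println(plan);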



Planner

Supported Join types

Flink supports six join types: inner, left outer, right outer, full outer, left semi and left anti join. Runtime filter can only support the following 4 join cases:

  1. inner join
  2. semi join
  3. left join + left is build
  4. right join + right is build 

Regarding the above join types, there are currently 3 join implementation strategies: HashJoin, SortMergeJoin and NestedLoopJoin. Since NestedLoopJoin is usually used only when the amount of data is small or there is no equality join condition, runtime filter will only support HashJoin and SortMergeJoin.

Conditions of Injecting Runtime Filter

We will inject the runtime filters only if the following requirements are met:

  1. The estimated data volume of the build side needs to be under the value of "table.optimizer.runtime-filter.max-build-data-size". If the data volume of the build side is too large, the building overhead will be too large, which may have a negative impact on job performance.
  2. The estimated data volume of the probe side needs to be over the value of "table.optimizer.runtime-filter.min-probe-data-size". If the data volume of the probe side is too small, building the runtime filter is not worth the overhead.
  3. The estimated filter ratio needs to be over the value of "table.optimizer.runtime-filter.min-filter-ratio". Same as 2, if the filter ratio is too low, building the runtime filter is not worth it. The filter ratio can be estimated by the following formula:

     filterRatio = 1 - buildNdv / probeNdv

in which buildNdv is the number of distinct values of the build side data and probeNdv is the number of distinct values of the probe side data. If the ndv cannot be estimated, the row count is used instead. For example, if buildNdv is 1,000 and probeNdv is 100,000, the estimated filter ratio is 0.99, i.e. about 99% of the probe side rows are expected to be filtered out.
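
A minimal sketch of this estimate as code (a hypothetical helper, not part of the proposed API), including the fallback to row counts when the NDVs are unknown:

/** Hypothetical helper mirroring the filter ratio estimate described above. */
final class FilterRatioEstimator {

    /**
     * Estimates the fraction of probe side rows expected to be filtered out.
     * Negative NDV values mean "unknown", in which case the row count is used instead.
     */
    static double estimateFilterRatio(
            double buildNdv, double probeNdv, double buildRowCount, double probeRowCount) {
        double build = buildNdv > 0 ? buildNdv : buildRowCount;
        double probe = probeNdv > 0 ? probeNdv : probeRowCount;
        if (probe <= 0) {
            return 0.0; // nothing is known about the probe side, assume no filtering benefit
        }
        return Math.max(0.0, 1.0 - build / probe);
    }
}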

Placement of Runtime Filter

Theoretically, the runtime filter can be pushed down along the probe side, as close to the data sources as possible. But in the first version, for simplicity, we will only inject the runtime filter directly before the join, without pushdown; runtime filter pushdown can be a future improvement.

Runtime

RuntimeFilterBuilderOperator

RuntimeFilterBuilderOperator is responsible for building a bloom filter based on the data from the build side. When building the bloom filter, it should also compare the size of the received data with "table.optimizer.runtime-filter.max-build-data-size". Once this limit is exceeded, it will output a fake filter (which always returns true). The parallelism of this operator will be 1, and the built bloom filter will be broadcast to all related RuntimeFilterOperators.
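
A minimal sketch of this build-side logic (the classes below are purely illustrative, not the actual Flink operator; a real implementation would be based on Flink's operator API and a serialized bloom filter):

import java.util.BitSet;

/** Illustrative bloom filter with an "overflow" mode that accepts everything. */
class SimpleBloomFilter {
    private final BitSet bits;
    private final int numBits;
    private final int numHashes;
    private boolean overflowed; // set when max-build-data-size is exceeded

    SimpleBloomFilter(int numBits, int numHashes) {
        this.numBits = numBits;
        this.numHashes = numHashes;
        this.bits = new BitSet(numBits);
    }

    /** Turns this into a "fake" filter that never drops any row. */
    void markOverflowed() {
        overflowed = true;
        bits.clear();
    }

    void add(Object joinKey) {
        if (overflowed) {
            return;
        }
        int hash = joinKey.hashCode();
        for (int i = 0; i < numHashes; i++) {
            bits.set(Math.floorMod(hash + i * 0x9E3779B9, numBits));
        }
    }

    boolean mightContain(Object joinKey) {
        if (overflowed) {
            return true; // fake filter: always returns true
        }
        int hash = joinKey.hashCode();
        for (int i = 0; i < numHashes; i++) {
            if (!bits.get(Math.floorMod(hash + i * 0x9E3779B9, numBits))) {
                return false;
            }
        }
        return true;
    }
}

/** Sketch of the build side: add keys until the configured size limit is hit. */
class RuntimeFilterBuildSketch {
    static SimpleBloomFilter build(
            Iterable<Object> buildKeys, long maxBuildDataSizeBytes, long estimatedBytesPerRow) {
        SimpleBloomFilter filter = new SimpleBloomFilter(1 << 20, 3);
        long seenBytes = 0;
        for (Object key : buildKeys) {
            seenBytes += estimatedBytesPerRow;
            if (seenBytes > maxBuildDataSizeBytes) {
                filter.markOverflowed(); // limit exceeded -> fall back to the fake filter
                break;
            }
            filter.add(key);
        }
        return filter;
    }
}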

RuntimeFilterOperator

RuntimeFilterOperator is responsible for filtering the probe side data using the received bloom filter. It should normally be chained with the probe side operator.
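
Continuing the illustrative sketch above, the probe side logic then only keeps rows whose join key might be contained in the received filter (again purely illustrative, not the proposed operator implementation):

import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

/** Sketch of the probe side: drop rows whose join key is definitely not on the build side. */
class RuntimeFilterProbeSketch {
    static <R> List<R> filterProbeSide(
            List<R> probeRows, Function<R, Object> keyExtractor, SimpleBloomFilter filter) {
        List<R> kept = new ArrayList<>();
        for (R row : probeRows) {
            // False positives may pass through; they are removed later by the join itself.
            if (filter.mightContain(keyExtractor.apply(row))) {
                kept.add(row);
            }
        }
        return kept;
    }
}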

Compatibility, Deprecation, and Migration Plan

In the first version, the runtime filter will be an optional optimization which the user has to activate explicitly by setting the config option table.optimizer.runtime-filter.enabled: true. This entails that Flink's default behavior won't change.

Future Improvements

More underlying implementations

In this version, the underlying implementation of the runtime filter is a bloom filter. In the future, we can introduce more underlying implementations for further optimization. For example, when the input data volume on the build side is small enough, we can use an in-filter to reduce the building overhead and avoid the false positive problem. We can also introduce a min-max filter so that the filter can be easily pushed down to the source to reduce the scan IO.
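
For illustration, such an in-filter could be as simple as an exact set of the build side keys (a hypothetical sketch, not part of this proposal's implementation), which has no false positives but only works for small build sides:

import java.util.HashSet;
import java.util.Set;

/** Illustrative exact "in-filter": no false positives, but memory grows with the build side. */
class SimpleInFilter {
    private final Set<Object> keys = new HashSet<>();

    void add(Object joinKey) {
        keys.add(joinKey);
    }

    boolean contains(Object joinKey) {
        return keys.contains(joinKey);
    }
}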

Runtime Filter PushDown

Like normal filters, runtime filters can be further pushed down along the probe side, as close to data sources as possible. As mentioned above, we can even push the runtime filters to the source to reduce scan IO if it is supported.

Use the real execution information

We need to specify the number of expected records when creating a bloom filter. Currently, this number is estimated at the planning phase. A better solution would be to let the RuntimeFilterBuilder know the real number of records on the build side at execution phase; we may do this in the future.
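
For context, the expected record count matters because a bloom filter's size and number of hash functions are typically derived from it. A minimal sketch of the standard sizing formulas (illustrative only, not the proposed implementation):

/** Standard bloom filter sizing from expected records and a target false positive rate. */
final class BloomFilterSizing {

    /** Number of bits: m = -n * ln(p) / (ln 2)^2. */
    static long optimalNumBits(long expectedRecords, double falsePositiveRate) {
        return (long) Math.ceil(
                -expectedRecords * Math.log(falsePositiveRate) / (Math.log(2) * Math.log(2)));
    }

    /** Number of hash functions: k = (m / n) * ln 2. */
    static int optimalNumHashes(long expectedRecords, long numBits) {
        return Math.max(1, (int) Math.round((double) numBits / expectedRecords * Math.log(2)));
    }
}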

Test Plan

The proposed changes will be tested for correctness and performance through the TPC-DS benchmark suite in a real cluster.

