

Page properties

Discussion thread: https://lists.apache.org/thread/mm0o8fv7x7k13z11htt88zhy7lo8npmg
Vote thread: https://lists.apache.org/thread/60c0obrgxrcxb7qv9pqywzxvtt5phnhy
JIRA: FLINK-32486
Release: 1.18.0



Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

We intend to introduce the following new configuration parameters.

Key: table.optimizer.runtime-filter.enabled
Type: Boolean
Default Value: false
Description: A flag to enable or disable the runtime filter.

Key: table.optimizer.runtime-filter.max-build-data-size
Type: MemorySize
Default Value: 150 MB
Description: Data volume threshold of the runtime filter build side. Estimated data volume needs to be under this value to try to inject runtime filter.

Key: table.optimizer.runtime-filter.min-probe-data-size
Type: MemorySize
Default Value: 10 GB
Description: Data volume threshold of the runtime filter probe side. Estimated data volume needs to be over this value to try to inject runtime filter.

Key: table.optimizer.runtime-filter.min-filter-ratio
Type: Double
Default Value: 0.5
Description: Filter ratio threshold of the runtime filter. Estimated filter ratio needs to be over this value to try to inject runtime filter.
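For illustration, the options could be set programmatically on a batch TableEnvironment as sketched below. The option keys are the ones proposed above; the values are just examples, and setting them via TableConfig rather than the cluster configuration is simply one possible way to do it.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class EnableRuntimeFilterExample {
    public static void main(String[] args) {
        TableEnvironment tEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());

        // Runtime filter is disabled by default; turn it on explicitly.
        tEnv.getConfig().getConfiguration()
                .setString("table.optimizer.runtime-filter.enabled", "true");

        // Optionally tune the injection thresholds (example values only).
        tEnv.getConfig().getConfiguration()
                .setString("table.optimizer.runtime-filter.max-build-data-size", "150 mb");
        tEnv.getConfig().getConfiguration()
                .setString("table.optimizer.runtime-filter.min-probe-data-size", "10 gb");
        tEnv.getConfig().getConfiguration()
                .setString("table.optimizer.runtime-filter.min-filter-ratio", "0.5");
    }
}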

Proposed Changes

The Whole Workflow for Runtime Filter

...

In the planner part, the planner will try to inject a runtime filter for eligible joins during the optimization phase. We build the runtime filter in a two-stage manner: first, each subtask on the build side builds a local filter based on its local data and sends the built filter to a global aggregation node. Then the global aggregation node aggregates the received filters into a global filter and sends the global filter to all probe side subtasks. Therefore, we will add LocalRuntimeFilterBuilder, GlobalRuntimeFilterBuilder and RuntimeFilter nodes into the physical plan (Figure 1 is an example).

In the runtime part, we will add 3 new operators: LocalRuntimeFilterBuilderOperator, GlobalRuntimeFilterBuilderOperator and RuntimeFilterOperator. When scheduling, the scheduler will schedule based on topological order: first the build side (with its chained LocalRuntimeFilterBuilderOperator), then the GlobalRuntimeFilterBuilderOperator, and then the RuntimeFilter (the local/global filters are sent via the data edges from the builders to the RuntimeFilter).
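To make the two-stage idea concrete, here is a minimal, self-contained Java sketch. The class names and the single-hash bloom filter are simplifications for illustration only, not Flink's actual operators.

import java.util.BitSet;
import java.util.List;

/** Simplified stand-in for the runtime filter; not Flink's actual class. */
class SimpleBloomFilter {
    private static final int NUM_BITS = 1024;
    private final BitSet bits = new BitSet(NUM_BITS);

    void add(Object key) {
        bits.set(Math.floorMod(key.hashCode(), NUM_BITS));
    }

    boolean mightContain(Object key) {
        return bits.get(Math.floorMod(key.hashCode(), NUM_BITS));
    }

    /** OR-merge, as the global aggregation step would do. */
    void merge(SimpleBloomFilter other) {
        bits.or(other.bits);
    }
}

public class TwoStageRuntimeFilterSketch {
    public static void main(String[] args) {
        // Stage 1: each build-side (dim) subtask builds a local filter from its own data.
        SimpleBloomFilter local1 = new SimpleBloomFilter();
        List.of(1, 2, 3).forEach(local1::add);
        SimpleBloomFilter local2 = new SimpleBloomFilter();
        List.of(4, 5).forEach(local2::add);

        // Stage 2: the global aggregation node ORs the local filters into one global filter.
        SimpleBloomFilter global = new SimpleBloomFilter();
        global.merge(local1);
        global.merge(local2);

        // Probe side (fact): drop records whose join key cannot exist on the build side.
        for (int key : List.of(1, 4, 42, 99)) {
            if (global.mightContain(key)) {
                System.out.println("keep record with join key " + key);
            }
        }
    }
}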


CREATE TABLE dim (
  x INT,
  y INT,
  z BIGINT);

CREATE TABLE fact (
  a INT,
  b BIGINT,
  c INT,
  d VARCHAR,
  e BIGINT);

SELECT * FROM fact, dim WHERE x = a AND z = 2;

Next, we use the above query (where dim is the small table and fact is the large table) as an example to show the whole workflow of the runtime filter (Figure 1 shows the planner changes, Figure 2 shows the whole workflow):

  1. The parser parses a given query and converts it to an AST (Abstract Syntax Tree) plan. The optimizer will detect the runtime filter pattern and try to inject runtime filter, and then the optimized physical plan will be converted to ExecNode plan. The ExecNode plan will be converted to StreamingGraph and then JobGraph.
  2. The client submits the generated JobGraph to the job manager. Then, the scheduler will schedule job vertices execution according to the topological order.
  3. Schedule the TableSource(dim) with its chained LocalRuntimeFilterBuilder first.
  4. After the TableSource(dim) is finished, the scheduler will schedule the GlobalRuntimeFilterBuilder to generate the global filter.
  5. After the GlobalRuntimeFilterBuilder is finished, the scheduler will schedule the TableSource(fact) with its chained RuntimeFilter, and use the received filter to filter the data.
  6. At last, the Join will be scheduled.

[Figure 1: planner changes after injecting the runtime filter]

[Figure 2: the whole workflow of the runtime filter]

Planner

Supported Join types

...

  1. The estimated data volume of the build side needs to be under the value of "table.optimizer.runtime-filter.max-build-data-size". If the data volume of the build side is too large, the building overhead will be too large, which may lead to a negative impact on job performance.
  2. The estimated data volume of the probe side needs to be over the value of "table.optimizer.runtime-filter.min-probe-data-size". If the data volume on the probe side is too small, the overhead of building the runtime filter is not worth it.
  3. The filter ratio needs to be over the value of "table.optimizer.runtime-filter.min-filter-ratio". Same as 2, a low filter ratio is not worth the cost of building the runtime filter. The filter ratio can be estimated by the following formula:

...
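As a rough illustration of the decision, the injection check boils down to the three threshold comparisons above. The method and parameter names below are hypothetical, and the actual estimation of the data sizes and the filter ratio is done by the planner as described in this section.

/** Hypothetical sketch of the planner-side injection check; not Flink's actual code. */
final class RuntimeFilterInjectionCheck {

    static boolean shouldInjectRuntimeFilter(
            long estimatedBuildBytes,     // estimated build side data volume
            long estimatedProbeBytes,     // estimated probe side data volume
            double estimatedFilterRatio,  // estimated filter ratio
            long maxBuildDataSize,        // table.optimizer.runtime-filter.max-build-data-size
            long minProbeDataSize,        // table.optimizer.runtime-filter.min-probe-data-size
            double minFilterRatio) {      // table.optimizer.runtime-filter.min-filter-ratio
        return estimatedBuildBytes <= maxBuildDataSize
                && estimatedProbeBytes >= minProbeDataSize
                && estimatedFilterRatio >= minFilterRatio;
    }
}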

Theoretically, the runtime filter can be pushed down along the probe side, as close to the data sources as possible. But in the first version, for simplicity, we will only inject the runtime filter before the join, without pushdown; runtime filter pushdown can be a future improvement.

Runtime


Supported Shuffle Types

The runtime filter can work well with all shuffle modes: pipeline shuffle, blocking shuffle, and hybrid shuffle.

LocalRuntimeFilterBuilderOperator

The LocalRuntimeFilterBuilderOperator is responsible for building a bloom filter based on the current subtask's local data. When building the bloom filter, we should also compare "table.optimizer.runtime-filter.max-build-data-size" with the size of the received data. Once this limit is exceeded, it will output a fake filter (which always returns true).
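A minimal sketch of this behavior, assuming a simple single-hash bloom filter and a hypothetical class name (not the actual operator implementation):

import java.util.BitSet;

/** Hypothetical sketch: accumulate local build-side keys, fall back to a fake filter on overflow. */
class LocalFilterBuilderSketch {
    private static final int NUM_BITS = 1 << 20;
    private final BitSet bits = new BitSet(NUM_BITS);
    private final long maxBuildDataSizeBytes;
    private long seenBytes;
    private boolean overflowed;

    LocalFilterBuilderSketch(long maxBuildDataSizeBytes) {
        this.maxBuildDataSizeBytes = maxBuildDataSizeBytes;
    }

    void addKey(Object key, long recordSizeBytes) {
        seenBytes += recordSizeBytes;
        if (seenBytes > maxBuildDataSizeBytes) {
            // Limit exceeded: give up and switch to the fake filter that always returns true.
            overflowed = true;
            bits.clear();
            return;
        }
        bits.set(Math.floorMod(key.hashCode(), NUM_BITS));
    }

    boolean mightContain(Object key) {
        return overflowed || bits.get(Math.floorMod(key.hashCode(), NUM_BITS));
    }

    boolean isFakeFilter() {
        return overflowed;
    }
}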

GlobalRuntimeFilterBuilderOperator

The GlobalRuntimeFilterBuilderOperator is responsible for aggregating all received bloom filters into a global bloom filter via an OR operation. It will also output a fake filter if one of the received filters is fake. The parallelism of this operator will be 1, and the built global bloom filter will be broadcast to all related RuntimeFilterOperators.
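A minimal sketch of the aggregation step, representing the fake filter as an empty Optional (hypothetical names, not the actual operator):

import java.util.BitSet;
import java.util.List;
import java.util.Optional;

/** Hypothetical sketch: OR-merge local filters; any fake input makes the global filter fake. */
final class GlobalFilterMergeSketch {

    /** An empty Optional stands in for the fake filter that accepts every key. */
    static Optional<BitSet> merge(List<Optional<BitSet>> localFilters, int numBits) {
        BitSet global = new BitSet(numBits);
        for (Optional<BitSet> local : localFilters) {
            if (local.isEmpty()) {
                return Optional.empty(); // one fake input makes the global filter fake
            }
            global.or(local.get());
        }
        return Optional.of(global);
    }
}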

...

In the first version, the runtime filter will be an optional optimization which the user has to activate explicitly by setting the config option table.optimizer.runtime-filter.enabled: true. This entails that Flink's default behavior won't change.

Future Improvements

More underlying filter implementations

In this version, the underlying implementation of the runtime filter is a bloom-filter. In the future, we can introduce more underlying implementations for further optimization. For example, when the input data volume on the build side is small enough, we can use an in-filter to reduce building overhead and avoid the false positive problem. We can also introduce a min-max filter so that the filter can be easily pushed down to the source to reduce the scan IO.
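As an example of such an alternative, a min-max filter only tracks the smallest and largest build-side key, which makes it cheap to build and easy to push into a source that supports range pruning. A toy sketch (assumed names, not part of this proposal's code):

/** Toy min-max filter sketch; not part of this proposal's code. */
final class MinMaxFilterSketch {
    private long min = Long.MAX_VALUE;
    private long max = Long.MIN_VALUE;

    void add(long key) {
        min = Math.min(min, key);
        max = Math.max(max, key);
    }

    /** Keys outside [min, max] can never match and can be skipped at the source. */
    boolean mightContain(long key) {
        return key >= min && key <= max;
    }
}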

...

We need to specify the number of expected records when creating a bloom filter. Currently, this number is estimated in the planning phase. However, a better solution would be to let the RuntimeFilterBuilder know the real number of records on the build side at execution phase; we may do this in the future.
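For reference, the standard bloom filter sizing formulas show why the expected record count matters: both the number of bits and the number of hash functions are derived from it and from a target false positive rate. This is textbook sizing, not necessarily the exact code Flink will use.

/** Textbook bloom filter sizing; not necessarily Flink's exact implementation. */
final class BloomFilterSizing {

    /** m = -n * ln(p) / (ln 2)^2 */
    static long optimalNumBits(long expectedRecords, double falsePositiveRate) {
        return (long) Math.ceil(
                -expectedRecords * Math.log(falsePositiveRate) / (Math.log(2) * Math.log(2)));
    }

    /** k = (m / n) * ln 2 */
    static int optimalNumHashFunctions(long expectedRecords, long numBits) {
        return Math.max(1, (int) Math.round((double) numBits / expectedRecords * Math.log(2)));
    }
}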

Reuse the hash table for deduplication

When the join type is hash join, we can reuse the hash table built in the join operator to build the bloom filter. The key set of the hash table gives us exact NDV counts and deduplicated keys, which helps avoid inserting records twice into the bloom filter. This idea comes from the discussion on the mailing list; see the mailing list for more details.

Use blocked bloom filters to improve cache efficiency

If we want to improve cache efficiency for the build of larger filters, we could structure them as blocked bloom filters, where the filter is separated into blocks and all bits of one key go into only one block. This idea comes from the discussion on the mailing list; see the mailing list for more details.
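A toy sketch of the idea (an assumption about how such a structure could look, not an actual design): each key is hashed to one cache-line-sized block and all of its bits are set within that single block.

import java.util.BitSet;

/** Toy blocked bloom filter sketch; assumed structure, not Flink's implementation. */
class BlockedBloomFilterSketch {
    private static final int BLOCK_BITS = 512; // one 64-byte, cache-line-sized block
    private final BitSet bits;
    private final int numBlocks;

    BlockedBloomFilterSketch(int numBlocks) {
        this.numBlocks = numBlocks;
        this.bits = new BitSet(numBlocks * BLOCK_BITS);
    }

    void add(Object key) {
        int h = key.hashCode();
        int blockStart = Math.floorMod(h, numBlocks) * BLOCK_BITS;
        // Derive a few in-block bit positions from the same hash.
        for (int i = 1; i <= 3; i++) {
            bits.set(blockStart + Math.floorMod(h * (i * 0x85ebca6b + 1), BLOCK_BITS));
        }
    }

    boolean mightContain(Object key) {
        int h = key.hashCode();
        int blockStart = Math.floorMod(h, numBlocks) * BLOCK_BITS;
        for (int i = 1; i <= 3; i++) {
            if (!bits.get(blockStart + Math.floorMod(h * (i * 0x85ebca6b + 1), BLOCK_BITS))) {
                return false;
            }
        }
        return true;
    }
}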

Test Plan

The proposed changes will be tested for correctness and performance through the TPC-DS benchmark suite in a real cluster.

...