Document the state by adding a label to the FLIP page with one of "discussion", "accepted", "released", "rejected".
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
Key | Type | Default Value | Description |
---|---|---|---|
table.optimizer.runtime-filter.enabled | Boolean | false | A flag to enable or disable the runtime filter. |
table.optimizer.runtime-filter.max-build-data-size | MemorySize | 150 MB | Data volume threshold of the runtime filter build side. The estimated build-side data volume needs to be under this value for the planner to try to inject a runtime filter. |
table.optimizer.runtime-filter.min-probe-data-size | MemorySize | 10 GB | Data volume threshold of the runtime filter probe side. The estimated probe-side data volume needs to be over this value for the planner to try to inject a runtime filter. |
table.optimizer.runtime-filter.min-filter-ratio | Double | 0.5 | Filter ratio threshold of the runtime filter. The estimated filter ratio needs to be over this value for the planner to try to inject a runtime filter. |
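For illustration, enabling the feature and tuning the thresholds from a Table API program might look like the following sketch (assuming the string-keyed `TableConfig.set` overload; the values shown are just the defaults):

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class RuntimeFilterConfigExample {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inBatchMode());
        // Runtime filter is off by default; the user has to opt in explicitly.
        tEnv.getConfig().set("table.optimizer.runtime-filter.enabled", "true");
        // Optional tuning of the injection thresholds.
        tEnv.getConfig().set("table.optimizer.runtime-filter.max-build-data-size", "150 mb");
        tEnv.getConfig().set("table.optimizer.runtime-filter.min-probe-data-size", "10 gb");
        tEnv.getConfig().set("table.optimizer.runtime-filter.min-filter-ratio", "0.5");
    }
}
```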
...
In the planner part, the planner will try to inject a runtime filter for each eligible join during the optimization phase. We build the runtime filter in a two-stage manner: first, each subtask on the build side builds a local filter based on its local data, and sends the built filter to a global aggregation node. Then the global aggregation node aggregates the received filters into a global filter, and sends the global filter to all probe-side subtasks. Therefore, we will add LocalRuntimeFilterBuilder, GlobalRuntimeFilterBuilder and RuntimeFilter into the physical plan (Figure 1 is an example).
In the runtime part, we will add three new operators: LocalRuntimeFilterBuilderOperator, GlobalRuntimeFilterBuilderOperator and RuntimeFilterOperator. When scheduling, the scheduler will schedule based on topological order: first the build side (with its chained LocalRuntimeFilterBuilderOperator), then the GlobalRuntimeFilterBuilderOperator, and then the RuntimeFilterOperator (the local/global filters are sent via the data edges between the builders and the RuntimeFilter).
```sql
CREATE TABLE dim (
  x INT,
  y INT,
  z BIGINT);

CREATE TABLE fact (
  a INT,
  b BIGINT,
  c INT,
  d VARCHAR,
  e BIGINT);

SELECT * FROM fact, dim WHERE x = a AND z = 2;
```
...
- The parser parses a given query and converts it to an AST (Abstract Syntax Tree). The optimizer will detect the runtime filter pattern and try to inject the runtime filter, and the optimized physical plan will then be converted to an ExecNode plan. The ExecNode plan will be converted to a StreamGraph and then to a JobGraph.
- The client submits the generated JobGraph to the job manager. Then, the scheduler will schedule job vertices execution according to the topological order.
- Schedule the TableSource(dim) with its chained LocalRuntimeFilterBuilder first.
- After the TableSource(dim) is finished, the scheduler will schedule the GlobalRuntimeFilterBuilder to generate the global filter.
- After the GlobalRuntimeFilterBuilder is finished, the scheduler will schedule the TableSource(fact) with its chained RuntimeFilter, and use the received filter to filter the data.
- At last, the Join will be scheduled.
Planner
Supported Join types
...
- The estimated data volume of build side needs to be under the value of "table.optimizer.runtime-filter.max-build-data-size". If the data volume of build side is too large, the building overhead will be too large, which may lead to a negative impact on job performance.
- The estimated data volume of probe side needs to be over the value of "table.optimizer.runtime-filter.min-probe-data-size". If the data volume on the probe side is too small, the overhead of building runtime filter is not worth it.
- The estimated filter ratio needs to be over the value of "table.optimizer.runtime-filter.min-filter-ratio". As with the probe-side condition above, a filter ratio that is too low is not worth building a runtime filter for. The filter ratio can be estimated by the following formula:
...
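Putting the three injection conditions together, the planner-side decision can be sketched as follows (a simplified illustration; the method and the hard-coded thresholds stand in for the planner's actual estimates and the parsed config values):

```java
public class RuntimeFilterInjectionSketch {
    // Placeholder constants standing in for the parsed config options.
    static final long MAX_BUILD_DATA_SIZE = 150L << 20; // 150 MB
    static final long MIN_PROBE_DATA_SIZE = 10L << 30;  // 10 GB
    static final double MIN_FILTER_RATIO = 0.5;

    /** Returns true if a runtime filter should be injected for this join. */
    static boolean shouldInject(
            long estimatedBuildBytes, long estimatedProbeBytes, double estimatedFilterRatio) {
        return estimatedBuildBytes <= MAX_BUILD_DATA_SIZE   // build side small enough
                && estimatedProbeBytes >= MIN_PROBE_DATA_SIZE // probe side large enough
                && estimatedFilterRatio >= MIN_FILTER_RATIO;  // filter selective enough
    }
}
```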
Theoretically, the runtime filter can be pushed down along the probe side, as close to the data sources as possible. But in the first version, for simplicity, we will only inject the runtime filter directly before the join, without pushdown; runtime filter pushdown can be a future improvement.
Runtime
Supported Shuffle Types
The runtime filter can work well with all shuffle modes: pipelined shuffle, blocking shuffle, and hybrid shuffle.
LocalRuntimeFilterBuilderOperator
The LocalRuntimeFilterBuilderOperator is responsible for building a bloom filter based on the current subtask's local data. When building the bloom filter, we should also compare "table.optimizer.runtime-filter.max-build-data-size" with the size of the received data. Once this limit is exceeded, it will output a fake filter (which always returns true).
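A minimal sketch of this overflow behavior (illustrative only: the real operator works on Flink's binary rows and a production bloom filter; `OverflowAwareFilter` and its tiny hash are made-up names for this sketch):

```java
import java.util.BitSet;

/** Toy bloom filter that degrades to a fake "always true" filter on overflow. */
public class OverflowAwareFilter {
    private static final int NUM_BITS = 1 << 16;
    private final BitSet bits = new BitSet(NUM_BITS);
    private final long maxBuildBytes; // stands in for max-build-data-size
    private long seenBytes;
    private boolean fake; // once true, the filter accepts everything

    public OverflowAwareFilter(long maxBuildBytes) {
        this.maxBuildBytes = maxBuildBytes;
    }

    public void add(byte[] key) {
        seenBytes += key.length;
        if (seenBytes > maxBuildBytes) {
            fake = true;      // limit exceeded: output a fake filter instead
            bits.clear();     // the bits are no longer needed
            return;
        }
        for (int i = 0; i < 3; i++) {
            bits.set(Math.floorMod(hash(key, i), NUM_BITS));
        }
    }

    public boolean mightContain(byte[] key) {
        if (fake) {
            return true; // a fake filter always returns true
        }
        for (int i = 0; i < 3; i++) {
            if (!bits.get(Math.floorMod(hash(key, i), NUM_BITS))) {
                return false;
            }
        }
        return true;
    }

    private static int hash(byte[] key, int seed) {
        int h = 17 + 31 * seed;
        for (byte b : key) {
            h = h * 31 + b;
        }
        return h;
    }
}
```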
GlobalRuntimeFilterBuilderOperator
The GlobalRuntimeFilterBuilderOperator is responsible for aggregating all received bloom filters into a global bloom filter via an OR operation. It will also output a fake filter if one of the received filters is fake. The parallelism of this operator will be 1, and the built global bloom filter will be broadcast to all related RuntimeFilterOperators.
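The OR-aggregation itself is straightforward. As a sketch over plain bit sets (the real implementation merges Flink's bloom filter representation; `mergeFilters` and the null-means-fake convention are illustrative choices here):

```java
import java.util.BitSet;
import java.util.List;

public class GlobalFilterMergeSketch {
    /**
     * OR-merges the per-subtask local filters into one global filter.
     * A null entry stands for a fake ("always true") local filter; if any
     * input is fake, the global filter must be fake as well, because the
     * missing keys could otherwise be filtered out incorrectly.
     */
    static BitSet mergeFilters(List<BitSet> localFilters) {
        BitSet global = new BitSet();
        for (BitSet local : localFilters) {
            if (local == null) {
                return null; // propagate the fake filter
            }
            global.or(local);
        }
        return global;
    }
}
```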
...
In the first version, the runtime filter will be an optional optimization which the user has to activate explicitly by setting the config option table.optimizer.runtime-filter.enabled to true. This means that Flink's default behavior won't change.
Future Improvements
More underlying filter implementations
In this version, the underlying implementation of the runtime filter is a bloom-filter. In the future, we can introduce more underlying implementations for further optimization. For example, when the input data volume on the build side is small enough, we can use an in-filter to reduce building overhead and avoid the false positive problem. We can also introduce a min-max filter so that the filter can be easily pushed down to the source to reduce the scan IO.
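As a sketch of the min-max idea (illustrative only; the class name and long-typed keys are assumptions for this example): the build side only tracks the minimum and maximum join key, and the resulting range predicate is simple enough to push into a source scan.

```java
/** Toy min-max filter: keeps the min and max join keys seen on the build side. */
public class MinMaxFilterSketch {
    private long min = Long.MAX_VALUE;
    private long max = Long.MIN_VALUE;

    /** Called for each build-side record's join key. */
    public void add(long key) {
        min = Math.min(min, key);
        max = Math.max(max, key);
    }

    /** Probe-side check; a [min, max] range is easy to push down to the source. */
    public boolean mightContain(long key) {
        return key >= min && key <= max;
    }
}
```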
...
We need to give the number of expected records when creating a bloom filter. Currently, the number is estimated in the planning phase. However, a better solution would be to let the RuntimeFilterBuilder know the real number of records on the build side at execution time; we may do this in the future.
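For context on why the expected record count matters: the standard bloom-filter sizing formulas derive the bit count and hash-function count from it, so a bad estimate directly hurts the false-positive rate. A sketch using the textbook formulas (not necessarily the exact constants Flink will use):

```java
public class BloomSizingSketch {
    /** Bits needed for n expected records at false-positive rate p: m = -n*ln(p)/(ln 2)^2. */
    static long optimalNumOfBits(long expectedRecords, double fpp) {
        return (long) Math.ceil(
                -expectedRecords * Math.log(fpp) / (Math.log(2) * Math.log(2)));
    }

    /** Hash functions for that sizing: k = (m/n)*ln 2, at least 1. */
    static int optimalNumOfHashFunctions(long expectedRecords, long numBits) {
        return Math.max(1, (int) Math.round((double) numBits / expectedRecords * Math.log(2)));
    }
}
```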
Reuse the hash table for deduplication
When the join type is hash join, we can reuse the hash table built in the join operator to build the bloom filter. The keyset of the hash table gives us exact NDV counts and deduplicated keys, which helps avoid inserting records twice into the bloom filter. This idea comes from the discussion on the mailing list; see the mailing list thread for more details.
Use blocked bloom filters to improve cache efficiency
If we want to improve cache efficiency for the build of larger filters, we could structure them as blocked bloom filters, where the filter is separated into blocks and all bits of one key go only into one block. This idea comes from the discussion on the mailing list; see the mailing list thread for more details.
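As a sketch of the blocked layout (illustrative; the block size, block count, and hashing scheme are arbitrary choices for this example): one hash picks the block, and all three bits for a key are set inside that single 512-bit block, so a lookup touches one cache line.

```java
import java.util.BitSet;

/** Toy blocked bloom filter: all bits of one key live in one cache-line-sized block. */
public class BlockedBloomSketch {
    static final int BLOCK_BITS = 512;  // one 64-byte cache line
    static final int NUM_BLOCKS = 1024;

    private final BitSet bits = new BitSet(BLOCK_BITS * NUM_BLOCKS);

    public void add(long keyHash) {
        int block = (int) Math.floorMod(keyHash, (long) NUM_BLOCKS); // hash picks the block
        long h = keyHash * 0x9E3779B97F4A7C15L; // derive in-block bit positions
        for (int i = 0; i < 3; i++) {
            int bit = (int) ((h >>> (i * 9)) & (BLOCK_BITS - 1));
            bits.set(block * BLOCK_BITS + bit);
        }
    }

    public boolean mightContain(long keyHash) {
        int block = (int) Math.floorMod(keyHash, (long) NUM_BLOCKS);
        long h = keyHash * 0x9E3779B97F4A7C15L;
        for (int i = 0; i < 3; i++) {
            int bit = (int) ((h >>> (i * 9)) & (BLOCK_BITS - 1));
            if (!bits.get(block * BLOCK_BITS + bit)) {
                return false;
            }
        }
        return true;
    }
}
```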
Test Plan
The proposed changes will be tested for correctness and performance through the TPC-DS benchmark suite in a real cluster.
...