...
Document the state by adding a label to the FLIP page with one of "discussion", "accepted", "released", "rejected".
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Currently, Flink Table/SQL does not give users fine-grained control over filter pushdown. However, filter pushdown may have side effects in some cases, such as additional computational pressure on external systems. Moreover, improper queries can lead to issues such as full table scans, which in turn can impact the stability of external systems. The JDBC source is a typical example: if a filter is pushed down to the database, an expensive full table scan may happen when the filter involves unindexed columns.
Difference from table.optimizer.source.predicate-pushdown-enabled
The existing configuration "table.optimizer.source.predicate-pushdown-enabled" was introduced with the legacy FilterableTableSource interface. It allows the optimizer to disable all predicate pushdown, but it cannot provide fine-grained control for each individual source.
...
For compatibility with older versions, the newly added option needs to respect the optimizer's "table.optimizer.source.predicate-pushdown-enabled" option. However, we may deprecate the "table.optimizer.source.predicate-pushdown-enabled" option and drop it in Flink 2.0 if it is no longer necessary.
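One possible reading of "respect" here is that the optimizer-level switch takes precedence and the per-source policy only applies when the global option is enabled. The following is a minimal sketch of that reading in plain Java. The class `PushDownDecision` and the method `shouldPushDown` are hypothetical names used for illustration only, and the nested enum is a stand-in for the one proposed in this FLIP.

```java
/** Hypothetical helper (not part of Flink): combines the global switch with a per-source policy. */
final class PushDownDecision {

    /** Stand-in for the FilterHandlingPolicy enum proposed in this FLIP. */
    enum FilterHandlingPolicy { ALWAYS, NEVER }

    /**
     * Assumed semantics: filters are pushed down only if the global
     * "table.optimizer.source.predicate-pushdown-enabled" option is true
     * AND the source-level policy does not forbid it.
     */
    static boolean shouldPushDown(boolean globalPushDownEnabled, FilterHandlingPolicy sourcePolicy) {
        if (!globalPushDownEnabled) {
            // The optimizer-level switch wins, which keeps existing jobs behaving as before.
            return false;
        }
        return sourcePolicy != FilterHandlingPolicy.NEVER;
    }
}
```

Under this assumption, a job that already disables pushdown globally is unaffected by the new option, while a job that leaves the global option enabled can still opt out per source.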
Goals
Support fine-grained configuration to control filter pushdown for each Table/SQL source, letting users decide how filter pushdown is performed.
Public Interfaces
...
Proposed Changes
...
- Introduces a new configuration
- filter.handling.policy
...
- to the JDBC and MongoDB connector.
- Suggests a conventional option name for other connectors that add an option for the same purpose.
Public Interfaces
Introduces a new ConfigOption to the JDBC and MongoDB connectors. For example, for the JDBC connector, the configuration option name should be jdbc.filter.handling.policy.
```java
public static final ConfigOption<FilterHandlingPolicy> FILTER_HANDLING_POLICY =
        ConfigOptions.key("jdbc.filter.handling.policy")
                .enumType(FilterHandlingPolicy.class)
                .defaultValue(FilterHandlingPolicy.ALWAYS)
                .withDescription("Filter handling policy that can be chosen.");
```
Introduce an enum to describe the filter handling policies that can be chosen.
We can also introduce other policies such as INDEX_ONLY and NUMERIC_ONLY to provide users with more choices.
Note: Some policies may be applicable to only some of the databases. We need to fail fast when the chosen policy is not supported by the database and suggest that users choose a suitable policy.
```java
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.configuration.DescribedEnum;
import org.apache.flink.configuration.description.InlineElement;

import static org.apache.flink.configuration.description.TextElement.text;

/** Filter handling policy that can be chosen. */
@PublicEvolving
public enum FilterHandlingPolicy implements DescribedEnum {
    ALWAYS("always", text("Always push the supported filters to the external system")),
    NEVER("never", text("Never push the supported filters to the external system"));

    // Value used in configuration and returned by toString().
    private final String name;
    private final InlineElement description;

    FilterHandlingPolicy(String name, InlineElement description) {
        this.name = name;
        this.description = description;
    }

    @Override
    public InlineElement getDescription() {
        return description;
    }

    @Override
    public String toString() {
        return name;
    }
}
```
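The fail-fast requirement for database-specific policies could be met with a check at source creation time. The sketch below is plain Java and does not use Flink classes. `PolicyValidation`, `validate`, and the `database` argument are hypothetical names, and the nested enum is a stand-in that includes INDEX_ONLY only as an example of a possible future policy.

```java
import java.util.Set;

/** Hypothetical validation helper (not part of Flink or any connector). */
final class PolicyValidation {

    /** Stand-in for the proposed enum; INDEX_ONLY is only a possible future policy. */
    enum FilterHandlingPolicy { ALWAYS, NEVER, INDEX_ONLY }

    /**
     * Throws immediately if the configured policy is not among those the
     * target database supports, and lists the policies the user can choose instead.
     */
    static void validate(
            FilterHandlingPolicy configured,
            Set<FilterHandlingPolicy> supported,
            String database) {
        if (!supported.contains(configured)) {
            throw new IllegalArgumentException(
                    "Filter handling policy '" + configured + "' is not supported by "
                            + database + ". Supported policies: " + supported);
        }
    }
}
```

Running such a check when the table source is created, rather than when filters are applied, would surface a misconfiguration before any query reaches the external system.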
Proposed Changes
We plan to first introduce this configuration option in the JDBC and MongoDB connectors, and then gradually introduce it in other connectors as needed.
Sources that need to support filter handling policies must follow the same naming style: prefix.filter.handling.policy.
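To illustrate the naming convention, the sketch below derives the option key from a connector prefix and resolves the policy from a map of table options, falling back to ALWAYS when the option is absent (the default proposed above). It is plain Java with hypothetical names (`FilterPolicyOptions`, `optionKey`, `resolve`). A real connector would read the value through Flink's ConfigOption mechanism instead.

```java
import java.util.Locale;
import java.util.Map;

/** Hypothetical helper illustrating the "prefix.filter.handling.policy" convention. */
final class FilterPolicyOptions {

    /** Stand-in for the FilterHandlingPolicy enum proposed in this FLIP. */
    enum FilterHandlingPolicy { ALWAYS, NEVER }

    static final String SUFFIX = ".filter.handling.policy";

    /** Builds the option key for a connector, e.g. "jdbc" -> "jdbc.filter.handling.policy". */
    static String optionKey(String connectorPrefix) {
        return connectorPrefix + SUFFIX;
    }

    /** Looks up the policy in the table options; defaults to ALWAYS when unset. */
    static FilterHandlingPolicy resolve(String connectorPrefix, Map<String, String> tableOptions) {
        String raw = tableOptions.get(optionKey(connectorPrefix));
        if (raw == null) {
            return FilterHandlingPolicy.ALWAYS;
        }
        // Accept "never", "NEVER", " never " and so on; unknown values throw IllegalArgumentException.
        return FilterHandlingPolicy.valueOf(raw.trim().toUpperCase(Locale.ROOT));
    }
}
```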
...
Compatibility, Deprecation, and Migration Plan
In its implementation of the applyFilters method, a source needs to follow the pattern below to respond to the configuration option.
```java
@Override
public Result applyFilters(List<ResolvedExpression> filters) {
    switch (filterHandlingPolicy) {
        case NEVER:
            // Accept nothing; hand every filter back to the planner as remaining.
            return Result.of(Collections.emptyList(), filters);
        case ALWAYS:
        default:
            return applyFiltersInternal(filters);
    }
}
```
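The effect of the pattern can be seen in a self-contained sketch that replaces the Flink types with simplified stand-ins. Everything here is an assumption made for illustration: filters are plain strings, `Result` is a minimal stand-in for `SupportsFilterPushDown.Result`, and `applyFiltersInternal` models a hypothetical source that cannot push down LIKE predicates.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical, Flink-free model of the applyFilters pattern. */
final class FilterPolicyDemo {

    /** Stand-in for the FilterHandlingPolicy enum proposed in this FLIP. */
    enum FilterHandlingPolicy { ALWAYS, NEVER }

    /** Simplified stand-in for SupportsFilterPushDown.Result. */
    static final class Result {
        final List<String> acceptedFilters;
        final List<String> remainingFilters;

        Result(List<String> acceptedFilters, List<String> remainingFilters) {
            this.acceptedFilters = acceptedFilters;
            this.remainingFilters = remainingFilters;
        }
    }

    /** Same switch as in the pattern above, with the policy passed in explicitly. */
    static Result applyFilters(FilterHandlingPolicy policy, List<String> filters) {
        switch (policy) {
            case NEVER:
                // Nothing is accepted; the planner evaluates all filters itself.
                return new Result(Collections.emptyList(), filters);
            case ALWAYS:
            default:
                return applyFiltersInternal(filters);
        }
    }

    /** Assumed connector logic: push down everything except LIKE predicates. */
    static Result applyFiltersInternal(List<String> filters) {
        List<String> accepted = new ArrayList<>();
        List<String> remaining = new ArrayList<>();
        for (String filter : filters) {
            if (filter.contains(" LIKE ")) {
                remaining.add(filter);
            } else {
                accepted.add(filter);
            }
        }
        return new Result(accepted, remaining);
    }
}
```

With NEVER, the accepted list is empty and the remaining list equals the input, so the external system only sees an unfiltered scan. With ALWAYS, the split is whatever the connector's existing logic decides.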
Test Plan
The changes will be covered by unit tests.
Rejected Alternatives
Reject Plan A
Add default method enableFilterPushDown in SupportsFilterPushDown interface.
...
The current interface can already satisfy the requirements. The connector can reject all the filters by returning the input filters as `Result#remainingFilters`.
Reject Plan B
Add a common configuration source.filter-push-down.enabled to disable filter push down for connectors.
...