...
Page properties | |||||||||
---|---|---|---|---|---|---|---|---|---|
Document the state by adding a label to the FLIP page with one of "discussion", "accepted", "released", "rejected".
|
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Currently, Flink Table/SQL does not expose fine-grained control for users to control filter pushdown. However, filter pushdown may have side effects in some cases, such as additional computational pressure on external systems. The JDBC source is a typical example of that. If a filter is pushed down to the database, an expensive full table scan may happen if the filter involves unindexed columns.
Different with table.optimizer.source.predicate-pushdown-enabled
The existing configuration "table.optimizer.source.predicate-pushdown-enabled" was introduced by legacy FilterableTableSource interface which allows the optimizer to disable all predicate pushdown, but it cannot provide fine-grained control for each individual source.
...
Considering compatibility with older versions, the newly added option needs to respect the optimizer's "table.optimizer.source.predicate-pushdown-enabled" option. But maybe we can deprecate "table.optimizer.source.predicate-pushdown-enabled" option and drop it in Flink 2.0 if it is not necessary anymore.
Goals
Support fine-grained configuration to control filter push down for each Table/SQL sources to let user decide how to perform filter pushdown.
Proposed Changes
- Introduces a new configuration filter.handling.policy to the JDBC and MongoDB connector.
- Suggests a convention option name if other connectors are going to add an option for the same purpose.
Public Interfaces
Introduces a new ConfigOption to the JDBC and MongoDB connector.
...
Code Block | ||
---|---|---|
| ||
/** Filter handling policy that can be chosen. */ @PublicEvolving public enum FilterHandlingPolicy implements DescribedEnum { ALWAYS( "always", text("Always push the supported filters to the external system")), NEVER( "never", text("Never push the supported filters to the external system")); private final String name; private final InlineElement description; FilterHandlingPolicy(String name, InlineElement description) { this.name = name; this.description = description; } @Override public InlineElement getDescription() { return description; } @Override public String toString() { return name; } } |
Compatibility, Deprecation, and Migration Plan
In the implementation of the applyFilters method, source needs to follow the following pattern to respond to the configuration option.
Code Block | ||
---|---|---|
| ||
@Override public Result applyFilters(List<ResolvedExpression> filters) { switch (filterHandlingPolicy) { case NEVER: return Result.of(Collections.emptyList(), filters); case ALWAYS: default: return applyFiltersInternal(filters); } } |
Test Plan
The changes will be covered by UTs.
Rejected Alternatives
Reject Plan A
Add default method enableFilterPushDown in SupportsFilterPushDown interface.
...
The current interface can already satisfy the requirements. The connector can reject all the filters by returning the input filters as `Result#remainingFilters`.
Reject Plan B
Add a common configuration source.filter-push-down.enabled to disable filter push down for connectors.
...