...
Support a fine-grained configuration option that disables filter pushdown for individual Table/SQL sources, so that users can decide whether filter pushdown is performed.
Public Interfaces
Users set this per source through a new connector option:
...
Introduce a native configuration option ignore.filter.pushdown for connectors. Connectors that adopt it should use the same option name.
Option | Type | Default value |
---|---|---|
ignore.filter.pushdown | Boolean | false |
For other source abilities, similar configuration options can be added in the future to give users the same choice. This is not urgent and can be introduced incrementally.
Option | Type | Default value |
---|---|---|
source.aggregate-push-down.enabled | Boolean | true |
... | Boolean | true |
Add the new source ability option to the TableSourceOptions class.
Code Block | ||
---|---|---|
| ||
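import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.configuration.ConfigOption;

import static org.apache.flink.configuration.ConfigOptions.key;

@PublicEvolving
public class TableSourceOptions {
    // Declared with booleanType(), so the option's type parameter is Boolean.
    public static final ConfigOption<Boolean> ENABLE_FILTER_PUSH_DOWN =
            key("source.filter-push-down.enabled")
                    .booleanType()
                    .defaultValue(true)
                    .withDescription("Whether to enable filter push down.");
    ......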
} |
...
false |
Proposed Changes
Currently, only a few external connectors support filter pushdown.
We plan to introduce this native configuration option first in flink-connector-jdbc and flink-connector-connector, and then gradually in other connectors as needed.
Sources that support disabling filter pushdown must check this option in their applyFilters method to decide whether to push predicates down.
...
Code Block | ||
---|---|---|
| ||
@Override
public Result applyFilters(List<ResolvedExpression> filters) {
    if (config.get(IGNORE_FILTER_PUSHDOWN)) {
        // Pushdown is ignored: accept no filters and hand all of them back as remaining filters.
        return Result.of(Collections.emptyList(), filters);
    } else {
        // Pushdown is allowed: delegate to the connector's existing pushdown logic.
        return applyFiltersInternal(filters);
    }
} |
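The check described above can be tried out in isolation with a minimal, self-contained sketch. It does not use Flink classes: `Result`, `SketchSource`, and the String-based filters are hypothetical stand-ins for `SupportsFilterPushDown.Result`, a connector source, and `ResolvedExpression`. The toy rule inside `applyFiltersInternal` and the assumed default of `false` are illustration only.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Sketch only: every type here is a hypothetical stand-in, not a Flink class.
class IgnoreFilterPushdownSketch {

    /** Stand-in for SupportsFilterPushDown.Result. */
    static final class Result {
        final List<String> acceptedFilters;
        final List<String> remainingFilters;

        private Result(List<String> accepted, List<String> remaining) {
            this.acceptedFilters = accepted;
            this.remainingFilters = remaining;
        }

        static Result of(List<String> accepted, List<String> remaining) {
            return new Result(accepted, remaining);
        }
    }

    /** Hypothetical source that reads the option from its table options. */
    static final class SketchSource {
        static final String IGNORE_FILTER_PUSHDOWN = "ignore.filter.pushdown";

        private final boolean ignoreFilterPushdown;

        SketchSource(Map<String, String> tableOptions) {
            // Assumed default: false, so pushdown stays on unless the user opts out.
            this.ignoreFilterPushdown =
                    Boolean.parseBoolean(
                            tableOptions.getOrDefault(IGNORE_FILTER_PUSHDOWN, "false"));
        }

        Result applyFilters(List<String> filters) {
            if (ignoreFilterPushdown) {
                // Accept nothing; every filter is returned as a remaining filter.
                return Result.of(Collections.emptyList(), filters);
            }
            return applyFiltersInternal(filters);
        }

        // Toy pushdown rule for illustration: accept only simple equality predicates.
        private Result applyFiltersInternal(List<String> filters) {
            List<String> accepted = new ArrayList<>();
            List<String> remaining = new ArrayList<>();
            for (String filter : filters) {
                (filter.contains(" = ") ? accepted : remaining).add(filter);
            }
            return Result.of(accepted, remaining);
        }
    }

    public static void main(String[] args) {
        List<String> filters = List.of("id = 1", "name LIKE 'a%'");
        Result pushed = new SketchSource(Map.of()).applyFilters(filters);
        Result ignored =
                new SketchSource(Map.of("ignore.filter.pushdown", "true")).applyFilters(filters);
        System.out.println(pushed.acceptedFilters + " / " + pushed.remainingFilters);
        System.out.println(ignored.acceptedFilters + " / " + ignored.remainingFilters);
    }
}
```

With the option set to true, the source accepts no filters and returns all of them as remaining, so the planner would evaluate them itself. With the option unset, the source's own pushdown logic decides.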
Test Plan
The changes will be covered by unit tests.
Rejected Alternatives
Reject Plan A
Add a default method enableFilterPushDown to the SupportsFilterPushDown interface.
In FilterPushDownSpec, determine whether filter pushdown is allowed. If it is not allowed, the applyFilters logic will not be executed.
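For illustration, this rejected alternative could look roughly like the sketch below. `FilterPushDownSource` and `applyIfEnabled` are hypothetical stand-ins for the SupportsFilterPushDown interface and the check in FilterPushDownSpec. They are not Flink's actual code, and filters are plain strings to keep the example self-contained.

```java
import java.util.List;

// Sketch only: hypothetical stand-ins, not Flink's SupportsFilterPushDown or FilterPushDownSpec.
class RejectedPlanASketch {

    interface FilterPushDownSource {
        // The default method from Plan A: a source opts out by overriding it to return false.
        default boolean enableFilterPushDown() {
            return true;
        }

        void applyFilters(List<String> filters);
    }

    // Planner-side check, standing in for the logic described for FilterPushDownSpec.
    static boolean applyIfEnabled(FilterPushDownSource source, List<String> filters) {
        if (!source.enableFilterPushDown()) {
            return false; // pushdown not allowed, so applyFilters is never invoked
        }
        source.applyFilters(filters);
        return true;
    }

    public static void main(String[] args) {
        FilterPushDownSource enabled = filters -> System.out.println("pushed: " + filters);
        FilterPushDownSource disabled = new FilterPushDownSource() {
            @Override
            public boolean enableFilterPushDown() {
                return false;
            }

            @Override
            public void applyFilters(List<String> filters) {
                throw new IllegalStateException("must not be called when pushdown is disabled");
            }
        };
        System.out.println(applyIfEnabled(enabled, List.of("id = 1")));
        System.out.println(applyIfEnabled(disabled, List.of("id = 1")));
    }
}
```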
...
The current interface can already satisfy the requirements. The connector can reject all the filters by returning the input filters as `Result#remainingFilters`.
Reject Plan B
Add a common configuration option source.filter-push-down.enabled that disables filter pushdown for connectors.
Users set this per source through a new connector option:
Option | Type | Default value |
---|---|---|
source.filter-push-down.enabled | Boolean | true |
For other source abilities, similar configuration options can be added in the future to give users the same choice. This is not urgent and can be introduced incrementally.
Option | Type | Default value |
---|---|---|
source.aggregate-push-down.enabled | Boolean | true |
... | Boolean | true |
Add the new source ability option to the TableSourceOptions class.
Code Block | ||
---|---|---|
| ||
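import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.configuration.ConfigOption;

import static org.apache.flink.configuration.ConfigOptions.key;

@PublicEvolving
public class TableSourceOptions {
    // Declared with booleanType(), so the option's type parameter is Boolean.
    public static final ConfigOption<Boolean> ENABLE_FILTER_PUSH_DOWN =
            key("source.filter-push-down.enabled")
                    .booleanType()
                    .defaultValue(true)
                    .withDescription("Whether to enable filter push down.");
    ......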
} |
Reject Reason
A common implementation should come first; its configuration then naturally becomes a common configuration.