Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Support fine-grained configuration to disable filter push down for each Table/SQL sources to let user decide whether to perform filter pushdown.

Public Interfaces

The user specifies option for sources through a new connector option:

...

For other  source abilities, we can also consider adding similar configuration options in the future to give users the choice. This is not urgent and can be continuously introduced.Introduce a native configuration ignore.filter.pushdown for connectors. The configuration option in connectors should follow the same configuration name.

OptionTypeDefault value
source.aggregate-push-down.enabledignore.filter.pushdownBooleantrue
...Booleantrue

Add a new source ability options in the TableSourceOptions class.

Code Block
languagejava
@PublicEvolving
public class TableSourceOptions {


    public static final ConfigOption<String> ENABLE_FILTER_PUSH_DOWN =
            key("source.filter-push-down.enabled")
                    .booleanType()
                    .defaultValue(true)
                    .withDescription("Whether to enable filter push down.");
    ......
}

...

false

Proposed Changes

Currently, there are not many external connectors that support filter pushdown.

We plan to first introduce this native configuration option in the flink-connector-jdbc and flink-connector-connector, and then gradually introduce it in other connectors as needed.

For sources that need to support disabling filter pushdown, it is necessary to check this configuration in the applyFilters method to determine whether to perform predicate pushdown.

...

Code Block
languagejava
@Override
public Result applyFilters(List<ResolvedExpression> filters) {
    if (config.get(ENABLEIGNORE_FILTER_PUSH_DOWNPUSHDOWN)) {         
        return applyFiltersInternal(Result.of(Collections.emptyList(), filters);
      } else {
        return Result.of(Collections.emptyList(), applyFiltersInternal(filters);  
    }
}

Test Plan

The changes will be covered by UTs.

Rejected Alternatives

Reject Plan A

Add default method enableFilterPushDown in SupportsFilterPushDown interface.

In FilterPushDownSpec, determine whether filter pushdown is allowed. If it is not allowed, the applyFilters logic will not be executed.

...

The current interface can already satisfy the requirements. The connector can reject all the filters by returning the input filters as `Result#remainingFilters`.

Reject Plan B

Add a common configuration source.filter-push-down.enabled to disable filter push down for connectors.


The user specifies option for sources through a new connector option:

OptionTypeDefault value
source.filter-push-down.enabledBooleantrue


For other  source abilities, we can also consider adding similar configuration options in the future to give users the choice. This is not urgent and can be continuously introduced.

OptionTypeDefault value
source.aggregate-push-down.enabledBooleantrue
...Booleantrue


Add a new source ability options in the TableSourceOptions class.

Code Block
languagejava
@PublicEvolving
public class TableSourceOptions {


    public static final ConfigOption<String> ENABLE_FILTER_PUSH_DOWN =
            key("source.filter-push-down.enabled")
                    .booleanType()
                    .defaultValue(true)
                    .withDescription("Whether to enable filter push down.");
    ......
}


Reject Reason

A common implementation should always come first, then its configuration becomes a common configuration as a natural result.