Page properties

Document the state by adding a label to the FLIP page with one of "discussion", "accepted", "released", "rejected".

Discussion thread: https://lists.apache.org/thread/nvxx8sp9jm009yywm075hoffr632tm7j

Vote thread: https://lists.apache.org/thread/c96lwcyyjm26w61tkrp8cgnlfob9v38gsl6gz2zk9tg44r69w732gsrqpt8jwk83

JIRA: FLINK-34214

Release: <Flink Version>


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, Flink Table/SQL gives users no fine-grained control over filter pushdown. However, filter pushdown can have side effects, such as additional computational load on external systems. The JDBC source is a typical example: if a filter is pushed down to the database and involves unindexed columns, it may trigger an expensive full table scan.

Difference from table.optimizer.source.predicate-pushdown-enabled

The existing configuration "table.optimizer.source.predicate-pushdown-enabled" was introduced with the legacy FilterableTableSource interface. It lets users disable all predicate pushdown in the optimizer, but it cannot provide fine-grained control for each individual source.

...

For compatibility with older versions, the newly added option must respect the optimizer's "table.optimizer.source.predicate-pushdown-enabled" option. We may deprecate "table.optimizer.source.predicate-pushdown-enabled" and drop it in Flink 2.0 if it is no longer necessary.
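One way to read "respect the optimizer's option" is that filters reach the external system only when the global switch is on and the per-source policy allows it. The sketch below illustrates that reading with a stand-in enum. The class EffectivePushDown and the helper shouldPushDown are hypothetical names introduced for illustration; they are not part of Flink or of this proposal.

```java
final class EffectivePushDown {

    /** Stand-in for the proposed FilterHandlingPolicy, reduced to its two constants. */
    enum Policy {
        ALWAYS,
        NEVER
    }

    /**
     * Hypothetical helper: pushdown happens only if the global optimizer switch
     * is enabled AND the per-source policy is ALWAYS.
     */
    static boolean shouldPushDown(boolean optimizerPushDownEnabled, Policy policy) {
        return optimizerPushDownEnabled && policy == Policy.ALWAYS;
    }

    public static void main(String[] args) {
        // Print the full truth table for the two settings.
        for (boolean global : new boolean[] {true, false}) {
            for (Policy policy : Policy.values()) {
                System.out.printf(
                        "optimizer=%b policy=%s -> pushDown=%b%n",
                        global, policy, shouldPushDown(global, policy));
            }
        }
    }
}
```

Under this assumption the global option acts as an upper bound: a source configured with ALWAYS still sees no pushdown while the optimizer option is disabled.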

Goals

Support fine-grained configuration of filter pushdown for each Table/SQL source, so that users can decide how filter pushdown is performed.

Proposed Changes

  1. Introduce a new configuration option filter.handling.policy in the JDBC and MongoDB connectors.
  2. Suggest a conventional option name for other connectors that add an option for the same purpose.

Public Interfaces

Introduce a new ConfigOption in the JDBC and MongoDB connectors.

...

Code Block
languagejava
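import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.configuration.DescribedEnum;
import org.apache.flink.configuration.description.InlineElement;

import static org.apache.flink.configuration.description.TextElement.text;

/** Filter handling policy that can be chosen. */
@PublicEvolving
public enum FilterHandlingPolicy implements DescribedEnum {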

    ALWAYS(
            "always",
            text("Always push the supported filters to the external system")),

    NEVER(
            "never",
            text("Never push the supported filters to the external system"));

    private final String name;
    private final InlineElement description;

    FilterHandlingPolicy(String name, InlineElement description) {
        this.name = name;
        this.description = description;
    }

    @Override
    public InlineElement getDescription() {
        return description;
    }

    @Override
    public String toString() {
        return name;
    }
}
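To show how the string names "always" and "never" relate to the enum constants, here is a minimal, dependency-free sketch of a lookup from an option value to a policy. The nested enum is a stand-in for the enum above without the Flink types, and fromOptionValue is a hypothetical helper; Flink's own ConfigOption parsing is not reproduced here.

```java
final class PolicyLookup {

    /** Stand-in mirroring the two constants and string names of FilterHandlingPolicy. */
    enum FilterHandlingPolicy {
        ALWAYS("always"),
        NEVER("never");

        private final String name;

        FilterHandlingPolicy(String name) {
            this.name = name;
        }

        @Override
        public String toString() {
            return name;
        }
    }

    /** Hypothetical helper: case-insensitive match on toString(); unknown values are rejected. */
    static FilterHandlingPolicy fromOptionValue(String value) {
        for (FilterHandlingPolicy policy : FilterHandlingPolicy.values()) {
            if (policy.toString().equalsIgnoreCase(value.trim())) {
                return policy;
            }
        }
        throw new IllegalArgumentException("Unknown filter.handling.policy value: " + value);
    }

    public static void main(String[] args) {
        System.out.println(fromOptionValue("never"));
        System.out.println(fromOptionValue("ALWAYS"));
    }
}
```

Rejecting unknown values early, rather than silently falling back to a default, is a design choice of this sketch and an assumption about the desired behavior.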

Compatibility, Deprecation, and Migration Plan

In its implementation of the applyFilters method, a source needs to follow this pattern to honor the configuration option.

Code Block
languagejava
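    @Override
    public Result applyFilters(List<ResolvedExpression> filters) {
        switch (filterHandlingPolicy) {
            case NEVER:
                // Accept nothing: every filter is returned as a remaining filter.
                return Result.of(Collections.emptyList(), filters);
            case ALWAYS:
            default:
                // Delegate to the connector-specific pushdown logic.
                return applyFiltersInternal(filters);
        }
    }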
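The pattern above can be exercised without a Flink dependency using stand-in types. In the sketch below, filters are plain strings instead of ResolvedExpression, Result is a simplified stand-in for SupportsFilterPushDown.Result, and applyFiltersInternal is a hypothetical rule that accepts only filters on a column named id. All of these are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

final class FilterPolicySketch {

    enum FilterHandlingPolicy {
        ALWAYS,
        NEVER
    }

    /** Simplified stand-in for SupportsFilterPushDown.Result. */
    static final class Result {
        final List<String> acceptedFilters;
        final List<String> remainingFilters;

        Result(List<String> acceptedFilters, List<String> remainingFilters) {
            this.acceptedFilters = acceptedFilters;
            this.remainingFilters = remainingFilters;
        }
    }

    static Result applyFilters(FilterHandlingPolicy policy, List<String> filters) {
        switch (policy) {
            case NEVER:
                // Accept nothing: all filters remain to be evaluated outside the source.
                return new Result(Collections.emptyList(), filters);
            case ALWAYS:
            default:
                return applyFiltersInternal(filters);
        }
    }

    /** Hypothetical connector logic: only filters on the column "id" can be pushed down. */
    static Result applyFiltersInternal(List<String> filters) {
        List<String> accepted = new ArrayList<>();
        List<String> remaining = new ArrayList<>();
        for (String filter : filters) {
            if (filter.startsWith("id ")) {
                accepted.add(filter);
            } else {
                remaining.add(filter);
            }
        }
        return new Result(accepted, remaining);
    }

    public static void main(String[] args) {
        List<String> filters = Arrays.asList("id = 1", "name = 'a'");
        for (FilterHandlingPolicy policy : FilterHandlingPolicy.values()) {
            Result result = applyFilters(policy, filters);
            System.out.println(
                    policy + ": accepted=" + result.acceptedFilters
                            + ", remaining=" + result.remainingFilters);
        }
    }
}
```

With NEVER, the accepted list is empty and the input list comes back unchanged as the remaining filters, so no filter is handed to the external system.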

Test Plan

The changes will be covered by unit tests.

Rejected Alternatives

Rejected Plan A

Add a default method enableFilterPushDown to the SupportsFilterPushDown interface.

...

The current interface can already satisfy the requirements. The connector can reject all the filters by returning the input filters as `Result#remainingFilters`.

Rejected Plan B

Add a common configuration option source.filter-push-down.enabled to disable filter pushdown for connectors.

...