...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, Flink Table/SQL does not give users fine-grained control over filter pushdown. However, filter pushdown may have side effects in some cases, such as additional computational pressure on external systems. Moreover, improper queries can lead to issues such as full table scans, which in turn can impact the stability of external systems. The JDBC source is a typical example: if a filter is pushed down to the database and involves unindexed columns, the database may perform an expensive full table scan.

Difference from table.optimizer.source.predicate-pushdown-enabled

...

Support fine-grained configuration that controls filter pushdown for each Table/SQL source, so that users can decide how filter pushdown is performed.

Public Interfaces

...

Proposed Changes

...

  1. Introduces a new configuration option, filter.handling.policy, to the JDBC and MongoDB connectors.
  2. Suggests a conventional option name for other connectors that add an option for the same purpose.


Usually, no prefix is needed for filter.handling.policy when it is a top-level connector option, because the 'connector' option already identifies which connector it belongs to.

Code Block
languagesql
'connector' = 'jdbc',
'filter.handling.policy' = 'ALWAYS'
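
As a rough illustration of the top-level case, the sketch below reads the unprefixed key from the options of a table declared with 'connector' = 'jdbc'. It is a self-contained sketch, not Flink's API: the FilterHandlingPolicyResolver class and its resolve method are hypothetical, the enum is a simplified stand-in for the one proposed under Public Interfaces, and the case-insensitive matching and the ALWAYS default are assumptions modeled on the proposed ConfigOption.

```java
import java.util.Locale;
import java.util.Map;

// Simplified stand-in for the proposed FilterHandlingPolicy enum (no descriptions).
enum FilterHandlingPolicy {
    ALWAYS,
    NEVER
}

// Hypothetical helper, not part of Flink: resolves the unprefixed top-level option.
final class FilterHandlingPolicyResolver {
    static final String KEY = "filter.handling.policy";

    static FilterHandlingPolicy resolve(Map<String, String> tableOptions) {
        String raw = tableOptions.get(KEY);
        if (raw == null) {
            // Assumed default, matching defaultValue(ALWAYS) in the proposed ConfigOption.
            return FilterHandlingPolicy.ALWAYS;
        }
        // Case-insensitive match, so 'NEVER' and 'never' are both accepted.
        // Unknown values make Enum.valueOf throw IllegalArgumentException.
        return FilterHandlingPolicy.valueOf(raw.trim().toUpperCase(Locale.ROOT));
    }
}
```

Because the key carries no prefix, the same lookup works for any connector whose options sit at the top level; the 'connector' option alone tells the reader which source the policy applies to.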


A prefix is needed when the option belongs to a second-level (or deeper) component, such as a format.

Code Block
'connector' = 'filesystem',
'format' = 'orc',
'orc.filter.handling.policy' = 'NUMERIC_ONLY'

In this case, the supported values of the option may differ depending on the format. For example, the orc format may support INDEXED_ONLY, while the parquet format may support other values.
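
The naming rule and the format-dependent values could be sketched as follows. Everything here is hypothetical: the FilterPolicyKeys helper, the per-format value sets (the orc entries follow the INDEXED_ONLY and NUMERIC_ONLY examples in the text), and the fallback to ALWAYS and NEVER for unlisted formats are assumptions for illustration, not part of the proposal or of Flink.

```java
import java.util.Locale;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch of the naming convention; not a Flink API.
final class FilterPolicyKeys {
    static final String BASE_KEY = "filter.handling.policy";

    // Assumed value sets per format. Formats not listed fall back to the
    // two values that every source is assumed to share.
    private static final Map<String, Set<String>> VALUES_BY_FORMAT =
            Map.of("orc", Set.of("ALWAYS", "NEVER", "INDEXED_ONLY", "NUMERIC_ONLY"));
    private static final Set<String> COMMON_VALUES = Set.of("ALWAYS", "NEVER");

    // Top-level connector option: no prefix, because 'connector' already scopes it.
    static String connectorKey() {
        return BASE_KEY;
    }

    // Second-level option: prefixed with the format identifier.
    static String formatKey(String format) {
        return format + "." + BASE_KEY;
    }

    // Checks a user-supplied value against the values the format is assumed to accept.
    static boolean isValidForFormat(String format, String value) {
        Set<String> allowed = VALUES_BY_FORMAT.getOrDefault(format, COMMON_VALUES);
        return allowed.contains(value.trim().toUpperCase(Locale.ROOT));
    }
}
```

Keeping the base key identical and varying only the prefix lets each format define its own value set without the option names colliding.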

Public Interfaces

Introduces a new ConfigOption to the JDBC and MongoDB connectors.

Code Block
languagejava
    public static final ConfigOption<FilterHandlingPolicy> FILTER_HANDLING_POLICY =
            ConfigOptions.key("filter.handling.policy")
                    .enumType(FilterHandlingPolicy.class)
                    .defaultValue(FilterHandlingPolicy.ALWAYS)
                    .withDescription(
                            "The policy that decides whether supported filters are pushed down to the external system.");

Introduces an enum describing the filter handling policies that can be chosen. Other policies, such as INDEXED_ONLY and NUMERIC_ONLY, could be introduced later to give users more choices.

Code Block
languagejava
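import static org.apache.flink.configuration.description.TextElement.text;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.configuration.DescribedEnum;
import org.apache.flink.configuration.description.InlineElement;

/** Filter handling policy that can be chosen. */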
@PublicEvolving
public enum FilterHandlingPolicy implements DescribedEnum {

    ALWAYS(
            "always",
            text("Always push the supported filters to the external system")),

    NEVER(
            "never",
            text("Never push the supported filters to the external system"));

    private final String name;
    private final InlineElement description;

    FilterHandlingPolicy(String name, InlineElement description) {
        this.name = name;
        this.description = description;
    }

    @Override
    public InlineElement getDescription() {
        return description;
    }

    @Override
    public String toString() {
        return name;
    }
}
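
To show where the policy would take effect, the sketch below models the decision a source makes when the planner offers it filters: it splits them into accepted and remaining filters, in the spirit of Flink's SupportsFilterPushDown.Result. It is a simplified, self-contained model rather than connector code: filters are plain strings, the FilterSplit and FilterPushDownPlanner classes are hypothetical, and the supportedByConnector predicate stands in for a connector's existing check of whether it can translate a filter. A real connector would work with ResolvedExpression instances.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Predicate;

// Simplified stand-in for the proposed enum.
enum FilterHandlingPolicy {
    ALWAYS,
    NEVER
}

// Hypothetical result holder, loosely mirroring SupportsFilterPushDown.Result:
// accepted filters go to the external system, remaining ones are evaluated by Flink.
final class FilterSplit {
    final List<String> accepted;
    final List<String> remaining;

    FilterSplit(List<String> accepted, List<String> remaining) {
        this.accepted = Collections.unmodifiableList(accepted);
        this.remaining = Collections.unmodifiableList(remaining);
    }
}

final class FilterPushDownPlanner {
    // Decides which filters to push down under the given policy.
    static FilterSplit split(
            List<String> filters,
            FilterHandlingPolicy policy,
            Predicate<String> supportedByConnector) {
        List<String> accepted = new ArrayList<>();
        List<String> remaining = new ArrayList<>();
        for (String filter : filters) {
            // Only ALWAYS pushes filters, and only those the connector can translate.
            if (policy == FilterHandlingPolicy.ALWAYS && supportedByConnector.test(filter)) {
                accepted.add(filter);
            } else {
                // NEVER, or a filter the connector cannot translate: Flink evaluates it.
                remaining.add(filter);
            }
        }
        return new FilterSplit(accepted, remaining);
    }
}
```

For example, with the filters "id = 1" and "name LIKE '%a%'" and a connector that cannot translate LIKE, ALWAYS accepts only "id = 1", while NEVER leaves both filters to Flink, so the database never sees a predicate that could trigger a full table scan.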

Proposed Changes

We plan to first introduce this configuration option in the JDBC and MongoDB connectors, and then gradually introduce it to other connectors as needed.

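Sources that need to support filter handling policies should follow the same naming convention: filter.handling.policy for a top-level connector option, and <prefix>.filter.handling.policy (for example, orc.filter.handling.policy) for an option of a second-level component such as a format.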

...


Compatibility, Deprecation, and Migration Plan

...