...

Currently, Flink Table/SQL does not give users fine-grained control over filter pushdown. However, filter pushdown has side effects, such as additional computational pressure on external systems. Moreover, improper queries can lead to issues such as full table scans, which in turn can affect the stability of external systems.

...

Support fine-grained configuration to control filter pushdown for each Table/SQL source, so that users can decide how filter pushdown is performed.

Public Interfaces

Introduce the native configuration option [prefix].filter.handling.policy in the connector. The option in every connector should follow this same naming pattern.

For example, for the jdbc connector, the configuration option name should be jdbc.filter.handling.policy.

Code Block
languagejava
    public static final ConfigOption<FilterHandlingPolicy> FILTER_HANDLING_POLICY =
            ConfigOptions.key("jdbc.filter.handling.policy")
                    .enumType(FilterHandlingPolicy.class)
                    .defaultValue(FilterHandlingPolicy.ALWAYS)
                    .withDescription("Filter handling policy that can be chosen.");

Introduce a native enum that describes the filter handling policies a user can choose from.

We can also introduce other policies, such as INDEX_ONLY and NUMERIC_ONLY, to give users more choices.

Code Block
languagejava
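import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.configuration.DescribedEnum;
import org.apache.flink.configuration.description.InlineElement;

import static org.apache.flink.configuration.description.TextElement.text;

/** Filter handling policy that can be chosen. */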
@PublicEvolving
public enum FilterHandlingPolicy implements DescribedEnum {

    ALWAYS(
            "always",
            text("Always push the supported filters to the external system")),

    NEVER(
            "never",
            text("Never push the supported filters to the external system"));

    private final String name;
    private final InlineElement description;

    FilterHandlingPolicy(String name, InlineElement description) {
        this.name = name;
        this.description = description;
    }

    @Override
    public InlineElement getDescription() {
        return description;
    }

    @Override
    public String toString() {
        return name;
    }
}

Proposed Changes

We plan to first introduce this native configuration option in the flink-connector-jdbc and flink-connector-mongodb connectors, and then gradually introduce it in other connectors as needed.

Sources that need to support filter handling policies should follow the same naming style, [prefix].filter.handling.policy.

| Option | Type | Default value |
| --- | --- | --- |
| jdbc.filter.handling.policy | Enum | ALWAYS |
| mongodb.filter.handling.policy | Enum | ALWAYS |
| ... | ... | ... |
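
The naming convention and default can be illustrated with a small stand-alone sketch. It deliberately avoids Flink classes: the `FilterPolicyOptions` class, its local `Policy` enum, and the `optionKey` and `resolve` helpers are hypothetical names used only for illustration, and the case-insensitive matching of the configured value is an assumption rather than behavior stated in this proposal.

```java
import java.util.HashMap;
import java.util.Locale;
import java.util.Map;

/** Stand-alone sketch of the option naming and default; not connector code. */
class FilterPolicyOptions {

    /** Local stand-in for the proposed FilterHandlingPolicy enum. */
    enum Policy { ALWAYS, NEVER }

    /** Builds the option key for a connector prefix such as "jdbc". */
    static String optionKey(String connectorPrefix) {
        return connectorPrefix + ".filter.handling.policy";
    }

    /** Reads the policy from raw table options, falling back to ALWAYS when unset. */
    static Policy resolve(String connectorPrefix, Map<String, String> tableOptions) {
        String raw = tableOptions.get(optionKey(connectorPrefix));
        if (raw == null) {
            return Policy.ALWAYS; // default value from the table above
        }
        // Assumption: the value is matched case-insensitively against the constant names.
        return Policy.valueOf(raw.trim().toUpperCase(Locale.ROOT));
    }

    public static void main(String[] args) {
        Map<String, String> options = new HashMap<>();
        options.put("jdbc.filter.handling.policy", "never");
        System.out.println(resolve("jdbc", options));    // NEVER
        System.out.println(resolve("mongodb", options)); // ALWAYS (option not set)
    }
}
```

Keeping the key construction in one place makes it harder for a new connector to drift from the shared naming style.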

Compatibility, Deprecation, and Migration Plan

In its implementation of the applyFilters method, a source should follow the pattern below to honor the configuration option.

Code Block
languagejava
    @Override
    public Result applyFilters(List<ResolvedExpression> filters) {
        switch (filterHandlingPolicy) {
            case NEVER:
                // Accept nothing: every filter remains to be evaluated by Flink.
                return Result.of(Collections.emptyList(), filters);
            case ALWAYS:
            default:
                // Delegate to the connector's existing pushdown logic.
                return applyFiltersInternal(filters);
        }
    }
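
To make the effect of the two policies concrete, here is a minimal stand-alone sketch of the same switch. It uses plain strings in place of ResolvedExpression and a local `Result` class in place of Flink's result type, so that it runs without Flink on the classpath. The rule inside `applyFiltersInternal` (treating LIKE predicates as not pushable) is purely hypothetical and stands in for whatever a real connector supports.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

/** Stand-alone model of the policy switch; all types here are illustrative stand-ins. */
class FilterSplitSketch {

    enum Policy { ALWAYS, NEVER }

    /** Stand-in for the accepted/remaining pair returned by applyFilters. */
    static final class Result {
        final List<String> accepted;
        final List<String> remaining;

        Result(List<String> accepted, List<String> remaining) {
            this.accepted = accepted;
            this.remaining = remaining;
        }
    }

    static Result applyFilters(Policy policy, List<String> filters) {
        switch (policy) {
            case NEVER:
                // Nothing is pushed down; all filters stay with the engine.
                return new Result(Collections.emptyList(), filters);
            case ALWAYS:
            default:
                return applyFiltersInternal(filters);
        }
    }

    /** Hypothetical connector rule: LIKE predicates cannot be pushed down. */
    static Result applyFiltersInternal(List<String> filters) {
        List<String> accepted = new ArrayList<>();
        List<String> remaining = new ArrayList<>();
        for (String filter : filters) {
            if (filter.contains("LIKE")) {
                remaining.add(filter);
            } else {
                accepted.add(filter);
            }
        }
        return new Result(accepted, remaining);
    }

    public static void main(String[] args) {
        List<String> filters = Arrays.asList("id > 10", "name LIKE 'a%'");
        Result never = applyFilters(Policy.NEVER, filters);
        Result always = applyFilters(Policy.ALWAYS, filters);
        System.out.println(never.accepted + " / " + never.remaining);
        System.out.println(always.accepted + " / " + always.remaining);
    }
}
```

With NEVER, the accepted list is empty and both filters remain for the engine to evaluate; with ALWAYS, the split is decided entirely by the connector's own logic.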

Test Plan

The changes will be covered by unit tests (UTs).

...