Document the state by adding a label to the FLIP page with one of "discussion", "accepted", "released", "rejected".

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, Flink Table/SQL does not give users fine-grained control over filter pushdown. However, filter pushdown has side effects, such as additional computational pressure on external systems. Moreover, improper queries can cause problems such as full table scans, which in turn can affect the stability of external systems.

Difference from table.optimizer.source.predicate-pushdown-enabled

The existing configuration option "table.optimizer.source.predicate-pushdown-enabled" was introduced with the legacy FilterableTableSource interface. It lets users disable all predicate pushdown in the optimizer, but it cannot provide fine-grained control for each individual source.

Suppose we have a SQL query with two sources: Kafka and a database. The database is sensitive to load, so we want to disable filter pushdown for the database source. However, we still want to push filters down to the Kafka source to reduce network IO.

For compatibility with older versions, the newly added option needs to respect the optimizer's "table.optimizer.source.predicate-pushdown-enabled" option. We could, however, deprecate "table.optimizer.source.predicate-pushdown-enabled" and drop it in Flink 2.0 if it is no longer necessary.
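One possible reading of "respect" is that the global optimizer switch acts as an upper bound: if it is off, no source receives filters, and if it is on, each source's own policy decides. The sketch below illustrates that reading only. It is self-contained Java with stand-in types; PushDownDecision and shouldPushDown are hypothetical names that are not part of Flink or of this proposal, and the enum is redeclared here so the sketch compiles on its own.

```java
/** Stand-in for the enum proposed in this FLIP, redeclared so the sketch is self-contained. */
enum FilterHandlingPolicy { ALWAYS, NEVER }

/** Hypothetical helper, for illustration only; not an existing Flink class. */
final class PushDownDecision {
    private PushDownDecision() {}

    /**
     * Assumed semantics: filters are offered to a source only if the global
     * optimizer option is enabled AND the source's own policy is not NEVER.
     */
    static boolean shouldPushDown(boolean globalPushDownEnabled, FilterHandlingPolicy sourcePolicy) {
        if (!globalPushDownEnabled) {
            // The global option wins, which preserves the legacy behavior.
            return false;
        }
        return sourcePolicy != FilterHandlingPolicy.NEVER;
    }
}
```

Under this assumption, the Kafka-plus-database scenario keeps the global option enabled and sets NEVER only on the database source.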

Goals

Support fine-grained configuration of filter pushdown for each Table/SQL source, so that users can decide how filter pushdown is performed.

Public Interfaces

Introduce the native configuration [prefix].filter.handling.policy in the connector.

For example, for the jdbc connector, the configuration option name should be jdbc.filter.handling.policy.

    public static final ConfigOption<FilterHandlingPolicy> FILTER_HANDLING_POLICY =
            ConfigOptions.key("jdbc.filter.handling.policy")
                    .enumType(FilterHandlingPolicy.class)
                    .defaultValue(FilterHandlingPolicy.ALWAYS)
                    .withDescription("Filter handling policy that can be chosen.");

Introduce a native enum to describe filter handling policy that can be chosen.

We could also introduce other policies, such as INDEX_ONLY and NUMERIC_ONLY, to give users more choices.

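import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.configuration.DescribedEnum;
import org.apache.flink.configuration.description.InlineElement;

import static org.apache.flink.configuration.description.TextElement.text;

/** Filter handling policy that can be chosen. */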
@PublicEvolving
public enum FilterHandlingPolicy implements DescribedEnum {

    ALWAYS(
            "always",
            text("Always push the supported filters to the external system")),

    NEVER(
            "never",
            text("Never push the supported filters to the external system"));

    private final String name;
    private final InlineElement description;

    FilterHandlingPolicy(String name, InlineElement description) {
        this.name = name;
        this.description = description;
    }

    @Override
    public InlineElement getDescription() {
        return description;
    }

    @Override
    public String toString() {
        return name;
    }
}

Proposed Changes

We plan to introduce this native configuration option in the JDBC and MongoDB connectors first, and then gradually add it to other connectors as needed.

Sources that support filter handling policies must follow the same naming style, [prefix].filter.handling.policy.

Option                            Type    Default value
jdbc.filter.handling.policy       Enum    ALWAYS
mongodb.filter.handling.policy    Enum    ALWAYS
...                               ...     ...
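To make the naming convention and the ALWAYS default concrete, here is a minimal, self-contained sketch of resolving the policy from a source's option map. PolicyResolver and resolve are hypothetical names, the plain Map stands in for the table's options, and the case-insensitive parsing is an assumption made for this sketch; a real connector would read the value through its ConfigOption instead.

```java
import java.util.Locale;
import java.util.Map;

/** Stand-in for the enum proposed in this FLIP, redeclared so the sketch is self-contained. */
enum FilterHandlingPolicy { ALWAYS, NEVER }

/** Hypothetical helper, for illustration only; not an existing Flink class. */
final class PolicyResolver {
    static final String SUFFIX = ".filter.handling.policy";

    private PolicyResolver() {}

    /** Looks up "<prefix>.filter.handling.policy" and falls back to ALWAYS when it is unset. */
    static FilterHandlingPolicy resolve(String prefix, Map<String, String> options) {
        String raw = options.get(prefix + SUFFIX);
        if (raw == null) {
            return FilterHandlingPolicy.ALWAYS; // default value from the table above
        }
        // Assumption: values are matched case-insensitively; unknown values
        // make valueOf throw IllegalArgumentException.
        return FilterHandlingPolicy.valueOf(raw.trim().toUpperCase(Locale.ROOT));
    }
}
```

With this sketch, a source whose options contain jdbc.filter.handling.policy set to never resolves to NEVER, while a source that does not set the option keeps the default.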

Compatibility, Deprecation, and Migration Plan

In its implementation of the applyFilters method, a source should follow the pattern below to honor the configuration option.

    @Override
    public Result applyFilters(List<ResolvedExpression> filters) {
        switch (filterHandlingPolicy) {
            case NEVER:
                return Result.of(Collections.emptyList(), filters);
            case ALWAYS:
            default:
                return applyFiltersInternal(filters);
        }
    }
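The pattern above depends on Flink types, so the following self-contained sketch mirrors it with stand-ins to show what each policy returns. Filters are plain strings instead of ResolvedExpression, Result is a simplified stand-in for SupportsFilterPushDown.Result, and SketchSource with its rule "only equality predicates are supported" is purely hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Stand-in for the enum proposed in this FLIP, redeclared so the sketch is self-contained. */
enum FilterHandlingPolicy { ALWAYS, NEVER }

/** Simplified stand-in for SupportsFilterPushDown.Result: accepted vs. remaining filters. */
final class Result {
    final List<String> acceptedFilters;
    final List<String> remainingFilters;

    private Result(List<String> accepted, List<String> remaining) {
        this.acceptedFilters = accepted;
        this.remainingFilters = remaining;
    }

    static Result of(List<String> accepted, List<String> remaining) {
        return new Result(accepted, remaining);
    }
}

/** Hypothetical source, for illustration only. */
final class SketchSource {
    private final FilterHandlingPolicy filterHandlingPolicy;

    SketchSource(FilterHandlingPolicy filterHandlingPolicy) {
        this.filterHandlingPolicy = filterHandlingPolicy;
    }

    Result applyFilters(List<String> filters) {
        switch (filterHandlingPolicy) {
            case NEVER:
                // Accept nothing; every filter stays with the Flink runtime.
                return Result.of(Collections.emptyList(), filters);
            case ALWAYS:
            default:
                return applyFiltersInternal(filters);
        }
    }

    /** Assumed connector logic: pretend only equality predicates can be translated. */
    private Result applyFiltersInternal(List<String> filters) {
        List<String> accepted = new ArrayList<>();
        List<String> remaining = new ArrayList<>();
        for (String filter : filters) {
            if (filter.contains(" = ")) {
                accepted.add(filter);
            } else {
                remaining.add(filter);
            }
        }
        return Result.of(accepted, remaining);
    }
}
```

With NEVER, the external system never sees a predicate and Flink evaluates all of them after reading; with ALWAYS, the connector's usual pushdown logic applies unchanged.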

Test Plan

The changes will be covered by UTs.

Rejected Alternatives

Reject Plan A

Add a default method enableFilterPushDown to the SupportsFilterPushDown interface.

In FilterPushDownSpec, determine whether filter pushdown is allowed. If it is not allowed, the applyFilters logic will not be executed.

@PublicEvolving
public interface SupportsFilterPushDown {

    /**
     * Provides a list of filters in conjunctive form. A source can pick filters and return the
     * accepted and remaining filters.
     *
     * <p>See the documentation of {@link SupportsFilterPushDown} for more information.
     */
    Result applyFilters(List<ResolvedExpression> filters);


    /**
     * Returns whether filter push down is enabled.
     */
    default boolean enableFilterPushDown() {
        return true;
    }
}

Reject Reason

The current interface can already satisfy the requirements. The connector can reject all the filters by returning the input filters as `Result#remainingFilters`.

Reject Plan B

Add a common configuration source.filter-push-down.enabled to disable filter push down for connectors.


Users specify the behavior for each source through a new connector option:

Option                             Type       Default value
source.filter-push-down.enabled    Boolean    true


For other source abilities, we could also add similar configuration options in the future to give users the choice. This is not urgent, and the options can be introduced incrementally.

Option                                Type       Default value
source.aggregate-push-down.enabled    Boolean    true
...                                   Boolean    true


Add the new source ability options to the TableSourceOptions class.

@PublicEvolving
public class TableSourceOptions {


    public static final ConfigOption<Boolean> ENABLE_FILTER_PUSH_DOWN =
            key("source.filter-push-down.enabled")
                    .booleanType()
                    .defaultValue(true)
                    .withDescription("Whether to enable filter push down.");
    ......
}


Reject Reason

We don't have a common implementation for SupportsFilterPushDown itself. Why would one potential behavior of the SupportsFilterPushDown.applyFilters() method deserve a common configuration option? A common implementation should come first; its configuration then becomes a common configuration as a natural result. There are also multiple ways to address the issue, and pushing the filters back is only one of them.



