Document the state by adding a label to the FLIP page with one of "discussion", "accepted", "released", "rejected".

Discussion thread: here (<- link to https://lists.apache.org/list.html?dev@flink.apache.org)
Vote thread: here (<- link to https://lists.apache.org/list.html?dev@flink.apache.org)
JIRA: here
Release: 1.18


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Flink redesigned the table source/sink interfaces as part of FLIP-95 and also added optional interfaces that let table sources support projection, partition, and filter push down, among others. Currently, however, filters on nested fields are not pushed down. For example:

 SELECT * FROM users WHERE user.age < 30

In the above example, the filter on user.age is currently not pushed down to the table source, even if the table source is able to evaluate filters on nested fields. The goal of this FLIP is to revise the SupportsFilterPushDown API to enable push down of filters on nested fields while maintaining backward compatibility with existing table sources.

Public Interfaces

  • Introduce a new ResolvedExpression called NestedFieldReferenceExpression to support nested fields filter pushdown.
  • Add a new API called supportsNestedFilters to SupportsFilterPushDown.

Proposed Changes

Introduce NestedFieldReferenceExpression to support nested fields filter pushdown

In Calcite, access to a nested field is represented by RexFieldAccess, and Flink has no corresponding ResolvedExpression. To support nested fields filter pushdown, introduce NestedFieldReferenceExpression as the resolved expression for nested fields. A table source that does not understand NestedFieldReferenceExpression should treat such a filter as a no-op and return it in remainingFilters, so that the planner still evaluates it after the scan. Ideally, this ensures backward compatibility for existing table sources, but it relies on every existing source honoring this SupportsFilterPushDown contract.

Therefore, add a new default method called supportsNestedFilters to SupportsFilterPushDown. It returns false by default for backward compatibility, and Flink pushes filters on nested fields to a table source only if that source returns true.
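The gating described above can be sketched as follows. This is a minimal, self-contained illustration and not Flink planner code: Expr, FieldRef, NestedFieldRef, Literal, Call, FilterSource and NestedFilterGate are hypothetical stand-ins for Flink's ResolvedExpression hierarchy and SupportsFilterPushDown, and the sketch assumes the planner inspects each conjunct for a nested field reference before calling applyFilters. It requires Java 17 (records, pattern matching for instanceof).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins for Flink's expression classes (not the real planner types).
interface Expr {}

record FieldRef(String name) implements Expr {}

record NestedFieldRef(String path) implements Expr {}

record Literal(Object value) implements Expr {}

record Call(String op, List<Expr> args) implements Expr {}

// Hypothetical stand-in for SupportsFilterPushDown: sources must opt in to nested filters.
interface FilterSource {
    default boolean supportsNestedFilters() {
        return false;
    }
}

final class NestedFilterGate {
    /** Returns true if the expression tree contains a nested field reference. */
    static boolean referencesNestedField(Expr e) {
        if (e instanceof NestedFieldRef) {
            return true;
        }
        if (e instanceof Call c) {
            for (Expr arg : c.args()) {
                if (referencesNestedField(arg)) {
                    return true;
                }
            }
        }
        return false;
    }

    /**
     * Splits conjunctive filters into those offered to the source (index 0) and those the
     * planner keeps and evaluates itself (index 1).
     */
    static List<List<Expr>> split(FilterSource source, List<Expr> filters) {
        List<Expr> offered = new ArrayList<>();
        List<Expr> kept = new ArrayList<>();
        for (Expr filter : filters) {
            if (!source.supportsNestedFilters() && referencesNestedField(filter)) {
                kept.add(filter); // source did not opt in: never offer it a nested filter
            } else {
                offered.add(filter);
            }
        }
        return List.of(offered, kept);
    }
}
```

With a source that keeps the default, a filter such as user.age < 30 stays with the planner while a top-level filter such as id = 1 is still offered; a source that overrides supportsNestedFilters() to return true is offered both.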

NestedFieldReferenceExpression.java
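import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.expressions.ExpressionVisitor;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.types.DataType;

import java.util.Collections;
import java.util.List;

public final class NestedFieldReferenceExpression implements ResolvedExpression {

    /** Full name of the nested field, e.g. "user.age". */
    private final String nestedFieldName;

    private final DataType dataType;

    /**
     * Index of the input the field belongs to, e.g. for a join, `inputIndex` of the left input is
     * 0 and `inputIndex` of the right input is 1.
     */
    private final int inputIndex;

    /** Index of the field at each level, from the top-level column down to the nested leaf. */
    private final int[] nestedFieldIndexArray;

    public NestedFieldReferenceExpression(
            String nestedFieldName,
            DataType dataType,
            int inputIndex,
            int[] nestedFieldIndexArray) {
        this.nestedFieldName = nestedFieldName;
        this.dataType = dataType;
        this.inputIndex = inputIndex;
        this.nestedFieldIndexArray = nestedFieldIndexArray.clone(); // defensive copy
    }

    public String getNestedFieldName() { return nestedFieldName; }
    public int getInputIndex() { return inputIndex; }
    public int[] getNestedFieldIndexArray() { return nestedFieldIndexArray.clone(); }

    @Override public DataType getOutputDataType() { return dataType; }
    @Override public List<ResolvedExpression> getResolvedChildren() { return Collections.emptyList(); }
    @Override public List<Expression> getChildren() { return Collections.emptyList(); }
    @Override public String asSummaryString() { return nestedFieldName; }
    @Override public <R> R accept(ExpressionVisitor<R> visitor) { return visitor.visit(this); }

    // equals(), hashCode() and toString() are omitted for brevity.
}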
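To make nestedFieldIndexArray concrete: for a hypothetical table users(id INT, user ROW<name STRING, age INT>), the reference user.age would carry the index path [1, 1], because user is the second top-level column and age is the second field of that row. The sketch below derives such a path from a dotted name. Field and NestedFieldPaths are hypothetical helpers over a simplified schema, not Flink's RowType; in the planner the path would presumably be derived from the chain of RexFieldAccess nodes instead.

```java
import java.util.List;

// Hypothetical, simplified schema model; the planner would work with Flink's RowType instead.
record Field(String name, List<Field> children) {
    static Field leaf(String name) {
        return new Field(name, List.of());
    }
}

final class NestedFieldPaths {
    /**
     * Resolves a dotted path such as "user.age" to the index of the field at each level,
     * starting from the top-level columns, i.e. the shape of nestedFieldIndexArray.
     */
    static int[] resolve(List<Field> topLevel, String dottedPath) {
        String[] names = dottedPath.split("\\.");
        int[] indices = new int[names.length];
        List<Field> level = topLevel;
        for (int depth = 0; depth < names.length; depth++) {
            int found = -1;
            for (int i = 0; i < level.size(); i++) {
                if (level.get(i).name().equals(names[depth])) {
                    found = i;
                    break;
                }
            }
            if (found < 0) {
                throw new IllegalArgumentException("Unknown field: " + names[depth]);
            }
            indices[depth] = found;
            level = level.get(found).children(); // descend into the nested ROW
        }
        return indices;
    }
}
```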


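SupportsFilterPushDown.java
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.table.expressions.ResolvedExpression;

import java.util.List;

@PublicEvolving
public interface SupportsFilterPushDown {
    /**
     * Provides a list of filters in conjunctive form. A source can pick filters and return the
     * accepted and remaining filters.
     *
     * <p>See the documentation of {@link SupportsFilterPushDown} for more information.
     */
    Result applyFilters(List<ResolvedExpression> filters);

    /**
     * Returns whether this source supports nested fields filter pushdown. Defaults to false so
     * that existing sources never receive filters on nested fields.
     */
    default boolean supportsNestedFilters() {
        return false;
    }

    /**
     * Result of a filter push down. It represents the communication of the source to the planner
     * during optimization.
     */
    @PublicEvolving
    final class Result {
        private final List<ResolvedExpression> acceptedFilters;
        private final List<ResolvedExpression> remainingFilters;

        private Result(
                List<ResolvedExpression> acceptedFilters,
                List<ResolvedExpression> remainingFilters) {
            this.acceptedFilters = acceptedFilters;
            this.remainingFilters = remainingFilters;
        }

        /** Constructs a filter push down result from the accepted and remaining filters. */
        public static Result of(
                List<ResolvedExpression> acceptedFilters,
                List<ResolvedExpression> remainingFilters) {
            return new Result(acceptedFilters, remainingFilters);
        }

        public List<ResolvedExpression> getAcceptedFilters() {
            return acceptedFilters;
        }

        public List<ResolvedExpression> getRemainingFilters() {
            return remainingFilters;
        }
    }
}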
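The sketch below shows how a source might opt in and split the filters it is offered into accepted and remaining ones. To stay self-contained, it mirrors the shape of the proposed API with hypothetical stand-ins (Filter, LessThan, Result, FilterPushDown, IndexedSource) instead of Flink's ResolvedExpression and SupportsFilterPushDown; a real connector would inspect NestedFieldReferenceExpression instances and return SupportsFilterPushDown.Result.of(accepted, remaining).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical stand-ins that mirror the shape of the proposed API; not Flink's classes.
interface Filter {
    /** Dotted path of the column this filter reads, e.g. "user.age". */
    String columnPath();
}

record LessThan(String columnPath, long bound) implements Filter {}

record Result(List<Filter> acceptedFilters, List<Filter> remainingFilters) {}

interface FilterPushDown {
    Result applyFilters(List<Filter> filters);

    default boolean supportsNestedFilters() {
        return false;
    }
}

/** A hypothetical source that can evaluate filters on a fixed set of (possibly nested) columns. */
final class IndexedSource implements FilterPushDown {
    private final Set<String> indexedPaths;

    IndexedSource(Set<String> indexedPaths) {
        this.indexedPaths = indexedPaths;
    }

    @Override
    public boolean supportsNestedFilters() {
        return true; // opt in to receive filters on nested fields
    }

    @Override
    public Result applyFilters(List<Filter> filters) {
        List<Filter> accepted = new ArrayList<>();
        List<Filter> remaining = new ArrayList<>();
        for (Filter filter : filters) {
            if (indexedPaths.contains(filter.columnPath())) {
                accepted.add(filter);
            } else {
                remaining.add(filter); // handed back so the planner evaluates it after the scan
            }
        }
        return new Result(accepted, remaining);
    }
}
```

Returning unsupported filters as remaining, rather than dropping them, is what keeps query results correct when a source can only handle some of the nested fields.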

Compatibility, Deprecation, and Migration Plan

This feature adds support for nested fields filter pushdown and is backward compatible with existing TableSources. Because supportsNestedFilters() defaults to false, existing TableSources never receive filters on nested fields and behave as before. TableSources that override it to return true receive these filters and can either accept them or return them as remaining filters.

Test Plan

The proposed changes must be covered by unit tests (UT), integration tests (IT), and end-to-end (E2E) tests.

Rejected Alternatives

Change FieldReferenceExpression to support nested fields

  • Modify the FieldReferenceExpression class to include a nestedFieldIndex array holding the index of the field at every level, traversing from the top-level column to the leaf-level column of the nested field.
  • Add a new API called supportsNestedFilters to SupportsFilterPushDown which will be defaulted to false for backwards compatibility.

The proposed approach (a separate NestedFieldReferenceExpression) is preferred because it offers a more streamlined API and does not negatively affect existing table sources.
