Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.


Page properties


Document the state by adding a label to the FLIP page with one of "discussion", "accepted", "released", "rejected".

Discussion threadhere (<- link to https://lists.apache.org/list.html?dev@flink.apache.org)/thread/686bhgwrrb4xmbfzlk60szwxos4z64t7
Vote threadhere (<- link to https://lists.apache.org/list.html?dev@flink.apache.org)thread/x5tyg0sb7h3clyrv34t9rfrwn7kwy3t3
JIRA

here (<- link to

Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyFLINK-20767
)

Release1.19.180

Redo

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

  • Introduce a new ResolvedExpression called NestedFieldReferenceExpression to support nested fields filter pushdown.
  • Add a new API called supportsNestedFilters to SupportsFilterPushDown.

Proposed Changes

Introduce NestedFieldReferenceExpression to support nested fields filter pushdown

RexFieldAccess in Calcite is the RexNode for nested fields and a corresponding ResolvedExpression is not available in Flink. To support nested fields filter pushdown, introduce NestedFieldReferenceExpression as the resolved expression for nested fields. If a table source doesn't support NestedFieldReferenceExpression, it will be a no-op and NestedFieldReferenceExpression will be returned as remainingFilters. Ideally, this This should ensure backwards compatibility for existing table sources but it depends on the TableSource contract. Therefore, add a new API called supportsNestedFilters to SupportsFilterPushDown which will be defaulted to false for backwards compatibility and Flink applies nested filters only if the TableSource supports nested filters.

Code Block
languagejava
titleNestedFieldReferenceExpression.java
public final class NestedFieldReferenceExpression implements ResolvedExpression {

    private final String[] nestedFieldName;

    private final DataType dataType;

    /**
     * index of an input the field belongs to. e.g. for a join, `inputIndex` of left input is 0 and
     * `inputIndex` of right input is 1.
     */
    fieldNames;

    private final int[] inputIndexfieldIndices;

     /**
     * Nested field reference index to traverse from the top level column to the nested leaf column
     */
    private final int[] nestedFieldIndexArray private final DataType dataType;

    public NestedFieldReferenceExpression(
            String[] nestedFieldName,
            DataType dataType,
            int inputIndexfieldNames,
			int[] fieldIndices,
            int[]DataType nestedFieldIndexArraydataType) {
        this.nestedFieldNamefieldNames = nestedFieldNamefieldNames;
        		this.dataTypefieldIndices = dataTypefieldIndices;
        this.inputIndexdataType = inputIndex;
        this.nestedFieldIndexArray = nestedFieldIndexArraydataType;
    }
Code Block
languagejava
titleSupportsFilterPushdown.java
public interface SupportsFilterPushDown {
    /**
    public * Provides a list of filters in conjunctive form. A source can pick filters and return the
     * accepted and remaining filters.
     *
     * <p>See the documentation of {@link SupportsFilterPushDown} for more information.
     */
    Result applyFilters(List<ResolvedExpression> filters);

    /** Returns whether this source supports nested fields filter pushdown */
    default boolean supportsNestedFilters();

    /**
     * Result of a filter push down. It represents the communication of the source to the planner
     * during optimization.
     */
    @PublicEvolving
    final class Result String[] getFieldNames() {
        private final List<ResolvedExpression> acceptedFiltersreturn fieldNames;
        private final List<ResolvedExpression> remainingFilters;}

        public List<ResolvedExpression>int[] getAcceptedFiltersgetFieldIndices() {
            return acceptedFiltersfieldIndices;
        }

    @Override
    public List<ResolvedExpression>DataType getRemainingFiltersgetOutputDataType() {
            return remainingFiltersdataType;
        }
    }

Compatibility, Deprecation, and Migration Plan

...