...

In Calcite, RexFieldAccess is the RexNode that represents access to a nested field, and Flink has no corresponding ResolvedExpression. To support filter pushdown on nested fields, introduce NestedFieldReferenceExpression as the resolved expression for nested fields. If a table source doesn't support NestedFieldReferenceExpression, the pushdown is a no-op and the filters that contain it are returned as remainingFilters. This should ensure backwards compatibility for existing table sources, but that relies on each source following the TableSource contract. Therefore, add a new method called supportsNestedFilters to SupportsFilterPushDown. It defaults to false for backwards compatibility, and Flink applies nested filters only if the TableSource reports that it supports them.
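
The gating described above could look roughly like the following on the planner side. This is a minimal sketch, not Flink's planner code: `Expr`, `FieldRef`, `NestedFieldRef`, `Comparison`, `FilterableSource`, `PushDownSplit`, and `NestedFilterGate` are hypothetical stand-ins for the real expression and source types, reduced to what the example needs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-ins for Flink's expression types (assumption, not the real API).
interface Expr {}

final class FieldRef implements Expr {
    final String name;
    FieldRef(String name) { this.name = name; }
}

final class NestedFieldRef implements Expr {
    final String[] fieldNames;
    NestedFieldRef(String... fieldNames) { this.fieldNames = fieldNames; }
}

// A predicate that compares one field reference with a literal.
final class Comparison implements Expr {
    final Expr operand;
    final Object literal;
    Comparison(Expr operand, Object literal) { this.operand = operand; this.literal = literal; }
}

// Stand-in for SupportsFilterPushDown: existing sources inherit the default of false.
interface FilterableSource {
    default boolean supportsNestedFilters() { return false; }
}

final class PushDownSplit {
    final List<Expr> offered = new ArrayList<>();  // would be passed to applyFilters
    final List<Expr> retained = new ArrayList<>(); // stays in the plan, evaluated after the scan
}

final class NestedFilterGate {
    static boolean referencesNestedField(Expr e) {
        if (e instanceof NestedFieldRef) {
            return true;
        }
        if (e instanceof Comparison) {
            return referencesNestedField(((Comparison) e).operand);
        }
        return false;
    }

    // Offers nested filters to the source only if it opted in.
    static PushDownSplit split(FilterableSource source, List<Expr> filters) {
        PushDownSplit result = new PushDownSplit();
        for (Expr filter : filters) {
            if (referencesNestedField(filter) && !source.supportsNestedFilters()) {
                result.retained.add(filter);
            } else {
                result.offered.add(filter);
            }
        }
        return result;
    }
}

final class NestedFilterGateDemo {
    public static void main(String[] args) {
        List<Expr> filters = Arrays.asList(
                new Comparison(new FieldRef("id"), 1),
                new Comparison(new NestedFieldRef("user", "address", "city"), "Berlin"));
        FilterableSource legacy = new FilterableSource() {};
        PushDownSplit split = NestedFilterGate.split(legacy, filters);
        System.out.println(split.offered.size() + " offered, " + split.retained.size() + " retained");
    }
}
```

With this shape, a source that never overrides `supportsNestedFilters` never sees a nested field reference, so it cannot mishandle one.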

NestedFieldReferenceExpression.java
public final class NestedFieldReferenceExpression implements ResolvedExpression {

    private final String[] fieldNames;

    private final DataType dataType;

    /**
     * Index of the input the field belongs to. E.g. for a join, `inputIndex` of the left input is
     * 0 and `inputIndex` of the right input is 1.
     */
    private final int inputIndex;

    /**
     * Nested field reference indices used to traverse from the top-level column to the nested
     * leaf column.
     */
    private final int[] nestedFieldIndexArray;

    public NestedFieldReferenceExpression(
            String[] fieldNames,
            DataType dataType,
            int inputIndex,
            int[] nestedFieldIndexArray) {
        this.fieldNames = fieldNames;
        this.dataType = dataType;
        this.inputIndex = inputIndex;
        this.nestedFieldIndexArray = nestedFieldIndexArray;
    }

    // Getters and the methods required by ResolvedExpression are omitted here.
}
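
To illustrate how an index array such as nestedFieldIndexArray could be walked, here is a small sketch. It assumes that the first element indexes a top-level column and that each following element indexes a child field of the previous one; the proposal does not spell this out. `SchemaField` and `NestedPath` are hypothetical helpers, not Flink classes.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified schema node: a field name plus its child fields (empty for leaves).
final class SchemaField {
    final String name;
    final List<SchemaField> children;

    SchemaField(String name, SchemaField... children) {
        this.name = name;
        this.children = Arrays.asList(children);
    }
}

final class NestedPath {
    // Walks from the top-level columns down to the leaf and collects the field names on the way.
    static String[] resolveNames(List<SchemaField> topLevelColumns, int[] indexPath) {
        String[] names = new String[indexPath.length];
        List<SchemaField> level = topLevelColumns;
        for (int i = 0; i < indexPath.length; i++) {
            SchemaField field = level.get(indexPath[i]);
            names[i] = field.name;
            level = field.children;
        }
        return names;
    }
}

final class NestedPathDemo {
    public static void main(String[] args) {
        // Schema: id, user ROW<name, address ROW<street, city>>
        List<SchemaField> columns = Arrays.asList(
                new SchemaField("id"),
                new SchemaField("user",
                        new SchemaField("name"),
                        new SchemaField("address",
                                new SchemaField("street"),
                                new SchemaField("city"))));
        String[] names = NestedPath.resolveNames(columns, new int[] {1, 1, 1});
        System.out.println(String.join(".", names)); // prints "user.address.city"
    }
}
```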

SupportsFilterPushDown.java
public interface SupportsFilterPushDown {
    /**
     * Provides a list of filters in conjunctive form. A source can pick filters and return the
     * accepted and remaining filters.
     *
     * <p>See the documentation of {@link SupportsFilterPushDown} for more information.
     */
    Result applyFilters(List<ResolvedExpression> filters);

    /**
     * Returns whether this source supports filter pushdown on nested fields. Defaults to false
     * for backwards compatibility.
     */
    default boolean supportsNestedFilters() {
        return false;
    }

    /**
     * Result of a filter push down. It represents the communication of the source to the planner
     * during optimization.
     */
    @PublicEvolving
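    final class Result {
        private final List<ResolvedExpression> acceptedFilters;
        private final List<ResolvedExpression> remainingFilters;

        private Result(
                List<ResolvedExpression> acceptedFilters,
                List<ResolvedExpression> remainingFilters) {
            this.acceptedFilters = acceptedFilters;
            this.remainingFilters = remainingFilters;
        }

        /** Creates a result from the accepted and the remaining filters. */
        public static Result of(
                List<ResolvedExpression> acceptedFilters,
                List<ResolvedExpression> remainingFilters) {
            return new Result(acceptedFilters, remainingFilters);
        }

        public List<ResolvedExpression> getAcceptedFilters() {
            return acceptedFilters;
        }

        public List<ResolvedExpression> getRemainingFilters() {
            return remainingFilters;
        }
    }
}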
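
On the source side, a connector that opts in might handle nested filters along these lines. This is a sketch under stated assumptions: every type here (`SketchExpression`, `SketchNestedFieldEquals`, `SketchOtherFilter`, `SketchResult`, `SketchNestedAwareSource`) is a hypothetical stand-in, not a Flink class, and real filters would be ResolvedExpression trees rather than these flattened predicates.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for ResolvedExpression.
interface SketchExpression {}

// Stand-in for an equality predicate on a nested field, e.g. user.address.city = 'Berlin'.
final class SketchNestedFieldEquals implements SketchExpression {
    final String[] fieldNames;
    final Object value;

    SketchNestedFieldEquals(Object value, String... fieldNames) {
        this.value = value;
        this.fieldNames = fieldNames;
    }
}

// Stand-in for any filter this source cannot evaluate itself.
final class SketchOtherFilter implements SketchExpression {
    final String description;
    SketchOtherFilter(String description) { this.description = description; }
}

// Stand-in for SupportsFilterPushDown.Result.
final class SketchResult {
    final List<SketchExpression> acceptedFilters = new ArrayList<>();
    final List<SketchExpression> remainingFilters = new ArrayList<>();
}

final class SketchNestedAwareSource {
    // Opt in; sources that keep the default of false would not be offered nested filters.
    boolean supportsNestedFilters() {
        return true;
    }

    // Accepts nested equality filters and hands everything else back to the planner.
    SketchResult applyFilters(List<SketchExpression> filters) {
        SketchResult result = new SketchResult();
        for (SketchExpression filter : filters) {
            if (filter instanceof SketchNestedFieldEquals) {
                result.acceptedFilters.add(filter);
            } else {
                result.remainingFilters.add(filter);
            }
        }
        return result;
    }
}

final class SketchNestedAwareSourceDemo {
    public static void main(String[] args) {
        List<SketchExpression> filters = Arrays.asList(
                new SketchNestedFieldEquals("Berlin", "user", "address", "city"),
                new SketchOtherFilter("UPPER(name) = 'X'"));
        SketchResult result = new SketchNestedAwareSource().applyFilters(filters);
        System.out.println(result.acceptedFilters.size() + " accepted, "
                + result.remainingFilters.size() + " remaining");
    }
}
```

Returning unsupported filters as remaining keeps the query correct, because the planner still evaluates them after the scan.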

Compatibility, Deprecation, and Migration Plan

...