Page properties | ||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
Document the state by adding a label to the FLIP page with one of "discussion", "accepted", "released", "rejected".
|
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
- Introduce a new ResolvedExpression called NestedFieldReferenceExpression to support nested fields filter pushdown.Add a new API called supportsNestedFilters to SupportsFilterPushDown.
Proposed Changes
Introduce NestedFieldReferenceExpression to support nested fields filter pushdown
RexFieldAccess in Calcite is the RexNode for nested fields and a corresponding ResolvedExpression is not available in Flink. To support nested fields filter pushdown, introduce NestedFieldReferenceExpression as the resolved expression for nested fields. If a table source doesn't support NestedFieldReferenceExpression, it will be a no-op and NestedFieldReferenceExpression will be returned as remainingFilters. Ideally, this This should ensure backwards compatibility for existing table sources but it depends on the TableSource contract.
Therefore, add a new API called supportsNestedFilters to SupportsFilterPushDown which will be defaulted to false for backwards compatibility and Flink applies nested filters only if the TableSource supports nested filters.
Code Block | ||||
---|---|---|---|---|
| ||||
public final class NestedFieldReferenceExpression implements ResolvedExpression { private final String[] nestedFieldNamefieldNames; private final DataType dataType; /** * index of an input the field belongs to. e.g. for a join, `inputIndex` of left input is 0 and * `inputIndex` of right input is 1. */ private final int inputIndexint[] fieldIndices; /** * Nested field reference index to traverse from the top level column to the nested leaf column */ private final int[] nestedFieldIndexArray private final DataType dataType; public NestedFieldReferenceExpression( String[] nestedFieldName, DataType dataType, int inputIndexfieldNames, int[] fieldIndices, int[]DataType nestedFieldIndexArraydataType) { this.nestedFieldNamefieldNames = nestedFieldNamefieldNames; this.dataTypefieldIndices = dataTypefieldIndices; this.inputIndexdataType = inputIndexdataType; this.nestedFieldIndexArray = nestedFieldIndexArray; } | ||||
Code Block | ||||
| ||||
public interface SupportsFilterPushDown { /** public * Provides a list of filters in conjunctive form. A source can pick filters and return the * accepted and remaining filters. * * <p>See the documentation of {@link SupportsFilterPushDown} for more information. */ Result applyFilters(List<ResolvedExpression> filters); /** Returns whether this source supports nested fields filter pushdown */ default boolean supportsNestedFilters(); /** * Result of a filter push down. It represents the communication of the source to the planner * during optimization. */ @PublicEvolving final class Result String[] getFieldNames() { private final List<ResolvedExpression> acceptedFiltersreturn fieldNames; private final List<ResolvedExpression> remainingFilters;} public List<ResolvedExpression>int[] getAcceptedFiltersgetFieldIndices() { return acceptedFiltersfieldIndices; } @Override public List<ResolvedExpression>DataType getRemainingFiltersgetOutputDataType() { return remainingFiltersdataType; } } |
Compatibility, Deprecation, and Migration Plan
...