...
RexFieldAccess is the Calcite RexNode for nested field access, and Flink has no corresponding ResolvedExpression. To support filter pushdown on nested fields, introduce NestedFieldReferenceExpression as the resolved expression for nested fields. If a table source does not recognize NestedFieldReferenceExpression, pushdown should be a no-op for that filter, and the source should return the expression in remainingFilters. Ideally, this ensures backwards compatibility for existing table sources, but it depends on each TableSource honoring that contract. Therefore, add a new method, supportsNestedFilters, to SupportsFilterPushDown. It defaults to false for backwards compatibility, and Flink pushes nested filters down only if the TableSource reports that it supports them.
Code Block

    public final class NestedFieldReferenceExpression implements ResolvedExpression {

        /** Nested field names, from the top-level column down to the nested leaf column. */
        private final String[] fieldNames;

        private final DataType dataType;

        /**
         * Index of the input the field belongs to. E.g. for a join, the `inputIndex` of the left
         * input is 0 and the `inputIndex` of the right input is 1.
         */
        private final int inputIndex;

        /**
         * Nested field reference indices to traverse from the top-level column to the nested
         * leaf column.
         */
        private final int[] nestedFieldIndexArray;

        public NestedFieldReferenceExpression(
                String[] fieldNames,
                DataType dataType,
                int inputIndex,
                int[] nestedFieldIndexArray) {
            this.fieldNames = fieldNames;
            this.dataType = dataType;
            this.inputIndex = inputIndex;
            this.nestedFieldIndexArray = nestedFieldIndexArray;
        }

        // Accessors and the remaining ResolvedExpression methods are omitted here.
    }
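To illustrate how an index path such as nestedFieldIndexArray relates to the field names, here is a minimal sketch. It does not use Flink classes: `Field`, `NestedPathSketch`, and `resolveFieldNames` are hypothetical stand-ins, and the example schema is an assumption made up for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical illustration only; none of these types are part of Flink. */
public class NestedPathSketch {

    /** A named field that may itself contain child fields (a nested row). */
    static final class Field {
        final String name;
        final List<Field> children;

        Field(String name, Field... children) {
            this.name = name;
            this.children = Arrays.asList(children);
        }
    }

    /** Walks the index path from the top-level columns to the leaf and collects the names. */
    static String[] resolveFieldNames(List<Field> topLevelColumns, int[] indexPath) {
        List<String> names = new ArrayList<>();
        List<Field> current = topLevelColumns;
        for (int index : indexPath) {
            Field field = current.get(index);
            names.add(field.name);
            current = field.children;
        }
        return names.toArray(new String[0]);
    }

    public static void main(String[] args) {
        // Assumed schema: id, user ROW<name, address ROW<city, zip>>
        List<Field> columns = Arrays.asList(
                new Field("id"),
                new Field("user",
                        new Field("name"),
                        new Field("address", new Field("city"), new Field("zip"))));

        // Index path for user.address.zip: column 1, then child 1, then child 1.
        String[] names = resolveFieldNames(columns, new int[] {1, 1, 1});
        System.out.println(String.join(".", names)); // prints user.address.zip
    }
}
```

In this reading, the name array and the index array describe the same path, so a source can use whichever form matches its own schema representation.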
Code Block

public interface SupportsFilterPushDown {
/**
* Provides a list of filters in conjunctive form. A source can pick filters and return the
* accepted and remaining filters.
*
* <p>See the documentation of {@link SupportsFilterPushDown} for more information.
*/
Result applyFilters(List<ResolvedExpression> filters);
/** Returns whether this source supports filter pushdown on nested fields. Defaults to false. */
default boolean supportsNestedFilters() {
return false;
}
/**
* Result of a filter push down. It represents the communication of the source to the planner
* during optimization.
*/
@PublicEvolving
final class Result {
private final List<ResolvedExpression> acceptedFilters;
private final List<ResolvedExpression> remainingFilters;
public List<ResolvedExpression> getAcceptedFilters() {
return acceptedFilters;
}
public List<ResolvedExpression> getRemainingFilters() {
return remainingFilters;
}
}
}
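The gating described above can be sketched as follows. This is only an illustration under stated assumptions, not the planner's actual implementation: `Filter`, `FilterSource`, and `filtersToOffer` are hypothetical stand-ins for ResolvedExpression, SupportsFilterPushDown, and the planner rule, and the `referencesNestedField` flag stands in for checking whether a filter contains a NestedFieldReferenceExpression.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical illustration only; none of these types are part of Flink. */
public class NestedFilterGatingSketch {

    /** Stand-in for a resolved filter; records only whether it touches a nested field. */
    static final class Filter {
        final String summary;
        final boolean referencesNestedField;

        Filter(String summary, boolean referencesNestedField) {
            this.summary = summary;
            this.referencesNestedField = referencesNestedField;
        }
    }

    /** Simplified stand-in for SupportsFilterPushDown with the proposed default. */
    interface FilterSource {
        default boolean supportsNestedFilters() {
            return false;
        }
    }

    /** Planner side: nested filters are offered only to sources that opt in. */
    static List<Filter> filtersToOffer(FilterSource source, List<Filter> candidates) {
        List<Filter> offered = new ArrayList<>();
        for (Filter filter : candidates) {
            if (!filter.referencesNestedField || source.supportsNestedFilters()) {
                offered.add(filter);
            }
        }
        return offered;
    }

    public static void main(String[] args) {
        List<Filter> candidates = Arrays.asList(
                new Filter("id > 10", false),
                new Filter("user.address.zip = '94105'", true));

        // An existing source that does not override the default never sees nested filters.
        FilterSource legacySource = new FilterSource() {};
        // A source that opts in receives both filters.
        FilterSource nestedAwareSource = new FilterSource() {
            @Override
            public boolean supportsNestedFilters() {
                return true;
            }
        };

        System.out.println(filtersToOffer(legacySource, candidates).size());      // prints 1
        System.out.println(filtersToOffer(nestedAwareSource, candidates).size()); // prints 2
    }
}
```

Because the default is false, an existing source keeps its current behavior without code changes, and the nested filter stays with the planner to be evaluated after the scan.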
Compatibility, Deprecation, and Migration Plan
...