Discussion thread: https://lists.apache.org/thread/686bhgwrrb4xmbfzlk60szwxos4z64t7
Vote thread: https://lists.apache.org/thread/x5tyg0sb7h3clyrv34t9rfrwn7kwy3t3
Release: 1.19.0

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Flink redesigned the table source/sink interfaces as part of FLIP-95 and added optional interfaces that let a table source support projection, partition, and filter push down, among others. Currently, filter push down does not support nested fields. For example:

 SELECT * FROM users WHERE user.age < 30

In the above example, the filter on user.age is currently not pushed down to the table source, even if the table source is able to evaluate filters on nested fields. The goal of this FLIP is to revise the SupportsFilterPushDown API to enable push down support for nested fields filters while maintaining backward compatibility with existing table sources.

Public Interfaces

  • Introduce a new ResolvedExpression called NestedFieldReferenceExpression to support nested fields filter pushdown.

Proposed Changes

Introduce NestedFieldReferenceExpression to support nested fields filter pushdown

In Calcite, a nested field access is represented by the RexNode RexFieldAccess, but Flink has no corresponding ResolvedExpression. To support nested fields filter pushdown, this FLIP introduces NestedFieldReferenceExpression as the resolved expression for nested fields. A table source that doesn't support NestedFieldReferenceExpression needs no change: the filter is a no-op for that source and is returned in remainingFilters, so Flink continues to evaluate it at runtime. This keeps existing table sources backward compatible.
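The backward-compatibility behavior described above can be sketched roughly as follows. This is a minimal illustration, not Flink code: Expr, FieldRef, NestedFieldRef, LessThan, Result, and the supportsNestedFields flag are hypothetical stand-ins for ResolvedExpression, FieldReferenceExpression, NestedFieldReferenceExpression, a call expression, and SupportsFilterPushDown.Result. A real source would decide what it supports by inspecting the expressions it receives.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch only: simplified stand-ins, not the real Flink classes. */
final class NestedFilterPushDownSketch {

    /** Stand-in for ResolvedExpression. */
    interface Expr {}

    /** Stand-in for a top-level FieldReferenceExpression, e.g. id. */
    static final class FieldRef implements Expr {
        final String name;

        FieldRef(String name) {
            this.name = name;
        }
    }

    /** Stand-in for the proposed NestedFieldReferenceExpression, e.g. user.age. */
    static final class NestedFieldRef implements Expr {
        final String[] fieldNames;

        NestedFieldRef(String... fieldNames) {
            this.fieldNames = fieldNames;
        }
    }

    /** Stand-in for a call expression of the form field < literal. */
    static final class LessThan implements Expr {
        final Expr field;
        final int literal;

        LessThan(Expr field, int literal) {
            this.field = field;
            this.literal = literal;
        }
    }

    /** Stand-in for SupportsFilterPushDown.Result. */
    static final class Result {
        final List<Expr> acceptedFilters = new ArrayList<>();
        final List<Expr> remainingFilters = new ArrayList<>();
    }

    /**
     * Splits filters into accepted and remaining ones. A source that does not
     * understand nested references hands them back unchanged as remaining filters.
     */
    static Result applyFilters(List<Expr> filters, boolean supportsNestedFields) {
        Result result = new Result();
        for (Expr filter : filters) {
            boolean isComparison = filter instanceof LessThan;
            boolean isNested = isComparison
                    && ((LessThan) filter).field instanceof NestedFieldRef;
            if (isComparison && (!isNested || supportsNestedFields)) {
                result.acceptedFilters.add(filter);
            } else {
                result.remainingFilters.add(filter);
            }
        }
        return result;
    }
}
```

With supportsNestedFields set to false, a filter on user.age ends up in remainingFilters while a filter on a top-level column is still accepted, which mirrors what an unmodified source would do.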

NestedFieldReferenceExpression.java
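import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.types.DataType;

/** Resolved expression that references a nested field, for example user.age. */
public final class NestedFieldReferenceExpression implements ResolvedExpression {

    /** Field names on the path from the top-level column to the nested leaf field. */
    private final String[] fieldNames;

    /** Field indices on the same path, one per nesting level. */
    private final int[] fieldIndices;

    /** Data type of the referenced leaf field. */
    private final DataType dataType;

    public NestedFieldReferenceExpression(
            String[] fieldNames,
            int[] fieldIndices,
            DataType dataType) {
        this.fieldNames = fieldNames;
        this.fieldIndices = fieldIndices;
        this.dataType = dataType;
    }

    public String[] getFieldNames() {
        return fieldNames;
    }

    public int[] getFieldIndices() {
        return fieldIndices;
    }

    @Override
    public DataType getOutputDataType() {
        return dataType;
    }

    // The remaining ResolvedExpression methods (asSummaryString, getChildren,
    // getResolvedChildren, accept) are omitted here for brevity.
}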
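To illustrate how fieldNames and fieldIndices might relate, here is a small sketch that resolves a dotted path against a toy schema. It assumes that fieldIndices holds the position of each field within its enclosing row, from the top-level column down to the leaf. The Row type and resolveIndices helper are hypothetical and stand in for Flink's own type system, which this sketch does not use.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Sketch only: a toy schema model, not Flink's DataType or RowType. */
final class NestedFieldPathSketch {

    /** Toy row type: ordered field names mapped to a nested row, or null for a leaf column. */
    static final class Row {
        final Map<String, Row> fields = new LinkedHashMap<>();

        Row field(String name, Row nestedTypeOrNull) {
            fields.put(name, nestedTypeOrNull);
            return this;
        }
    }

    /** Walks the path from the top-level column to the leaf and records the index at each level. */
    static int[] resolveIndices(Row schema, String... fieldNames) {
        int[] indices = new int[fieldNames.length];
        Row current = schema;
        for (int level = 0; level < fieldNames.length; level++) {
            if (current == null) {
                throw new IllegalArgumentException(
                        "Cannot descend into a leaf column to find: " + fieldNames[level]);
            }
            List<String> names = new ArrayList<>(current.fields.keySet());
            int index = names.indexOf(fieldNames[level]);
            if (index < 0) {
                throw new IllegalArgumentException("Unknown field: " + fieldNames[level]);
            }
            indices[level] = index;
            current = current.fields.get(fieldNames[level]);
        }
        return indices;
    }
}
```

For a toy table with columns (id, user) where user is a row of (name, age), the path user.age resolves to the indices {1, 1} under this assumption.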

Compatibility, Deprecation, and Migration Plan

This feature adds support for nested fields filter pushdown and is backward compatible with existing TableSources. A TableSource, whether new or existing, either handles nested fields filters or, if it doesn't support them, treats them as a no-op.

Test Plan

The proposed changes must be covered by unit tests (UT), integration tests (IT), and end-to-end (E2E) tests.

Rejected Alternatives

Change FieldReferenceExpression to support nested fields

  • Modify the FieldReferenceExpression class to include a nestedFieldIndex array that holds the index of the field at every level, traversing from the top-level column to the leaf-level column of the nested field.
  • Add a new API called supportsNestedFilters to SupportsFilterPushDown, defaulting to false for backwards compatibility.
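For comparison, the opt-in flag in this alternative could have looked roughly like the sketch below. The interface and class names here are hypothetical stand-ins, not the real SupportsFilterPushDown; only the method name supportsNestedFilters and its default of false come from the description above.

```java
/** Sketch only: stand-in for SupportsFilterPushDown with the extra flag. */
interface FilterPushDownWithFlagSketch {

    /** Defaults to false so that existing sources keep their current behavior. */
    default boolean supportsNestedFilters() {
        return false;
    }
}

/** An existing source written before the flag existed; it inherits the default. */
final class LegacySourceSketch implements FilterPushDownWithFlagSketch {}

/** A source that explicitly opts in to nested fields filters. */
final class NestedAwareSourceSketch implements FilterPushDownWithFlagSketch {

    @Override
    public boolean supportsNestedFilters() {
        return true;
    }
}
```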

The NestedFieldReferenceExpression approach described under Proposed Changes is preferred because it offers a more streamlined API and doesn't negatively affect existing table sources.