

Discussion thread: https://lists.apache.org/thread/686bhgwrrb4xmbfzlk60szwxos4z64t7
Vote thread: https://lists.apache.org/thread/x5tyg0sb7h3clyrv34t9rfrwn7kwy3t3
JIRA: FLINK-20767

Release: 1.19


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Flink redesigned the table source/sink interfaces as part of FLIP-95 and also added optional interfaces that let a table source support projection, partition, and filter push-down, among others. Currently, filter push-down does not support nested fields. For example:

Code Block

...

 SELECT * FROM users WHERE user.age < 30

In the above example, the filter on user.age is currently not pushed down to the table source, even if the table source is able to evaluate filters on nested fields. The goal of this FLIP is to revise the SupportsFilterPushDown API to enable push-down of filters on nested fields while maintaining backward compatibility with existing table sources.

Public Interfaces

  • Introduce a new ResolvedExpression called NestedFieldReferenceExpression to support nested fields filter pushdown.

Proposed Changes

Introduce NestedFieldReferenceExpression to support nested fields filter pushdown

In Calcite, RexFieldAccess is the RexNode that represents access to a nested field, but Flink has no corresponding ResolvedExpression. To support nested fields filter pushdown, this FLIP introduces NestedFieldReferenceExpression as the resolved expression for nested fields. If a table source does not support NestedFieldReferenceExpression, pushdown is a no-op for it: filters that reference a nested field are returned as remainingFilters and are still evaluated by Flink. This ensures backward compatibility for existing table sources.
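The conversion step can be pictured as flattening a chain of field accesses into the two parallel arrays that NestedFieldReferenceExpression stores. The following is a minimal, self-contained model rather than planner code: Access, TopLevelRef, NestedAccess, FieldPath, and AccessFlattener are hypothetical stand-ins (not Calcite's RexInputRef/RexFieldAccess API), and the sketch assumes that fieldIndices holds the position of the field at each level, from the top-level column down to the leaf.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical model of a field-access chain such as user.age (not Calcite's API).
interface Access {}

// A top-level column of the table, e.g. user at position 2.
final class TopLevelRef implements Access {
    final String name;
    final int index;

    TopLevelRef(String name, int index) {
        this.name = name;
        this.index = index;
    }
}

// Access to a field of a ROW-typed parent, e.g. age at position 1 inside user.
final class NestedAccess implements Access {
    final Access parent;
    final String name;
    final int index;

    NestedAccess(Access parent, String name, int index) {
        this.parent = parent;
        this.name = name;
        this.index = index;
    }
}

// Flattened path: names and indices from the top-level column down to the leaf.
final class FieldPath {
    final String[] fieldNames;
    final int[] fieldIndices;

    FieldPath(String[] fieldNames, int[] fieldIndices) {
        this.fieldNames = fieldNames;
        this.fieldIndices = fieldIndices;
    }
}

final class AccessFlattener {
    // Walks from the leaf up to the top-level column, then reverses the collected steps.
    static FieldPath flatten(Access access) {
        List<String> names = new ArrayList<>();
        List<Integer> indices = new ArrayList<>();
        Access current = access;
        while (current instanceof NestedAccess) {
            NestedAccess step = (NestedAccess) current;
            names.add(step.name);
            indices.add(step.index);
            current = step.parent;
        }
        TopLevelRef top = (TopLevelRef) current; // every chain is assumed to start at a column
        names.add(top.name);
        indices.add(top.index);
        Collections.reverse(names);
        Collections.reverse(indices);

        int[] fieldIndices = new int[indices.size()];
        for (int i = 0; i < fieldIndices.length; i++) {
            fieldIndices[i] = indices.get(i);
        }
        return new FieldPath(names.toArray(new String[0]), fieldIndices);
    }
}
```

For a hypothetical schema users(id, name, user ROW<city, age>), the access user.age flattens to fieldNames ["user", "age"] and fieldIndices [2, 1].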

Code Block
language: java
title: NestedFieldReferenceExpression.java
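import java.util.Collections;
import java.util.List;
import org.apache.flink.table.expressions.Expression;
import org.apache.flink.table.expressions.ExpressionVisitor;
import org.apache.flink.table.expressions.ResolvedExpression;
import org.apache.flink.table.types.DataType;

public final class NestedFieldReferenceExpression implements ResolvedExpression {

    // Field names from the top-level column down to the leaf, e.g. ["user", "age"].
    private final String[] fieldNames;

    // Index of the field at each level, from the top-level column down to the leaf.
    private final int[] fieldIndices;

    private final DataType dataType; // data type of the leaf field

    public NestedFieldReferenceExpression(
            String[] fieldNames,
            int[] fieldIndices,
            DataType dataType) {
        this.fieldNames = fieldNames;
        this.fieldIndices = fieldIndices;
        this.dataType = dataType;
    }

    public String[] getFieldNames() {
        return fieldNames;
    }

    public int[] getFieldIndices() {
        return fieldIndices;
    }

    @Override
    public DataType getOutputDataType() {
        return dataType;
    }

    @Override
    public List<ResolvedExpression> getResolvedChildren() {
        return Collections.emptyList(); // a field reference is a leaf expression
    }

    @Override
    public List<Expression> getChildren() {
        return Collections.emptyList();
    }

    @Override
    public String asSummaryString() {
        return String.join(".", fieldNames); // e.g. "user.age"
    }

    @Override
    public <R> R accept(ExpressionVisitor<R> visitor) {
        return visitor.visit(this);
    }
}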
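A source that accepts a filter on a nested field could use getFieldIndices() to walk from the top-level column to the leaf value of each row. The sketch below is a simplified model under stated assumptions: rows are plain Object[] arrays with nested rows as nested arrays (instead of Flink's RowData), NestedFieldReader is a hypothetical helper, and the predicate is the motivating example user.age < 30 on a hypothetical schema users(id, name, user ROW<city, age>), where user.age has fieldIndices [2, 1].

```java
// Hypothetical helper; rows are modeled as Object[] and nested ROW values as nested Object[].
final class NestedFieldReader {

    // Follows fieldIndices level by level, from the top-level column down to the leaf.
    // Returns null if the value or any enclosing row on the path is null.
    static Object read(Object[] row, int[] fieldIndices) {
        Object current = row;
        for (int index : fieldIndices) {
            if (current == null) {
                return null;
            }
            current = ((Object[]) current)[index];
        }
        return current;
    }

    // Evaluates "<nested INT field> < bound"; a NULL value does not pass the filter,
    // matching SQL semantics where the comparison yields UNKNOWN.
    static boolean lessThan(Object[] row, int[] fieldIndices, int bound) {
        Object value = read(row, fieldIndices);
        return value != null && (Integer) value < bound;
    }
}
```

With user.age at fieldIndices [2, 1], a row (1, "alice", ("Berlin", 25)) passes the filter, while a row whose user column is NULL does not. One plausible reason to carry both arrays is that indices give positional access like this, while names suit sources that address fields by name.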

Compatibility, Deprecation, and Migration Plan

This feature adds nested fields filter pushdown and is backward compatible with existing table sources. A table source that supports nested field filters can accept them; for a source that does not, pushdown of those filters is a no-op and they are returned as remaining filters.
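The no-op behavior amounts to a partition of the offered filters into accepted and remaining ones. The sketch below models that split with hypothetical classes (PathFilter, PushDownResult, ModelSource) that only mirror the shape of SupportsFilterPushDown.Result; it assumes a source that does not understand nested references hands them back unchanged, and it keeps the two lists disjoint for simplicity.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified filter: a predicate on one column path, e.g. user.age < 30.
final class PathFilter {
    final String summary;
    final String[] path;

    PathFilter(String summary, String... path) {
        this.summary = summary;
        this.path = path;
    }

    boolean isNested() {
        return path.length > 1;
    }
}

// Mirrors the shape of SupportsFilterPushDown.Result: accepted and remaining filters.
final class PushDownResult {
    final List<PathFilter> accepted = new ArrayList<>();
    final List<PathFilter> remaining = new ArrayList<>();
}

final class ModelSource {
    private final boolean understandsNestedFields;

    ModelSource(boolean understandsNestedFields) {
        this.understandsNestedFields = understandsNestedFields;
    }

    // Accepts every filter the source can evaluate; all others are handed back as
    // remaining filters so that Flink still applies them after the scan.
    PushDownResult applyFilters(List<PathFilter> filters) {
        PushDownResult result = new PushDownResult();
        for (PathFilter filter : filters) {
            if (!filter.isNested() || understandsNestedFields) {
                result.accepted.add(filter);
            } else {
                result.remaining.add(filter);
            }
        }
        return result;
    }
}
```

Given the filters id > 10 and user.age < 30, the legacy model accepts only id > 10 and returns user.age < 30 as remaining, so query results are unchanged; the nested-aware model accepts both.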

Test Plan

The proposed changes must be covered by unit tests (UT), integration tests (IT), and end-to-end (E2E) tests.

Rejected Alternatives

Change FieldReferenceExpression to support nested fields

  • Modify the FieldReferenceExpression class to include a nestedFieldIndex array that holds the index of the field at every level, from the top-level column down to the leaf-level column of the nested field.
  • Add a new method, supportsNestedFilters, to SupportsFilterPushDown that defaults to false for backward compatibility.
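For comparison, the rejected alternative might look roughly as follows. All names are hypothetical (RejectedFieldReference, NestedFilterCapability, LegacySource, NestedAwareSource), and the sketch assumes nestedFieldIndex holds one entry per level, so a top-level column has a single entry. The default method shows how supportsNestedFilters would keep existing sources at false unless they opt in.

```java
// Sketch of the rejected design; all names are hypothetical and not part of Flink.

// Part 1: the single field-reference class gains a per-level index array.
final class RejectedFieldReference {
    final String name;
    final int fieldIndex;         // top-level column index, as before
    final int[] nestedFieldIndex; // assumed: one entry per level, top-level column first

    RejectedFieldReference(String name, int fieldIndex, int[] nestedFieldIndex) {
        this.name = name;
        this.fieldIndex = fieldIndex;
        this.nestedFieldIndex = nestedFieldIndex;
    }

    boolean isNested() {
        return nestedFieldIndex.length > 1;
    }
}

// Part 2: an opt-in capability flag on the push-down interface.
interface NestedFilterCapability {
    // Defaults to false so that existing sources keep their current behavior.
    default boolean supportsNestedFilters() {
        return false;
    }
}

// An existing source that never overrides the flag.
final class LegacySource implements NestedFilterCapability {}

// A source that explicitly opts in to nested filters.
final class NestedAwareSource implements NestedFilterCapability {
    @Override
    public boolean supportsNestedFilters() {
        return true;
    }
}
```

In this shape, the planner would presumably have to check supportsNestedFilters() before passing a nested reference to a source, since a source written against the old class might read only fieldIndex and treat user.age as the whole user column.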

The NestedFieldReferenceExpression approach is preferred because it offers a more streamlined API and does not negatively affect existing table sources.