Status

...

Discussion thread

...

JIRA: FLINK-11439 (ASF JIRA)

...

Release: 1.14


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

The module will not contain a Calcite dependency. This means that the LogicalNode and Expression classes won’t hold RelNode and RexNode anymore. LogicalNode and Expression become the output of the Table API and need to be converted into Calcite optimizer RelNodes and RexNodes by the planner.

...

The SQL Client logically belongs to `flink-table` and should be moved under this module.

...

User Code Examples

→ User wants to write a table program in the table ecosystem using Java.

Instead of passing a certain execution environment, the user is required to specify the desired execution mode. Under the hood, the table environment tries to discover exactly one planner that can satisfy this requirement; otherwise an exception is thrown. The planner takes care of instantiating an execution environment, configuring it, and triggering the execution of a job if requested to do so.

import org.apache.flink.table.api.TableConfig;
import org.apache.flink.table.api.TableEnvironment;

public static void main(String[] args) {
    TableConfig config = TableConfig.builder()
        .asStreamingExecution()
        // example of providing configuration that was in StreamExecutionEnvironment before
        .watermarkInterval(100)
        .build();

    TableEnvironment tEnv = TableEnvironment.create(config);

    tEnv.scan("MyTable").select("*").insertInto("MyOtherTable");

    tEnv.execute();
}


Internal Steps:


  1. Discover StreamPlanner

  2. StreamPlanner configures itself with the given TableConfig (e.g. watermark interval)

  3. Table environment calls are forwarded to the planner

For users who want to try out experimental planner implementations, we can add more methods to the config:


TableConfig config = TableConfig.builder()
        .asUnifiedExecution() // experimental!
        .watermarkInterval(100)
        .build();


→ User wants to write a table program + translate to Java DataStream API.


Compared to the previous example, a user can use a table environment that is specific to the current execution environment. Any configuration can be applied directly on the execution environment. Under the hood, the table environment wraps the execution environment into a specific TableConfig that a matching planner can use.

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public static void main(String[] args) {
    StreamExecutionEnvironment exec = StreamExecutionEnvironment.createRemoteEnvironment(...);
    exec.getConfig().setAutoWatermarkInterval(...);

    StreamTableEnvironment tEnv = StreamTableEnvironment.create(exec);

    DataStream<?> ds = tEnv.fromAppendStream(...).select("*").toAppendStream(Row.class);

    exec.execute(); // or tEnv.execute();
}

Internal Steps:

  1. Wrap execution environment in a specific TableConfig (e.g. StreamTableConfig)

  2. Discover StreamPlanner

  3. StreamPlanner configures itself with the given execution environment wrapped in the config

  4. Table environment calls are forwarded to the planner
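
A minimal sketch of how the wrapping in step 1 could look. StreamTableConfig is named in this FLIP but does not exist yet; the constructor and accessor shown here are assumptions of this sketch, not a decided design.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

// Sketch only: a stream-specific TableConfig that carries the user's execution
// environment so that a matching planner can configure itself from it (step 3).
class StreamTableConfig extends TableConfig {

    private final StreamExecutionEnvironment env;

    StreamTableConfig(StreamExecutionEnvironment env) {
        this.env = env;
    }

    StreamExecutionEnvironment getWrappedExecutionEnvironment() {
        return env;
    }
}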

→ User wants to write a table program in the table ecosystem using Java in a unified runtime.

Once we have unified the runtime, we can make passing a TableConfig instance optional. It will still be needed to specify things like the watermark interval, but not the execution mode anymore.

import org.apache.flink.table.api.TableEnvironment;

public static void main(String[] args) {

    TableEnvironment tEnv = TableEnvironment.create();

    tEnv.scan("MyTable").select("*").insertInto("MyOtherTable");

    tEnv.execute();
}

Implementation Details

The following code shows what a planner interface could look like. It abstracts the main functionality that is currently implemented in the different table environments.

The planner interface is the bridge between base API and planner module. We can add and remove methods whenever we notice that Flink and Blink interfaces differ. This code snippet is just an example:

interface Planner<TABLEENV, API> {

    // check if the given table config is supported by this planner
    boolean matches(TableConfig config);

    void initialize(TableConfig config, CatalogManager manager);

    TABLEENV createTableEnvironment();

    // for sqlUpdate()
    void translateSqlUpdate(String statement, QueryConfig queryConfig);

    // for insertInto()
    void translateInsertInto(Table apiPlan, String targetSink, QueryConfig queryConfig);

    // for sqlQuery()
    Table translateSqlQuery(String query);

    // for explain()
    String explain(Table logicalPlan);

    // for getCompletionHints()
    String[] completeSqlStatement(String statement, int pos);

    // for registerTableSource()
    // checks if a StreamTableSource or BatchTableSource can be used for this planner
    boolean isCompatibleTableSource(TableSource source);

    // for registerTableSink()
    boolean isCompatibleTableSink(TableSink sink);

    // for internal fromDataStream() and fromDataSet()
    // implementers must prepare for internal type (i.e. CRow)
    // this interface is meant only for bridging environments, a unified table environment 
    // would not use this.
    Table fromBridgingApi(UpdateMode updateMode, API bridgingApi, TableSchema schema);

    // for internal toAppendStream() and toRetractStream()
    // implementers must wrap the internal type (i.e. CRow) into Java/Scala API
    // this interface is meant only for bridging environments, a unified table environment 
    // would not use this.
    API toBridgingApi(UpdateMode updateMode, Table logicalPlan);

    void execute(Optional<String> jobName);
}

class StreamPlanner implements Planner<StreamTableEnvironment, DataStream<CRow>> {
    private StreamExecutionEnvironment env;
    // ...
}

class BatchPlanner implements Planner<BatchTableEnvironment, DataSet<Row>> {
    private ExecutionEnvironment env;
    // ...
}

class BlinkPlanner implements Planner<TableEnvironment, StreamTransform<???>> {
    // ...
}
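
To illustrate the planner discovery mentioned in the user code examples, the following is a minimal sketch of how `TableEnvironment.create(config)` could locate exactly one matching planner. The `ServiceLoader`-based lookup and the `PlannerDiscovery` helper are assumptions of this sketch, not part of the proposal.

import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

// Sketch only: discovery of exactly one planner whose matches() accepts the
// given TableConfig; otherwise an exception is thrown.
final class PlannerDiscovery {

    @SuppressWarnings({"rawtypes", "unchecked"})
    static Planner<?, ?> discover(TableConfig config) {
        List<Planner<?, ?>> candidates = new ArrayList<>();
        for (Planner planner : ServiceLoader.load(Planner.class)) {
            if (planner.matches(config)) {
                candidates.add(planner);
            }
        }
        if (candidates.size() != 1) {
            throw new IllegalStateException(
                "Expected exactly one matching planner but found " + candidates.size());
        }
        return candidates.get(0);
    }
}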


Public Interfaces

Public interfaces remain mostly the same. Some exceptions are mentioned in the implementation plan.

Implementation Plan

The previous sections have shown a vision of how the Table & SQL API could look. This section names concrete implementation steps that can be converted into JIRA issues.

The order of this list represents the implementation order, unless a task is marked with [parallelizable], which means it can be implemented at the same time as the previous task.

Module Split

  1. [Flink 1.8] Perform module split in Flink

    1. The split is performed in a way that does not force users to update their dependencies again. An example PoC split without breaking compatibility can be found here.

    2. Classes are not changed but only moved to flink-table-planner.

  2. [Blink] Match against new module structure

Minor API Changes

  1. [Flink 1.8] Deprecate static methods in TableEnvironments

    1. Direct users to the `Batch/StreamTableEnvironment.create()` approach. The `create()` method does not necessarily have to perform a planner discovery yet; we can hard-code the target table environment for now. (A sketch of this deprecation follows this list.)

    2. `TableEnvironment.create()` is not supported.

  2. [parallelizable] [Flink 1.8] Deprecate "new Table()" with a different solution

    1. Once Table is an interface, we can easily replace the underlying implementation at any time. The constructor call prevents us from converting it into an interface.
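
For item 1, the deprecation could look as sketched below. `getTableEnvironment` is the existing static factory in Flink 1.7, while `create()` is the method proposed in this FLIP; the snippet is illustrative, not the final implementation.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;

// Sketch only: direct users from the deprecated static factory methods to the
// new create() methods; analogous deprecations apply to the batch and Scala variants.
public abstract class TableEnvironment {

    /** @deprecated Use {@link StreamTableEnvironment#create(StreamExecutionEnvironment)} instead. */
    @Deprecated
    public static StreamTableEnvironment getTableEnvironment(StreamExecutionEnvironment env) {
        return StreamTableEnvironment.create(env);
    }
}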

API Preparation

  1. [parallelizable] [Flink & Blink] Uncouple the `Expression` class from `RexNode`s

    1. This separates API expressions from Calcite’s RexNode.

    2. The Expression class can be moved to api-base and will contain validation logic only.

    3. Possible implementation: every expression will contain a `toPlannerExpression` method that translates it into a `PlannerExpression` instance. For example, `PlannerCall(OperatorTable.TRIM, PlannerLiteral(...), PlannerLiteral(...))`. Introduce a visitor pattern in Flink and Blink to translate `PlannerExpression` to `RexNode`s (see the visitor sketch after this list).
      Alternative implementation: we use a big visitor pattern for every expression and don’t introduce planner expressions.

    4. Expressions can still remain implemented in Scala for now. TableEnvironment depends on Table, which depends on Expression, so we should start moving expressions first.

  2. [Flink] Make the `Table, GroupedTable, WindowedTable, WindowGroupedTable, OverWindowedTable, Window, OverWindow` classes interfaces

    1. Implemented in Java in `api-base`.

    2. We can keep the "Table" Scala implementation in a planner module until it has been converted to Java.

    3. We can add a method to the planner later to give us a concrete instance. This is one possibility to have a smooth transition period instead of changing all classes at once.

    4. Move TemporalTableFunction into flink-table-api-base.

  3. [parallelizable] [Flink] Port and move TableSource and TableSink to flink-table-common

    1. This step only unblocks the TableEnvironment interfaces task. Stream/BatchTableSource/Sink remain in flink-table-api-java-bridge for now until they have been reworked.

  4. [parallelizable] [Flink] Port and move ExternalCatalog, ExternalCatalogTable, TableNotExistException, CatalogNotExistException to flink-table-common

    1. Unblocks TableEnvironment interface task and catalog contribution.

  5. [parallelizable] [Flink] Move QueryConfig, BatchQueryConfig, StreamQueryConfig, TableDescriptor to flink-table-api-base

    1. Unblocks TableEnvironment interface task.

  6. [Flink] Turn TableEnvironment and the Java/Scala Batch/StreamTableEnvironments into interfaces
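
As referenced in item 1 above, a minimal sketch of the visitor-based uncoupling of API expressions from Calcite. All class names are illustrative placeholders, not existing Flink classes.

// Sketch only: API expressions accept a visitor; each planner supplies its own
// visitor implementation, so the API modules stay Calcite-free.
interface Expression {
    <R> R accept(ExpressionVisitor<R> visitor);
}

final class Literal implements Expression {
    final Object value;
    Literal(Object value) { this.value = value; }
    public <R> R accept(ExpressionVisitor<R> visitor) { return visitor.visit(this); }
}

final class Call implements Expression {
    final String operator;                          // e.g. "trim"
    final java.util.List<Expression> operands;
    Call(String operator, java.util.List<Expression> operands) {
        this.operator = operator;
        this.operands = operands;
    }
    public <R> R accept(ExpressionVisitor<R> visitor) { return visitor.visit(this); }
}

// A planner-side implementation (e.g. one producing Calcite RexNodes) would
// live in flink-table-planner.
interface ExpressionVisitor<R> {
    R visit(Literal literal);
    R visit(Call call);
}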

Pluggable Planner

  1. [Flink] Introduce the planner interface and perform a planner discovery.

    1. The previous tasks should have split the API from the Planner so we should be able to make it pluggable.

  2. [Blink] Match Blink runtime code against new planner interface, operator interface, sorted map state & other potential differences

    1. At this step, Blink doesn’t need to support UDFs, TableSources, TableSinks, ExternalCatalogs, TableSchemas, … The only supported data input would be DataStream. Thanks to this scope limitation, we do not have to unify the type system, nor do we have to agree on the UDF/TableSource/TableSink/Catalog APIs before merging planner/runtime code.

    2. Until the type system has been reworked, the Blink code needs to convert TypeInformation to Blink’s type abstraction.

    3. Work on the TypeSystem and further API changes can be done independently and in parallel to the merging planner/runtime code.

  3. [parallelizable] [Blink] Reduce Blink’s Maven dependencies to a minimum

    1. Remove dependencies on:
      flink-python, pyrolite, orc, parquet, hadoop, hive

    2. Do not use a custom Calcite version.
      In the past we already forked and changed a couple of Calcite classes to fix bugs. If Calcite becomes a bottleneck or limits us too much, we can think about forking Calcite in the future. However, forking Calcite also means that we will tend to introduce non-standard-compliant syntax and semantics. Every shift from standard SQL should be discussed with the community beforehand. So let’s disable Blink-specific SQL features for now.

Blink Merge

  1. [Flink] Merge Blink planner/runtime code

    1. Blink runtime/planner will be merged against the existing Table API, modulo the changes from the previous implementation plan steps.

    2. Merging can start from streaming or batch, depending on which prerequisites will be completed first (depending on the flink-runtime changes).

    3. Merging should happen in steps that are as small as possible (details to be worked out in a separate discussion), for example:

      1. statistics package

      2. cost calculator

      3. planner framework

      4. ...

    4. Non-core features such as Hive UDFs, Python UDFs, and queryable state integration are out of scope for now. Let’s focus on main features such as making TPC-DS work on streaming.

    5. Throwing hard exceptions for unsupported features is fine at this stage as users can always fall back to the old Flink planner.

Advance the API and Unblock New Features

  1. [Flink] Rework Table & SQL API type system

    1. This includes a lot of API changes and requires a new design document.

    2. Compared to the changes in Blink, we should aim for backwards compatibility and unification of Scala/Java type extraction.

  2. [Flink] Finalize unified table connector story

    1. Design new connector interfaces for unified connections to batch and streaming data.

    2. The discussion might have started earlier but we need the implementation ready at this stage.

    3. The design also influences DDL and catalog work.

    4. It touches TableSchema, sources and sinks.

  3. [Flink] Cleanup table environments and port to Java

    1. Split the table environments into subcomponents (Catalog, API, Planner); see the composition sketch after this list.

  4. [Flink] Introduce new unified table environment `TableEnvironment.create()` in `api-base`

    1. This is the first true unification of batch/streaming with unified connectors and environment.

    2. Goal: End-to-end SQL TPC-DS on both batch and streaming sources.

  5. [parallelizable] [Flink] Port expressions to Java to make `api-base` Scala-free

    1. The `api-base` module might have a temporary Scala dependency at this stage.

    2. Make the API Scala-free.
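
A minimal sketch of the composition described in item 3 above. `CatalogManager` and `Planner` come from the planner interface sketch earlier in this document; the class and constructor shown here are purely illustrative.

// Sketch only: the table environment as a thin composition of catalog handling,
// API classes, and a pluggable planner.
final class UnifiedTableEnvironment {

    private final CatalogManager catalogManager; // catalogs and table registration
    private final Planner<?, ?> planner;         // translation, optimization, execution

    UnifiedTableEnvironment(CatalogManager catalogManager, Planner<?, ?> planner) {
        this.catalogManager = catalogManager;
        this.planner = planner;
    }

    void insertInto(Table table, String targetSink, QueryConfig queryConfig) {
        // API calls are forwarded to the planner, as in the user code examples above
        planner.translateInsertInto(table, targetSink, queryConfig);
    }
}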

Future Planning

  1. [Flink] Plan long-term goals

    1. Make Flink code and Blink code feature equivalent. For example:

      1. Table API enhancements (e.g. map(), flatMap())

      2. Compatibility with Hive UDFs

      3. Python UDFs?

    2. Our goal is to unblock people with new API features as quickly as possible.

    3. Add columnar connectors such as Parquet and ORC to their corresponding connector/format modules

    4. Make planner Scala-free

  1. [Flink] Discuss & agree on core changes

    1. This is a prerequisite to merging Blink’s runtime.

    2. Currently, Blink touches components such as:
      TypeSerializer, IOManager, MemoryManager, and StreamGraphGenerator

  2. [Flink] Discuss & agree on operator interface changes

    1. This is a prerequisite to merging Blink’s batch runtime.

  3. [Flink] Discuss & agree on Sorted Map State

    1. This is a prerequisite to merging Blink’s streaming runtime.


→ A more detailed description and discussion should happen in one or more separate design documents. This document only covers a schedule for flink-table related changes.

Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users?
  • If we are changing behavior how will we phase out the older behavior?
  • If we need special migration tools, describe them here.
  • When will we remove the existing behavior?

Test Plan

Existing tests and additional unit and IT cases will verify the changes.

Rejected Alternatives

The question was: How far should a unification go?

In theory, we could have just one TableEnvironment for users of the table ecosystem plus bridging Batch/Stream environments for Scala and Java for users of DataSet/DataStream:

org.apache.flink.table.api.TableEnvironment (api-java)
org.apache.flink.table.api.java.StreamTableEnvironment (api-java-bridge)
org.apache.flink.table.api.scala.StreamTableEnvironment (api-scala-bridge)
org.apache.flink.table.api.java.BatchTableEnvironment (api-java-bridge)
org.apache.flink.table.api.scala.BatchTableEnvironment (api-scala-bridge)

→ 5 environments in total

If this unification is too much, we would end up with 2 API language-specific TableEnvironments plus bridging Batch/Stream environments for Scala and Java users of DataSet/DataStream:

org.apache.flink.table.api.TableEnvironment (api-base)
org.apache.flink.table.api.java.TableEnvironment (api-java)
org.apache.flink.table.api.scala.TableEnvironment (api-scala)
org.apache.flink.table.api.java.StreamTableEnvironment (api-java-bridge)
org.apache.flink.table.api.scala.StreamTableEnvironment (api-scala-bridge)
org.apache.flink.table.api.java.BatchTableEnvironment (api-java-bridge)
org.apache.flink.table.api.scala.BatchTableEnvironment (api-scala-bridge)

→ 7 environments in total

→ Decision: We will rework the type system but at a later stage. As can be seen in the implementation plan, a unified TableEnvironment class will come afterwards.

The question was: Should we allow a custom Calcite version?

→ Decision: We can still think about forking Calcite for the features of Blink such as:

1. support temporal table join, i.e. FOR SYSTEM_TIME AS OF syntax

2. support EMIT syntax, used for window early-fire and late-arrival

3. implicit Type Cast

4. support dynamic table parameter
   (like ‘select * from t (key1=value1, key2=value2)’ for Kafka offsets)

5. support more features in MATCH_RECOGNIZE

But every change of SQL syntax and semantics should be discussed thoroughly with the community. These discussions should not block the MVP Blink runtime merging so let's postpone them for now.

Appendix: Porting Guidelines


Some general guidelines when we start porting code to Java or developing new code.


This section is copied from the outdated FLIP-28.


We suggest the following steps to unblock people from developing new features but also start porting existing code either when touching the corresponding class or as a voluntary contribution.


Porting of Existing Classes


In order to clarify the terms here, "porting" means that Scala code is rewritten to Java code without changing the original logic. Breaking existing APIs should be avoided.


Due to different class member visibility principles in Scala and Java, it might be necessary to adapt class structures. For example, `private[flink]` is used quite often and would become `public` in Java, which is not always intended. Thus, we need to find a reasonable abstraction for these cases.


Since migrating code is a good chance for a code base review, an implementer should pay attention to code deduplication, exposing methods/fields, and proper annotations with `@Internal`, `@PublicEvolving` when performing the migration.
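
As a purely hypothetical illustration of the two preceding points, a member that was `private[flink]` in Scala could become a public Java member that is explicitly marked as internal. The class and method names below are made up.

import org.apache.flink.annotation.Internal;

// Sketch only: the Java side of the visibility mismatch. What was
// `private[flink]` in Scala becomes public, but the annotation documents that
// it is not part of the public API.
@Internal
public final class TableNameUtils {

    private TableNameUtils() {}

    /** Formerly reachable only within the org.apache.flink namespace. */
    public static String uniqueTableName(String prefix, int id) {
        return prefix + "_" + id;
    }
}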


Tests should be migrated in a separate commit. This makes it possible to validate the ported code first before touching test classes.


Development of New Classes


New classes should always be implemented in Java if the surrounding code does not force Scala-specific code.


Examples:


A runtime class that only depends on `ProcessFunction` should be implemented in Java.


A new planner rule or node that only depends on Calcite and runtime classes should be implemented in Java.


If the surrounding code requires Scala, we leave it up to the implementer and committer to decide whether related classes should be adapted or even migrated to Java code. If they are not adapted/migrated, a Jira issue should track this shortcoming, and the new class can still be implemented in Scala.


Examples:


A new class needs to implement a trait that requires a Scala collection or `Option` in parameters. The Java code should not depend on Scala classes. Therefore, the trait should be adapted to require Java collections or Java `Optional`. This should happen in a separate commit. If adapting the signature for this trait is too much work because it touches a lot of classes and thus is out of scope for the actual issue, implement a Scala class for now. But open an issue for it to track bigger migration blockers.


A new code-generating class needs to be implemented, but there are no utility methods for Java so far. Doing multiline code generation with variables and expressions inside is inconvenient in Java. We need to introduce proper tooling for this first, so it is acceptable to implement this in Scala for now.


Porting Priorities


The following steps should enable a smooth migration from Scala to Java.


  1. Migrate `flink-table-runtime` classes
    All runtime classes have few dependencies on other classes.

  2. Migrate connector classes to `flink-table-api-*`
    Once we have implemented improvements to the unified connector interface, we can also migrate these classes. Among other things, this requires a refactoring of the timestamp extractors, which are the biggest blocker because they transitively depend on expressions.

  3. Migrate remaining `flink-table-common` classes
    While doing tasks for the new external catalog integration or improvements to the unified connector interfaces, we can migrate the remaining classes.

  4. Migrate remaining API classes
    This includes expressions, logical nodes etc.

  5. Load Scala in `flink-table-planner` into a separate classloader
    After this stage, `flink-table` would be Scala-free from a dependency perspective.

  6. Migrate `flink-table-planner` classes
    Final goal of Scala-free `flink-table`.