


Status

Current state: "Accepted"

Discussion thread: https://lists.apache.org/thread.html/13c605185f81958ef63c1090f72c82ba979f90f283406f90217dd5db@%3Cdev.flink.apache.org%3E

JIRA: FLINK-11439 (https://issues.apache.org/jira/browse/FLINK-11439)

Released: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, the Table & SQL API is implemented in Scala. This decision was made a long time ago when the initial code base was created as part of a master's thesis. The community kept Scala because of the nice language features that enable a fluent Table API like table.select('field.trim()) and because Scala allows for quick prototyping (e.g. multi-line strings for code generation). The committers enforced a policy of not splitting the code base across two programming languages. Additionally, the API is defined in a single module and tightly coupled to the DataSet and DataStream APIs. The flink-table module has been treated as a library on top of Flink’s main APIs so far.

Nowadays the flink-table module is increasingly becoming an important and independent part of the Flink ecosystem. Connectors, formats, and the SQL Client are implemented in Java but need to interoperate with flink-table, which makes these modules dependent on Scala. As mentioned in an earlier mail thread, using Scala for API classes also exposes member variables and methods in Java that should not be exposed to users. Java is still the most important API language, yet right now we treat it as a second-class citizen.

Furthermore, many users are using (or are supposed to use) only the Table & SQL API with table connectors and catalogs. So a tight coupling with the DataSet and DataStream APIs is not necessary anymore.

Since bigger code contributions to flink-table are in the pipeline, we need to ensure a smooth transition/replacement of the underlying core at any time without affecting current users. The current module and class structure is not flexible enough without breaking backwards compatibility.

FYI: This FLIP replaces FLIP-28 as it not only reworks the module structure but also discusses future visions of the entire table ecosystem.

Goals

The summarized goals of this document are:

  • Make flink-table Scala-free (long-term goal!).
    We focus on porting the API to Java and keep this goal in mind for future contributions. Scala code can still be merged. But new components can be implemented with a potentially larger/more skilled contributor base and we avoid introducing more technical debt.

  • Remove annoying API design flaws
    Currently, the flink-table module contains 7 different TableEnvironments (3 base classes + 4 batch/stream environments for Java and Scala). Let’s make this simpler. We rely on TypeInformation instead of SQL types with proper nullability, decimal precision/scale, and CHAR support. Let’s make this consistent across SQL and Table API.

  • Decouple table programs from DataStream/DataSet API
    Allow table programs to be self-contained. No need for a Stream/ExecutionEnvironment entrypoint anymore. A table program definition is just API that reads and writes to catalog tables.

  • Decouple API from optimizer and runtime
    An underlying planner should be exchangeable at any time. This is needed for future runtime changes and big code contributions such as Blink SQL.

  • Unify batch/streaming
    For pure table programs that read and write to catalog tables, a user should be able to use a unified API with a unified TableEnvironment instance and unified sources and sinks. A dedicated batch table environment will not be necessary anymore (see the sketch after this list).

  • Remain backwards compatible across multiple releases
    In general, we should aim to remain backwards compatible. However, if there is API that limits us in achieving the goals above, we should deprecate that part of the API, provide an alternative, and remove it one or two releases later. Ideally, we identify these parts for 1.8 such that we can remove them already in Flink 1.9. The flink-table module has no @Public(Evolving) annotations, so we can perform changes quickly but selectively. For example, we need to remove the static methods in TableEnvironment. Flink’s BatchTableEnvironment must not be compatible with Blink’s runtime for now.

  • Merge Blink SQL’s features and architectural improvements
    Merge the Blink SQL planner given that necessary Flink core/runtime changes have been completed. The merging will happen in stages (e.g. basic planner framework, then operator by operator). The exact merging plan still needs to be determined.
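
To make the decoupling and unification goals above more tangible, here is a purely hypothetical sketch of how a self-contained, unified table program could eventually look. Every name shown (create(), scan(), insertInto(), execute()) is an assumption for illustration only, not agreed-upon API.

// Hypothetical vision sketch; none of these signatures are decided API.
// No StreamExecutionEnvironment/ExecutionEnvironment entrypoint and no DataStream/DataSet involved.
TableEnvironment tEnv = TableEnvironment.create();                    // planner chosen via SPI discovery

Table orders = tEnv.scan("myCatalog", "myDatabase", "Orders");        // read from a catalog table

Table revenue = orders
    .groupBy("currency")
    .select("currency, amount.sum as total");

revenue.insertInto("myCatalog", "myDatabase", "RevenuePerCurrency");  // write to a catalog table

tEnv.execute();                                                       // same program for batch or streaming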

Disclaimer

This document covers only a general vision. There are multiple subproblems for which we need to find a solution step by step. There might be temporary/intermediate steps to be taken in order to keep reworking efforts low or remain backwards compatible. For example, we can add temporary dependencies to other modules in the dependency structure that is presented below. Of course we should avoid that if possible. But this document does not cover every little subproblem.

Task Dependencies

In order to parallelize the work and unblock people in contributing new features, the following diagram illustrates the tasks and their dependencies. A more detailed implementation plan is attached to this document.

(Diagram: task dependencies)

Blue: Table API & SQL tasks

Red: Flink Runtime/Core tasks

New Module Structure

As discussed in FLIP-28, a first step to shape the future of the Table & SQL API is to get the module and dependency structure right. We suggest the following structure. Every module is explained in a separate section.

??? = we don’t know yet
* = all modules

flink-table
  flink-table-common
    (dependencies: flink-core)

  flink-table-api-base
    (dependencies: flink-table-common)

  flink-table-api-java
    (dependencies: flink-table-api-base)

  flink-table-api-scala
    (dependencies: flink-table-api-base)

  flink-table-api-java-bridge
    (dependencies: flink-table-api-java, flink-streaming-java)

  flink-table-api-scala-bridge
    (dependencies: flink-table-api-scala, flink-streaming-scala)

  flink-table-planner
    (dependencies: flink-table-api-java-bridge,
                   flink-streaming-scala,
                   Apache Calcite)

  flink-table-planner-blink
    (dependencies: flink-table-runtime-blink, flink-???)

  flink-table-runtime-blink
    (dependencies: flink-???)

  flink-table-dist
    (dependencies: flink-table-*, not flink-table-*-blink?)

At first glance the structure might appear bloated, but depending on the use case only a few dependencies need to be added, as the following examples show.

A planner module always contains the runtime as well, so a user just needs to add the right planner and API. Additionally, the Blink code is “logically” split into two modules. Thus, in the future it will be possible to submit more lightweight table programs to the cluster by submitting just the JobGraph and flink-table-runtime-blink.

Examples:

User wants to write a plugin with an external catalog and a bunch of sources and sinks.

→ use flink-table-common (vision)
→ use flink-table-api-java-bridge (until TableSources/Sink interfaces have been reworked)

User wants to write a table program in the table ecosystem using Java.

→ use flink-table-api-java

User wants to write a table program in the table ecosystem using Java + execute it in the IDE.

→ use flink-table-api-java + flink-table-planner

User wants to write a table program + translate to Java DataStream API + execute it in the IDE.

→ use flink-table-api-java-bridge + flink-table-planner

Modules

flink-table

Moved out of flink-libraries as a top-level parent module.

flink-table-common

Contains interfaces and common classes that need to be shared across different Flink modules. This module was introduced in Flink 1.7 and its name integrates nicely with the existing Flink naming scheme. A name containing `spi` would also be confusing for users looking for `api`.

There was a discussion about merging flink-table-common with flink-table-api-base. However, it is good software engineering practice to separate API and SPI. It also reduces pollution of other non-table modules with Table API classes.

Connectors, formats, catalogs, and UDFs can use this module without depending on the entire Table API stack or on Scala. The module contains interface classes such as descriptors, table sinks, and table sources. It will also contain the table factory discovery service such that connectors can discover formats.
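
To illustrate the discovery mechanism, a simplified sketch of a factory SPI and its lookup via Java's standard Service Provider mechanism is shown below. The class name FactoryDiscoveryExample and the reduced interface are assumptions for this document; the actual interfaces in flink-table-common may differ.

import java.util.Map;
import java.util.Optional;
import java.util.ServiceLoader;

// Sketch only: a simplified factory SPI and its discovery; real interfaces may differ.
public final class FactoryDiscoveryExample {

    /** Minimal factory SPI: properties a factory requires to match, e.g. "connector.type" -> "kafka". */
    public interface TableFactory {
        Map<String, String> requiredContext();
    }

    /** Finds the first factory on the classpath whose required context is contained in the given properties. */
    public static Optional<TableFactory> find(Map<String, String> properties) {
        for (TableFactory factory : ServiceLoader.load(TableFactory.class)) {
            if (properties.entrySet().containsAll(factory.requiredContext().entrySet())) {
                return Optional.of(factory);
            }
        }
        return Optional.empty();
    }
}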

The module should only contain Java classes and should have no external dependencies on other modules; a dependency on flink-core should be sufficient. In theory, once we have reworked the type system, a dependency on flink-core will not be necessary anymore. However, we will need some connection to the core in order to define sources/sinks. Currently, this is the DataStream/DataSet API but it might be a new source interface in flink-core in the future.

Currently, we cannot add interfaces for connectors into this module because classes such as `StreamTableSource` or `BatchTableSource` require a dependency on `DataStream` or `DataSet` API classes. This might change in the future once we have reworked our TableSource and TableSink interfaces. For now, extension points for connectors are located in `flink-table-api-*-bridge` and tightly integrate with the target API.

In the future, we might need to add some basic expression representations (for <, >, ==, !=, field references, literals, symbols, generic call) in order to push down filter predicates into sources without adding a dependency on `flink-table-api-base` or Calcite.
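
A minimal sketch of what such a planner-independent expression representation could look like follows; all class names here are illustrative assumptions, not decided API.

import java.util.Arrays;
import java.util.List;

// Illustrative only: a minimal expression model for filter push-down into sources.
interface Expression {}

class FieldReference implements Expression {
    final String name;
    FieldReference(String name) { this.name = name; }
}

class Literal implements Expression {
    final Object value;
    Literal(Object value) { this.value = value; }
}

// A generic call covers <, >, ==, != as well as arbitrary functions.
class Call implements Expression {
    final String function;
    final List<Expression> args;
    Call(String function, Expression... args) {
        this.function = function;
        this.args = Arrays.asList(args);
    }
}

// A source could then receive, e.g.,
//   new Call("greaterThan", new FieldReference("amount"), new Literal(100))
// and decide whether it can apply the predicate during scanning.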

flink-table-api-base

Contains API classes such as expressions and TableConfig, as well as base interfaces and classes for Table and TableEnvironment. It contains most classes from `org.apache.flink.table.api.*` plus some additional classes, including subclasses of `org.apache.flink.table.plan.logical.LogicalNode`.

Users will not depend directly on this module. It will be used by the language-specific modules; users will depend on `flink-table-api-java` or `flink-table-api-scala` when writing a table program.

The module should only contain Java classes.

The module will not contain a Calcite dependency. This means that LogicalNode and Expression classes won’t reference RelNode and RexNode anymore. LogicalNode and Expression become the output of the Table API and need to be converted into Calcite RelNodes and RexNodes by the planner.

This module defines a Planner interface for passing the API plan to a planner for optimization and execution. The base table environment performs a Java Service Provider based discovery to find a planner matching the requirements expressed through the API. This mechanism is explained in the next part of this document.
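
For concreteness, a rough sketch of such a planner SPI is shown below; the interface name, its methods, and the requirements map are assumptions for illustration, not decided API.

import java.util.Map;

// Illustrative sketch of a planner SPI in flink-table-api-base; signatures are assumptions.
interface Planner {

    /** Placeholder for the planner-independent output of the Table API (LogicalNodes/Expressions). */
    interface LogicalPlan {}

    /** Whether this planner supports the requirements expressed through the API,
     *  e.g. streaming vs. batch execution. */
    boolean matches(Map<String, String> requirements);

    /** Converts the API plan into the planner's internal representation (e.g. Calcite RelNodes/RexNodes),
     *  optimizes it, and hands the result to the runtime. */
    void translateAndExecute(LogicalPlan plan);
}

// The base TableEnvironment would locate implementations via ServiceLoader.load(Planner.class)
// and pick the first planner whose matches(...) returns true.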

Regarding the large expression code base: this structure means that all API expression representations need to be reworked. This is a lot of work and will not happen quickly. As a temporary solution there are different alternatives:

  1. We would let the expression case classes be located in the planner module for now. Since users add a planner module dependency anyway, the translation still works.

  2. We translate to just a generic `Call(“functionname”, expr*)` expression and the resolution is done by the planner.
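
For illustration only, and reusing the hypothetical expression sketch from the flink-table-common section above, alternative 2 would look roughly like this:

// Hypothetical: the API side emits only an unresolved, generic call ...
Expression trimmedName = new Call("trim", new FieldReference("name"));
// ... and the chosen planner resolves "trim" against its function catalog during translation.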

flink-table-api-java

Contains API classes with interfaces targeted to Java users. All classes here will be prefixed with `org.apache.flink.table.api.java`.

A prerequisite for a unified table environment in this module is that the Table API type system has been reworked. Otherwise we need Java type extraction and return type information.

TODO: If Java and Scala shared the same type extraction logic, we could simplify the architecture a lot. We would then only need one TableEnvironment, because the biggest difference between the Scala and Java table environments is the type extraction. In the past this was already very confusing for people; for example, sometimes case classes go through the Java type extractor and sometimes they go through the Scala type analyzer. If we unify the type system, we could unify api-java and api-base into a single module. If we don’t rework the type system, the api-scala module needs a dependency on flink-scala only for the Scala type analyzer macro.

flink-table-api-scala

Contains API classes with interfaces targeted to Scala users such as the Scala implicit expression conversions (expressionDsl.scala). All classes here will be prefixed with `org.apache.flink.table.api.scala`.

The module should only contain Scala classes.


There were opinions about letting `flink-table-api-scala` depend on `flink-table-api-java` and removing the base module. However, the problem with this approach is that classes such as `BatchTableEnvironment` or `Tumble` would exist twice on the classpath. In the past, this led to confusion because people were not always paying attention to their imports and were mixing the Java API with the Scala API. The current module structure avoids this ambiguity.

A prerequisite for a unified table environment in this module is that the Table API type system has been reworked. Otherwise we need Scala macros for type information extraction. These macros are located in `flink-scala` right now and would pull in DataSet dependencies.

flink-table-api-java-bridge

Bridges the table ecosystem with the Java DataSet and Java DataStream API. It provides classes for the non-unified table environments `org.apache.flink.table.api.java.BatchTableEnvironment` and `org.apache.flink.table.api.java.StreamTableEnvironment` that can convert back and forth between the table ecosystem and the target API. This means that toAppendStream, fromDataStream, etc. are only available in these bridging table environments. Bridging functionality includes integration with TypeInformation.
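
For illustration, a bridging program with today's (pre-rework) API roughly looks like the sketch below. The class name BridgingExample is made up, and the static getTableEnvironment(...) factory is one of the methods this FLIP proposes to remove later.

import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

// Rough sketch of how the Java bridging environment is used today.
public class BridgingExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = TableEnvironment.getTableEnvironment(env);

        DataStream<Tuple2<Long, String>> stream = env.fromElements(Tuple2.of(1L, "flink"));

        // DataStream -> Table: the stream's TypeInformation is reused for the table schema.
        Table table = tEnv.fromDataStream(stream, "id, name");

        // Table -> DataStream of appended rows.
        DataStream<Row> result = tEnv.toAppendStream(table.filter("id > 0"), Row.class);

        result.print();
        env.execute();
    }
}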

Until table sources and sink interfaces have been reworked, this module will also contain StreamTableSource, BatchTableSource, etc. because those interfaces are tightly coupled with DataStream/DataSet API right now.

flink-table-api-scala-bridge

Bridges the table ecosystem with the Scala DataSet and Scala DataStream API. It provides classes for the non-unified table environments `org.apache.flink.table.api.scala.BatchTableEnvironment` and `org.apache.flink.table.api.scala.StreamTableEnvironment` that can convert back and forth between the table ecosystem and the target API. This means that toAppendStream, fromDataStream, etc. are only available in these bridging table environments. Bridging functionality includes integration with TypeInformation.

flink-table-planner

Contains the main logic of the old flink-table module for converting a logical representation into a DataStream/DataSet program. The planner module bridges the `api` modules and the runtime code, similar to how it is done in the DataSet API of Flink. A user has to add `flink-table-api-scala/java*` and a planner in order to execute a program in an IDE.

This module contains the original `flink-table` module classes until it is replaced with better code :-) API classes will gradually be converted into Java and distributed to their future location in `flink-table-api-*`.

Compared to the future planner, this planner already contains the runtime code. The reason is that we currently use Calcite functions during runtime, e.g. for LIKE or timestamp conversion. In the future we should aim to clearly separate the runtime and planning phases, as explained below.

flink-table-planner-blink

Contains the work-in-progress Blink planner code. It contains the main logic for converting a logical representation into a StreamTransformation program. Once this module is stable, it will be renamed to flink-table-planner such that users don’t need to update their dependencies. This module will then be the only module with an Apache Calcite dependency.

flink-table-runtime-blink

Contains the work-in-progress Blink runtime code. It contains the main logic for executing a table program. It aims to keep the JAR files that need to be submitted to the cluster small. It has no dependency on Calcite.

flink-table-dist

This module just bundles the table ecosystem into one JAR file that can be moved from the /opt directory into the /lib directory of a Flink distribution.

The module does not contain the Blink code as class names could clash otherwise. If we want to include all planners here, we need to shade/relocate the flink-table-planner, flink-table-planner-blink, and flink-table-runtime-blink packages.

The SQL Client logically belongs to `flink-table` and should be moved under this module.


Public Interfaces

Briefly list any new interfaces that will be introduced as part of this proposal or any existing interfaces that will be removed or changed. The purpose of this section is to concisely call out the public contract that will come along with this feature.

...