
Status

Current state: "Under Discussion"

...


Discussion thread

...

...

...

...

rxxxq0q9v9pmzd2ht9nybpg5vrzyhwx7
JIRA: FLINK-25217

Release


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


Motivation

Nowadays, the Table & SQL API is as important to Flink as the DataStream API. It is one of the main abstractions for expressing pipelines that perform stateful stream processing. Users expect the same backwards compatibility guarantees when upgrading to a newer Flink version as with the DataStream API:

...

For example: for more efficient execution, we introduce a new rule that pushes a filter through an operator O. Since the filter columns are not required by O anymore, the schema would change. However, a completely different plan might be chosen in the end due to improvements in the cost estimation in a new Flink version. In short: It is difficult to ensure savepoint compatibility in such a dynamic topology creation process.

Terminology

...

For clarification, we distinguish between the following terms:

Term | Definition
table program

List of statements that configure the session, connect to catalogs, register (temporary) catalog objects, define and submit one or more pipelines.

A table program can be expressed using Table API in Java or could be a multi-statement SQL script for SQL Client.

pipeline

A pipeline is a DAG that consists of one or more (potentially disconnected) source-to-sink dataflows.

Statement sets allow for n:n source-to-sink dataflows. A pipeline is compiled into a single JobGraph.

See also here.

state

Any value, list, or map member of an operator that is managed by Flink. Each state is snapshotted using the `operator uid + state name` into a savepoint.

During restoration, state is mapped back to any kind of operator (even if the topology changed) using those two components.

upgrade

The change from one Flink minor version to another. For example, from 1.15 to 1.17, or 1.17 to 1.18.

A patch version change (e.g. 1.13.2 to 1.13.3) is not considered an upgrade and has already been supported before this FLIP.

Upgrading usually involves work, which is why many users perform this task rarely (e.g. only once per year). Skipping versions is also common until a new feature has been introduced that makes an upgrade worthwhile. We will support the upgrade to the most recent Flink version from a set of previous versions. We aim to support upgrades from the last 2-3 releases on a best-effort basis; maybe even more depending on the maintenance overhead. However, in order to not grow the testing matrix infinitely and to perform important refactoring if necessary, we only guarantee upgrades with a step size of a single minor version (i.e. a cascade of upgrades).

As is common when skipping versions, we still recommend that users check the release notes and perform a migration if instructed to do so.

In other words: A user can upgrade between minors and all following minors. The goal is: The same query can remain up and running. E.g. a user upgrades from 1.15 to 1.16, and then from 1.16 to 1.17 and can expect the original query to work without recomputing the data or the plan from the original SQL. This necessarily means that at some point in future releases we'll need some basic "migration tool" to keep the queries up and running, ending up modifying the compiled plan (see also COMPILE PLAN ... FROM ...) or savepoint.

An upgrade assumes that only the Flink version has changed. All pipeline defining parameters remain constant. In other words: table program, catalog objects, configuration options, and external JAR files have not changed.

migration

Actively transforms entities from one format to the other. A migration can happen on different layers. After migration, the format can be read by the next generation. Thus, the old generation is not necessary anymore.

There are four kinds of migration:

State serializer migration: In case both operator and state name have not changed, it is possible to upgrade the state format using the migration capabilities of TypeSerializer with TypeSerializerSnapshot.

Operator migration: An operator declares both the new state and old state with different state names. The initialization method of the operator accesses the old state and actively moves data into the new state format. Afterwards, the old state is empty and thus could be removed from the new savepoint metadata (given such a functionality exists, which is future work). The operator implementation could look completely different, but as long as `operator uid + state name` match, the migration is possible. 

Plan migration: We transform the JSON plan (introduced later in this document) into a new plan. For example, to remove legacy attributes that are not necessary in a new JSON plan layout. This has no impact on the savepoint itself but on older generations of the JSON parsing and validation logic that can drop legacy attributes. Also, if two operators have been fused into a single one in later versions, the plan can be migrated such that it doesn't reference the two legacy operators anymore.

Savepoint migration: We provide a tool that transforms the savepoint data into a new format. This tool could be a Flink job itself (e.g. using the State Processor API). This provides the highest flexibility as the topology can change completely (at least in theory).

There should be some good logging in place when the upgrade/migration is taking place to be able to track every restoration action, and help debug any potential issues arising from that.

backwards compatibility

A table program that has been written in a previous version behaves the same in the new version. No action of the users or any other modification of the table program is required.

savepoint compatibility

The state stored in a savepoint can still be used to initialize (still existing) operators. 

schema evolution

A column has been renamed, added to, or removed from a table or view. The same applies to a (nested) data type of a column or function that transitively affects the overall dynamic table layout and pipeline.

query evolution

A fundamental change to the query. E.g. adding a filter condition, a different aggregation, an additional join or subquery.
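The "state" entry above says that state is snapshotted and restored using `operator uid + state name`. A minimal sketch of that lookup, assuming a savepoint can be modelled as a plain map; the class `StateMappingSketch`, the key format, and the example uids are hypothetical and not part of Flink:

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: a savepoint modelled as a map keyed by operator uid + state name. */
final class StateMappingSketch {

    /** Builds the lookup key from the two components named in the terminology table. */
    static String key(String operatorUid, String stateName) {
        return operatorUid + "/" + stateName;
    }

    /** Returns the snapshotted value for the given operator and state, or null if absent. */
    static Object restore(Map<String, Object> savepoint, String operatorUid, String stateName) {
        return savepoint.get(key(operatorUid, stateName));
    }

    public static void main(String[] args) {
        Map<String, Object> savepoint = new HashMap<>();
        savepoint.put(key("agg-1", "count"), 42L);

        // The topology around the operator may differ after an upgrade;
        // the lookup still succeeds because uid and state name are unchanged.
        System.out.println(restore(savepoint, "agg-1", "count"));   // prints 42

        // A renamed state is not found; this is where an operator migration would be needed.
        System.out.println(restore(savepoint, "agg-1", "counter")); // prints null
    }
}
```

The second lookup illustrates why the operator migration described above declares both the old and the new state name: without the old name, the snapshotted data cannot be located.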

Scoping

The scope of this FLIP can be summarized as follows using the terminology above:

  • The same query can remain up and running after upgrades.
  • Make upgrades possible that enable savepoint restoration and execution of pipelines that were defined as part of a table program from a previous version.
  • A new Flink version should be backwards compatible with the previous version. If the community decides that the semantics of the previous version were incorrect and a change only affects a small group of users, we reserve the right to break backwards compatibility for the greater good. This policy matches what we did in the past regarding SQL semantics and API stability.
  • Schema and query evolution are out of scope.
  • Migration of any kind is future work. We will focus on state serializer and operator migrations and potentially plan migrations in the first versions after 1.15 if necessary.

...

// Setup environments
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// Create a custom DataStream
final DataStream<Row> inputStream = env.fromElements(
  Row.of(RowKind.UPDATE_AFTER, "Alice"),
  Row.of(RowKind.UPDATE_AFTER, "Bob"));

// Assign a unique uid() for the stream
inputStream.uid("my-stream");

// Read JSON plan file or compile + write it first
// We use a Supplier here but of course this can also be completely custom logic
CompiledPlan plan = CompiledPlan.fromFile(
  "/my/path/my_flink_job.json",
  () -> {
    return tableEnv
      .fromDataStream(inputStream)
      .select($("f0").count())
      // the caller will write out this plan into a file
      .compilePlan();
  });

DataStream<Row> outputStream = tableEnv.asDataStream(plan, Row.class);

outputStream.executeAndCollect();

...

A JSON plan example for the following program can be found here: json_plan_example.json.

TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
System.out.println(
        tableEnv.explainSql(
                "SELECT word, SUM(frequency) AS `count`\n"
                        + "FROM (\n"
                        + "  VALUES ('Hello', 1), ('Ciao', 1), ('Hello', 2)\n"
                        + ")\n"
                        + "AS WordTable(word, frequency)\n"
                        + "GROUP BY word"));

== Abstract Syntax Tree ==
LogicalAggregate(group=[{0}], count=[SUM($1)])
+- LogicalValues(tuples=[[{ _UTF-16LE'Hello', 1 }, { _UTF-16LE'Ciao', 1 }, { _UTF-16LE'Hello', 2 }]])

== Optimized Physical Plan ==
GroupAggregate(groupBy=[word], select=[word, SUM(frequency) AS count])
+- Exchange(distribution=[hash[word]])
   +- Values(type=[RecordType(VARCHAR(5) word, INTEGER frequency)], tuples=[[{ _UTF-16LE'Hello', 1 }, { _UTF-16LE'Ciao', 1 }, { _UTF-16LE'Hello', 2 }]])

== Optimized Execution Plan ==
GroupAggregate(groupBy=[word], select=[word, SUM(frequency) AS count])
+- Exchange(distribution=[hash[word]])
   +- Values(tuples=[[{ _UTF-16LE'Hello', 1 }, { _UTF-16LE'Ciao', 1 }, { _UTF-16LE'Hello', 2 }]])

...

Batch nodes are not considered in this design.

Remark to 6: A plan compilation will fail hard with a helpful exception. For example, this is the case for invalid ExecNodes, inline objects, or DataStream API sources without a proper uid(). Once the plan compilation has been successful, we support the plan in the next Flink version.

Remark to 9: We also need to make sure that all sources/sinks define a uid(), especially when using DataStreamScanProvider and DataStreamSinkProvider.

General JSON Plan Assumptions

...

  1. Modules are not part of the JSON plan. They must have been loaded as before.
  2. Most of the table/session configuration is not part of the JSON plan. It must be the same as before. This is important for table factories and UDFs where we don't know what they have consumed.
  3. Configuration consumed by ExecNode should be persisted if it influences the topology or has runtime implications (i.e. not just forwarding the value but using it for calculation). (e.g. `table.exec.sink.not-null-enforcer`). In other words: it is not necessary to persist code gen options but we should persist `table.exec.` options.
  4. Temporary DDL statements (to init the table environment and then make sure the temporary UDFs and connectors can be found) are not part of the JSON plan.
  5. We do not version types. We assume that the type system is stable or will only be extended by more types but not modified.
  6. The origin of every catalog object is stored with the object identifier in the JSON plan. However, we cannot assume that all catalog metadata stays constant as a catalog might be shared across teams. We can only assume that the catalog instance and type (e.g. Hive catalog) stays constant for a session to provide a table factory.
  7. The JSON plan contains no implementation details (i.e. no classes) and is fully declarative.
  8. The JSON plan is versioned by Flink version. 
  9. The JSON plan is internal to Flink. We don't support external changes to the plan officially. However, the JSON plan format must not change between patch releases.
  10. Flink will not perform any kind of validation or consistency checks during restore except for the supported Flink plan version.
  11. We do not version connectors and formats. If a connector adds a new ability, we can simply not apply this ability for older plans. Removing an ability is a problem and would require a correcting operator afterwards.

...

Remark to 3: We add a new JSON `Map<String, String>` property to ExecNode to keep track of the node configuration.

Remark to 7: We use ExecNode name and version instead of class names.

Remark to 8 & 9: We will not change the JSON plan format across patch releases. Since every JSON plan is versioned by the compiling Flink version, external tooling can be built around it, but without any guarantees from the Flink community across Flink minor/major versions yet. This is future work and can be offered once the plan design has settled.

Remark to 10: In other words: We verify whether the Flink version contained in the plan is supported. This does not necessarily have to be the last Flink version. After that, the only consistency check will be the restore deserialization itself.

Remark to 11: A plan migration could add such an operator. Alternatively, the connector properties could enable/disable this ability for backward compatibility. We can use catalog table enrichment or plan migration to add a new connector property.
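Assumption 10 and its remark describe a single check during restore: is the Flink version recorded in the plan among the supported ones? A minimal sketch of such a check, assuming versions are compared as plain strings; the class `PlanVersionCheckSketch` and the version values are hypothetical and do not reflect Flink's actual implementation:

```java
import java.util.Arrays;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical sketch of the "supported plan version" check performed during restore. */
final class PlanVersionCheckSketch {

    /** Plan versions that this (hypothetical) Flink version is able to restore. */
    private final Set<String> supportedPlanVersions;

    PlanVersionCheckSketch(String... supported) {
        this.supportedPlanVersions = new TreeSet<>(Arrays.asList(supported));
    }

    /** Fails hard if the version stored in the plan is not supported; performs no other validation. */
    void validate(String planVersion) {
        if (!supportedPlanVersions.contains(planVersion)) {
            throw new IllegalStateException(
                "Unsupported plan version " + planVersion + ", supported: " + supportedPlanVersions);
        }
    }

    public static void main(String[] args) {
        PlanVersionCheckSketch check = new PlanVersionCheckSketch("1.15", "1.16");
        check.validate("1.15"); // accepted: not the latest version, but still supported
        try {
            check.validate("1.13");
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Note that the accepted version does not have to be the most recent one, which mirrors the remark that the supported version need not be the last Flink version.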

Catalog Objects

Temporary objects will not be persisted in the plan. Many of them have no serializable representation. E.g. a temporary table that is backed by a DataStream API pipeline or a table source instance.

For users, it is easier to understand the rule of thumb to just say "temporary objects are not part of the plan" instead of saying "some can, some cannot".

For inline objects (used with registering) within Table API we do best-effort as shown below.

Views

Views are always inlined. View information will be lost and not part of the plan in any way.

Tables

Temporary tables will not be persisted in the plan. They will be represented by an object identifier.

Inline tables (tables with a generated temporary object identifier) are handled differently:

  • an inline table descriptor will be written into the JSON plan
  • DataStream API pipelines cannot be persisted in a JSON plan; they need to be registered first


For persisted catalog tables, depending on the use case, some users would like to

  • persist catalog objects entirely in the JSON plan such that catalog metadata is not required anymore,
  • be able to forward properties (e.g. connection credentials) from the catalog to the compiled plan,
  • not have connector specific options in the plan (e.g. have credentials only stored in the catalog, not in JSON files).

Options

Note: We separate compile and restore options for flexibility. The default values should aim to make most beginner users happy.

Option | Type | Description

table.plan.compile.catalog-objects

Enum

ALL (Default): All catalog metadata about a table is persisted: Schema + Identifier + Options
The catalog's "metadata" no longer needs to be available. It makes the initial experience better with in-memory catalogs (e.g. building demos using SQL Client). However, the catalog "instance" itself must still be available in case a catalog provides a custom table factory. For commercial versions, it might make sense to set this value to `IDENTIFIER`, because in most cases, enterprises have a persistent catalog and lock changes there.

SCHEMA: Only basic catalog metadata for validation is persisted: Schema + Identifier. This allows validation in case the external catalog table has changed. Options will later require a read from the catalog again.

IDENTIFIER: Only the object identifier is persisted. Reduces the size of the plan. A job may contain dozens of connectors and a table may contain hundreds of fields. The user needs to make sure that catalog tables don't change. Changes of the schema could lead to hard to debug mismatch exceptions during restore. Also options that influence the support of the connector abilities could lead to exceptions.

table.plan.restore.catalog-objects

Enum

ALL (Default): All catalog info is read from the JSON plan. If information is missing, a lookup will take place. Also enrichment of forwarded options still takes place (see below). If the metadata is not available anymore, the plan can still be executed.

ALL_ENFORCED: All catalog info must be present in the JSON plan. No lookup.

IDENTIFIER: Everything is read from the catalog again. Changes of the schema or options could lead to hard to debug mismatch exceptions.

table.plan.restore.enrich-table-options

Boolean

By default (true), we always perform a catalog lookup and enrich the plan options with catalog options. In this case, catalog options have precedence. If false, we only use the JSON plan options.


For the latter option, the factory is the only entity able to judge whether a change causes a modification of semantics or topology. For example, the factory knows how a `topic` option for Kafka is forwarded and can allow a name change that doesn't have any side effects for the source abilities or schema.
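The precedence rule of `table.plan.restore.enrich-table-options` can be sketched as a map merge. This is only an illustration under the assumption that both option sets are flat string maps; the class `OptionEnrichmentSketch`, the method name, and the sample option values are hypothetical:

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch of option enrichment during restore. */
final class OptionEnrichmentSketch {

    /**
     * Returns the options handed to the table factory.
     * With enrichment, catalog options are merged over the plan options and win on conflict;
     * without enrichment, only the options stored in the JSON plan are used.
     */
    static Map<String, String> resolveOptions(
            Map<String, String> planOptions, Map<String, String> catalogOptions, boolean enrich) {
        Map<String, String> result = new HashMap<>(planOptions);
        if (enrich) {
            result.putAll(catalogOptions); // catalog options have precedence
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, String> plan = new HashMap<>();
        plan.put("connector", "kafka");
        plan.put("topic", "orders");

        Map<String, String> catalog = new HashMap<>();
        catalog.put("topic", "orders_v2");

        System.out.println(resolveOptions(plan, catalog, true).get("topic"));  // prints orders_v2
        System.out.println(resolveOptions(plan, catalog, false).get("topic")); // prints orders
    }
}
```

Whether a changed value such as the renamed topic is acceptable is not decided by this merge; as stated above, only the factory can judge that.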

...

System functions are temporary and will only be persisted by name (and version) in the plan. Therefore, we can also support function instances there (not only classes). Also, temporary catalog functions will only be persisted by object identifier in the plan and can support instances. No Java serialization required.

...

It should be possible for the user to change the internal UDF implementation (e.g. upgrade a library etc) as long as input/output parameters stay the same. 

Options

Persisted catalog functions are stored by object identifier and class. Similar options as to catalog tables apply:

Option | Type | Description

table.plan.compile.catalog-objects

Enum

ALL (Default): All catalog metadata about a function is persisted: Identifier + Class.
The catalog's "metadata" no longer needs to be available.

SCHEMA: Same as IDENTIFIER because we store the schema anyway.

IDENTIFIER: Only the object identifier is persisted. The user needs to make sure that catalog function signatures don't change. The class can change.

table.plan.restore.catalog-objects

Enum

ALL (Default): All catalog info is read from the JSON plan. If information is missing, a lookup will take place.

ALL_ENFORCED: All catalog info must be present in the JSON plan. No lookup.

IDENTIFIER: Everything is read from the catalog again.


We only version built-in system functions, not user-defined/catalog ones.

...

Other kinds of statements that might be useful when dealing with plans:


// during debugging it should be possible to already define COMPILE AND EXECUTE PLAN … FOR
// with a proper path but always recompile
SET 'table.plan.restore.force-recompile'='true';

// -------------------------
// not in the first version
// -------------------------

// drop a plan
DROP PLAN '/mydir/plan.json';

// Perform plan migration in the future.
// This statement has been added for completeness. Users will need to execute it 
// when we instruct them to do so in the release notes. Plan migration will be one
// way of dropping legacy code in Flink before we have savepoint migration.
COMPILE PLAN '/mydir/plan_new.json' FROM '/mydir/plan_old.json';

EXPLAIN

The current EXPLAIN command is inconsistent. Currently, we support both EXPLAIN and EXPLAIN PLAN FOR:

EXPLAIN [([ExplainDetail[, ExplainDetail]*]) | PLAN FOR] <query_statement_or_insert_statement>

However, the PLAN FOR provides no additional benefit and disables specifying the explain details.

Note: Since "PLAN" should be reserved for the JSON plan, we suggest deprecating the PLAN FOR syntax.

For consistent EXPLAIN statements we suggest the following syntax:

// regular query
EXPLAIN SELECT * FROM MyTable;

// regular statement set
EXPLAIN STATEMENT SET
BEGIN
  INSERT INTO pageview_pv_sink
  SELECT page_id, count(1) FROM clicks GROUP BY page_id;

  INSERT INTO pageview_uv_sink
  SELECT page_id, count(distinct user_id) FROM clicks GROUP BY page_id;
END;

// parameters for explain
EXPLAIN ESTIMATED_COST, CHANGELOG_MODE STATEMENT SET
BEGIN
  INSERT INTO pageview_pv_sink
  SELECT page_id, count(1) FROM clicks GROUP BY page_id;

  INSERT INTO pageview_uv_sink
  SELECT page_id, count(distinct user_id) FROM clicks GROUP BY page_id;
END;

// allows to get insights (which insights need to be discussed)
// and basic validation of the plan (e.g. resolution of catalog objects)
EXPLAIN PLAN '/mydir/plan.json';

Table API

In contrast to SQL, the Table API resolves tables, functions, and expressions eagerly for every operation. This means we cannot declare the pipeline fluently without interpreting it already. It makes idempotent statements such as `tableEnv.from("T1").select(...).compileAndExecute()` impossible as both `from()` and `select()` are interpreted eagerly. In programming languages, it is easy to first compile a plan and then pass the instance to an execute method. We offer a supplier method for this.

We have various locations where the optimizer is triggered:

  • single SQL statements
  • statement sets
  • statement sets that are added to DataStream API
  • Table API with sink
  • Table API with implicit collect() sink
  • Table API with DataStream sink (toDataStream and toChangelog).

In the first version, we might not implement all these methods but focus on the most important ones.

CompiledPlan

We provide the concept of a `CompiledPlan`. It is an immutable instance that represents a plan string.

// Read string from local file system
CompiledPlan.fromFile(String path)
CompiledPlan.fromFile(Path path)

// We provide a supplier pattern for convenience
CompiledPlan.fromFile(Path path, Supplier<CompiledPlan> ifNotExist)

// Or create with a string
CompiledPlan.fromJsonString(String json)

// Access the raw content
// (the CompiledPlan.toString might be used for a summary representation)
CompiledPlan.asJsonString(): String

// Use convenience method to write it to a file
CompiledPlan.writeToFile(String path)
CompiledPlan.writeToFile(Path path)
CompiledPlan.writeToFile(Path path, boolean ignoreIfExists)
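The supplier variant of `fromFile` can be read as "load the plan if the file exists, otherwise compile it and write it out". A minimal sketch of that behaviour, assuming the plan is handled as a plain JSON string; the class `PlanFileSketch` and the method `loadOrCompile` are hypothetical stand-ins and not the proposed `CompiledPlan` API:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical sketch of the "read plan file or compile + write it first" pattern. */
final class PlanFileSketch {

    /** Reads the JSON plan at the given path, or compiles it via the supplier and persists it. */
    static String loadOrCompile(Path path, Supplier<String> ifNotExist) {
        try {
            if (Files.exists(path)) {
                return new String(Files.readAllBytes(path), StandardCharsets.UTF_8);
            }
            String json = ifNotExist.get();
            // The caller (this method) writes the freshly compiled plan into the file.
            Files.write(path, json.getBytes(StandardCharsets.UTF_8));
            return json;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        Path planFile = Files.createTempDirectory("plan-sketch").resolve("plan.json");
        AtomicInteger compilations = new AtomicInteger();
        Supplier<String> compiler = () -> {
            compilations.incrementAndGet();
            return "{\"flinkVersion\":\"1.15\"}";
        };

        loadOrCompile(planFile, compiler); // first run: compiles and writes the file
        loadOrCompile(planFile, compiler); // second run: restores from the file
        System.out.println(compilations.get()); // prints 1
    }
}
```

This makes the same program text usable for both the initial submission and every later restore, which is the convenience the supplier pattern is meant to provide.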

Single SQL Statements

// --------------------
// COMPILE
// --------------------

// Interface
TableEnvironment.compilePlanSql(String): CompiledPlan

// Example
tableEnv.compilePlanSql("SELECT * FROM T1");
tableEnv.compilePlanSql("INSERT INTO T2 SELECT * FROM T1");

// --------------------
// RESTORE
// --------------------

// Interface
TableEnvironment.executePlan(CompiledPlan): TableResult

// Example
tableEnv.executePlan(CompiledPlan.fromFile("/my/path")).print();

SQL Query and Table Query

// --------------------
// COMPILE
// --------------------

// Interface
Table.compilePlan(): CompiledPlan

// Example
tableEnv.sqlQuery("SELECT * FROM T1").compilePlan();
tableEnv.from("T1").select($("name")).compilePlan();

// --------------------
// RESTORE
// --------------------

// Interface
TableEnvironment.fromPlan(CompiledPlan): Table

// Example
tableEnv
  .fromPlan(CompiledPlan.fromFile("/my/path"))
  .select($("name"))
  .execute()
  .print();

Statement Sets

// --------------------
// COMPILE
// --------------------

// Interface
StatementSet.compilePlan(): CompiledPlan

// Example
tableEnv
  .createStatementSet()
  .addInsertSql("INSERT INTO T2 SELECT * FROM T1")
  .addInsertSql("INSERT INTO T3 SELECT * FROM T1")
  .compilePlan();

// --------------------
// RESTORE
// --------------------

// Same API as single SQL queries.

// Interface
TableEnvironment.executePlan(CompiledPlan): TableResult

// Example
tableEnv.executePlan(CompiledPlan.fromFile("/my/path")).print();

...

Table with Sink

// Refactor the Table.executeInsert() into declarative insertInto() methods
// because otherwise we would need to add a lot of overloading

// This makes the API way easier and solves other shortcomings of the current design.
Table.insertInto(String targetPath): TablePipeline
Table.insertInto(String targetPath, boolean overwrite): TablePipeline
Table.insertInto(TableDescriptor targetDescriptor): TablePipeline
Table.insertInto(TableDescriptor targetDescriptor, boolean overwrite): TablePipeline

TablePipeline.compilePlan(): CompiledPlan
TablePipeline.execute(): TableResult

// Nice improvement on the way,
// this makes the statement set API more concise.
// No need for several overloaded methods any more.
// This method fits perfectly next to `addInsertSql`.
StatementSet.addInsert(TablePipeline)

// --------------------
// COMPILE
// --------------------

// Interface
TablePipeline.compilePlan(): CompiledPlan

// Example
tableEnv
  .from("T1")
  .insertInto("T2")
  .compilePlan();

// --------------------
// RESTORE
// --------------------

// Same as single SQL queries.

// Interface
TableEnvironment.executePlan(CompiledPlan): TableResult

// Example
tableEnv
  .executePlan(CompiledPlan.fromFile("/my/path"))
  .print();

Statement Sets added to DataStream API

// --------------------
// RESTORE
// --------------------

// Interface
StatementSet.addPlan(CompiledPlan)

// Example
tableEnv.createStatementSet()
  .addPlan(CompiledPlan.fromFile("/my/path"))
  .attachAsDataStream();

Table with DataStream API Sink

// Refactor also the insertInto DataStream methods,
// because otherwise we would need to add a lot of overloading.

StreamTableEnvironment.insertIntoDataStream(Table): TableStreamPipeline<Row>
StreamTableEnvironment.insertIntoDataStream(Table, AbstractDataType): TableStreamPipeline<T>
StreamTableEnvironment.insertIntoChangelogStream(Table): TableStreamPipeline<Row>
StreamTableEnvironment.insertIntoChangelogStream(Table, Schema): TableStreamPipeline<Row>
StreamTableEnvironment.insertIntoChangelogStream(Table, Schema, ChangelogMode): TableStreamPipeline<Row>

TableStreamPipeline.compilePlan(): CompiledPlan
// the naming `asDataStream` is consistent with StreamStatementSet.attachAsDataStream
TableStreamPipeline.asDataStream(): DataStream<T>

// We can also leverage this new API to get a DataStream side output
// within a StreamStatementSet.
// This is future work but has been requested multiple times by users.

// --------------------
// COMPILE
// --------------------

// Interface
TableStreamPipeline.compilePlan(): CompiledPlan

// Example
tableEnv
  .insertIntoDataStream(tableEnv.from("T1"))
  .compilePlan();

// --------------------
// RESTORE
// --------------------

// Interface
StreamTableEnvironment.asDataStream(CompiledPlan, Class): DataStream<T>

// Example
tableEnv.asDataStream(CompiledPlan.fromFile("/my/path"), Row.class).map(...).executeAndCollect();


Note: We need to store the expected target data type in the JSON plan. This means a JSON representation is not only required for logical type but also conversion classes of all fields. We use a `Class` argument in `asDataStream` to additionally verify the output of the CompiledPlan.

DataStream API Sources

We use a DataStream's uid() to map nodes of CompiledPlan back to a source DataStream.

An exception is thrown if a plan contains input data streams without uid().

DataStream references are always temporary and can be treated similar to temporary views.

CatalogManager can store a mapping from uid -> Transformation in case of a restore. This mapping is cleared with `unregisterDataStreams`.

// --------------------
// COMPILE
// --------------------

// Example

DataStreamSource<Integer> stream1 = env.fromElements(1, 2, 3);

stream1.uid("stream-1"); // compilation would fail if uid() is not set

tableEnv
  .fromDataStream(stream1)
  .compilePlan();

// --------------------
// RESTORE
// --------------------

// Interface

// We use non-SQL terminology to differentiate from catalog operations.
// Similar to TableEnvironment.registerCatalog().
StreamTableEnvironment.registerDataStream(DataStream<?> dataStreamWithUid)
StreamTableEnvironment.unregisterDataStreams()

// Example

DataStreamSource<Integer> stream1 = env.fromElements(1, 2, 3);

stream1.uid("stream-1");

tableEnv.registerDataStream(stream1);

tableEnv.fromPlan(CompiledPlan.fromFile("/my/path")).execute().print();


Note: We need to store the expected source data type in the JSON plan. This means a JSON representation is not only required for logical type but also conversion classes of all fields.
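
The uid-based lookup described in this section can be modeled as a small registry. This is a minimal sketch in plain Java, assuming streams are identified by a string uid only. `DataStreamRegistrySketch` and its `resolve` method are hypothetical and do not mirror Flink's actual CatalogManager.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical model of the uid -> stream mapping; not Flink's CatalogManager. */
final class DataStreamRegistrySketch {
    private final Map<String, Object> streamsByUid = new HashMap<>();

    /** Registers a stream under its uid; a missing uid is rejected. */
    void registerDataStream(String uid, Object stream) {
        if (uid == null || uid.isEmpty()) {
            throw new IllegalArgumentException("A DataStream needs a uid() to be referenced by a plan.");
        }
        streamsByUid.put(uid, stream);
    }

    /** Resolves a source node of a restored plan back to a registered stream. */
    Object resolve(String uidFromPlan) {
        Object stream = streamsByUid.get(uidFromPlan);
        if (stream == null) {
            throw new IllegalStateException("No DataStream registered for uid '" + uidFromPlan + "'.");
        }
        return stream;
    }

    /** Clears all temporary registrations. */
    void unregisterDataStreams() {
        streamsByUid.clear();
    }
}
```

Because registrations are temporary, a restore has to call the register method again in every session before the plan is executed.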

Function Versioning

One open question was whether we should serialize Expression instead of RexNode and AggregateCall. At this location of the plan, we need to serialize RexNode, but that does not mean that we need to expose Calcite names, classes, enums, etc. We should define our JSON format without concepts such as Expression or RexNode.

However, the past has shown that functions change frequently both for type inference and runtime behavior.

Also, the current implementation of `RexNodeJsonDeserializer` looks very fragile and does not support modules.

We should store a version with every serialized RexNode. We extend ModuleManager to look up FunctionDefinitions by name and version. If no version is defined, we take the most recent version.

ModuleManager.getFunctionDefinition(String name, @Nullable Integer version)

`FunctionCatalogOperatorTable` should then be able to convert to versioned functions.

We will introduce a declarative concept to `BuiltInFunctionDefinitions` and `FlinkSqlOperatorTable` that maintain a function name + version to instance mapping.

For functions that are not using `BridgingSqlFunction`, we might introduce a wrapper SqlOperator that returns the original SqlKind but also encodes a version that can be accessed by the code generator.

Note that the internal digest of an expression therefore might change between Flink versions after restoring from a JSON plan. However, the Web UI should be stable since we encode the operator description in the JSON plan.

Example

Let's assume we have the following (possibly overloaded) function definition:

TO_TIMESTAMP_LTZ(BIGINT | [STRING ',' STRING])

The JSON plan should look similar to:

{
  "kind" : "CALL",
  "function" : {
    "name" : "TO_TIMESTAMP_LTZ",
    "version" : 1
  }
}

If the latest version is specified, the EXPLAIN plan will look similar to:

== Optimized Physical Plan ==
Calc(select=[TO_TIMESTAMP_LTZ(f0)])


Let's assume we change the runtime behavior of TO_TIMESTAMP_LTZ. The signature might remain unchanged in this case. We create a new function definition with the same name for the new behavior.

We keep the old function definition around under an internal name. The physical plan after restore might look similar to:

== Optimized Physical Plan ==
Calc(select=[$TO_TIMESTAMP_LTZ$1(f0)])

$TO_TIMESTAMP_LTZ$1 has been returned by the ModuleManager.
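
The lookup by name and optional version can be sketched as follows. This is a simplified model in plain Java, assuming that a function definition is represented by its internal name only. `VersionedFunctionLookupSketch` and its `register` method are hypothetical; only the `getFunctionDefinition(name, version)` shape is taken from the proposal.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical model of a name + version to definition mapping. */
final class VersionedFunctionLookupSketch {
    // function name -> (version -> internal definition name)
    private final Map<String, TreeMap<Integer, String>> definitions = new HashMap<>();

    void register(String name, int version, String definition) {
        definitions.computeIfAbsent(name, k -> new TreeMap<>()).put(version, definition);
    }

    /** Looks up by name and version; a null version selects the most recent one. */
    String getFunctionDefinition(String name, Integer version) {
        TreeMap<Integer, String> versions = definitions.get(name);
        if (versions == null) {
            throw new IllegalArgumentException("Unknown function: " + name);
        }
        if (version == null) {
            return versions.lastEntry().getValue();
        }
        String definition = versions.get(version);
        if (definition == null) {
            throw new IllegalArgumentException("Unknown version " + version + " of function " + name);
        }
        return definition;
    }
}
```

Registering `$TO_TIMESTAMP_LTZ$1` as version 1 and `TO_TIMESTAMP_LTZ` as version 2 would let a restored plan that pins version 1 keep the old behavior, while plans without a version resolve to the newest definition.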

Topology Versioning

ExecNode

We take the 1:n relationship between an ExecNode and its operators into account when naming Transformations.

We define the following naming convention for the uid() of operators for state restore:

<ExecNodeID>_<ExecNodeKind>-<ExecNodeVersion>_<OperatorKind>
13_stream-exec-sink-1_upsert-materializer

ExecNodeKind and ExecNodeVersion uniquely identify the topology structure for a group of operators. An OperatorKind is responsible for ensuring uniqueness among operators. The ExecNodeID is responsible for uniqueness among multiple usages of the same ExecNode.

We use annotations for code maintenance. See next section.

We will not have a version for operators but only ExecNodes.

In future Flink versions, we might support that a new operator can subscribe to multiple uid()'s of previous operators during restore. This would make operator migration easier.
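
The naming convention above amounts to a simple string composition. A minimal sketch, where `OperatorUidSketch` and `operatorUid` are hypothetical helper names chosen for illustration:

```java
/** Hypothetical helper that composes an operator uid from its four parts. */
final class OperatorUidSketch {

    /** Builds "<ExecNodeID>_<ExecNodeKind>-<ExecNodeVersion>_<OperatorKind>". */
    static String operatorUid(int execNodeId, String execNodeKind, int execNodeVersion, String operatorKind) {
        return execNodeId + "_" + execNodeKind + "-" + execNodeVersion + "_" + operatorKind;
    }

    public static void main(String[] args) {
        // Reproduces the example uid from the text.
        System.out.println(operatorUid(13, "stream-exec-sink", 1, "upsert-materializer"));
    }
}
```

Since the ExecNode version is part of the uid, bumping the version gives all operators of that node new uids, so state of the old version is never silently mapped onto a changed topology.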

Connectors

For sources and sinks that might be composed of multiple operators, we need to propagate the uid suffix for providers. Currently, this is only necessary for DataStreamScanProvider and DataStreamSinkProvider.

// introduce new context
DataStreamSinkProvider.consumeDataStream(DataStream<RowData> dataStream, ProviderContext): DataStreamSink<?>

// introduce new context
DataStreamScanProvider.produceDataStream(StreamExecutionEnvironment execEnv, ProviderContext): DataStream<RowData>

// Creates one or more ExecNode-aware UIDs
ProviderContext.generateUid(String operatorId): String

// for example:
// ProviderContext.generateUid("preprocessor-1")
// leads to:
// "13_stream-exec-sink-1_provider_preprocessor-1"

It is the responsibility of the connector implementer to ensure that the operatorId is unique within a provider.
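
A provider context along these lines could be sketched as below. This is an illustrative model in plain Java, not the proposed interface: `ProviderContextSketch` is a hypothetical class, the `_provider_` infix is inferred from the example above, and the duplicate check is an added assumption that merely makes the uniqueness requirement visible.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical model of an ExecNode-aware uid generator for providers. */
final class ProviderContextSketch {
    private final String execNodePrefix;
    private final Set<String> usedOperatorIds = new HashSet<>();

    /** @param execNodePrefix e.g. "13_stream-exec-sink-1" */
    ProviderContextSketch(String execNodePrefix) {
        this.execNodePrefix = execNodePrefix;
    }

    /** Generates an ExecNode-aware uid; rejects operator ids that were already used. */
    String generateUid(String operatorId) {
        if (!usedOperatorIds.add(operatorId)) {
            throw new IllegalArgumentException("operatorId '" + operatorId + "' is not unique within the provider.");
        }
        return execNodePrefix + "_provider_" + operatorId;
    }
}
```

A connector would call `generateUid` once per operator it adds and pass the result to `uid()` on that operator.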

Testing Infrastructure

ExecNode Tests

The testing infrastructure is crucial for version upgrades. We define three kinds of tests:

  • restore tests
  • change detection tests
  • completeness tests

We introduce annotations for ExecNodes that can be read by the JSON serializer and tests:

@ExecNodeMetadata(
  // main information
  name = "stream-exec-sink",
  version = 1,
  // maintenance information for us/the community and our tests
  consumedOptions = {"table.exec.sink.not-null-enforcer", "table.exec.sink.upsert-materialize"},
  producedOperators = {"not-null-enforcer", "upsert-materializer"},
  minPlanVersion = FlinkVersion.1_15,
  minStateVersion = FlinkVersion.1_15)

Note: The two persisted entities, plan and state, can evolve independently of each other, which is why each references its own Flink version. A max/end version is not required because we would drop the annotation in that case.

An ExecNode might declare multiple annotations with different versions. See example below.

PropertyDescription
name

For JSON serialization/deserialization and uid() generation. Clearly identifies the ExecNode together with version.

version

For JSON serialization/deserialization and uid() generation. Clearly identifies the ExecNode together with name.

consumedOptions

Hard-coded list of keys in the Flink version when the ExecNode was added. Does not reference instances in the ExecutionConfigOptions class in case those get refactored.

Completeness tests can verify that every option is set once in restore and change detection tests.

Completeness tests can verify that the ExecutionConfigOptions class still contains an option (via key or fallback key) for the given key.

Restore tests can verify whether the restored ExecNode config map contains only options of the given keys.

producedOperators

Set of operator names that can be part of the resulting Transformations.

Restore and completeness tests can verify there exists at least one test that adds each operator and that the created Transformations contain only operators with `uid`s containing the given operator names.

The concrete combinations or existence of these operators in the final pipeline depends on various parameters (both configuration and ExecNode-specific arguments such as interval size etc.).

minPlanVersion

Used for plan validation and potentially plan migration.

Updates when the JSON for the ExecNode changes: e.g. after adding an attribute to the JSON spec of the ExecNode.

The annotation does not need to be updated for every Flink version. As the name suggests it is about the "minimum" version for a restore. If the minimum version is higher than the current Flink version, plan migration is necessary.

Changing this version will always result in a new ExecNode version.

Plan migration tests can use this information.

Completeness tests can verify that restore tests exist for all JSON plan variations.

minStateVersion

Used for operator and potentially savepoint migration.

Updates whenever the state layout of an ExecNode changes. In some cases, the operator can perform state migration. If the minimum version is higher than the current Flink version, savepoint migration is necessary.

Changing this version will always result in a new ExecNode version.

Restore tests can verify that operator migration works for all Flink state versions.

Completeness tests can verify that restore tests exist for all state variations.
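
The `consumedOptions` check mentioned in the table (a restored config map may contain only the declared keys) can be sketched as a set difference. `ConsumedOptionsCheckSketch` and `unexpectedOptions` are hypothetical names, and the config is modeled as a plain string map for illustration.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical check that a restored config uses only declared option keys. */
final class ConsumedOptionsCheckSketch {

    /** Returns all keys of the restored config that are not declared as consumed. */
    static Set<String> unexpectedOptions(Map<String, String> restoredConfig, Set<String> consumedOptions) {
        Set<String> unexpected = new TreeSet<>(restoredConfig.keySet());
        unexpected.removeAll(consumedOptions);
        return unexpected;
    }
}
```

A restore test would then fail whenever the returned set is non-empty.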

Annotation Example

Operator State Change

1. Introduction of a new ExecNode A in Flink 1.15

@ExecNodeMetadata(name=A, version=1, minPlanVersion=1.15, minStateVersion=1.15)
class MyExecNode extends ExecNode {...}

2. The state layout of A changed slightly in Flink 1.16. The topology is the same but an operator will use a new state layout in the future:

class X extends ProcessFunction {
  ValueState<A> oldStateLayout;
  ValueState<B> newStateLayout;

  open() {
    if (oldStateLayout.get() != null) {
      performOperatorMigration();
    }
    useNewStateLayout();
  }
}

Every state change will increase the ExecNode version. Since the operator supports both state layouts, adding an additional annotation is enough.

@ExecNodeMetadata(name=A, version=1, minPlanVersion=1.15, minStateVersion=1.15)
@ExecNodeMetadata(name=A, version=2, minPlanVersion=1.15, minStateVersion=1.16)
class MyExecNode extends ExecNode {...}

New plans will use the new ExecNode version 2. Operators will perform operator migration under the hood in the meantime.


3. We would like to drop support for the old state layout and assume that most operator states have been migrated by now. In this example, we assume that the legacy layout is dropped in Flink 1.17.

@ExecNodeMetadata(name=A, version=2, minPlanVersion=1.15, minStateVersion=1.16)
class MyExecNode extends ExecNode {...}

ExecNode version 1 is not supported anymore, even though the state is actually compatible. The plan restore will fail with a helpful exception that forces users to perform plan migration.

COMPILE PLAN '/mydir/plan_new.json' FROM '/mydir/plan_old.json';

The plan migration will safely replace the old version 1 with 2. The JSON plan flinkVersion changes to 1.17.
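
The three steps above can be modeled with a repeatable annotation and a reflective lookup. This is a sketch under simplifying assumptions: `NodeMetadataSketch` is a hypothetical, reduced stand-in for `@ExecNodeMetadata` that uses plain strings for Flink versions, and `requireSupported` is an invented helper showing how a restore could reject a dropped ExecNode version.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Repeatable;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/** Hypothetical, reduced stand-in for @ExecNodeMetadata. */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Repeatable(NodeMetadataSketches.class)
@interface NodeMetadataSketch {
    String name();
    int version();
    String minPlanVersion();
    String minStateVersion();
}

/** Container annotation required by @Repeatable. */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface NodeMetadataSketches {
    NodeMetadataSketch[] value();
}

// Step 2: the node supports both the old and the new state layout.
@NodeMetadataSketch(name = "A", version = 1, minPlanVersion = "1.15", minStateVersion = "1.15")
@NodeMetadataSketch(name = "A", version = 2, minPlanVersion = "1.15", minStateVersion = "1.16")
class NodeInStep2 {}

// Step 3: support for version 1 has been dropped.
@NodeMetadataSketch(name = "A", version = 2, minPlanVersion = "1.15", minStateVersion = "1.16")
class NodeInStep3 {}

final class NodeVersionCheckSketch {

    /** Returns the metadata for the requested version or fails with a migration hint. */
    static NodeMetadataSketch requireSupported(Class<?> nodeClass, String name, int version) {
        for (NodeMetadataSketch m : nodeClass.getAnnotationsByType(NodeMetadataSketch.class)) {
            if (m.name().equals(name) && m.version() == version) {
                return m;
            }
        }
        throw new IllegalStateException(
            "ExecNode '" + name + "' version " + version
                + " is not supported anymore. Please migrate the plan.");
    }
}
```

Restoring a plan that references version 1 succeeds against `NodeInStep2` but fails against `NodeInStep3`, which corresponds to the point at which users have to run the plan migration statement.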

Plan Change

1. Introduction of a new ExecNode A in Flink 1.15

@ExecNodeMetadata(name=A, version=1, minPlanVersion=1.15, minStateVersion=1.15)
class MyExecNode extends ExecNode {...}


2. The JSON node of ExecNode A gets an additional property in Flink 1.16:

Before:

{
  "some-prop" : 42
}

After:

{
  "some-prop" : 42,
  "some-flag" : false
}

Every plan change will increase the ExecNode version. Since the ExecNode supports both plan layouts, adding an additional annotation is enough.

@ExecNodeMetadata(name=A, version=1, minPlanVersion=1.15, minStateVersion=1.15)
@ExecNodeMetadata(name=A, version=2, minPlanVersion=1.16, minStateVersion=1.15)
class MyExecNode extends ExecNode {...}

New plans will use the new ExecNode version 2.


3. We would like to drop support for the old plan layout. In this example, we assume that the legacy layout is dropped in Flink 1.17.

@ExecNodeMetadata(name=A, version=2, minPlanVersion=1.16, minStateVersion=1.15)
class MyExecNode extends ExecNode {...}

ExecNode version 1 is not supported anymore, even though the state is actually compatible. The plan restore will fail with a helpful exception that forces users to perform plan migration.

COMPILE PLAN '/mydir/plan_new.json' FROM '/mydir/plan_old.json';

The plan migration will safely insert the new default value for `some-flag`. The JSON plan flinkVersion changes to 1.17.

Topology Change

Note that this is likely the most common case when performing a change.

1. Introduction of a new ExecNode A in Flink 1.15

@ExecNodeMetadata(name=A, version=1, minPlanVersion=1.15, minStateVersion=1.15)
class MyExecNode extends ExecNode {...}


2. The state layout and topology of A changed fundamentally in Flink 1.16.

We will maintain two ExecNode classes with different versions.

@ExecNodeMetadata(name=A, version=1, minPlanVersion=1.15, minStateVersion=1.15)
class MyExecNodeVersion1 extends ExecNode {...}

@ExecNodeMetadata(name=A, version=2, minPlanVersion=1.15, minStateVersion=1.16)
class MyExecNodeVersion2 extends ExecNode {...}

If the JSON format has not changed between versions 1 and 2, we could keep minPlanVersion=1.15 in version 2. This might be useful for users who want to replace a faulty ExecNode implementation and accept losing state. Otherwise, we can set minPlanVersion=1.16.

...


Restore Tests

We will come up with a new testing infrastructure that has

...

We will have completeness tests that verify that every ExecNode version has corresponding tests via classpath scanning and annotation checking.

Connector / Format Tests

We can also use the testing infrastructure for connectors and formats. In particular, connectors that use DataStreamScanProvider and DataStreamSinkProvider need tests that verify that uid() is set correctly. This can be done via completeness tests.

Function Tests

For functions, we don't need annotations. All built-in functions are declared in either BuiltInFunctionDefinitions or FlinkSqlOperatorTable, and we can use reflection for completeness tests, change detection tests, and restore tests.

...

Additional parameter for window table functions
https://github.com/apache/flink/pull/16215
→ Optional parameter for ExecNode. No new ExecNode version necessary.

Additional parameter for sink nodes
https://github.com/apache/flink/pull/17699

Compatibility, Deprecation, and Migration Plan

Compatibility is not affected.

We deprecate:

  • BEGIN STATEMENT SET;

In the long-term, we might deprecate the methods that can be expressed with:

  • Table.insertInto(...)
  • StreamTableEnvironment.insertIntoDataStream/ChangelogStream(...)
  • StatementSet.addInsert(TablePipeline)

Rejected Alternatives

Configure per CompiledPlan

Pro: My experience is that users often fail to figure out where they should put the config; they get lost among TableConfig, ExecutionConfig#GlobalJobParameters, and StreamExecutionEnvironment#config. So I think it is better that we can only set them in a single place.

Con: My experience is that users often fail to figure out where they should put the config; they get lost among TableConfig, ExecutionConfig#GlobalJobParameters, and StreamExecutionEnvironment#config. So I think it is better that we can only set them in a single place.

// configure

// define options such as `table.plan.compile.catalog-function`
// per plan/per compilation
CompiledPlan.withOptions(ReadableConfig): CompiledPlan

CompiledPlan.getOptions(): ReadableConfig

COMPILE PLAN OVERWRITE

Pro: We should declare that the plan JSON file should not be overwritten, and provide `COMPILE PLAN OVERWRITE '/mydir/plan.json' INSERT ...` to let users compile a plan multiple times. Just like "INSERT INTO" and "INSERT OVERWRITE", "INSERT INTO" works in most scenarios.

Con: We decided against OVERWRITE and for a global option `table.plan.restore.force-recompile`. OVERWRITE is not useful in production, only for development/debugging purposes, right? It would be dangerous to accidentally leave this keyword in a SQL script.

Having an option could disable this for production code and removing it disables it for all compilation steps. No need to remove OVERWRITE from every COMPILE statement. It could actually simplify the development process. If OVERWRITE is really needed in the future, we can extend the syntax then. So I'm fine with the current design.