...

Note: We need to store the expected target data type in the JSON plan. This means a JSON representation is required not only for the logical type but also for the conversion classes of all fields. We use a `Class` argument in `asDataStream` to additionally verify the output of the CompiledPlan.
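A minimal sketch of such a verification on restore, assuming the proposed asDataStream(Class) method:

// restore fails early if the conversion classes stored in the plan
// do not match the requested target class
DataStream<Row> result = tableEnv
  .fromPlan(CompiledPlan.fromJsonFile("/my/path"))
  .asDataStream(Row.class);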

DataStream API Sources

We use a DataStream's uid() to map nodes of a CompiledPlan back to a source DataStream.

An exception is thrown if a plan contains input data streams without uid().

DataStream references are always temporary and can be treated similarly to temporary views.

The CatalogManager can store a mapping from uid to Transformation for a restore. This mapping is cleared with `unregisterDataStreams`.
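A minimal sketch of this mapping inside CatalogManager (field and method names are assumptions for illustration, not a final API):

private final Map<String, Transformation<?>> dataStreamsByUid = new HashMap<>();

public void registerDataStream(String uid, Transformation<?> transformation) {
  dataStreamsByUid.put(uid, transformation);
}

public void unregisterDataStreams() {
  dataStreamsByUid.clear();
}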

// --------------------
// COMPILE
// --------------------

// Example

DataStreamSource<Integer> stream1 = env.fromElements(1, 2, 3);

stream1.uid("stream-1"); // compilation would fail if uid() is not set

tableEnv
  .fromDataStream(stream1)
  .compilePlan();

// --------------------
// RESTORE
// --------------------

// Interface

// We use non-SQL terminology to differentiate from catalog operations.
// Similar to TableEnvironment.registerCatalog().
StreamTableEnvironment.registerDataStream(DataStream<?> dataStreamWithUid)
StreamTableEnvironment.unregisterDataStreams()

// Example

DataStreamSource<Integer> stream1 = env.fromElements(1, 2, 3);

stream1.uid("stream-1");

tableEnv.registerDataStream(stream1);

tableEnv.fromPlan(CompiledPlan.fromJsonFile("/my/path")).execute().print();


Note: We need to store the expected source data type in the JSON plan. This means a JSON representation is required not only for the logical type but also for the conversion classes of all fields.

Function Versioning

A question was whether we should serialize Expression instead of RexNode and AggregateCall. At this location in the plan, we need to serialize RexNode, but that does not mean we need to expose Calcite names, classes, enums, etc. We should define our JSON format without concepts such as Expression or RexNode.

However, the past has shown that functions change frequently, both in type inference and in runtime behavior.

Also, the current implementation of `RexNodeJsonDeserializer` looks very fragile and does not support modules.

We should store a version with every serialized RexNode. To keep the plan simple, we derive the function's version from the Flink version of the JSON plan by default if no function version has been defined. We extend ModuleManager to look up FunctionDefinitions by name and version:

ModuleManager.getFunctionDefinition(String name, @Nullable Integer version)

`FunctionCatalogOperatorTable` should then be able to convert to versioned functions.

We will introduce a declarative concept to `BuiltInFunctionDefinitions` and `FlinkSqlOperatorTable` that maintains a mapping from function name + version to instance.
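The following self-contained sketch illustrates the intended lookup semantics; the class and method names are illustrative and not actual Flink API:

import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

final class VersionedFunctionTable {

  // name -> (version -> function definition instance)
  private final Map<String, Map<Integer, Object>> definitions = new HashMap<>();

  void register(String name, int version, Object definition) {
    definitions.computeIfAbsent(name, n -> new HashMap<>()).put(version, definition);
  }

  // A null version means "latest"; during a restore, the version is taken
  // from the JSON plan (or derived from the plan's Flink version by default).
  Optional<Object> lookup(String name, Integer version) {
    Map<Integer, Object> versions = definitions.get(name);
    if (versions == null || versions.isEmpty()) {
      return Optional.empty();
    }
    if (version != null) {
      return Optional.ofNullable(versions.get(version));
    }
    return versions.entrySet().stream()
        .max(Map.Entry.comparingByKey())
        .map(Map.Entry::getValue);
  }
}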

For functions that are not using `BridgingSqlFunction`, we might introduce a wrapper SqlOperator that returns the original SqlKind but also encodes a version that can be accessed by the code generator.

Note that the internal digest of an expression therefore might change between Flink versions after restoring from a JSON plan. However, the Web UI should be stable since we encode the operator description in the JSON plan.

Topology Versioning

We take the 1:n relationship between an ExecNode and its operators into account when naming Transformations.

We define the following naming convention for the uid() of operators for state restore:

<ExecNodeID>_ExecNodeKind-ExecNodeVersion_OperatorKind
13_stream-exec-sink-1_upsert-materializer

ExecNodeKind and ExecNodeVersion uniquely identify the topology structure for a group of operators. The OperatorKind ensures uniqueness among the operators of a single ExecNode. The ExecNodeID ensures uniqueness among multiple usages of the same ExecNode.
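A minimal sketch of generating such a uid (the helper method is hypothetical; only the convention itself is proposed):

static String operatorUid(
    int execNodeId, String execNodeKind, int execNodeVersion, String operatorKind) {
  // e.g. operatorUid(13, "stream-exec-sink", 1, "upsert-materializer")
  // yields "13_stream-exec-sink-1_upsert-materializer"
  return execNodeId + "_" + execNodeKind + "-" + execNodeVersion + "_" + operatorKind;
}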

We use annotations for code maintenance. See next section.

We will not version individual operators, only ExecNodes.

In future Flink versions, we might allow a new operator to subscribe to multiple uid()s of previous operators during restore. This would make operator migration easier.

Testing Infrastructure

ExecNode Tests

The testing infrastructure is crucial for version upgrades. We define three kinds of tests:

  • restore tests
  • change detection tests
  • completeness tests

We introduce annotations for ExecNodes that can be read by the JSON serializer and tests:

@ExecNodeMetadata(
  // main information
  name = "stream-exec-sink",
  version = 1,
  // maintenance information for us/the community and our tests
  added = FlinkVersion.v1_15,
  consumedOptions = {"table.exec.sink.not-null-enforcer", "table.exec.sink.upsert-materialize"},
  producedOperators = {"not-null-enforcer", "upsert-materializer"},
  supportedPlanChanges = {FlinkVersion.v1_15, FlinkVersion.v1_16},
  supportedSavepointChanges = {FlinkVersion.v1_15, FlinkVersion.v1_16})
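A sketch of the corresponding annotation declaration (attribute types are assumptions; the implementation may differ):

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import org.apache.flink.FlinkVersion;

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface ExecNodeMetadata {
  String name();
  int version();
  FlinkVersion added();
  String[] consumedOptions() default {};
  String[] producedOperators() default {};
  FlinkVersion[] supportedPlanChanges() default {};
  FlinkVersion[] supportedSavepointChanges() default {};
}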


The annotation declares the following properties:
name

For JSON serialization/deserialization and uid() generation.

version

For JSON serialization/deserialization and uid() generation.

added

Completeness tests can verify that restore tests exist for savepoints of this Flink version.

consumedOptions

Hard-coded list of option keys, fixed at the Flink version in which the ExecNode was added. The keys are stored as plain strings rather than references to instances of the ExecutionConfigOptions class, in case those get refactored.

Completeness tests can verify that every option is set once in restore and change detection tests.

Completeness tests can verify that the ExecutionConfigOptions class still contains an option (via key or fallback key) for the given key.

Restore tests can verify whether the restored ExecNode config map contains only options with the given keys.

producedOperators

Set of operator names that can be part of the resulting Transformations.

Restore and completeness tests can verify that at least one test exercises each operator and that the created Transformations contain only operators whose `uid`s contain the given operator names.

The concrete combinations or existence of these operators in the final pipeline depends on various parameters (both configuration and ExecNode-specific arguments such as interval size etc.).

supportedPlanChanges
(optional for the first version)

Used for plan migration.

Updated when the JSON representation of the ExecNode changes, e.g. after adding an attribute to the JSON spec of the ExecNode.

Plan migration tests can use this information.

Completeness tests can verify that restore tests exist for all JSON plan variations.

supportedSavepointChanges

Used for operator migration.

Restore tests can verify that operator migration works for all listed Flink savepoint versions.

Restore Tests

We will come up with a new testing infrastructure that has, for every ExecNode version:

  • input data, split into two parts:
    - one for creating the savepoint
    - one for testing the restored plan incl. state
  • a savepoint,
  • a JSON plan + the original pipeline definition (SQL/Table API) for future compilation,
  • and output data.
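A possible on-disk layout for these artifacts (purely illustrative; the concrete file structure is not part of this proposal):

restore-tests/
  stream-exec-sink_1/        // <ExecNodeKind>_<ExecNodeVersion>
    plan.json                // compiled JSON plan
    pipeline.sql             // original pipeline definition for future compilation
    savepoint/               // created from the first half of the input data
    input.csv                // second half, used to test the restored plan
    output.csv               // expected output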

We will not maintain savepoints for every Flink version but only for the Flink version where a new ExecNode version was introduced. We might extend the testing matrix if necessary but initially we try to keep the release overhead low.

We will add one complex end-to-end test for which we test savepoints and JSON plans of different versions. When cutting a feature branch, we can regenerate the JSON plan and savepoint, similar to our serializer upgrade tests.

Once we have more experience with ExecNodes that have been changed, we might reorganize the ExecNode structure in dedicated packages or modules. Since the JSON plan and savepoint should be independent of concrete classes, this can be future work.

Change Detection Tests

We will generate and check in the JSON plan for the current ExecNode versions in master. This already happens today; it should help us detect plan changes for which we need to find alternatives.

The change detection tests are derived from the restore tests by compiling the given pipeline definition against the most recent ExecNode versions.

Completeness Tests

Completeness tests should help contributors not to forget to add tests when performing a code change.

We will have completeness tests that verify that every ExecNode version has corresponding tests via classpath scanning and annotation checking.
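A hedged sketch of such a check, assuming the org.reflections library and the annotation from above (package name and assertion logic are illustrative):

import java.util.Set;
import org.reflections.Reflections;

static void checkExecNodeTestCompleteness() {
  // scan the planner package for annotated ExecNodes
  Reflections reflections =
      new Reflections("org.apache.flink.table.planner.plan.nodes.exec");
  Set<Class<?>> annotated = reflections.getTypesAnnotatedWith(ExecNodeMetadata.class);
  for (Class<?> execNode : annotated) {
    ExecNodeMetadata metadata = execNode.getAnnotation(ExecNodeMetadata.class);
    // e.g. assert that a restore test exists for metadata.name() + "_" + metadata.version()
  }
}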

Function Tests

For functions, we don't need annotations. All built-in functions are declared in either BuiltInFunctionDefinitions or FlinkSqlOperatorTable, and we can use reflection for completeness tests, change detection tests, and restore tests.

We can extend BuiltInFunctionTestBase to test different versions.

Currently, not all functions are ported to this test base. We can at least port one parameter combination of every function to it.

We can additionally compute a hash from the explain string of the input and output type inference to detect changes in a function's definition. Runtime changes should become visible via the tests of BuiltInFunctionTestBase.
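A minimal sketch of such a hash, assuming the explain strings of the type inference are available as plain strings:

import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;

// hashes the concatenated explain strings of a function's input and output
// type inference; a changed hash signals a changed function definition
static String typeInferenceHash(String inputInference, String outputInference)
    throws NoSuchAlgorithmException {
  MessageDigest digest = MessageDigest.getInstance("SHA-256");
  byte[] hash = digest.digest(
      (inputInference + "|" + outputInference).getBytes(StandardCharsets.UTF_8));
  StringBuilder hex = new StringBuilder();
  for (byte b : hash) {
    hex.append(String.format("%02x", b));
  }
  return hex.toString();
}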

Code Change Guidelines

We will formulate guidelines to help contributors when a change requires a new ExecNode or function version.

However, this is not part of this FLIP and will happen once the completeness tests and other checks are in place. E.g. "if test XYZ fails, a new ExecNode version needs to be added".

Examples for Design Verification

Runtime utility has been moved to a different package
https://github.com/apache/flink/pull/17454
→ No migration necessary. Refactored utility will be used by all versions of ExecNodes.

Source ability interface has been extended by an additional parameter
https://github.com/apache/flink/pull/17662
→ Plan migration necessary. We would have needed to remove metadata columns from the projection type.
→ Alternatively, this could also have been done by the `ProjectPushDownSpec`.

Coalesce has been reworked and introduces rules
https://github.com/apache/flink/pull/17256
→ No migration necessary because the old plan is still runnable even though it might be incorrect.

Changing the semantics of CAST
https://issues.apache.org/jira/browse/FLINK-24403
→ Either we version this function or (more likely) we introduce a hard config option to restore the old behavior. Since the old behavior was invalid, we could also argue that the change is incompatible with previous versions.

Additional information is passed to ExecNode for incremental aggregation with distinct and count star
https://github.com/apache/flink/pull/16759
→ ExecNode version upgrade is not necessary. But old JSON plans will produce an invalid result due to missing information.

Typo change in LogicalType and thus change in distinct views
https://github.com/apache/flink/pull/16020 
→ Should not be a problem if distinct views no longer rely on Java serialization.
→ Bug in StructuredType was forwarded to JSON and could have been avoided.

Preserve function identifier during filter pushdown
https://github.com/apache/flink/pull/16396 
→ The change in the JSON plan could have been avoided with a better JSON representation for functions.

Additional parameter for window table functions
https://github.com/apache/flink/pull/16215
→ Optional parameter for ExecNode. No new ExecNode version necessary.

Proposed Changes


...