Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

PropertyDescription
name

For JSON serialization/deserialization and uid() generation.

version

For JSON serialization/deserialization and uid() generation.

added

Completeness tests can verify restore tests exist for this Flink version savepoint.

consumedOptions

Hard coded list of keys in the Flink version when the ExecNode was added. Does not reference instances in the ExecutionConfigOption class in case those get refactored.

Completeness tests can verify that every option is set once in restore and change detection tests.

Completeness tests can verify that the ExecutionConfigOptions class still contains an option (via key or fallback key) for the given key.

Restore can verify whether the restored ExecNode config map contains only options of the given keys.

producedOperators

Set of operator names that can be part of the resulting Transformations.

Restore and completeness tests can verify there exists at least one test that adds each operator and that the created Transformations contain only operators with `uid`s containing the given operator names.

The concrete combinations or existence of these operators in the final pipeline depends on various parameters (both configuration and ExecNode-specific arguments such as interval size etc.).

supportedPlanChanges
(optional for the first version)

Used for plan migration.

Updates when the JSON for the ExecNode changes: e.g. after adding an attribute to the JSON spec of the ExecNode.

The annotation does not need to be updated for every Flink version. As the name suggests it is about "Changes" (in other words: incompatibilities) that require migration.

Plan migration tests can use this information.

Completeness tests can verify that restore tests exist for all JSON plan variations.

supportedSavepointChanges

Used for operator migration.

Updates when the state layout of an operator in the ExecNode changes. But the operator can perform state migration.

The annotation does not need to be updated for every Flink version. As the name suggests it is about "Changes" (in other words: incompatibilities) that require migration.

Restore tests can verify that operator migration works for all listed Flink savepoint versions.

Annotation Example

1. Let's assume we introduced two ExecNodes A and B in Flink 1.15.

The initial annotations in 1.15 are:

@ExecNodeMetadata(name=A, supportedPlanChanges=1.15, supportedSavepointChanges=1.15)

@ExecNodeMetadata(name=B, supportedPlanChanges=1.15, supportedSavepointChanges=1.15)


2. We change an operator state of B in Flink 1.16. We perform the change in the operator of B in a way to support both state layouts. Thus, no need for a new ExecNode version.

The annotations in 1.16 are:

@ExecNodeMetadata(name=A, supportedPlanChanges=1.15, supportedSavepointChanges=1.15)

@ExecNodeMetadata(name=B, supportedPlanChanges=1.15, supportedSavepointChanges=1.15, 1.16)


So the versions in the annotations are "start version"s. End version would mean that we drop the ExecNode from the code base.


3. In Flink 1.18, we assume that all pipelines have upgraded. So drop support for operator migration in B:

@ExecNodeMetadata(name=A, supportedPlanChanges=1.15, supportedSavepointChanges=1.15)

@ExecNodeMetadata(name=B, supportedPlanChanges=1.15, supportedSavepointChanges=1.16)

Restore Tests

We will come up with a new testing infrastructure that has

...