...

@ExecNodeMetadata(
  // main information
  name = "stream-exec-sink",
  version = 1,
  // maintenance information for us/the community and our tests
  added = FlinkVersion.1_15,
  consumedOptions = {"table.exec.sink.not-null-enforcer", "table.exec.sink.upsert-materialize"},
  producedOperators = {"not-null-enforcer", "upsert-materializer"},
  minPlanVersion = FlinkVersion.1_15,
  minStateVersion = FlinkVersion.1_15
)

Note: The two persisted entities, plan and state, can evolve independently of each other, which is why each of them references its own Flink version.

An ExecNode might declare multiple annotations with different versions. See example below.

Property    Description
name

For JSON serialization/deserialization and uid() generation. Clearly identifies the ExecNode together with version.

version

For JSON serialization/deserialization and uid() generation. Clearly identifies the ExecNode together with name.

added

Completeness tests can verify that restore tests exist for savepoints of this Flink version.

consumedOptions

Hard-coded list of option keys as of the Flink version in which the ExecNode was added. Does not reference instances in the ExecutionConfigOptions class in case those get refactored.

Completeness tests can verify that every option is set once in restore and change detection tests.

Completeness tests can verify that the ExecutionConfigOptions class still contains an option (via key or fallback key) for the given key.

Restore can verify whether the restored ExecNode config map contains only options of the given keys.
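
The restore-time check described above can be sketched as a simple set difference. This is only an illustration under assumptions: the class `ConsumedOptionsCheck`, the method `undeclaredKeys`, and the key `table.exec.some-other-option` are hypothetical names, not part of the proposal or of Flink.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical helper, not an existing Flink class.
class ConsumedOptionsCheck {

    // Returns the keys of the restored config map that are NOT declared in consumedOptions.
    // An empty result means the restored ExecNode config contains only declared options.
    static Set<String> undeclaredKeys(Set<String> consumedOptions, Map<String, String> restoredConfig) {
        Set<String> unexpected = new TreeSet<>(restoredConfig.keySet());
        unexpected.removeAll(consumedOptions);
        return unexpected;
    }

    public static void main(String[] args) {
        Set<String> consumed = new LinkedHashSet<>(Arrays.asList(
                "table.exec.sink.not-null-enforcer",
                "table.exec.sink.upsert-materialize"));

        Map<String, String> restored = new HashMap<>();
        restored.put("table.exec.sink.not-null-enforcer", "ERROR");
        restored.put("table.exec.some-other-option", "42"); // hypothetical, undeclared key

        System.out.println(undeclaredKeys(consumed, restored));
    }
}
```

A restore could reject the plan, or only log a warning, when the returned set is non-empty; which of the two is appropriate is not decided here.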

producedOperators

Set of operator names that can be part of the resulting Transformations.

Restore and completeness tests can verify there exists at least one test that adds each operator and that the created Transformations contain only operators with `uid`s containing the given operator names.

The concrete combination and the existence of these operators in the final pipeline depend on various parameters (both configuration and ExecNode-specific arguments such as the interval size).
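
A test could check the produced operators roughly as follows. This is a sketch under assumptions: the uid strings in `main` use a made-up format, since this section only requires that a uid contains the operator name, and `ProducedOperatorsCheck` is a hypothetical helper.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper, not an existing Flink class.
class ProducedOperatorsCheck {

    // True if every uid contains at least one of the declared operator names.
    static boolean onlyDeclaredOperators(Set<String> producedOperators, List<String> uids) {
        return uids.stream()
                .allMatch(uid -> producedOperators.stream().anyMatch(uid::contains));
    }

    public static void main(String[] args) {
        Set<String> declared =
                new HashSet<>(Arrays.asList("not-null-enforcer", "upsert-materializer"));

        // The uid layout below is an assumption made for this example only.
        List<String> uids = Arrays.asList(
                "stream-exec-sink_1_not-null-enforcer",
                "stream-exec-sink_1_upsert-materializer");

        System.out.println(onlyDeclaredOperators(declared, uids));
    }
}
```

Because not every operator shows up in every pipeline, the check only asserts that no undeclared operator appears; it does not require all declared operators to be present.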

minPlanVersion

Used for plan validation and potentially plan migration.

Updates when the JSON for the ExecNode changes: e.g. after adding an attribute to the JSON spec of the ExecNode.

The annotation does not need to be updated for every Flink version. As the name suggests, it is about the "minimum" version for a restore. If the minimum version is higher than the current Flink version, plan migration is necessary.

Changing this version will always result in a new ExecNode version.

Plan migration tests can use this information.

Completeness tests can verify that restore tests exist for all JSON plan variations.

minStateVersion

Used for operator and potentially savepoint migration.

Updates whenever the state layout of an operator in the ExecNode changes. In some cases, the operator can perform state migration. If the minimum version is higher than the current Flink version, savepoint migration is necessary.

Changing this version will always result in a new ExecNode version. The annotation does not need to be updated for every Flink version.

Restore tests can verify that operator migration works for all Flink state versions.

Completeness tests can verify that restore tests exist for all state variations.
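
The "minimum version" rule used by minPlanVersion and minStateVersion reduces to an ordered comparison. The sketch below applies the rule as it is worded above; the `Version` enum and the `MinVersionCheck` class are simplified, hypothetical stand-ins, and which version is passed as the second argument is left to the caller.

```java
// Hypothetical helper, not an existing Flink class.
class MinVersionCheck {

    // Simplified stand-in for Flink versions; the declaration order defines "lower" and "higher".
    enum Version { V1_15, V1_16 }

    // Migration is necessary when the declared minimum version is higher
    // than the version it is compared against.
    static boolean migrationNecessary(Version minVersion, Version comparedVersion) {
        return minVersion.compareTo(comparedVersion) > 0;
    }

    public static void main(String[] args) {
        System.out.println(migrationNecessary(Version.V1_16, Version.V1_15)); // true
        System.out.println(migrationNecessary(Version.V1_15, Version.V1_16)); // false
    }
}
```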

Annotation Example

Operator State Change

1. Introduction of a new ExecNode A in Flink 1.15

@ExecNodeMetadata(name=A, version=1, minPlanVersion=1.15, minStateVersion=1.15)
class MyExecNode extends ExecNode {...}


2. The state layout of A changed slightly in Flink 1.16. The topology is the same but an operator will use a new state layout in the future:

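class X extends ProcessFunction {
  ValueState<A> oldStateLayout; // layout used by ExecNode version 1
  ValueState<B> newStateLayout; // layout used from ExecNode version 2 on

  open() {
    // migrate only if state in the old layout is still present
    if (oldStateLayout.get() != null) {
      performOperatorMigration();
    }
    useNewStateLayout();
  }
}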

Every state change will increase the ExecNode version. Since the operator supports both state layouts, adding an additional annotation is enough.

@ExecNodeMetadata(name=A, version=1, minPlanVersion=1.15, minStateVersion=1.15)
@ExecNodeMetadata(name=A, version=2, minPlanVersion=1.15, minStateVersion=1.16)
class MyExecNode extends ExecNode {...}

New plans will use the new ExecNode version 2. Operators will perform operator migration under the hood in the meantime.
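
How several annotations on one class could be declared and read in plain Java is sketched below. It is a minimal illustration, not the proposed implementation: the annotation is redefined locally with string-typed versions, the container annotation `ExecNodeMetadatas` and the helper `LatestMetadataExample` are hypothetical, and `MyExecNode` has no supertype here to keep the example self-contained.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Repeatable;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.Arrays;
import java.util.Comparator;

// Local stand-in for the annotation described in this document (assumption: string versions).
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Repeatable(ExecNodeMetadatas.class)
@interface ExecNodeMetadata {
    String name();
    int version();
    String minPlanVersion();
    String minStateVersion();
}

// Container annotation that Java requires for @Repeatable (hypothetical name).
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface ExecNodeMetadatas {
    ExecNodeMetadata[] value();
}

@ExecNodeMetadata(name = "A", version = 1, minPlanVersion = "1.15", minStateVersion = "1.15")
@ExecNodeMetadata(name = "A", version = 2, minPlanVersion = "1.15", minStateVersion = "1.16")
class MyExecNode {}

class LatestMetadataExample {

    // Returns the annotation with the highest ExecNode version declared on the class.
    static ExecNodeMetadata latest(Class<?> execNodeClass) {
        return Arrays.stream(execNodeClass.getAnnotationsByType(ExecNodeMetadata.class))
                .max(Comparator.comparingInt(ExecNodeMetadata::version))
                .orElseThrow(() -> new IllegalStateException("no metadata on " + execNodeClass));
    }

    public static void main(String[] args) {
        ExecNodeMetadata m = latest(MyExecNode.class);
        System.out.println(m.name() + " v" + m.version() + " minStateVersion=" + m.minStateVersion());
    }
}
```

Picking the highest version mirrors the statement that new plans use ExecNode version 2, while the version 1 annotation stays available for restoring older plans.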

3. 


1. Let's assume we introduced two ExecNodes A and B in Flink 1.15.

...