...
@ExecNodeMetadata(
  // main information
  name = "stream-exec-sink",
  version = 1,
  // maintenance information for us/the community and our tests
  added = FlinkVersion.v1_15,
  consumedOptions = {"table.exec.sink.not-null-enforcer", "table.exec.sink.upsert-materialize"},
  producedOperators = {"not-null-enforcer", "upsert-materializer"},
  minPlanVersion = FlinkVersion.v1_15,
  minStateVersion = FlinkVersion.v1_15
)
Note: Both persisted entities, plan and state, can evolve independently of each other, which is why both reference a Flink version.
An ExecNode might declare multiple annotations with different versions. See example below.
Property | Description
---|---
name | For JSON serialization/deserialization and uid() generation. Clearly identifies the ExecNode together with version.
version | For JSON serialization/deserialization and uid() generation. Clearly identifies the ExecNode together with name.
added | Completeness tests can verify that restore tests exist for savepoints of this Flink version.
consumedOptions | Hard-coded list of keys in the Flink version when the ExecNode was added. Does not reference instances in the ExecutionConfigOptions class in case those get refactored. Completeness tests can verify that every option is set once in restore and change detection tests. Completeness tests can verify that the ExecutionConfigOptions class still contains an option (via key or fallback key) for the given key. Restore can verify whether the restored ExecNode config map contains only options of the given keys.
producedOperators | Set of operator names that can be part of the resulting Transformations. Restore and completeness tests can verify that there exists at least one test that adds each operator and that the created Transformations contain only operators with `uid`s containing the given operator names. The concrete combinations or existence of these operators in the final pipeline depends on various parameters (both configuration and ExecNode-specific arguments such as interval size etc.).
minPlanVersion | Used for plan validation and potentially plan migration. Updates when the JSON for the ExecNode changes: e.g. after adding an attribute to the JSON spec of the ExecNode. The annotation does not need to be updated for every Flink version. As the name suggests, it is about the "minimum" version for a restore. If the minimum version is higher than the current Flink version, plan migration is necessary. Changing this version will always result in a new ExecNode version. Plan migration tests can use this information. Completeness tests can verify that restore tests exist for all JSON plan variations.
minStateVersion | Used for operator and potentially savepoint migration. Updates whenever the state layout of an operator in the ExecNode changes. In some cases, the operator can perform state migration. If the minimum version is higher than the current Flink version, savepoint migration is necessary. Changing this version will always result in a new ExecNode version. Restore tests can verify that operator migration works for all Flink state versions. Completeness tests can verify that restore tests exist for all state variations.
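The consumedOptions row says that a restore can verify whether the restored config map contains only declared keys. The following is a minimal sketch of such a check, not the actual Flink implementation. It assumes a simplified annotation with only four of the properties; the class `ConsumedOptionsCheck`, the helper `undeclaredOptions`, and the key `table.exec.some-other-option` are hypothetical names chosen for illustration.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.Arrays;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

public class ConsumedOptionsCheck {

    /** Simplified, hypothetical stand-in for the annotation described in the table. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface ExecNodeMetadata {
        String name();
        int version();
        String[] consumedOptions() default {};
        String[] producedOperators() default {};
    }

    @ExecNodeMetadata(
            name = "stream-exec-sink",
            version = 1,
            consumedOptions = {
                "table.exec.sink.not-null-enforcer",
                "table.exec.sink.upsert-materialize"
            },
            producedOperators = {"not-null-enforcer", "upsert-materializer"})
    static class StreamExecSink {}

    /** Returns the keys of the restored config that the ExecNode does not declare. */
    static Set<String> undeclaredOptions(Class<?> nodeClass, Map<String, String> restoredConfig) {
        ExecNodeMetadata metadata = nodeClass.getAnnotation(ExecNodeMetadata.class);
        if (metadata == null) {
            throw new IllegalArgumentException(nodeClass.getName() + " has no ExecNodeMetadata");
        }
        // Start from all restored keys and drop every key the annotation declares.
        Set<String> undeclared = new TreeSet<>(restoredConfig.keySet());
        undeclared.removeAll(Arrays.asList(metadata.consumedOptions()));
        return undeclared;
    }

    public static void main(String[] args) {
        Map<String, String> restored = Map.of(
                "table.exec.sink.not-null-enforcer", "ERROR",
                "table.exec.some-other-option", "true"); // hypothetical key
        // prints [table.exec.some-other-option]
        System.out.println(undeclaredOptions(StreamExecSink.class, restored));
    }
}
```

Because the keys are plain strings in the annotation, a check like this keeps working even if the constants in ExecutionConfigOptions are later renamed or moved.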
Annotation Example
Operator State Change
1. Introduction of a new ExecNode A in Flink 1.15
@ExecNodeMetadata(name=A, version=1, minPlanVersion=1.15, minStateVersion=1.15)
class MyExecNode extends ExecNode {...}
2. The state layout of A changed slightly in Flink 1.16. The topology is the same but an operator will use a new state layout in the future:
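class X extends ProcessFunction {
  ValueState<A> oldStateLayout; // layout used in Flink 1.15
  ValueState<B> newStateLayout; // layout used from Flink 1.16 on
  open() {
    // migrate if the restored state still uses the old layout
    if (oldStateLayout.get() != null) {
      performOperatorMigration();
    }
    useNewStateLayout();
  }
}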
Every state change will increase the ExecNode version. Since the operator supports both state layouts, adding an additional annotation is enough.
@ExecNodeMetadata(name=A, version=1, minPlanVersion=1.15, minStateVersion=1.15)
@ExecNodeMetadata(name=A, version=2, minPlanVersion=1.15, minStateVersion=1.16)
class MyExecNode extends ExecNode {...}
New plans will use the new ExecNode version 2. Operators will perform operator migration under the hood in the meantime.
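How several annotations on one class could be resolved can be sketched as follows. This is an illustration under stated assumptions, not the planner's actual code: the annotation is reduced to the four properties used in this example, versions are plain strings, and `ExecNodeVersionLookup`, `ExecNodeMetadatas`, `latest`, and `forRestoredPlan` are hypothetical names. Java requires a container annotation for repeated annotations, which is why `ExecNodeMetadatas` exists.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Repeatable;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Optional;

public class ExecNodeVersionLookup {

    /** Simplified, hypothetical stand-in for the annotation used in this example. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @Repeatable(ExecNodeMetadatas.class)
    @interface ExecNodeMetadata {
        String name();
        int version();
        String minPlanVersion();
        String minStateVersion();
    }

    /** Container annotation that Java requires for repeatable annotations. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface ExecNodeMetadatas {
        ExecNodeMetadata[] value();
    }

    @ExecNodeMetadata(name = "A", version = 1, minPlanVersion = "1.15", minStateVersion = "1.15")
    @ExecNodeMetadata(name = "A", version = 2, minPlanVersion = "1.15", minStateVersion = "1.16")
    static class MyExecNode {}

    /** Metadata with the highest version: what a newly compiled plan would reference. */
    static ExecNodeMetadata latest(Class<?> nodeClass) {
        return Arrays.stream(nodeClass.getAnnotationsByType(ExecNodeMetadata.class))
                .max(Comparator.comparingInt(ExecNodeMetadata::version))
                .orElseThrow(() -> new IllegalArgumentException("no metadata declared"));
    }

    /** Metadata matching the name and version stored in a restored plan, if still declared. */
    static Optional<ExecNodeMetadata> forRestoredPlan(
            Class<?> nodeClass, String name, int version) {
        return Arrays.stream(nodeClass.getAnnotationsByType(ExecNodeMetadata.class))
                .filter(m -> m.name().equals(name) && m.version() == version)
                .findFirst();
    }

    public static void main(String[] args) {
        // New plans pick version 2; a plan compiled with version 1 still resolves.
        System.out.println(latest(MyExecNode.class).version());
        System.out.println(forRestoredPlan(MyExecNode.class, "A", 1).isPresent());
        System.out.println(forRestoredPlan(MyExecNode.class, "A", 3).isPresent());
    }
}
```

Keeping the version 1 annotation on the class is what lets an old plan resolve to the same ExecNode while new plans reference version 2.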
3.
1. Let's assume we introduced two ExecNodes A and B in Flink 1.15.
...