
Page properties

Discussion thread: https://
JIRA: FLINK-25217 (ASF JIRA)
Release:


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Term | Definition
table program

List of statements that configure the session, connect to catalogs, register (temporary) catalog objects, define and submit one or more pipelines.

A table program can be expressed using the Table API in Java or as a multi-statement SQL script for the SQL Client.
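For illustration, a minimal sketch of such a table program using the Java Table API (the connector names and the configured option are examples only):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public final class TableProgramExample {
  public static void main(String[] args) {
    TableEnvironment env =
        TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

    // configure the session
    env.getConfig().getConfiguration()
        .setString("table.exec.sink.not-null-enforcer", "DROP");

    // register (temporary) catalog objects
    env.executeSql("CREATE TEMPORARY TABLE src (x INT) WITH ('connector' = 'datagen')");
    env.executeSql("CREATE TEMPORARY TABLE snk (x INT) WITH ('connector' = 'print')");

    // define and submit a pipeline
    env.executeSql("INSERT INTO snk SELECT x FROM src");
  }
}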

pipeline

A pipeline is a DAG that consists of one or more (potentially disconnected) source-to-sink dataflows.

Statement sets allow for n:n source-to-sink dataflows. A pipeline is compiled into a single JobGraph.

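For illustration, a statement set in the Java Table API that submits two INSERT statements as one pipeline (building on the sketch above; an additional table snk2 is assumed to be registered):

import org.apache.flink.table.api.StatementSet;

StatementSet set = env.createStatementSet();
set.addInsertSql("INSERT INTO snk SELECT x FROM src");
set.addInsertSql("INSERT INTO snk2 SELECT x FROM src");
// both source-to-sink dataflows are compiled into a single JobGraph
set.execute();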

state

Any value, list, or map member of an operator that is managed by Flink. Each state is snapshotted using the `operator uid + state name` into a savepoint.

During restoration, state is mapped back to any kind of operator (even if the topology changed) using those two components.
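As an illustration on the DataStream API level (operator uid and state name are made up), the state of the following keyed operator is addressed as uid "dedupe" + state name "seen" in a savepoint:

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;

public class DeduplicatingMapper extends RichMapFunction<String, String> {

  // addressed in a savepoint as `uid "dedupe" + state name "seen"`
  private transient ValueState<Boolean> seen;

  @Override
  public void open(Configuration parameters) {
    // "seen" is the state name half of the addressing scheme
    seen = getRuntimeContext().getState(
        new ValueStateDescriptor<>("seen", Boolean.class));
  }

  @Override
  public String map(String value) throws Exception {
    boolean first = seen.value() == null;
    seen.update(true);
    return first ? value + " (new)" : value + " (seen)";
  }
}

// usage on a keyed stream; "dedupe" is the operator uid half:
// stream.keyBy(v -> v).map(new DeduplicatingMapper()).uid("dedupe");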

upgrade

The change from one Flink minor version to another. For example, from 1.15 to 1.17, or 1.17 to 1.18.

A patch version change (e.g. 1.13.2 to 1.13.3) is not considered an upgrade and has already been supported before this FLIP.

Upgrading usually involves work, which is why many users perform this task rarely (e.g. only once per year). Also, skipping versions is common until a new feature has been introduced for which it is worth to upgrade. We will support the upgrade to the most recent Flink version from a set of previous versions. We aim to support upgrades from the last 2-3 releases on a best-effort basis; maybe even more depending on the maintenance overhead. However, in order to not grow the testing matrix infinitely and to perform important refactoring if necessary, we only guarantee upgrades with a step size of a single minor version (i.e. a cascade of upgrades).

As is common when skipping versions, we still recommend users to check the release notes and perform migration if instructed to do so.

In other words: a user can upgrade between minor versions and all following minor versions. The goal is that the same query can remain up and running. E.g. a user upgrades from 1.15 to 1.16, and then from 1.16 to 1.17, and can expect the original query to keep working without recomputing the data or recompiling the plan from the original SQL. This necessarily means that at some point in future releases we will need some basic "migration tool" to keep the queries up and running, which ends up modifying the compiled plan (see also COMPILE PLAN ... FROM ...) or the savepoint.

An upgrade assumes that only the Flink version has changed. All pipeline-defining parameters remain constant. In other words: table program, catalog objects, configuration options, and external JAR files have not changed.

migration

Actively transforms entities from one format to the other. A migration can happen on different layers. After migration, the format can be read by the next generation. Thus, the old generation is not necessary anymore.

There are four kinds of migration:

State serializer migration: In case both operator and state name have not changed, it is possible to upgrade the state format using the migration capabilities of TypeSerializer with TypeSerializerSnapshot.

Operator migration: An operator declares both the new state and old state with different state names. The initialization method of the operator accesses the old state and actively moves data into the new state format. Afterwards, the old state is empty and thus could be removed from the new savepoint metadata (given such a functionality exists, which is future work). The operator implementation could look completely different, but as long as `operator uid + state name` match, the migration is possible. 

Plan migration: We transform the JSON plan (introduced later in this document) into a new plan. For example, to remove legacy attributes that are not necessary in a new JSON plan layout. This has no impact on the savepoint itself but on older generations of the JSON parsing and validation logic that can drop legacy attributes. Also, if two operators have been fused into a single one in later versions, the plan can be migrated such that it doesn't reference the two legacy operators anymore.

Savepoint migration: We provide a tool that transforms the savepoint data into a new format. This tool could be a Flink job itself (e.g. using the State Processor API). This provides the highest flexibility as the topology can change completely (at least in theory).
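For illustration, such a savepoint migration job based on the State Processor API could look roughly as follows. This is only a sketch: OldRecord, OldStateReader, and NewLayoutBootstrapper are hypothetical, and the uids and paths are made up.

import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.state.api.BootstrapTransformation;
import org.apache.flink.state.api.ExistingSavepoint;
import org.apache.flink.state.api.OperatorTransformation;
import org.apache.flink.state.api.Savepoint;

public final class SavepointMigrationJob {
  public static void main(String[] args) throws Exception {
    ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

    // read the state of the legacy operator from the old savepoint
    ExistingSavepoint savepoint =
        Savepoint.load(env, "/savepoints/old", new HashMapStateBackend());
    DataSet<OldRecord> oldState =
        savepoint.readKeyedState("legacy-uid", new OldStateReader());

    // bootstrap the state of the new operator; the bootstrapper writes
    // the records using the new state layout
    BootstrapTransformation<OldRecord> newState =
        OperatorTransformation.bootstrapWith(oldState)
            .keyBy(r -> r.key)
            .transform(new NewLayoutBootstrapper());

    // replace the legacy operator and write out the new savepoint
    savepoint
        .removeOperator("legacy-uid")
        .withOperator("new-uid", newState)
        .write("/savepoints/new");

    env.execute("savepoint migration");
  }
}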

Good logging should be in place while an upgrade/migration is taking place, so that every restoration action can be tracked and potential issues arising from it can be debugged.

backwards compatibility

A table program that has been written in a previous version behaves the same in the new version. No action of the users or any other modification of the table program is required.

savepoint compatibility

The state stored in a savepoint can still be used to initialize (still existing) operators. 

schema evolution

A column has been renamed, added to, or removed from a table or view. The same applies to a (nested) data type of a column or function that transitively affects the overall dynamic table layout and pipeline.

query evolution

A fundamental change to the query. E.g. adding a filter condition, a different aggregation, an additional join or subquery.

...

Remark to 3: We add a new JSON `Map<String, String>` property to ExecNode to keep track of the node configuration.

Remark to 7: We use ExecNode name and version instead of class names.

Remark to 8 & 9: We will not change the JSON plan format across patch releases. Since every JSON plan is versioned by the compiling Flink version, external tooling can be built around it, but without any guarantees from the Flink community across Flink minor/major versions yet. This is future work and can be offered once the plan design has settled.

...


// during debugging, it should be possible to define COMPILE AND EXECUTE PLAN … FOR
// with a proper path but always recompile
SET 'table.plan.restore.force-recompile'='true';

// -------------------------
// not in the first version
// -------------------------

// drop a plan
DROP PLAN '/mydir/plan.json';

// Perform plan migration in the future.
// This statement has been added for completeness. Users will need to execute it 
// when we instruct them to do so in the release notes. Plan migration will be one
// way of dropping legacy code in Flink before we have savepoint migration.
COMPILE PLAN '/mydir/plan_new.json' FROM '/mydir/plan_old.json';

...

@ExecNodeMetadata(
  // main information
  name = "stream-exec-sink",
  version = 1,
  // maintenance information for us/the community and our tests
  added = FlinkVersion.1_15,
  consumedOptions = {"table.exec.sink.not-null-enforcer", "table.exec.sink.upsert-materialize"},
  producedOperators = {"not-null-enforcer", "upsert-materializer"},
  minPlanVersion = FlinkVersion.1_15,
  minStateVersion = FlinkVersion.1_15)

Note: Both persisted entities, plan and state, can evolve independently from each other, which is why both reference a Flink version. A max/end version is not required, as we would drop the annotation in this case.

An ExecNode might declare multiple annotations with different versions. See example below.

Property | Description

name

For JSON serialization/deserialization and uid() generation. Clearly identifies the ExecNode together with version.

version

For JSON serialization/deserialization and uid() generation. Clearly identifies the ExecNode together with name.

added

Completeness tests can verify that restore tests exist for savepoints of this Flink version.

consumedOptions

Hard coded list of keys in the Flink version when the ExecNode was added. Does not reference instances in the ExecutionConfigOptions class in case those get refactored.

Completeness tests can verify that every option is set once in restore and change detection tests.

Completeness tests can verify that the ExecutionConfigOptions class still contains an option (via key or fallback key) for the given key.

Restore can verify whether the restored ExecNode config map contains only options of the given keys.

producedOperators

Set of operator names that can be part of the resulting Transformations.

Restore and completeness tests can verify there exists at least one test that adds each operator and that the created Transformations contain only operators with `uid`s containing the given operator names.

The concrete combinations or existence of these operators in the final pipeline depends on various parameters (both configuration and ExecNode-specific arguments such as interval size etc.).

minPlanVersion

Used for plan validation and potentially plan migration.

Updates whenever the JSON format of the ExecNode changes, e.g. after adding an attribute to the JSON spec of the ExecNode.

The annotation does not need to be updated for every Flink version. As the name suggests, it is the "minimum" version for a restore. If the minimum version is higher than the current Flink version, plan migration is necessary.

Changing this version will always result in a new ExecNode version.

Plan migration tests can use this information.

Completeness tests can verify that restore tests exist for all JSON plan variations.

minStateVersion

Used for operator and potentially savepoint migration.

Updates whenever the state layout of an operator in the ExecNode changes. In some cases, the operator can perform state migration. If the minimum version is higher than the current Flink version, savepoint migration is necessary.

Changing this version will always result in a new ExecNode version.

Restore tests can verify that operator migration works for all listed state versions.

Completeness tests can verify that restore tests exist for all state variations.
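As an illustration, one of these completeness checks could be sketched as follows (assuming the ExecNodeMetadata annotation is repeatable and using the MyExecNode example below; fallback keys are omitted for brevity):

import static org.junit.Assert.assertTrue;

import java.lang.reflect.Field;
import java.util.HashSet;
import java.util.Set;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.table.api.config.ExecutionConfigOptions;
import org.junit.Test;

public class ExecNodeMetadataCompletenessTest {

  @Test
  public void consumedOptionsStillExist() throws Exception {
    // collect the keys of all options declared in ExecutionConfigOptions
    Set<String> knownKeys = new HashSet<>();
    for (Field field : ExecutionConfigOptions.class.getFields()) {
      if (ConfigOption.class.isAssignableFrom(field.getType())) {
        knownKeys.add(((ConfigOption<?>) field.get(null)).key());
      }
    }
    // every key listed in consumedOptions must still be resolvable
    for (ExecNodeMetadata metadata :
        MyExecNode.class.getAnnotationsByType(ExecNodeMetadata.class)) {
      for (String key : metadata.consumedOptions()) {
        assertTrue("Unknown option: " + key, knownKeys.contains(key));
      }
    }
  }
}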

Annotation Example


Operator State Change

1. Introduction of a new ExecNode A in Flink 1.15

@ExecNodeMetadata(name=A, version=1, minPlanVersion=1.15, minStateVersion=1.15)
class MyExecNode extends ExecNode {...}

2. The state layout of A changed slightly in Flink 1.16. The topology is the same but an operator will use a new state layout in the future:

class X extends ProcessFunction {
  ValueState<A> oldStateLayout;
  ValueState<B> newStateLayout;

  open() {
    // lazily migrate on restore: if the old layout still contains data,
    // move it over before using the new layout
    if (oldStateLayout.value() != null) {
      performOperatorMigration();
    }
    useNewStateLayout();
  }
}
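The migration routine itself could be sketched like this (convertToNewLayout is hypothetical):

private void performOperatorMigration() throws Exception {
  // move the data into the new state layout and clear the old entry so that
  // the old state could eventually be removed from the savepoint metadata
  newStateLayout.update(convertToNewLayout(oldStateLayout.value()));
  oldStateLayout.clear();
}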

Every state change will increase the ExecNode version. Since the operator supports both state layouts, adding an additional annotation is enough.

@ExecNodeMetadata(name=A, version=1, minPlanVersion=1.15, minStateVersion=1.15)
@ExecNodeMetadata(name=A, version=2, minPlanVersion=1.15, minStateVersion=1.16)
class MyExecNode extends ExecNode {...}

New plans will use the new ExecNode version 2. Operators will perform operator migration under the hood in the meantime.


3. We would like to drop the support of the old state layout and assume that most operator states have been migrated by now. In this example, we assume Flink 1.17 is the release in which the legacy is dropped.

@ExecNodeMetadata(name=A, version=2, minPlanVersion=1.15, minStateVersion=1.16)
class MyExecNode extends ExecNode {...}

ExecNode version 1 is not supported anymore, even though the state is actually compatible. The plan restore will fail with a helpful exception that forces users to perform plan migration.

COMPILE PLAN '/mydir/plan_new.json' FROM '/mydir/plan_old.json';

The plan migration will safely replace the old version 1 with 2. The JSON plan flinkVersion changes to 1.17.

Plan Change

1. Introduction of a new ExecNode A in Flink 1.15

@ExecNodeMetadata(name=A, version=1, minPlanVersion=1.15, minStateVersion=1.15)
class MyExecNode extends ExecNode {...}


2. The JSON node of ExecNode A gets an additional property in Flink 1.16:

Before:

{
  "some-prop": 42
}

After:

{
  "some-prop": 42,
  "some-flag": false
}

Every plan change will increase the ExecNode version. Since the ExecNode supports both plan layouts, adding an additional annotation is enough.

@ExecNodeMetadata(name=A, version=1, minPlanVersion=1.15, minStateVersion=1.15)
@ExecNodeMetadata(name=A, version=2, minPlanVersion=1.16, minStateVersion=1.15)
class MyExecNode extends ExecNode {...}

The versions in the annotations are "start versions". An end version would mean that we drop the ExecNode from the code base and, thus, must not be part of the annotation.

New plans will use the new ExecNode version 2.


3. We would like to drop the support of the old plan layout. In this example, we assume Flink 1.17 is the release in which the legacy is dropped.

@ExecNodeMetadata(name=A, version=2, minPlanVersion=1.16, minStateVersion=1.15)
class MyExecNode extends ExecNode {...}

ExecNode version 1 is not supported anymore, even though the state is actually compatible. The plan restore will fail with a helpful exception that forces users to perform plan migration.

COMPILE PLAN '/mydir/plan_new.json' FROM '/mydir/plan_old.json';

The plan migration will safely insert the new default value for `some-flag`. The JSON plan flinkVersion changes to 1.17.

Topology Change

Note that this is likely the most common case when performing a change.

1. Introduction of a new ExecNode A in Flink 1.15

@ExecNodeMetadata(name=A, version=1, minPlanVersion=1.15, minStateVersion=1.15)
class MyExecNode extends ExecNode {...}


2. The state layout and topology of A changed fundamentally in Flink 1.16.

We will maintain two ExecNode classes with different versions.

@ExecNodeMetadata(name=A, version=1, minPlanVersion=1.15, minStateVersion=1.15)
class MyExecNodeVersion1 extends ExecNode {...}

@ExecNodeMetadata(name=A, version=2, minPlanVersion=1.15, minStateVersion=1.16)
class MyExecNodeVersion2 extends ExecNode {...}

If the JSON format has not changed between version 1 and 2, we could keep minPlanVersion=1.15 in version 2. This might be useful for users who want to replace a faulty ExecNode implementation while accepting losing state. Otherwise, we can set minPlanVersion=1.16.
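For illustration, a restore could resolve the implementation class from the (name, version) pair stored in the JSON plan roughly like this (the registry and its population are hypothetical):

import java.util.HashMap;
import java.util.Map;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.api.TableException;

final class ExecNodeRegistry {

  // maps (name, version) from the JSON plan to the implementation class
  private final Map<Tuple2<String, Integer>, Class<?>> registry = new HashMap<>();

  ExecNodeRegistry() {
    registry.put(Tuple2.of("A", 1), MyExecNodeVersion1.class);
    registry.put(Tuple2.of("A", 2), MyExecNodeVersion2.class);
  }

  Class<?> resolve(String name, int version) {
    Class<?> clazz = registry.get(Tuple2.of(name, version));
    if (clazz == null) {
      // e.g. after the version has been dropped from the code base;
      // forces users to run COMPILE PLAN ... FROM ... first
      throw new TableException(
          "No ExecNode '" + name + "' for version " + version + ".");
    }
    return clazz;
  }
}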

Note:

A 1.15 JSON plan can still be used for all Flink versions 1.15, 1.16, 1.17 and even 1.18. It doesn't need to be recompiled or migrated.

However, a restore from a 1.15 savepoint would fail in 1.18 during runtime. We cannot check this eagerly based on the JSON plan itself.

4. We can recommend performing a plan upgrade in the release notes of 1.17:

COMPILE PLAN '/mydir/plan_new.json' FROM '/mydir/plan_old.json';

The JSON plan flinkVersion changes to 1.17.

The annotations in 1.18 are:

...

Now we can eagerly throw an exception because the plan Flink version must be at least 1.17 for B.

Note: Of course this is only an example and minPlanVersion=1.16 would have been enough. It only illustrates that both persisted entities, plan and savepoint, can evolve independently from each other.

Restore Tests

We will come up with a new testing infrastructure that has

...

Additional parameter for window table functions
https://github.com/apache/flink/pull/16215
→ Optional parameter for ExecNode. No new ExecNode version necessary.

Additional parameter for sink nodes
https://github.com/apache/flink/pull/17699

Compatibility, Deprecation, and Migration Plan

...