...

2. We change the operator state of B in Flink 1.16. We make the change in B's operator in a way that supports both state layouts, so no new ExecNode version is needed. The versions in the annotations are "start versions". An end version would mean that we drop the ExecNode from the code base; thus, it must not be part of the annotation.

The annotations in 1.16 are:

@ExecNodeMetadata(name=A, supportedPlanChanges=1.15, supportedSavepointChanges=1.15)

@ExecNodeMetadata(name=B, supportedPlanChanges=1.15, supportedSavepointChanges=1.15, 1.16)

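As a rough illustration, the 1.16 annotations could be written in Java as follows. The annotation type here is a hypothetical stand-in declared locally with the attribute names used in this document; it is not the final Flink class. The classes `ExecNodeA` and `ExecNodeB` and the helper `savepointVersions` are assumed for the example only.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.Arrays;
import java.util.List;

// Hypothetical stand-in for the proposed annotation; attribute names follow this document.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface ExecNodeMetadata {
    String name();
    String[] supportedPlanChanges();
    String[] supportedSavepointChanges();
}

// A is unchanged since 1.15.
@ExecNodeMetadata(
        name = "A",
        supportedPlanChanges = {"1.15"},
        supportedSavepointChanges = {"1.15"})
class ExecNodeA {}

// B can read both the 1.15 and the 1.16 state layout.
@ExecNodeMetadata(
        name = "B",
        supportedPlanChanges = {"1.15"},
        supportedSavepointChanges = {"1.15", "1.16"})
class ExecNodeB {}

class AnnotationSketch116 {

    // Reads the declared savepoint start versions via reflection.
    static List<String> savepointVersions(Class<?> node) {
        ExecNodeMetadata metadata = node.getAnnotation(ExecNodeMetadata.class);
        return Arrays.asList(metadata.supportedSavepointChanges());
    }

    public static void main(String[] args) {
        System.out.println(savepointVersions(ExecNodeA.class));
        System.out.println(savepointVersions(ExecNodeB.class));
    }
}
```

Only start versions appear in the metadata. Once an ExecNode is dropped, its class and annotation disappear together.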


3. In Flink 1.18, we assume that all pipelines have been upgraded, so we drop support for the operator migration in B:

@ExecNodeMetadata(name=A, supportedPlanChanges=1.15, supportedSavepointChanges=1.15)

@ExecNodeMetadata(name=B, supportedPlanChanges=1.15, supportedSavepointChanges=1.16)


Note:

A 1.15 JSON plan can still be used for all Flink versions 1.15, 1.16, 1.17 and even 1.18. It doesn't need to be recompiled or migrated.

However, restoring from a 1.15 savepoint would fail at runtime in 1.18. We cannot check this eagerly based on the JSON plan itself.
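The difference between the two checks can be sketched as follows. This is a minimal sketch, assuming that a declared version acts as a lower bound ("start version") and that a version is accepted if it is at least one of the declared start versions. The class `RestoreCheckSketch` and its methods are hypothetical and not part of Flink.

```java
import java.util.List;

// Sketch only: the "start version" semantics are an assumption drawn from the text.
class RestoreCheckSketch {

    // Compares versions such as "1.15" and "1.18" numerically, part by part.
    static int compareVersions(String a, String b) {
        String[] x = a.split("\\.");
        String[] y = b.split("\\.");
        for (int i = 0; i < Math.max(x.length, y.length); i++) {
            int xi = i < x.length ? Integer.parseInt(x[i]) : 0;
            int yi = i < y.length ? Integer.parseInt(y[i]) : 0;
            if (xi != yi) {
                return Integer.compare(xi, yi);
            }
        }
        return 0;
    }

    // A version is accepted if it is at least one of the declared start versions.
    static boolean isSupported(String version, List<String> startVersions) {
        return startVersions.stream()
                .anyMatch(start -> compareVersions(version, start) >= 0);
    }

    public static void main(String[] args) {
        // Metadata of B in Flink 1.18, as given in step 3.
        List<String> planChanges = List.of("1.15");
        List<String> savepointChanges = List.of("1.16");

        // A plan compiled with 1.15 is still accepted.
        System.out.println(isSupported("1.15", planChanges));
        // A savepoint written with the 1.15 state layout is not.
        System.out.println(isSupported("1.15", savepointChanges));
        // A savepoint written with the 1.16 state layout is.
        System.out.println(isSupported("1.16", savepointChanges));
    }
}
```

The plan check only needs the flinkVersion stored in the JSON plan, whereas the savepoint check needs the version that wrote the savepoint, which is only known when the restore actually happens.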


4. We can recommend performing a plan upgrade in the release notes of 1.17:

COMPILE PLAN '/mydir/plan_new.json' FROM '/mydir/plan_old.json';

The flinkVersion in the JSON plan changes to 1.17.


The annotations in 1.18 are:

@ExecNodeMetadata(name=A, supportedPlanChanges=1.15, supportedSavepointChanges=1.15)

@ExecNodeMetadata(name=B, supportedPlanChanges=1.17, supportedSavepointChanges=1.16)

Now we can eagerly throw an exception because the plan's Flink version must be at least 1.17 for B. Of course, this is only an example; supportedPlanChanges=1.16 would have been enough.
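The eager check could look roughly like this. This is a sketch under assumptions: versions have the form "major.minor", and the minimum plan version per ExecNode is passed in as a plain map. The class `PlanVersionCheckSketch`, the method `validate`, and the exception message are hypothetical and do not describe the actual implementation.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Sketch only: validates a plan's flinkVersion before any operator is created.
class PlanVersionCheckSketch {

    // Compares two "major.minor" versions numerically.
    static int compare(String a, String b) {
        String[] x = a.split("\\.");
        String[] y = b.split("\\.");
        int major = Integer.compare(Integer.parseInt(x[0]), Integer.parseInt(y[0]));
        return major != 0
                ? major
                : Integer.compare(Integer.parseInt(x[1]), Integer.parseInt(y[1]));
    }

    // Throws as soon as one ExecNode requires a newer plan than the given one.
    static void validate(String planFlinkVersion, Map<String, String> minPlanVersionByNode) {
        for (Map.Entry<String, String> entry : minPlanVersionByNode.entrySet()) {
            if (compare(planFlinkVersion, entry.getValue()) < 0) {
                throw new IllegalStateException(
                        "ExecNode " + entry.getKey()
                                + " requires a plan of Flink " + entry.getValue()
                                + " or later, but the plan has flinkVersion "
                                + planFlinkVersion + ". Please recompile the plan.");
            }
        }
    }

    public static void main(String[] args) {
        // supportedPlanChanges of A and B in Flink 1.18.
        Map<String, String> nodes = new LinkedHashMap<>();
        nodes.put("A", "1.15");
        nodes.put("B", "1.17");

        validate("1.17", nodes); // upgraded plan passes

        try {
            validate("1.15", nodes); // old plan is rejected because of B
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Because the check runs on plan metadata alone, the user gets the error when the plan is loaded instead of during a later restore.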

Restore Tests

We will come up with a new testing infrastructure that has

...