...

Note: Both persisted entities, plan and state, can evolve independently of each other, which is why both reference a Flink version. A max/end version is not required because we would drop the annotation in that case.

An ExecNode might declare multiple annotations with different versions. See example below.

...

@ExecNodeMetadata(name=A, version=1, minPlanVersion=1.15, minStateVersion=1.15)
class MyExecNode extends ExecNode {...}

2. The state layout of A changed slightly in Flink 1.16. The topology is the same but an operator will use a new state layout in the future:

...

@ExecNodeMetadata(name=A, version=1, minPlanVersion=1.15, minStateVersion=1.15)
@ExecNodeMetadata(name=A, version=2, minPlanVersion=1.15, minStateVersion=1.16)
class MyExecNode extends ExecNode {...}

New plans will use the new ExecNode version 2. Operators will perform operator migration under the hood in the meantime.
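As a minimal sketch of how "new plans use the newest version" could be resolved, the following declares a repeatable annotation and picks the highest declared version via reflection. The annotation here is a simplified stand-in (plain strings for Flink versions), and `LatestVersionSketch` is a hypothetical helper, not part of the proposal.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Repeatable;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.Arrays;
import java.util.Comparator;

// Simplified stand-in for the annotation described above (assumption: versions as strings).
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@Repeatable(ExecNodeMetadatas.class)
@interface ExecNodeMetadata {
    String name();
    int version();
    String minPlanVersion();
    String minStateVersion();
}

// Container annotation that Java requires for repeatable annotations.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface ExecNodeMetadatas {
    ExecNodeMetadata[] value();
}

@ExecNodeMetadata(name = "A", version = 1, minPlanVersion = "1.15", minStateVersion = "1.15")
@ExecNodeMetadata(name = "A", version = 2, minPlanVersion = "1.15", minStateVersion = "1.16")
class MyExecNode {}

// Hypothetical helper: selects the annotation a newly compiled plan would reference.
final class LatestVersionSketch {
    static ExecNodeMetadata latest(Class<?> execNodeClass) {
        return Arrays.stream(execNodeClass.getAnnotationsByType(ExecNodeMetadata.class))
                .max(Comparator.comparingInt(ExecNodeMetadata::version))
                .orElseThrow(() -> new IllegalStateException(
                        "No ExecNodeMetadata on " + execNodeClass.getName()));
    }
}
```

With the two annotations above, `LatestVersionSketch.latest(MyExecNode.class)` yields the version 2 entry, while version 1 remains declared for restoring older plans.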


3. We would like to drop support for the old state layout and assume that most operator states have been migrated by now. In this example, we assume that the legacy layout is dropped in Flink 1.17.

@ExecNodeMetadata(name=A, version=2, minPlanVersion=1.15, minStateVersion=1.16)
class MyExecNode extends ExecNode {...}

ExecNode version 1 is no longer supported, even though the state is actually compatible. The plan restore will fail with a helpful exception that forces users to perform plan migration.

COMPILE PLAN '/mydir/plan_new.json' FROM '/mydir/plan_old.json';

The plan migration will safely replace the old version 1 with 2. The JSON plan flinkVersion changes to 1.17.
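The eager failure on restore could look roughly like the sketch below. It assumes the set of still-annotated versions is already known; the class, method, and message wording are hypothetical and only illustrate the idea of pointing users to plan migration.

```java
import java.util.Set;
import java.util.TreeSet;

// Hypothetical restore-time check; not the actual planner code.
final class PlanRestoreCheckSketch {
    // Assumption: in Flink 1.17 only version 2 of ExecNode "A" is still annotated.
    static final Set<Integer> SUPPORTED_VERSIONS_OF_A = new TreeSet<>(Set.of(2));

    // Throws if the version referenced by the JSON plan is no longer declared.
    static void checkRestorable(String nodeName, int versionInPlan, Set<Integer> supported) {
        if (!supported.contains(versionInPlan)) {
            throw new IllegalStateException(String.format(
                    "ExecNode '%s' version %d is no longer supported (supported: %s). "
                            + "Migrate the plan with: COMPILE PLAN '<new plan>' FROM '<old plan>';",
                    nodeName, versionInPlan, supported));
        }
    }
}
```

A plan that references version 2 passes the check, whereas a plan that still references version 1 fails before any state is touched.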

Plan Change

1. Introduction of a new ExecNode A in Flink 1.15

@ExecNodeMetadata(name=A, version=1, minPlanVersion=1.15, minStateVersion=1.15)

...

class MyExecNode extends ExecNode {...}


2. The JSON node of ExecNode A gets an additional property in Flink 1.16:

Before:

{
some-prop: 42
}

After:

{
some-prop: 42,
some-flag: false
}

Every plan change will increase the ExecNode version. Since the ExecNode supports both plan layouts, adding an additional annotation is enough.

@ExecNodeMetadata(name=A, version=1, minPlanVersion=1.15, minStateVersion=1.15)
@ExecNodeMetadata(name=A, version=2, minPlanVersion=1.16, minStateVersion=1.15)
class MyExecNode extends ExecNode {...}

New plans will use the new ExecNode version 2.


3. We would like to drop support for the old plan layout. In this example, we assume that the legacy layout is dropped in Flink 1.17.

@ExecNodeMetadata(name=A, version=2, minPlanVersion=1.16, minStateVersion=1.15)
class MyExecNode extends ExecNode {...}

ExecNode version 1 is no longer supported, even though the state is actually compatible. The plan restore will fail with a helpful exception that forces users to perform plan migration.

COMPILE PLAN '/mydir/plan_new.json' FROM '/mydir/plan_old.json';

The plan migration will safely insert the new default value for `some-flag`. The JSON plan flinkVersion changes to 1.17.
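To illustrate the migration step, here is a small sketch that treats a JSON node as a plain map and adds the default for `some-flag`. The map representation and the `PlanNodeMigrationSketch` class are assumptions for illustration; the real migration would operate on the serialized plan.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical migration of a single ExecNode JSON node from version 1 to version 2.
final class PlanNodeMigrationSketch {
    // Returns an upgraded copy; the input node is left untouched.
    static Map<String, Object> migrateToV2(Map<String, Object> v1Node) {
        Map<String, Object> v2Node = new LinkedHashMap<>(v1Node);
        // Insert the default only if the property is missing, so existing values survive.
        v2Node.putIfAbsent("some-flag", false);
        return v2Node;
    }
}
```

Because the default is only inserted when the property is absent, applying the migration to an already migrated node leaves it unchanged.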

Topology Change

Note that this is likely the most common case when performing a change.

1. Introduction of a new ExecNode A in Flink 1.15

@ExecNodeMetadata(name=A, version=1, minPlanVersion=1.15, minStateVersion=1.15)
class MyExecNode extends ExecNode {...}


2. The state layout and topology of A changed fundamentally in Flink 1.16.

We will maintain two ExecNode classes with different versions.

@ExecNodeMetadata(name=A, version=1, minPlanVersion=1.15, minStateVersion=1.15)
class MyExecNodeVersion1 extends ExecNode {...}

@ExecNodeMetadata(name=A, version=2, minPlanVersion=1.15, minStateVersion=1.16)
class MyExecNodeVersion2 extends ExecNode {...}

If the JSON format has not changed between version 1 and 2, we could keep minPlanVersion=1.15 in version 2. This might be useful for users who want to replace a faulty ExecNode implementation and accept losing state. Otherwise, we can set minPlanVersion=1.16.
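When two classes share one ExecNode name, a restore needs to map the pair (name, version) from the JSON plan to the right class. The sketch below shows one possible lookup; `ExecNodeRegistrySketch` and its methods are hypothetical, and the nested classes merely stand in for the two ExecNode implementations.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical registry that resolves (name, version) to an ExecNode class.
final class ExecNodeRegistrySketch {
    // Stand-ins for the two maintained ExecNode classes.
    static class MyExecNodeVersion1 {}
    static class MyExecNodeVersion2 {}

    private final Map<String, Class<?>> classes = new HashMap<>();

    private static String key(String name, int version) {
        return name + "_" + version;
    }

    // Registers a class and rejects duplicate (name, version) pairs.
    void register(String name, int version, Class<?> clazz) {
        Class<?> previous = classes.putIfAbsent(key(name, version), clazz);
        if (previous != null) {
            throw new IllegalArgumentException(
                    "Duplicate ExecNode " + name + " version " + version);
        }
    }

    // Resolves the class for a node found in a JSON plan.
    Class<?> lookup(String name, int version) {
        Class<?> clazz = classes.get(key(name, version));
        if (clazz == null) {
            throw new IllegalArgumentException(
                    "Unknown ExecNode " + name + " version " + version);
        }
        return clazz;
    }
}
```

Registering both classes under name A lets an old plan keep resolving to the version 1 class while new plans resolve to version 2.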

Restore Tests

We will come up with a new testing infrastructure that has

...