...

Term Definition
table program

A list of statements that configure the session, connect to catalogs, register (temporary) catalog objects, and define and submit one or more pipelines.

A table program can be expressed using the Table API in Java or as a multi-statement SQL script for the SQL Client.

pipeline

A pipeline is a DAG that consists of one or more (potentially disconnected) source-to-sink dataflows.

Statement sets allow for n:n source-to-sink dataflows. A pipeline is compiled into a single JobGraph.

See also here.

state

Any value, list, or map member of an operator that is managed by Flink. Each state is snapshotted into a savepoint and identified there by `operator uid + state name`.

During restoration, state is mapped back to any kind of operator (even if the topology changed) using those two components.
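The addressing scheme can be illustrated with a minimal, Flink-independent sketch. The class `SavepointSketch` and its methods are hypothetical names chosen for illustration; real savepoints use Flink's own internal layout. The sketch only shows that a state entry is found again whenever the pair `operator uid + state name` matches, regardless of which operator asks for it.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of a savepoint (hypothetical, not Flink's API). */
final class SavepointSketch {

    // operator uid -> (state name -> snapshotted value)
    private final Map<String, Map<String, Object>> entries = new HashMap<>();

    /** Stores a state value under the pair (operator uid, state name). */
    void snapshot(String operatorUid, String stateName, Object value) {
        entries.computeIfAbsent(operatorUid, uid -> new HashMap<>())
               .put(stateName, value);
    }

    /**
     * Looks a state value up by the same two components.
     * Returns null if the savepoint holds no matching entry.
     */
    Object restore(String operatorUid, String stateName) {
        Map<String, Object> states = entries.get(operatorUid);
        return states == null ? null : states.get(stateName);
    }
}
```

In this model, an operator in a changed topology can still pick up the old state as long as it is given the same uid and declares the same state name; a lookup with a different uid or name finds nothing.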

upgrade

The change from one Flink minor version to another. For example, from 1.15 to 1.17, or 1.17 to 1.18.

A patch version change (e.g. 1.13.2 to 1.13.3) is not considered an upgrade and has already been supported before this FLIP.

Upgrading usually involves work, which is why many users perform this task rarely (e.g. only once per year). Skipping versions is also common until a new feature has been introduced for which it is worth upgrading. We will support the upgrade to the most recent Flink version from a set of previous versions. We aim to support upgrades from the last 2-3 releases on a best-effort basis, and possibly more depending on the maintenance overhead. However, in order not to grow the testing matrix infinitely and to be able to perform important refactorings if necessary, we only guarantee upgrades with a step size of a single minor version (i.e. a cascade of upgrades).

As is common when skipping versions, we still recommend that users check the release notes and perform a migration if instructed to do so.

In other words: A user can upgrade from one minor version to the next, and from there to all following minor versions. The goal is that the same query can remain up and running. E.g. a user upgrades from 1.15 to 1.16, and then from 1.16 to 1.17, and can expect the original query to work without recomputing the data or the plan from the original SQL. This necessarily means that at some point in future releases we will need some basic "migration tool" to keep the queries up and running, which ends up modifying the compiled plan (see also COMPILE PLAN ... FROM ...) or the savepoint.

An upgrade assumes that only the Flink version has changed. All pipeline defining parameters remain constant. In other words: table program, catalog objects, configuration options, and external JAR files have not changed.
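The guaranteed upgrade path, a cascade of single-minor-version steps, can be sketched as follows. This is only an illustration: the class `UpgradeCascade` is a hypothetical helper, and it assumes version strings of the form `major.minor` with an unchanged major version.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that expands an upgrade into single-minor-version steps. */
final class UpgradeCascade {

    /** Returns the steps from {@code from} to {@code to}, one minor version at a time. */
    static List<String> steps(String from, String to) {
        int[] f = parse(from);
        int[] t = parse(to);
        if (f[0] != t[0] || f[1] > t[1]) {
            throw new IllegalArgumentException(
                "Only forward upgrades within one major version are modeled");
        }
        List<String> result = new ArrayList<>();
        for (int minor = f[1]; minor < t[1]; minor++) {
            result.add(f[0] + "." + minor + " -> " + f[0] + "." + (minor + 1));
        }
        return result;
    }

    // Splits "major.minor" into its two numeric components.
    private static int[] parse(String version) {
        String[] parts = version.split("\\.");
        if (parts.length != 2) {
            throw new IllegalArgumentException("Expected major.minor, got: " + version);
        }
        return new int[] {Integer.parseInt(parts[0]), Integer.parseInt(parts[1])};
    }
}
```

For example, `UpgradeCascade.steps("1.15", "1.17")` yields the two steps `1.15 -> 1.16` and `1.16 -> 1.17`, which is the guaranteed path even where a direct jump is only supported on a best-effort basis.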

migration

Actively transforms entities from one format to another. A migration can happen on different layers. After migration, the format can be read by the next generation. Thus, the old generation is not necessary anymore.

There are four kinds of migration:

State serializer migration: In case both operator and state name have not changed, it is possible to upgrade the state format using the migration capabilities of TypeSerializer with TypeSerializerSnapshot.

Operator migration: An operator declares both the new state and the old state with different state names. The initialization method of the operator accesses the old state and actively moves data into the new state format. Afterwards, the old state is empty and thus could be removed from the new savepoint metadata (given that such functionality exists, which is future work). The operator implementation could look completely different, but as long as `operator uid + state name` match, the migration is possible.

Plan migration: We transform the JSON plan (introduced later in this document) into a new plan, for example to remove legacy attributes that are not necessary in a new JSON plan layout. This has no impact on the savepoint itself; it only affects older generations of the JSON parsing and validation logic, which can then drop the legacy attributes. Also, if two operators have been fused into a single one in later versions, the plan can be migrated such that it doesn't reference the two legacy operators anymore.

Savepoint migration: We provide a tool that transforms the savepoint data into a new format. This tool could be a Flink job itself (e.g. using the State Processor API). This provides the highest flexibility as the topology can change completely (at least in theory).

Thorough logging should be in place while an upgrade or migration is running, so that every restoration action can be tracked and any issues arising from it can be debugged.
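The operator migration described above can be sketched without any Flink dependency. Everything here is hypothetical and chosen for illustration: the class name, the two state names, the old format (a list of `key=count` strings), and the new format (a map from key to count). The map passed in stands for the restored state of a single operator, keyed by state name.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of an operator that migrates its own state during initialization. */
final class OperatorMigrationSketch {

    // Hypothetical state names; the operator declares both old and new state.
    static final String OLD_STATE = "counts-v1"; // List<String> of "key=count"
    static final String NEW_STATE = "counts-v2"; // Map<String, Long>

    /**
     * Moves all entries from the old state into the new state format and
     * leaves the old state empty. Returns the new state.
     */
    @SuppressWarnings("unchecked")
    static Map<String, Long> initializeState(Map<String, Object> restored) {
        Map<String, Long> newState = (Map<String, Long>)
            restored.computeIfAbsent(NEW_STATE, name -> new HashMap<String, Long>());

        List<String> oldState = (List<String>) restored.get(OLD_STATE);
        if (oldState != null) {
            for (String entry : oldState) {
                int sep = entry.lastIndexOf('=');
                String key = entry.substring(0, sep);
                long count = Long.parseLong(entry.substring(sep + 1));
                // Sum counts so that duplicate keys in the old list are preserved.
                newState.merge(key, count, Long::sum);
            }
            // The old state is now empty and could later be dropped from the savepoint.
            restored.put(OLD_STATE, new ArrayList<String>());
        }
        return newState;
    }
}
```

Because the old state is emptied after the move, running the initialization again on the migrated state changes nothing, which matches the expectation that the next savepoint only carries the new format.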

backwards compatibility

A table program that was written in a previous version behaves the same in the new version. No action by the user or any other modification of the table program is required.

savepoint compatibility

The state stored in a savepoint can still be used to initialize (still existing) operators. 

schema evolution

A column has been renamed, added to, or removed from a table or view. The same applies to a (nested) data type of a column or function that transitively affects the overall dynamic table layout and pipeline.

query evolution

A fundamental change to the query. E.g. adding a filter condition, a different aggregation, an additional join or subquery.

...