...

For clarification, we distinguish between the following terms:

Term | Definition
table program

List of statements that configure the session, connect to catalogs, register (temporary) catalog objects, define and submit one or more pipelines.

A table program can be expressed using the Table API in Java or as a multi-statement SQL script for SQL Client.

pipeline

A pipeline is a DAG that consists of one or more (potentially disconnected) source-to-sink dataflows.

Statement sets allow for n:n source-to-sink dataflows. A pipeline is compiled into a single JobGraph.

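For illustration, a minimal Table API sketch of a pipeline with two source-to-sink dataflows that share one source; the table names and connectors (`datagen`, `blackhole`) are arbitrary example choices. The statement set compiles both INSERT statements into a single JobGraph:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.StatementSet;
import org.apache.flink.table.api.TableEnvironment;

public final class StatementSetPipelineExample {
  public static void main(String[] args) {
    TableEnvironment tableEnv =
        TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

    // arbitrary example tables
    tableEnv.executeSql(
        "CREATE TEMPORARY TABLE Orders (amount INT) WITH ('connector' = 'datagen')");
    tableEnv.executeSql(
        "CREATE TEMPORARY TABLE SinkA (amount INT) WITH ('connector' = 'blackhole')");
    tableEnv.executeSql(
        "CREATE TEMPORARY TABLE SinkB (total INT) WITH ('connector' = 'blackhole')");

    // two source-to-sink dataflows that together form one pipeline
    // and are compiled into a single JobGraph
    StatementSet pipeline = tableEnv.createStatementSet();
    pipeline.addInsertSql("INSERT INTO SinkA SELECT amount FROM Orders");
    pipeline.addInsertSql("INSERT INTO SinkB SELECT SUM(amount) FROM Orders");
    pipeline.execute();
  }
}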

state

Any value, list, or map member of an operator that is managed by Flink. Each state is snapshotted using the `operator uid + state name` into a savepoint.

During restoration, state is mapped back to any kind of operator (even if the topology changed) using those two components.
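
As an illustration of this addressing scheme (using the DataStream API; the uid and state name are arbitrary), the keyed state below is identified in a savepoint by the operator uid `counting-operator` plus the state name `count`:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public final class StateAddressingExample {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    env.fromElements(1L, 2L, 3L)
        .keyBy(value -> value % 2)
        .process(new CountingFunction())
        // the explicit uid, together with the state name "count" below,
        // addresses the state of this operator in a savepoint
        .uid("counting-operator")
        .print();

    env.execute();
  }

  public static final class CountingFunction extends KeyedProcessFunction<Long, Long, Long> {
    private transient ValueState<Long> count;

    @Override
    public void open(Configuration parameters) {
      count = getRuntimeContext().getState(new ValueStateDescriptor<>("count", Long.class));
    }

    @Override
    public void processElement(Long value, Context ctx, Collector<Long> out) throws Exception {
      Long current = count.value();
      count.update(current == null ? 1L : current + 1L);
      out.collect(count.value());
    }
  }
}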

upgrade

The change from one Flink minor version to another. For example, from 1.13 to 1.14, or 1.MAX to 2.0.

A patch version change (e.g. 1.13.2 to 1.13.3) is not considered an upgrade and has already been supported before this FLIP.

For simplification of the design, we assume that upgrades use a step size of a single minor version. We don't guarantee skipping minor versions (e.g. 1.11 to 1.14). Nevertheless, we might support 2-3 releases on a best-effort basis.

An upgrade assumes that only the Flink version has changed. All pipeline defining parameters remain constant. In other words: table program, catalog objects, configuration options, and external JAR files have not changed.

migration

Actively transforms entities from one format to another. A migration can happen on different layers. After migration, the format can be read by the next generation. Thus, the old generation is not necessary anymore.

There are four kinds of migration:

State serializer migration: If neither the operator nor the state name has changed, it is possible to upgrade the state format using the migration capabilities of TypeSerializer with TypeSerializerSnapshot.

Operator migration: An operator declares both the new state and the old state with different state names. The initialization method of the operator accesses the old state and actively moves data into the new state format. Afterwards, the old state is empty and thus could be removed from the new savepoint metadata (given such a functionality exists, which is future work). The operator implementation could look completely different, but as long as `operator uid + state name` match, the migration is possible (see the sketch below).

Plan migration: We transform the JSON plan (introduced later in this document) into a new plan. For example, to remove legacy attributes that are not necessary in a new JSON plan layout. This has no impact on the savepoint itself but on older generations of the JSON parsing and validation logic that can drop legacy attributes. Also, if two operators have been fused into a single one in later versions, the plan can be migrated such that it doesn't reference the two legacy operators anymore.

Savepoint migration: We provide a tool that transforms the savepoint data into a new format. This tool could be a Flink job itself (e.g. using the State Processor API). This provides the highest flexibility as the topology can change completely (at least in theory).
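
A minimal sketch of operator migration, assuming a DataStream operator with operator (list) state; the class and state names are made up for the example. The operator uid stays the same, while the old state name `buffer-v1` is drained into the new state name `buffer-v2` during initialization:

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

// hypothetical sink that migrates from an old state layout ("buffer-v1", Long)
// to a new one ("buffer-v2", String) while keeping the same operator uid
public class MigratingSink implements SinkFunction<String>, CheckpointedFunction {

  private transient ListState<String> newState;

  @Override
  public void initializeState(FunctionInitializationContext ctx) throws Exception {
    // old and new state are declared under different state names
    ListState<Long> oldState = ctx.getOperatorStateStore()
        .getListState(new ListStateDescriptor<>("buffer-v1", Long.class));
    newState = ctx.getOperatorStateStore()
        .getListState(new ListStateDescriptor<>("buffer-v2", String.class));

    // actively move data into the new state format
    for (Long value : oldState.get()) {
      newState.add(String.valueOf(value));
    }
    // the old state is left empty; removing it from the savepoint metadata is future work
    oldState.clear();
  }

  @Override
  public void snapshotState(FunctionSnapshotContext ctx) {
    // only the new state is written into future savepoints
  }

  @Override
  public void invoke(String value, Context context) throws Exception {
    newState.add(value);
  }
}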

backwards compatibility

A table program that has been written in a previous version behaves the same in the new version. No action of the users or any other modification of the table program is required.

savepoint compatibility

The state stored in a savepoint can still be used to initialize (still existing) operators. 

schema evolution

A column has been renamed in, added to, or removed from a table or view. The same applies to a change of a (nested) data type of a column or function that transitively affects the overall dynamic table layout and pipeline.

query evolution

A fundamental change to the query, e.g. adding a filter condition, a different aggregation, or an additional join or subquery.

Scoping

The scope of this FLIP can be summarized as follows using the terminology above:

...

  1. Modules are not part of the JSON plan. They must have been loaded as before.
  2. Most of the table/session configuration is not part of the JSON plan. It must be the same as before. This is important for table factories and UDFs where we don't know what they have consumed.
  3. Configuration consumed by an ExecNode should be persisted if it influences the topology or has runtime implications, i.e. the value is not just forwarded but used for a calculation (e.g. `table.exec.sink.not-null-enforcer`). In other words: it is not necessary to persist code generation options, but we should persist `table.exec.` options.
  4. Temporary DDL statements (to initialize the table environment and make sure that temporary UDFs and connectors can be found) are not part of the JSON plan.
  5. We do not version types. We assume that the type system is stable or will only be extended by more types but not modified.
  6. The origin of every catalog object is stored with the object identifier in the JSON plan. However, we cannot assume that all catalog metadata stays constant, as a catalog might be shared across teams. We can only assume that the catalog instance and type (e.g. Hive catalog) stay constant for a session to provide a table factory.
  7. The JSON plan contains no implementation details (i.e. no classes) and is fully declarative.
  8. The JSON plan is internal to Flink. We don't support external changes to the plan officially. Flink will not perform any kind of validation or consistency checks during restore.
  9. We do not version connectors and formats. If a connector adds a new ability, we can simply not apply this ability for older plans. Removing an ability is a problem and would require a correcting operator afterwards.
  10. The JSON plan is versioned by Flink version. However, the JSON plan format must not change between patch releases.

Remark to 3: We should use hints to change the semantic behavior of sources and sinks instead of global config options, so that semantic meaning lives in the query (and thus in the JSON plan) rather than in the global configuration. See also FLINK-24254.

Remark to 3: We add a new JSON `Map<String, String>` property to ExecNode to keep track of the node configuration.

Remark to 8: We will not change the JSON plan format across patch releases. Since every JSON plan is versioned by the compiling Flink version (see assumption 10), external tooling can be built around it, but without any guarantees from the Flink community across Flink minor/major versions yet.

Remark to 9: A plan migration could add such an operator. Alternatively, the connector properties could enable/disable this ability for backward compatibility. We can use catalog table enrichment or plan migration to add a new connector property.

...

Note: We separate compile and restore options for flexibility. The default values should aim to make most beginner users happy.

Option | Type | Description

table.plan.compile.catalog-objects

Enum

ALL (Default): All catalog metadata about a table is persisted: Schema + Identifier + Options
The catalog's "metadata" does not need to be available anymore. This makes the initial experience better with in-memory catalogs (e.g. building demos using SQL Client). However, the catalog "instance" itself must still be available in case a catalog provides a custom table factory. For commercial versions, it might make sense to set this value to `IDENTIFIER`, because in most cases, enterprises have a persistent catalog and lock changes there.

SCHEMA: Only basic catalog metadata for validation is persisted: Schema + Identifier. This allows validation in case the external catalog table has changed. Options will later require a read from the catalog again.

IDENTIFIER: Only the object identifier is persisted. This reduces the size of the plan; a job may contain dozens of connectors and a table may contain hundreds of fields. The user needs to make sure that catalog tables don't change. Changes of the schema could lead to hard-to-debug mismatch exceptions during restore. Also, options that influence the supported connector abilities could lead to exceptions.

table.plan.restore.catalog-objects

Enum

ALL (Default): All catalog info is read from the JSON plan. If information is missing, a lookup will take place. Also enrichment of forwarded options still takes place (see below). If the metadata is not available anymore, the plan can still be executed.

ALL_ENFORCED: All catalog info must be present in the JSON plan. No lookup.

IDENTIFIER: Everything is read from the catalog again. Changes of the schema or options could lead to hard-to-debug mismatch exceptions.

table.plan.restore.enrich-table-options

Boolean

By default (true), we always perform a catalog lookup and enrich the plan options with catalog options. In this case, catalog options have precedence. If false, we only use the JSON plan options. 


For the latter option, the factory is the only entity able to judge whether a change causes a modification of semantics or topology. For example, the factory knows how a `topic` option for Kafka is forwarded and can allow a name change that doesn't have any side effects for the source abilities or schema.
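
A hedged sketch of how these options might be set from a table program, assuming they are exposed through the regular table configuration like other `table.` options (the concrete mechanism is an implementation detail):

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public final class PlanOptionsExample {
  public static void main(String[] args) {
    TableEnvironment tableEnv =
        TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

    // compile side: persist all catalog metadata (schema + identifier + options) into the plan
    tableEnv.getConfig().getConfiguration()
        .setString("table.plan.compile.catalog-objects", "ALL");

    // restore side: require all catalog info to be present in the plan, no catalog lookup
    tableEnv.getConfig().getConfiguration()
        .setString("table.plan.restore.catalog-objects", "ALL_ENFORCED");

    // restore side: do not enrich the persisted options with current catalog options
    tableEnv.getConfig().getConfiguration()
        .setBoolean("table.plan.restore.enrich-table-options", false);
  }
}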

...

Persisted catalog functions are stored by object identifier and class. Similar options as for catalog tables apply:

Option | Type | Description

table.plan.compile.catalog-objects

Enum

ALL (Default): All catalog metadata about a function is persisted: Identifier + Class.
The catalog's "metadata" does not need to be available anymore.

SCHEMA: Same as IDENTIFIER because we store the schema anyway.

IDENTIFIER: Only the object identifier is persisted. The user needs to make sure that catalog function signatures don't change. The class can change.

table.plan.restore.catalog-objects

Enum

ALL (Default): All catalog info is read from the JSON plan. If information is missing, a lookup will take place.

ALL_ENFORCED: All catalog info must be present in the JSON plan. No lookup.

IDENTIFIER: Everything is read from the catalog again.


We only version built-in system functions, not user-defined/catalog ones.
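
For illustration, a minimal sketch that contrasts a persisted catalog function with a temporary system function; `MyScalarFunction` is a made-up UDF:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.functions.ScalarFunction;

public final class FunctionRegistrationExample {

  // hypothetical UDF used only for this example
  public static final class MyScalarFunction extends ScalarFunction {
    public String eval(String input) {
      return input.toUpperCase();
    }
  }

  public static void main(String[] args) {
    TableEnvironment tableEnv =
        TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());

    // catalog function: stored by object identifier and class,
    // subject to the compile/restore options above
    tableEnv.createFunction("my_catalog_func", MyScalarFunction.class);

    // temporary system function: part of the session setup,
    // must be registered again before restoring a plan
    tableEnv.createTemporarySystemFunction("my_temp_func", MyScalarFunction.class);
  }
}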

...

@ExecNodeMetadata(
  // main information
  name = "stream-exec-sink",
  version = 1,
  // maintenance information for us/the community and our tests
  added = FlinkVersion.1_15,
  consumedOptions = {"table.exec.sink.not-null-enforcer", "table.exec.sink.upsert-materialize"},
  producedOperators = {"not-null-enforcer", "upsert-materializer"},
  supportedPlanChanges = {FlinkVersion.1_15, FlinkVersion.1_16},
  supportedSavepointChanges = {FlinkVersion.1_15, FlinkVersion.1_16})


Property | Description
name

For JSON serialization/deserialization and uid() generation.

version

For JSON serialization/deserialization and uid() generation.

added

Completeness tests can verify that restore tests exist for a savepoint of this Flink version.

consumedOptions

Hard-coded list of option keys, fixed at the Flink version in which the ExecNode was added. Does not reference instances in the ExecutionConfigOptions class in case those get refactored.

Completeness tests can verify that every option is set once in restore and change detection tests.

Completeness tests can verify that the ExecutionConfigOptions class still contains an option (via key or fallback key) for the given key.

Restore can verify whether the restored ExecNode config map contains only options of the given keys.

producedOperators

Set of operator names that can be part of the resulting Transformations.

Restore and completeness tests can verify there exists at least one test that adds each operator and that the created Transformations contain only operators with `uid`s containing the given operator names.

The concrete combinations or existence of these operators in the final pipeline depends on various parameters (both configuration and ExecNode-specific arguments such as interval size etc.).

supportedPlanChanges
(optional for the first version)

Used for plan migration.

Updated when the JSON for the ExecNode changes, e.g. after adding an attribute to the JSON spec of the ExecNode.

Plan migration tests can use this information.

Completeness tests can verify that restore tests exist for all JSON plan variations.

supportedSavepointChanges

Used for operator migration.

Restore tests can verify that operator migration works for all listed Flink savepoint versions.

Restore Tests

We will come up with a new testing infrastructure that has

...