

Status

Current state: "Under Discussion"

Discussion thread: here (<- link to https://mail-archives.apache.org/mod_mbox/flink-dev/)

JIRA: here (<- link to https://issues.apache.org/jira/browse/FLINK-XXXX)

Released: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Nowadays, the Table & SQL API is as important to Flink as the DataStream API. It is one of the main abstractions for expressing pipelines that perform stateful stream processing. Users expect the same backwards compatibility guarantees when upgrading to a newer Flink version as with the DataStream API:

https://ci.apache.org/projects/flink/flink-docs-master/docs/ops/upgrading/

Example:

A user runs a SQL query that contains long-lived (lifetime) aggregates and needs to upgrade the Flink cluster from one minor version (1.15) to the next minor version (1.16).

In particular, this means:

  • once the operator topology is defined, it remains static and does not change between Flink versions (unless a change results in better performance),
  • business logic (defined using expressions and functions in queries) behaves identically to before the version upgrade,
  • the state of a Table & SQL API program can be restored from a savepoint of a previous version,
  • adding or removing stateful operators should be made possible in the DataStream API.

However, since a planner with rule-based and cost-based optimization is involved in finding the final execution plan, every rule, function, connector and format change could potentially introduce a completely different topology.

For example: for more efficient execution, we introduce a new rule that pushes a filter through an operator O. Since the filter columns are not required by O anymore, the schema would change. However, a completely different plan might be chosen in the end due to improvements in the cost estimation in a new Flink version. In short: It is difficult to ensure savepoint compatibility in such a dynamic topology creation process.

Terminology:

For clarification, we distinguish between the following terms:

table program

List of statements that configure the session, connect to catalogs, register (temporary) catalog objects, define and submit one or more pipelines.

A table program can be expressed using the Table API in Java or as a multi-statement SQL script for the SQL Client.

pipeline

A pipeline is a DAG that consists of one or more (potentially disconnected) source-to-sink dataflows.

Statement sets allow for n:n source-to-sink dataflows. A pipeline is compiled into a single JobGraph.

See also here.

state

Any value, list, or map member of an operator that is managed by Flink. Each state is snapshotted using the `operator uid + state name` into a savepoint.

During restoration, state is mapped back to any kind of operator (even if the topology changed) using those two components.
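
For illustration, a minimal DataStream API sketch is shown below (the uid "dedup-operator" and the state name "seen" are arbitrary example identifiers, not part of this FLIP). The value state is snapshotted under the combination `"dedup-operator" + "seen"` and can be mapped back during restoration as long as both identifiers remain stable:

final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

env.fromElements("a", "b", "a")
  .keyBy(value -> value)
  .process(new KeyedProcessFunction<String, String, String>() {

    private transient ValueState<Boolean> seen;

    @Override
    public void open(Configuration parameters) {
      // "seen" is the state name, the second half of the identifier
      seen = getRuntimeContext().getState(
        new ValueStateDescriptor<>("seen", Types.BOOLEAN));
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
      // emit each key only once
      if (seen.value() == null) {
        seen.update(true);
        out.collect(value);
      }
    }
  })
  // "dedup-operator" is the operator uid, the first half of the identifier
  .uid("dedup-operator")
  .print();

env.execute();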

upgrade

The change from one Flink minor version to another. For example, from 1.13 to 1.14, or 1.MAX to 2.0.

A patch version change (e.g. 1.13.2 to 1.13.3) is not considered an upgrade and has already been supported before this FLIP.

For simplification of the design, we assume that upgrades use a step size of a single minor version. We don't guarantee skipping minor versions (e.g. 1.11 to 1.14). Nevertheless, we might support 2-3 releases on a best-effort basis.

An upgrade assumes that only the Flink version has changed. All pipeline defining parameters remain constant. In other words: table program, catalog objects, configuration options, and external JAR files have not changed.

migration

Actively transforms entities from one format to the other. A migration can happen on different layers. After migration, the format can be read by the next generation. Thus, the old generation is not necessary anymore.

There are four kinds of migration:

State serializer migration: In case both operator and state name have not changed, it is possible to upgrade the state format using the migration capabilities of TypeSerializer with TypeSerializerSnapshot.

Operator migration: An operator declares both the new state and the old state with different state names. The initialization method of the operator accesses the old state and actively moves data into the new state format. Afterwards, the old state is empty and could thus be removed from the new savepoint metadata (given such a functionality exists, which is future work). The operator implementation could look completely different, but as long as `operator uid + state name` match, the migration is possible (see the sketch below).

Plan migration: We transform the JSON plan (introduced later in this document) into a new plan. For example, to remove legacy attributes that are not necessary in a new JSON plan layout. This has no impact on the savepoint itself but on older generations of the JSON parsing and validation logic that can drop legacy attributes. Also, if two operators have been fused into a single one in later versions, the plan can be migrated such that it doesn't reference the two legacy operators anymore.

Savepoint migration: We provide a tool that transforms the savepoint data into a new format. This tool could be a Flink job itself (e.g. using the State Processor API). This provides the highest flexibility as the topology can change completely (at least in theory).
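
To make operator migration more concrete, a minimal sketch based on operator state could look as follows. The class and state names (MigratingBufferSink, "buffer-v1", "buffer-v2") are hypothetical and only chosen for illustration; the essential part is that the old and the new state are declared side by side and data is moved over in the initialization method:

import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

/**
 * Hypothetical example: the buffered elements used to be stored as strings
 * (state name "buffer-v1") and are now stored as longs (state name "buffer-v2").
 */
public class MigratingBufferSink implements SinkFunction<Long>, CheckpointedFunction {

  private transient ListState<String> oldBuffer; // legacy format
  private transient ListState<Long> newBuffer;   // new format

  @Override
  public void initializeState(FunctionInitializationContext context) throws Exception {
    oldBuffer = context.getOperatorStateStore()
      .getListState(new ListStateDescriptor<>("buffer-v1", Types.STRING));
    newBuffer = context.getOperatorStateStore()
      .getListState(new ListStateDescriptor<>("buffer-v2", Types.LONG));

    // actively move data from the old state into the new state format
    for (String legacy : oldBuffer.get()) {
      newBuffer.add(Long.parseLong(legacy));
    }
    // the old state stays empty from now on; dropping it from the savepoint
    // metadata entirely is future work, as described above
    oldBuffer.clear();
  }

  @Override
  public void snapshotState(FunctionSnapshotContext context) throws Exception {
    // only the new state is written from now on
  }

  @Override
  public void invoke(Long value, Context context) throws Exception {
    newBuffer.add(value);
  }
}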

backwards compatibility

A table program that has been written in a previous version behaves the same in the new version. No action of the users or any other modification of the table program is required.

savepoint compatibility

The state stored in a savepoint can still be used to initialize (still existing) operators. 

schema evolution

A column has been renamed in, added to, or removed from a table or view. The same applies to a (nested) data type of a column or function that transitively affects the overall dynamic table layout and pipeline.

query evolution

A fundamental change to the query. E.g. adding a filter condition, a different aggregation, an additional join or subquery.

Scoping

The scope of this FLIP can be summarized as follows using the terminology above:

  • Make upgrades possible that enable savepoint restoration and execution of pipelines that were defined as part of a table program from a previous version.
  • A new Flink version should be backwards compatible with the previous version. If the community decides that the semantics of the previous version were incorrect and a change only affects a small group of users, we reserve the right to break backwards compatibility for the greater good. This policy is in line with how we have handled SQL semantics and API stability in the past.
  • Schema and query evolution are out of scope.
  • Migration of any kind is future work. We will focus on state serializer and operator migrations and potentially plan migrations in the first versions after 1.15 if necessary.

Important: We will likely mark the first version of this FLIP (targeted for 1.15) as a BETA feature. The community needs to learn how to evolve the code base with these hard constraints. 1.16 will be the first version with a restore scenario. If this turns out to be successful with no or only minor issues, we can mark this feature stable in 1.17.

Full Example SQL

A pure SQL example in SQL Client could look like this:

-- use temporary objects
CREATE TEMPORARY TABLE clicks (
  user_id BIGINT,
  page_id BIGINT,
  viewtime TIMESTAMP
) WITH (
  'connector' = 'kafka',
  'topic' = 'clicks',
  'properties.bootstrap.servers' = '...',
  'format' = 'avro'
);

-- set configuration
SET 'parallelism.default' = '10';
SET 'pipeline.name' = 'my_flink_job';

-- compile the pipeline into a plan that can be executed
COMPILE AND EXECUTE '/my/path/my_flink_job.json' FOR STATEMENT SET
BEGIN

  INSERT INTO pageview_pv_sink
  SELECT page_id, count(1) FROM clicks GROUP BY page_id;

  INSERT INTO pageview_uv_sink
  SELECT page_id, count(distinct user_id) FROM clicks GROUP BY page_id;

END;

Full Example Table API

A more complex Table API program that uses the DataStream API as a virtual source and sink could look like this:

// Setup environments
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// Create a custom DataStream
final DataStreamSource<Row> inputStream = env.fromElements(
  Row.ofKind(RowKind.UPDATE_AFTER, "Alice"),
  Row.ofKind(RowKind.UPDATE_AFTER, "Bob"));

// Assign a unique uid() to the stream
inputStream.uid("my-stream");

// Read the JSON plan file or compile + write it first
// We use a Supplier here, but of course this can also be completely custom logic
CompiledPlan plan = CompiledPlan.fromJsonFile(
  "/my/path/my_flink_job.json",
  () ->
    tableEnv
      .fromDataStream(inputStream)
      .select($("f0").count())
      // the caller will write out this plan into a file
      .compilePlan());

DataStream<Row> outputStream = tableEnv.asDataStream(plan, Row.class);

outputStream.executeAndCollect();

Basic Design / Current Internal Design

The basic design has already been introduced in Flink 1.14. It is based on a JSON plan that serializes the optimized execution plan and from which a pipeline can be restored. The JSON plan must be stored explicitly next to the table program.

Internal prototypes and products have shown that the JSON plan is already useful and only needs minor improvements before making it publicly available and a long-term feature for Flink users.

A JSON plan example for the following program can be found in the Appendix.

TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inStreamingMode());
System.out.println(
        tableEnv.explainSql(
                "SELECT word, SUM(frequency) AS `count`\n"
                        + "FROM (\n"
                        + "  VALUES ('Hello', 1), ('Ciao', 1), ('Hello', 2)\n"
                        + ")\n"
                        + "AS WordTable(word, frequency)\n"
                        + "GROUP BY word"));

== Abstract Syntax Tree ==
LogicalAggregate(group=[{0}], count=[SUM($1)])
+- LogicalValues(tuples=[[{ _UTF-16LE'Hello', 1 }, { _UTF-16LE'Ciao', 1 }, { _UTF-16LE'Hello', 2 }]])

== Optimized Physical Plan ==
GroupAggregate(groupBy=[word], select=[word, SUM(frequency) AS count])
+- Exchange(distribution=[hash[word]])
   +- Values(type=[RecordType(VARCHAR(5) word, INTEGER frequency)], tuples=[[{ _UTF-16LE'Hello', 1 }, { _UTF-16LE'Ciao', 1 }, { _UTF-16LE'Hello', 2 }]])

== Optimized Execution Plan ==
GroupAggregate(groupBy=[word], select=[word, SUM(frequency) AS count])
+- Exchange(distribution=[hash[word]])
   +- Values(tuples=[[{ _UTF-16LE'Hello', 1 }, { _UTF-16LE'Ciao', 1 }, { _UTF-16LE'Hello', 2 }]])


The optimized execution plan is a graph of ExecNodes. An ExecNode is a static template for creating Transformations.

An ExecNode might translate into multiple Transformations with various operators.

For example, the ExecNode for changelog sinks (StreamExecSink) might consist of a NotNullEnforcer, KeyBy, and SinkUpsertMaterializer transformation.

Sources and sinks will be restored using factories.
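
The ExecNode interface itself is internal to the planner. Conceptually, however, the relationship between an ExecNode and the transformations it produces can be sketched as follows (a simplified, hypothetical interface for illustration only; the real interface has more methods and different signatures):

import org.apache.flink.api.dag.Transformation;

import java.util.List;

/**
 * Conceptual sketch only: an ExecNode is a static template that knows its inputs
 * and how to translate itself into one or more Transformations of the underlying DAG.
 */
interface SketchedExecNode<T> {

  /** Upstream nodes of this node in the optimized execution plan. */
  List<SketchedExecNode<?>> getInputNodes();

  /**
   * Creates the Transformation(s) for this node. A single node may internally chain
   * several transformations, e.g. a sink node may add a NotNullEnforcer, a keyBy
   * exchange, and a SinkUpsertMaterializer in front of the actual sink operator.
   */
  Transformation<T> translateToTransformation();
}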


See also:

https://issues.apache.org/jira/browse/FLINK-20435 (Introduction of the ExecNode layer)

https://issues.apache.org/jira/browse/FLINK-21091 (Initial version of a JSON plan for all nodes)

ExecNodes

Currently, we list the following ExecNodes:

Supported:

StreamExecCalc
StreamExecChangelogNormalize
StreamExecCorrelate
StreamExecDeduplicate
StreamExecDropUpdateBefore
StreamExecExchange
StreamExecExpand
StreamExecGlobalGroupAggregate
StreamExecGlobalWindowAggregate
StreamExecGroupAggregate
StreamExecGroupWindowAggregate
StreamExecIncrementalGroupAggregate
StreamExecIntervalJoin
StreamExecJoin
StreamExecLimit
StreamExecLocalGroupAggregate
StreamExecLocalWindowAggregate
StreamExecLookupJoin
StreamExecMatch
StreamExecMiniBatchAssigner
StreamExecOverAggregate
StreamExecPythonCalc
StreamExecPythonCorrelate
StreamExecPythonGroupAggregate
StreamExecPythonGroupWindowAggregate
StreamExecPythonOverAggregate
StreamExecRank
StreamExecSink
StreamExecSortLimit
StreamExecTableSourceScan
StreamExecTemporalJoin
StreamExecTemporalSort
StreamExecUnion
StreamExecValues
StreamExecWatermarkAssigner
StreamExecWindowAggregate
StreamExecWindowJoin
StreamExecWindowRank
StreamExecWindowTableFunction

Not yet supported:

StreamExecGroupTableAggregate
StreamExecDataStreamScan
StreamExecPythonGroupTableAggregate
StreamExecSort
StreamExecMultipleInput

Limitations:

Some additional limitations of the current design:

  1. Batch ExecNodes cannot be serialized into a JSON plan.
  2. Only SQL INSERT INTO queries and STATEMENT SET are supported.
  3. The Table API is not supported, including `TableResult#collect()`.
  4. Bridging from and to the DataStream API is not supported.
  5. Catalog tables are serialized into the plan. The catalog is not consulted again for tables after an upgrade.
  6. User-defined functions are serialized into the plan using Java serialization.
  7. Built-in functions are not versioned and are looked up in the current built-in catalog again after an upgrade.
  8. The JSON plan is quite verbose and can grow quickly even for small queries.

Proposed Changes

We propose the following changes to the current design in order to reduce friction and provide a consistent API experience:

  1. Expose the JSON plan concept to users.
  2. Support generating and providing a JSON plan in every part of the API where the optimizer is triggered:
    sqlQuery().execute().collect(), Table.executeInsert(), toChangelogStream(), etc.
  3. Consider multi-statement and mixed DataStream API/Table API pipelines.
  4. Expose the JSON plan with SQL syntax.
  5. Make the JSON plan independent of any class/package name:
    no Java serialization anywhere, no "o.a.f.t.planner.plan.nodes.exec.stream.*" or other implementation details, no MapView/ListView internals.
  6. Come up with a simple rule of thumb for users about when a JSON plan cannot be generated.
  7. Let users configure strategies for how to deal with catalog objects:
    serialize them into the plan, serialize them partially into the plan, or ignore the plan info during restore.
  8. Version functions as well.
  9. Have well-defined uid()s for all operators so that a Table API program can be used with the DataStream API.

Batch nodes are not considered in this design.

General JSON Plan Assumptions



Public Interfaces

Briefly list any new interfaces that will be introduced as part of this proposal or any existing interfaces that will be removed or changed. The purpose of this section is to concisely call out the public contract that will come along with this feature.



Compatibility, Deprecation, and Migration Plan

  • What impact (if any) will there be on existing users?
  • If we are changing behavior how will we phase out the older behavior?
  • If we need special migration tools, describe them here.
  • When will we remove the existing behavior?

Test Plan

Describe in a few sentences how the FLIP will be tested. We are mostly interested in system tests (since unit tests are specific to implementation details). How will we know that the implementation works as expected? How will we know nothing broke?

Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.
