...

General JSON Plan Assumptions


The following assumptions should help in documenting the properties of a JSON plan:

  1. Modules are not part of the JSON plan. They must have been loaded as before.
  2. Most of the table/session configuration is not part of the JSON plan. It must be the same as before. This is important for table factories and UDFs where we don't know what they have consumed.
  3. Configuration consumed by an ExecNode should be persisted if it influences the topology or has runtime implications (i.e. the value is not just forwarded but used for calculation), e.g. `table.exec.sink.not-null-enforcer`. In other words: it is not necessary to persist code generation options, but we should persist `table.exec.` options (see the sketch below).
  4. Temporary DDL statements (to initialize the table environment and ensure that temporary UDFs and connectors can be found) are not part of the JSON plan.
  5. We do not version types. We assume that the type system is stable or will only be extended by more types but not modified.
  6. The origin of every catalog object is stored with the object identifier in the JSON plan. However, we cannot assume that all catalog metadata stays constant, as a catalog might be shared across teams. We can only assume that the catalog instance and type (e.g. Hive catalog) stay constant for a session to provide a table factory.
  7. The JSON plan contains no implementation details (i.e. no classes) and is fully declarative.
  8. The JSON plan is internal to Flink. We don't support external changes to the plan officially. Flink will not perform any kind of validation or consistency checks during restore.
  9. We do not version connectors and formats. If a connector adds a new ability, we can simply not apply this ability for older plans. Removing an ability is a problem and would require a correcting operator afterwards.

Remark to 3: We should use hints to change the semantic behavior of sources and sinks instead of global config options. Semantic meaning should live in the query (and thus in the JSON plan), not in global config options. See also FLINK-24254.

Remark to 3: We add a new JSON `Map<String, String>` property to ExecNode to keep track of the node configuration.
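
To make assumption 3 and the remark above more concrete, here is a minimal, purely illustrative Java sketch. The option key is the one named in assumption 3; that its value ends up in the consuming ExecNode's configuration map is the proposed behavior, not something that exists today.

// Sketch only: a `table.exec.` option that changes runtime behavior (how a sink
// treats NULLs in NOT NULL columns). Per assumption 3, such an option would be
// persisted in the configuration map of the ExecNode that consumes it, whereas
// pure code-generation options would not be persisted.
TableEnvironment tableEnv = TableEnvironment.create(
        EnvironmentSettings.newInstance().inStreamingMode().build());
tableEnv.getConfig().getConfiguration()
        .setString("table.exec.sink.not-null-enforcer", "DROP");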

Remark to 9: A plan migration could add such an operator. Alternatively, the connector properties could enable/disable this ability for backward compatibility. We can use catalog table enrichment or plan migration to add a new connector property.

Catalog Objects

Temporary objects will not be persisted in the plan. Many of them have no serializable representation, e.g. a temporary table that is backed by a DataStream API pipeline or a table source instance.

For users, the rule of thumb "temporary objects are not part of the plan" is easier to understand than "some can be persisted, some cannot".

For inline objects (used without explicit registration) within the Table API, we do a best effort as shown below.

Views

Views are always inlined. View information will be lost and not part of the plan in any way.

Tables

Temporary tables will not be persisted in the plan. They will be represented by an object identifier.

Inline tables (tables with a generated temporary object identifier) are handled differently:

  • an inline table descriptor will be written into the JSON plan (see the sketch after this list)
  • tables backed by the DataStream API cannot be persisted in a JSON plan; they need to be registered first
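
A minimal sketch of the first bullet, using the existing Table API descriptor classes; the "datagen" connector and the schema are arbitrary examples, and `tableEnv` is assumed to be an existing TableEnvironment:

// An inline table created from a descriptor, without registering it under an
// explicit name. Following the rule above, this descriptor would be written
// into the JSON plan.
Table inlineTable = tableEnv.from(
        TableDescriptor.forConnector("datagen")
                .schema(Schema.newBuilder()
                        .column("page_id", DataTypes.BIGINT())
                        .build())
                .build());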


For persisted catalog tables, depending on the use case, some users would like to

  • persist catalog objects entirely in the JSON plan such that catalog metadata is not required anymore,
  • be able to forward properties (e.g. connection credentials) from the catalog to the compiled plan,
  • not have connector specific options in the plan (e.g. have credentials only stored in the catalog, not in JSON files).

Options

Note: We separate compile and restore options for flexibility. The default values should aim to make most beginner users happy.

Option | Type | Description

table.plan.compile.catalog-objects

Enum

ALL (Default): All catalog metadata about a table is persisted: Schema + Identifier + Options.
The catalog's "metadata" does not need to be available anymore. This makes the initial experience better with in-memory catalogs (e.g. building demos using the SQL Client). However, the catalog "instance" itself must still be available in case a catalog provides a custom table factory. For commercial versions, it might make sense to set this value to `IDENTIFIER`, because in most cases enterprises have a persistent catalog and lock changes there.

SCHEMA: Only basic catalog metadata for validation is persisted: Schema + Identifier. This allows validation in case the external catalog table has changed. Options will require a lookup in the catalog again at restore time.

IDENTIFIER: Only the object identifier is persisted. This reduces the size of the plan; a job may contain dozens of connectors and a table may contain hundreds of fields. The user needs to make sure that catalog tables don't change. Changes of the schema could lead to hard-to-debug mismatch exceptions during restore. Options that influence the supported connector abilities could also lead to exceptions.

table.plan.restore.catalog-objects

Enum

ALL (Default): All catalog info is read from the JSON plan. If information is missing, a lookup will take place. Enrichment of forwarded options also still takes place (see below). If the metadata is not available anymore, the plan can still be executed.

ALL_ENFORCED: All catalog info must be present in the JSON plan. No lookup takes place.

IDENTIFIER: Everything is read from the catalog again. Changes of the schema or options could lead to hard-to-debug mismatch exceptions.

table.plan.restore.enrich-table-options

Boolean

By default (true), we always perform a catalog lookup and enrich the plan options with catalog options. In this case, catalog options have precedence. If false, we only use the JSON plan options. 


For the latter option, the factory is the only entity able to judge whether a change causes a modification of semantics or topology. For example, the factory knows how a `topic` option for Kafka is forwarded and can allow a name change that doesn't have any side effects for the source abilities or schema.

interface DynamicTableFactory {

    /** Declares options that can be modified without runtime implications. */
    Set<ConfigOption<?>> mutableOptions();
}
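
A hedged sketch of how a connector factory could implement the proposed method; the class and option below are illustrative only and not the actual Kafka connector code:

// Hypothetical factory: the 'topic' option only changes where data is read from
// or written to; it does not affect schema, abilities, or topology, so it is
// declared as mutable between plan compilation and restore.
public class KafkaLikeTableFactory /* implements DynamicTableFactory */ {

    private static final ConfigOption<String> TOPIC =
            ConfigOptions.key("topic").stringType().noDefaultValue();

    public Set<ConfigOption<?>> mutableOptions() {
        return Collections.singleton(TOPIC);
    }

    // factoryIdentifier(), requiredOptions(), optionalOptions(), etc. of the
    // real factory interface are omitted here.
}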

Functions

For UDFs, we will only support the new function stack in the JSON plan.

System functions are temporary and will only be persisted by name (and version) in the plan. Therefore, we can also support function instances there (not only classes). Also, temporary catalog functions will only be persisted by name in the plan and can support instances. No Java serialization required.

Parameterized inline functions will not be supported. Users will get an error message asking them to register the UDF as a temporary function instead:

val udf = new MyUdf("a");
table.select(udf($("myField))));

However, we will support inline functions that have no member variables, for which we can serialize the class name. So the following kinds of calls are supported:

MyUdf udf = new MyUdf();
table.select(call(udf, $("myField")));

tableEnv.createTemporarySystemFunction("myUdf", new MyUdf("a"));
table.select(call("myUdf", $("myField")));

If this use case is a hard requirement, we can offer `UserDefinedFunction.toConfiguration()/fromConfiguration()` methods in the future to properly de-/serialize a UDF's configuration. But I don't think that many users use parameterized inline functions, so this error will not happen often. And there is a workaround: registering the function as a temporary function.
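
Purely as an illustration of what such future methods could look like (the method names follow the sentence above; nothing here is existing Flink API):

// Hypothetical sketch: a parameterized UDF that could de-/serialize its own
// configuration into the plan. toConfiguration()/fromConfiguration() are the
// possible future methods mentioned above.
public class MyUdf extends ScalarFunction {

    private String prefix;

    public MyUdf() {}

    public MyUdf(String prefix) {
        this.prefix = prefix;
    }

    public String eval(String value) {
        return prefix + value;
    }

    // hypothetical: persist the member variables as plain key/value pairs
    public Map<String, String> toConfiguration() {
        return Collections.singletonMap("prefix", prefix);
    }

    // hypothetical: restore the member variables from the persisted pairs
    public void fromConfiguration(Map<String, String> configuration) {
        this.prefix = configuration.get("prefix");
    }
}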

It should be possible for the user to change the internal UDF implementation (e.g. upgrading a library) as long as the input/output parameters stay the same.

Options

Persisted catalog functions are stored by object identifier and class. Options similar to those for catalog tables apply:

Option | Type | Description

table.plan.compile.catalog-objects

Enum

ALL (Default): All catalog metadata about a function is persisted: Identifier + Class.
The catalog's "metadata" does not need to be available anymore.

SCHEMA: Same as IDENTIFIER because we store the schema anyway.

IDENTIFIER: Only the object identifier is persisted. The user needs to make sure that catalog function signatures don't change. The class can change.

table.plan.restore.catalog-objects

Enum

ALL (Default): All catalog info is read from the JSON plan. If information is missing, a lookup will take place.

ALL_ENFORCED: All catalog info must be present in the JSON plan. No lookup.

IDENTIFIER: Everything is read from the catalog again.


We only version built-in system functions, not user-defined/catalog ones.

Types

We don't fully support user-defined types yet. But the mechanism would be similar to the one of functions.

Inline structured types will be written into the JSON plan.

We can further simplify the current JSON plan by using `LogicalType#asSerializableString` where possible. Most basic types support `LogicalType#asSerializableString`. However, time attribute metadata does not belong in `LogicalType#asSerializableString`, so in the end it might be safer to separate `LogicalType` and JSON plans completely and treat them as orthogonal concepts.

Only special cases will be represented with a dedicated JSON structure. Currently, everything is represented as dedicated JSON, which makes the JSON plans large.

For example:

"type" : {
 "typeName" : "TIMESTAMP",
 "nullable" : false,
 "precision" : 3
}

Can be:

TIMESTAMP(3) NOT NULL
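
For reference, this string form is roughly what `LogicalType#asSerializableString` already produces today; a minimal sketch (the concrete type and precision are just an example):

// TIMESTAMP(3) NOT NULL expressed via the existing LogicalType API
LogicalType type = new TimestampType(false, 3);
String serialized = type.asSerializableString(); // "TIMESTAMP(3) NOT NULL"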

Side note: LegacyTypeInformationType will not be supported. RawType has a stable serializable string representation using serializer snapshots.

Public Interfaces

SQL

EXECUTE

We suggest introducing a consistent block syntax. This is also useful for future stored procedures and other block style definitions in SQL scripts.

The current STATEMENT SET syntax is confusing as a `BEGIN` should always be at the beginning of the block. Similar to SQL Server stored procedures:

https://www.sqlservertutorial.net/sql-server-stored-procedures/sql-server-begin-end/

Statement sets receive a more consistent syntax using the EXECUTE keyword. This also indicates that an "execution" is happening "for this block".

EXECUTE STATEMENT SET
BEGIN
  INSERT INTO pageview_pv_sink
  SELECT page_id, count(1) FROM clicks GROUP BY page_id;

  INSERT INTO pageview_uv_sink
  SELECT page_id, count(distinct user_id) FROM clicks GROUP BY page_id;
END;

// The following statement is supported for legacy reasons.
// This syntax has various issues (BEGIN at the wrong location, semicolon at the beginning of a block).

BEGIN STATEMENT SET;
  INSERT INTO pageview_pv_sink
  SELECT page_id, count(1) FROM clicks GROUP BY page_id;

  INSERT INTO pageview_uv_sink
  SELECT page_id, count(distinct user_id) FROM clicks GROUP BY page_id;
END;


Any DML or set of DML can be executed with EXECUTE:

// the following statements are equivalent

INSERT INTO pageview_pv_sink SELECT page_id FROM clicks;

EXECUTE INSERT INTO pageview_pv_sink SELECT page_id FROM clicks;


Having EXECUTE as a dedicated clause allows for parameterizing the execution. This might be helpful in the future and immediately solves how to define a plan in a table program, with room for potential parameters later.

EXECUTE PLAN will always (as the name suggests) take a JSON plan path instead of a statement. The meaning of the path depends on the deployment. It should work similarly to ADD JAR, and the path should be absolute.

// execute a plan from a JSON file,
// the plan must be present and self-contained,
// otherwise temporary DDL is necessary before this statement
EXECUTE PLAN '/mydir/plan.json';

COMPILE

Similarly, we offer a COMPILE PLAN statement for creating a JSON plan file for both INSERT INTO and STATEMENT SETs.

If a plan file already exists, an exception will be thrown. The IF NOT EXISTS clause allows multiple runs of the same SQL script without raising an exception.

// create a plan without execution
COMPILE PLAN '/mydir/plan.json' FOR INSERT INTO pageview_pv_sink SELECT page_id FROM clicks;

COMPILE PLAN '/mydir/plan.json' IF NOT EXISTS FOR INSERT INTO pageview_pv_sink SELECT page_id FROM clicks;

COMPILE PLAN '/mydir/plan.json' [IF NOT EXISTS] FOR STATEMENT SET
BEGIN
  INSERT INTO pageview_pv_sink
  SELECT page_id, count(1) FROM clicks GROUP BY page_id;

  INSERT INTO pageview_uv_sink
  SELECT page_id, count(distinct user_id) FROM clicks GROUP BY page_id;
END;

Usually, bigger SQL scripts are defined to be idempotent for multiple runs, which means that DDL is ignored if a table/function has already been created. Both COMPILE PLAN IF NOT EXISTS and EXECUTE satisfy this property. However, the path would need to be defined twice if both steps should be part of the same SQL script, which is likely the most common case.

We support the following combination of both statements using COMPILE AND EXECUTE PLAN. If the plan file doesn't exist, it will be compiled from the statement. Once the plan file exists, it will be executed immediately.

// plan will be executed and generated from a given DML if necessary,
// statement itself is idempotent for multiple runs of the same script,
// overwriting a file is not allowed, it will only be created once
COMPILE AND EXECUTE PLAN '/mydir/plan.json' FOR INSERT INTO pageview_pv_sink SELECT page_id FROM clicks;

// similar syntax for statement sets,
// plan will be generated if necessary
// statement is idempotent for multiple runs
COMPILE AND EXECUTE PLAN '/mydir/plan.json' FOR STATEMENT SET
BEGIN
  INSERT INTO pageview_pv_sink
  SELECT page_id, count(1) FROM clicks GROUP BY page_id;

  INSERT INTO pageview_uv_sink
  SELECT page_id, count(distinct user_id) FROM clicks GROUP BY page_id;
END;


Other kinds of statements that might be useful when dealing with plans:


// allows getting insights (which insights exactly still needs to be discussed)
// and basic validation of the plan (e.g. resolution of catalog objects)
EXPLAIN PLAN '/mydir/plan.json';

// during debugging it should be possible to already define COMPILE AND EXECUTE PLAN … FOR
// with a proper path but always recompile
SET 'table.plan.restore.force-recompile'='true';

// -------------------------
// not in the first version
// -------------------------

// drop a plan
DROP PLAN '/mydir/plan.json';

// Perform plan migration in the future.
// This statement has been added for completeness. Users will need to execute it 
// when we instruct them to do so in the release notes. Plan migration will be one
// way of dropping legacy code in Flink before we have savepoint migration.
COMPILE PLAN '/mydir/plan_new.json' FROM '/mydir/plan_old.json';

Table API

In contrast to SQL, the Table API resolves tables, functions, and expressions eagerly for every operation. This means we cannot declare the pipeline fluently without interpreting it already. It makes idempotent statements such as `tableEnv.from("T1").select(...).compileAndExecute()` impossible, as both `from()` and `select()` are interpreted eagerly. In programming languages, it is easy to first compile a plan and then pass the instance to an execute method. We offer a supplier method for this (see the listing and usage sketch below).

We provide the concept of a `CompiledPlan`. It is an immutable instance that represents a plan string.

// Read string from local file system
CompiledPlan.fromJsonFile(String path)
CompiledPlan.fromJsonFile(Path path)

// We provide a supplier pattern for convenience
CompiledPlan.fromJsonFile(Path path, Supplier<CompiledPlan> ifNotExist)

// Or create with a string
CompiledPlan.fromJsonString(String json)

// Access the raw content
// (the CompiledPlan.toString might be used for a summary representation)
CompiledPlan.asJsonString(): String

// Use convenience method to write it to a file
CompiledPlan.writeToFile(String path)
CompiledPlan.writeToFile(String path, boolean ignoreIfExists)
CompiledPlan.writeToFile(Path path)
CompiledPlan.writeToFile(Path path, boolean ignoreIfExists)

...
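
A hedged usage sketch of the supplier pattern mentioned above. Only `fromJsonFile(Path, Supplier<CompiledPlan>)` and `writeToFile(Path, boolean)` are taken from the listing; `compilePlanSql()` and `executePlan()` are placeholders for the compile/execute methods elided above, and `java.nio.file.Path` is assumed:

// Idempotent pattern: load the plan if the file exists, otherwise compile it
// from SQL; then persist it (if not already present) and execute it.
Path planFile = Paths.get("/mydir/plan.json");

CompiledPlan plan = CompiledPlan.fromJsonFile(
        planFile,
        () -> tableEnv.compilePlanSql(          // placeholder compile method
                "INSERT INTO pageview_pv_sink SELECT page_id FROM clicks"));

plan.writeToFile(planFile, true);               // ignoreIfExists = true
tableEnv.executePlan(plan);                     // placeholder execute method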


Proposed Changes


...