Status


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The Table API has received many new features over the last year. It supports a new type system (FLIP-37), connectors support changelogs (FLIP-95), we have well-defined internal data structures (FLIP-95), support for result retrieval in an interactive fashion (FLIP-84), and soon new TableDescriptors (FLIP-129).

However, the interfaces from and to the DataStream API have not been touched during the introduction of these new features and are outdated by now. They lack important functionality that is available in the Table API but not exposed to DataStream API users. The DataStream API is still our most important API, which is why good interoperability is crucial.

This FLIP is a mixture of different topics that improve the interoperability between DataStream and Table API in terms of:

  • DataStream ↔ Table conversion
  • translation of type systems TypeInformation ↔ DataType
  • schema definition (incl. rowtime, watermarks, primary key)
  • changelog handling
  • row handling in DataStream API

Public Interfaces

StreamTableEnvironment.fromDataStream

StreamTableEnvironment.fromChangelogStream

StreamTableEnvironment.toDataStream

StreamTableEnvironment.toChangelogStream

StreamTableEnvironment.createStatementSet(): StreamStatementSet

StreamStatementSet.attachToStream()

Row.withPositions(int length)

Row.withNames()

Row.getField(String)

Row.setField(String, T value)

Proposed Changes

We follow the principle "simple things should be simple, complex things should be possible". Therefore, we propose two ways of converting a DataStream to a Table and vice versa.

Conversion of DataStream to Table

Simple API

The DataStream API itself is not a changelog processor. Therefore, in most of the cases, the DataStream API produces insert-only events of different data types. Insert-only semantics make the handling of different record types easier because we don't require a Row or RowData to contain a RowKind flag. It is therefore possible to offer the conversion of any data type to an insert-only table, without the necessity of performing manual schema work. 

StreamTableEnvironment.fromDataStream(DataStream<T>): Table

StreamTableEnvironment.createTemporaryView(String, DataStream<T>): Unit

Goal: Create a table as easily as possible. 

Notes:

  • We keep the old naming `fromDataStream` in order to not break existing pipelines too much.
  • Changelog processing does not take place, even for `DataStream<Row>`.
  • The conversion works by translating the stream's TypeInformation to a DataType.
  • This will happen with a new TypeInfoDataTypeConverter that will no longer produce LegacyTypeInformationType.
  • All types from DataStream API should be supported by this converter.
  • TupleTypeInfoBase will be translated into a proper RowType or StructuredType.
  • BigDecimals will be converted to DECIMAL(38,18) by default.
  • Composite types (tuples, POJOs, rows) will be flattened by default if they are used as top-level records (similar to the old behavior).
  • The order of POJO fields is determined by the DataTypeExtractor and no longer needs to be defined manually.
  • GenericTypeInfo is converted to RawType immediately by considering the current configuration.
  • A DataStream that originated from Table API will keep its DataType information due to ExternalTypeInfo implementing DataTypeQueryable.
  • Most of the previous user pipelines should still run. However, we cannot guarantee 100% backward compatibility.
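
A minimal sketch of the insert-only conversion, assuming a hypothetical `User` POJO with the fields `name` and `score`:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

// insert-only stream of POJOs; the TypeInformation is translated to a DataType
// and the top-level POJO is flattened into the columns `name` and `score`
DataStream<User> dataStream = env.fromElements(
  new User("Alice", 4),
  new User("Bob", 6));

Table table = tEnv.fromDataStream(dataStream);
table.printSchema();

// or register the stream under a name for use in SQL
tEnv.createTemporaryView("Users", dataStream);
tEnv.sqlQuery("SELECT name, SUM(score) FROM Users GROUP BY name");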

StreamTableEnvironment.fromDataStream(DataStream<T>, Schema): Table

StreamTableEnvironment.createTemporaryView(String, DataStream<T>, Schema): Unit

Goal: Allow influencing the schema of the table at any time and adding a time attribute.

Notes:

  • Define precision and scale of decimals, timestamps etc. manually.
  • Since the type system of the Table API is more powerful than the type extraction of the DataStream API, users can replace a GenericTypeInfo with a more meaningful DataType, e.g. to support immutable POJOs as StructuredType instead of RawType, or lists as ArrayType instead of RawType.
  • We will perform verification based on what is available in TypeInformation.
  • Type coercion between an explicitly specified Schema and the DataStream will not happen (e.g. DataStream<Integer> != Schema.column("f", DataTypes.BIGINT())), because the user specified the desired data type explicitly and expects correctness.
  • It allows defining a `system_rowtime` attribute for using the StreamRecord's timestamp and existing watermarks.
  • It allows reordering fields and thus offers functionality similar to the previous Expression-based API, but in a more consistent way.

Example:

StreamTableEnvironment.fromDataStream(
  dataStream,
  Schema.newBuilder()
    .column("user_id", DataTypes.BIGINT())
    .column("score", DataTypes.DECIMAL(10, 2))
    .column("proc", system_proctime())
    .column("ts", system_rowtime())
    .watermark("ts", system_watermark())
    .primaryKey("user_id")
    .build())

Additional Notes from the ML discussion:

  • Users can place the definition of time attributes at arbitrary locations between existing fields.
  • We can give time attributes a name in the same way as in regular DDL.
  • By using `system_rowtime().as("rowtime")` the watermark would be assigned implicitly.
  • The same functions should also be available in a computed column in SQL and TableDescriptor API:
    `.column("proctime", system_proctime())` and `proctime AS SYSTEM_PROCTIME()`
  • We will allow `system_proctime` everywhere in the query. Also for SQL, we should have done that earlier already to give users the chance to have time based operations also at later stages.

Deprecation:

StreamTableEnvironment.fromDataStream(DataStream<T>, Expression...): Table

StreamTableEnvironment.createTemporaryView(String, DataStream<T>, Expression...): Unit

Notes:

  • Deprecation happens in both Scala and Java StreamTableEnvironment.
  • The expression syntax, which combined index-based and name-based declaration of fields in one method, confused users in the past (see the sketch after this list).
  • Furthermore, projection of fields was supported but calling functions and other transformations were not.
  • Calling `.rowtime()` and `.proctime()` on fields that don't exist caused further misunderstandings.
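
For illustration, a call in the deprecated expression-based style (the field names are assumptions) mixes field projection and time attribute declaration in one argument list:

// deprecated: projection, renaming, and `.rowtime()` mixed in one expression list
// ($ refers to org.apache.flink.table.api.Expressions.$)
Table table = tEnv.fromDataStream(dataStream, $("user_id"), $("score"), $("ts").rowtime());

The Schema-based variant proposed above expresses the same intent with explicit column declarations.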


Powerful API

The more powerful API exposes the same features as a DynamicTableSource in FLIP-95. It enables ingesting a changelog stream with primary key, computed columns, and watermarks.

StreamTableEnvironment.fromChangelogStream(DataStream<Row>): Table

Goal: Create a table from a changelog as easily as possible; derive the schema entirely (see the sketch after the notes below).

StreamTableEnvironment.fromChangelogStream(DataStream<Row>, Schema): Table

Goal: Create a table from a changelog with a declared schema, similar to defining a source.

Notes:

  • We only accept `DataStream<Row>` for now. RowData and other internal data structures are not officially supported in the DataStream API. 
  • The DataType of fields can be defined using `Schema.column()`. So in theory, structures such as StringData, TimestampData can still be used by power users. In that sense, Row can behave like a GenericRowData.
  • We can still allow RowData in the future by introducing a common interface for the two kinds of rows and accept a `DataStream<? extends AbstractRow>`.
  • We will not expose a StreamTableEnvironment.fromChangelogStream(ChangelogMode, ...) for now. This can be future work or won't be necessary.
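
A minimal sketch of the schema-less variant (reusing `env` and `tEnv` from above; the column names `name` and `score` are assumptions); the RowKind of each record declares the kind of change:

// changelog of Rows; the RowKind flag of each record defines the change it represents
DataStream<Row> changelog = env
  .fromElements(
    Row.ofKind(RowKind.INSERT, "Alice", 12),
    Row.ofKind(RowKind.INSERT, "Bob", 5),
    Row.ofKind(RowKind.UPDATE_BEFORE, "Alice", 12),
    Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 100))
  .returns(Types.ROW_NAMED(new String[] {"name", "score"}, Types.STRING, Types.INT));

// the schema is derived entirely from the stream's TypeInformation
Table table = tEnv.fromChangelogStream(changelog);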

Defining a Schema:

We propose to move the Schema class defined in FLIP-129 to `org.apache.flink.table.api`. The class is not only useful for descriptors but also handy for all kinds of connector definitions and for the core API that will eventually replace TableSchema.

For using the watermark of the underlying DataStream we propose a `system_watermark()` function. The previously mentioned `system_rowtime()` can be used if the timestamp is not part of the schema.

A full example where both timestamp and watermark are extracted from the DataStream looks like:

StreamTableEnvironment.fromChangelogStream(
  dataStream,
  Schema.newBuilder()
    .column("user_id", DataTypes.BIGINT())
    .column("score", DataTypes.DECIMAL(10, 2))
    .column("proc", system_proctime())
    .column("ts", system_rowtime())
    .watermark("ts", system_watermark())
    .primaryKey("user_id")
    .build())

Notes:

  • If the given schema contains no columns, we will add all fields of the RowTypeInfo and merge them with the given schema:
    row columns + computed columns + watermark + primary key
    This allows easily defining the primary key and time attributes (see the sketch after this list).
  • We can use the Schema class also for CatalogTable in the future after we have deprecated TableSchema (see also the related Jira discussion).
  • Other developments such as FLIP-132 show that using `Schema` in `from/toChangelogStream` methods seems to be a good long-term solution for future evolution of the API.
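
A sketch of the merging behavior, assuming the `changelog` stream from the sketch above with a named RowTypeInfo containing `name` and `score`; the given schema contributes only the primary key, the physical columns are derived:

Table table = tEnv.fromChangelogStream(
  changelog,
  Schema.newBuilder()
    .primaryKey("name")
    .build());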

Conversion of Table to DataStream

Simple API

Similar to the conversion from DataStream to Table, we provide an easy method for dealing with insert-only tables and for converting them to any DataType.

StreamTableEnvironment.toDataStream(Table): DataStream<Row>

StreamTableEnvironment.toDataStream(Table, AbstractDataType<?>): DataStream<T>

Notes:

  • For now, this method would only be useful for pipelines with streaming operators (without retractions) and simple ETL jobs.
  • In the future, this method will become more useful once we introduce a table to changelog conversion in SQL using EMIT CHANGELOG mentioned in FLIP-105. It will also be useful for bounded streams.
  • If no data type is specified, we use the default conversion classes similar to `Table.execute().collect()`.
  • If a data type is specified, we convert the top-level record and nested fields to the desired representation.
  • The TypeInformation of the DataStream is ExternalTypeInfo with ExternalSerializer.
  • As before, the rowtime attribute is written back into the DataStream record timestamp.
  • Type coercion is performed similar to table sinks (first on a logical level, then on a class level).
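
A sketch of both variants, assuming the `Users` view registered earlier and a hypothetical `User` POJO:

Table table = tEnv.sqlQuery("SELECT name, score FROM Users");

// 1) use the default conversion classes, same as Table.execute().collect()
DataStream<Row> rows = tEnv.toDataStream(table);

// 2) convert the top-level record into a (possibly immutable) POJO
DataStream<User> users = tEnv.toDataStream(table, DataTypes.of(User.class));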

Deprecation:

StreamTableEnvironment.toAppendStream(Table table, Class<T> clazz): DataStream<T>

StreamTableEnvironment.toAppendStream(Table table, TypeInformation<T> typeInfo): DataStream<T>

Powerful API

The more powerful API exposes the same features as a DynamicTableSink in FLIP-95.

StreamTableEnvironment.toChangelogStream(Table): DataStream<Row>

StreamTableEnvironment.toChangelogStream(Table, Schema): DataStream<Row>

Notes:

  • We use the default conversion classes similar to `Table.execute().collect()` and always return DataStream of Row.
  • `toChangelogStream(Table, Schema)` exists for completeness to have a symmetric API. It allows declaring the data type for the output similar to DynamicTableSinks. Additionally, internal structures such as StringData, TimestampData can still be used by power users. In that sense, Row can behave like a GenericRowData.
  • The TypeInformation of the DataStream is ExternalTypeInfo with ExternalSerializer.
  • We will not expose a StreamTableEnvironment.toChangelogStream(ChangelogMode, ...) for now. This can be future work or won't be necessary.
  • Type coercion is performed similar to table sinks (first on a logical level, then on a class level).
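
A sketch of consuming the changelog in the DataStream API, assuming the `Users` view registered earlier; the RowKind of each Row can be inspected per record:

Table updating = tEnv.sqlQuery("SELECT name, SUM(score) AS total FROM Users GROUP BY name");

tEnv.toChangelogStream(updating)
  .process(new ProcessFunction<Row, String>() {
    @Override
    public void processElement(Row row, Context ctx, Collector<String> out) {
      // each Row carries its change flag
      out.collect(row.getKind() + ": " + row);
    }
  })
  .print();

env.execute();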

Deprecation:

StreamTableEnvironment.toRetractStream(Table table, Class<T> clazz): DataStream<Tuple2<Boolean, T>>

StreamTableEnvironment.toRetractStream(Table table, TypeInformation<T> typeInfo): DataStream<Tuple2<Boolean, T>>

Notes:

  • Deprecation happens in both Scala and Java StreamTableEnvironment.

Conversion of StatementSet to DataStream API

FLIP-84 added the concept of a "statement set" to group multiple INSERT INTO statements (SQL or Table API) together. The statements in a statement set are jointly optimized and executed as a single Flink job. TableEnvironment, Table, and StatementSet all have explicit `execute` methods (e.g. `Table.executeInsert(...)` or `StatementSet.execute()`) that immediately submit a Flink job.

However, when constructing a bigger DataStream API pipeline that might go back and forth between Table API and DataStream API, it might be necessary to "attach" or "mount" an INSERT INTO statement to the main DataStream API pipeline. In other words: we would like to avoid submitting two or more Flink jobs.

StatementSet solves this use case for pure SQL & Table API pipelines. However, currently there is no way of making a StatementSet part of a DataStream API job.

We propose the following API:

StreamTableEnvironment.createStatementSet(): StreamStatementSet // return a stream-specific set

StreamStatementSet extends StatementSet {
  /**
   * Attaches the optimized statement set to the DataStream pipeline.
   */
  attachToStream(): Unit
}

An example could look like:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

tEnv
  .createStatementSet()
  .addInsert(tEnv.from("FromTable"))
  .attachToStream();

tEnv.from("OtherTable").toDataStream(...) // continue with further operations

Notes:

  • The `attachToStream` command will create one or more ModifyOperations that are translated into Transformations by the planner and added to the underlying StreamExecutionEnvironment.
  • We offer this special method only to users of StreamTableEnvironment, it is not visible in TableEnvironment.
  • Users of `Table.executeInsert` need to use a StatementSet if they want to use the DataStream API afterwards.
  • We need to make sure that StatementSet always contains all `executeInsert` variants that Table offers. Currently, a variant for the TableDescriptor approach of FLIP-129 is missing. We should add a `StatementSet.addInsert(Table, TableDescriptor)` and `StatementSet.addInsert(Table, TableDescriptor, boolean)`.
  • The proposed approach allows continuing with the fluent programming with toChangelogStream etc. and joining or using connect() with a different DataStream source.

Improve dealing with Row in DataStream API

Working with indices for accessing a row's fields is cumbersome, especially if a row consists of many columns (some pipelines have 500+ columns) and a column name needs to be mapped to an index before a field can be accessed. We can leverage the available schema information to make dealing with rows more convenient. The runtime impact should be negligible if implemented correctly. If maximum performance is required, people can fall back to index-based field access or even the internal RowData structure at any time.

Proposed interface changes:

class Row {

  // allows to use index-based setters and getters (equivalent to new Row(int))
  // method exists for completeness
  public static Row withPositions(int length);

  // allows to use name-based setters and getters
  public static Row withNames();

  /**
   * Returns the field's content at the specified position. Throws an exception if the row uses name-based field access.
   */
  public <T> T getField(int pos); // new generic avoids manual casting

  /**
   * Sets the field's content at the specified position. Throws an exception if the row uses name-based field access.
   */
  public <T> void setField(int pos, T value);

  /**
   * Returns the field with the given name. Throws an exception if the row uses index-based field access.
   */
  public <T> T getField(String name);

  /**
   * Sets the field's content for the given name. Throws an exception if the row uses index-based field access.
   */
  public <T> void setField(String name, T value);

  /**
   * Returns the field names if the row supports name-based field access.
   */
  public @Nullable Set<String> getFieldNames();
}

Notes:

  • We avoid an additional `NamedRow` and reduce confusion about too many different kinds of rows.
  • By the definition above, a Row has three modes: index-based (no name map, set/getField(int)), name-based (name map, set/getField(String)), or both (internally set by a utility). A short sketch follows after this list.
  • Both RowSerializer and RowRowConverter will make sure that field mappings are filled before output to the user code or used to reorder the fields before serialization.
  • We introduce a default-scoped constructor that will be called by some internal utility to create rows with both names and positions in serializers and converters.
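
A brief sketch of the first two modes (field names are only examples):

// index-based access (equivalent to new Row(2))
Row positioned = Row.withPositions(2);
positioned.setField(0, "Bob");
positioned.setField(1, 42);

// name-based access
Row named = Row.withNames();
named.setField("name", "Bob");
named.setField("score", 42);
Integer score = named.getField("score");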

Example:

// in functions ...

@FunctionHint(accumulator = @DataTypeHint("ROW<myField INT, myOtherField STRING>"))
public class SomeAggregateFunction extends AggregateFunction<String, Row> {

  @Override
  public Row createAccumulator() {
    final Row row = Row.withNames();
    row.setField("myField", 12);
    row.setField("myOtherField", "This is a test");
    return row;
  }

  public void accumulate(Row acc, String value) {
    final Integer myField = acc.getField("myField");
    if (myField == null || myField < value.length()) {
      acc.setField("myField", value.length());
      acc.setField("myOtherField", value);
    }
  }

  @Override
  public String getValue(Row acc) {
    return acc.getField("myOtherField");
  }
}

// or in DataStream API ...

StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

Table table = ... // some table with many many columns

DataStream<Integer> result = tEnv.toAppendStream(table, Row.class)
  .map(row -> row.getField("myColumn"));


Compatibility, Deprecation, and Migration Plan

See the lists of deprecations in the corresponding sections above.

We will keep the deprecated methods for at least 1-2 releases to give users the time to update their implementations.

All deprecated methods have a more powerful alternative to migrate to.

Test Plan

We will introduce new API tests.

Rejected Alternatives

Rejected Alternatives for "Conversion of StatementSet to DataStream API"

1) Introduce `StreamTableEnvironment.attachStatementSet(StatementSet)`.

Pro:

  • Feature is only exposed in StreamTableEnvironment, does not spam other API classes.

Con:

  • Feature is not visible when listing methods of StatementSet or Table.


2) Introduce `StatementSet.attach()` and `Table.attachInsert(...)`

Pro:

  • Feature is available at a fluent programming location

Con:

  • Feature would throw an exception for regular TableEnvironment, thus, would have non-unified semantics


3) Introduce proper buffering semantics

We propose the following API:

StatementSet.buffer(): Unit

Table.bufferInsert(String tablePath): Unit

Table.bufferInsert(String tablePath, boolean overwrite): Unit

An example could look like:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

tEnv
  .from("FromTable")
  .bufferInsert("ToTable");

tEnv
  .createStatementSet()
  .addInsert(tEnv.from("FromTable"))
  .buffer();

Notes:

  • Every `executeXXX` method can have a `bufferXXX` sibling.
  • The buffer functionality can be implemented in the `Executor` by having an `Executor.buffer(List<Transformation<?>> transformations)`.
  • StreamExecutor can add the buffered Transformations immediately to the StreamExecutionEnvironment and clear the list.
  • BatchExecutor can store the buffered Transformations internally and add them once `BatchExecutor.createPipeline(...)` is called.
  • In any case, another `execute` must be called afterwards, either `StreamExecutionEnvironment.execute()` or one of `StatementSet.execute()`/`Table.execute()`.

Pro:

  • Explicit buffering is possible

Con:

  • Users could be confused by this method that is not required very often.
  • Unclear TableResult output if multiple buffered transformations are executed.