Status
...
Page properties | |
---|---|
|
...
...
...
|
...
...
...
|
...
|
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
Interface | Change | Comment |
---|---|---|
TableEnvironment | ||
#connect | Remove | Deprecated since Flink 1.11 |
#createTable(path, TableDescriptor) | New | |
#createTemporaryTable(path, TableDescriptor) | New | |
#from(TableDescriptor) | New | |
Table | ||
#executeInsert(TableDescriptor) | New | |
StatementSet | ||
#addInsert(TableDescriptor, Table) | New | |
Other | ||
ConnectTableDescriptor | Remove | |
BatchTableDescriptor | Remove | |
StreamTableDescriptor | Remove | |
ConnectorDescriptor | Remove | |
TableDescriptor | Refactor | |
Rowtime | Remove | |
...
In order for users to register sources and sinks via Table API, we introduce two new methods:
Code Block | ||||
---|---|---|---|---|
| ||||
/**
 * Creates a new table from the given descriptor.
 *
 * The table is created in the catalog defined by the given path.
 */
void createTable(String path, TableDescriptor descriptor);

/**
 * Creates a new temporary table from the given descriptor.
 *
 * Temporary objects can shadow permanent ones. If a permanent object in a given path exists,
 * it will be inaccessible in the current session. To make the permanent object available again
 * one can drop the corresponding temporary object.
 */
void createTemporaryTable(String path, TableDescriptor descriptor); |
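The shadowing rule described in the Javadoc above can be illustrated with a minimal, self-contained sketch. `ShadowingSketch` and its two maps are hypothetical stand-ins written for this illustration, not Flink's catalog implementation, and a plain `String` takes the place of a `TableDescriptor`.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical stand-in for a session catalog; NOT Flink's implementation.
final class ShadowingSketch {
    private final Map<String, String> permanent = new HashMap<>();
    private final Map<String, String> temporary = new HashMap<>();

    // A String stands in for a TableDescriptor in this sketch.
    void createTable(String path, String descriptor) {
        permanent.put(path, descriptor);
    }

    void createTemporaryTable(String path, String descriptor) {
        temporary.put(path, descriptor);
    }

    // Removes only the temporary entry and reports whether one existed.
    boolean dropTemporaryTable(String path) {
        return temporary.remove(path) != null;
    }

    // A temporary entry wins over a permanent one registered under the same path.
    Optional<String> lookup(String path) {
        String temp = temporary.get(path);
        if (temp != null) {
            return Optional.of(temp);
        }
        return Optional.ofNullable(permanent.get(path));
    }

    public static void main(String[] args) {
        ShadowingSketch catalog = new ShadowingSketch();
        catalog.createTable("cat.db.orders", "permanent descriptor");
        catalog.createTemporaryTable("cat.db.orders", "temporary descriptor");
        System.out.println(catalog.lookup("cat.db.orders")); // the temporary entry
        catalog.dropTemporaryTable("cat.db.orders");
        System.out.println(catalog.lookup("cat.db.orders")); // the permanent entry again
    }
}
```

Dropping the temporary entry leaves the permanent one untouched, which is why it becomes visible again.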
...
Code Block | ||||
---|---|---|---|---|
| ||||
TableDescriptor {
  // Create a builder
  static TableDescriptorBuilder forConnector(String connector);

  Optional<Schema> getSchema();
  Map<String, String> getOptions();
  Optional<String> getComment();
  List<String> getPartitionKeys();
  Optional<TableLikeDescriptor> getLikeDescriptor();
}

TableDescriptorBuilder<SELF> {
  SELF schema(Schema schema);

  SELF comment(String comment);

  <T> SELF option(ConfigOption<T> configOption, T value);
  SELF option(String key, String value);

  SELF format(String format);
  SELF format(ConfigOption<?> formatOption, String format);
  SELF format(FormatDescriptor formatDescriptor);
  SELF format(ConfigOption<?> formatOption, FormatDescriptor formatDescriptor);

  SELF partitionedBy(String... partitionKeys);

  SELF like(String tableName, LikeOption... likeOptions);

  TableDescriptor build();
}

TableLikeDescriptor {
  String getTableName();
  List<LikeOption> getLikeOptions();
}

FormatDescriptor {
  static FormatDescriptorBuilder forFormat(String format);

  String getFormat();
  Map<String, String> getOptions();
}

FormatDescriptorBuilder<SELF> {
  <T> SELF option(ConfigOption<T> configOption, T value);
  SELF option(String key, String value);
  FormatDescriptor build();
}

interface LikeOption {
  enum INCLUDING implements LikeOption {
    ALL, CONSTRAINTS, GENERATED, OPTIONS, PARTITIONS, WATERMARKS
  }

  enum EXCLUDING implements LikeOption {
    ALL, CONSTRAINTS, GENERATED, OPTIONS, PARTITIONS, WATERMARKS
  }

  enum OVERWRITING implements LikeOption {
    GENERATED, OPTIONS, WATERMARKS
  }
} |
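The `SELF` type parameter on the builders suggests a self-typed builder, where every chained call returns the concrete builder type. The following is a minimal sketch of that pattern under stated assumptions: `BaseBuilder`, `KafkaBuilder`, `Descriptor`, and the `topic` method are hypothetical names invented here, and storing the connector and format under the keys `connector` and `format` is an assumption of this sketch rather than something the proposal above specifies.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch of a self-typed builder; none of these types are Flink classes.
final class DescriptorBuilderSketch {

    // Immutable result, standing in for the proposed TableDescriptor.
    static final class Descriptor {
        private final Map<String, String> options;
        private final String comment; // null when no comment was set

        Descriptor(Map<String, String> options, String comment) {
            this.options = Collections.unmodifiableMap(new LinkedHashMap<>(options));
            this.comment = comment;
        }

        Map<String, String> getOptions() {
            return options;
        }

        Optional<String> getComment() {
            return Optional.ofNullable(comment);
        }
    }

    // SELF is the concrete builder type, so chained calls keep that type.
    abstract static class BaseBuilder<SELF extends BaseBuilder<SELF>> {
        private final Map<String, String> options = new LinkedHashMap<>();
        private String comment;

        protected abstract SELF self();

        SELF option(String key, String value) {
            options.put(key, value);
            return self();
        }

        SELF comment(String comment) {
            this.comment = comment;
            return self();
        }

        // Assumption of this sketch: the format name is stored under the key "format".
        SELF format(String format) {
            return option("format", format);
        }

        Descriptor build() {
            return new Descriptor(options, comment);
        }
    }

    // Hypothetical connector-specific builder adding one convenience method.
    static final class KafkaBuilder extends BaseBuilder<KafkaBuilder> {
        KafkaBuilder() {
            // Assumption of this sketch: the connector is stored under the key "connector".
            option("connector", "kafka");
        }

        @Override
        protected KafkaBuilder self() {
            return this;
        }

        KafkaBuilder topic(String topic) {
            return option("topic", topic);
        }
    }

    public static void main(String[] args) {
        Descriptor descriptor = new KafkaBuilder()
                .format("json")   // returns KafkaBuilder, not BaseBuilder
                .topic("orders")  // so the connector-specific method still chains
                .build();
        System.out.println(descriptor.getOptions());
    }
}
```

Without the `SELF` parameter, `format("json")` would return the base type and the call to `topic` would not compile.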
...
We propose introducing TableEnvironment#from in order to create a Table from a given descriptor. This is in line with existing methods, e.g. from(String).
Code Block | ||||
---|---|---|---|---|
| ||||
/**
 * Returns a {@link Table} backed by the given descriptor.
 */
Table from(TableDescriptor descriptor); |
...
We propose introducing Table#executeInsert in order to write directly to a sink defined by a descriptor. This is in line with existing methods, e.g. executeInsert(String).
Code Block | ||||
---|---|---|---|---|
| ||||
/**
* Declares that the pipeline defined by this table should be written to a table defined by the given descriptor.
*
* If no schema is defined in the descriptor, it will be inferred automatically.
*/
TableResult executeInsert(TableDescriptor descriptor); |
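One way to read the schema rule in the Javadoc above is as an `Optional` fallback: use the schema declared on the descriptor if present, otherwise take it from the table being written. The sketch below assumes that reading. `SinkDescriptor` and `resolveSinkColumns` are hypothetical names, and a list of column names stands in for a full schema; how inference would actually be implemented is not specified here.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Hypothetical sketch of the schema fallback; not Flink's implementation.
final class SchemaFallbackSketch {

    // Stand-in for a descriptor whose schema is optional.
    static final class SinkDescriptor {
        private final List<String> columns; // null means no schema was declared

        SinkDescriptor(List<String> columns) {
            this.columns = columns;
        }

        Optional<List<String>> getSchema() {
            return Optional.ofNullable(columns);
        }
    }

    // Assumption: a missing sink schema is inferred from the table being written.
    static List<String> resolveSinkColumns(SinkDescriptor sink, List<String> tableColumns) {
        return sink.getSchema().orElse(tableColumns);
    }

    public static void main(String[] args) {
        List<String> tableColumns = Arrays.asList("id", "amount");

        SinkDescriptor withoutSchema = new SinkDescriptor(null);
        SinkDescriptor withSchema = new SinkDescriptor(Arrays.asList("order_id", "total"));

        System.out.println(resolveSinkColumns(withoutSchema, tableColumns)); // inferred
        System.out.println(resolveSinkColumns(withSchema, tableColumns));    // declared
    }
}
```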
StatementSet#addInsert
Similarly to Table#executeInsert, we propose extending StatementSet#addInsert to take a descriptor.
Code Block | ||
---|---|---|
| ||
/**
 * Adds the given table as a sink defined by the descriptor.
 */
StatementSet addInsert(TableDescriptor descriptor, Table table); |
Package Structure
The new descriptor API will reside in flink-table-api-java in the org.apache.flink.table.descriptors package.
...