...

Interface | Change | Comment
TableEnvironment
  #connect | Remove | Deprecated since Flink 1.11
  #createTable(path, TableDescriptor) | New |
  #createTemporaryTable(path, TableDescriptor) | New |
  #from(TableDescriptor) | New |
Table
  #executeInsert(TableDescriptor) | New |
StatementSet
  #addInsert(TableDescriptor, Table) | New |
Other
  ConnectTableDescriptor | Remove |
  BatchTableDescriptor | Remove |
  StreamTableDescriptor | Remove |
  ConnectorDescriptor | Remove |
  TableDescriptor | Refactor | Removed in its current form; name is re-used for the new API
  Rowtime | Remove |

...

To let users register sources and sinks via the Table API, we introduce two new methods:

Code Block (java): TableEnvironment
/**
  * Creates a new table from the given descriptor.
  *
  * The table is created in the catalog defined by the given path.
  */
void createTable(String path, TableDescriptor descriptor);

/**
  * Creates a new temporary table from the given descriptor.
  *
  * Temporary objects can shadow permanent ones. If a permanent object in a given path exists,
  * it will be inaccessible in the current session. To make the permanent object available again
  * one can drop the corresponding temporary object.
  */
void createTemporaryTable(String path, TableDescriptor descriptor);
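
The shadowing rule described in the Javadoc above can be illustrated with a small, self-contained model. This is only a sketch in plain Java, not Flink's catalog implementation: the class `ShadowingSketch`, its `resolve` method, and the use of plain strings in place of `TableDescriptor` are hypothetical names introduced for illustration, and existence checks are left out.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Toy model of path resolution with temporary tables; not Flink code. */
class ShadowingSketch {
    // Strings stand in for descriptors to keep the sketch dependency-free.
    private final Map<String, String> permanent = new HashMap<>();
    private final Map<String, String> temporary = new HashMap<>();

    /** Registers a permanent table (existence checks omitted in this sketch). */
    void createTable(String path, String descriptor) {
        permanent.put(path, descriptor);
    }

    /** Registers a temporary table, which may shadow a permanent one. */
    void createTemporaryTable(String path, String descriptor) {
        temporary.put(path, descriptor);
    }

    /** Drops the temporary table; returns true if one existed at the path. */
    boolean dropTemporaryTable(String path) {
        return temporary.remove(path) != null;
    }

    /** Looks up a path: a temporary table wins over a permanent one. */
    Optional<String> resolve(String path) {
        String temp = temporary.get(path);
        return Optional.ofNullable(temp != null ? temp : permanent.get(path));
    }

    public static void main(String[] args) {
        ShadowingSketch env = new ShadowingSketch();
        env.createTable("orders", "permanent(kafka)");
        env.createTemporaryTable("orders", "temporary(datagen)");

        // While the temporary table exists, the permanent one is inaccessible.
        System.out.println(env.resolve("orders").get()); // temporary(datagen)

        // Dropping the temporary table makes the permanent one visible again.
        env.dropTemporaryTable("orders");
        System.out.println(env.resolve("orders").get()); // permanent(kafka)
    }
}
```

The lookup order in `resolve` is the whole point of the sketch: the permanent entry is never modified, it is merely hidden for as long as a temporary entry occupies the same path.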

...

Code Block (java): TableDescriptor
TableDescriptor {
  // Create a builder
  static TableDescriptorBuilder forConnector(String connector);
  
  Optional<Schema> getSchema();
  Map<String, String> getOptions();
  Optional<String> getComment();
  List<String> getPartitionKeys();
  Optional<TableLikeDescriptor> getLikeDescriptor();
}

TableDescriptorBuilder<SELF> {
  SELF schema(Schema schema);
  
  SELF comment(String comment);

  <T> SELF option(ConfigOption<T> configOption, T value);
  SELF option(String key, String value);

  SELF format(String format);
  SELF format(ConfigOption<?> formatOption, String format);  
  SELF format(FormatDescriptor formatDescriptor);
  SELF format(ConfigOption<?> formatOption, FormatDescriptor formatDescriptor);
  
  SELF partitionedBy(String... partitionKeys);
  
  SELF like(String tableName, LikeOption... likeOptions);
  
  TableDescriptor build();
}

TableLikeDescriptor {
  String getTableName();
  List<LikeOption> getLikeOptions();
}

FormatDescriptor {
  static FormatDescriptorBuilder forFormat(String format);
  
  String getFormat();
  Map<String, String> getOptions();
}

FormatDescriptorBuilder<SELF> {
  <T> SELF option(ConfigOption<T> configOption, T value);
  SELF option(String key, String value);
  FormatDescriptor build();
}

interface LikeOption {
	enum INCLUDING implements LikeOption {
		ALL,
		CONSTRAINTS,
		GENERATED,
		OPTIONS,
		PARTITIONS,
		WATERMARKS
	}

	enum EXCLUDING implements LikeOption {
		ALL,
		CONSTRAINTS,
		GENERATED,
		OPTIONS,
		PARTITIONS,
		WATERMARKS
	}

	enum OVERWRITING implements LikeOption {
		GENERATED,
		OPTIONS,
		WATERMARKS
	}
}
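
To make the builder flow above more concrete, here is a minimal sketch of how such a descriptor could be assembled and read back. It is a simplified plain-Java stand-in, not the proposed Flink classes: `DescriptorSketch` is a hypothetical name, schema, format, and LIKE support are omitted, the builder is not self-typed, and storing the connector identifier under the `connector` option key is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/** Simplified stand-in for the proposed TableDescriptor; not Flink code. */
final class DescriptorSketch {
    private final Map<String, String> options;
    private final List<String> partitionKeys;
    private final String comment; // null internally, exposed as Optional

    private DescriptorSketch(Builder b) {
        // Defensive copies keep the built descriptor immutable.
        this.options = Collections.unmodifiableMap(new LinkedHashMap<>(b.options));
        this.partitionKeys = Collections.unmodifiableList(new ArrayList<>(b.partitionKeys));
        this.comment = b.comment;
    }

    /** Entry point mirroring forConnector(String) from the listing above. */
    static Builder forConnector(String connector) {
        Builder b = new Builder();
        // Assumption of this sketch: the identifier becomes a regular option.
        b.options.put("connector", connector);
        return b;
    }

    Map<String, String> getOptions() { return options; }
    List<String> getPartitionKeys() { return partitionKeys; }
    Optional<String> getComment() { return Optional.ofNullable(comment); }

    static final class Builder {
        private final Map<String, String> options = new LinkedHashMap<>();
        private final List<String> partitionKeys = new ArrayList<>();
        private String comment;

        Builder option(String key, String value) { options.put(key, value); return this; }
        Builder comment(String comment) { this.comment = comment; return this; }
        Builder partitionedBy(String... keys) { partitionKeys.addAll(Arrays.asList(keys)); return this; }
        DescriptorSketch build() { return new DescriptorSketch(this); }
    }

    public static void main(String[] args) {
        DescriptorSketch descriptor = DescriptorSketch.forConnector("filesystem")
                .option("path", "/tmp/example")
                .partitionedBy("dt")
                .build();

        System.out.println(descriptor.getOptions());             // {connector=filesystem, path=/tmp/example}
        System.out.println(descriptor.getPartitionKeys());       // [dt]
        System.out.println(descriptor.getComment().isPresent()); // false
    }
}
```

Returning `Optional` from `getComment()` instead of a nullable value matches the getter signatures in the listing and lets callers handle the unset case explicitly.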

...

We propose introducing TableEnvironment#from in order to create a Table from a given descriptor. This is in line with existing methods, e.g. from(String).

Code Block (java): TableEnvironment
/**
  * Returns a {@link Table} backed by the given descriptor.
  */
Table from(TableDescriptor descriptor); 

...

We propose introducing Table#executeInsert in order to write directly to a sink defined by a descriptor. This is in line with existing methods, e.g. executeInsert(String).

Code Block (java): Table
/**
  * Declares that the pipeline defined by this table should be written to a table defined by the given descriptor.
  *
  * If no schema is defined in the descriptor, it will be inferred automatically.
  */
TableResult executeInsert(TableDescriptor descriptor);

StatementSet#addInsert

Similarly to Table#executeInsert, we propose extending StatementSet#addInsert to take a descriptor.

Code Block (java)
/**
  * Adds the given table as a sink defined by the descriptor.
  */
StatementSet addInsert(TableDescriptor descriptor, Table table);

Package Structure

The new descriptor API will reside in flink-table-api-java in the org.apache.flink.table.descriptors package.

...