...
We propose to drop the existing method TableEnvironment#connect (deprecated in 1.11) and introduce a new method in TableEnvironment:
```java
/** Creates a temporary table from a descriptor. */
void createTemporaryTable(String tablePath, TableDescriptor tableDescriptor);
```
...
The TableDescriptor is a unified interface/class to represent a SQL DDL structure (or a CatalogTable internally). It can be a specific connector descriptor, e.g. Kafka, or a general-purpose descriptor, i.e. Connector. All methods can be chained on the instance, including .schema(), .partitionedBy(), and .like(). We will discuss TableDescriptor in detail in the Proposed Changes section.
The example will look like this:
```java
// register a table using a specific descriptor
tEnv.createTemporaryTable(
    "MyTable",
    new Kafka()
        .version("0.11")
        .topic("user_logs")
        .property("bootstrap.servers", "localhost:9092")
        .property("group.id", "test-group")
        .startFromEarliest()
        .sinkPartitionerRoundRobin()
        .format(new Json().ignoreParseErrors(false))
        .schema(
            new Schema()
                .column("user_id", DataTypes.BIGINT())
                .column("user_name", DataTypes.STRING())
                .column("score", DataTypes.DECIMAL(10, 2))
                .column("log_ts", DataTypes.STRING())
                .column("part_field_0", DataTypes.STRING())
                .column("part_field_1", DataTypes.INT())
                .proctime("proc") // defines a processing-time attribute with column name "proc"
                .computedColumn("my_ts", "TO_TIMESTAMP(log_ts)") // computed column
                .watermarkFor("my_ts").boundedOutOfOrderTimestamps(Duration.ofSeconds(5)) // defines watermark and rowtime attribute
                .primaryKey("user_id"))
        .partitionedBy("part_field_0", "part_field_1") // Kafka doesn't support partitioned tables yet, this is just an example for the API
);

// register a table using the general-purpose Connector descriptor,
// this would be helpful for custom sources/sinks
tEnv.createTemporaryTable(
    "MyTable",
    new Connector("kafka-0.11")
        .option("topic", "user_logs")
        .option("properties.bootstrap.servers", "localhost:9092")
        .option("properties.group.id", "test-group")
        .option("scan.startup.mode", "earliest")
        .option("format", "json")
        .option("json.ignore-parse-errors", "true")
        .option("sink.partitioner", "round-robin")
        .schema(
            new Schema()
                .column("user_id", DataTypes.BIGINT())
                .column("user_name", DataTypes.STRING())
                .column("score", DataTypes.DECIMAL(10, 2))
                .column("log_ts", DataTypes.STRING())
                .column("part_field_0", DataTypes.STRING())
                .column("part_field_1", DataTypes.INT())
                .proctime("proc") // defines a processing-time attribute with column name "proc"
                .computedColumn("my_ts", "TO_TIMESTAMP(log_ts)") // computed column
                .watermarkFor("my_ts").boundedOutOfOrderTimestamps(Duration.ofSeconds(5)) // defines watermark and rowtime attribute
                .primaryKey("user_id"))
        .partitionedBy("part_field_0", "part_field_1") // Kafka doesn't support partitioned tables yet, this is just an example for the API
);
```
Additionally, we would like to propose two new methods for better usability for Table API users.
```java
interface TableEnvironment {
    /** Reads a table from the given descriptor. */
    Table from(TableDescriptor tableDescriptor);
    // we already have a "from(String)" method to get a registered table from the catalog
}

interface Table {
    /** Writes the Table to a sink that is specified by the given descriptor. */
    TableResult executeInsert(TableDescriptor tableDescriptor);
    // we already have an "executeInsert(String)" method to write into a registered table in the catalog
}
```
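With these two methods, a pipeline can read from a source and write to a sink without registering either table in the catalog. The following is a sketch against the proposed (not yet implemented) API; the connector names and options are illustrative and mirror the earlier example:

```java
// read directly from a descriptor, without catalog registration
Table table = tEnv.from(
    new Connector("kafka-0.11")
        .option("topic", "user_logs")
        .option("properties.bootstrap.servers", "localhost:9092")
        .option("format", "json")
        .schema(
            new Schema()
                .column("user_id", DataTypes.BIGINT())
                .column("score", DataTypes.DECIMAL(10, 2))));

// write directly to a descriptor-defined sink
table.executeInsert(
    new Connector("filesystem")
        .option("path", "/tmp/user_scores")   // hypothetical path, for illustration only
        .option("format", "csv")
        .schema(
            new Schema()
                .column("user_id", DataTypes.BIGINT())
                .column("score", DataTypes.DECIMAL(10, 2))));
```

This mirrors the existing from(String) / executeInsert(String) pair, with the descriptor taking the place of the registered table name.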
...