...

We propose to drop the existing method TableEnvironment#connect (deprecated in 1.11) and introduce a new method in TableEnvironment:


Code Block
languagejava
/** Creates a temporary table from a descriptor. */
void createTemporaryTable(String tablePath, TableDescriptor tableDescriptor);

...

The TableDescriptor is a unified interface/class that represents a SQL DDL structure (or a CatalogTable internally). It can be a specific connector descriptor, e.g. Kafka, or the general-purpose descriptor Connector. All methods can be chained on the instance, including .schema(), .partitionedBy(), and .like(). We will discuss TableDescriptor in detail in the Proposed Changes section.

Example usage will look like this:

Code Block
languagejava
// register a table using a specific connector descriptor
tEnv.createTemporaryTable(
	"MyTable",
	new Kafka()
		.version("0.11")
		.topic("user_logs")
		.property("bootstrap.servers", "localhost:9092")
		.property("group.id", "test-group")
		.startFromEarliest()
		.sinkPartitionerRoundRobin()
		.format(new Json().ignoreParseErrors(false))
		.schema(
			new Schema()
				.column("user_id", DataTypes.BIGINT())
				.column("user_name", DataTypes.STRING())
				.column("score", DataTypes.DECIMAL(10, 2))
				.column("log_ts", DataTypes.STRING())
				.column("part_field_0", DataTypes.STRING())
				.column("part_field_1", DataTypes.INT())
				.proctime("proc") // define a processing-time attribute with column name "proc"
				.computedColumn("my_ts", "TO_TIMESTAMP(log_ts)")  // defines a computed column
				.watermarkFor("my_ts").boundedOutOfOrderTimestamps(Duration.ofSeconds(5))  // defines watermark and rowtime attribute
				.primaryKey("user_id"))
		.partitionedBy("part_field_0", "part_field_1")  // Kafka doesn't support partitioned tables yet; this is just an example for the API
);

// register a table using the general-purpose Connector descriptor; this is helpful for custom sources/sinks
tEnv.createTemporaryTable(
	"MyTable",
	new Connector("kafka-0.11")
		.option("topic", "user_logs")
		.option("properties.bootstrap.servers", "localhost:9092")
		.option("properties.group.id", "test-group")
		.option("scan.startup.mode", "earliest")
		.option("format", "json")
		.option("json.ignore-parse-errors", "true")
		.option("sink.partitioner", "round-robin")
		.schema(
			new Schema()
				.column("user_id", DataTypes.BIGINT())
				.column("user_name", DataTypes.STRING())
				.column("score", DataTypes.DECIMAL(10, 2))
				.column("log_ts", DataTypes.STRING())
				.column("part_field_0", DataTypes.STRING())
				.column("part_field_1", DataTypes.INT())
				.proctime("proc") // define a processing-time attribute with column name "proc"
				.computedColumn("my_ts", "TO_TIMESTAMP(log_ts)")  // defines a computed column
				.watermarkFor("my_ts").boundedOutOfOrderTimestamps(Duration.ofSeconds(5)) // defines watermark and rowtime attribute
				.primaryKey("user_id"))
		.partitionedBy("part_field_0", "part_field_1") // Kafka doesn't support partitioned tables yet; this is just an example for the API
);
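
Once registered this way, the temporary table can be queried like any other table via the existing from(String) method. The following is only an illustrative sketch; the filter and select expressions are not part of this proposal (it assumes a static import of org.apache.flink.table.api.Expressions.$):

Code Block
languagejava
// query the registered temporary table via the existing from(String) method
// (assumes: import static org.apache.flink.table.api.Expressions.$)
Table result = tEnv.from("MyTable")
	.filter($("score").isGreaterOrEqual(60))
	.select($("user_id"), $("score"));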


Additionally, we would like to propose two new methods to improve usability for Table API users.


Code Block
languagejava
interface TableEnvironment {
  /** Reads a table from the given descriptor. */
  Table from(TableDescriptor tableDescriptor);
  // we already have a "from(String)" method to get a registered table from the catalog
}

interface Table {
  /** Writes the Table to a sink that is specified by the given descriptor. */
  TableResult executeInsert(TableDescriptor tableDescriptor); 
  // we already have an "executeInsert(String)" method to write into a registered table in the catalog
}
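
For illustration, the two methods could be used together as follows. This is only a sketch that reuses the general-purpose Connector descriptor from the example above; the concrete option keys and the "print" sink connector are placeholders, not part of this proposal:

Code Block
languagejava
// read a Table directly from a source descriptor, without registering it in a catalog
Table logs = tEnv.from(
	new Connector("kafka-0.11")
		.option("topic", "user_logs")
		.option("properties.bootstrap.servers", "localhost:9092")
		.option("format", "json")
		.schema(
			new Schema()
				.column("user_id", DataTypes.BIGINT())
				.column("score", DataTypes.DECIMAL(10, 2))));

// write the Table directly to a sink described by another descriptor
TableResult result = logs.executeInsert(
	new Connector("print"));  // "print" sink is used only for illustration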



...