...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Describe the problems you are trying to solve.

...

Flink introduced the Descriptor API, i.e. the TableEnvironment#connect API, in release 1.5.0 to configure and instantiate TableSources/TableSinks.

There are many problems with the current Descriptor API that we want to resolve in this FLIP:

  • The community has focused on the new SQL DDL feature in recent releases. SQL DDL is well-designed and rich in features, but the Descriptor API lacks many key features, e.g. computed columns, primary keys, and partition keys. 
  • Currently, a connector must provide a corresponding Descriptor (e.g. new Kafka()). We hope connectors can be registered without a corresponding Descriptor. This would ease the development of connectors and provide a replacement for registerTableSource/Sink.
  • The underlying implementations of the Descriptor API and SQL DDL are different. It is expensive to maintain two different code paths. 

There are also several known issues with the Descriptor API: FLINK-17548, FLINK-17186, FLINK-15801, FLINK-15943.

Public Interfaces


We propose to drop the existing method TableEnvironment#connect (deprecated in 1.11) and introduce a new method in TableEnvironment:


Code Block (java)
void createTemporaryTable(String tablePath, TableDescriptor tableDescriptor);


The TableDescriptor is a unified interface/class that represents a SQL DDL structure (or a CatalogTable internally). Usage will look like this:


Code Block (java)
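// Variant 1: a connector-specific descriptor provided by the connector (new Kafka()):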
tEnv.createTemporaryTable(
	"MyTable",
	new Kafka()
		.version("0.11")
		.topic("user_logs")
		.property("bootstrap.servers", "localhost:9092")
		.property("group.id", "test-group")
		.startFromEarliest()
		.sinkPartitionerRoundRobin()
		.format(new Json().ignoreParseErrors(false))
		.schema(
			new Schema()
				.column("user_id", DataTypes.BIGINT())
				.column("user_name", DataTypes.STRING())
				.column("score", DataTypes.DECIMAL(10, 2))
				.column("log_ts", DataTypes.STRING())
				.column("part_field_0", DataTypes.STRING())
				.column("part_field_1", DataTypes.INT())
				.proctime("proc")
				.computedColumn("my_ts", "TO_TIMESTAMP(log_ts)")  // computed column
				.watermarkFor("my_ts").boundedOutOfOrderTimestamps(Duration.ofSeconds(5))
				.primaryKey("user_id"))
		.partitionedBy("part_field_0", "part_field_1")
);
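
// Variant 2: the generic Connector descriptor, with the connector identifier
// and all of its options specified as raw string key-value pairs: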

tEnv.createTemporaryTable(
	"MyTable",
	new Connector("kafka-0.11")
		.option("topic", "user_logs")
		.option("properties.bootstrap.servers", "localhost:9092")
		.option("properties.group.id", "test-group")
		.option("scan.startup.mode", "earliest")
		.option("format", "json")
		.option("json.ignore-parse-errors", "true")
		.option("sink.partitioner", "round-robin")
		.schema(
			new Schema()
				.column("user_id", DataTypes.BIGINT())
				.column("user_name", DataTypes.STRING())
				.column("score", DataTypes.DECIMAL(10, 2))
				.column("log_ts", DataTypes.STRING())
				.column("part_field_0", DataTypes.STRING())
				.column("part_field_1", DataTypes.INT())
				.proctime("proc")
				.computedColumn("my_ts", "TO_TIMESTAMP(log_ts)")  // computed column
				.watermarkFor("my_ts").boundedOutOfOrderTimestamps(Duration.ofSeconds(5))
				.primaryKey("user_id"))
		.partitionedBy("part_field_0", "part_field_1")
);
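
Since a TableDescriptor is a programmatic representation of a SQL DDL structure, the second example above corresponds closely to a CREATE TEMPORARY TABLE statement. The following sketch is illustrative only: the option keys and values simply mirror the descriptor example, and the statement is executed via TableEnvironment#executeSql (available since 1.11).

Code Block (java)
// Illustrative only: a DDL statement roughly equivalent to the
// Connector-based example above; option keys/values mirror that example.
tEnv.executeSql(
	"CREATE TEMPORARY TABLE MyTable (\n" +
	"  user_id BIGINT,\n" +
	"  user_name STRING,\n" +
	"  score DECIMAL(10, 2),\n" +
	"  log_ts STRING,\n" +
	"  part_field_0 STRING,\n" +
	"  part_field_1 INT,\n" +
	"  proc AS PROCTIME(),\n" +                              // processing-time attribute
	"  my_ts AS TO_TIMESTAMP(log_ts),\n" +                   // computed column
	"  WATERMARK FOR my_ts AS my_ts - INTERVAL '5' SECOND,\n" +
	"  PRIMARY KEY (user_id) NOT ENFORCED\n" +
	") PARTITIONED BY (part_field_0, part_field_1)\n" +
	"WITH (\n" +
	"  'connector' = 'kafka-0.11',\n" +
	"  'topic' = 'user_logs',\n" +
	"  'properties.bootstrap.servers' = 'localhost:9092',\n" +
	"  'properties.group.id' = 'test-group',\n" +
	"  'scan.startup.mode' = 'earliest',\n" +
	"  'format' = 'json',\n" +
	"  'json.ignore-parse-errors' = 'true',\n" +
	"  'sink.partitioner' = 'round-robin'\n" +
	")");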



Briefly list any new interfaces that will be introduced as part of this proposal or any existing interfaces that will be removed or changed. The purpose of this section is to concisely call out the public contract that will come along with this feature.

...