...

This section discusses the new interfaces and classes in detail.

All the classes will be located in the org.apache.flink.table.descriptors package in the flink-table-common module.

TableDescriptor is an abstract class that represents a SQL DDL structure or a CatalogTable. It can be divided into several parts: schema, partitionedKeys, and options. The TableDescriptor determines how to define the schema and partitioned keys, but leaves the options to be implemented by subclasses. Specific connectors can extend TableDescriptor and provide handy methods to set connector options (e.g. Kafka#topic(..)). We also propose to provide a built-in and general implementation of TableDescriptor, i.e. Connector. The Connector class provides a general option(String key, String value) method, so it can support arbitrary custom connector implementations (based on FLIP-95). The Connector class reduces the development effort for custom connectors, as no dedicated descriptor has to be implemented.
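
For example, with the generic Connector class, a custom connector can be registered without implementing a dedicated descriptor. A minimal sketch, assuming a TableEnvironment tEnv; the connector identifier and option key are made up for illustration:

Code Block
languagejava
titleConnector usage (sketch)
tEnv.createTemporaryTable(
	"MyTable",
	new Connector("my-connector")
		.option("my-option", "value")
		.schema(
			new Schema()
				.column("f0", DataTypes.STRING())
				.column("f1", DataTypes.BIGINT())));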

...

Code Block
languagejava
titleTableDescriptor
/**
 * Describes a table to connect. It is equivalent to a SQL CREATE TABLE DDL statement.
 */
@PublicEvolving
public abstract class TableDescriptor {

	/**
	 * Specifies the table schema.
	 */
	public final TableDescriptor schema(Schema schema) {...}

	/**
	 * Specifies the partition keys of this table.
	 */
	public final TableDescriptor partitionedBy(String... columnNames) {...}

	/**
	 * Extends some parts from the original registered table path.
	 */
	public final TableDescriptor like(String originalTablePath, LikeOption... likeOptions) {...}

	/**
	 * Extends some parts from the original table descriptor.
	 */
	public final TableDescriptor like(TableDescriptor originalTableDescriptor, LikeOption... likeOptions) {...}

	/**
	 * Returns the connector options of this table. Subclasses should implement this method.
	 */
	protected abstract Map<String, String> connectorOptions();
}


Code Block
languagejava
titleLikeOption
public interface LikeOption {

	enum INCLUDING implements LikeOption {
		ALL,
		CONSTRAINTS,
		GENERATED,
		OPTIONS,
		PARTITIONS,
		WATERMARKS
	}

	enum EXCLUDING implements LikeOption {
		ALL,
		CONSTRAINTS,
		GENERATED,
		OPTIONS,
		PARTITIONS,
		WATERMARKS
	}

	enum OVERWRITING implements LikeOption {
		GENERATED,
		OPTIONS,
		WATERMARKS
	}
}

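The like options mirror the SQL CREATE TABLE ... LIKE clause. A sketch (the table name and options are illustrative):

Code Block
languagejava
titleLikeOption usage (sketch)
// corresponds to: CREATE TABLE ... LIKE base_table (INCLUDING ALL EXCLUDING OPTIONS)
TableDescriptor descriptor = new Connector("kafka")
	.option("topic", "new_topic")
	.like("base_table", LikeOption.INCLUDING.ALL, LikeOption.EXCLUDING.OPTIONS);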

Code Block
languagejava
titleConnector
public class Connector extends TableDescriptor {

	private final Map<String, String> options = new HashMap<>();

	public Connector(String identifier) {
		this.options.put(CONNECTOR.key(), identifier);
	}

	public Connector option(String key, String value) {
		this.options.put(key, value);
		return this;
	}

	@Override
	protected Map<String, String> connectorOptions() {
		return new HashMap<>(options);
	}
}


Code Block
languagejava
titleKafka
public class Kafka extends TableDescriptor {

	private final Map<String, String> options = new HashMap<>();

	public Kafka() {
		this.options.put(CONNECTOR.key(), "kafka");
	}

	public Kafka version(String version) {
		this.options.put(CONNECTOR.key(), "kafka-" + version);
		return this;
	}

	public Kafka topic(String topic) {
		this.options.put("topic", topic);
		return this;
	}

	public Kafka format(FormatDescriptor formatDescriptor) {
		this.options.putAll(formatDescriptor.toFormatOptions());
		return this;
	}

	...
}

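With such a connector-specific descriptor, a Kafka table can be described as follows. A sketch (the topic, version, and format options are illustrative):

Code Block
languagejava
titleKafka usage (sketch)
TableDescriptor kafkaTable = new Kafka()
	.version("universal")
	.topic("user_logs")
	.format(new Json())
	.schema(
		new Schema()
			.column("user_id", DataTypes.BIGINT())
			.column("message", DataTypes.STRING()));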

Schema

The current Rowtime class will be removed. The current Schema class will be refactored into:

Code Block
languagejava
titleSchema
/**
 * Describes a schema of a table.
 */
@PublicEvolving
public class Schema {

	/**
	 * Adds a column with the column name and the data type.
	 */
	public Schema column(String columnName, DataType columnType) {...}

	/**
	 * Adds a computed column with the column name and the Expression.
	 */
	public Schema column(String columnName, Expression columnExpr) {...}


	/**
	 * Specifies the primary key constraint for a set of given columns.
	 */
	public Schema primaryKey(String... columnNames) {...}

	/**
	 * Specifies the watermark strategy for the rowtime attribute.
	 */
	public Schema watermarkFor(String rowtimeColumn, Expression watermarkExpr) {...}
}

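For example, a schema with a computed processing-time column, a primary key, and a bounded watermark can be declared as follows. A sketch ($, lit, and proctime are the expression DSL from org.apache.flink.table.api.Expressions):

Code Block
languagejava
titleSchema usage (sketch)
Schema schema = new Schema()
	.column("user_id", DataTypes.BIGINT())
	.column("ts", DataTypes.TIMESTAMP(3))
	.column("proc", proctime())  // computed processing-time column
	.primaryKey("user_id")
	.watermarkFor("ts", $("ts").minus(lit(5).seconds()));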

Implementation

I propose to support this only in the Blink planner, as we are going to drop the old planner in the near future and the old planner does not support FLIP-95 connectors.

The descriptors TableDescriptor/Schema can be used in TableEnvironment#from(), TableEnvironment#createTemporaryTable(), and Table#executeInsert(). The same applies to StreamTableEnvironment, but BatchTableEnvironment will not support them, as it is implemented in the old planner.
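
A sketch of the intended usage, assuming a TableEnvironment tEnv and that sourceDescriptor/sinkDescriptor have been built as shown above:

Code Block
languagejava
titleDescriptor usage with TableEnvironment (sketch)
// register a descriptor under an explicit path and scan it
tEnv.createTemporaryTable("SourceTable", sourceDescriptor);
Table table = tEnv.from("SourceTable");

// or derive a Table directly from a descriptor
Table table2 = tEnv.from(sourceDescriptor);

// write the result out through another descriptor
table2.executeInsert(sinkDescriptor);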

The descriptors TableDescriptor/Schema only define the meta information (just like a DDL string) used to build the CatalogTable. The implementation of TableEnvironment#createTemporaryTable(path, descriptor) will translate the descriptor into a CatalogTable.

TableDescriptor stores the meta information in package-visible member fields/methods, e.g. schema, partitionedKeys, connectorOptions(); the same holds for the Schema class.

TableEnvironmentImpl#createTemporaryTable will create a new instance of TableDescriptorRegistration to register the descriptor as a CatalogTable in the catalog. It is an @Internal class located in org.apache.flink.table.descriptors, so that it can access the member fields of TableDescriptor/Schema. TableDescriptorRegistration will convert the schema into a TableSchema (with the help of CatalogTableSchemaResolver), and convert the partitionedKeys, options, and tableSchema into a CatalogTableImpl.
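
A rough sketch of this translation (the toTableSchema helper and the exact field names are assumptions, not the final implementation):

Code Block
languagejava
titleTableDescriptorRegistration (sketch)
@Internal
final class TableDescriptorRegistration {

	CatalogTable toCatalogTable(TableDescriptor descriptor) {
		// convert the Schema into a TableSchema; computed columns and watermark
		// expressions are resolved with CatalogTableSchemaResolver (elided here)
		TableSchema tableSchema = toTableSchema(descriptor.schema);
		// package-visible fields of TableDescriptor are accessible here
		return new CatalogTableImpl(
			tableSchema,
			descriptor.partitionedKeys,
			descriptor.connectorOptions(),
			null); // no table comment
	}
}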

TableEnvironment#from() will first register the descriptor under a system-generated table path (just like TableImpl#toString), and then scan that table path to get the Table. Table#executeInsert() works in a similar way.
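
For example, from(descriptor) could be implemented roughly as follows (the generated path scheme and the tableCounter are assumptions):

Code Block
languagejava
titleTableEnvironment#from (sketch)
public Table from(TableDescriptor descriptor) {
	// register under a system generated path, similar to TableImpl#toString,
	// then scan the generated path
	String path = "UnnamedTable$" + tableCounter.getAndIncrement();
	createTemporaryTable(path, descriptor);
	return from(path);
}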

Compatibility, Deprecation, and Migration Plan

...

We have a migration plan for users who are still using TableEnvironment#connect and want to migrate to the new Descriptor APIs. The following tables list the API changes:


Schema API Changes

Current Interface → New Interface

.field("name", DataTypes.STRING()) → .column("name", DataTypes.STRING())
.field("name", "STRING") → .column("name", DataTypes.STRING())
.field("proctime", DataTypes.TIMESTAMP(3)).proctime() → .column("proc", proctime())
.from("originalName") → .column("newName", $("originalName"))
.rowtime(new Rowtime().timestampsFromField("time").watermarksPeriodicAscending()) → .watermarkFor("time", $("time"))
.rowtime(new Rowtime().timestampsFromField("time").watermarksPeriodicBounded(2)) → .watermarkFor("time", $("time").minus(lit(2).milli()))
rowtime.timestampsFromExtractor(TimestampExtractor) → .column(fieldName, expr).watermarkFor(fieldName, watermarkExpr)
rowtime.watermarksFromStrategy(WatermarkStrategy) → .watermarkFor(fieldName, watermarkExpr)
rowtime.watermarksFromSource() → removed (never implemented by any connector before)
rowtime.timestampsFromSource() → removed (never implemented by any connector before)
.toProperties() → removed (not needed anymore)

ConnectTableDescriptor API Changes


Current Interface → New Interface

.withSchema(new Schema()) → .schema(new Schema())
.withFormat(new Json()) → new Kafka().format(new Json())
.withPartitionKeys(Arrays.asList(a, b, c)) → .partitionedBy(a, b, c)
.createTemporaryTable(path) → tEnv.createTemporaryTable(path, descriptor)
.inAppendMode() → removed (not needed anymore)
.inRetractMode() → removed (not needed anymore)
.inUpsertMode() → removed (not needed anymore)
.toProperties() → removed (not needed anymore)


Rejected Alternatives

Keep and follow the original TableEnvironment#connect API

...