...

Code Block
languagejava
// register a table using a specific connector descriptor
tEnv.createTemporaryTable(
	"MyTable",
	KafkaConnector.newBuilder()
		.version("0.11")
		.topic("user_logs")
		.property("bootstrap.servers", "localhost:9092")
		.property("group.id", "test-group")
		.startFromEarliest()
		.sinkPartitionerRoundRobin()
		.format(JsonFormat.newBuilder().ignoreParseErrors(false).build())
		.schema(
			Schema.newBuilder()
				.column("user_id", DataTypes.BIGINT())
				.column("user_name", DataTypes.STRING())
				.column("score", DataTypes.DECIMAL(10, 2))
				.column("log_ts", DataTypes.STRING())
				.column("part_field_0", DataTypes.STRING())
				.column("part_field_1", DataTypes.INT())
				.column("proc", proctime()) // define a processing-time attribute with column name "proc"
				.column("my_ts", toTimestamp($("log_ts"))) // computed column
				.watermark("my_ts", $("my_ts").minus(lit(3).seconds())) // defines watermark and rowtime attribute
				.primaryKey("user_id")
				.build())
		.partitionedBy("part_field_0", "part_field_1") // Kafka doesn't support partitioned tables yet; this is just an example for the API
		.build()
);


Code Block
languagejava
// register a table using the general-purpose Connector descriptor; this is helpful for custom sources/sinks
tEnv.createTemporaryTable(
	"MyTable",
	Connector.of("kafka-0.11")
		.option("topic", "user_logs")
		.option("properties.bootstrap.servers", "localhost:9092")
		.option("properties.group.id", "test-group")
		.option("scan.startup.mode", "earliest-offset")
		.option("format", "json")
		.option("json.ignore-parse-errors", "true")
		.option("sink.partitioner", "round-robin")
		.schema(
			Schema.newBuilder()
				.column("user_id", DataTypes.BIGINT())
				.column("user_name", DataTypes.STRING())
				.column("score", DataTypes.DECIMAL(10, 2))
				.column("log_ts", DataTypes.STRING())
				.column("part_field_0", DataTypes.STRING())
				.column("part_field_1", DataTypes.INT())
				.column("proc", proctime()) // define a processing-time attribute with column name "proc"
				.column("my_ts", toTimestamp($("log_ts"))) // computed column
				.watermark("my_ts", $("my_ts").minus(lit(3).seconds())) // defines watermark and rowtime attribute
				.primaryKey("user_id")
				.build())
		.partitionedBy("part_field_0", "part_field_1") // Kafka doesn't support partitioned tables yet; this is just an example for the API
		.build()
);
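The two registrations above describe essentially the same Kafka table: one uses typed methods and the other writes raw options. The only difference is the JSON ignore-parse-errors setting. The sketch below makes the correspondence explicit without any Flink dependency.

A few assumptions apply. `KafkaOptionsSketch` is a hypothetical stand-in for KafkaConnectorBuilder. The `properties.` prefixing is inferred by comparing the two examples. The remaining keys are the ones the generic example already uses, and format options are left out for brevity.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, simplified stand-in for KafkaConnectorBuilder: every typed
// method just writes one entry into a flat option map.
class KafkaOptionsSketch {
    private final Map<String, String> options = new LinkedHashMap<>();

    KafkaOptionsSketch() {
        options.put("connector", "kafka"); // refined by version(...)
    }

    KafkaOptionsSketch version(String version) {
        options.put("connector", "kafka-" + version);
        return this;
    }

    KafkaOptionsSketch topic(String topic) {
        options.put("topic", topic);
        return this;
    }

    // Assumption: Kafka client properties are forwarded with a "properties." prefix.
    KafkaOptionsSketch property(String key, String value) {
        options.put("properties." + key, value);
        return this;
    }

    KafkaOptionsSketch startFromEarliest() {
        options.put("scan.startup.mode", "earliest-offset");
        return this;
    }

    KafkaOptionsSketch sinkPartitionerRoundRobin() {
        options.put("sink.partitioner", "round-robin");
        return this;
    }

    Map<String, String> toOptions() {
        return new LinkedHashMap<>(options);
    }

    public static void main(String[] args) {
        Map<String, String> typed = new KafkaOptionsSketch()
                .version("0.11")
                .topic("user_logs")
                .property("bootstrap.servers", "localhost:9092")
                .property("group.id", "test-group")
                .startFromEarliest()
                .sinkPartitionerRoundRobin()
                .toOptions();

        // The same entries written by hand, as with Connector.of("kafka-0.11").option(...).
        Map<String, String> generic = new LinkedHashMap<>();
        generic.put("connector", "kafka-0.11");
        generic.put("topic", "user_logs");
        generic.put("properties.bootstrap.servers", "localhost:9092");
        generic.put("properties.group.id", "test-group");
        generic.put("scan.startup.mode", "earliest-offset");
        generic.put("sink.partitioner", "round-robin");

        System.out.println(typed.equals(generic)); // prints "true"
    }
}
```

The typed builder gives discoverability and compile-time checks. The generic form works for any connector without a dedicated class.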




LIKE clause for Descriptor API

...

Code Block
languagejava
tEnv.createTemporaryTable(
	"OrdersInKafka",
	KafkaConnector.newBuilder()
		.topic("user_logs")
		.property("bootstrap.servers", "localhost:9092")
		.property("group.id", "test-group")
		.format(JsonFormat.newInstance())
		.schema(
			Schema.newBuilder()
				.column("user_id", DataTypes.BIGINT())
				.column("score", DataTypes.DECIMAL(10, 2))
				.column("log_ts", DataTypes.TIMESTAMP(3))
				.column("my_ts", toTimestamp($("log_ts")))
				.build())
		.build()
);

tEnv.createTemporaryTable(
	"OrdersInFilesystem",
	Connector.of("filesystem")
		.option("path", "path/to/whatever")
		.schema(
			Schema.newBuilder()
				.watermark("my_ts", $("my_ts").minus(lit(3).seconds()))
				.build())
		.like("OrdersInKafka", LikeOption.EXCLUDING.ALL, LikeOption.INCLUDING.GENERATED)
		.build()
);


The "OrdersInFilesystem" table above is equivalent to:

Code Block
languagejava
tEnv.createTemporaryTable(
	"OrdersInFilesystem",
	Connector.of("filesystem")
		.option("path", "path/to/whatever")
		.schema(
			Schema.newBuilder()
				.column("user_id", DataTypes.BIGINT())
				.column("score", DataTypes.DECIMAL(10, 2))
				.column("log_ts", DataTypes.TIMESTAMP(3))
				.column("my_ts", toTimestamp($("log_ts")))
				.watermark("my_ts", $("my_ts").minus(lit(3).seconds()))
				.build())
		.build()
);
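The equivalence above can be checked with a small, Flink-free model of the merge. This is a sketch under stated assumptions. `TableModel` and `LikeMergeSketch.merge` are hypothetical, and expressions are plain strings.

The model follows SQL `CREATE TABLE ... LIKE`:

- Physical columns are always inherited.
- Computed columns are inherited only when GENERATED is included.
- The source table's options and watermark are inherited only when OPTIONS and WATERMARKS are included.
- Local entries simply win on conflicts. Real INCLUDING semantics would instead reject duplicate keys.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical, simplified table model: column name -> computed expression
// (null for a physical column), connector options, and an optional watermark.
class TableModel {
    final Map<String, String> columns = new LinkedHashMap<>();
    final Map<String, String> options = new LinkedHashMap<>();
    String watermark;

    TableModel column(String name, String computedExpr) {
        columns.put(name, computedExpr);
        return this;
    }

    TableModel option(String key, String value) {
        options.put(key, value);
        return this;
    }
}

class LikeMergeSketch {
    // Physical columns are always inherited; computed columns, options and the
    // watermark only when the corresponding feature is included. Local entries
    // win on conflicts (OVERWRITING-like); name clashes are not rejected here.
    static TableModel merge(TableModel source, TableModel local,
                            boolean includeGenerated, boolean includeOptions, boolean includeWatermarks) {
        TableModel result = new TableModel();
        source.columns.forEach((name, expr) -> {
            if (expr == null || includeGenerated) {
                result.columns.put(name, expr);
            }
        });
        result.columns.putAll(local.columns);
        if (includeOptions) {
            result.options.putAll(source.options);
        }
        result.options.putAll(local.options);
        result.watermark = local.watermark != null ? local.watermark
                : (includeWatermarks ? source.watermark : null);
        return result;
    }

    public static void main(String[] args) {
        TableModel ordersInKafka = new TableModel()
                .column("user_id", null)
                .column("score", null)
                .column("log_ts", null)
                .column("my_ts", "TO_TIMESTAMP(log_ts)")
                .option("connector", "kafka")
                .option("topic", "user_logs");
        TableModel local = new TableModel()
                .option("connector", "filesystem")
                .option("path", "path/to/whatever");
        local.watermark = "my_ts - INTERVAL '3' SECOND";

        // EXCLUDING ALL, INCLUDING GENERATED
        TableModel ordersInFilesystem = merge(ordersInKafka, local, true, false, false);
        System.out.println(ordersInFilesystem.columns.keySet()); // [user_id, score, log_ts, my_ts]
        System.out.println(ordersInFilesystem.options);          // {connector=filesystem, path=path/to/whatever}
        System.out.println(ordersInFilesystem.watermark);        // my_ts - INTERVAL '3' SECOND
    }
}
```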


TableEnvironment#from() and Table#executeInsert()

...

Code Block
languagejava
Schema schema = Schema.newBuilder()
   .column("user_id", DataTypes.BIGINT())
   .column("score", DataTypes.DECIMAL(10, 2))
   .column("ts", DataTypes.TIMESTAMP(3))
   .build();
Table myKafka = tEnv.from(
   KafkaConnector.newBuilder()
      .version("0.11")
      .topic("user_logs")
      .property("bootstrap.servers", "localhost:9092")
      .property("group.id", "test-group")
      .startFromEarliest()
      .sinkPartitionerRoundRobin()
      .format(JsonFormat.newBuilder().ignoreParseErrors(false).build())
      .schema(schema)
      .build()
);
// read from the Kafka table and write into the filesystem table
myKafka.executeInsert(
   Connector.of("filesystem")
      .option("path", "/path/to/whatever")
      .option("format", "json")
      .schema(schema)
      .build()
);


Proposed Changes

This section discusses the new interfaces and classes in detail.

...

TableDescriptor is an abstract class that represents a SQL DDL structure, i.e. a CatalogTable. It consists of several parts: schema, partition keys, and options. TableDescriptor (through TableDescriptorBuilder) determines how the schema and partition keys are defined, but leaves connector options to subclasses. Specific connectors can extend TableDescriptor and provide handy methods to set connector options (e.g. KafkaConnectorBuilder#topic(..)).

We also propose a built-in, general-purpose implementation of TableDescriptor: Connector. The Connector class provides a generic option(String key, String value) method, so it can support arbitrary custom connector implementations (based on FLIP-95). This reduces the effort of developing custom connectors, because no dedicated descriptor has to be implemented for each of them.
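Because a TableDescriptor carries the same three parts as a CREATE TABLE statement, it can be read as a DDL statement assembled programmatically. In Flink SQL these parts appear as the column list, the PARTITIONED BY clause, and the WITH clause. The following minimal sketch illustrates that correspondence and is not part of the proposal. `DescriptorSketch` and `toDdl` are hypothetical names, and column types are kept as pre-rendered SQL strings.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical holder for the three parts of a table descriptor.
class DescriptorSketch {
    final Map<String, String> columns = new LinkedHashMap<>(); // column name -> SQL type
    final List<String> partitionKeys = new ArrayList<>();
    final Map<String, String> options = new LinkedHashMap<>();

    // Renders the parts in CREATE TABLE order: columns, PARTITIONED BY, WITH.
    // Identifiers and values are not escaped; this is for illustration only.
    String toDdl(String tableName) {
        String cols = columns.entrySet().stream()
                .map(e -> "  `" + e.getKey() + "` " + e.getValue())
                .collect(Collectors.joining(",\n"));
        StringBuilder ddl = new StringBuilder("CREATE TABLE `" + tableName + "` (\n" + cols + "\n)");
        if (!partitionKeys.isEmpty()) {
            ddl.append(" PARTITIONED BY (")
               .append(partitionKeys.stream().map(k -> "`" + k + "`").collect(Collectors.joining(", ")))
               .append(")");
        }
        String opts = options.entrySet().stream()
                .map(e -> "  '" + e.getKey() + "' = '" + e.getValue() + "'")
                .collect(Collectors.joining(",\n"));
        return ddl.append(" WITH (\n").append(opts).append("\n)").toString();
    }

    public static void main(String[] args) {
        DescriptorSketch d = new DescriptorSketch();
        d.columns.put("user_id", "BIGINT");
        d.columns.put("part_field_0", "STRING");
        Collections.addAll(d.partitionKeys, "part_field_0");
        d.options.put("connector", "kafka-0.11");
        d.options.put("topic", "user_logs");
        System.out.println(d.toDdl("MyTable"));
        // CREATE TABLE `MyTable` (
        //   `user_id` BIGINT,
        //   `part_field_0` STRING
        // ) PARTITIONED BY (`part_field_0`) WITH (
        //   'connector' = 'kafka-0.11',
        //   'topic' = 'user_logs'
        // )
    }
}
```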

TableDescriptor & TableDescriptorBuilder

The current TableDescriptor will be refactored into:

Code Block
languagejava
titleTableDescriptor
/**
 * Describes a table to connect. It is the equivalent of a SQL CREATE TABLE DDL statement and
 * only wraps meta information about a catalog table.
 * Please use a specific {@link TableDescriptorBuilder} to build the {@link TableDescriptor}.
 */
@PublicEvolving
public abstract class TableDescriptor {
	// package-visible variables used internally to build a CatalogTable
	protected final List<String> partitionedFields = new ArrayList<>();
	protected Schema schema;
	protected final Map<String, String> options = new HashMap<>();
	protected LikeOption[] likeOptions;
	protected String likePath;
}

/**
 * A basic builder implementation to build {@link TableDescriptor}.
 */
@PublicEvolving
public abstract class TableDescriptorBuilder<DESCRIPTOR extends TableDescriptor, BUILDER extends TableDescriptorBuilder<DESCRIPTOR, BUILDER>> {

	private final DESCRIPTOR descriptor;

	protected TableDescriptorBuilder(Class<DESCRIPTOR> descriptorClass) {
		descriptor = InstantiationUtil.instantiate(descriptorClass, TableDescriptor.class);
	}

	/**
	 * Returns this builder instance in the type of the subclass.
	 */
	protected abstract BUILDER self();

	/**
	 * Specifies the table schema.
	 */
	public BUILDER schema(Schema schema) {
		descriptor.schema = schema;
		return self();
	}

	/**
	 * Specifies the partition keys of this table.
	 */
	public BUILDER partitionedBy(String... fieldNames) {
		checkArgument(descriptor.partitionedFields.isEmpty(), "partitionedBy(...) shouldn't be called more than once.");
		descriptor.partitionedFields.addAll(Arrays.asList(fieldNames));
		return self();
	}

	/**
	 * Extends some parts from the original registered table path.
	 */
	public BUILDER like(String tablePath, LikeOption... likeOptions) {
		descriptor.likePath = tablePath;
		descriptor.likeOptions = likeOptions;
		return self();
	}

	/**
	 * Sets a connector option; subclasses expose typed setters on top of this method.
	 */
	protected BUILDER option(String key, String value) {
		descriptor.options.put(key, value);
		return self();
	}

	/**
	 * Returns the created table descriptor.
	 */
	public DESCRIPTOR build() {
		return descriptor;
	}
}
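The generic signature of TableDescriptorBuilder, `BUILDER extends TableDescriptorBuilder<DESCRIPTOR, BUILDER>` together with `self()`, is the self-typed builder idiom (also known as the curiously recurring template pattern). Without it, an inherited method such as partitionedBy(...) would return the base type, and a connector-specific setter could not be chained after it.

The standalone sketch below shows the idiom. It uses hypothetical names and no Flink classes.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Base builder: shared methods return B, i.e. the concrete builder type.
abstract class BaseBuilder<B extends BaseBuilder<B>> {
    protected final List<String> partitionKeys = new ArrayList<>();
    protected final Map<String, String> options = new LinkedHashMap<>();

    protected abstract B self();

    public B partitionedBy(String... fieldNames) {
        partitionKeys.addAll(Arrays.asList(fieldNames));
        return self();
    }

    protected B option(String key, String value) {
        options.put(key, value);
        return self();
    }
}

// A connector-specific builder adds typed setters on top of the shared ones.
class DemoKafkaBuilder extends BaseBuilder<DemoKafkaBuilder> {
    public DemoKafkaBuilder topic(String topic) {
        return option("topic", topic);
    }

    @Override
    protected DemoKafkaBuilder self() {
        return this;
    }
}

class SelfTypeDemo {
    public static void main(String[] args) {
        // partitionedBy(...) is inherited, yet it returns DemoKafkaBuilder,
        // so topic(...) can follow it without a cast.
        DemoKafkaBuilder builder = new DemoKafkaBuilder()
                .partitionedBy("part_field_0", "part_field_1")
                .topic("user_logs");
        System.out.println(builder.partitionKeys + " " + builder.options);
        // prints: [part_field_0, part_field_1] {topic=user_logs}
    }
}
```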


Code Block
languagejava
titleLikeOption
/**
 * Merging options of the LIKE clause, mirroring the SQL CREATE TABLE ... LIKE syntax.
 */
public interface LikeOption {

	enum INCLUDING implements LikeOption {
		ALL,
		CONSTRAINTS,
		GENERATED,
		OPTIONS,
		PARTITIONS,
		WATERMARKS
	}

	enum EXCLUDING implements LikeOption {
		ALL,
		CONSTRAINTS,
		GENERATED,
		OPTIONS,
		PARTITIONS,
		WATERMARKS
	}

	enum OVERWRITING implements LikeOption {
		GENERATED,
		OPTIONS,
		WATERMARKS
	}
}
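The following sketch shows how a list of LikeOption values might be resolved into one strategy per feature. It assumes the descriptor API mirrors Flink SQL's documented LIKE semantics:

- Without options, the default is INCLUDING ALL OVERWRITING OPTIONS.
- An ALL entry sets the strategy for every feature that has no specific entry, so EXCLUDING ALL INCLUDING GENERATED keeps only generated columns.

The enums are a self-contained copy of the interface above. `Feature`, `Strategy`, and `resolve` are hypothetical helpers.

```java
import java.util.EnumMap;
import java.util.Map;

// Self-contained copy of the proposed LikeOption interface.
interface LikeOption {
    enum INCLUDING implements LikeOption { ALL, CONSTRAINTS, GENERATED, OPTIONS, PARTITIONS, WATERMARKS }
    enum EXCLUDING implements LikeOption { ALL, CONSTRAINTS, GENERATED, OPTIONS, PARTITIONS, WATERMARKS }
    enum OVERWRITING implements LikeOption { GENERATED, OPTIONS, WATERMARKS }
}

// Hypothetical helper types used only by this sketch.
enum Feature { CONSTRAINTS, GENERATED, OPTIONS, PARTITIONS, WATERMARKS }
enum Strategy { INCLUDING, EXCLUDING, OVERWRITING }

class LikeOptionResolver {
    static Map<Feature, Strategy> resolve(LikeOption... likeOptions) {
        // Assumed default, as for SQL LIKE: INCLUDING ALL OVERWRITING OPTIONS.
        Map<Feature, Strategy> result = new EnumMap<>(Feature.class);
        for (Feature f : Feature.values()) {
            result.put(f, Strategy.INCLUDING);
        }
        result.put(Feature.OPTIONS, Strategy.OVERWRITING);

        // An ALL entry replaces the fallback for every feature...
        for (LikeOption o : likeOptions) {
            if (o == LikeOption.INCLUDING.ALL || o == LikeOption.EXCLUDING.ALL) {
                Strategy s = (o == LikeOption.INCLUDING.ALL) ? Strategy.INCLUDING : Strategy.EXCLUDING;
                for (Feature f : Feature.values()) {
                    result.put(f, s);
                }
            }
        }
        // ...and feature-specific entries override it, regardless of their position.
        for (LikeOption o : likeOptions) {
            String name = ((Enum<?>) o).name();
            if (name.equals("ALL")) {
                continue;
            }
            Strategy s = (o instanceof LikeOption.INCLUDING) ? Strategy.INCLUDING
                    : (o instanceof LikeOption.EXCLUDING) ? Strategy.EXCLUDING
                    : Strategy.OVERWRITING;
            result.put(Feature.valueOf(name), s);
        }
        return result;
    }

    public static void main(String[] args) {
        System.out.println(resolve(LikeOption.EXCLUDING.ALL, LikeOption.INCLUDING.GENERATED));
        // prints: {CONSTRAINTS=EXCLUDING, GENERATED=INCLUDING, OPTIONS=EXCLUDING, PARTITIONS=EXCLUDING, WATERMARKS=EXCLUDING}
    }
}
```

Keeping OVERWRITING as a separate enum with only GENERATED, OPTIONS, and WATERMARKS means combinations such as "overwriting partitions" cannot be expressed at all.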


Code Block
languagejava
titleConnector
/**
 * A general-purpose table descriptor for arbitrary connectors, configured through string options.
 */
public class Connector extends TableDescriptor {

	public static ConnectorBuilder of(String identifier) {
		return new ConnectorBuilder(identifier);
	}

	public static class ConnectorBuilder extends TableDescriptorBuilder<Connector, ConnectorBuilder> {

		private ConnectorBuilder(String identifier) {
			super(Connector.class);
			// call the base implementation directly: the override below rejects the 'connector' key
			super.option(CONNECTOR.key(), identifier);
		}

		@Override
		public ConnectorBuilder option(String key, String value) {
			String lowerKey = key.toLowerCase().trim();
			if (CONNECTOR.key().equals(lowerKey)) {
				throw new IllegalArgumentException("It's not allowed to override 'connector' option.");
			}
			return super.option(key, value);
		}

		@Override
		protected ConnectorBuilder self() {
			return this;
		}
	}
}
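Java dispatches a call to an overridable method to the subclass override even while the object is still being constructed. Because ConnectorBuilder#option(...) rejects the connector key, the identifier has to be written through the base-class method.

The Flink-free sketch below shows the guard. The class names are hypothetical. It lower-cases with Locale.ROOT, a detail the proposal does not specify.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

abstract class SketchBuilder<B extends SketchBuilder<B>> {
    final Map<String, String> options = new LinkedHashMap<>();

    protected abstract B self();

    protected B option(String key, String value) {
        options.put(key, value);
        return self();
    }
}

// Hypothetical stand-in for Connector.ConnectorBuilder.
class SketchConnectorBuilder extends SketchBuilder<SketchConnectorBuilder> {
    static final String CONNECTOR_KEY = "connector";

    SketchConnectorBuilder(String identifier) {
        // Bypass the override below: a plain option(...) call would be
        // dispatched to it even from the constructor, and would throw.
        super.option(CONNECTOR_KEY, identifier);
    }

    @Override
    public SketchConnectorBuilder option(String key, String value) {
        if (CONNECTOR_KEY.equals(key.trim().toLowerCase(Locale.ROOT))) {
            throw new IllegalArgumentException("It's not allowed to override 'connector' option.");
        }
        return super.option(key, value);
    }

    @Override
    protected SketchConnectorBuilder self() {
        return this;
    }

    public static void main(String[] args) {
        SketchConnectorBuilder builder = new SketchConnectorBuilder("filesystem")
                .option("path", "/path/to/whatever");
        System.out.println(builder.options); // prints: {connector=filesystem, path=/path/to/whatever}
        try {
            builder.option(" Connector ", "kafka");
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```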


Code Block
languagejava
titleKafka
/**
 * Connector descriptor for the Apache Kafka message queue.
 */
@PublicEvolving
public class KafkaConnector extends TableDescriptor {

	public static KafkaConnectorBuilder newBuilder() {
		return new KafkaConnectorBuilder();
	}

	public static class KafkaConnectorBuilder extends TableDescriptorBuilder<KafkaConnector, KafkaConnectorBuilder> {

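		public KafkaConnectorBuilder() {
			super(KafkaConnector.class);
			// default identifier, refined by version(...), e.g. to "kafka-0.11"
			option(CONNECTOR.key(), "kafka");
		}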

		/**
		 * Sets the Kafka version to be used.
		 *
		 * @param version Kafka version. E.g., "0.8", "0.11", etc.
		 */
		public KafkaConnectorBuilder version(String version) {
			Preconditions.checkNotNull(version);
			option(CONNECTOR.key(), "kafka-" + version);
			return this;
		}

		/**
		 * Sets the topic from which the table is read.
		 *
		 * @param topic The topic from which the table is read.
		 */
		public KafkaConnectorBuilder topic(String topic) {
			Preconditions.checkNotNull(topic);
			option("topic", topic);
			return this;
		}

		// TODO: add more methods, e.g. property(...), startFromEarliest(),
		// sinkPartitionerRoundRobin() and format(...)

		@Override
		protected KafkaConnectorBuilder self() {
			return this;
		}
	}
}


Schema

The current Rowtime class will be removed. The current Schema class will be refactored into:

Code Block
languagejava
titleSchema
public class Schema {

	// package visible variables used to build TableSchema
	protected final List<Column> columns;
	protected final WatermarkInfo watermarkInfo;
	protected final List<String> primaryKey;

	private Schema(List<Column> columns, WatermarkInfo watermarkInfo, List<String> primaryKey) {
		this.columns = columns;
		this.watermarkInfo = watermarkInfo;
		this.primaryKey = primaryKey;
	}

	public static SchemaBuilder newBuilder() {
		return new SchemaBuilder();
	}

	public static class SchemaBuilder {
		List<Column> columns = new ArrayList<>();
		WatermarkInfo watermarkInfo;
		List<String> primaryKey;

		private SchemaBuilder() {
		}

		/**
		 * Adds a column with the column name and the data type.
		 */
		public SchemaBuilder column(String fieldName, AbstractDataType<?> fieldType) {
			columns.add(new PhysicalColumn(fieldName, fieldType));
			return this;
		}

		/**
		 * Adds a computed column with the column name and the column expression.
		 */
		public SchemaBuilder column(String fieldName, Expression expr) {
			columns.add(new VirtualColumn(fieldName, expr));
			return this;
		}

		/**
		 * Specifies the primary key constraint for a set of given columns.
		 */
		public SchemaBuilder primaryKey(String... fieldNames) {
			this.primaryKey = Arrays.asList(fieldNames);
			return this;
		}

		/**
		 * Specifies the watermark strategy for the rowtime attribute.
		 */
		public SchemaBuilder watermark(String rowtimeField, Expression watermarkExpr) {
			this.watermarkInfo = new WatermarkInfo(rowtimeField, watermarkExpr);
			return this;
		}

		public Schema build() {
			return new Schema(columns, watermarkInfo, primaryKey);
		}
	}
}
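Columns, the watermark, and the primary key are declared independently, so their names can drift apart. For example, a watermark may be declared on a column that the schema never defines. The proposal does not say where such references are checked.

The sketch below assumes a simple validation step that runs when the Schema is converted, and uses strings in place of DataType and Expression. `SchemaCheckSketch` and `validate()` are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, string-based stand-in for Schema.SchemaBuilder plus a validation step.
class SchemaCheckSketch {
    final Map<String, String> columns = new LinkedHashMap<>(); // name -> type or computed expression
    String rowtimeColumn;
    String watermarkExpr;
    List<String> primaryKey = Collections.emptyList();

    SchemaCheckSketch column(String name, String typeOrExpr) {
        columns.put(name, typeOrExpr);
        return this;
    }

    SchemaCheckSketch watermark(String rowtimeColumn, String watermarkExpr) {
        this.rowtimeColumn = rowtimeColumn;
        this.watermarkExpr = watermarkExpr;
        return this;
    }

    SchemaCheckSketch primaryKey(String... columnNames) {
        this.primaryKey = Arrays.asList(columnNames);
        return this;
    }

    // Returns one message per unresolved reference; an empty list means all references resolve.
    List<String> validate() {
        List<String> problems = new ArrayList<>();
        if (rowtimeColumn != null && !columns.containsKey(rowtimeColumn)) {
            problems.add("watermark refers to unknown column '" + rowtimeColumn + "'");
        }
        for (String pk : primaryKey) {
            if (!columns.containsKey(pk)) {
                problems.add("primary key refers to unknown column '" + pk + "'");
            }
        }
        return problems;
    }

    public static void main(String[] args) {
        List<String> problems = new SchemaCheckSketch()
                .column("log_ts", "STRING")
                .column("my_ts", "TO_TIMESTAMP(log_ts)")
                .watermark("ts", "ts - INTERVAL '3' SECOND")
                .primaryKey("user_id")
                .validate();
        System.out.println(problems);
        // prints: [watermark refers to unknown column 'ts', primary key refers to unknown column 'user_id']
    }
}
```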


Implementation

I propose to support this only in the Blink planner, because the old planner will be dropped in the near future and it does not support FLIP-95 connectors.

...

TableDescriptor stores the meta information (e.g. schema, partitionedFields, and options) in package-visible member fields; the Schema class does the same.

...

| Current Interface | New Interface |
|---|---|
| .field("name", DataTypes.STRING()) | .column("name", DataTypes.STRING()) |
| .field("name", "STRING") | .column("name", DataTypes.STRING()) |
| .field("proctime", DataTypes.TIMESTAMP(3)).proctime() | .column("proctime", proctime()) |
| .from("originalName") | .column("newName", $("originalName")) |
| .rowtime(new Rowtime().timestampsFromField("time").watermarksPeriodicAscending()) | .watermark("time", $("time")) |
| .rowtime(new Rowtime().timestampsFromField("time").watermarksPeriodicBounded(2)) | .watermark("time", $("time").minus(lit(2).milli())) |
| rowtime.timestampsFromExtractor(TimestampExtractor) | .column(fieldName, expr).watermark(fieldName, watermarkExpr) |
| rowtime.watermarksFromStrategy(WatermarkStrategy) | .watermark(fieldName, watermarkExpr) |
| rowtime.watermarksFromSource() | removed (never implemented by any connector) |
| rowtime.timestampsFromSource() | removed (never implemented by any connector) |
| .toProperties() | removed (no longer needed) |

...

| Current Interface | New Interface |
|---|---|
| .withSchema(new Schema()) | .schema(Schema.newBuilder().column(...).build()) |
| .withFormat(new Json()) | KafkaConnector.newBuilder().format(JsonFormat.newInstance()).build() |
| .withPartitionKeys(Arrays.asList(a, b, c)) | .partitionedBy(a, b, c) |
| .createTemporaryTable(path) | tEnv.createTemporaryTable(path, descriptor) |
| .inAppendMode() | removed (no longer needed) |
| .inRetractMode() | removed (no longer needed) |
| .inUpsertMode() | removed (no longer needed) |
| .toProperties() | removed (no longer needed) |

...