...
Code Block |
---|
|
// register a table using specific descriptor
tEnv.createTemporaryTable(
    "MyTable",
    KafkaConnector.newBuilder()
        .version("0.11")
        .topic("user_logs")
        .property("bootstrap.servers", "localhost:9092")
        .property("group.id", "test-group")
        .startFromEarliest()
        .sinkPartitionerRoundRobin()
        .format(JsonFormat.newBuilder().ignoreParseErrors(false).build())
        .schema(
            Schema.newBuilder()
                .column("user_id", DataTypes.BIGINT())
                .column("user_name", DataTypes.STRING())
                .column("score", DataTypes.DECIMAL(10, 2))
                .column("log_ts", DataTypes.STRING())
                .column("part_field_0", DataTypes.STRING())
                .column("part_field_1", DataTypes.INT())
                .column("proc", proctime()) // define a processing-time attribute with column name "proc"
                .column("my_ts", toTimestamp($("log_ts"))) // computed column
                .watermark("my_ts", $("my_ts").minus(lit(3).seconds())) // defines watermark and rowtime attribute
                .primaryKey("user_id")
                .build())
        .partitionedBy("part_field_0", "part_field_1") // Kafka doesn't support partitioned table yet, this is just an example for the API
        .build()
); |
Code Block |
---|
|
// register a table using the general-purpose Connector descriptor; this is helpful for custom sources/sinks
tEnv.createTemporaryTable(
    "MyTable",
    Connector.of("kafka-0.11")
        .option("topic", "user_logs")
        .option("properties.bootstrap.servers", "localhost:9092")
        .option("properties.group.id", "test-group")
        .option("scan.startup.mode", "earliest-offset")
        .option("format", "json")
        .option("json.ignore-parse-errors", "true")
        .option("sink.partitioner", "round-robin")
        .schema(
            Schema.newBuilder()
                .column("user_id", DataTypes.BIGINT())
                .column("user_name", DataTypes.STRING())
                .column("score", DataTypes.DECIMAL(10, 2))
                .column("log_ts", DataTypes.STRING())
                .column("part_field_0", DataTypes.STRING())
                .column("part_field_1", DataTypes.INT())
                .column("proc", proctime()) // define a processing-time attribute with column name "proc"
                .column("my_ts", toTimestamp($("log_ts"))) // computed column
                .watermark("my_ts", $("my_ts").minus(lit(3).seconds())) // defines watermark and rowtime attribute
                .primaryKey("user_id")
                .build())
        .partitionedBy("part_field_0", "part_field_1") // Kafka doesn't support partitioned table yet, this is just an example for the API
        .build()
);
|
LIKE clause for Descriptor API
...
Code Block |
---|
|
tEnv.createTemporaryTable(
    "OrdersInKafka",
    KafkaConnector.newBuilder()
        .topic("user_logs")
        .property("bootstrap.servers", "localhost:9092")
        .property("group.id", "test-group")
        .format(JsonFormat.newInstance())
        .schema(
            Schema.newBuilder()
                .column("user_id", DataTypes.BIGINT())
                .column("score", DataTypes.DECIMAL(10, 2))
                .column("log_ts", DataTypes.TIMESTAMP(3))
                .column("my_ts", toTimestamp($("log_ts"))) // computed column
                .build())
        .build()
);
tEnv.createTemporaryTable(
    "OrdersInFilesystem",
    Connector.of("filesystem")
        .option("path", "path/to/whatever")
        .schema(
            Schema.newBuilder()
                .watermark("my_ts", $("my_ts").minus(lit(3).seconds()))
                .build())
        .like("OrdersInKafka", LikeOption.EXCLUDING.ALL, LikeOption.INCLUDING.GENERATED)
        .build()
); |
The above "OrdersInFilesystem" table will be equivalent to:
Code Block |
---|
|
tEnv.createTemporaryTable(
    "OrdersInFilesystem",
    Connector.of("filesystem")
        .option("path", "path/to/whatever")
        .schema(
            Schema.newBuilder()
                .column("user_id", DataTypes.BIGINT())
                .column("score", DataTypes.DECIMAL(10, 2))
                .column("log_ts", DataTypes.TIMESTAMP(3))
                .column("my_ts", toTimestamp($("log_ts"))) // computed column
                .watermark("my_ts", $("my_ts").minus(lit(3).seconds()))
                .build())
        .build()
); |
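The equivalence follows from which parts of the original table the LIKE options select. The sketch below is only an illustration under stated assumptions: `LikeSketch`, `TableDef`, `Feature` and `mergeLike` are hypothetical names that are not part of the proposal, physical columns are assumed to be always inherited (as in the SQL `CREATE TABLE ... LIKE` clause), and a watermark declared on the derived table is assumed to simply win.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.EnumSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

class LikeSketch {

    /** Parts of the original table that a LIKE clause can include or exclude. */
    enum Feature { CONSTRAINTS, GENERATED, OPTIONS, PARTITIONS, WATERMARKS }

    /** Hypothetical, heavily simplified table definition (not a Flink class). */
    static final class TableDef {
        final List<String> physicalColumns = new ArrayList<>();
        final List<String> computedColumns = new ArrayList<>();
        final Map<String, String> options = new LinkedHashMap<>();
        String watermark; // null when no watermark is declared
    }

    /**
     * Builds the effective definition of "derived LIKE base".
     * Assumption: physical columns are always inherited; everything else is
     * inherited only if its feature is contained in "included".
     */
    static TableDef mergeLike(TableDef base, TableDef derived, Set<Feature> included) {
        TableDef result = new TableDef();
        result.physicalColumns.addAll(base.physicalColumns);
        result.physicalColumns.addAll(derived.physicalColumns);
        if (included.contains(Feature.GENERATED)) {
            result.computedColumns.addAll(base.computedColumns);
        }
        result.computedColumns.addAll(derived.computedColumns);
        if (included.contains(Feature.OPTIONS)) {
            result.options.putAll(base.options);
        }
        result.options.putAll(derived.options);
        // Simplification: a watermark declared on the derived table always wins.
        if (derived.watermark != null) {
            result.watermark = derived.watermark;
        } else if (included.contains(Feature.WATERMARKS)) {
            result.watermark = base.watermark;
        }
        return result;
    }

    /** Sample tables mirroring "OrdersInKafka" and "OrdersInFilesystem". */
    static TableDef example() {
        TableDef kafka = new TableDef();
        kafka.physicalColumns.addAll(Arrays.asList("user_id", "score", "log_ts"));
        kafka.computedColumns.add("my_ts");
        kafka.options.put("connector", "kafka");
        kafka.options.put("topic", "user_logs");

        TableDef filesystem = new TableDef();
        filesystem.options.put("connector", "filesystem");
        filesystem.options.put("path", "path/to/whatever");
        filesystem.watermark = "my_ts - 3 seconds";

        // EXCLUDING.ALL followed by INCLUDING.GENERATED leaves only GENERATED included.
        return mergeLike(kafka, filesystem, EnumSet.of(Feature.GENERATED));
    }

    public static void main(String[] args) {
        TableDef merged = example();
        System.out.println(merged.physicalColumns);
        System.out.println(merged.computedColumns);
        System.out.println(merged.options);
        System.out.println(merged.watermark);
    }
}
```

In this sketch the merged definition keeps the three physical columns and the computed column `my_ts`, carries none of the Kafka options, and has only the watermark declared on the filesystem table.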
TableEnvironment#from() and Table#executeInsert()
...
Code Block |
---|
|
Schema schema = Schema.newBuilder()
    .column("user_id", DataTypes.BIGINT())
    .column("score", DataTypes.DECIMAL(10, 2))
    .column("ts", DataTypes.TIMESTAMP(3))
    .build();
Table myKafka = tEnv.from(
    KafkaConnector.newBuilder()
        .version("0.11")
        .topic("user_logs")
        .property("bootstrap.servers", "localhost:9092")
        .property("group.id", "test-group")
        .startFromEarliest()
        .sinkPartitionerRoundRobin()
        .format(JsonFormat.newBuilder().ignoreParseErrors(false).build())
        .schema(schema)
        .build()
);
// read from the Kafka table and write into the filesystem table
myKafka.executeInsert(
    Connector.of("filesystem")
        .option("path", "/path/to/whatever")
        .option("format", "json")
        .schema(schema)
        .build()
); |
Proposed Changes
This section describes the new interfaces and classes in detail.
...
TableDescriptor is an abstract class that represents a SQL DDL structure or a CatalogTable. It can be divided into several parts: schema, partition keys, and options. The TableDescriptor determines how the schema and partition keys are defined, but leaves the options to be implemented by subclasses. Specific connectors can extend TableDescriptor and provide handy methods to set connector options (e.g. KafkaConnectorBuilder#topic(..)). We also propose a built-in, general implementation of TableDescriptor, i.e. Connector. The Connector class provides a general option(String key, String value) method, so it can support arbitrary custom connector implementations (based on FLIP-95). This reduces the development effort for custom connectors, because they do not need to implement a specific descriptor.
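The split between connector-specific descriptors and the general Connector can be illustrated with a small, self-contained sketch. The classes below (`DescriptorSketch`, `SketchBuilder`, `GenericBuilder`, `KafkaBuilder`) are simplified stand-ins invented for illustration, not the proposed Flink classes. They only show that both styles end up as the same flat option map, and how a self-typed builder keeps a fluent call chain in the subclass type.

```java
import java.util.LinkedHashMap;
import java.util.Locale;
import java.util.Map;

class DescriptorSketch {

    /** Self-typed base builder: shared methods return the concrete subclass type. */
    abstract static class SketchBuilder<B extends SketchBuilder<B>> {
        protected final Map<String, String> options = new LinkedHashMap<>();

        /** Returns this builder in the type of the subclass. */
        protected abstract B self();

        protected B put(String key, String value) {
            options.put(key, value);
            return self();
        }

        /** Returns a copy of the collected options. */
        public Map<String, String> build() {
            return new LinkedHashMap<>(options);
        }
    }

    /** General descriptor: works for any connector through plain string options. */
    static final class GenericBuilder extends SketchBuilder<GenericBuilder> {
        GenericBuilder(String identifier) {
            put("connector", identifier);
        }

        public GenericBuilder option(String key, String value) {
            // The connector identifier is fixed at creation time.
            if ("connector".equals(key.trim().toLowerCase(Locale.ROOT))) {
                throw new IllegalArgumentException("It's not allowed to override 'connector' option.");
            }
            return put(key, value);
        }

        @Override
        protected GenericBuilder self() {
            return this;
        }
    }

    /** Connector-specific descriptor: typed convenience methods over the same options. */
    static final class KafkaBuilder extends SketchBuilder<KafkaBuilder> {
        KafkaBuilder() {
            put("connector", "kafka");
        }

        public KafkaBuilder version(String version) {
            return put("connector", "kafka-" + version);
        }

        public KafkaBuilder topic(String topic) {
            return put("topic", topic);
        }

        @Override
        protected KafkaBuilder self() {
            return this;
        }
    }

    public static void main(String[] args) {
        Map<String, String> generic =
            new GenericBuilder("kafka-0.11").option("topic", "user_logs").build();
        Map<String, String> typed =
            new KafkaBuilder().version("0.11").topic("user_logs").build();
        System.out.println(generic.equals(typed)); // prints "true"
    }
}
```

A custom connector that has no dedicated descriptor would use only the general style, at the cost of losing compile-time checks on option names.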
TableDescriptor & TableDescriptorBuilder
The current TableDescriptor will be refactored into:
Code Block |
---|
language | java |
---|
title | TableDescriptor |
---|
|
/**
 * Describes a table to connect. It is the same representation as a SQL CREATE TABLE DDL.
 * It only wraps meta information about a catalog table.
 * Please use a specific {@link TableDescriptorBuilder} to build the {@link TableDescriptor}.
 */
@PublicEvolving
public abstract class TableDescriptor {
    // package visible variables used internally to build a CatalogTable
    protected List<String> partitionedFields = new ArrayList<>();
    protected Schema schema;
    protected Map<String, String> options = new HashMap<>();
    protected LikeOption[] likeOptions;
    protected String likePath;
}

/**
 * A basic builder implementation to build {@link TableDescriptor}.
 */
@PublicEvolving
public abstract class TableDescriptorBuilder<DESCRIPTOR extends TableDescriptor, BUILDER extends TableDescriptorBuilder<DESCRIPTOR, BUILDER>> {

    private final DESCRIPTOR descriptor;

    protected TableDescriptorBuilder(Class<DESCRIPTOR> descriptorClass) {
        descriptor = InstantiationUtil.instantiate(descriptorClass, TableDescriptor.class);
    }

    /**
     * Returns this builder instance in the type of subclass.
     */
    protected abstract BUILDER self();

    /**
     * Specifies the table schema.
     */
    public BUILDER schema(Schema schema) {
        descriptor.schema = schema;
        return self();
    }

    /**
     * Specifies the partition keys of this table.
     */
    public BUILDER partitionedBy(String... fieldNames) {
        checkArgument(descriptor.partitionedFields.isEmpty(), "partitionedBy(...) shouldn't be called more than once.");
        descriptor.partitionedFields.addAll(Arrays.asList(fieldNames));
        return self();
    }

    /**
     * Extends some parts from the original registered table path.
     */
    public BUILDER like(String tablePath, LikeOption... likeOptions) {
        descriptor.likePath = tablePath;
        descriptor.likeOptions = likeOptions;
        return self();
    }

    protected BUILDER option(String key, String value) {
        descriptor.options.put(key, value);
        return self();
    }

    /**
     * Returns created table descriptor.
     */
    public DESCRIPTOR build() {
        return descriptor;
    }
}
|
Code Block |
---|
language | java |
---|
title | LikeOption |
---|
|
public interface LikeOption {
    enum INCLUDING implements LikeOption {
        ALL,
        CONSTRAINTS,
        GENERATED,
        OPTIONS,
        PARTITIONS,
        WATERMARKS
    }

    enum EXCLUDING implements LikeOption {
        ALL,
        CONSTRAINTS,
        GENERATED,
        OPTIONS,
        PARTITIONS,
        WATERMARKS
    }

    enum OVERWRITING implements LikeOption {
        GENERATED,
        OPTIONS,
        WATERMARKS
    }
} |
Code Block |
---|
language | java |
---|
title | Connector |
---|
|
public class Connector extends TableDescriptor {

    public static ConnectorBuilder of(String identifier) {
        return new ConnectorBuilder(identifier);
    }

    public static class ConnectorBuilder extends TableDescriptorBuilder<Connector, ConnectorBuilder> {

        private ConnectorBuilder(String identifier) {
            super(Connector.class);
            option(CONNECTOR.key(), identifier);
        }

        public ConnectorBuilder option(String key, String value) {
            String lowerKey = key.toLowerCase().trim();
            if (CONNECTOR.key().equals(lowerKey)) {
                throw new IllegalArgumentException("It's not allowed to override 'connector' option.");
            }
            return super.option(key, value);
        }

        @Override
        protected ConnectorBuilder self() {
            return this;
        }
    }
} |
Code Block |
---|
|
/**
 * Connector descriptor for the Apache Kafka message queue.
 */
@PublicEvolving
public class KafkaConnector extends TableDescriptor {

    public static KafkaConnectorBuilder newBuilder() {
        return new KafkaConnectorBuilder();
    }

    public static class KafkaConnectorBuilder extends TableDescriptorBuilder<KafkaConnector, KafkaConnectorBuilder> {

        public KafkaConnectorBuilder() {
            super(KafkaConnector.class);
        }

        /**
         * Sets the Kafka version to be used.
         *
         * @param version Kafka version. E.g., "0.8", "0.11", etc.
         */
        public KafkaConnectorBuilder version(String version) {
            Preconditions.checkNotNull(version);
            option(CONNECTOR.key(), "kafka-" + version);
            return this;
        }

        /**
         * Sets the topic from which the table is read.
         *
         * @param topic The topic from which the table is read.
         */
        public KafkaConnectorBuilder topic(String topic) {
            Preconditions.checkNotNull(topic);
            option("topic", topic);
            return this;
        }

        // TODO: add more methods
        ...

        @Override
        protected KafkaConnectorBuilder self() {
            return this;
        }
    }
} |
Schema
The current Rowtime class will be removed. The current Schema class will be refactored into:
Code Block |
---|
|
/**
 * Describes a schema of a table.
 */
@PublicEvolving
public class Schema {
    // package visible variables used to build TableSchema
    protected final List<Column> columns;
    protected final WatermarkInfo watermarkInfo;
    protected final List<String> primaryKey;

    private Schema(List<Column> columns, WatermarkInfo watermarkInfo, List<String> primaryKey) {
        this.columns = columns;
        this.watermarkInfo = watermarkInfo;
        this.primaryKey = primaryKey;
    }

    public static SchemaBuilder newBuilder() {
        return new SchemaBuilder();
    }

    public static class SchemaBuilder {
        List<Column> columns = new ArrayList<>();
        WatermarkInfo watermarkInfo;
        List<String> primaryKey;

        private SchemaBuilder() {
        }

        /**
         * Adds a column with the column name and the data type.
         */
        public SchemaBuilder column(String fieldName, AbstractDataType<?> fieldType) {
            columns.add(new PhysicalColumn(fieldName, fieldType));
            return this;
        }

        /**
         * Adds a computed column with the column name and the column Expression.
         */
        public SchemaBuilder column(String fieldName, Expression expr) {
            columns.add(new VirtualColumn(fieldName, expr));
            return this;
        }

        /**
         * Specifies the primary key constraint for a set of given columns.
         */
        public SchemaBuilder primaryKey(String... fieldNames) {
            this.primaryKey = Arrays.asList(fieldNames);
            return this;
        }

        /**
         * Specifies the watermark strategy for rowtime attribute.
         */
        public SchemaBuilder watermark(String rowtimeField, Expression watermarkExpr) {
            this.watermarkInfo = new WatermarkInfo(rowtimeField, watermarkExpr);
            return this;
        }

        public Schema build() {
            return new Schema(columns, watermarkInfo, primaryKey);
        }
    }
} |
Implementation
I propose supporting this only in the Blink planner, because the old planner will be dropped in the near future and does not support FLIP-95 connectors.
...
TableDescriptor stores the meta information in package-visible member fields, e.g. schema, partitionedFields, and options; the Schema class does the same.
...
Current Interface | New Interface |
---|
.field("name", DataTypes.STRING()) | .column("name", DataTypes.STRING()) |
.field("name", "STRING") | .column("name", DataTypes.STRING()) |
.field("proctime", DataTypes.TIMESTAMP(3)).proctime() | .column("proc", proctime()) |
.from("originalName") | .column("newName", $("originalName")) |
.rowtime(new Rowtime().timestampsFromField("time").watermarksPeriodicAscending()) | .watermark("time", $("time")) |
.rowtime(new Rowtime().timestampsFromField("time").watermarksPeriodicBounded(2)) | .watermark("time", $("time").minus(lit(2).milli())) |
rowtime.timestampsFromExtractor(TimestampExtractor) | .column(fieldName, expr).watermark(fieldName, watermarkExpr) |
rowtime.watermarksFromStrategy(WatermarkStrategy) | .watermark(fieldName, watermarkExpr) |
rowtime.watermarksFromSource() | removed (never implemented by any connectors before) |
rowtime.timestampsFromSource() | removed (never implemented by any connectors before) |
.toProperties() | removed (not needed anymore) |
...
Current Interface | New Interface |
---|
.withSchema(new Schema()) | .schema(Schema.newBuilder().build()) |
.withFormat(new Json()) | KafkaConnector.newBuilder().format(JsonFormat.newInstance()).build() |
.withPartitionKeys(Arrays.asList(a, b, c)) | .partitionedBy(a, b, c) |
.createTemporaryTable(path) | tEnv.createTemporaryTable(path, descriptor) |
.inAppendMode() | removed (not needed anymore) |
.inRetractMode() | removed (not needed anymore) |
.inUpsertMode() | removed (not needed anymore) |
.toProperties() | removed (not needed anymore) |
...