...
A full example will look like this:
Code Block |
---|
|
// register a table using specific descriptor |
...
tEnv.createTemporaryTable( |
...
...
...
...
...
.property("bootstrap.servers", "localhost:9092") |
...
.property("group.id", "test-group") |
...
...
.sinkPartitionerRoundRobin() |
...
.format(new Json().ignoreParseErrors(false)) |
...
...
...
.column("user_id", DataTypes.BIGINT()) |
...
.column("user_name", DataTypes.STRING()) |
...
.column("score", DataTypes.DECIMAL(10, 2)) |
...
.column("log_ts", DataTypes.STRING()) |
...
.column("part_field_0", DataTypes.STRING()) |
...
.column("part_field_1", DataTypes.INT()) |
...
...
column("proc", proctime()) // define a processing-time attribute with column name "proc" |
...
...
...
...
...
.watermarkFor("my_ts", $("my_ts"). |
...
minus(lit(3).seconds())) // defines watermark and rowtime attribute |
...
...
.partitionedBy("part_field_0", "part_field_1") // Kafka doesn't support partitioned table yet, this is just an example for the API |
...
Code Block |
---|
|
// register a table using general purpose Connector descriptor, this would be helpful for custom source/sinks |
...
tEnv.createTemporaryTable( |
...
...
new Connector("kafka-0.11") |
...
.option("topic", "user_logs") |
...
.option("properties.bootstrap.servers", "localhost:9092") |
...
.option("properties.group.id", "test-group") |
...
.option("scan.startup.mode", "earliest") |
...
.option("format", "json") |
...
.option("json.ignore-parse-errors", "true") |
...
.option("sink.partitioner", "round-robin") |
...
...
...
.column("user_id", DataTypes.BIGINT()) |
...
.column("user_name", DataTypes.STRING()) |
...
.column("score", DataTypes.DECIMAL(10, 2)) |
...
.column("log_ts", DataTypes.STRING()) |
...
.column("part_field_0", DataTypes.STRING()) |
...
.column("part_field_1", DataTypes.INT()) |
...
...
column("proc", proctime()) // define a processing-time attribute with column name "proc" |
...
...
...
...
...
.watermarkFor("my_ts", $("my_ts"). |
...
minus(lit(3).seconds())) // defines watermark and rowtime attribute |
...
...
.partitionedBy("part_field_0", "part_field_1") // Kafka doesn't support partitioned table yet, this is just an example for the API |
...
LIKE clause for Descriptor API
...
Here is a simple example of deriving a table from an existing one:
Code Block |
---|
|
tEnv.createTemporaryTable( |
...
...
...
...
.property("bootstrap.servers", "localhost:9092") |
...
.property("group.id", "test-group") |
...
.format(new Json().ignoreParseErrors(false)) |
...
...
...
.column("user_id", DataTypes.BIGINT()) |
...
.column("score", DataTypes.DECIMAL(10, 2)) |
...
.column("log_ts", DataTypes.TIMESTAMP(3)) |
...
...
...
...
...
...
...
tEnv.createTemporaryTable( |
...
...
new Connector("filesystem") |
...
.option("path", "path/to/whatever") |
...
...
...
.watermarkFor("my_ts", $("my_ts"). |
...
minus(lit(3).seconds()))) |
...
.like("OrdersInKafka", LikeOption.EXCLUDING.ALL, LikeOption.INCLUDING.GENERATED) |
...
The above "OrdersInFilesystem" table will be equivalent to:
Code Block |
---|
|
tEnv.createTemporaryTable( |
...
...
new Connector("filesystem") |
...
.option("path", "path/to/whatever") |
...
...
...
.column("user_id", DataTypes.BIGINT()) |
...
.column("score", DataTypes.DECIMAL(10, 2)) |
...
.column("log_ts", DataTypes.TIMESTAMP(3)) |
...
...
...
...
...
.watermarkFor("my_ts", $("my_ts"). |
...
minus(lit(3).seconds()))
) |
...
TableEnvironment#from() and Table#executeInsert()
...
With the above two methods, we can reuse the same TableDescriptor
definition, so Table API users can skip the table registration step and use the source/sink out of the box. For example:
Code Block |
---|
|
Schema schema = new Schema() |
...
.column("user_id", DataTypes.BIGINT()) |
...
.column("score", DataTypes.DECIMAL(10, 2)) |
...
.column("ts", DataTypes.TIMESTAMP(3)); |
...
Table myKafka = tEnv.from( |
...
...
...
...
.property("bootstrap.servers", "localhost:9092") |
...
.property("group.id", "test-group") |
...
...
.sinkPartitionerRoundRobin() |
...
.format(new Json().ignoreParseErrors(false)) |
...
...
...
// reading from kafka table and write into filesystem table |
...
...
new Connector("filesystem") |
...
.option("path", "/path/to/whatever") |
...
.option("format", "json") |
...
...
Proposed Changes
This section discusses the new interfaces and classes in detail.
...
Code Block |
---|
|
/**
 * Describes a schema of a table.
 *
 * <p>This is an API sketch: every method body is elided as {@code {...}}, so only the
 * signatures and their documented intent are specified here.
 */
@PublicEvolving
public class Schema {
/**
 * Adds a column with the column name and the data type.
 *
 * @param columnName name of the column to add
 * @param columnType data type of the column
 * @return a {@code Schema}, allowing further calls to be chained
 */
public Schema column(String columnName, DataType columnType) {...}
/**
 * Adds a computed column with the column name and the SQL expression string.
 *
 * <p>NOTE(review): the method name looks like two names fused together
 * ("computedColumn" and "column") - confirm the intended name before implementing.
 *
 * @param columnName name of the computed column
 * @param sqlExpression SQL expression string that defines the column
 * @return a {@code Schema}, allowing further calls to be chained
 */
public Schema computedColumncolumn(String columnName, String sqlExpression) {...}
/**
 * Adds a processing-time column with the given column name.
 *
 * @param columnName name of the processing-time column
 * @return a {@code Schema}, allowing further calls to be chained
 */
public Schema proctime(String columnName) {...}
/**
 * Specifies the primary key constraint for a set of given columns.
 *
 * @param columnNames names of the columns that make up the primary key
 * @return a {@code Schema}, allowing further calls to be chained
 */
public Schema primaryKey(String... columnNames) {...}
/**
 * Specifies the watermark strategy for rowtime attribute.
 *
 * <p>The strategy itself is chosen by calling one of the methods on the returned
 * {@link SchemaWithWatermark}.
 *
 * @param rowtimeColumn name of the rowtime column the watermark is defined for
 * @return a {@code SchemaWithWatermark} on which the watermark strategy is selected
 */
public SchemaWithWatermark watermarkFor(String rowtimeColumn) {...}
/**
 * Intermediate step returned by {@link Schema#watermarkFor(String)}. Each method selects
 * a watermark strategy and returns a {@code Schema}.
 */
public static class SchemaWithWatermark {
/**
 * Specifies a custom watermark strategy using the given SQL expression string.
 *
 * @param watermarkSqlExpr SQL expression string that defines the watermark
 * @return a {@code Schema}, allowing further calls to be chained
 */
public Schema as(String watermarkSqlExpr) {...}
/**
 * Specifies a watermark strategy for situations with monotonously ascending timestamps.
 *
 * @return a {@code Schema}, allowing further calls to be chained
 */
public Schema ascendingTimestamps() {...}
/**
 * Specifies a watermark strategy for situations where records are out of order, but you can place
 * an upper bound on how far the events are out of order. An out-of-order bound B means that
 * once an event with timestamp T was encountered, no events older than {@code T - B} will
 * follow any more.
 *
 * @param maxOutOfOrderness the upper bound B on how far events may be out of order
 * @return a {@code Schema}, allowing further calls to be chained
 */
public Schema boundedOutOfOrderTimestamps(Duration maxOutOfOrderness) {...}
}
} |
...
Current Interface | New Interface |
---|
.field("name", DataTypes.STRING()) | .column("name", DataTypes.STRING()) |
.field("name", "STRING") | .column("name", DataTypes.STRING()) |
.field("proctime", DataTypes.TIMESTAMP(3)).proctime() | .proctime("proctime") |
.from("originalName") | .computedColumncolumn("newName", "originalName") |
.rowtime(new Rowtime().timestampsFromField("time").watermarksPeriodicAscending()) | .watermarkFor("time").ascendingTimestamps() |
.rowtime(new Rowtime().timestampsFromField("time").watermarksPeriodicBounded(2)) | .watermarkFor("time").boundedOutOfOrderTimestamps(Duration.ofMillis(2)) |
rowtime.timestampsFromExtractor(TimestampExtractor) | .computedColumncolumn(fieldName, expr).watermarkFor(fieldName) |
rowtime.watermarksFromStrategy(WatermarkStrategy) | .watermarkFor(rowtimeField).as(watermarkExpr) |
rowtime.watermarksFromSource() | removed (never implemented by any connectors before) |
rowtime.timestampsFromSource() | removed (never implemented by any connectors before) |
.toProperties() | removed (not needed anymore) |
...
For example, a slightly refactored TableEnvironment#connect:
Code Block |
---|
|
tableEnv.connect(
new Kafka() // can be replaced by new Connector("kafka-0.11")
.version("0.11")
.topic("myTopic")
.property("bootstrap.servers", "localhost:9092")
.property("group.id", "test-group")
.startFromEarliest()
.sinkPartitionerRoundRobin()
.format(new Json().ignoreParseErrors(false))
.schema(
new Schema()
.column("user_id", DataTypes.BIGINT())
.column("user_name", DataTypes.STRING())
.column("score", DataTypes.DECIMAL(10, 2))
.column("log_ts", DataTypes.TIMESTAMP(3))
.column("part_field_0", DataTypes.STRING())
.column("part_field_1", DataTypes.INT())
. |
...
column("proc", proctime())
. |
...
...
...
)
.watermarkFor("my_ts", $("my_ts"). |
...
minus(lit(3).seconds())))
.primaryKey("user_id")
.partitionedBy("part_field_0", "part_field_1")
.createTemporaryTable("MyTable"); |
However, we prefer "TableEnvironment#createTemporaryTable(path, descriptor)" instead of "TableEnvironment#connect", because
...