...

A full example will look like this:


Code Block
languagejava
// register a table using a specific descriptor
tEnv.createTemporaryTable(
   "MyTable",
   new Kafka()
      .version("0.11")
      .topic("user_logs")
      .property("bootstrap.servers", "localhost:9092")
      .property("group.id", "test-group")
      .startFromEarliest()
      .sinkPartitionerRoundRobin()
      .format(new Json().ignoreParseErrors(false))
      .schema(
         new Schema()
            .column("user_id", DataTypes.BIGINT())
            .column("user_name", DataTypes.STRING())
            .column("score", DataTypes.DECIMAL(10, 2))
            .column("log_ts", DataTypes.STRING())
            .column("part_field_0", DataTypes.STRING())
            .column("part_field_1", DataTypes.INT())
            .column("proc", proctime()) // define a processing-time attribute with column name "proc"
            .column("my_ts", toTimestamp($("log_ts")))  // computed column
            .watermarkFor("my_ts", $("my_ts").minus(lit(3).seconds()))  // defines watermark and rowtime attribute
            .primaryKey("user_id"))
      .partitionedBy("part_field_0", "part_field_1")  // Kafka doesn't support partitioned tables yet; this is just an example of the API
);


Code Block
languagejava
// register a table using the general-purpose Connector descriptor; this is helpful for custom sources/sinks
tEnv.createTemporaryTable(
   "MyTable",
   new Connector("kafka-0.11")
      .option("topic", "user_logs")
      .option("properties.bootstrap.servers", "localhost:9092")
      .option("properties.group.id", "test-group")
      .option("scan.startup.mode", "earliest")
      .option("format", "json")
      .option("json.ignore-parse-errors", "true")
      .option("sink.partitioner", "round-robin")
      .schema(
         new Schema()
            .column("user_id", DataTypes.BIGINT())
            .column("user_name", DataTypes.STRING())
            .column("score", DataTypes.DECIMAL(10, 2))
            .column("log_ts", DataTypes.STRING())
            .column("part_field_0", DataTypes.STRING())
            .column("part_field_1", DataTypes.INT())
            .column("proc", proctime()) // define a processing-time attribute with column name "proc"
            .column("my_ts", toTimestamp($("log_ts")))  // computed column
            .watermarkFor("my_ts", $("my_ts").minus(lit(3).seconds())) // defines watermark and rowtime attribute
            .primaryKey("user_id"))
      .partitionedBy("part_field_0", "part_field_1") // Kafka doesn't support partitioned tables yet; this is just an example of the API
);
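The two registrations above suggest that a connector-specific descriptor can be read as a typed wrapper that fills in the same string options a general-purpose descriptor takes directly. The following is a minimal sketch of that idea in plain Java, not Flink code: the class `KafkaDescriptorSketch` and its `toOptions()` method are hypothetical, the option keys are copied from the example above, and the "connector" key used for the identifier is an assumption.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical typed descriptor that only collects string options; not a Flink class. */
public class KafkaDescriptorSketch {

    // Collected options, kept in insertion order for readable output.
    private final Map<String, String> options = new LinkedHashMap<>();

    public KafkaDescriptorSketch version(String version) {
        // Assumption: the version becomes part of the connector identifier.
        options.put("connector", "kafka-" + version);
        return this;
    }

    public KafkaDescriptorSketch topic(String topic) {
        options.put("topic", topic);
        return this;
    }

    public KafkaDescriptorSketch property(String key, String value) {
        // Client properties are prefixed, as in the general-purpose example.
        options.put("properties." + key, value);
        return this;
    }

    public KafkaDescriptorSketch startFromEarliest() {
        options.put("scan.startup.mode", "earliest");
        return this;
    }

    public KafkaDescriptorSketch sinkPartitionerRoundRobin() {
        options.put("sink.partitioner", "round-robin");
        return this;
    }

    /** Returns a read-only view of the collected options. */
    public Map<String, String> toOptions() {
        return Collections.unmodifiableMap(options);
    }

    public static void main(String[] args) {
        Map<String, String> options = new KafkaDescriptorSketch()
            .version("0.11")
            .topic("user_logs")
            .property("bootstrap.servers", "localhost:9092")
            .property("group.id", "test-group")
            .startFromEarliest()
            .sinkPartitionerRoundRobin()
            .toOptions();
        options.forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

Under this reading, the typed methods mainly add discoverability and compile-time checking, while custom connectors without a dedicated descriptor can still be configured through plain key/value options.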





LIKE clause for Descriptor API

...

Here is a simple example that derives a table from an existing one:

Code Block
languagejava
tEnv.createTemporaryTable(
   "OrdersInKafka",
   new Kafka()
      .topic("user_logs")
      .property("bootstrap.servers", "localhost:9092")
      .property("group.id", "test-group")
      .format(new Json().ignoreParseErrors(false))
      .schema(
         new Schema()
            .column("user_id", DataTypes.BIGINT())
            .column("score", DataTypes.DECIMAL(10, 2))
            .column("log_ts", DataTypes.TIMESTAMP(3))
            .column("my_ts", toTimestamp($("log_ts")))
      )
);

tEnv.createTemporaryTable(
   "OrdersInFilesystem",
   new Connector("filesystem")
      .option("path", "path/to/whatever")
      .schema(
         new Schema()
            .watermarkFor("my_ts", $("my_ts").minus(lit(3).seconds())))
      .like("OrdersInKafka", LikeOption.EXCLUDING.ALL, LikeOption.INCLUDING.GENERATED)
);





The above "OrdersInFilesystem" table will be equivalent to:

Code Block
languagejava
tEnv.createTemporaryTable(
   "OrdersInFilesystem",
   new Connector("filesystem")
      .option("path", "path/to/whatever")
      .schema(
         new Schema()
            .column("user_id", DataTypes.BIGINT())
            .column("score", DataTypes.DECIMAL(10, 2))
            .column("log_ts", DataTypes.TIMESTAMP(3))
            .column("my_ts", toTimestamp($("log_ts")))
            .watermarkFor("my_ts", $("my_ts").minus(lit(3).seconds()))
      )
);
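The derivation above can be pictured as a merge of two table definitions. The following is a minimal sketch in plain Java, not Flink code: `LikeMergeSketch`, `TableDef`, and `derive` are hypothetical names. It assumes, based on the equivalent definition shown above, that physical columns are always inherited, that EXCLUDING.ALL drops the base table's options and watermarks, and that INCLUDING.GENERATED brings the base table's computed columns back in.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of LIKE with EXCLUDING.ALL + INCLUDING.GENERATED; not a Flink class. */
public class LikeMergeSketch {

    /** Simplified table definition holding only what the example needs. */
    public static final class TableDef {
        public final List<String> physicalColumns = new ArrayList<>();
        public final List<String> computedColumns = new ArrayList<>();
        public final List<String> watermarks = new ArrayList<>();
        public final Map<String, String> options = new LinkedHashMap<>();
    }

    /** Merges the base table into the derived table under the assumed semantics. */
    public static TableDef derive(TableDef base, TableDef derived) {
        TableDef result = new TableDef();
        // Assumption: physical columns are always inherited from the base table.
        result.physicalColumns.addAll(base.physicalColumns);
        result.physicalColumns.addAll(derived.physicalColumns);
        // INCLUDING.GENERATED: computed columns of the base table are kept.
        result.computedColumns.addAll(base.computedColumns);
        result.computedColumns.addAll(derived.computedColumns);
        // EXCLUDING.ALL: base watermarks and options are dropped, so only
        // what the derived table declares itself survives.
        result.watermarks.addAll(derived.watermarks);
        result.options.putAll(derived.options);
        return result;
    }

    /** Rebuilds the OrdersInKafka / OrdersInFilesystem example from the text. */
    public static TableDef example() {
        TableDef kafka = new TableDef();
        kafka.physicalColumns.add("user_id");
        kafka.physicalColumns.add("score");
        kafka.physicalColumns.add("log_ts");
        kafka.computedColumns.add("my_ts");
        kafka.options.put("topic", "user_logs");

        TableDef filesystem = new TableDef();
        filesystem.options.put("path", "path/to/whatever");
        filesystem.watermarks.add("my_ts");

        return derive(kafka, filesystem);
    }

    public static void main(String[] args) {
        TableDef merged = example();
        System.out.println("physical:   " + merged.physicalColumns);
        System.out.println("computed:   " + merged.computedColumns);
        System.out.println("watermarks: " + merged.watermarks);
        System.out.println("options:    " + merged.options);
    }
}
```

In this sketch the Kafka-specific `topic` option does not leak into the filesystem table, which is the reason for excluding options when the derived table uses a different connector.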


TableEnvironment#from() and Table#executeInsert()

...

With these two methods, the same TableDescriptor definition can be reused, so Table API users can skip the table registration step and use the source/sink out of the box. For example:

Code Block
languagejava
Schema schema = new Schema()
   .column("user_id", DataTypes.BIGINT())
   .column("score", DataTypes.DECIMAL(10, 2))
   .column("ts", DataTypes.TIMESTAMP(3));

Table myKafka = tEnv.from(
   new Kafka()
      .version("0.11")
      .topic("user_logs")
      .property("bootstrap.servers", "localhost:9092")
      .property("group.id", "test-group")
      .startFromEarliest()
      .sinkPartitionerRoundRobin()
      .format(new Json().ignoreParseErrors(false))
      .schema(schema)
);

// read from the Kafka table and write into the filesystem table
myKafka.executeInsert(
   new Connector("filesystem")
      .option("path", "/path/to/whatever")
      .option("format", "json")
      .schema(schema)
);


Proposed Changes

This section discusses the new interfaces and classes in detail.

...

Code Block
languagejava
titleSchema
/**
 * Describes a schema of a table.
 */
@PublicEvolving
public class Schema {

	/**
	 * Adds a column with the column name and the data type.
	 */
	public Schema column(String columnName, DataType columnType) {...}

	/**
	 * Adds a computed column with the column name and the SQL expression string.
	 */
	public Schema column(String columnName, String sqlExpression) {...}

	/**
	 * Adds a processing-time column with the given column name.
	 */
	public Schema proctime(String columnName) {...}

	/**
	 * Specifies the primary key constraint for a set of given columns.
	 */
	public Schema primaryKey(String... columnNames) {...}

	/**
	 * Specifies the watermark strategy for rowtime attribute.
	 */
	public SchemaWithWatermark watermarkFor(String rowtimeColumn) {...}

	public static class SchemaWithWatermark {

		/**
		 * Specifies a custom watermark strategy using the given SQL expression string.
		 */
		public Schema as(String watermarkSqlExpr) {...}

		/**
		 * Specifies a watermark strategy for situations with monotonously ascending timestamps.
		 */
		public Schema ascendingTimestamps() {...}

		/**
		 * Specifies a watermark strategy for situations where records are out of order, but you can place
		 * an upper bound on how far the events are out of order. An out-of-order bound B means that
		 * once an event with timestamp T was encountered, no events older than {@code T - B} will
		 * follow any more.
		 */
		public Schema boundedOutOfOrderTimestamps(Duration maxOutOfOrderness) {...}
	}
}
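The bounded out-of-orderness strategy described in the Javadoc above can be illustrated with a small stand-alone sketch. It is not Flink's implementation: `BoundedOutOfOrderSketch` and its methods are hypothetical, and it simply assumes the watermark is the largest timestamp seen so far minus the bound B, so that any later event older than `T - B` counts as late.

```java
import java.time.Duration;

/** Hypothetical watermark tracker for a bounded out-of-orderness strategy; not a Flink class. */
public class BoundedOutOfOrderSketch {

    private final long boundMillis;
    private long maxTimestamp;
    private boolean seenAny;

    public BoundedOutOfOrderSketch(Duration maxOutOfOrderness) {
        this.boundMillis = maxOutOfOrderness.toMillis();
    }

    /** Records an event timestamp (epoch millis) and returns the updated watermark. */
    public long onEvent(long timestamp) {
        if (!seenAny || timestamp > maxTimestamp) {
            maxTimestamp = timestamp;
            seenAny = true;
        }
        return currentWatermark();
    }

    /** Returns T - B for the largest timestamp T seen, or Long.MIN_VALUE before any event. */
    public long currentWatermark() {
        return seenAny ? maxTimestamp - boundMillis : Long.MIN_VALUE;
    }

    /** An event is late if it is older than the current watermark. */
    public boolean isLate(long timestamp) {
        return timestamp < currentWatermark();
    }

    public static void main(String[] args) {
        BoundedOutOfOrderSketch tracker = new BoundedOutOfOrderSketch(Duration.ofSeconds(2));
        long[] timestamps = {1000L, 3000L, 2000L};
        for (long ts : timestamps) {
            System.out.println("event " + ts + " -> watermark " + tracker.onEvent(ts));
        }
        System.out.println("is 500 late? " + tracker.isLate(500L));
    }
}
```

With a bound of 2 seconds, the out-of-order event at 2000 does not move the watermark back: it stays at 1000 once the event at 3000 has been seen.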

...

| Current Interface | New Interface |
|---|---|
| .field("name", DataTypes.STRING()) | .column("name", DataTypes.STRING()) |
| .field("name", "STRING") | .column("name", DataTypes.STRING()) |
| .field("proctime", DataTypes.TIMESTAMP(3)).proctime() | .proctime("proctime") |
| .from("originalName") | .column("newName", "originalName") |
| .rowtime(new Rowtime().timestampsFromField("time").watermarksPeriodicAscending()) | .watermarkFor("time").ascendingTimestamps() |
| .rowtime(new Rowtime().timestampsFromField("time").watermarksPeriodicBounded(2)) | .watermarkFor("time").boundedOutOfOrderTimestamps(Duration.ofMillis(2)) |
| rowtime.timestampsFromExtractor(TimestampExtractor) | .column(fieldName, expr).watermarkFor(fieldName) |
| rowtime.watermarksFromStrategy(WatermarkStrategy) | .watermarkFor(rowtimeField).as(watermarkExpr) |
| rowtime.watermarksFromSource() | removed (never implemented by any connectors before) |
| rowtime.timestampsFromSource() | removed (never implemented by any connectors before) |
| .toProperties() | removed (not needed anymore) |

...

For example, a slightly refactored TableEnvironment#connect would look like this:

Code Block
languagejava
tableEnv.connect(
        new Kafka() // can be replaced by new Connector("kafka-0.11")
            .version("0.11")
            .topic("myTopic")
            .property("bootstrap.servers", "localhost:9092")
            .property("group.id", "test-group")
            .startFromEarliest()
            .sinkPartitionerRoundRobin()
            .format(new Json().ignoreParseErrors(false)))
    .schema(
        new Schema()
            .column("user_id", DataTypes.BIGINT())
            .column("user_name", DataTypes.STRING())
            .column("score", DataTypes.DECIMAL(10, 2))
            .column("log_ts", DataTypes.TIMESTAMP(3))
            .column("part_field_0", DataTypes.STRING())
            .column("part_field_1", DataTypes.INT())
            .column("proc", proctime())
            .column("my_ts", toTimestamp($("log_ts")))
            .watermarkFor("my_ts", $("my_ts").minus(lit(3).seconds()))
            .primaryKey("user_id"))
    .partitionedBy("part_field_0", "part_field_1")
    .createTemporaryTable("MyTable");


However, we prefer "TableEnvironment#createTemporaryTable(path, descriptor)" over "TableEnvironment#connect", because

...