Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

A full example will look like this:

...

Language: java

...

tEnv.createTemporaryTable(
"MyTable",

...


new Kafka()

...


.version("0.11")

...


.topic("user_logs")

...


.property("bootstrap.servers", "localhost:9092")

...


.property("group.id", "test-group")

...


.startFromEarliest()

...


.sinkPartitionerRoundRobin()

...


.format(new Json().ignoreParseErrors(false))

...


.schema(

...


new Schema()

...


.column("user_id", DataTypes.BIGINT())

...


.column("user_name", DataTypes.STRING())

...


.column("score", DataTypes.DECIMAL(10, 2))

...


.column("log_ts", DataTypes.STRING())

...


.column("part_field_0", DataTypes.STRING())

...


.column("part_field_1", DataTypes.INT())

...


.proctime("proc")

...

  

...

 

...

 

...

 

...

 

...

 

...

 

...

 

...

.computedColumn("my_ts", "TO_TIMESTAMP(log_ts)")  // 

...

// computed column
.watermarkFor("my_ts").boundedOutOfOrderTimestamps(Duration.ofSeconds(5))

...

 

...

 

...

 

...

 

...

 

...

.primaryKey("user_id"))

...


.partitionedBy("part_field_0", "part_field_1")

...


);

tEnv.createTemporaryTable(

...


"MyTable",

...


new Connector("kafka-0.11")

...


.option("topic", "user_logs")

...


.option("properties.bootstrap.servers", "localhost:9092")

...


.option("properties.group.id", "test-group")

...


.option("scan.startup.mode", "earliest")

...


.option("format", "json")

...


.option("json.ignore-parse-errors", "true")

...


.option("sink.partitioner", "round-robin")

...


.schema(

...


new Schema()

...


.column("user_id", DataTypes.BIGINT())

...


.column("user_name", DataTypes.STRING())

...


.column("score", DataTypes.DECIMAL(10, 2))

...


.column("log_ts", DataTypes.STRING())

...


.column("part_field_0", DataTypes.STRING())

...


.column("part_field_1", DataTypes.INT())

...


.proctime("proc")

...

  

...

 

...

 

...

 

...

 

...

 

...

 

...

 

...

.computedColumn("my_ts", "TO_TIMESTAMP(log_ts)")  //

...

// computed column
.watermarkFor("my_ts").boundedOutOfOrderTimestamps(Duration.ofSeconds(5))

...


.primaryKey("user_id"))

...


.partitionedBy("part_field_0", "part_field_1")

...


);


Additionally, we would like to propose two new methods that improve usability for Table API users.

...