THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
A full example will look like this:
...
language | java |
---|
...
tEnv.createTemporaryTable(
"MyTable",
...
new Kafka()
...
.version("0.11")
...
.topic("user_logs")
...
.property("bootstrap.servers", "localhost:9092")
...
.property("group.id", "test-group")
...
.startFromEarliest()
...
.sinkPartitionerRoundRobin()
...
.format(new Json().ignoreParseErrors(false))
...
.schema(
...
new Schema()
...
.column("user_id", DataTypes.BIGINT())
...
.column("user_name", DataTypes.STRING())
...
.column("score", DataTypes.DECIMAL(10, 2))
...
.column("log_ts", DataTypes.STRING())
...
.column("part_field_0", DataTypes.STRING())
...
.column("part_field_1", DataTypes.INT())
...
.proctime("proc")
...
...
...
...
...
...
...
...
...
.computedColumn("my_ts", "TO_TIMESTAMP(log_ts)") //
...
计算列
.watermarkFor("my_ts").boundedOutOfOrderTimestamps(Duration.ofSeconds(5))
...
...
...
...
...
...
.primaryKey("user_id"))
...
.partitionedBy("part_field_0", "part_field_1")
...
);
tEnv.createTemporaryTable(
...
"MyTable",
...
new Connector("kafka-0.11")
...
.option("topic", "user_logs")
...
.option("properties.bootstrap.servers", "localhost:9092")
...
.option("properties.group.id", "test-group")
...
.option("scan.startup.mode", "earliest")
...
.option("format", "json")
...
.option("json.ignore-parse-errors", "true")
...
.option("sink.partitioner", "round-robin")
...
.schema(
...
new Schema()
...
.column("user_id", DataTypes.BIGINT())
...
.column("user_name", DataTypes.STRING())
...
.column("score", DataTypes.DECIMAL(10, 2))
...
.column("log_ts", DataTypes.STRING())
...
.column("part_field_0", DataTypes.STRING())
...
.column("part_field_1", DataTypes.INT())
...
.proctime("proc")
...
...
...
...
...
...
...
...
...
.computedColumn("my_ts", "TO_TIMESTAMP(log_ts)") //
...
计算列
.watermarkFor("my_ts").boundedOutOfOrderTimestamps(Duration.ofSeconds(5))
...
.primaryKey("user_id"))
...
.partitionedBy("part_field_0", "part_field_1")
...
);
Additionally, we would like to propose two new methods for better usability for Table API users.
...