THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
**Code Block**
// register a table using specific descriptor tEnv.createTemporaryTable( "MyTable", KafkaConnector.newBuilder() .version("0.11") .topic("user_logs") .property("bootstrap.servers", "localhost:9092") .property("group.id", "test-group") .startFromEarliest() .sinkPartitionerRoundRobin() .format(JsonFormat.newBuilder().ignoreParseErrors(false).build()) .schema( Schema.newBuilder() .column("user_id", DataTypes.BIGINT()) .column("user_name", DataTypes.STRING()) .column("score", DataTypes.DECIMAL(10, 2)) .column("log_ts", DataTypes.STRING()) .column("part_field_0", DataTypes.STRING()) .column("part_field_1", DataTypes.INT()) .column("proc", proctime()) // define a processing-time attribute with column name "proc" .column("ts", toTimestamp($("log_ts"))) .watermark("ts", $("ts").minus(lit(3).seconds())) .primaryKey("user_id") .build()) .partitionedBy("part_field_0", "part_field_1") // Kafka doesn't support partitioned table yet, this is just an example for the API .build() ); |
...
**Code Block**
// register a table using general purpose Connector descriptor, this would be helpful for custom source/sinks tEnv.createTemporaryTable( "MyTable", Connector.of("kafka-0.11") .option("topic", "user_logs") .option("properties.bootstrap.servers", "localhost:9092") .option("properties.group.id", "test-group") .option("scan.startup.mode", "earliest") .option("format", "json") .option("json.ignore-parse-errors", "true") .option("sink.partitioner", "round-robin") .schema( Schema.newBuilder() .column("user_id", DataTypes.BIGINT()) .column("user_name", DataTypes.STRING()) .column("score", DataTypes.DECIMAL(10, 2)) .column("log_ts", DataTypes.STRING()) .column("part_field_0", DataTypes.STRING()) .column("part_field_1", DataTypes.INT()) .column("proc", proctime()) // define a processing-time attribute with column name "proc" .column("ts", toTimestamp($("log_ts"))) .watermark("ts", $("ts").minus(lit(3).seconds())) .primaryKey("user_id") .build()) .partitionedBy("part_field_0", "part_field_1") // Kafka doesn't support partitioned table yet, this is just an example for the API .build() ); |
...
**Code Block**
tEnv.createTemporaryTable(
    "OrdersInKafka",
    KafkaConnector.newBuilder()
        .topic("user_logs")
        .property("bootstrap.servers", "localhost:9092")
        .property("group.id", "test-group")
        .format(JsonFormat.newInstance())
        .schema(
            Schema.newBuilder()
                .column("user_id", DataTypes.BIGINT())
                .column("score", DataTypes.DECIMAL(10, 2))
                .column("log_ts", DataTypes.TIMESTAMP(3))
                .column("ts", toTimestamp($("log_ts")))
                .build())
        .build());

tEnv.createTemporaryTable(
    "OrdersInFilesystem",
    Connector.of("filesystem")
        .option("path", "path/to/whatever")
        .schema(
            Schema.newBuilder()
                .watermark("ts", $("ts").minus(lit(3).seconds()))
                .build())
        .like("OrdersInKafka", LikeOption.EXCLUDING.ALL, LikeOption.INCLUDING.GENERATED)
        .build());
...
**Code Block**
tableEnv.connect(
new Kafka() // can be replaced by new Connector("kafka-0.11")
.version("0.11")
.topic("myTopic")
.property("bootstrap.servers", "localhost:9092")
.property("group.id", "test-group")
.startFromEarliest()
.sinkPartitionerRoundRobin()
.format(new Json().ignoreParseErrors(false))
.schema(
new Schema()
.column("user_id", DataTypes.BIGINT())
.column("user_name", DataTypes.STRING())
.column("score", DataTypes.DECIMAL(10, 2))
.column("log_ts", DataTypes.TIMESTAMP(3))
.column("part_field_0", DataTypes.STRING())
.column("part_field_1", DataTypes.INT())
.column("proc", proctime())
.column("my_ts", toTimestamp($("log_ts"))
.watermarkFor("my_ts", $("my_ts").minus(lit(3).seconds())))
.primaryKey("user_id")
.partitionedBy("part_field_0", "part_field_1")
.createTemporaryTable("MyTable"); |
...