Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
languagejava
// register a table using specific descriptor
tEnv.createTemporaryTable(
	"MyTable",
	KafkaConnector.newBuilder()
		.version("0.11")
		.topic("user_logs")
		.property("bootstrap.servers", "localhost:9092")
		.property("group.id", "test-group")
		.startFromEarliest()
		.sinkPartitionerRoundRobin()
		.format(JsonFormat.newBuilder().ignoreParseErrors(false).build())
		.schema(
			Schema.newBuilder()
				.column("user_id", DataTypes.BIGINT())
				.column("user_name", DataTypes.STRING())
				.column("score", DataTypes.DECIMAL(10, 2))
				.column("log_ts", DataTypes.STRING())
				.column("part_field_0", DataTypes.STRING())
				.column("part_field_1", DataTypes.INT())
				.column("proc", proctime()) // define a processing-time attribute with column name "proc"
				.column("ts", toTimestamp($("log_ts")))
				.watermark("ts", $("ts").minus(lit(3).seconds()))
				.primaryKey("user_id")
				.build())
		.partitionedBy("part_field_0", "part_field_1")  // Kafka doesn't support partitioned table yet, this is just an example for the API
		.build()
);

...

Code Block
languagejava
// register a table using general purpose Connector descriptor, this would be helpful for custom source/sinks
tEnv.createTemporaryTable(
	"MyTable",
	Connector.of("kafka-0.11")
		.option("topic", "user_logs")
		.option("properties.bootstrap.servers", "localhost:9092")
		.option("properties.group.id", "test-group")
		.option("scan.startup.mode", "earliest")
		.option("format", "json")
		.option("json.ignore-parse-errors", "true")
		.option("sink.partitioner", "round-robin")
		.schema(
			Schema.newBuilder()
				.column("user_id", DataTypes.BIGINT())
				.column("user_name", DataTypes.STRING())
				.column("score", DataTypes.DECIMAL(10, 2))
				.column("log_ts", DataTypes.STRING())
				.column("part_field_0", DataTypes.STRING())
				.column("part_field_1", DataTypes.INT())
				.column("proc", proctime()) // define a processing-time attribute with column name "proc"
				.column("ts", toTimestamp($("log_ts")))
				.watermark("ts", $("ts").minus(lit(3).seconds()))
				.primaryKey("user_id")
				.build())
		.partitionedBy("part_field_0", "part_field_1") // Kafka doesn't support partitioned table yet, this is just an example for the API
		.build()
);

...

Code Block
languagejava
tEnv.createTemporaryTable(
	"OrdersInKafka",
	KafkaConnector.newBuilder()
		.topic("user_logs")
		.property("bootstrap.servers", "localhost:9092")
		.property("group.id", "test-group")
		.format(JsonFormat.newInstance())
		.schema(
			Schema.newBuilder()
				.column("user_id", DataTypes.BIGINT())
				.column("score", DataTypes.DECIMAL(10, 2))
				.column("log_ts", DataTypes.TIMESTAMP(3))
				.column("ts", toTimestamp($("log_ts")))
				.build())
		.build()
);

tEnv.createTemporaryTable(
	"OrdersInFilesystem",
	Connector.of("filesystem")
		.option("path", "path/to/whatever")
		.schema(
			Schema.newBuilder()
				.watermark("ts", $("ts").minus(lit(3).seconds()))
				.build())
		.like("OrdersInKafka", LikeOption.EXCLUDING.ALL, LikeOption.INCLUDING.GENERATED)
		.build()
);

...

Code Block
languagejava
tableEnv.connect(
        new Kafka() // can be replaced by new Connector("kafka-0.11")
            .version("0.11")
            .topic("myTopic")
            .property("bootstrap.servers", "localhost:9092")
            .property("group.id", "test-group")
            .startFromEarliest()
            .sinkPartitionerRoundRobin()
            .format(new Json().ignoreParseErrors(false))
    .schema(
        new Schema()
            .column("user_id", DataTypes.BIGINT())
            .column("user_name", DataTypes.STRING())
            .column("score", DataTypes.DECIMAL(10, 2))
            .column("log_ts", DataTypes.TIMESTAMP(3))
            .column("part_field_0", DataTypes.STRING())
            .column("part_field_1", DataTypes.INT())
            .column("proc", proctime())
            .column("my_ts", toTimestamp($("log_ts")) 
            .watermarkFor("my_ts", $("my_ts").minus(lit(3).seconds())))
            .primaryKey("user_id")
    .partitionedBy("part_field_0", "part_field_1")
    .createTemporaryTable("MyTable");

...