THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
tableEnv.connect( new Kafka() // can be replaced by new Connector("kafka-0.11") .version("0.11") .topic("myTopic") .property("bootstrap.servers", "localhost:9092") .property("group.id", "test-group") .startFromEarliest() .sinkPartitionerRoundRobin() .format(new Json().ignoreParseErrors(false)) .schema( new Schema() .column("user_id", DataTypes.BIGINT()) .column("user_name", DataTypes.STRING()) .column("score", DataTypes.DECIMAL(10, 2)) .column("log_ts", DataTypes.TIMESTAMP(3)) .column("part_field_0", DataTypes.STRING()) .column("part_field_1", DataTypes.INT()) .proctime("proc") .computedColumn("my_ts", "TO_TIMESTAMP(log_ts)") .watermarkFor("my_ts").boundedOutOfOrderTimestamps(Duration.ofSeconds(5))) .primaryKey("user_id") .partitionedBy("part_field_0", "part_field_1") .createTemporaryTable("MyTable");
However, we would like to further refactor the method of TableEnvironment#connect, because ...prefer "TableEnvironment#createTemporaryTable(path, descriptor)" instead of "TableEnvironment#connect", because
- It may confuse users that the "connect()" method invoking doesn't connect to external system, it's just a start point to connect to external system. It is connected after the invoking of "createTemporaryTable".
- The "connect()" method looks weired in the methods of TableEnvironment, because all the other methods are SQL compliant. Thus, we think "tEnv#createTemporaryTable(path, descriptor)" is a better entry point than "connect()".
- The "TableEnvironment#createTemporaryTable(path, descriptor)" decouples Descriptor and table registration. We can easily support more features, like "TableEnvironment#from(descriptor)" and "Table#executeInsert(descriptor)" with the same descriptor interfaces/classes.