Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

tableEnv.connect(
        new Kafka() // can be replaced by new Connector("kafka-0.11")
            .version("0.11")
            .topic("myTopic")
            .property("bootstrap.servers", "localhost:9092")
            .property("group.id", "test-group")
            .startFromEarliest()
            .sinkPartitionerRoundRobin()
            .format(new Json().ignoreParseErrors(false))
    .schema(
        new Schema()
            .column("user_id", DataTypes.BIGINT())
            .column("user_name", DataTypes.STRING())
            .column("score", DataTypes.DECIMAL(10, 2))
            .column("log_ts", DataTypes.TIMESTAMP(3))
            .column("part_field_0", DataTypes.STRING())
            .column("part_field_1", DataTypes.INT())
            .proctime("proc")
            .computedColumn("my_ts", "TO_TIMESTAMP(log_ts)") 
            .watermarkFor("my_ts").boundedOutOfOrderTimestamps(Duration.ofSeconds(5)))
            .primaryKey("user_id")
    .partitionedBy("part_field_0", "part_field_1")
    .createTemporaryTable("MyTable");

However, we would like to further refactor the method of TableEnvironment#connect, because ...prefer "TableEnvironment#createTemporaryTable(path, descriptor)" instead of "TableEnvironment#connect", because

  1. It may confuse users that the "connect()" method invoking doesn't connect to external system, it's just a start point to connect to external system. It is connected after the invoking of "createTemporaryTable".
  2. The "connect()" method looks weired in the methods of TableEnvironment, because all the other methods are SQL compliant. Thus, we think "tEnv#createTemporaryTable(path, descriptor)" is a better entry point than "connect()".
  3. The "TableEnvironment#createTemporaryTable(path, descriptor)" decouples Descriptor and table registration. We can easily support more features, like "TableEnvironment#from(descriptor)" and "Table#executeInsert(descriptor)" with the same descriptor interfaces/classes.