Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

We give a migration plan for users who are still using TableEnvironment#connect and want to migrate to new Descriptor API. The following tables list the API changes:


Schema API Changes

Current InterfaceNew Interface
.field("name", DataTypes.STRING()).column("name", DataTypes.STRING())
.field("name", "STRING").column("name", DataTypes.STRING())
.field("proctime", DataTypes.TIMESTAMP(3)).proctime().proctime("proctime")
.from("originalName").computedColumn("newName", "originalName")
.rowtime(new Rowtime().timestampsFromField("time").watermarksPeriodicAscending()).watermarkFor("time").ascendingTimestamps()
.rowtime(new Rowtime().timestampsFromField("time").watermarksPeriodicBounded(2)).watermarkFor("time").boundedOutOfOrderTimestamps(Duration.ofMillis(2))
rowtime.timestampsFromExtractor(TimestampExtractor).computedColumn(fieldName, expr).watermarkFor(fieldName)
rowtime.watermarksFromStrategy(WatermarkStrategy).watermarkFor(rowtimeField).as(watermarkExpr)
rowtime.watermarksFromSource()removed (never implemented by any connectors before)
rowtime.timestampsFromSource()removed (never implemented by any connectors before)
.toProperties()removed (not needed anymore)

ConnectTableDescriptor API Changes

...

Current InterfaceNew Interface
.withSchema(new Schema()).schema(new Schema())
.withFormat(new Json())new Kafka().format(new Json())
.withPartitionKeys(Arrays.asList(a, b, c)).partitionedBy(a, b, c)
.createTemporaryTable(path)tEnv.createTemporaryTable(path, descriptor)
.inAppendMode()removed (not needed anymore)
.inRetractMode()removed (not needed anymore)
.inUpsertMode()removed (not needed anymore)
.toProperties()removed (not needed anymore)

Test Plan

Describe in few sentences how the FLIP will be tested. We are mostly interested in system tests (since unit-tests are specific to implementation details). How will we know that the implementation works as expected? How will we know nothing broke?

Rejected Alternatives


Rejected Alternatives

Keep and follow the original TableEnvironment#connect API

For example, a minor refactored TableEnvironment#connect:

tableEnv.connect(
        new Kafka() // can be replaced by new Connector("kafka-0.11")
            .version("0.11")
            .topic("myTopic")
            .property("bootstrap.servers", "localhost:9092")
            .property("group.id", "test-group")
            .startFromEarliest()
            .sinkPartitionerRoundRobin()
            .format(new Json().ignoreParseErrors(false))
    .schema(
        new Schema()
            .column("user_id", DataTypes.BIGINT())
            .column("user_name", DataTypes.STRING())
            .column("score", DataTypes.DECIMAL(10, 2))
            .column("log_ts", DataTypes.TIMESTAMP(3))
            .column("part_field_0", DataTypes.STRING())
            .column("part_field_1", DataTypes.INT())
            .proctime("proc")
            .computedColumn("my_ts", "TO_TIMESTAMP(log_ts)") 
            .watermarkFor("my_ts").boundedOutOfOrderTimestamps(Duration.ofSeconds(5)))
            .primaryKey("user_id")
    .partitionedBy("part_field_0", "part_field_1")
    .createTemporaryTable("MyTable");

However, we would like to further refactor the method of TableEnvironment#connect, because ..If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.