Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Schema schema = new Schema()
.column("user_id", DataTypes.BIGINT())
.column("score", DataTypes.DECIMAL(10, 2))
.column("ts", DataTypes.TIMESTAMP(3));
Table myKafka = tEnv.from(
new Kafka()
.version("0.11")
.topic("user_logs")
.property("bootstrap.servers", "localhost:9092")
.property("group.id", "test-group")
.startFromEarliest()
.sinkPartitionerRoundRobin()
.format(new Json().ignoreParseErrors(false))
.schema(schema)
);
// reading from kafka table and write into filesystem table
myKafka.executeInsert(
new Connector("filesystem")
.option("path", "/path/to/whatever")
.option("format", "json")
.schema(schema)
);

Proposed Changes

...