Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

// Setup environments
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// Create a custom DataStream.
// fromDataStream() interprets its input as an insert-only stream, so the rows keep
// the default RowKind.INSERT (Row.of(RowKind, ...) would NOT set the kind; it would
// add the RowKind as an extra field). Use Row.ofKind(...) with fromChangelogStream()
// for update/delete rows.
// The variable is typed SingleOutputStreamOperator because uid() is declared there,
// not on the DataStream base class.
final SingleOutputStreamOperator<Row> inputStream = env.fromElements(
  Row.of("Alice"),
  Row.of("Bob"));

// Assign a unique uid() for the stream; plan compilation fails for DataStream
// sources without one
inputStream.uid("my-stream");

// Read JSON plan file or compile + write it first.
// We use a Supplier here but of course this can also be completely custom logic.
CompiledPlan plan = CompiledPlan.fromFile(
  "/my/path/my_flink_job.json",
  () -> {
    // fromFile() only calls this supplier when the file does not exist yet,
    // and then writes the returned plan out into the file.
    return tableEnv
      .fromDataStream(inputStream)
      .select($("f0").count())
      .compilePlan();
  });

DataStream<Row> outputStream = tableEnv.asDataStream(plan, Row.class);

// executeAndCollect() returns a CloseableIterator that must be closed to free
// the cluster resources of the job.
try (CloseableIterator<Row> results = outputStream.executeAndCollect()) {
  results.forEachRemaining(System.out::println);
}

...

// Read a JSON plan from the local file system
CompiledPlan.fromFile(String path)
CompiledPlan.fromFile(Path path)

// We provide a supplier pattern for convenience: the supplier is only called
// when the file does not exist yet, and the plan it returns is written to the file
CompiledPlan.fromFile(String path, Supplier<CompiledPlan> ifNotExist)
CompiledPlan.fromFile(Path path, Supplier<CompiledPlan> ifNotExist)

// Or create with a string
CompiledPlan.fromJsonString(String json)

// Access the raw content
// (the CompiledPlan.toString might be used for a summary representation)
CompiledPlan.asJsonString(): String

// Use convenience methods to write it to a file
// (String and Path variants, each with an optional ignoreIfExists flag)
CompiledPlan.writeToFile(String path)
CompiledPlan.writeToFile(String path, boolean ignoreIfExists)
CompiledPlan.writeToFile(Path path)
CompiledPlan.writeToFile(Path path, boolean ignoreIfExists)

...

// --------------------
// COMPILE
// --------------------

// Interface
// Compiles a single SQL statement (a query or an INSERT) into a plan
TableEnvironment.compilePlanSql(String): CompiledPlan

// Example
tableEnv.compilePlanSql("SELECT * FROM T1");
tableEnv.compilePlanSql("INSERT INTO T2 SELECT * FROM T1");

// --------------------
// RESTORE
// --------------------

// Interface
TableEnvironment.executePlan(CompiledPlan): TableResult

// Example
tableEnv.executePlan(CompiledPlan.fromFile("/my/path")).print();

SQL Query and Table Query

// --------------------
// COMPILE
// --------------------

// Interface
// Works for tables created from SQL queries and from the Table API alike
Table.compilePlan(): CompiledPlan

// Example
tableEnv.sqlQuery("SELECT * FROM T1").compilePlan();
tableEnv.from("T1").select($("name")).compilePlan();

// --------------------
// RESTORE
// --------------------

// Interface
// The restored plan is returned as a Table, so it can be transformed further
TableEnvironment.fromPlan(CompiledPlan): Table

// Example
tableEnv
  .fromPlan(CompiledPlan.fromFile("/my/path"))
  .select($("name"))
  .execute()
  .print();

...

// --------------------
// COMPILE
// --------------------

// Interface
// Compiles all statements added to the set into a single plan
StatementSet.compilePlan(): CompiledPlan

// Example
tableEnv
  .createStatementSet()
  .addInsertSql("INSERT INTO T2 SELECT * FROM T1")
  .addInsertSql("INSERT INTO T3 SELECT * FROM T1")
  .compilePlan();

// --------------------
// RESTORE
// --------------------

// Same API as single SQL queries.

// Interface
TableEnvironment.executePlan(CompiledPlan): TableResult

// Example
tableEnv.executePlan(CompiledPlan.fromFile("/my/path")).print();

Table with Sink

// Refactor Table.executeInsert() into declarative insertInto() methods,
// because otherwise we would need to add a lot of overloads.
//
// This makes the API much simpler and solves other shortcomings of the current design:
// insertInto() only declares the pipeline, which is then either run with
// TablePipeline.execute() or compiled with TablePipeline.compilePlan().
Table.insertInto(String targetPath): TablePipeline
Table.insertInto(String targetPath, boolean overwrite): TablePipeline
Table.insertInto(TableDescriptor targetDescriptor): TablePipeline
Table.insertInto(TableDescriptor targetDescriptor, boolean overwrite): TablePipeline

TablePipeline.compilePlan(): CompiledPlan
TablePipeline.execute(): TableResult

// A welcome side effect: this makes the StatementSet API more concise.
// Several overloaded methods are no longer needed, and this method
// fits naturally next to `addInsertSql`.
StatementSet.addInsert(TablePipeline)

// --------------------
// COMPILE
// --------------------

// Interface
TablePipeline.compilePlan(): CompiledPlan

// Example
// insertInto() declares the sink; compilePlan() compiles the whole pipeline
tableEnv
  .from("T1")
  .insertInto("T2")
  .compilePlan();

// --------------------
// RESTORE
// --------------------

// Same as single SQL queries.

// Interface
TableEnvironment.executePlan(CompiledPlan): TableResult

// Example
tableEnv.executePlan(CompiledPlan.fromFile("/my/path")).print();

Statement Sets added to DataStream API

// --------------------
// RESTORE
// --------------------

// Interface
StatementSet.addPlan(CompiledPlan)

// Example
// createStatementSet() on a StreamTableEnvironment returns a StreamStatementSet,
// which offers attachAsDataStream()
tableEnv.createStatementSet()
  .addPlan(CompiledPlan.fromFile("/my/path"))
  .attachAsDataStream();

...

// Also refactor the DataStream insertInto methods in the same declarative way,
// because otherwise we would need to add a lot of overloads.

StreamTableEnvironment.insertIntoDataStream(Table): TableStreamPipeline<Row>
StreamTableEnvironment.insertIntoDataStream(Table, AbstractDataType): TableStreamPipeline<T>
StreamTableEnvironment.insertIntoChangelogStream(Table): TableStreamPipeline<Row>
StreamTableEnvironment.insertIntoChangelogStream(Table, Schema): TableStreamPipeline<Row>
StreamTableEnvironment.insertIntoChangelogStream(Table, Schema, ChangelogMode): TableStreamPipeline<Row>

// Like TablePipeline, a TableStreamPipeline can either be compiled or materialized.
TableStreamPipeline.compilePlan(): CompiledPlan
// the naming `asDataStream` is consistent with StreamStatementSet.attachAsDataStream
TableStreamPipeline.asDataStream(): DataStream<T>

// This new API could also be used to obtain a DataStream side output
// within a StreamStatementSet.
// This is future work, but it has been requested by users multiple times.

// --------------------
// COMPILE
// --------------------

// Interface
TableStreamPipeline.compilePlan(): CompiledPlan

// Example
tableEnv
  .insertIntoDataStream(tableEnv.from("T1"))
  .compilePlan();

// --------------------
// RESTORE
// --------------------

// Interface
// The Class argument determines the element type T of the resulting DataStream
StreamTableEnvironment.asDataStream(CompiledPlan, Class): DataStream<T>

// Example
tableEnv.asDataStream(CompiledPlan.fromFile("/my/path"), Row.class).map(...).executeAndCollect();

...

// --------------------
// COMPILE
// --------------------

// Example

// fromElements(1, 2, 3) produces Integer elements; the variable is typed
// SingleOutputStreamOperator because uid() is declared there, not on DataStream.
SingleOutputStreamOperator<Integer> stream1 = env.fromElements(1, 2, 3);

stream1.uid("stream-1"); // compilation would fail if uid() is not set

tableEnv
  .fromDataStream(stream1)
  .compilePlan();

// --------------------
// RESTORE
// --------------------

// Interface

// We use non-SQL terminology to differentiate from catalog operations.
// Similar to TableEnvironment.registerCatalog().
StreamTableEnvironment.registerDataStream(DataStream<?> dataStreamWithUid)
StreamTableEnvironment.unregisterDataStreams()

// Example

// Re-create the source stream with the same uid() it had at compile time ("stream-1").
// It is typed SingleOutputStreamOperator because uid() is declared there.
SingleOutputStreamOperator<Integer> stream1 = env.fromElements(1, 2, 3);

stream1.uid("stream-1");

tableEnv.registerDataStream(stream1);

tableEnv.fromPlan(CompiledPlan.fromFile("/my/path")).execute().print();

...