THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
// Setup environments
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
final StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

// Create a custom DataStream.
// Row.ofKind() sets the change flag of a row. Row.of(RowKind.UPDATE_AFTER, ...) would instead
// create a two-field row whose first field is the RowKind enum value.
// Assign a unique uid() for the stream: compiling a plan over a DataStream requires it.
// uid() is declared on SingleOutputStreamOperator (the type returned by fromElements()),
// not on DataStream, so it is chained here before the result is widened to DataStream<Row>.
final DataStream<Row> inputStream = env
    .fromElements(
        Row.ofKind(RowKind.UPDATE_AFTER, "Alice"),
        Row.ofKind(RowKind.UPDATE_AFTER, "Bob"))
    .uid("my-stream");

// Read the JSON plan file, or compile the plan first if the file does not exist yet.
// We use a Supplier here, but of course this can also be completely custom logic.
// The supplier must *return* the plan (a block lambda without `return` is not a
// Supplier<CompiledPlan>); the caller will write the returned plan out into the file.
// The rows carry RowKind.UPDATE_AFTER, so the stream is read as a changelog:
// fromDataStream() expects insert-only records.
final CompiledPlan plan = CompiledPlan.fromFile(
    "/my/path/my_flink_job.json",
    () -> tableEnv
        .fromChangelogStream(inputStream)
        .select($("f0").count())
        .compilePlan());

// Restore the plan as a DataStream and run the pipeline.
// print() + execute() is used instead of a discarded executeAndCollect(): the iterator
// returned by executeAndCollect() must be closed to free the job's cluster resources.
final DataStream<Row> outputStream = tableEnv.asDataStream(plan, Row.class);
outputStream.print();
env.execute();
...
// Read a plan (JSON) from a file on the local file system
CompiledPlan.fromFile(String path)
CompiledPlan.fromFile(Path path)
// We provide a supplier pattern for convenience:
// `ifNotExist` supplies the plan when no file exists at `path` yet
CompiledPlan.fromFile(String path, Supplier<CompiledPlan> ifNotExist)
CompiledPlan.fromFile(Path path, Supplier<CompiledPlan> ifNotExist)
// Or create with a string
CompiledPlan.fromJsonString(String json)
// Access the raw content
// (CompiledPlan.toString() might be used for a summary representation)
CompiledPlan.asJsonString(): String
// Use convenience methods to write it to a file.
// Every path type has a plain overload and an `ignoreIfExists` overload.
CompiledPlan.writeToFile(String path)
CompiledPlan.writeToFile(String path, boolean ignoreIfExists)
CompiledPlan.writeToFile(Path path)
CompiledPlan.writeToFile(Path path, boolean ignoreIfExists)
...
// --------------------
// COMPILE
// --------------------
// Interface
TableEnvironment.compilePlanSql(String): CompiledPlan
// Example
tableEnv.compilePlanSql("SELECT * FROM T1");
tableEnv.compilePlanSql("INSERT INTO T2 SELECT * FROM T1");
// --------------------
// RESTORE
// --------------------
// Interface
TableEnvironment.executePlan(CompiledPlan): TableResult
// Example: load a previously written plan and execute it
tableEnv.executePlan(CompiledPlan.fromFile("/my/path")).print();
SQL Query and Table Query
// --------------------
// COMPILE
// --------------------
// Interface
Table.compilePlan(): CompiledPlan
// Example
tableEnv.sqlQuery("SELECT * FROM T1").compilePlan();
tableEnv.from("T1").select($("name")).compilePlan();
// --------------------
// RESTORE
// --------------------
// Interface
TableEnvironment.fromPlan(CompiledPlan): Table
// Example: the restored plan is a regular Table that can be processed further
tableEnv
    .fromPlan(CompiledPlan.fromFile("/my/path"))
    .select($("name"))
    .execute()
    .print();
...
// --------------------
// COMPILE
// --------------------
// Interface
StatementSet.compilePlan(): CompiledPlan
// Example
tableEnv
    .createStatementSet()
    .addInsertSql("INSERT INTO T2 SELECT * FROM T1")
    .addInsertSql("INSERT INTO T3 SELECT * FROM T1")
    .compilePlan();
// --------------------
// RESTORE
// --------------------
// Same API as single SQL queries.
// Interface
TableEnvironment.executePlan(CompiledPlan): TableResult
// Example
tableEnv.executePlan(CompiledPlan.fromFile("/my/path")).print();
Table with Sink
// Refactor Table.executeInsert() into declarative insertInto() methods,
// because otherwise we would need to add a lot of overloads.
// This makes the API much easier to use and solves other shortcomings of the current design.
Table.insertInto(String targetPath): TablePipeline
Table.insertInto(String targetPath, boolean overwrite): TablePipeline
Table.insertInto(TableDescriptor targetDescriptor): TablePipeline
Table.insertInto(TableDescriptor targetDescriptor, boolean overwrite): TablePipeline
TablePipeline.compilePlan(): CompiledPlan
TablePipeline.execute(): TableResult
// A nice improvement along the way:
// this makes the statement set API more concise.
// No need for several overloaded methods any more.
// This method fits perfectly next to `addInsertSql`.
StatementSet.addInsert(TablePipeline)
// --------------------
// COMPILE
// --------------------
// Interface
TablePipeline.compilePlan(): CompiledPlan
// Example
tableEnv
    .from("T1")
    .insertInto("T2")
    .compilePlan();
// --------------------
// RESTORE
// --------------------
// Same as single SQL queries.
// Interface
TableEnvironment.executePlan(CompiledPlan): TableResult
// Example
tableEnv.executePlan(CompiledPlan.fromFile("/my/path")).print();
Statement Sets added to DataStream API
// --------------------
// RESTORE
// --------------------
// Interface
StatementSet.addPlan(CompiledPlan)
// Example: restore a plan into a statement set and attach it to the DataStream program
tableEnv.createStatementSet()
    .addPlan(CompiledPlan.fromFile("/my/path"))
    .attachAsDataStream();
...
// Also refactor the insertInto DataStream methods,
// because otherwise we would need to add a lot of overloads.
StreamTableEnvironment.insertIntoDataStream(Table): TableStreamPipeline<Row>
StreamTableEnvironment.insertIntoDataStream(Table, AbstractDataType): TableStreamPipeline<T>
StreamTableEnvironment.insertIntoChangelogStream(Table): TableStreamPipeline<Row>
StreamTableEnvironment.insertIntoChangelogStream(Table, Schema): TableStreamPipeline<Row>
StreamTableEnvironment.insertIntoChangelogStream(Table, Schema, ChangelogMode): TableStreamPipeline<Row>
TableStreamPipeline.compilePlan(): CompiledPlan
// The name `asDataStream` is consistent with StreamStatementSet.attachAsDataStream
TableStreamPipeline.asDataStream(): DataStream<T>
// We can also leverage this new API to get a DataStream side output
// within a StreamStatementSet.
// This is future work but has been requested multiple times by users.
// --------------------
// COMPILE
// --------------------
// Interface
TableStreamPipeline.compilePlan(): CompiledPlan
// Example
tableEnv
    .insertIntoDataStream(tableEnv.from("T1"))
    .compilePlan();
// --------------------
// RESTORE
// --------------------
// Interface
StreamTableEnvironment.asDataStream(CompiledPlan, Class): DataStream<T>
// Example
// print() + env.execute() instead of a discarded executeAndCollect(): the iterator
// returned by executeAndCollect() must be closed to free the job's cluster resources.
tableEnv
    .asDataStream(CompiledPlan.fromFile("/my/path"), Row.class)
    .map(...)
    .print();
env.execute();
...
// --------------------
// COMPILE
// --------------------
// Example
// fromElements(1, 2, 3) produces Integer elements, so the stream is a DataStream<Integer>.
// uid() is declared on SingleOutputStreamOperator (returned by fromElements()), not on
// DataStream, so it is chained before the result is widened to DataStream.
DataStream<Integer> stream1 = env.fromElements(1, 2, 3)
    .uid("stream-1"); // compilation would fail if uid() is not set
tableEnv
    .fromDataStream(stream1)
    .compilePlan();
// --------------------
// RESTORE
// --------------------
// Interface
// We use non-SQL terminology to differentiate from catalog operations.
// Similar to TableEnvironment.registerCatalog().
StreamTableEnvironment.registerDataStream(DataStream<?> dataStreamWithUid)
StreamTableEnvironment.unregisterDataStreams()
// Example
// The stream is registered with the same uid ("stream-1") it had when the plan was compiled.
DataStream<Integer> stream1 = env.fromElements(1, 2, 3)
    .uid("stream-1");
tableEnv.registerDataStream(stream1);
tableEnv.fromPlan(CompiledPlan.fromFile("/my/path")).execute().print();
...