...

In contrast to SQL, the Table API resolves tables, functions, and expressions eagerly for every operation. As a result, a pipeline cannot be declared fluently without already being interpreted. This makes idempotent statements such as `tableEnv.from("T1").select(...).compileAndExecute()` impossible, because both `from()` and `select()` are interpreted eagerly before `compileAndExecute()` could run. In a programming language, however, it is easy to first compile a plan and then pass the resulting instance to an execute method. We therefore offer a supplier method for this purpose.
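The compile-or-restore flow that the supplier method enables can be sketched in plain Java, independent of the Flink API. The `loadOrCompile` helper and the file-based cache below are illustrative assumptions; only the supplier pattern itself mirrors the proposal:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.function.Supplier;

public class PlanCache {
    // Mimics CompiledPlan.fromJsonFile(path, ifNotExist): return the persisted
    // plan if the file exists, otherwise compile it once and persist the result.
    static String loadOrCompile(Path path, Supplier<String> ifNotExist) {
        try {
            if (Files.exists(path)) {
                return Files.readString(path);
            }
            String plan = ifNotExist.get();
            Files.writeString(path, plan);
            return plan;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        Path path = Files.createTempDirectory("plans").resolve("query.json");
        // First run: the supplier is invoked and the plan is persisted.
        String first = loadOrCompile(path, () -> "{\"plan\": \"v1\"}");
        // Second run: the persisted plan is restored; the supplier is ignored.
        String second = loadOrCompile(path, () -> "{\"plan\": \"v2\"}");
        System.out.println(first.equals(second)); // prints true
    }
}
```

The key property is idempotence: the expensive compilation runs at most once, and later invocations restore the same plan regardless of how the surrounding program changed.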

We have various locations where the optimizer is triggered:

  • single SQL statements
  • statement sets
  • statement sets that are added to DataStream API
  • Table API with sink
  • Table API with implicit collect() sink
  • Table API with DataStream sink (toDataStream and toChangelogStream)

In the first version, we might not implement all these methods but focus on the most important ones.

CompiledPlan

We provide the concept of a `CompiledPlan`. It is an immutable instance that represents a plan string.

// Read string from local file system
CompiledPlan.fromJsonFile(String path)
CompiledPlan.fromJsonFile(Path path)

// We provide a supplier pattern for convenience
CompiledPlan.fromJsonFile(Path path, Supplier<CompiledPlan> ifNotExist)

// Or create with a string
CompiledPlan.fromJsonString(String json)

// Access the raw content
// (the CompiledPlan.toString might be used for a summary representation)
CompiledPlan.asJsonString(): String

// Use convenience method to write it to a file
CompiledPlan.writeToFile(String path)
CompiledPlan.writeToFile(String path, boolean ignoreIfExists)
CompiledPlan.writeToFile(Path path)
CompiledPlan.writeToFile(Path path, boolean ignoreIfExists)
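The intended round trip of the methods above can be modeled in plain Java NIO. This is a sketch, not the Flink implementation; in particular, the behavior of `ignoreIfExists` (silently skipping the write rather than overwriting or failing) is an assumption for illustration:

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;

// A minimal model of the proposed CompiledPlan file round trip.
public class PlanFile {
    private final String json;

    PlanFile(String json) { this.json = json; }

    // Mirrors CompiledPlan.fromJsonFile(Path)
    static PlanFile fromJsonFile(Path path) {
        try {
            return new PlanFile(Files.readString(path));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Mirrors CompiledPlan.asJsonString()
    String asJsonString() { return json; }

    // Mirrors CompiledPlan.writeToFile(Path, boolean); whether an existing
    // file causes a failure or a silent skip is assumed here.
    void writeToFile(Path path, boolean ignoreIfExists) {
        try {
            if (Files.exists(path)) {
                if (ignoreIfExists) {
                    return; // keep the previously persisted plan untouched
                }
                throw new IllegalStateException("Plan file already exists: " + path);
            }
            Files.writeString(path, json);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Modeling the plan as an immutable string matches the description above: once persisted, a plan is never rewritten accidentally, which is exactly the guarantee an upgrade story needs.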

Single SQL Statements

// --------------------
// COMPILE
// --------------------

// Interface
TableEnvironment.compilePlanSql(String): CompiledPlan

// Example
tableEnv.compilePlanSql("SELECT * FROM T1");
tableEnv.compilePlanSql("INSERT INTO T2 SELECT * FROM T1");

// --------------------
// RESTORE
// --------------------

// Interface
TableEnvironment.executePlan(CompiledPlan): TableResult

// Example
tableEnv.executePlan(CompiledPlan.fromJsonFile("/my/path")).print();

SQL Query and Table Query

// --------------------
// COMPILE
// --------------------

// Interface
Table.compilePlan(): CompiledPlan

// Example
tableEnv.sqlQuery("SELECT * FROM T1").compilePlan();
tableEnv.from("T1").select($("name")).compilePlan();

// --------------------
// RESTORE
// --------------------

// Interface
TableEnvironment.fromPlan(CompiledPlan): Table

// Example
tableEnv
  .fromPlan(CompiledPlan.fromJsonFile("/my/path"))
  .select($("name"))
  .execute()
  .print();

Statement Sets

// --------------------
// COMPILE
// --------------------

// Interface
StatementSet.compilePlan(): CompiledPlan

// Example
tableEnv
  .createStatementSet()
  .addInsertSql("INSERT INTO T2 SELECT * FROM T1")
  .addInsertSql("INSERT INTO T3 SELECT * FROM T1")
  .compilePlan();

// --------------------
// RESTORE
// --------------------

// Same API as single SQL queries.

// Interface
TableEnvironment.executePlan(CompiledPlan): TableResult

// Example
tableEnv.executePlan(CompiledPlan.fromJsonFile("/my/path")).print();

Table with Sink

// Refactor Table.executeInsert() into declarative insertInto() methods,
// because otherwise we would need to add a lot of overloads.

// This makes the API much easier to use and solves other shortcomings of the current design.
Table.insertInto(String targetPath): TablePipeline
Table.insertInto(String targetPath, boolean overwrite): TablePipeline
Table.insertInto(TableDescriptor targetDescriptor): TablePipeline
Table.insertInto(TableDescriptor targetDescriptor, boolean overwrite): TablePipeline

TablePipeline.compilePlan(): CompiledPlan
TablePipeline.execute(): TableResult

// A nice improvement along the way:
// this makes the statement set API more concise.
// No need for several overloaded methods anymore.
// This method fits nicely next to `addInsertSql`.
StatementSet.addInsert(TablePipeline)
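Assuming the refactoring above, a statement set could then mix SQL inserts with Table API pipelines. A sketch of the proposed (not yet implemented) API:

```java
// Example (proposed API)
tableEnv
  .createStatementSet()
  .addInsertSql("INSERT INTO T2 SELECT * FROM T1")
  .addInsert(tableEnv.from("T1").insertInto("T3"))
  .execute();
```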

// --------------------
// COMPILE
// --------------------

// Interface
TablePipeline.compilePlan(): CompiledPlan

// Example
tableEnv
  .from("T1")
  .insertInto("T2")
  .compilePlan();

// --------------------
// RESTORE
// --------------------

// Same as single SQL queries.

// Interface
TableEnvironment.executePlan(CompiledPlan): TableResult

// Example
tableEnv.executePlan(CompiledPlan.fromJsonFile("/my/path")).print();

Statement Sets added to DataStream API

// --------------------
// RESTORE
// --------------------

// Interface
StatementSet.addPlan(CompiledPlan)

// Example
tableEnv.createStatementSet()
  .addPlan(CompiledPlan.fromJsonFile("/my/path"))
  .attachAsDataStream();

Table with DataStream API Sink

// Also refactor the insertInto DataStream methods,
// because otherwise we would need to add a lot of overloads.

StreamTableEnvironment.insertIntoDataStream(Table): TableStreamPipeline<Row>
StreamTableEnvironment.insertIntoDataStream(Table, AbstractDataType): TableStreamPipeline<T>
StreamTableEnvironment.insertIntoChangelogStream(Table): TableStreamPipeline<Row>
StreamTableEnvironment.insertIntoChangelogStream(Table, Schema): TableStreamPipeline<Row>
StreamTableEnvironment.insertIntoChangelogStream(Table, Schema, ChangelogMode): TableStreamPipeline<Row>

TableStreamPipeline.compilePlan(): CompiledPlan
// the naming `asDataStream` is consistent with StreamStatementSet.attachAsDataStream
TableStreamPipeline.asDataStream(): DataStream<T>

// We can also leverage this new API to get a DataStream side output
// within a StreamStatementSet.
// This is future work but has been requested multiple times by users.

// --------------------
// COMPILE
// --------------------

// Interface
TableStreamPipeline.compilePlan(): CompiledPlan

// Example
tableEnv
  .insertIntoDataStream(tableEnv.from("T1"))
  .compilePlan();

// --------------------
// RESTORE
// --------------------

// Interface
StreamTableEnvironment.asDataStream(CompiledPlan, Class): DataStream<T>

// Example
tableEnv.asDataStream(CompiledPlan.fromJsonFile("/my/path"), Row.class).map(...).executeAndCollect();


Note: We need to store the expected target data type in the JSON plan. This means the JSON representation must include not only the logical types but also the conversion classes of all fields. We use a `Class` argument in `asDataStream` to additionally verify the output of the CompiledPlan.

Proposed Changes


...