...

  1. Create a StagedTable based on the schema of the query result; at this point it is not visible in the catalog.
  2. Execute the Spark tasks and write the results into the StagedTable.
  3. If all Spark tasks execute successfully, call StagedTable#commitStagedChanges(); the table then becomes visible in the catalog.
  4. If the execution fails, call StagedTable#abortStagedChanges().
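The staged-commit pattern above can be sketched in plain Java. This is an illustrative model only: `StagedTable` here is a simplified stand-in for Spark's `org.apache.spark.sql.connector.catalog.StagedTable`, the catalog is an in-memory map, and all other names are made up for the example.

```java
import java.util.HashMap;
import java.util.Map;

public class StagedCtasSketch {

    /** Simplified stand-in for Spark's StagedTable interface. */
    interface StagedTable {
        void commitStagedChanges(); // make the table visible in the catalog
        void abortStagedChanges();  // discard the staged table and its data
    }

    /** In-memory "catalog": only committed tables are visible here. */
    static final Map<String, String> CATALOG = new HashMap<>();

    static StagedTable stageCreate(String name, String schema) {
        // Step 1: the staged table exists, but is NOT in the catalog yet.
        return new StagedTable() {
            public void commitStagedChanges() { CATALOG.put(name, schema); }
            public void abortStagedChanges()  { /* drop the staged data */ }
        };
    }

    /** Steps 2-4: write, then commit on success or abort on failure. */
    static boolean runCtas(String name, String schema, Runnable writeTasks) {
        StagedTable staged = stageCreate(name, schema);
        try {
            writeTasks.run();              // step 2: execute the write tasks
            staged.commitStagedChanges();  // step 3: table becomes visible
            return true;
        } catch (RuntimeException e) {
            staged.abortStagedChanges();   // step 4: nothing becomes visible
            return false;
        }
    }
}
```

The key property is that the catalog is only touched inside `commitStagedChanges()`, so a failed job leaves no partial table behind.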

Implementation Plan


Execution Flow


Steps:

  1. Create the sink table in the catalog based on the schema of the query result.
  2. Start the job and write the result to the target table.
  3. If the job executes successfully, make the data visible.
  4. If the job execution fails, drop the sink table or delete the data. (This capability requires runtime-module support, such as a hook; SQL passes the relevant parameters to the runtime module.)
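The four steps above can be modeled in plain Java. This is a hypothetical sketch, not the actual Flink implementation: the catalog is an in-memory set, and the failure hook stands in for the runtime-module support the last step mentions.

```java
import java.util.HashSet;
import java.util.Set;

public class CtasFlowSketch {

    /** In-memory stand-in for the catalog. */
    static final Set<String> CATALOG = new HashSet<>();

    static boolean executeCtas(String sinkTable, Runnable job, Runnable onFailureHook) {
        CATALOG.add(sinkTable);   // step 1: create sink table from query schema
        try {
            job.run();            // step 2: run the job, writing to the target
            return true;          // step 3: data is visible on success
        } catch (RuntimeException e) {
            onFailureHook.run();  // step 4: runtime hook drops the table / deletes data
            return false;
        }
    }
}
```

Unlike the Spark staged-table approach, the sink table is created in the catalog up front, so cleanup on failure has to be delegated to a hook.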

Supported Job Mode

Both streaming and batch modes are supported.

The execution flows of streaming and batch are similar; the main differences lie in atomicity and data visibility.

Streaming

Since streaming jobs are long-running and the data is usually consumed downstream in real time, the table needs to be created first.

Steps:

  1. Create the sink table in the catalog based on the schema of the query result.
  2. Start the job and write the result to the sink table.

Under streaming semantics, data becomes visible either after a checkpoint completes or immediately after it is written; which behavior applies is determined by the specific Sink implementation.
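The two visibility behaviors can be illustrated with a small sketch. The `Sink` class below is hypothetical; it only models the distinction between records that are readable immediately and records that are held back until a checkpoint completes.

```java
import java.util.ArrayList;
import java.util.List;

public class StreamingVisibilitySketch {

    /** Hypothetical sink modeling the two visibility modes. */
    static class Sink {
        final boolean visibleOnCheckpointOnly;
        final List<String> pending = new ArrayList<>(); // written, not yet readable
        final List<String> visible = new ArrayList<>(); // readable downstream

        Sink(boolean visibleOnCheckpointOnly) {
            this.visibleOnCheckpointOnly = visibleOnCheckpointOnly;
        }

        void write(String record) {
            if (visibleOnCheckpointOnly) {
                pending.add(record);  // held back until the next checkpoint
            } else {
                visible.add(record);  // visible immediately after writing
            }
        }

        /** Called when a checkpoint completes: publish buffered records. */
        void notifyCheckpointComplete() {
            visible.addAll(pending);
            pending.clear();
        }
    }
}
```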

Batch

A batch job eventually finishes. To guarantee atomicity, the results are usually written to a temporary directory first and only published when the job succeeds.
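A minimal sketch of the temporary-directory approach, assuming results on a local filesystem: tasks write under a staging path, and on success the whole directory is published with a single atomic rename, so readers never observe partial output. The path and file names are made up for the example.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class BatchAtomicWriteSketch {

    /** Writes a result file into a staging dir, then publishes it atomically. */
    static boolean demo() {
        try {
            Path base = Files.createTempDirectory("ctas-demo");
            Path temp = base.resolve("_temporary");   // staging directory
            Path result = base.resolve("result");     // final, visible directory

            Files.createDirectories(temp);
            Files.writeString(temp.resolve("part-0"), "row data"); // batch tasks write here

            // Commit: one atomic directory rename makes all results visible.
            Files.move(temp, result, StandardCopyOption.ATOMIC_MOVE);

            return Files.exists(result.resolve("part-0")) && !Files.exists(temp);
        } catch (IOException e) {
            return false; // on failure the staging dir is simply never published
        }
    }
}
```

If the job fails before the rename, only the temporary directory needs to be deleted; the final path is untouched.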

...

@PublicEvolving
public interface TableEnvironment {
    /**
     * Registers the given {@link Table}'s result as a catalog table with the {@link
     * TableDescriptor}'s options.
     *
     * <p>CTAS for the Table API.
     *
     * <p>Examples:
     *
     * <pre>{@code
     * Table query = tEnv.sqlQuery("SELECT ...");
     * tEnv.createTable("MyTable",
     *         TableDescriptor.forConnector("hive").build(),
     *         query);
     * }</pre>
     *
     * @param path The path under which the table will be registered. See also the {@link
     *     TableEnvironment} class description for the format of the path.
     * @param descriptor Template for creating a {@link CatalogTable} instance.
     * @param query The {@link Table} object describing the pipeline for further transformations.
     */
    void createTable(String path, TableDescriptor descriptor, Table query);
}


Compatibility, Deprecation, and Migration Plan

...