Versions Compared


  • This line was added.
  • This line was removed.
  • Formatting was changed.


  1. Create a StagedTable based on the schema of the query result, but it is not visible in the catalog.
  2. Execute the spark task and write the result into StagedTable.
  3. If all Spark tasks are executed successfully, call StagedTable#commitStagedChanges(), then it is visible in the catalog.
  4. If the execution fails, call StagedTable#abortStagedChanges().

Implementation Plan

Execution Flow


  1. Create the sink table  in the catalog based on the schema of the query result.
  2. Start the job and write the result to target.
  3. If the job executes successfully, then make data visible.
  4. If the job execution fails, then drop the sink table or delete data.(This capability requires runtime module support, such as hook, and SQL passes relevant parameters to the runtime module.)


The execution flow of streaming and batch is similar, the main differences are in atomicity and data visibility


Since streaming job are long-running, usually data is to be consumed downstream in real time. Determined by the specific Sink implementation.

  • Data is visible after Checkpoint is


  • success or visible immediately after writing.
  • In stream semantics, the data is as continuous as possible, strict atomicity is not guaranteed. Therefore, when the job fails, there is a high probability that the sink does not need to drop the table.


The batch job will end . In order to guarantee atomicity, we usually write the results in a temporary directorywith disabled checkpoint, so we want the data to be visible after the job is success, and drop the table if the job fails.

Some external storage systems cannot be supported, such as Redis.

We will refer to spark DataSource v1 implementation.

  • Provides atomic capabilities, if the job fails, drop the table.(Requires runtime module support, when the job finally fails, notify the sink to clean up. )
  • Data visibility depends on the specific external storage, and can be divided into write-visible, final visibility, and incremental visibility. (Described in the Data Visibility section)

Drop the table if the job fails requires some additional support(both Streaming and Batch):

  • TableSink needs to provide the CleanUp API, developers implement as needed. Do nothing by default. If an exception occurs, can use this API to drop table or delete the temporary directory, etc.


This requires runtime module support, like the description in the implementation of batch modeExecution Flow.

Data Visibility

Regarding data visibility, it is determined by the TableSink and runtime-mode:


  • FileSystem Sink: Data should be written to the temporary directory first, visible after the final job is successful(final visibility).
  • Supports transactions or twoTwo-phase commit Sink:  Data visible after the final job is successful(final visibility).
  • Supports transaction Sink:  Commit transactions after the final job is successful(final visibility). Commit transactions periodically or with a fixed number of records(incremental visibility).
  • Other Sink:  Data is visible immediately after writing(write-visible).


We can think that there are two types of catalogs in Flink, in-memory catalogs and external catalogs:


In-memory Catalog,the options of the table are completely dependent on user input.

Managed Table

For Managed Table, please refer to FLIP-188 . Table options that do not contain the ‘connector’ key and value represent a managed table. CTAS also follows this principle.

For details, please refer to the Table Store docs:

CTAS supports Managed Table and Non-Managed Table.

Users need to be clear about their business needs and set the table options correctlywe should check the table's options, avoid users not setting configuration parameters.

Public API Changes

Table Environment


public interface TableEnvironment {
* Registers the given {@link Table}'s result as a catalog table with {@link TableDescriptor}'s options.
* <p> CTAS for Table API.
* <p>Examples:
* <pre>{@code
* tEnv.createTable("MyTable", TableDescriptor.forConnector("hive")Map<String, String> options = new HashMap<String, String>();
* options.put("connector.type", "hudi");
* tEnv.createTable("MyTable", options,"select id, name from user_table"));
* }</pre>
* @param path The path under which the table will be registered. See also the {@link
* TableEnvironment} class description for the format of the path.
* @param options Table options.
* @param query The {@link Table} object describing the pipeline for further transformations.
void createTable(String path, Map<String, String> options, Table query);
