...

The overall execution process is shown in the figure above.

...


Detailed execution process

Steps:

  1. Create the sink table (stage table) through the Catalog's new API, createStageTable.
  2. Construct a CTASJobStatusHook, passing the Catalog as a constructor parameter. CTASJobStatusHook is an implementation of the JobStatusHook interface.
  3. Register the CTASJobStatusHook with the StreamGraph. It is then passed to the JobGraph and serialized (this requires implementing serialization/deserialization for Catalog and JobStatusHook).
  4. When the job starts and its status is CREATED, the runtime module calls the JobStatusHook#onCreated method; CTASJobStatusHook#onCreated in turn calls Catalog#createTable.
  5. When the final status of the job is FAILED, the runtime module calls the JobStatusHook#onFailed method; CTASJobStatusHook#onFailed in turn calls Catalog#dropTable.
  6. When the final status of the job is CANCELED, the runtime module calls the JobStatusHook#onCanceled method; CTASJobStatusHook#onCanceled in turn calls Catalog#dropTable.
  7. When the final status of the job is FINISHED, the runtime module calls the JobStatusHook#onFinished method; no additional operations are needed.
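The hook lifecycle in steps 4 to 7 can be sketched as follows. This is a minimal illustration, not the proposed implementation: the JobStatusHook method signatures, the RecordingCatalog class, and the use of plain String table paths and job IDs are all assumptions made to keep the example self-contained, and no Flink classes are used.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified stand-in for the JobStatusHook interface described in the steps. */
interface JobStatusHook extends Serializable {
    void onCreated(String jobId);
    void onFinished(String jobId);
    void onFailed(String jobId, Throwable cause);
    void onCanceled(String jobId);
}

/** Hypothetical catalog stand-in that only records which calls were made. */
class RecordingCatalog implements Serializable {
    final List<String> calls = new ArrayList<>();

    void createTable(String tablePath) {
        calls.add("createTable:" + tablePath);
    }

    void dropTable(String tablePath) {
        calls.add("dropTable:" + tablePath);
    }
}

/** Sketch of a CTAS hook: create the table on CREATED, drop it on FAILED or CANCELED. */
class CtasJobStatusHook implements JobStatusHook {
    private final RecordingCatalog catalog;
    private final String tablePath;

    CtasJobStatusHook(RecordingCatalog catalog, String tablePath) {
        this.catalog = catalog;
        this.tablePath = tablePath;
    }

    @Override
    public void onCreated(String jobId) {
        catalog.createTable(tablePath);
    }

    @Override
    public void onFinished(String jobId) {
        // Nothing to do: the table created in onCreated is kept.
    }

    @Override
    public void onFailed(String jobId, Throwable cause) {
        catalog.dropTable(tablePath);
    }

    @Override
    public void onCanceled(String jobId) {
        catalog.dropTable(tablePath);
    }
}

class CtasHookDemo {
    public static void main(String[] args) {
        RecordingCatalog catalog = new RecordingCatalog();
        JobStatusHook hook = new CtasJobStatusHook(catalog, "db.my_table");
        hook.onCreated("job-1");
        hook.onFailed("job-1", new RuntimeException("simulated failure"));
        // Prints: [createTable:db.my_table, dropTable:db.my_table]
        System.out.println(catalog.calls);
    }
}
```

Both the hook and the catalog stand-in are Serializable here because step 3 requires the hook, together with the Catalog it holds, to be shipped along with the JobGraph.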

...

@PublicEvolving
public interface TableEnvironment {
    /**
     * Registers the given {@link Table}'s result as a catalog table with {@link TableDescriptor}'s options.
     *
     * <p>CTAS for Table API.
     *
     * <p>Examples:
     *
     * <pre>{@code
     * Map<String, String> options = new HashMap<>();
     * options.put("connector.type", "hudi");
     * tEnv.createTable("MyTable", options, tEnv.sqlQuery("select id, name from user_table"));
     * }</pre>
     *
     * @param path The path under which the table will be registered. See also the {@link
     *     TableEnvironment} class description for the format of the path.
     * @param options Table options.
     * @param query The {@link Table} object describing the pipeline for further transformations.
     */
    void createTable(String path, Map<String, String> options, Table query);
}

Catalog

Provides a method used to create the stage table.

@PublicEvolving
public interface Catalog {

    /**
     * Creates a new stage table for CTAS.
     * This table will be recorded in the Catalog's memory.
     * It will be created when the job status changes to CREATED
     * and will be dropped when the job status changes to FAILED or CANCELED.
     *
     * <p>The framework will make sure to call this method with fully validated {@link
     * ResolvedCatalogTable}. Those instances are easy to serialize
     * for a durable catalog implementation.
     *
     * @param tablePath path of the table to be created
     * @param table the table definition
     * @param ignoreIfExists flag to specify behavior when a table already exists at the
     *     given path: if set to false, a TableAlreadyExistException is thrown; if set to true,
     *     nothing happens.
     * @throws TableAlreadyExistException if table already exists and ignoreIfExists is false
     * @throws DatabaseNotExistException if the database in tablePath doesn't exist
     * @throws CatalogException in case of any runtime exception
     */
    void createStageTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
            throws TableAlreadyExistException, DatabaseNotExistException, CatalogException;
}
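
One way a catalog could honor the createStageTable contract is sketched below, assuming an in-memory implementation. Everything here is hypothetical and simplified for illustration: InMemoryStageCatalog, StageTableAlreadyExistsException, and the use of plain String paths and definitions in place of ObjectPath and CatalogBaseTable are not part of the proposal, and database existence checks are omitted.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical unchecked exception standing in for TableAlreadyExistException. */
class StageTableAlreadyExistsException extends RuntimeException {
    StageTableAlreadyExistsException(String tablePath) {
        super("Table already exists: " + tablePath);
    }
}

/** Hypothetical in-memory catalog that records stage tables before creating them. */
class InMemoryStageCatalog {
    // Tables recorded by createStageTable but not yet created.
    private final Map<String, String> stagedTables = new HashMap<>();
    // Tables that have actually been created.
    private final Map<String, String> createdTables = new HashMap<>();

    /** Records the table definition in memory; nothing is created yet. */
    void createStageTable(String tablePath, String definition, boolean ignoreIfExists) {
        if (stagedTables.containsKey(tablePath) || createdTables.containsKey(tablePath)) {
            if (ignoreIfExists) {
                return; // Keep the existing entry untouched.
            }
            throw new StageTableAlreadyExistsException(tablePath);
        }
        stagedTables.put(tablePath, definition);
    }

    /** Called when the job status changes to CREATED: turns the stage table into a real one. */
    void createTable(String tablePath) {
        String definition = stagedTables.remove(tablePath);
        if (definition == null) {
            throw new IllegalStateException("No stage table recorded for " + tablePath);
        }
        createdTables.put(tablePath, definition);
    }

    /** Called when the job status changes to FAILED or CANCELED: removes all traces. */
    void dropTable(String tablePath) {
        stagedTables.remove(tablePath);
        createdTables.remove(tablePath);
    }

    boolean isStaged(String tablePath) {
        return stagedTables.containsKey(tablePath);
    }

    boolean tableExists(String tablePath) {
        return createdTables.containsKey(tablePath);
    }
}
```

In this sketch a table is only visible through tableExists after createTable has run, so a job that fails before or after creation leaves nothing behind once dropTable is called.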


Compatibility, Deprecation, and Migration Plan

...