...
The overall execution process is shown in the figure above.
...
Detailed execution process
Steps:
- Create the sink table (the stage table) through the Catalog's new API, createStageTable.
- Construct a CTASJobStatusHook with the Catalog as a constructor parameter. CTASJobStatusHook is an implementation of the JobStatusHook interface.
- Register the CTASJobStatusHook with the StreamGraph. It is then passed to the JobGraph and serialized, which requires implementing serialization/deserialization of the Catalog and the JobStatusHook.
- When the job starts and its status is CREATED, the runtime module calls JobStatusHook#onCreated. CTASJobStatusHook#onCreated calls Catalog#createTable.
- When the final status of the job is FAILED, the runtime module calls JobStatusHook#onFailed. CTASJobStatusHook#onFailed calls Catalog#dropTable.
- When the final status of the job is CANCELED, the runtime module calls JobStatusHook#onCanceled. CTASJobStatusHook#onCanceled calls Catalog#dropTable.
- When the final status of the job is FINISHED, the runtime module calls JobStatusHook#onFinished, and no additional operation is needed.
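The steps above can be sketched in plain Java. This is a minimal, self-contained sketch under stated assumptions: `SimpleCatalog`, the `String` job id, the `JobStatus` enum, `HookDispatcher`, `RecordingCatalog` and `CtasHookDemo` are hypothetical stand-ins, and the real Flink runtime and Catalog interfaces have different signatures. It shows only how each job status is mapped to a catalog call, and why both interfaces extend `Serializable`: the hook, together with the catalog it holds, travels inside the JobGraph.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the Catalog. Serializable because the hook that
// holds it is shipped to the cluster inside the JobGraph.
interface SimpleCatalog extends Serializable {
    void createTable(String tablePath);
    void dropTable(String tablePath);
}

// Simplified stand-in for the JobStatusHook interface (job id as a String).
interface JobStatusHook extends Serializable {
    void onCreated(String jobId);
    void onFinished(String jobId);
    void onFailed(String jobId, Throwable cause);
    void onCanceled(String jobId);
}

// Creates the sink table when the job is CREATED and drops it again if the
// job ends FAILED or CANCELED, so no half-written table is left behind.
class CTASJobStatusHook implements JobStatusHook {
    private final SimpleCatalog catalog;
    private final String tablePath;

    CTASJobStatusHook(SimpleCatalog catalog, String tablePath) {
        this.catalog = catalog;
        this.tablePath = tablePath;
    }

    @Override public void onCreated(String jobId) { catalog.createTable(tablePath); }
    @Override public void onFinished(String jobId) { /* table is kept: nothing to do */ }
    @Override public void onFailed(String jobId, Throwable cause) { catalog.dropTable(tablePath); }
    @Override public void onCanceled(String jobId) { catalog.dropTable(tablePath); }
}

enum JobStatus { CREATED, FINISHED, FAILED, CANCELED }

// Hypothetical runtime side: forwards a job status change to every registered hook.
final class HookDispatcher {
    private HookDispatcher() {}

    static void dispatch(List<JobStatusHook> hooks, String jobId, JobStatus status, Throwable cause) {
        for (JobStatusHook hook : hooks) {
            switch (status) {
                case CREATED: hook.onCreated(jobId); break;
                case FINISHED: hook.onFinished(jobId); break;
                case FAILED: hook.onFailed(jobId, cause); break;
                case CANCELED: hook.onCanceled(jobId); break;
            }
        }
    }
}

// Test double that records which catalog methods were called.
class RecordingCatalog implements SimpleCatalog {
    final List<String> calls = new ArrayList<>();
    @Override public void createTable(String tablePath) { calls.add("create " + tablePath); }
    @Override public void dropTable(String tablePath) { calls.add("drop " + tablePath); }
}

class CtasHookDemo {
    public static void main(String[] args) {
        RecordingCatalog catalog = new RecordingCatalog();
        List<JobStatusHook> hooks = List.of(new CTASJobStatusHook(catalog, "db.MyTable"));
        HookDispatcher.dispatch(hooks, "job-1", JobStatus.CREATED, null);
        HookDispatcher.dispatch(hooks, "job-1", JobStatus.FAILED, new RuntimeException("boom"));
        System.out.println(catalog.calls); // [create db.MyTable, drop db.MyTable]
    }
}
```

Running `CtasHookDemo` simulates a job that is created and then fails: the table is created on CREATED and dropped on FAILED, which is the atomicity the hook is meant to provide.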
...
```java
@PublicEvolving
public interface TableEnvironment {

    /**
     * Registers the result of the given {@link Table} as a catalog table with the given options.
     *
     * <p>CTAS for Table API.
     *
     * <p>Examples:
     *
     * <pre>{@code
     * Map<String, String> options = new HashMap<>();
     * options.put("connector", "hudi");
     * tEnv.createTable("MyTable", options, tEnv.sqlQuery("select id, name from user_table"));
     * }</pre>
     *
     * @param path The path under which the table will be registered. See also the {@link
     *     TableEnvironment} class description for the format of the path.
     * @param options Table options.
     * @param query The {@link Table} object describing the pipeline for further transformations.
     */
    void createTable(String path, Map<String, String> options, Table query);
}
```
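For comparison, `createTable(path, options, query)` is the Table API counterpart of a SQL CTAS statement of the form `CREATE TABLE <path> WITH (...) AS <query>`. The helper below, `CtasSql.toSql`, is hypothetical and not part of the proposed API; it is a small sketch that renders that statement from the same three inputs. It assumes the path is a plain identifier (no backtick quoting) and that the query is already a SQL string rather than a `Table`.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical helper: renders the SQL CTAS statement that corresponds to
// TableEnvironment#createTable(path, options, query).
final class CtasSql {
    private CtasSql() {}

    // Quotes a value as a SQL string literal, doubling embedded single quotes.
    static String literal(String s) {
        return "'" + s.replace("'", "''") + "'";
    }

    static String toSql(String path, Map<String, String> options, String selectQuery) {
        StringJoiner with = new StringJoiner(", ", "WITH (", ")");
        for (Map.Entry<String, String> e : options.entrySet()) {
            with.add(literal(e.getKey()) + " = " + literal(e.getValue()));
        }
        // Omit the WITH clause entirely when there are no options.
        String withClause = options.isEmpty() ? "" : " " + with;
        return "CREATE TABLE " + path + withClause + " AS " + selectQuery;
    }

    public static void main(String[] args) {
        Map<String, String> options = new LinkedHashMap<>();
        options.put("connector", "hudi");
        System.out.println(toSql("MyTable", options, "SELECT id, name FROM user_table"));
        // CREATE TABLE MyTable WITH ('connector' = 'hudi') AS SELECT id, name FROM user_table
    }
}
```

A `LinkedHashMap` keeps the options in insertion order, so the rendered `WITH` clause is deterministic.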
Catalog
Provides a method used to create the stage table for CTAS.
```java
@PublicEvolving
public interface Catalog {

    /**
     * Creates a new stage table for CTAS.
     *
     * <p>The table is recorded in the Catalog's memory, is created when the job status changes
     * to CREATED, and is dropped when the job status changes to FAILED or CANCELED.
     *
     * <p>The framework will make sure to call this method with a fully validated {@link
     * ResolvedCatalogTable}. Those instances are easy to serialize for a durable catalog
     * implementation.
     *
     * @param tablePath path of the table to be created
     * @param table the table definition
     * @param ignoreIfExists flag to specify behavior when a table already exists at the given
     *     path: if set to false, it throws a TableAlreadyExistException; if set to true, it does
     *     nothing.
     * @throws TableAlreadyExistException if the table already exists and ignoreIfExists is false
     * @throws DatabaseNotExistException if the database in tablePath doesn't exist
     * @throws CatalogException in case of any runtime exception
     */
    void createStageTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
            throws TableAlreadyExistException, DatabaseNotExistException, CatalogException;
}
```
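The contract described in the createStageTable Javadoc (record the table now, create it on CREATED, drop it on FAILED or CANCELED, honor `ignoreIfExists`) can be illustrated with a toy in-memory catalog. This is a minimal sketch under assumptions: `InMemoryStageCatalog` and `StageCatalogDemo` are hypothetical, tables are identified by a database name plus table name instead of an `ObjectPath`, definitions are plain strings instead of `CatalogBaseTable`, and the two exception classes are local stand-ins that only borrow the Flink names.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Local stand-ins that only borrow the exception names from the Javadoc.
class TableAlreadyExistException extends Exception {
    TableAlreadyExistException(String path) { super("Table already exists: " + path); }
}

class DatabaseNotExistException extends Exception {
    DatabaseNotExistException(String database) { super("Database does not exist: " + database); }
}

// Hypothetical in-memory catalog illustrating the createStageTable contract.
class InMemoryStageCatalog {
    private final Set<String> databases = new HashSet<>();
    private final Map<String, String> stageTables = new HashMap<>(); // recorded, not yet created
    private final Map<String, String> tables = new HashMap<>();      // actually created

    InMemoryStageCatalog(String... databaseNames) {
        for (String db : databaseNames) {
            databases.add(db);
        }
    }

    // Only records the definition in memory; the table does not exist yet.
    void createStageTable(String database, String tableName, String definition, boolean ignoreIfExists)
            throws TableAlreadyExistException, DatabaseNotExistException {
        if (!databases.contains(database)) {
            throw new DatabaseNotExistException(database);
        }
        String path = database + "." + tableName;
        if (tables.containsKey(path) || stageTables.containsKey(path)) {
            if (ignoreIfExists) {
                return; // flag set to true: do nothing
            }
            throw new TableAlreadyExistException(path);
        }
        stageTables.put(path, definition);
    }

    // Invoked via the hook when the job status becomes CREATED.
    void createTable(String path) {
        String definition = stageTables.remove(path);
        if (definition != null) {
            tables.put(path, definition);
        }
    }

    // Invoked via the hook when the job ends FAILED or CANCELED.
    void dropTable(String path) {
        stageTables.remove(path);
        tables.remove(path);
    }

    boolean tableExists(String path) {
        return tables.containsKey(path);
    }
}

class StageCatalogDemo {
    public static void main(String[] args) throws Exception {
        InMemoryStageCatalog catalog = new InMemoryStageCatalog("db");
        catalog.createStageTable("db", "MyTable", "id INT, name STRING", false);
        System.out.println(catalog.tableExists("db.MyTable")); // false: only recorded so far
        catalog.createTable("db.MyTable");                      // job status CREATED
        System.out.println(catalog.tableExists("db.MyTable")); // true
        catalog.dropTable("db.MyTable");                        // job FAILED or CANCELED
        System.out.println(catalog.tableExists("db.MyTable")); // false
    }
}
```

Because the sketch checks both maps, a second createStageTable call for the same path is rejected even before the first job reaches CREATED; whether a real catalog should behave the same way is a design choice this sketch does not settle.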
Compatibility, Deprecation, and Migration Plan
...