...
We propose a public interface, CreateOrReplaceTable, used by CTAS (CREATE TABLE AS SELECT) in the Table API.
/** A table that needs to be created for CTAS. */
We introduce a new CreateOrReplaceTable interface because, if we instead added the create/createIfNotExist methods to the Table interface, users would have to call the saveAs API before calling those methods, which would add extra usage cost for users.
It is named CreateOrReplaceTable to prepare for future support of the Replace Table feature.
The recommended usage is:
...
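As a rough sketch of the idea (the class and method names below are illustrative assumptions, not the final API), the interface and the recommended saveAs-then-create call order might look like:

```java
// Hypothetical sketch: names and signatures are assumptions, not the final API.

/** A table that needs to be created for CTAS. */
interface CreateOrReplaceTable {
    /** Creates the table; fails if it already exists. */
    void create();
    /** Creates the table only if it does not already exist. */
    void createIfNotExist();
}

/** Minimal stand-in for the Table API entry point. */
class Table {
    private final String query;
    Table(String query) { this.query = query; }

    /** saveAs turns a query result into a table that can then be created. */
    CreateOrReplaceTable saveAs(String tablePath) {
        return new CreateOrReplaceTable() {
            public void create() {
                System.out.println("CREATE TABLE " + tablePath + " AS " + query);
            }
            public void createIfNotExist() {
                System.out.println("CREATE TABLE IF NOT EXISTS " + tablePath + " AS " + query);
            }
        };
    }
}

class CtasSketch {
    public static void main(String[] args) {
        Table result = new Table("SELECT id, name FROM source");
        // Recommended order: saveAs(...) first, then create()/createIfNotExist().
        result.saveAs("my_catalog.my_db.sink").create();
        // prints "CREATE TABLE my_catalog.my_db.sink AS SELECT id, name FROM source"
    }
}
```

Keeping create/createIfNotExist off the Table interface means they are only reachable after saveAs, so the invalid call order cannot be expressed.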
@PublicEvolving
...
- The Flink client compiles the SQL and generates an execution plan. During this process, the hook that needs to be executed on the JM side is generated, and the hook, the Catalog, and the CatalogBaseTable are serialized.
- The job is submitted to the cluster; in detached mode, the client can then exit.
- When the job starts, the hooks, the Catalog, and the CatalogBaseTable are deserialized, and the hook calls Catalog#createTable to create the CatalogBaseTable.
- Task execution starts.
- If the final status of the job is FAILED or CANCELED, the created CatalogBaseTable is dropped by the hook calling Catalog#dropTable.
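The steps above can be sketched with a simplified, self-contained stand-in for the hook and catalog (the interface and method names below are assumptions modeled on the description, not Flink's actual classes):

```java
import java.util.HashSet;
import java.util.Set;

// Sketch only: JobStatusHook and the catalog are simplified stand-ins;
// method names are assumptions based on the steps described above.

/** Called by the JobMaster when the job status changes. */
interface JobStatusHook {
    void onCreated(String jobId);
    void onFinished(String jobId);
    void onFailed(String jobId);
    void onCanceled(String jobId);
}

/** In-memory stand-in for a Catalog. */
class InMemoryCatalog {
    final Set<String> tables = new HashSet<>();
    void createTable(String path) { tables.add(path); }
    void dropTable(String path) { tables.remove(path); }
}

/** CTAS hook: create the table when the job starts, drop it on failure/cancel. */
class CtasJobStatusHook implements JobStatusHook {
    private final InMemoryCatalog catalog;
    private final String tablePath;

    CtasJobStatusHook(InMemoryCatalog catalog, String tablePath) {
        this.catalog = catalog;
        this.tablePath = tablePath;
    }

    @Override public void onCreated(String jobId) { catalog.createTable(tablePath); }
    @Override public void onFinished(String jobId) { /* table is kept */ }
    @Override public void onFailed(String jobId) { catalog.dropTable(tablePath); }
    @Override public void onCanceled(String jobId) { catalog.dropTable(tablePath); }
}

class HookLifecycleDemo {
    public static void main(String[] args) {
        InMemoryCatalog catalog = new InMemoryCatalog();
        JobStatusHook hook = new CtasJobStatusHook(catalog, "db.sink");

        hook.onCreated("job-1"); // table is created when the job starts
        hook.onFailed("job-1");  // atomicity: roll back on failure
        System.out.println(catalog.tables.contains("db.sink")); // prints "false"
    }
}
```

The point of the sketch is that the create/drop pair lives entirely in the hook, so the JM can guarantee the table never outlives a failed or canceled job.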
In streaming mode, data is usually written and visible in real time, so streaming mode does not provide an atomicity guarantee and there is no need to use the JobStatusHook mechanism.
Batch mode requires the JobStatusHook mechanism to ensure atomicity. In the Planner we will keep stream mode (without JobStatusHook) and batch mode (with JobStatusHook) compatible, so that batch mode achieves full atomicity.
Planner
Provide a method for the planner to register a JobStatusHook with the StreamGraph.
...
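The registration point can be sketched as follows (the class shape and method name are assumptions for illustration, not Flink's actual StreamGraph API):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch: a minimal stand-in for StreamGraph; the registration method
// name is an assumption, not Flink's actual API.

/** Marker for hooks the JobMaster should notify on status changes. */
interface JobStatusHook {}

class StreamGraphSketch {
    private final List<JobStatusHook> hooks = new ArrayList<>();

    /** The planner registers hooks while translating the CTAS plan. */
    void registerJobStatusHook(JobStatusHook hook) { hooks.add(hook); }

    List<JobStatusHook> getJobStatusHooks() { return hooks; }
}

class PlannerRegistrationDemo {
    public static void main(String[] args) {
        StreamGraphSketch graph = new StreamGraphSketch();
        graph.registerJobStatusHook(new JobStatusHook() {});
        System.out.println(graph.getJobStatusHooks().size()); // prints "1"
    }
}
```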
Note: This solution only works if the Catalog was created via DDL, because only then can we obtain the Catalog properties from the WITH clause. If the Catalog was registered via TableEnvironment#registerCatalog, we cannot obtain these properties; therefore, CTAS does not currently support jobs that use TableEnvironment#registerCatalog.
In the HiveCatalog solution, since hive-conf-dir is configured as a local path, make sure that all nodes in the cluster place the Hive configuration files under the same path. The current Application mode of Flink has the same problem.
Runtime
Provide a job status change hook mechanism on the JM side.
...
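The JM-side mechanism amounts to fanning out each status transition to all registered hooks; a minimal sketch (names are assumptions, not Flink's internals):

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the JM-side mechanism (names are assumptions): on every job
// status transition, the JobMaster notifies all registered hooks.
interface JobStatusHook {
    void onCreated(String jobId);
    void onFinished(String jobId);
    void onFailed(String jobId);
    void onCanceled(String jobId);
}

class JobMasterSketch {
    private final List<JobStatusHook> hooks = new ArrayList<>();

    void registerHook(JobStatusHook hook) { hooks.add(hook); }

    // Each status transition fans out to every registered hook.
    void notifyCreated(String jobId)  { hooks.forEach(h -> h.onCreated(jobId)); }
    void notifyFinished(String jobId) { hooks.forEach(h -> h.onFinished(jobId)); }
    void notifyFailed(String jobId)   { hooks.forEach(h -> h.onFailed(jobId)); }
    void notifyCanceled(String jobId) { hooks.forEach(h -> h.onCanceled(jobId)); }
}

class JobMasterDemo {
    public static void main(String[] args) {
        JobMasterSketch jm = new JobMasterSketch();
        StringBuilder log = new StringBuilder();
        jm.registerHook(new JobStatusHook() {
            public void onCreated(String id)  { log.append("created;"); }
            public void onFinished(String id) { log.append("finished;"); }
            public void onFailed(String id)   { log.append("failed;"); }
            public void onCanceled(String id) { log.append("canceled;"); }
        });
        jm.notifyCreated("job-1");
        jm.notifyFinished("job-1");
        System.out.println(log); // prints "created;finished;"
    }
}
```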