...
Hive SQL and Spark SQL are mainly used in offline (batch mode) scenarios, while Flink SQL is suitable for both real-time (streaming mode) and offline (batch mode) scenarios. In a real-time scenario, we assume that the job runs continuously without stopping and that data is written and becomes visible in real time, so we do not think it is necessary to provide atomicity in a real-time scenario.
...
TableConfigOptions: Add configuration options to allow users to enable atomicity.
Implementation Plan
We provide two semantics for Flink CTAS: non-atomic and atomic. Non-atomic is the default behavior in both streaming and batch modes.
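As a rough illustration of how the option could select between the two semantics, the sketch below reads a boolean flag from a plain key/value map and falls back to non-atomic when the flag is absent. The key table.ctas.atomicity-enabled, the CtasSemantics enum and the resolve helper are hypothetical names for this example only; the actual option added to TableConfigOptions may be named and typed differently.

```java
import java.util.Map;

/** Hypothetical enum for the two CTAS semantics. */
enum CtasSemantics { NON_ATOMIC, ATOMIC }

public class CtasSemanticsResolver {
    // Hypothetical option key; the real key in TableConfigOptions may differ.
    static final String ATOMICITY_KEY = "table.ctas.atomicity-enabled";

    /** Returns ATOMIC only if the user explicitly enabled the option; NON_ATOMIC is the default. */
    static CtasSemantics resolve(Map<String, String> conf) {
        boolean atomic = Boolean.parseBoolean(conf.getOrDefault(ATOMICITY_KEY, "false"));
        return atomic ? CtasSemantics.ATOMIC : CtasSemantics.NON_ATOMIC;
    }

    public static void main(String[] args) {
        System.out.println(resolve(Map.of()));                      // NON_ATOMIC
        System.out.println(resolve(Map.of(ATOMICITY_KEY, "true"))); // ATOMIC
    }
}
```

Defaulting to false keeps existing jobs on the non-atomic path unless users opt in.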
Non-atomic (default)
The overall execution process is shown in the following figure.
The non-atomic implementation is basically the same as the existing INSERT process, except that the sink table is first created on the client side through the Catalog before the insert is performed, so it is not described in detail here.
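To make the difference from the atomic mode concrete, the following sketch models the non-atomic flow with a hypothetical in-memory SimpleCatalog (not Flink's Catalog interface): the table is created first, the insert runs afterwards, and a failed insert leaves the already-created table behind.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, heavily simplified stand-in for a catalog: table name to stored rows. */
class SimpleCatalog {
    final Map<String, List<String>> tables = new HashMap<>();

    void createTable(String name) {
        if (tables.containsKey(name)) {
            throw new IllegalStateException("Table already exists: " + name);
        }
        tables.put(name, new ArrayList<>());
    }
}

public class NonAtomicCtas {
    /**
     * Non-atomic CTAS: the client creates the sink table first, then runs the insert.
     * If the insert fails, the table that was already created is not cleaned up.
     */
    static void run(SimpleCatalog catalog, String table, List<String> rows, boolean failInsert) {
        catalog.createTable(table);               // step 1: client-side creation via the catalog
        List<String> sink = catalog.tables.get(table);
        for (String row : rows) {                 // step 2: the ordinary INSERT job
            if (failInsert) {
                throw new RuntimeException("insert job failed");
            }
            sink.add(row);
        }
    }

    public static void main(String[] args) {
        SimpleCatalog catalog = new SimpleCatalog();
        try {
            run(catalog, "t1", List.of("a", "b"), true);
        } catch (RuntimeException e) {
            // The failed job leaves an empty table behind: the non-atomic behavior.
            System.out.println(catalog.tables.containsKey("t1")); // true
        }
    }
}
```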
Atomic
The atomicity implementation of Flink CTAS requires two parts:
- An option to enable atomicity support.
- A serializable Catalog, so that the table can be created and dropped on the JM side, which ensures atomicity.
The following describes the process when the user enables the atomicity support option. The overall execution process is shown in the following figure.
Because the client process may exit early (for example, in detached mode) while the job may take a long time to run, the table creation (before the job starts) and drop (when the job fails or is cancelled) should be executed on the JM side instead of the client. In addition, tables are created and dropped through the Catalog, which is consistent with existing behavior. Therefore, a new hook mechanism is needed so that the JM can execute the corresponding actions; the hook relies on the Catalog to perform them. In summary, the overall execution process of a CTAS job is as follows:
- The Flink client compiles the SQL and generates an execution plan. During this process, the hook that needs to be executed on the JM side is generated; the Catalog and CatalogBaseTable are member variables of the hook and must be serializable.
- The client submits the job to the cluster.
- When the job starts, the hook object is constructed and the Catalog and CatalogBaseTable inside it are deserialized. The hook then calls Catalog#createTable to create the CatalogBaseTable.
- Tasks start executing.
- If the final status of the job is failed or canceled, the hook calls Catalog#dropTable to drop the created CatalogBaseTable.
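The steps above can be sketched in plain Java, assuming Java serialization as the transport between client and JM. FakeCatalog and CtasHook are hypothetical stand-ins for a Catalog and the hook: the hook is serialized on the "client", deserialized on the "JM", creates the table before the job runs, and drops it if the job fails.

```java
import java.io.*;
import java.util.HashSet;
import java.util.Set;

/** Hypothetical serializable catalog; a real one would talk to an external metastore. */
class FakeCatalog implements Serializable {
    private static final long serialVersionUID = 1L;
    // Stands in for external metadata. Static, so the deserialized copy sees the same store.
    static final Set<String> EXTERNAL_TABLES = new HashSet<>();

    void createTable(String name) { EXTERNAL_TABLES.add(name); }
    void dropTable(String name) { EXTERNAL_TABLES.remove(name); }
}

/** Hypothetical hook carrying the catalog and table as serializable members. */
class CtasHook implements Serializable {
    private static final long serialVersionUID = 1L;
    private final FakeCatalog catalog;
    private final String tableName;

    CtasHook(FakeCatalog catalog, String tableName) {
        this.catalog = catalog;
        this.tableName = tableName;
    }

    void onCreated() { catalog.createTable(tableName); }            // before tasks run
    void onFinished() { }                                            // keep the table
    void onFailed(Throwable cause) { catalog.dropTable(tableName); }
    void onCanceled() { catalog.dropTable(tableName); }
}

public class AtomicCtasFlow {
    static byte[] serialize(Object o) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(o);
        }
        return bytes.toByteArray();
    }

    static Object deserialize(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        }
    }

    /** Simulates the JM side: rebuild the hook, create the table, run the job, drop on failure. */
    static void runOnJobMaster(byte[] hookBytes, boolean jobFails) throws Exception {
        CtasHook hook = (CtasHook) deserialize(hookBytes);
        hook.onCreated();
        if (jobFails) {
            hook.onFailed(new RuntimeException("task failure"));
        } else {
            hook.onFinished();
        }
    }

    public static void main(String[] args) throws Exception {
        // Client side: compile the plan and serialize the hook into the submitted job.
        byte[] hookBytes = serialize(new CtasHook(new FakeCatalog(), "sink"));
        runOnJobMaster(hookBytes, true);
        System.out.println(FakeCatalog.EXTERNAL_TABLES.contains("sink")); // false: dropped after failure
        runOnJobMaster(hookBytes, false);
        System.out.println(FakeCatalog.EXTERNAL_TABLES.contains("sink")); // true
    }
}
```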
The following describes the details of the implementation.
In streaming mode, data is usually written and made visible in real time, so streaming mode does not provide an atomicity guarantee and does not need the JobStatusHook mechanism. Batch mode, however, uses the JobStatusHook mechanism to ensure atomicity. The planner will therefore support both cases, streaming mode without a JobStatusHook and batch mode with one, so that batch mode achieves full atomicity.
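One reading of the paragraph above, combined with the atomicity option, is that the planner only needs to attach a hook when the job runs in batch mode and the user has enabled atomicity. The minimal sketch below encodes that decision; RuntimeMode is a simplified local enum and needsJobStatusHook is a hypothetical helper, not a planner API.

```java
/** Hypothetical sketch of the planner-side decision. */
public class HookDecision {
    /** Simplified local stand-in for the job's execution mode. */
    enum RuntimeMode { STREAMING, BATCH }

    /**
     * Streaming jobs never get a hook (data becomes visible as it is written);
     * batch jobs get one only when the user enabled atomicity.
     */
    static boolean needsJobStatusHook(RuntimeMode mode, boolean atomicityEnabled) {
        return mode == RuntimeMode.BATCH && atomicityEnabled;
    }

    public static void main(String[] args) {
        System.out.println(needsJobStatusHook(RuntimeMode.STREAMING, true)); // false
        System.out.println(needsJobStatusHook(RuntimeMode.BATCH, true));     // true
        System.out.println(needsJobStatusHook(RuntimeMode.BATCH, false));    // false
    }
}
```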
Planner
Provide a method for the planner to register a JobStatusHook with the StreamGraph.
...
The Create Table As Select (CTAS) feature depends on the serializability of the catalog. To quickly check whether a catalog supports CTAS, the planner tries to serialize the catalog when compiling the SQL; if this fails, an exception is thrown to tell the user that the catalog does not support CTAS because it cannot be serialized.
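The compile-time check described above amounts to attempting a serialization and turning a failure into a user-facing error. A minimal sketch, assuming Java serialization is the mechanism being tested; CtasNotSupportedException, checkSerializable and the two demo catalogs are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class CatalogSerializabilityCheck {
    /** Hypothetical stand-in for the exception the planner would raise. */
    static class CtasNotSupportedException extends RuntimeException {
        CtasNotSupportedException(String catalogName, Throwable cause) {
            super("Catalog '" + catalogName + "' does not support CTAS because it cannot be serialized.", cause);
        }
    }

    /** Tries to serialize the catalog at compile time and fails fast if that is impossible. */
    static void checkSerializable(String catalogName, Object catalog) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(catalog); // NotSerializableException is an IOException
        } catch (IOException e) {
            throw new CtasNotSupportedException(catalogName, e);
        }
    }

    static class GoodCatalog implements Serializable {
        private static final long serialVersionUID = 1L;
    }

    static class BadCatalog { } // does not implement Serializable

    public static void main(String[] args) {
        checkSerializable("good", new GoodCatalog()); // passes silently
        try {
            checkSerializable("bad", new BadCatalog());
        } catch (CtasNotSupportedException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Running the check during compilation means an unsupported catalog is reported on the client, before any job is submitted.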
Note: The metadata in InMemoryCatalog is only stored in Flink's memory and does not modify metadata in external systems, so InMemoryCatalog does not need to support atomicity. That is, InMemoryCatalog does not need to be adapted to support serialization.
Runtime
Provide a job status change hook mechanism on the JM side.
...
Flink's current hook designs cannot meet the needs of CTAS. For example, JobListener is executed on the client side, and JobStatusListener runs on the JM side but cannot be serialized and does not provide the functionality CTAS needs. We therefore propose a new interface, JobStatusHook, that can be attached to the JobGraph and executed in the JobMaster. The interface will be marked as @Internal.
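The key difference from JobStatusListener is that the hook must travel with the job. The sketch below uses hypothetical, simplified stand-ins for StreamGraph and JobGraph (not Flink's classes) to show a planner registering a serializable hook, the hook being copied into the job graph, and the graph being serialized and deserialized as it would be on submission before the "JobMaster" invokes it. Running it prints created job-1.

```java
import java.io.*;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical hook contract: Serializable so it can be shipped inside the job graph. */
interface StatusHook extends Serializable {
    void onCreated(String jobId);
}

/** Hypothetical hook used for the demo. */
class PrintingHook implements StatusHook {
    private static final long serialVersionUID = 1L;
    public void onCreated(String jobId) { System.out.println("created " + jobId); }
}

/** Simplified stand-in for StreamGraph: the planner registers hooks here. */
class SimpleStreamGraph {
    private final List<StatusHook> hooks = new ArrayList<>();

    void registerJobStatusHook(StatusHook hook) { hooks.add(hook); }

    /** Translation copies the hooks into the job graph so they are shipped to the cluster. */
    SimpleJobGraph toJobGraph() { return new SimpleJobGraph(new ArrayList<>(hooks)); }
}

/** Simplified stand-in for JobGraph; serialized on submission together with its hooks. */
class SimpleJobGraph implements Serializable {
    private static final long serialVersionUID = 1L;
    final List<StatusHook> hooks;

    SimpleJobGraph(List<StatusHook> hooks) { this.hooks = hooks; }
}

public class HookShipping {
    /** Simulates submission: serialize on the client, deserialize on the JobMaster. */
    static SimpleJobGraph roundTrip(SimpleJobGraph graph) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(graph);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (SimpleJobGraph) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        SimpleStreamGraph streamGraph = new SimpleStreamGraph();
        streamGraph.registerJobStatusHook(new PrintingHook());         // planner side
        SimpleJobGraph received = roundTrip(streamGraph.toJobGraph()); // client to JobMaster
        for (StatusHook hook : received.hooks) {
            hook.onCreated("job-1");                                    // JobMaster side
        }
    }
}
```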
The CTAS process at runtime
- When the job starts, the JobGraph is deserialized, and the JobStatusHook is deserialized along with it.
- When deserializing JobStatusHook, Catalog and CatalogBaseTable will also be deserialized.
- Deserialize CatalogBaseTable using CatalogPropertiesUtil#deserializeCatalogTable method.
- After deserializing the Catalog, call Catalog#open to ensure that the Catalog can be used.
- After the job starts, each time the job status changes, the JM calls the corresponding JobStatusHook method:
...
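The runtime steps listed above can be sketched as follows, assuming Java serialization and a catalog whose connection state is transient. OpenableCatalog, RuntimeCtasHook and SimpleJobStatus are hypothetical. The table is carried as a flat property map, in the spirit of (but not reproducing) CatalogPropertiesUtil#deserializeCatalogTable, and the catalog is re-opened inside readObject right after it has been deserialized.

```java
import java.io.*;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical catalog whose connection state is transient and must be re-opened on the JM. */
class OpenableCatalog implements Serializable {
    private static final long serialVersionUID = 1L;
    // Simulated external metastore; static, so it is shared rather than serialized.
    static final Map<String, Map<String, String>> METASTORE = new HashMap<>();
    // transient: always false right after deserialization, like a dropped connection.
    private transient boolean opened;

    void open() { opened = true; }

    private void checkOpen() {
        if (!opened) throw new IllegalStateException("catalog not opened");
    }

    void createTable(String name, Map<String, String> properties) { checkOpen(); METASTORE.put(name, properties); }

    void dropTable(String name) { checkOpen(); METASTORE.remove(name); }
}

/** Simplified job states; the real runtime has more. */
enum SimpleJobStatus { CREATED, FINISHED, FAILED, CANCELED }

/** Hypothetical hook; the table definition travels as a flat property map. */
class RuntimeCtasHook implements Serializable {
    private static final long serialVersionUID = 1L;
    private final OpenableCatalog catalog;
    private final String tableName;
    private final HashMap<String, String> tableProperties;

    RuntimeCtasHook(OpenableCatalog catalog, String tableName, Map<String, String> properties) {
        this.catalog = catalog;
        this.tableName = tableName;
        this.tableProperties = new HashMap<>(properties);
    }

    // Runs during deserialization on the JM: open the catalog once its fields are restored.
    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        catalog.open();
    }

    /** Called by the (simulated) JM whenever the job status changes. */
    void onStatusChange(SimpleJobStatus status) {
        switch (status) {
            case CREATED:
                catalog.createTable(tableName, tableProperties);
                break;
            case FAILED:
            case CANCELED:
                catalog.dropTable(tableName);
                break;
            default:
                break; // FINISHED: keep the table and its data
        }
    }
}

public class RuntimeHookDemo {
    static RuntimeCtasHook shipToJobMaster(RuntimeCtasHook hook) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(hook);
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (RuntimeCtasHook) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        RuntimeCtasHook clientHook =
                new RuntimeCtasHook(new OpenableCatalog(), "sink", Map.of("connector", "filesystem"));
        RuntimeCtasHook jmHook = shipToJobMaster(clientHook);
        jmHook.onStatusChange(SimpleJobStatus.CREATED);
        System.out.println(OpenableCatalog.METASTORE.containsKey("sink")); // true
        jmHook.onStatusChange(SimpleJobStatus.CANCELED);
        System.out.println(OpenableCatalog.METASTORE.containsKey("sink")); // false
    }
}
```

Opening the catalog in readObject guarantees that no hook method can run against an unopened catalog; calling open explicitly before the first status callback would work equally well.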
The non-atomic implementation is consistent with the DataSource V1 logic. For details, see CreateTableAsSelectExec.
Atomic
The atomic implementation (for details, see AtomicCreateTableAsSelectExec) is supported by StagingTableCatalog and StagedTable.
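The staging approach referenced above can be illustrated generically. The new table is staged but not yet visible, data is written to the stage, and the table is published only on commit; a failure aborts the stage, so nothing ever becomes visible. StagingStore and Staged are hypothetical classes for this sketch and do not reproduce the StagingTableCatalog or StagedTable interfaces.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical store: staged tables are invisible until committed. */
class StagingStore {
    final Map<String, List<String>> visibleTables = new HashMap<>();

    Staged stageCreate(String name) {
        if (visibleTables.containsKey(name)) {
            throw new IllegalStateException("Table already exists: " + name);
        }
        return new Staged(name);
    }

    /** A table that has been staged but not yet published. */
    class Staged {
        private final String name;
        private final List<String> pendingRows = new ArrayList<>();

        Staged(String name) { this.name = name; }

        void write(String row) { pendingRows.add(row); }

        void commit() { visibleTables.put(name, pendingRows); } // metadata and data appear together

        void abort() { pendingRows.clear(); }                   // nothing was ever visible
    }
}

public class StagedCtasDemo {
    static void ctas(StagingStore store, String name, List<String> rows, boolean fail) {
        StagingStore.Staged staged = store.stageCreate(name);
        try {
            for (String row : rows) {
                if (fail) {
                    throw new RuntimeException("write failed");
                }
                staged.write(row);
            }
            staged.commit();
        } catch (RuntimeException e) {
            staged.abort();
            throw e;
        }
    }

    public static void main(String[] args) {
        StagingStore store = new StagingStore();
        try {
            ctas(store, "t", List.of("a"), true);
        } catch (RuntimeException e) {
            System.out.println(store.visibleTables.containsKey("t")); // false
        }
        ctas(store, "t", List.of("a", "b"), false);
        System.out.println(store.visibleTables.get("t")); // [a, b]
    }
}
```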
...