...

Drawing on FLIP-218: Support SELECT clause in CREATE TABLE (CTAS) and on Spark's StagedTable design, we implement atomic CTAS support in Flink. The design reuses the existing JobStatusHook mechanism and extends Catalog with a new API to provide atomic CTAS.

Public Interfaces


Introduce the createStagedCatalogTable API on Catalog.

...

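```java
import java.io.Serializable;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.table.catalog.CatalogTable;

/** Catalog table that supports staging, so that CTAS can be executed atomically. */
@PublicEvolving
public interface StagedCatalogTable extends CatalogTable, Serializable {

    /** Performs preparation work before the job starts. */
    void start();

    /** Commits the staged changes after the job has succeeded. */
    void commit();

    /** Performs cleanup after the job has failed or been canceled. */
    void abort();
}
```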


Proposed Changes

...

First, we need a table interface that exposes an abstract transaction capability, so we introduce StagedCatalogTable, which supports start, commit, and abort operations analogous to those of a transaction.

StagedCatalogTable provides three methods:

start: Similar to beginning a transaction. It performs preparation work, such as initializing the client, the data, or the directory.

commit: Similar to committing a transaction. It finalizes the result, for example by writing data, making the data visible, or creating the table.

abort: Similar to aborting a transaction. It cleans up, for example by deleting written data or restoring the previous data.
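The semantics of the three methods can be illustrated with a toy, in-memory stand-in. This is a minimal sketch, not Flink code: `StagedTable` is a hypothetical simplified mirror of StagedCatalogTable (it does not extend Flink's CatalogTable), and `InMemoryStagedTable` with its two lists is an assumed substitute for a real storage system. Rows written while the job runs land in a staging area and become visible only on commit; abort discards them.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical, simplified mirror of StagedCatalogTable (no Flink dependency). */
interface StagedTable {
    void start();

    void commit();

    void abort();
}

/** Toy staged table: rows written by the job stay invisible until commit(). */
class InMemoryStagedTable implements StagedTable {
    // Rows visible to readers of the table.
    private final List<String> visibleRows = new ArrayList<>();
    // Rows written by the running job but not yet visible; null outside a "transaction".
    private List<String> stagingRows;

    @Override
    public void start() {
        // Prep work: allocate the staging area (a real table might create a temporary directory).
        stagingRows = new ArrayList<>();
    }

    /** Called by the (simulated) sink while the job runs. */
    void write(String row) {
        requireStarted();
        stagingRows.add(row);
    }

    @Override
    public void commit() {
        requireStarted();
        // Publish all staged rows in one step, then release the staging area.
        visibleRows.addAll(stagingRows);
        stagingRows = null;
    }

    @Override
    public void abort() {
        // Cleanup: discard whatever the job wrote; none of it was ever visible.
        stagingRows = null;
    }

    List<String> visibleRows() {
        return Collections.unmodifiableList(visibleRows);
    }

    private void requireStarted() {
        if (stagingRows == null) {
            throw new IllegalStateException("start() has not been called");
        }
    }
}
```

Readers never observe a partially written table: before commit() the visible rows are empty, and after abort() they stay empty.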

Note: StagedCatalogTable must be serializable, because it is shipped to and used on the JobManager (JM).
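One consequence of the serializability requirement is that non-serializable resources, such as a metastore or JDBC client, cannot simply be held as regular fields. A common Java pattern, shown here as an assumption rather than a prescribed part of this design, is to serialize only the connection settings, mark the client `transient`, and create it in start(), which runs on the JobManager. `MetastoreClient` and `ClientHoldingStagedTable` are hypothetical names, and the round trip through Java serialization only simulates shipping the object to the JM.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical stand-in for a non-serializable metastore or JDBC client. */
class MetastoreClient {
    final String uri;

    MetastoreClient(String uri) {
        this.uri = uri;
    }
}

/** Hypothetical staged table whose serialized state is only its settings. */
class ClientHoldingStagedTable implements Serializable {
    private static final long serialVersionUID = 1L;

    private final String metastoreUri;         // serialized and shipped to the JM
    private transient MetastoreClient client;  // not serialized; recreated in start()

    ClientHoldingStagedTable(String metastoreUri) {
        this.metastoreUri = metastoreUri;
    }

    void start() {
        // Runs on the JM, so the client is created where it is actually used.
        client = new MetastoreClient(metastoreUri);
    }

    MetastoreClient client() {
        return client;
    }

    /** Simulates shipping the object from the client side to the JM. */
    static ClientHoldingStagedTable roundTrip(ClientHoldingStagedTable table) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(table);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (ClientHoldingStagedTable) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Serialization round trip failed", e);
        }
    }
}
```

Without the `transient` modifier, serializing a started table would fail with a NotSerializableException, because MetastoreClient does not implement Serializable.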

Next, we need a place to create the StagedCatalogTable. Different catalogs must perform different operations to implement atomic CTAS; for example, HiveCatalog needs to access the Hive Metastore, while JDBCCatalog needs to access the back-end database. We therefore introduce the createStagedCatalogTable API on the Catalog interface.

The definition of the createStagedCatalogTable API is the same as the definition of the createTable API.
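To illustrate how a catalog might back this API, here is a toy sketch. `ToyCatalog`, its map-based metadata store, and the simplified signatures (a `String` path instead of Flink's `ObjectPath`, a `String` schema instead of a table object, no checked exceptions, and a `StagedTable` mirror instead of StagedCatalogTable) are all assumptions made to keep the example self-contained. Reading "same definition" as "same parameters", createStagedCatalogTable takes the same arguments as createTable, but the returned staged table registers the table only in commit(), so a failed job leaves no table behind.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified mirror of StagedCatalogTable. */
interface StagedTable {
    void start();

    void commit();

    void abort();
}

/** Toy catalog whose metadata store is an in-memory map from table path to schema. */
class ToyCatalog {
    final Map<String, String> tables = new HashMap<>();

    /** Creates the table immediately, as a non-atomic CTAS would do up front. */
    void createTable(String tablePath, String schema, boolean ignoreIfExists) {
        if (tables.containsKey(tablePath)) {
            if (ignoreIfExists) {
                return;
            }
            throw new IllegalStateException("Table already exists: " + tablePath);
        }
        tables.put(tablePath, schema);
    }

    /** Same parameters as createTable, but table creation is deferred to commit(). */
    StagedTable createStagedCatalogTable(String tablePath, String schema, boolean ignoreIfExists) {
        return new StagedTable() {
            @Override
            public void start() {
                // Fail fast before the job writes anything if the table already exists.
                if (!ignoreIfExists && tables.containsKey(tablePath)) {
                    throw new IllegalStateException("Table already exists: " + tablePath);
                }
            }

            @Override
            public void commit() {
                // The table becomes visible only after the job has succeeded.
                createTable(tablePath, schema, ignoreIfExists);
            }

            @Override
            public void abort() {
                // Nothing was registered in the metastore, so there is nothing to roll back.
            }
        };
    }
}
```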


The following describes how atomic CTAS is integrated into the Flink framework:

We introduce CtasJobStatusHook, which implements the JobStatusHook interface and holds the StagedCatalogTable as a member variable:

CtasJobStatusHook#onCreated: the start method of StagedCatalogTable is called in the onCreated method.

CtasJobStatusHook#onFinished: the commit method of StagedCatalogTable is called in the onFinished method.

CtasJobStatusHook#onFailed: the abort method of StagedCatalogTable is called in the onFailed method.

CtasJobStatusHook#onCanceled: the abort method of StagedCatalogTable is called in the onCanceled method.
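The mapping above can be sketched as follows. To stay self-contained, the sketch declares `StatusHook`, a hypothetical local mirror of the four JobStatusHook callbacks named in the text that uses a `String` job ID in place of Flink's JobID type, and `StagedTable`, a hypothetical simplified mirror of StagedCatalogTable. It is an illustration of the proposed wiring, not Flink's implementation.

```java
import java.io.Serializable;

/** Hypothetical local mirror of the JobStatusHook callbacks (String instead of JobID). */
interface StatusHook extends Serializable {
    void onCreated(String jobId);

    void onFinished(String jobId);

    void onFailed(String jobId, Throwable throwable);

    void onCanceled(String jobId);
}

/** Hypothetical, simplified mirror of StagedCatalogTable. */
interface StagedTable extends Serializable {
    void start();

    void commit();

    void abort();
}

/** Maps job status transitions onto the staged table's transaction-like methods. */
class CtasJobStatusHook implements StatusHook {
    private static final long serialVersionUID = 1L;

    private final StagedTable stagedTable;

    CtasJobStatusHook(StagedTable stagedTable) {
        this.stagedTable = stagedTable;
    }

    @Override
    public void onCreated(String jobId) {
        stagedTable.start();
    }

    @Override
    public void onFinished(String jobId) {
        stagedTable.commit();
    }

    @Override
    public void onFailed(String jobId, Throwable throwable) {
        stagedTable.abort();
    }

    @Override
    public void onCanceled(String jobId) {
        // Cancellation is handled like failure: the staged changes are discarded.
        stagedTable.abort();
    }
}
```

Because the hook only delegates, each catalog's atomicity logic lives entirely in its StagedCatalogTable implementation.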


During job compilation, the StagedCatalogTable is obtained via Catalog#createStagedCatalogTable and used to construct the CtasJobStatusHook. The CtasJobStatusHook is then registered with the StreamGraph, and its callbacks are invoked automatically as the job's status changes over its lifetime.
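The "registered at compile time, invoked automatically" flow can be simulated with a toy driver. `ToyJob`, `Outcome`, and `StatusHook` are hypothetical stand-ins for the StreamGraph and the JobManager's status handling, not Flink APIs: hooks registered before the job runs receive onCreated first, then exactly one terminal callback chosen by the job's outcome.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical local mirror of the JobStatusHook callbacks. */
interface StatusHook {
    void onCreated(String jobId);

    void onFinished(String jobId);

    void onFailed(String jobId, Throwable throwable);

    void onCanceled(String jobId);
}

/** Hypothetical terminal outcomes of a job. */
enum Outcome { FINISHED, FAILED, CANCELED }

/** Toy stand-in for a compiled job: hooks registered at compile time fire automatically. */
class ToyJob {
    private final String jobId;
    private final List<StatusHook> hooks = new ArrayList<>();

    ToyJob(String jobId) {
        this.jobId = jobId;
    }

    /** Analogous to registering the CtasJobStatusHook on the StreamGraph. */
    void registerJobStatusHook(StatusHook hook) {
        hooks.add(hook);
    }

    /** Simulates the job lifecycle; the outcome decides which terminal callback fires. */
    void run(Outcome outcome) {
        hooks.forEach(h -> h.onCreated(jobId));
        switch (outcome) {
            case FINISHED:
                hooks.forEach(h -> h.onFinished(jobId));
                break;
            case FAILED: {
                RuntimeException cause = new RuntimeException("simulated failure");
                hooks.forEach(h -> h.onFailed(jobId, cause));
                break;
            }
            case CANCELED:
                hooks.forEach(h -> h.onCanceled(jobId));
                break;
        }
    }
}
```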

This completes the end-to-end execution flow of atomic CTAS.

As a result, supporting atomic CTAS in a catalog requires only two steps:

  1. Implement the createStagedCatalogTable method in the Catalog;
  2. Provide an implementation class of the StagedCatalogTable interface.

Compatibility, Deprecation, and Migration Plan

It is a new feature with no implication for backwards compatibility.

Test Plan

Changes will be verified by unit tests.

Rejected Alternatives
