...

The CREATE TABLE AS SELECT (CTAS) statement has been supported since FLIP-218, but it is not atomic: the table is created before the job runs, and if the job fails or is cancelled, the table is not dropped.

We want Flink to support atomic CTAS, where the table is created only when the job succeeds.

Following FLIP-218: Support SELECT clause in CREATE TABLE(CTAS), we use the existing JobStatusHook mechanism and extend the Catalog with a new API to implement atomic CTAS.

...

First, we need a table interface that exposes transaction capabilities, so we introduce TwoPhaseCatalogTable, which can begin, commit, and abort a transaction.
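To make the transaction contract concrete, here is a minimal sketch rather than the proposal's actual code. The method names beginTransaction, commit, and abort come from this document; the interface shown is a simplified stand-in (the real one may carry table metadata), and InMemoryTwoPhaseTable together with its catalogTables registry is a hypothetical illustration of "the table only becomes visible on commit".

```java
import java.util.Set;

/** Simplified stand-in for the proposed interface (assumption: no table metadata here). */
interface TwoPhaseCatalogTable {
    void beginTransaction();

    void commit();

    void abort();
}

/** Hypothetical implementation: the table name is registered in the catalog only on commit. */
class InMemoryTwoPhaseTable implements TwoPhaseCatalogTable {
    private final Set<String> catalogTables; // hypothetical registry of visible tables
    private final String tableName;
    private boolean open;

    InMemoryTwoPhaseTable(Set<String> catalogTables, String tableName) {
        this.catalogTables = catalogTables;
        this.tableName = tableName;
    }

    @Override
    public void beginTransaction() {
        // Prep work (clients, staging directories, ...) would go here.
        open = true;
    }

    @Override
    public void commit() {
        if (!open) {
            throw new IllegalStateException("commit() called without beginTransaction()");
        }
        // Only now does the table become visible to other users of the catalog.
        catalogTables.add(tableName);
        open = false;
    }

    @Override
    public void abort() {
        // Nothing was published, so aborting only discards the open transaction.
        open = false;
    }
}
```

In this sketch a job that aborts leaves the registry untouched, while a job that commits adds exactly one entry, which is the behaviour atomic CTAS is meant to guarantee.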

The three APIs corresponding to TwoPhaseCatalogTable:

beginTransaction : Similar to opening a transaction; this is where prep work happens, such as initializing the client, the data, the directory, etc.

...

for example, HiveCatalog needs to access the Hive Metastore; JDBCCatalog needs to access the back-end database, so we introduce the createTwoPhaseCatalogTable API on the Catalog interface.

The definition of the createTwoPhaseCatalogTable API is similar to that of the createTable API, extended with an isStreamingMode parameter so that a Catalog can provide a different atomicity implementation in each mode.

The next section describes how to integrate atomic CTAS into the Flink framework.

Integrate atomic CTAS

Introduce CtasJobStatusHook, which implements the JobStatusHook interface and holds a TwoPhaseCatalogTable as a member variable:

CtasJobStatusHook#onCreated : The beginTransaction method of TwoPhaseCatalogTable is called in the onCreated method.

CtasJobStatusHook#onFinished : The commit method of TwoPhaseCatalogTable is called in the onFinished method.

CtasJobStatusHook#onFailed : The abort method of TwoPhaseCatalogTable is called in the onFailed method.

CtasJobStatusHook#onCanceled : The abort method of TwoPhaseCatalogTable is called in the onCanceled method.

In the job compilation stage, the TwoPhaseCatalogTable is obtained from Catalog#createTwoPhaseCatalogTable and used to construct the CtasJobStatusHook.

The hook calls the TwoPhaseCatalogTable API as follows:

```java
/**
 * This Hook is used to implement the Flink CTAS atomicity semantics, calling the corresponding API
 * of {@link TwoPhaseCatalogTable} at different stages of the job.
 */
public class CtasJobStatusHook implements JobStatusHook {

    private final TwoPhaseCatalogTable twoPhaseCatalogTable;

    public CtasJobStatusHook(TwoPhaseCatalogTable twoPhaseCatalogTable) {
        this.twoPhaseCatalogTable = twoPhaseCatalogTable;
    }

    @Override
    public void onCreated(JobID jobId) {
        // Job created: open the transaction and do the prep work.
        twoPhaseCatalogTable.beginTransaction();
    }

    @Override
    public void onFinished(JobID jobId) {
        // Job succeeded: make the table visible.
        twoPhaseCatalogTable.commit();
    }

    @Override
    public void onFailed(JobID jobId, Throwable throwable) {
        // Job failed: roll back so no table is left behind.
        twoPhaseCatalogTable.abort();
    }

    @Override
    public void onCanceled(JobID jobId) {
        // Job cancelled: same rollback as on failure.
        twoPhaseCatalogTable.abort();
    }
}
```
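The mapping from job states to transaction calls can be exercised without a Flink cluster. The following is a simplified simulation, not Flink's runtime: StatusHook is a stand-in for JobStatusHook with the JobID argument dropped, and TwoPhaseTable, RecordingTable, CtasHookSketch, and HookLifecycleDemo.runJob are hypothetical names introduced only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified stand-in for the proposed TwoPhaseCatalogTable (assumption). */
interface TwoPhaseTable {
    void beginTransaction();

    void commit();

    void abort();
}

/** Simplified stand-in for Flink's JobStatusHook; the JobID argument is omitted. */
interface StatusHook {
    void onCreated();

    void onFinished();

    void onFailed(Throwable throwable);

    void onCanceled();
}

/** Records the order in which the transaction methods are called. */
class RecordingTable implements TwoPhaseTable {
    final List<String> calls = new ArrayList<>();

    @Override
    public void beginTransaction() {
        calls.add("begin");
    }

    @Override
    public void commit() {
        calls.add("commit");
    }

    @Override
    public void abort() {
        calls.add("abort");
    }
}

/** Same delegation as CtasJobStatusHook, written against the stand-in interfaces. */
class CtasHookSketch implements StatusHook {
    private final TwoPhaseTable table;

    CtasHookSketch(TwoPhaseTable table) {
        this.table = table;
    }

    @Override
    public void onCreated() {
        table.beginTransaction();
    }

    @Override
    public void onFinished() {
        table.commit();
    }

    @Override
    public void onFailed(Throwable throwable) {
        table.abort();
    }

    @Override
    public void onCanceled() {
        table.abort();
    }
}

class HookLifecycleDemo {
    /** Hypothetical driver: runs the job body and fires the matching hook callback. */
    static void runJob(StatusHook hook, Runnable jobBody) {
        hook.onCreated();
        try {
            jobBody.run();
        } catch (RuntimeException e) {
            hook.onFailed(e);
            return;
        }
        hook.onFinished();
    }
}
```

Running a job body that returns normally records begin followed by commit; a body that throws records begin followed by abort, so the table is never committed for a failed job.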

Compatibility with existing non-atomic CTAS

The return value of Catalog#createTwoPhaseCatalogTable is an Optional, and whether it is empty tells us whether atomicity semantics are supported:

empty : atomicity semantics are not supported, and the existing (non-atomic) code path is used;

not empty : atomicity semantics are supported, so a CtasJobStatusHook is created and the JobStatusHook mechanism implements the atomicity semantics, as described in the code in the previous section.

```java
// Ask the catalog for a transactional table; an empty result means atomic CTAS is not supported.
Optional<TwoPhaseCatalogTable> twoPhaseCatalogTableOptional =
        ctasCatalog.createTwoPhaseCatalogTable(
                objectPath,
                catalogTable,
                createTableOperation.isIgnoreIfExists(),
                isStreamingMode);

if (twoPhaseCatalogTableOptional.isPresent()) {
    // use TwoPhaseCatalogTable for atomic CTAS statements
    TwoPhaseCatalogTable twoPhaseCatalogTable =
            twoPhaseCatalogTableOptional.get();
    CtasJobStatusHook ctasJobStatusHook =
            new CtasJobStatusHook(twoPhaseCatalogTable);
    mapOperations.add(
            ctasOperation.toSinkModifyOperation(
                    createTableOperation.getTableIdentifier(),
                    createTableOperation.getCatalogTable(),
                    twoPhaseCatalogTable,
                    ctasCatalog,
                    catalogManager));
    jobStatusHookList.add(ctasJobStatusHook);
} else {
    // execute CREATE TABLE first for non-atomic CTAS statements
    executeInternal(ctasOperation.getCreateTableOperation());
    mapOperations.add(ctasOperation.toSinkModifyOperation(catalogManager));
}
```

Atomicity support in Stream and Batch mode

We usually think of Stream mode jobs as long running, i.e. they never stop, so atomicity semantics do not apply to them. However, Flink is now a unified stream and batch computing engine, so we introduce isStreamingMode in the definition of Catalog#createTwoPhaseCatalogTable and let the Catalog decide whether to provide atomicity semantics in each mode.

In production environments, some user-defined streaming sources do finish, and the job finishes with them (no more input data). In this case an atomic implementation improves the user experience; whether to provide it is decided by the Catalog implementation.

CtasJobStatusHook will be registered to StreamGraph and will be executed automatically for the lifetime of the Job.
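As an illustration of how a Catalog might use isStreamingMode, here is a minimal sketch under stated assumptions. AtomicCtasCatalog is a hypothetical fragment that shows only the new method, with the ObjectPath and CatalogTable arguments reduced to a plain table path string; TwoPhaseTable, NoOpTwoPhaseTable, and BatchOnlyAtomicCatalog are likewise hypothetical names, not part of the proposal.

```java
import java.util.Optional;

/** Simplified stand-in for the proposed TwoPhaseCatalogTable (assumption). */
interface TwoPhaseTable {
    void beginTransaction();

    void commit();

    void abort();
}

/** Hypothetical catalog fragment: only the new method, with simplified arguments. */
interface AtomicCtasCatalog {
    Optional<TwoPhaseTable> createTwoPhaseCatalogTable(
            String tablePath, boolean ignoreIfExists, boolean isStreamingMode);
}

/** Placeholder table whose transaction methods do nothing; a real one would talk to storage. */
class NoOpTwoPhaseTable implements TwoPhaseTable {
    @Override
    public void beginTransaction() {}

    @Override
    public void commit() {}

    @Override
    public void abort() {}
}

/** Hypothetical catalog that offers atomic CTAS for batch jobs only. */
class BatchOnlyAtomicCatalog implements AtomicCtasCatalog {
    @Override
    public Optional<TwoPhaseTable> createTwoPhaseCatalogTable(
            String tablePath, boolean ignoreIfExists, boolean isStreamingMode) {
        if (isStreamingMode) {
            // This catalog opts out for streaming jobs, so the caller falls back
            // to the existing non-atomic CTAS path.
            return Optional.empty();
        }
        // Batch jobs terminate, so commit/abort at job end is well defined.
        return Optional.of(new NoOpTwoPhaseTable());
    }
}
```

A catalog whose streaming sources are expected to finish could instead return a non-empty value in streaming mode as well; the point of the parameter is that the choice stays with the Catalog implementation.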


At this point, the entire execution of Atomic CTAS is complete. 

...