



Status

Current state: Under Discussion

Discussion thread: here (<- link to https://mail-archives.apache.org/mod_mbox/flink-dev/)

JIRA: here (<- link to https://issues.apache.org/jira/browse/FLINK-XXXX)

Released: 1.18

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The CREATE TABLE AS SELECT (CTAS) statement has been supported since FLIP-218, but it is not atomic: the table is created before the job runs, and if the job fails or is cancelled, the table is not dropped.

We want Flink to support atomic CTAS, where the table is created only when the job succeeds.

Building on FLIP-218: Support SELECT clause in CREATE TABLE(CTAS), we use the existing JobStatusHook mechanism and add a new API to Catalog to implement atomic CTAS.

Public Interfaces


Introduce the createStagedCatalogTable API in the Catalog interface.

@PublicEvolving
public interface Catalog {

    /**
     * Creates a StagedCatalogTable that provides a transaction abstraction.
     *
     * <p>The framework will make sure to call this method with fully validated {@link
     * ResolvedCatalogTable}.
     *
     * @param tablePath path of the table to be created
     * @param table the table definition
     * @param ignoreIfExists flag to specify behavior when a table or view already exists at the
     *     given path: if set to false, it throws a TableAlreadyExistException, if set to true, do
     *     nothing.
     * @return A StagedCatalogTable that can be serialized and provides start/commit/abort operations
     * @throws TableAlreadyExistException if table already exists and ignoreIfExists is false
     * @throws DatabaseNotExistException if the database in tablePath doesn't exist
     * @throws CatalogException in case of any runtime exception
     */
    default StagedCatalogTable createStagedCatalogTable(
            ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
            throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
        throw new UnsupportedOperationException(
                "createStagedCatalogTable is not supported by this catalog.");
    }

}


Introduce the StagedCatalogTable interface, which supports atomic operations.

/** Catalog table that supports staging. */
@PublicEvolving
public interface StagedCatalogTable extends CatalogTable, Serializable {

    /** Performs preparation work before the job starts. */
    void start();

    /** Commits the staged table after the job succeeds. */
    void commit();

    /** Performs cleanup after the job fails or is cancelled. */
    void abort();
}


Proposed Changes

First, we need a table interface that carries a transaction abstraction, so we introduce StagedCatalogTable, which supports start, commit, and abort operations.

StagedCatalogTable provides three methods:

start: similar to opening a transaction; performs preparation work such as initializing the client, the data, or the directory.

commit: similar to committing a transaction; performs work such as writing data, making data visible, and creating the table.

abort: similar to aborting a transaction; performs cleanup such as deleting or restoring data.

Note: StagedCatalogTable must be serializable because it is used on the JobManager (JM).
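To make the start/commit/abort semantics concrete, here is a minimal sketch of a hypothetical directory-backed staged table, assuming a filesystem on which renaming a directory is atomic. `StagedTable` is a local stand-in for the proposed StagedCatalogTable reduced to its three lifecycle methods (the real interface also extends CatalogTable), and `StagedDirectoryTable` is illustrative only, not part of Flink or any connector. It also shows one consequence of the serializability requirement: the paths are stored as Strings because `java.nio.file.Path` is not Serializable.

```java
import java.io.IOException;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical directory-backed staged table; not part of Flink. */
public class StagedDirectoryTableSketch {

    /** Stand-in for the proposed StagedCatalogTable, reduced to its lifecycle methods. */
    public interface StagedTable extends Serializable {
        void start();

        void commit();

        void abort();
    }

    public static final class StagedDirectoryTable implements StagedTable {
        private static final long serialVersionUID = 1L;

        // Kept as Strings because java.nio.file.Path is not Serializable.
        private final String stagingDir;
        private final String finalDir;

        public StagedDirectoryTable(String stagingDir, String finalDir) {
            this.stagingDir = stagingDir;
            this.finalDir = finalDir;
        }

        /** Prep work: create the staging directory that the job writes into. */
        @Override
        public void start() {
            try {
                Files.createDirectories(Paths.get(stagingDir));
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }

        /** Makes the data visible in one step by atomically renaming the staging directory. */
        @Override
        public void commit() {
            try {
                Files.move(Paths.get(stagingDir), Paths.get(finalDir), StandardCopyOption.ATOMIC_MOVE);
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }

        /** Cleanup: deletes the staging directory and everything written to it. */
        @Override
        public void abort() {
            Path staging = Paths.get(stagingDir);
            if (!Files.exists(staging)) {
                return;
            }
            try (Stream<Path> walk = Files.walk(staging)) {
                // Reverse order deletes children before their parent directories.
                List<Path> paths = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
                for (Path p : paths) {
                    Files.delete(p);
                }
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
    }
}
```

Because the job writes only into the staging directory and `commit` publishes it with a single rename, readers never observe a partially written table; if `abort` runs instead, the final directory is never created.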

Next, we need a place to create the StagedCatalogTable. Different catalogs must perform different operations to implement atomic CTAS: for example, HiveCatalog needs to access the Hive Metastore, while JDBCCatalog needs to access the back-end database. We therefore introduce the createStagedCatalogTable API on the Catalog interface.

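The createStagedCatalogTable API takes the same parameters and declares the same exceptions as the createTable API; the difference is that it returns a StagedCatalogTable instead of creating the table immediately.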
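The following sketch shows how a catalog might implement createStagedCatalogTable, assuming a hypothetical in-memory catalog. `InMemoryCatalog`, `StagedTable`, the local `TableAlreadyExistException`, and the static `METASTORE` map (standing in for an external metastore that the JobManager can also reach) are all assumptions for illustration. `String` paths and option maps replace `ObjectPath` and `CatalogBaseTable`, and the database-existence check is omitted.

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical in-memory catalog illustrating createStagedCatalogTable; not Flink code. */
public class StagingCatalogSketch {

    /** Stand-in for the proposed StagedCatalogTable, reduced to its lifecycle methods. */
    public interface StagedTable extends Serializable {
        void start();

        void commit();

        void abort();
    }

    /** Stand-in for Flink's TableAlreadyExistException. */
    public static class TableAlreadyExistException extends Exception {
        private static final long serialVersionUID = 1L;

        public TableAlreadyExistException(String tablePath) {
            super("Table already exists: " + tablePath);
        }
    }

    // Simulates an external metastore reachable from both the client and the JobManager.
    static final Map<String, Map<String, String>> METASTORE = new ConcurrentHashMap<>();

    public static class InMemoryCatalog {
        public boolean tableExists(String tablePath) {
            return METASTORE.containsKey(tablePath);
        }

        /** Same inputs as createTable (simplified), but creation is deferred to commit(). */
        public StagedTable createStagedCatalogTable(
                String tablePath, Map<String, String> options, boolean ignoreIfExists)
                throws TableAlreadyExistException {
            if (tableExists(tablePath) && !ignoreIfExists) {
                throw new TableAlreadyExistException(tablePath);
            }
            return new StagedMetastoreTable(tablePath, options);
        }
    }

    static final class StagedMetastoreTable implements StagedTable {
        private static final long serialVersionUID = 1L;
        private final String tablePath;
        private final HashMap<String, String> options; // HashMap is Serializable

        StagedMetastoreTable(String tablePath, Map<String, String> options) {
            this.tablePath = tablePath;
            this.options = new HashMap<>(options);
        }

        @Override
        public void start() {
            // Nothing to prepare here; a real catalog might open a metastore client.
        }

        @Override
        public void commit() {
            // The table becomes visible only now. putIfAbsent leaves an existing table
            // untouched, matching the "do nothing" behavior of ignoreIfExists = true.
            METASTORE.putIfAbsent(tablePath, options);
        }

        @Override
        public void abort() {
            // Nothing was registered before commit(), so there is nothing to drop.
        }
    }
}
```

In this sketch the existence check runs eagerly, so the statement can fail fast at compile time, while registration waits until `commit` is called after the job succeeds.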


The following describes how atomic CTAS is integrated into the Flink framework:

Introduce CtasJobStatusHook, which implements the JobStatusHook interface and holds the StagedCatalogTable as a member variable:

CtasJobStatusHook#onCreated: calls the start method of StagedCatalogTable.

CtasJobStatusHook#onFinished: calls the commit method of StagedCatalogTable.

CtasJobStatusHook#onFailed: calls the abort method of StagedCatalogTable.

CtasJobStatusHook#onCanceled: calls the abort method of StagedCatalogTable.
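The hook-to-table mapping described above can be sketched as follows. `JobStatusHook` here is a local stand-in modeled on Flink's JobStatusHook (String job IDs replace `JobID`), `StagedTable` stands in for StagedCatalogTable, and `RecordingStagedTable` is a hypothetical test double; none of these are the actual Flink classes.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

/** Sketch of CtasJobStatusHook using local stand-in interfaces (not Flink's classes). */
public class CtasJobStatusHookSketch {

    /** Stand-in modeled on Flink's JobStatusHook; String replaces JobID. */
    public interface JobStatusHook extends Serializable {
        void onCreated(String jobId);

        void onFinished(String jobId);

        void onFailed(String jobId, Throwable throwable);

        void onCanceled(String jobId);
    }

    /** Stand-in for the proposed StagedCatalogTable, reduced to its lifecycle methods. */
    public interface StagedTable extends Serializable {
        void start();

        void commit();

        void abort();
    }

    /** Maps job status transitions onto the staged table's transaction methods. */
    public static final class CtasJobStatusHook implements JobStatusHook {
        private static final long serialVersionUID = 1L;
        private final StagedTable stagedTable;

        public CtasJobStatusHook(StagedTable stagedTable) {
            this.stagedTable = stagedTable;
        }

        @Override
        public void onCreated(String jobId) {
            stagedTable.start();
        }

        @Override
        public void onFinished(String jobId) {
            stagedTable.commit();
        }

        @Override
        public void onFailed(String jobId, Throwable throwable) {
            stagedTable.abort();
        }

        @Override
        public void onCanceled(String jobId) {
            stagedTable.abort();
        }
    }

    /** Test double that records which lifecycle methods were called, in order. */
    public static final class RecordingStagedTable implements StagedTable {
        private static final long serialVersionUID = 1L;
        public final List<String> calls = new ArrayList<>();

        @Override
        public void start() {
            calls.add("start");
        }

        @Override
        public void commit() {
            calls.add("commit");
        }

        @Override
        public void abort() {
            calls.add("abort");
        }
    }
}
```

Failure and cancellation deliberately share the same `abort` path, since in both cases the table must not become visible.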


In the job compilation stage, the StagedCatalogTable is obtained via Catalog#createStagedCatalogTable and used to construct the CtasJobStatusHook.

The CtasJobStatusHook is registered with the StreamGraph, and its callbacks are invoked automatically as the job's status changes over its lifetime.

With this, the complete atomic CTAS flow is in place.
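The following self-contained simulation sketches the end-to-end flow under simplifying assumptions: a Java serialization round trip stands in for shipping the hook to the JobManager along with the job, a plain method `runCtas` plays the role of the JobManager firing the callbacks, and a static set stands in for the external metastore. All names are hypothetical, and cancellation is omitted because it follows the same abort path as failure.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical end-to-end simulation of atomic CTAS; no Flink classes are used. */
public class AtomicCtasFlowSketch {

    // Simulated external metastore: a table is "visible" once its path is in this set.
    static final Set<String> METASTORE = ConcurrentHashMap.newKeySet();

    interface StagedTable extends Serializable {
        void start();

        void commit();

        void abort();
    }

    /** Staged table whose commit() registers the table; start/abort are no-ops here. */
    static final class StagedTableImpl implements StagedTable {
        private static final long serialVersionUID = 1L;
        private final String tablePath;

        StagedTableImpl(String tablePath) {
            this.tablePath = tablePath;
        }

        public void start() {}

        public void commit() {
            METASTORE.add(tablePath);
        }

        public void abort() {}
    }

    /** Simplified hook with only the callbacks this simulation needs. */
    static final class CtasJobStatusHook implements Serializable {
        private static final long serialVersionUID = 1L;
        private final StagedTable stagedTable;

        CtasJobStatusHook(StagedTable stagedTable) {
            this.stagedTable = stagedTable;
        }

        void onCreated() { stagedTable.start(); }

        void onFinished() { stagedTable.commit(); }

        void onFailed(Throwable t) { stagedTable.abort(); }
    }

    /** Emulates shipping the hook to the JobManager with Java serialization. */
    static CtasJobStatusHook shipToJobManager(CtasJobStatusHook hook) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(hook);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (CtasJobStatusHook) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Hook is not serializable", e);
        }
    }

    /** Hypothetical driver: "compiles" the CTAS job, then fires the hook as the JM would. */
    public static boolean runCtas(String tablePath, Runnable jobBody) {
        // Compilation stage: obtain the staged table and wrap it in the hook.
        CtasJobStatusHook hook =
                shipToJobManager(new CtasJobStatusHook(new StagedTableImpl(tablePath)));
        hook.onCreated();
        try {
            jobBody.run();
        } catch (RuntimeException e) {
            hook.onFailed(e);
            return false;
        }
        hook.onFinished();
        return true;
    }

    public static boolean tableExists(String tablePath) {
        return METASTORE.contains(tablePath);
    }
}
```

If any field of the staged table were not serializable, `shipToJobManager` would fail before the job ran, which is the practical reason behind the serializability requirement.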

Therefore, implementing atomic CTAS for a specific catalog requires only two steps:

  1. Implement the createStagedCatalogTable method in the Catalog;
  2. Provide an implementation class of the StagedCatalogTable interface.

Compatibility, Deprecation, and Migration Plan

It is a new feature with no implication for backwards compatibility.

Test Plan

The changes will be verified by unit tests.



