Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Public Interfaces


Introduce createStagedCatalogTable API createTwoPhaseCatalogTable API for Catalog.

Code Block
languagejava
@PublicEvolving
public interface Catalog {

    /**
     * Create a {@link StagedCatalogTableTwoPhaseCatalogTable} that provided transaction abstraction.
     * TwoPhaseCatalogTable will be combined with {@link JobStatusHook} to achieve atomicity
     * support in the Flink framework. Default returns empty, indicating that atomic operations are
     * not supported, then using non-atomic implementations.
     *
     * <p>The framework will make sure to call this method with fully validated {@link
     * ResolvedCatalogTable}.
     *
     * @param tablePath path of the table to be created
     * @param table the table definition
     * @param ignoreIfExists flag to specify behavior when a table or view already exists at the
     *     given path: if set to false, it throws a TableAlreadyExistException, if set to true, do
     *     nothing.
     * @param isStreamingMode A flag that tells if the current table is in stream mode, Different
     *     modes can have different implementations of atomicity support.
       * @return A{@link StagedCatalogTableTwoPhaseCatalogTable} that can be serialized and provides start/commit/abortatomic
     *     operations
     * @throws TableAlreadyExistException if table already exists and ignoreIfExists is false
     * @throws DatabaseNotExistException if the database in tablePath doesn't exist
     * @throws CatalogException in case of any runtime exception
     */
    default StagedCatalogTableOptional<TwoPhaseCatalogTable> createStagedCatalogTablecreateTwoPhaseCatalogTable(
            ObjectPath tablePath,
            CatalogBaseTable table,
            boolean ignoreIfExists,
            boolean isStreamingMode)
            throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
        throw new UnsupportedOperationException(""return Optional.empty();
    }

}


Introduce StagedCatalogTable interface TwoPhaseCatalogTable interface that support atomic operations.

Code Block
languagejava
/**
 ** Catalog table that supports staging. A {@link CatalogTable} for atomic semantics using a two-phase commit protocol, combined with
 * {@link JobStatusHook} for atomic CTAS. {@link TwoPhaseCatalogTable} will be a member
 * variable of CtasJobStatusHook and can be serialized;
 *
 * <p>
 * CtasJobStatusHook#onCreated will call the beginTransaction method of TwoPhaseCatalogTable;
 * CtasJobStatusHook#onFinished will call the commit method of TwoPhaseCatalogTable;
 * CtasJobStatusHook#onFailed and CtasJobStatusHook#onCanceled will call the abort method of
 * TwoPhaseCatalogTable;
 */
@PublicEvolving
public interface StagedCatalogTableTwoPhaseCatalogTable extends CatalogTable, Serializable {

	/** Prep work for the start.    /**
     * This method will be called when the job is started. Similar to what it means to open a
     * transaction in a relational database; In Flink's atomic CTAS scenario, it is used to do some
     * initialization work; For example, initializing the client of the underlying service, the tmp
     * path of the underlying storage, or even call the start transaction API of the underlying
     * service, etc.
     */
    void startbeginTransaction();

	    /**
     * Submit operation after job succeeded.This method will be called when the job is succeeds. Similar to what it means to commit the
     * transaction in a relational database; In Flink's atomic CTAS scenario, it is used to do some
     * data visibility related work; For example, moving the underlying data to the target
     * directory, writing buffer data to the underlying storage service, or even call the commit
     * transaction API of the underlying service, etc.
     */
    void commit();

	    /**
     * Some cleanup operations after jobThis method will be called when the job is failed or canceled. Similar to what it means to
     * rollback the transaction in a relational database; In Flink's atomic CTAS scenario, it is
     * used to do some data cleaning; For example, delete the data in tmp directory, delete the
     * temporary data in the underlying storage service, or even call the rollback transaction API
     * of the underlying service, etc.
     */
    void abort();
}


Proposed Changes

First we need to have a Table interface that can be combined with the abstract transaction capability, so we introduce StagedCatalogTable TwoPhaseCatalogTable, which can perform start transaction, commit transaction, and abort transaction operations.

Compatibility with existing non-atomic CTAS






The three APIs corresponding to StagedCatalogTableTwoPhaseCatalogTable:

start : Similar to open transactions, we can do some prep work, such as initializing the client, initializing the data, initializing the directory, etc.

...

abort : Similar to abort transactions, we can do some data cleaning, data restoration, etc.

Note: StagedCatalogTable must TwoPhaseCatalogTable must be serializable, because it used on JM.

Then we need somewhere to create the StagedCatalogTableTwoPhaseCatalogTable, because different Catalogs implement atomic CTAS and need to perform different operations,

for example, HiveCatalog needs to access the Hive Metastore; JDBCCatalog needs to access the back-end database, so we introduce the createStagedCatalogTable API createTwoPhaseCatalogTable API on the Catalog interface.

The definition of the createStagedCatalogTable API createTwoPhaseCatalogTable API is the same as the definition of the createTable API.

...

Introduce CtasJobStatusHook (implements JobStatusHook interface), StagedCatalogTable is TwoPhaseCatalogTable is its member variable; 

CtasJobStatusHook#onCreated :  The start method of StagedCatalogTable is TwoPhaseCatalogTable is called in the onCreated method.

CtasJobStatusHook#onFinished :  The commit method of StagedCatalogTable is TwoPhaseCatalogTable is called in the onFinished method.

CtasJobStatusHook#onFailed :  The abort method of StagedCatalogTable is TwoPhaseCatalogTable is called in the onFailed method.

CtasJobStatusHook#onCanceled : The abort method of StagedCatalogTable is TwoPhaseCatalogTable is called in the onFailed method.

...

In the job compilation stage, the StagedCatalogTable TwoPhaseCatalogTable is obtained by Catalog#createStagedCatalogTable and Catalog#createTwoPhaseCatalogTable and used to construct the CtasJobStatusHook, 

...

Then implementation of the atomic CTAS operation requires only two steps :

  1. Catalog implements the createStagedCatalogTable methodcreateTwoPhaseCatalogTable method;
  2. Introduce the implementation class of the StagedCatalogTable interfaceTwoPhaseCatalogTable interface.

Compatibility, Deprecation, and Migration Plan

...