

Status

Current state: "Under Discussion"

Discussion thread: here (<- link to https://mail-archives.apache.org/mod_mbox/flink-dev/)

JIRA: here (<- link to https://issues.apache.org/jira/browse/FLINK-XXXX)

Released: 1.18

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The CREATE TABLE AS SELECT (CTAS) statement has been supported since FLIP-218, but it is not atomic. The table is created before the job runs. If the job execution fails or is cancelled, the table is not dropped.

We want Flink to support atomic CTAS, where the table is created only when the job succeeds.

We refer to FLIP-218: Support SELECT clause in CREATE TABLE(CTAS) and Spark's StagedTable design to implement Flink's atomic CTAS support.

Public Interfaces


Introduce createStagedCatalogTable API for Catalog.

@PublicEvolving
public interface Catalog {

    /**
     * Create a StagedCatalogTable that provides a transaction abstraction.
     *
     * <p>The framework will make sure to call this method with fully validated {@link
     * ResolvedCatalogTable}.
     *
     * @param tablePath path of the table to be created
     * @param table the table definition
     * @param ignoreIfExists flag to specify behavior when a table or view already exists at the
     *     given path: if set to false, it throws a TableAlreadyExistException, if set to true, do
     *     nothing.
     * @return A StagedCatalogTable that can be serialized and provides start/commit/abort operations
     * @throws TableAlreadyExistException if table already exists and ignoreIfExists is false
     * @throws DatabaseNotExistException if the database in tablePath doesn't exist
     * @throws CatalogException in case of any runtime exception
     */
    default StagedCatalogTable createStagedCatalogTable(
            ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
            throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
        throw new UnsupportedOperationException("This catalog does not support staged catalog tables.");
    }

}


Introduce the StagedCatalogTable interface, which supports atomic operations.

/** Catalog table that supports staging. */
@PublicEvolving
public interface StagedCatalogTable extends CatalogTable, Serializable {

    void start();

    void commit();

    void abort();
}
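
To make the intended lifecycle concrete, here is a minimal sketch of how an implementation might behave. It does not use Flink classes: `StagedTable`, `ToyCatalog`, and `InMemoryStagedTable` are hypothetical stand-ins invented for illustration, and the assumption that `start()` runs before the job, `commit()` on success, and `abort()` on failure or cancellation is inferred from the motivation above rather than specified by this proposal.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical stand-in for the proposed StagedCatalogTable (CatalogTable/Serializable omitted). */
interface StagedTable {
    void start();

    void commit();

    void abort();
}

/** Toy catalog (assumption): a table is visible only if it has an entry in this map. */
class ToyCatalog {
    final Map<String, List<String>> tables = new HashMap<>();
}

/** Stages rows privately and publishes them to the catalog only on commit. */
class InMemoryStagedTable implements StagedTable {
    private final ToyCatalog catalog;
    private final String name;
    private List<String> staged; // null while no transaction is open

    InMemoryStagedTable(ToyCatalog catalog, String name) {
        this.catalog = catalog;
        this.name = name;
    }

    @Override
    public void start() {
        staged = new ArrayList<>(); // open a private staging area
    }

    void write(String row) {
        if (staged == null) {
            throw new IllegalStateException("start() has not been called");
        }
        staged.add(row);
    }

    @Override
    public void commit() {
        if (staged == null) {
            throw new IllegalStateException("nothing to commit");
        }
        catalog.tables.put(name, staged); // the table becomes visible only here
        staged = null;
    }

    @Override
    public void abort() {
        staged = null; // discard staged rows; the catalog is never touched
    }
}

class StagedTableDemo {
    public static void main(String[] args) {
        ToyCatalog catalog = new ToyCatalog();

        InMemoryStagedTable ok = new InMemoryStagedTable(catalog, "t_ok");
        ok.start();
        ok.write("a");
        ok.commit(); // simulated job success

        InMemoryStagedTable failed = new InMemoryStagedTable(catalog, "t_failed");
        failed.start();
        failed.write("b");
        failed.abort(); // simulated job failure

        System.out.println(catalog.tables.keySet()); // prints [t_ok]
    }
}
```

In this sketch the aborted table never appears in the catalog, which is the property the non-atomic CTAS lacks.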


Proposed Changes

Whether a Catalog implements the createStagedCatalogTable API determines whether the Catalog supports atomic CTAS.
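
One way the framework could detect this is sketched below. The proposal does not say how detection works, so treating the default method's `UnsupportedOperationException` as the signal is an assumption. `SketchCatalog`, `StagedHandle`, `PlainCatalog`, `AtomicCatalog`, and `CtasSupport` are hypothetical, simplified stand-ins, not Flink classes.

```java
import java.util.Optional;

/** Hypothetical stand-in for the staged table handle returned by a catalog. */
interface StagedHandle {
    void start();

    void commit();

    void abort();
}

/** Hypothetical, heavily simplified stand-in for Catalog. */
interface SketchCatalog {
    /** Mirrors the proposed default: catalogs that do not override this stay non-atomic. */
    default StagedHandle createStagedCatalogTable(String tablePath) {
        throw new UnsupportedOperationException("staged tables are not supported by this catalog");
    }
}

/** Keeps the default and therefore offers only non-atomic CTAS. */
class PlainCatalog implements SketchCatalog {}

/** Overrides the method and therefore opts in to atomic CTAS. */
class AtomicCatalog implements SketchCatalog {
    @Override
    public StagedHandle createStagedCatalogTable(String tablePath) {
        // No-op handle; a real catalog would stage and publish the table here.
        return new StagedHandle() {
            @Override
            public void start() {}

            @Override
            public void commit() {}

            @Override
            public void abort() {}
        };
    }
}

class CtasSupport {
    /** Returns a staged handle if the catalog supports staging, otherwise empty. */
    static Optional<StagedHandle> tryStage(SketchCatalog catalog, String tablePath) {
        try {
            return Optional.of(catalog.createStagedCatalogTable(tablePath));
        } catch (UnsupportedOperationException e) {
            // Assumed fallback signal: the caller would use the existing non-atomic CTAS path.
            return Optional.empty();
        }
    }
}
```

With this shape, existing catalogs keep working unchanged, and a catalog opts in to atomic CTAS simply by overriding the one method.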




Compatibility, Deprecation, and Migration Plan

It is a new feature with no implication for backwards compatibility.

Test Plan

Changes will be verified by unit tests (UT).

Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.


