
...

We want Flink to support atomic CTAS, where the table is created only when the job succeeds.


We draw on FLIP-218: Support SELECT clause in CREATE TABLE (CTAS) and on Spark's StagedTable design to implement atomic CTAS support in Flink.

Public Interfaces


Introduce a createStagedCatalogTable API in Catalog.

...

Code Block (language: java)
package org.apache.flink.table.catalog;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;

@PublicEvolving
public interface Catalog {

    // Existing Catalog methods are omitted.

    /**
     * Creates a StagedCatalogTable that provides a transaction abstraction for atomic CTAS.
     *
     * <p>The framework will make sure to call this method with a fully validated {@link
     * ResolvedCatalogTable}.
     *
     * @param tablePath path of the table to be created
     * @param table the table definition
     * @param ignoreIfExists flag to specify behavior when a table or view already exists at the
     *     given path: if set to false, it throws a TableAlreadyExistException; if set to true, it
     *     does nothing.
     * @return a StagedCatalogTable that can be serialized and provides start/commit/abort
     *     operations
     * @throws TableAlreadyExistException if the table already exists and ignoreIfExists is false
     * @throws DatabaseNotExistException if the database in tablePath doesn't exist
     * @throws CatalogException in case of any runtime exception
     */
    default StagedCatalogTable createStagedCatalogTable(
            ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
            throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
        // Catalogs that do not override this method do not support atomic CTAS.
        throw new UnsupportedOperationException(
                "Atomic CTAS is not supported by this catalog.");
    }
}
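To make the contract above concrete, here is a stdlib-only sketch of how a catalog might honor it. Everything in it (`InMemoryStagingCatalog`, its nested exceptions, and `PendingTable`) is a hypothetical stand-in rather than a Flink class, and table paths are simplified to `database.table` strings. The point it illustrates is that the method only validates and stages; nothing becomes visible in the catalog until `commit()` is called. Leaving an existing table untouched on commit when `ignoreIfExists` is true is our assumed reading of "do nothing".

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical stdlib-only model of createStagedCatalogTable; not Flink's API.
class InMemoryStagingCatalog {

    static class TableAlreadyExistException extends Exception {
        TableAlreadyExistException(String path) {
            super("Table already exists: " + path);
        }
    }

    static class DatabaseNotExistException extends Exception {
        DatabaseNotExistException(String database) {
            super("Database does not exist: " + database);
        }
    }

    private final Set<String> databases = new HashSet<>();
    private final Map<String, String> tables = new HashMap<>();

    /** Stand-in for StagedCatalogTable: the table is registered only in commit(). */
    class PendingTable {
        private final String path;
        private final String definition;

        PendingTable(String path, String definition) {
            this.path = path;
            this.definition = definition;
        }

        void start() {
            // Nothing to prepare in this in-memory model.
        }

        void commit() {
            // putIfAbsent leaves an existing table untouched (assumed "do nothing" semantics).
            tables.putIfAbsent(path, definition);
        }

        void abort() {
            // Nothing was registered, so there is nothing to roll back.
        }
    }

    void createDatabase(String database) {
        databases.add(database);
    }

    boolean tableExists(String path) {
        return tables.containsKey(path);
    }

    PendingTable createStagedCatalogTable(
            String database, String table, String definition, boolean ignoreIfExists)
            throws TableAlreadyExistException, DatabaseNotExistException {
        if (!databases.contains(database)) {
            throw new DatabaseNotExistException(database);
        }
        String path = database + "." + table;
        if (tables.containsKey(path) && !ignoreIfExists) {
            throw new TableAlreadyExistException(path);
        }
        // Validate and stage only; the catalog itself is not modified here.
        return new PendingTable(path, definition);
    }
}
```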


Introduce a StagedCatalogTable interface that supports atomic operations.

Code Block (language: java)
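package org.apache.flink.table.catalog;

import org.apache.flink.annotation.PublicEvolving;

import java.io.Serializable;

/** Catalog table that supports staging, so that CTAS can create the table atomically. */
@PublicEvolving
public interface StagedCatalogTable extends CatalogTable, Serializable {

    /** Begins the staging transaction before the job starts writing data. */
    void start();

    /** Commits the staged table, making it visible once the job has succeeded. */
    void commit();

    /** Aborts the staging transaction and discards the staged table if the job fails. */
    void abort();
}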
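StagedCatalogTable extends Serializable, and createStagedCatalogTable documents that the returned object "can be serialized", presumably so it can travel with the job and be committed or aborted where the job's outcome is known (our reading, not stated in the FLIP). The sketch below models that round trip with plain Java serialization. `SketchStagedTable` is hypothetical, and its ordering rules (start first, commit only after start, no abort after commit) are assumptions; the FLIP does not specify them.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Hypothetical stand-in for a StagedCatalogTable implementation; not a Flink class.
class SketchStagedTable implements Serializable {
    private static final long serialVersionUID = 1L;

    enum State { CREATED, STARTED, COMMITTED, ABORTED }

    private final String tablePath;
    private State state = State.CREATED;

    SketchStagedTable(String tablePath) {
        this.tablePath = tablePath;
    }

    // Assumed rule: start() is called exactly once, before anything else.
    void start() {
        require(State.CREATED, "start");
        state = State.STARTED;
    }

    // Assumed rule: commit() publishes the table and is valid only after start().
    void commit() {
        require(State.STARTED, "commit");
        state = State.COMMITTED;
    }

    // Assumed rule: abort() is allowed at any point before a commit and is idempotent.
    void abort() {
        if (state == State.COMMITTED) {
            throw new IllegalStateException("cannot abort committed table " + tablePath);
        }
        state = State.ABORTED;
    }

    State state() {
        return state;
    }

    private void require(State expected, String operation) {
        if (state != expected) {
            throw new IllegalStateException(
                    operation + " called in state " + state + " for " + tablePath);
        }
    }

    // Serialize and deserialize, standing in for shipping the object with the job.
    static SketchStagedTable roundTrip(SketchStagedTable table)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(table);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (SketchStagedTable) in.readObject();
        }
    }
}
```

Note that the deserialized copy carries its own state: committing it does not change the original object, so an implementation would need its staged data to live somewhere both sides can reach (for example, external storage).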


Proposed Changes

Whether a Catalog implements the createStagedCatalogTable API determines whether it supports atomic CTAS; the default implementation throws an UnsupportedOperationException.
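Building on the StagedCatalogTable lifecycle, the way a framework could drive atomic CTAS can be sketched as a small driver: the staged table is started before the job runs, committed only if the job succeeds, and aborted otherwise. `AtomicCtasDriver` and its `Staged` interface are hypothetical, and the `Runnable` stands in for the Flink job; in Flink the job runs asynchronously on a cluster, and where exactly these calls are made is not specified here.

```java
// Hypothetical driver for atomic CTAS; names are illustrative, not Flink's API.
final class AtomicCtasDriver {

    /** Minimal stand-in for the StagedCatalogTable lifecycle methods. */
    interface Staged {
        void start();

        void commit();

        void abort();
    }

    private AtomicCtasDriver() {}

    /**
     * Runs the job between start() and commit(). Any failure in the job triggers abort() and is
     * rethrown, so the table becomes visible only if the job succeeds.
     */
    static void run(Staged table, Runnable job) {
        table.start();
        try {
            job.run();
        } catch (RuntimeException | Error e) {
            table.abort();
            throw e;
        }
        table.commit();
    }
}
```

In this sketch commit() sits outside the try block, so a failure inside commit() itself is not followed by abort(); whether a catalog should roll back in that case is a design choice the sketch leaves open.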




Compatibility, Deprecation, and Migration Plan


This is a new feature with no implications for backward compatibility.

Test Plan

Changes will be verified by unit tests.

Rejected Alternatives


...