Status
Current state: ["Under Discussion"]
Discussion thread: here (<- link to https://mail-archives.apache.org/mod_mbox/flink-dev/)
JIRA: here (<- link to https://issues.apache.org/jira/browse/FLINK-XXXX)
Released: 1.18
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
The CREATE TABLE AS SELECT (CTAS) statement has been supported since FLIP-218, but it is not atomic. The table is created before the job runs, so if the job fails or is cancelled, the table is left behind rather than dropped.
We want Flink to support atomic CTAS, where the table is created only when the job succeeds.
This proposal builds on FLIP-218: Support SELECT clause in CREATE TABLE(CTAS) and on Spark's StagedTable design to implement atomic CTAS in Flink.
Public Interfaces
Introduce a createStagedCatalogTable API on Catalog.
    @PublicEvolving
    public interface Catalog {

        /**
         * Create a StagedCatalogTable that provides a transaction abstraction.
         *
         * <p>The framework will make sure to call this method with fully validated {@link
         * ResolvedCatalogTable}.
         *
         * @param tablePath path of the table to be created
         * @param table the table definition
         * @param ignoreIfExists flag to specify behavior when a table or view already exists at the
         *     given path: if set to false, it throws a TableAlreadyExistException, if set to true, it
         *     does nothing.
         * @return A StagedCatalogTable that can be serialized and provides start/commit/abort operations
         * @throws TableAlreadyExistException if table already exists and ignoreIfExists is false
         * @throws DatabaseNotExistException if the database in tablePath doesn't exist
         * @throws CatalogException in case of any runtime exception
         */
        default StagedCatalogTable createStagedCatalogTable(
                ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
                throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
            throw new UnsupportedOperationException("");
        }
    }
Introduce a StagedCatalogTable interface that supports atomic operations.
    /** Catalog table that supports staging. */
    @PublicEvolving
    public interface StagedCatalogTable extends CatalogTable, Serializable {

        void start();

        void commit();

        void abort();
    }
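The intended lifecycle of a staged table can be illustrated with a minimal, self-contained sketch. It does not use Flink classes: StagedTable is a stand-in for StagedCatalogTable (without CatalogTable and Serializable), and InMemoryStagedTable and runAtomically are hypothetical names introduced only for this example. It assumes that start() is called before the job runs, commit() when the job succeeds, and abort() when it fails. This FLIP does not fix the exact call sites.

```java
import java.util.HashSet;
import java.util.Set;

/** Sketch only: stand-in types, not Flink's real classes. */
class StagedTableLifecycleSketch {

    /** Mirrors the three lifecycle methods of the proposed StagedCatalogTable. */
    interface StagedTable {
        void start();

        void commit();

        void abort();
    }

    /** Hypothetical staged table backed by a set of visible table names. */
    static final class InMemoryStagedTable implements StagedTable {
        private final Set<String> visibleTables;
        private final String name;
        private boolean staged;

        InMemoryStagedTable(Set<String> visibleTables, String name) {
            this.visibleTables = visibleTables;
            this.name = name;
        }

        @Override
        public void start() {
            // Prepare the table; nothing is visible in the catalog yet.
            staged = true;
        }

        @Override
        public void commit() {
            if (!staged) {
                throw new IllegalStateException("commit() called before start()");
            }
            // Only now does the table become visible.
            visibleTables.add(name);
            staged = false;
        }

        @Override
        public void abort() {
            // Discard staged state and make sure nothing is left behind.
            staged = false;
            visibleTables.remove(name);
        }
    }

    /** Assumed call order: start, run the job, then commit on success or abort on failure. */
    static boolean runAtomically(StagedTable table, Runnable job) {
        table.start();
        try {
            job.run();
        } catch (RuntimeException e) {
            table.abort();
            return false;
        }
        table.commit();
        return true;
    }

    public static void main(String[] args) {
        Set<String> tables = new HashSet<>();
        boolean ok = runAtomically(new InMemoryStagedTable(tables, "t_ok"), () -> {});
        boolean failed =
                runAtomically(
                        new InMemoryStagedTable(tables, "t_fail"),
                        () -> {
                            throw new RuntimeException("job failed");
                        });
        System.out.println(ok + " " + failed + " " + tables); // prints "true false [t_ok]"
    }
}
```

In this sketch a failed job leaves no table behind, which is the behavior the Motivation section asks for.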
Proposed Changes
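A Catalog supports atomic CTAS if it overrides createStagedCatalogTable. The default implementation throws UnsupportedOperationException, so a Catalog that does not override the method does not support atomic CTAS.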
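One way the framework could detect this opt-in is sketched below. This is an assumption about the mechanism, not part of the proposal: the sketch calls the method and treats UnsupportedOperationException as "not supported". SimpleCatalog, PlainCatalog, StagingCatalog and tryStage are hypothetical stand-ins, and the method signature is reduced to a single String path to keep the example free of Flink dependencies.

```java
import java.util.Optional;

/** Sketch only: stand-in types, not Flink's real classes. */
class AtomicCtasSupportSketch {

    /** Stand-in for the proposed StagedCatalogTable. */
    interface StagedTable {
        void start();

        void commit();

        void abort();
    }

    /** Stand-in for Catalog, with a simplified signature for the proposed method. */
    interface SimpleCatalog {
        default StagedTable createStagedCatalogTable(String tablePath) {
            throw new UnsupportedOperationException("");
        }
    }

    /** A catalog that keeps the default, so it does not support atomic CTAS. */
    static final class PlainCatalog implements SimpleCatalog {}

    /** A catalog that opts in by overriding the method. */
    static final class StagingCatalog implements SimpleCatalog {
        @Override
        public StagedTable createStagedCatalogTable(String tablePath) {
            return new StagedTable() {
                @Override
                public void start() {}

                @Override
                public void commit() {}

                @Override
                public void abort() {}
            };
        }
    }

    /** Returns a staged table if the catalog supports staging, otherwise empty. */
    static Optional<StagedTable> tryStage(SimpleCatalog catalog, String tablePath) {
        try {
            return Optional.of(catalog.createStagedCatalogTable(tablePath));
        } catch (UnsupportedOperationException e) {
            // Assumed fallback point: the caller would use the non-atomic CTAS path here.
            return Optional.empty();
        }
    }

    public static void main(String[] args) {
        System.out.println(tryStage(new PlainCatalog(), "db.t").isPresent()); // prints "false"
        System.out.println(tryStage(new StagingCatalog(), "db.t").isPresent()); // prints "true"
    }
}
```

With this approach, existing Catalog implementations keep working unchanged and continue to use the non-atomic behavior from FLIP-218.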
Compatibility, Deprecation, and Migration Plan
It is a new feature with no implication for backwards compatibility.
Test Plan
Changes will be verified by unit tests.
Rejected Alternatives