

Current state: Accepted

Discussion thread: here

VOTE thread: here

JIRA: ASF JIRA

Released: 1.18

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


The CREATE TABLE AS SELECT (CTAS) statement has been supported since FLIP-218, but it is not atomic: the table is created before the job runs. If the job execution fails or is cancelled, the table is not dropped.

We want Flink to support atomic CTAS, where the table is created only when the job succeeds.

Following FLIP-218: Support SELECT clause in CREATE TABLE(CTAS), we use the existing JobStatusHook mechanism and extend the Catalog with a new API to implement atomic CTAS capabilities.

Public Interfaces

Introduce the interface SupportsStaging, which provides the applyStaging API. If a DynamicTableSink implements SupportsStaging, it indicates that the sink supports atomic operations.

Code Block
/**
 * Enables different staged operations to ensure atomicity in a {@link DynamicTableSink}.
 *
 * <p>By default, if this interface is not implemented, indicating that atomic operations are not
 * supported, then a non-atomic implementation is used.
 */
public interface SupportsStaging {

    /**
     * Provides a {@link StagedTable} that provides a transaction abstraction. StagedTable will be
     * combined with {@link JobStatusHook} to achieve atomicity support in the Flink framework. The
     * framework calls the relevant API of StagedTable when the job state is switched.
     *
     * <p>This method will be called at the compile stage.
     *
     * @param context tells DynamicTableSink the operation type of this StagedTable; expandable
     * @return {@link StagedTable} that can be serialized and provides atomic operations
     */
    StagedTable applyStaging(StagingContext context);

    /**
     * The context is intended to tell DynamicTableSink the type of this operation. In this way,
     * DynamicTableSink can return the corresponding implementation of StagedTable according to the
     * specific operation. More types of operations can be extended in the future.
     */
    interface StagingContext {
        StagingPurpose getStagingPurpose();
    }

    enum StagingPurpose {
        CREATE_TABLE_AS,
        CREATE_TABLE_AS_IF_NOT_EXISTS
    }
}

Introduce the StagedTable interface, which supports atomic operations.

Code Block
/**
 * A {@link StagedTable} for atomic semantics using a two-phase commit protocol, combined with
 * {@link JobStatusHook} for atomic CTAS. {@link StagedTable} will be a member variable of
 * CtasJobStatusHook and can be serialized.
 *
 * <p>CtasJobStatusHook#onCreated will call the begin method of StagedTable;
 * CtasJobStatusHook#onFinished will call the commit method of StagedTable;
 * CtasJobStatusHook#onFailed and CtasJobStatusHook#onCanceled will call the abort method of
 * StagedTable.
 */
public interface StagedTable extends Serializable {

    /**
     * This method will be called when the job is started. Similar to what it means to open a
     * transaction in a relational database; in Flink's atomic CTAS scenario, it is used to do some
     * initialization work; for example, initializing the client of the underlying service, the tmp
     * path of the underlying storage, or even calling the start transaction API of the underlying
     * service, etc.
     */
    void begin();

    /**
     * This method will be called when the job succeeds. Similar to what it means to commit the
     * transaction in a relational database; in Flink's atomic CTAS scenario, it is used to do some
     * data visibility related work; for example, moving the underlying data to the target
     * directory, writing buffer data to the underlying storage service, or even calling the commit
     * transaction API of the underlying service, etc.
     */
    void commit();

    /**
     * This method will be called when the job is failed or canceled. Similar to what it means to
     * rollback the transaction in a relational database; in Flink's atomic CTAS scenario, it is
     * used to do some data cleaning; for example, deleting the data in the tmp directory, deleting
     * the temporary data in the underlying storage service, or even calling the rollback
     * transaction API of the underlying service, etc.
     */
    void abort();
}
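To make the begin/commit/abort contract concrete, the following is a minimal, self-contained sketch rather than the actual implementation. The StagedTable interface in it is a local stand-in that mirrors the proposed one so the code compiles without Flink; InMemoryStagedTable, its write method, and the visibleTables map are hypothetical names used only for illustration.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Local stand-in that mirrors the proposed interface, so the sketch compiles without Flink.
interface StagedTable extends Serializable {
    void begin();

    void commit();

    void abort();
}

// Hypothetical implementation: rows are buffered privately and only published on commit.
class InMemoryStagedTable implements StagedTable {
    private static final long serialVersionUID = 1L;

    private final String tableName;
    // Stands in for the external system that makes tables visible to readers.
    private final Map<String, List<String>> visibleTables;
    private List<String> staged;

    InMemoryStagedTable(String tableName, Map<String, List<String>> visibleTables) {
        this.tableName = tableName;
        this.visibleTables = visibleTables;
    }

    @Override
    public void begin() {
        staged = new ArrayList<>(); // open the "transaction"
    }

    void write(String row) {
        staged.add(row); // what the running job would do
    }

    @Override
    public void commit() {
        // Publish the table and its data in one step.
        visibleTables.put(tableName, new ArrayList<>(staged));
        staged = null;
    }

    @Override
    public void abort() {
        staged = null; // discard staged rows; nothing was ever visible
    }
}
```

In this sketch a reader of visibleTables never observes a table without its data: the entry appears only after commit, and abort leaves no trace.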


Add table.ctas.atomicity-enabled option to allow users to enable atomicity when using create table as select syntax.

Code Block
public class TableConfigOptions {
    @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING)
    public static final ConfigOption<Boolean> TABLE_CTAS_ATOMICITY_ENABLED =
            key("table.ctas.atomicity-enabled")
                    .booleanType()
                    .defaultValue(false)
                    .withDescription(
                            "Specifies if the create table as select operation is executed atomically. "
                                    + "By default, the operation is non-atomic. The target table is created in Client side, and it will not be dropped even though the job fails or is cancelled. "
                                    + "If set this option to true and DynamicTableSink implements the SupportsStaging interface, the create table as select operation is expected to be executed atomically, "
                                    + "the behavior of which depends on the actual DynamicTableSink.");
}

Proposed Changes

First, we need a table abstraction that can be combined with transaction capabilities, so we introduce StagedTable, which can begin, commit, and abort a transaction.

The three APIs of StagedTable:

begin: similar to opening a transaction; we can do some preparation work, such as initializing the client, the data, or the directory.

commit: similar to committing a transaction; we can do some data writing, make data visible, create the table, etc.

abort: similar to aborting a transaction; we can do some data cleaning, data restoration, etc.

Note: StagedTable must be serializable, because it is used on the JobManager (JM).
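The serializability requirement has a practical consequence: live handles such as metastore clients or connections usually cannot be serialized, so they are better kept transient and created in begin(). The sketch below illustrates this with plain Java serialization. StagedTable is a local stand-in for the proposed interface, and FakeClient, ClientBackedStagedTable, and SerializationRoundTrip are hypothetical names that are not part of the proposal.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Local stand-in for the proposed interface.
interface StagedTable extends Serializable {
    void begin();

    void commit();

    void abort();
}

// Hypothetical non-serializable handle, e.g. a metastore client or a JDBC connection.
class FakeClient {
    final String endpoint;

    FakeClient(String endpoint) {
        this.endpoint = endpoint;
    }
}

class ClientBackedStagedTable implements StagedTable {
    private static final long serialVersionUID = 1L;

    private final String endpoint; // plain configuration: travels with the object
    private transient FakeClient client; // live handle: skipped by serialization

    ClientBackedStagedTable(String endpoint) {
        this.endpoint = endpoint;
    }

    @Override
    public void begin() {
        client = new FakeClient(endpoint); // (re)create the handle where the hook runs
    }

    @Override
    public void commit() {
        if (client == null) {
            throw new IllegalStateException("begin() was not called");
        }
    }

    @Override
    public void abort() {
        client = null;
    }

    boolean hasClient() {
        return client != null;
    }

    String endpoint() {
        return endpoint;
    }
}

final class SerializationRoundTrip {
    // Serializes and deserializes a value, roughly like shipping it to another process.
    static <T extends Serializable> T copy(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                @SuppressWarnings("unchecked")
                T result = (T) in.readObject();
                return result;
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

After the round trip, the configuration survives and the transient handle is empty until begin() is called again, which matches the pattern of the HiveStagedTable demo later on this page (a transient client that is initialized in begin()).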

Next, we need a place to create the StagedTable. Different table sinks must perform different operations to implement atomic CTAS: for example, HiveTableSink needs to access the Hive Metastore and write to HDFS (or OSS, etc.), while JDBCTableSink needs to access the back-end database.

Therefore, we introduce the interface SupportsStaging. A DynamicTableSink that implements it supports atomic operations; one that does not implement it does not.

The Flink framework determines whether a DynamicTableSink supports atomic CTAS by checking whether it implements SupportsStaging. If it does, the framework obtains the StagedTable object through the applyStaging API; otherwise, it uses the non-atomic CTAS implementation.

Identification of atomic CTAS

Normally, in streaming mode, we consider the job to be long-running, and even if it fails, it needs to resume afterwards, so atomic CTAS semantics are usually not needed.

In addition, many Flink jobs probably already use non-atomic CTAS, especially streaming jobs. To keep Flink's behavior consistent and to give users maximum flexibility, users can still choose the non-atomic implementation according to business needs, even if the DynamicTableSink implements the SupportsStaging interface.

So, in TableEnvironmentImpl we can infer whether atomic CTAS is used based on whether the user has enabled it and whether the DynamicTableSink implements the SupportsStaging interface, like the following:

Code Block
boolean isAtomicCtas = tableConfig.get(TableConfigOptions.TABLE_CTAS_ATOMICITY_ENABLED) && dynamicTableSink instanceof SupportsStaging;
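The condition above can be read as a small truth table: atomic CTAS is used only when both the option is enabled and the sink opts in. A self-contained sketch follows; DynamicTableSink and SupportsStaging are empty local stand-ins for the Flink types, and AtomicCtasDecision, PlainSink, and StagingSink are hypothetical names.

```java
// Empty local stand-ins for the Flink types, so the sketch compiles on its own.
interface DynamicTableSink {}

interface SupportsStaging {}

// Hypothetical sinks: one without and one with staging support.
class PlainSink implements DynamicTableSink {}

class StagingSink implements DynamicTableSink, SupportsStaging {}

final class AtomicCtasDecision {
    // Mirrors the expression above: the user's option AND the sink's capability.
    static boolean isAtomicCtas(boolean atomicityEnabled, DynamicTableSink sink) {
        return atomicityEnabled && sink instanceof SupportsStaging;
    }
}
```

Only the combination of an enabled option and a StagingSink yields true here; every other combination falls back to the existing non-atomic path.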

Integrate atomicity CTAS

Introduce CtasJobStatusHook, which implements the JobStatusHook interface and holds the StagedTable as a member variable.

The hook calls the StagedTable API as follows:

Code Block
/**
 * This Hook is used to implement the Flink CTAS atomicity semantics, calling the corresponding API
 * of {@link StagedTable} at different stages of the job.
 */
public class CtasJobStatusHook implements JobStatusHook {

    private final StagedTable stagedTable;

    public CtasJobStatusHook(StagedTable stagedTable) {
        this.stagedTable = stagedTable;
    }

    @Override
    public void onCreated(JobID jobId) {
        stagedTable.begin();
    }

    @Override
    public void onFinished(JobID jobId) {
        stagedTable.commit();
    }

    @Override
    public void onFailed(JobID jobId, Throwable throwable) {
        stagedTable.abort();
    }

    @Override
    public void onCanceled(JobID jobId) {
        stagedTable.abort();
    }
}
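To illustrate how the hook maps job state transitions to StagedTable calls, here is a minimal simulation that runs without Flink. StagedTable and JobStatusHook are simplified local stand-ins (the JobID parameter is reduced to a String), and RecordingStagedTable and HookLifecycleDemo are hypothetical helpers; the real framework drives the hook from the job's status changes, which this driver only imitates.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified local stand-ins for the proposed and existing Flink interfaces.
interface StagedTable {
    void begin();

    void commit();

    void abort();
}

interface JobStatusHook {
    void onCreated(String jobId);

    void onFinished(String jobId);

    void onFailed(String jobId, Throwable throwable);

    void onCanceled(String jobId);
}

class CtasJobStatusHook implements JobStatusHook {
    private final StagedTable stagedTable;

    CtasJobStatusHook(StagedTable stagedTable) {
        this.stagedTable = stagedTable;
    }

    @Override
    public void onCreated(String jobId) {
        stagedTable.begin();
    }

    @Override
    public void onFinished(String jobId) {
        stagedTable.commit();
    }

    @Override
    public void onFailed(String jobId, Throwable throwable) {
        stagedTable.abort();
    }

    @Override
    public void onCanceled(String jobId) {
        stagedTable.abort();
    }
}

// Hypothetical StagedTable that only records which methods were called.
class RecordingStagedTable implements StagedTable {
    final List<String> calls = new ArrayList<>();

    @Override
    public void begin() {
        calls.add("begin");
    }

    @Override
    public void commit() {
        calls.add("commit");
    }

    @Override
    public void abort() {
        calls.add("abort");
    }
}

final class HookLifecycleDemo {
    // Hypothetical driver: imitates the notifications around a single job run.
    static void run(JobStatusHook hook, Runnable job) {
        hook.onCreated("job-1");
        try {
            job.run();
        } catch (RuntimeException e) {
            hook.onFailed("job-1", e);
            return;
        }
        hook.onFinished("job-1");
    }
}
```

A successful run records begin followed by commit, while a failing run records begin followed by abort, so the target table becomes visible only in the first case.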

Compatibility with existing non-atomic CTAS

We can infer atomic CTAS support from whether the DynamicTableSink implements the interface SupportsStaging:

no: atomicity semantics are not supported, and the existing code logic is used;

yes: atomicity semantics are supported; create a CtasJobStatusHook and use the JobStatusHook mechanism to implement atomicity semantics, as described in the code in the previous section.

Code Block
Optional<DynamicTableSink> dynamicTableSinkOptional =
        ... ...
if (tableConfig.get(TableConfigOptions.TABLE_CTAS_ATOMICITY_ENABLED)
        && dynamicTableSinkOptional.isPresent()
        && dynamicTableSinkOptional.get() instanceof SupportsStaging) {
    DynamicTableSink dynamicTableSink = dynamicTableSinkOptional.get();
    StagedTable stagedTable =
            ((SupportsStaging) dynamicTableSink)
                    .applyStaging(
                            new SupportsStaging.StagingContext() {
                                @Override
                                public SupportsStaging.StagingPurpose
                                        getStagingPurpose() {
                                    if (createTableOperation
                                            .isIgnoreIfExists()) {
                                        return SupportsStaging.StagingPurpose
                                                .CREATE_TABLE_AS_IF_NOT_EXISTS;
                                    }
                                    return SupportsStaging.StagingPurpose
                                            .CREATE_TABLE_AS;
                                }
                            });
    CtasJobStatusHook ctasJobStatusHook = new CtasJobStatusHook(stagedTable);
    ... ...
} else {
    // execute CREATE TABLE first for non-atomic CTAS statements
    ... ...
}

To avoid creating the DynamicTableSink a second time, we need to construct a StagedSinkModifyOperation that inherits from SinkModifyOperation and holds the DynamicTableSink as a member variable.

Current non-atomic CTAS implementations

Flink currently supports non-atomic CTAS operations: for a CreateTableASOperation, it creates the target table first, and then compiles and executes the insert operation.

The current approach has the following shortcomings:

First: if the insert operation fails, whether because of a compile failure or a job execution failure, Flink does not drop the created target table;

Second: because the target table is created before the job is executed, it already exists while no data can be read from it.
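The difference between the two flows can be sketched with a toy catalog. This is only an illustration under simplifying assumptions: the catalog is a plain map from table name to rows, a job failure is simulated by a flag, and CtasFlows with its two methods is a hypothetical name rather than Flink code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

final class CtasFlows {

    // Non-atomic flow: the table is created first, then the job runs.
    static void nonAtomicCtas(
            Map<String, List<String>> catalog, String table, List<String> rows, boolean jobFails) {
        catalog.put(table, new ArrayList<>()); // visible immediately, but empty
        if (jobFails) {
            // The table created above is left behind.
            throw new IllegalStateException("job failed");
        }
        catalog.get(table).addAll(rows);
    }

    // Atomic flow: data is staged and the table is published only on success.
    static void atomicCtas(
            Map<String, List<String>> catalog, String table, List<String> rows, boolean jobFails) {
        List<String> staged = new ArrayList<>(); // begin
        if (jobFails) {
            staged.clear(); // abort: nothing was ever visible
            throw new IllegalStateException("job failed");
        }
        staged.addAll(rows);
        catalog.put(table, staged); // commit: table and data appear together
    }
}
```

With a failing job, the non-atomic flow leaves an empty table in the catalog, whereas the atomic flow leaves the catalog unchanged.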

Atomic CTAS demo

Implementing atomic CTAS for a connector then requires only two steps:

1: The DynamicTableSink implements the interface SupportsStaging;

2: Introduce an implementation class of the StagedTable interface.

Hive demo

HiveTableSink implements the applyStaging API:

Code Block
public StagedTable applyStaging(StagingContext context) {
    Table hiveTable =
            ... ...

    hiveStagedTable =
            new HiveStagedTable(
                    hiveVersion,
                    new JobConfWrapper(jobConf),
                    hiveTable,
                    context.getStagingPurpose()
                            == SupportsStaging.StagingPurpose.CREATE_TABLE_AS_IF_NOT_EXISTS);

    return hiveStagedTable;
}

HiveStagedTable implements the core logic

Code Block
/** An implementation of {@link StagedTable} for Hive to support atomic ctas. */
public class HiveStagedTable implements StagedTable {

    private static final long serialVersionUID = 1L;

    @Nullable private final String hiveVersion;
    private final JobConfWrapper jobConfWrapper;

    private final Table table;

    private final boolean ignoreIfExists;

    private transient HiveMetastoreClientWrapper client;

    private FileSystemFactory fsFactory;
    private TableMetaStoreFactory msFactory;
    private boolean overwrite;
    private Path tmpPath;
    private String[] partitionColumns;
    private boolean dynamicGrouped;
    private LinkedHashMap<String, String> staticPartitions;
    private ObjectIdentifier identifier;
    private PartitionCommitPolicyFactory partitionCommitPolicyFactory;

    public HiveStagedTable(
            String hiveVersion,
            JobConfWrapper jobConfWrapper,
            Table table,
            boolean ignoreIfExists) {
        this.hiveVersion = hiveVersion;
        this.jobConfWrapper = jobConfWrapper;
        this.table = table;
        this.ignoreIfExists = ignoreIfExists;
    }

    @Override
    public void begin() {
        // init hive metastore client
        client =
                HiveMetastoreClientFactory.create(
                        HiveConfUtils.create(jobConfWrapper.conf()), hiveVersion);
    }

    @Override
    public void commit() {
        try {
            // create table first
            client.createTable(table);

            try {
                List<PartitionCommitPolicy> policies = Collections.emptyList();
                if (partitionCommitPolicyFactory != null) {
                    policies =
                            partitionCommitPolicyFactory.createPolicyChain(
                                    ... ...,
                                    () -> {
                                        try {
                                            return fsFactory.create(tmpPath.toUri());
                                        } catch (IOException e) {
                                            throw new RuntimeException(e);
                                        }
                                    });
                }

                FileSystemCommitter committer =
                        new FileSystemCommitter(
                                ... ...);
                ... ...
            } catch (Exception e) {
                throw new TableException("Exception in two phase commit", e);
            } finally {
                try {
                    fsFactory.create(tmpPath.toUri()).delete(tmpPath, true);
                } catch (IOException ignore) {
                }
            }
        } catch (AlreadyExistsException alreadyExistsException) {
            if (!ignoreIfExists) {
                throw new FlinkHiveException(alreadyExistsException);
            }
        } catch (Exception e) {
            throw new FlinkHiveException(e);
        } finally {
            client.close();
        }
    }

    @Override
    public void abort() {
        // do nothing
    }

    public void setFsFactory(FileSystemFactory fsFactory) {
        this.fsFactory = fsFactory;
    }

    public void setMsFactory(TableMetaStoreFactory msFactory) {
        this.msFactory = msFactory;
    }

    public void setOverwrite(boolean overwrite) {
        this.overwrite = overwrite;
    }

    public void setTmpPath(Path tmpPath) {
        this.tmpPath = tmpPath;
    }

    public void setPartitionColumns(String[] partitionColumns) {
        this.partitionColumns = partitionColumns;
    }

    public void setDynamicGrouped(boolean dynamicGrouped) {
        this.dynamicGrouped = dynamicGrouped;
    }

    public void setStaticPartitions(LinkedHashMap<String, String> staticPartitions) {
        this.staticPartitions = staticPartitions;
    }

    public void setIdentifier(ObjectIdentifier identifier) {
        this.identifier = identifier;
    }

    public void setPartitionCommitPolicyFactory(
            PartitionCommitPolicyFactory partitionCommitPolicyFactory) {
        this.partitionCommitPolicyFactory = partitionCommitPolicyFactory;
    }

    public Table getTable() {
        return table;
    }
}
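The general idea behind file-based staging (write into a temporary directory, publish on commit) can be sketched with java.nio alone. This is not the HiveStagedTable above: DirectoryStagedTable, newWorkDir, and writeFile are hypothetical names, the sketch assumes the temporary and target directories are on the same file system so that an atomic move is possible, and unlike the demo above it cleans up the temporary directory in abort.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Comparator;
import java.util.stream.Stream;

// Hypothetical sketch of directory-based staging; not the HiveStagedTable from this proposal.
final class DirectoryStagedTable {
    private final Path tmpDir;
    private final Path targetDir;
    private final boolean ignoreIfExists;

    DirectoryStagedTable(Path tmpDir, Path targetDir, boolean ignoreIfExists) {
        this.tmpDir = tmpDir;
        this.targetDir = targetDir;
        this.ignoreIfExists = ignoreIfExists;
    }

    // Helper for demos: creates a scratch directory that holds both tmp and target paths.
    static Path newWorkDir() {
        try {
            return Files.createTempDirectory("ctas-sketch");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    void begin() {
        try {
            Files.createDirectories(tmpDir); // staging area, invisible to readers of targetDir
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Stands in for the job writing its output files.
    void writeFile(String name, String content) {
        try {
            Files.write(tmpDir.resolve(name), content.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    void commit() {
        if (Files.exists(targetDir)) {
            deleteRecursively(tmpDir);
            if (!ignoreIfExists) {
                throw new IllegalStateException("Table already exists: " + targetDir);
            }
            return; // IF NOT EXISTS: keep the existing table untouched
        }
        try {
            // Publish all staged files with a single rename.
            Files.move(tmpDir, targetDir, StandardCopyOption.ATOMIC_MOVE);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    void abort() {
        deleteRecursively(tmpDir);
    }

    private static void deleteRecursively(Path dir) {
        if (!Files.exists(dir)) {
            return;
        }
        try (Stream<Path> paths = Files.walk(dir)) {
            // Reverse order deletes children before their parent directory.
            paths.sorted(Comparator.reverseOrder())
                    .forEach(
                            p -> {
                                try {
                                    Files.delete(p);
                                } catch (IOException e) {
                                    throw new UncheckedIOException(e);
                                }
                            });
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Note that the existence check and the move in commit are two separate steps, so this sketch does not protect against a concurrent writer creating the target in between; a real sink would rely on the guarantees of its metastore or storage system for that.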

Jdbc Demo

JdbcTableSink implements the applyStaging API:

Code Block
public StagedTable applyStaging(StagingContext context) {
    ... ...
    StagedTable stagedTable = new JdbcStagedTable(
            new ObjectPath(tablePath.getDatabaseName(), tablePath.getObjectName() + "_" + System.currentTimeMillis()),
            ... ...);

    return stagedTable;
}

JdbcStagedTable implements the core logic

Code Block
/** An implementation of {@link StagedTable} for Jdbc to support atomic ctas. */
public class JdbcStagedTable implements StagedTable {

    private final ObjectPath tmpTablePath;
    private final ObjectPath finalTablePath;
    private final Map<String, String> schema;
    private final String jdbcUrl;
    private final String userName;
    private final String password;

    public JdbcStagedTable(
            ObjectPath tmpTablePath,
            ObjectPath finalTablePath,
            Map<String, String> schema,
            String jdbcUrl,
            String userName,
            String password) {
        this.tmpTablePath = tmpTablePath;
        this.finalTablePath = finalTablePath;
        this.schema = schema;
        this.jdbcUrl = jdbcUrl;
        this.userName = userName;
        this.password = password;
    }

    @Override
    public void begin() {
        // create tmp table, writing data to the tmp table
        Connection connection = getConnection();
        connection
                .prepareStatement("create table " + tmpTablePath.getFullName() + "( ... ... )")
                .execute();
    }

    @Override
    public void commit() {
        // Rename the tmp table to the final table name
        Connection connection = getConnection();
        connection
                .prepareStatement(
                        "rename table "
                                + tmpTablePath.getFullName()
                                + " to "
                                + finalTablePath.getFullName())
                .execute();
    }

    @Override
    public void abort() {
        // drop tmp table
        Connection connection = getConnection();
        connection.prepareStatement("drop table " + tmpTablePath.getFullName()).execute();
    }

    private Connection getConnection() {
        // get jdbc connection
        return JDBCDriver.getConnection();
    }
}
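The three phases of the JDBC demo map to three SQL statements on a temporary table name. The sketch below only builds those statement strings so that it can run without a database. StagedTableSql and its methods are hypothetical names, the random suffix is an assumption (an alternative to the timestamp suffix used above, intended to make name collisions between concurrent jobs less likely), and the rename syntax follows the demo above, which may differ between databases.

```java
import java.util.UUID;

// Hypothetical helper that derives the per-phase SQL for a staged JDBC table.
final class StagedTableSql {
    private final String tmpTable;
    private final String finalTable;

    StagedTableSql(String finalTable, String uniqueSuffix) {
        this.finalTable = finalTable;
        this.tmpTable = finalTable + "_" + uniqueSuffix;
    }

    // Assumption: a random suffix instead of a timestamp for the temporary table name.
    static StagedTableSql withRandomSuffix(String finalTable) {
        return new StagedTableSql(finalTable, UUID.randomUUID().toString().replace("-", ""));
    }

    String tmpTable() {
        return tmpTable;
    }

    // begin: create the temporary table that the job writes into.
    String beginSql(String columnDefinitions) {
        return "create table " + tmpTable + " (" + columnDefinitions + ")";
    }

    // commit: make the data visible under the final name.
    String commitSql() {
        return "rename table " + tmpTable + " to " + finalTable;
    }

    // abort: remove the temporary table and its data.
    String abortSql() {
        return "drop table " + tmpTable;
    }
}
```

For example, `new StagedTableSql("db.orders", "123")` yields the temporary name `db.orders_123`, and its commitSql() renames that table to `db.orders`.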

Compatibility, Deprecation, and Migration Plan

It is a new feature with no implication for backwards compatibility.

Test Plan

Changes will be verified by unit tests (UT).