
Status

Current state: Accepted

Discussion thread: https://lists.apache.org/thread/n6nsvbwhs5kwlj5kjgv24by2tk5mh9xd

Vote thread: https://lists.apache.org/thread/8c0wlp72kq0dhcbpy08nl1kb28q17kv3


JIRA: FLINK-32580

Released: 1.18

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The CREATE TABLE AS SELECT (CTAS) statement has been supported since FLIP-218, but it is not atomic: the table is created before the job runs, and if the job execution fails or is cancelled, the table will not be dropped.

We want Flink to support atomic CTAS, where the table is created only when the job succeeds.

Following FLIP-218: Support SELECT clause in CREATE TABLE (CTAS), we use the existing JobStatusHook mechanism and introduce new public APIs to implement atomic CTAS capabilities.

Public Interfaces


Introduce the interface SupportsStaging, which provides the applyStaging API. If a DynamicTableSink implements SupportsStaging, it indicates that the sink supports atomic operations.

Code Block
languagejava
/**
 * Enables different staged operations to ensure atomicity in a {@link DynamicTableSink}.
 *
 * <p>By default, if this interface is not implemented, indicating that atomic operations are not
 * supported, then a non-atomic implementation is used.
 */
@PublicEvolving
public interface SupportsStaging {

    /**
     * Provides a {@link StagedTable} that provides transaction abstraction. StagedTable will be
     * combined with {@link JobStatusHook} to achieve atomicity support in the Flink framework.
     * Call the relevant API of StagedTable when the Job state is switched.
     *
     * <p>This method will be called at the compile stage.
     *
     * @param context Tells DynamicTableSink the operation type of this StagedTable, expandable
     * @return {@link StagedTable} that can be serialized and provides atomic operations
     */
    StagedTable applyStaging(StagingContext context);

    /**
     * The context is intended to tell DynamicTableSink the type of this operation. In this way,
     * DynamicTableSink can return the corresponding implementation of StagedTable according to the
     * specific operation. More types of operations can be extended in the future.
     */
    interface StagingContext {
        StagingPurpose getStagingPurpose();
    }

    enum StagingPurpose {
        CREATE_TABLE_AS,
        CREATE_TABLE_AS_IF_NOT_EXISTS
    }
}


Introduce the StagedTable interface that supports atomic operations.

Code Block
languagejava
/**
 * A {@link StagedTable} for atomic semantics using a two-phase commit protocol, combined with
 * {@link JobStatusHook} for atomic CTAS. {@link StagedTable} will be a member variable of
 * CtasJobStatusHook and can be serialized;
 *
 * <p>CtasJobStatusHook#onCreated will call the begin method of StagedTable;
 * CtasJobStatusHook#onFinished will call the commit method of StagedTable;
 * CtasJobStatusHook#onFailed and CtasJobStatusHook#onCanceled will call the abort method of
 * StagedTable;
 */
@PublicEvolving
public interface StagedTable extends Serializable {

    /**
     * This method will be called when the job is started. Similar to what it means to open a
     * transaction in a relational database; In Flink's atomic CTAS scenario, it is used to do some
     * initialization work; For example, initializing the client of the underlying service, the tmp
     * path of the underlying storage, or even call the start transaction API of the underlying
     * service, etc.
     */
    void begin();

    /**
     * This method will be called when the job succeeds. Similar to what it means to commit the
     * transaction in a relational database; In Flink's atomic CTAS scenario, it is used to do some
     * data visibility related work; For example, moving the underlying data to the target
     * directory, writing buffer data to the underlying storage service, or even call the commit
     * transaction API of the underlying service, etc.
     */
    void commit();

    /**
     * This method will be called when the job is failed or canceled. Similar to what it means to
     * rollback the transaction in a relational database; In Flink's atomic CTAS scenario, it is
     * used to do some data cleaning; For example, delete the data in tmp directory, delete the
     * temporary data in the underlying storage service, or even call the rollback transaction API
     * of the underlying service, etc.
     */
    void abort();
}

TableConfigOptions

Add the table.ctas.atomicity-enabled option to allow users to enable atomicity when using the CREATE TABLE AS SELECT syntax.

Code Block
languagejava
@PublicEvolving
public class TableConfigOptions {
    @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING)
    public static final ConfigOption<Boolean> TABLE_CTAS_ATOMICITY_ENABLED =
            key("table.ctas.atomicity-enabled")
                    .booleanType()
                    .defaultValue(false)
                    .withDescription(
                            "Specifies if the create table as select operation is executed atomically. "
                                    + "By default, the operation is non-atomic. The target table is created in Client side, and it will not be dropped even though the job fails or is cancelled. "
                                    + "If set this option to true and DynamicTableSink implements the SupportsStaging interface, the create table as select operation is expected to be executed atomically, "
                                    + "the behavior of which depends on the actual DynamicTableSink.");
}
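For illustration, a user could enable the option before submitting a CTAS statement, as in the following sketch; the environment setup, connector placeholder, and table names are assumptions made for this example rather than part of the proposal:

Code Block
languagejava
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.config.TableConfigOptions;

public class AtomicCtasExample {
    public static void main(String[] args) {
        TableEnvironment tableEnv = TableEnvironment.create(EnvironmentSettings.inBatchMode());

        // Opt in to atomic CTAS; it only takes effect if the chosen sink implements SupportsStaging.
        tableEnv.getConfig().set(TableConfigOptions.TABLE_CTAS_ATOMICITY_ENABLED, true);

        // Placeholder CTAS statement; 'my_source' and the connector options are illustrative.
        tableEnv.executeSql(
                "CREATE TABLE my_target WITH ('connector' = '...') AS SELECT * FROM my_source");
    }
}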

Proposed Changes

First we need a table abstraction that can carry the transaction capability, so we introduce StagedTable, which can perform start-transaction, commit-transaction, and abort-transaction operations.

The three APIs corresponding to StagedTable:

begin : Similar to open transactions, we can do some prep work, such as initializing the client, initializing the data, initializing the directory, etc.

commit : Similar to commit transactions, we can do some data writing, data visibility, table creation, etc.

abort : Similar to abort transactions, we can do some data cleaning, data restoration, etc.

Note: StagedTable must be serializable because it is used on the JobManager (JM). A minimal illustrative sketch is shown below.
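To make the begin/commit/abort contract concrete, here is a minimal sketch of a StagedTable that stages data in a temporary directory; the class name and the directory-based staging are assumptions made for the example only, not part of the proposal:

Code Block
languagejava
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.Comparator;
import java.util.stream.Stream;

/** Illustrative only: stages data under a temporary directory and publishes it on commit. */
public class TmpDirectoryStagedTable implements StagedTable {

    private static final long serialVersionUID = 1L;

    // Paths are kept as strings so the instance stays serializable for the JobManager.
    private final String tmpDir;
    private final String finalDir;

    public TmpDirectoryStagedTable(String tmpDir, String finalDir) {
        this.tmpDir = tmpDir;
        this.finalDir = finalDir;
    }

    @Override
    public void begin() {
        // Job started: prepare the staging location.
        try {
            Files.createDirectories(Paths.get(tmpDir));
        } catch (IOException e) {
            throw new RuntimeException("Failed to create staging directory", e);
        }
    }

    @Override
    public void commit() {
        // Job succeeded: make the staged data visible in one step.
        try {
            Files.move(Paths.get(tmpDir), Paths.get(finalDir), StandardCopyOption.ATOMIC_MOVE);
        } catch (IOException e) {
            throw new RuntimeException("Failed to publish staged data", e);
        }
    }

    @Override
    public void abort() {
        // Job failed or was canceled: best-effort cleanup of the staging location.
        try (Stream<Path> files = Files.walk(Paths.get(tmpDir))) {
            files.sorted(Comparator.reverseOrder())
                    .forEach(
                            p -> {
                                try {
                                    Files.deleteIfExists(p);
                                } catch (IOException ignored) {
                                }
                            });
        } catch (IOException ignored) {
        }
    }
}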

Then we need somewhere to create the StagedTable. Different TableSinks implement atomic CTAS and need to perform different operations; for example, HiveTableSink needs to access the Hive Metastore and write to HDFS (or OSS, etc.), while JdbcTableSink needs to access the backend database.

Therefore, we introduce the SupportsStaging interface: if a DynamicTableSink implements it, the sink supports atomic operations; otherwise it does not.

The Flink framework can determine whether a DynamicTableSink supports atomic CTAS by checking whether it implements the SupportsStaging interface. If it does, the framework obtains the StagedTable object through the applyStaging API; otherwise the non-atomic CTAS implementation is used.

Identification of atomic CTAS

Normally, in stream mode, we consider the job to be LONG RUNNING, and even if it fails, it needs to resume afterwards, so atomic CTAS semantics are usually not needed.

In addition, many Flink jobs probably already rely on the non-atomic CTAS behavior, especially streaming jobs. To keep Flink's behavior consistent and give users maximum flexibility, even if the DynamicTableSink implements the SupportsStaging interface, users can still choose the non-atomic implementation according to their business needs.

So, in TableEnvironmentImpl we can infer whether atomic CTAS is used based on whether the user has enabled the option and whether the DynamicTableSink implements the SupportsStaging interface, like the following:

Code Block
languagejava
boolean isAtomicCtas = tableConfig.get(TableConfigOptions.TABLE_CTAS_ATOMICITY_ENABLED) && dynamicTableSink instanceof SupportsStaging;

Integrate atomicity CTAS

Introduce CtasJobStatusHook (which implements the JobStatusHook interface); StagedTable is its member variable.

The implementation that delegates to the StagedTable APIs is as follows:

Code Block
languagejava
/**
 * This Hook is used to implement the Flink CTAS atomicity semantics, calling the corresponding API
 * of {@link StagedTable} at different stages of the job.
 */
public class CtasJobStatusHook implements JobStatusHook {

    private final StagedTable stagedTable;

    public CtasJobStatusHook(StagedTable stagedTable) {
        this.stagedTable = stagedTable;
    }

    @Override
    public void onCreated(JobID jobId) {
        stagedTable.begin();
    }

    @Override
    public void onFinished(JobID jobId) {
        stagedTable.commit();
    }

    @Override
    public void onFailed(JobID jobId, Throwable throwable) {
        stagedTable.abort();
    }

    @Override
    public void onCanceled(JobID jobId) {
        stagedTable.abort();
    }
}

Compatibility with existing non-atomic CTAS

We can infer atomic CTAS support from whether the DynamicTableSink implements the SupportsStaging interface:

If it does not: atomicity semantics are not supported and the existing code path is used;

If it does: atomicity semantics are supported, so we create a CtasJobStatusHook and use the JobStatusHook mechanism to implement atomic semantics, as described in the code in the previous section.

Code Block
languagejava
Optional<DynamicTableSink> dynamicTableSinkOptional =
        getDynamicTableSink(
                catalogTable,
                tableIdentifier,
                createTableOperation.isTemporary(),
                catalogManager.getCatalog(catalogName)); 
if (tableConfig.get(TableConfigOptions.TABLE_CTAS_ATOMICITY_ENABLED)
        && dynamicTableSinkOptional.isPresent()
        && dynamicTableSinkOptional.get() instanceof SupportsStaging) {
    DynamicTableSink dynamicTableSink = dynamicTableSinkOptional.get();
    StagedTable stagedTable =
            ((SupportsStaging) dynamicTableSink)
                    .applyStaging(
                            new SupportsStaging.StagingContext() {
                                @Override
                                public SupportsStaging.StagingPurpose
                                        getStagingPurpose() {
                                    if (createTableOperation
                                            .isIgnoreIfExists()) {
                                        return SupportsStaging.StagingPurpose
                                                .CREATE_TABLE_AS_IF_NOT_EXISTS;
                                    }
                                    return SupportsStaging.StagingPurpose
                                            .CREATE_TABLE_AS;
                                }
                            });
    CtasJobStatusHook ctasJobStatusHook = new CtasJobStatusHook(stagedTable);
    mapOperations.add(
            ctasOperation.toStagedSinkModifyOperation(
                    createTableOperation.getTableIdentifier(),
                    catalogTable,
                    ctasCatalog,
                    dynamicTableSink));
    jobStatusHookList.add(ctasJobStatusHook);
} else {
    // execute CREATE TABLE first for non-atomic CTAS statements
    executeInternal(ctasOperation.getCreateTableOperation());
    mapOperations.add(ctasOperation.toSinkModifyOperation(catalogManager));
}

To avoid generating the DynamicTableSink a second time, we construct a StagedSinkModifyOperation that extends SinkModifyOperation and carries the DynamicTableSink as a member variable, as sketched below.
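A rough sketch of such an operation, assuming SinkModifyOperation's two-argument constructor; the real class may need to carry additional planner state:

Code Block
languagejava
import org.apache.flink.table.catalog.ContextResolvedTable;
import org.apache.flink.table.connector.sink.DynamicTableSink;
import org.apache.flink.table.operations.QueryOperation;
import org.apache.flink.table.operations.SinkModifyOperation;

/** Sketch only: a SinkModifyOperation that carries the already-created DynamicTableSink. */
public class StagedSinkModifyOperation extends SinkModifyOperation {

    private final DynamicTableSink dynamicTableSink;

    public StagedSinkModifyOperation(
            ContextResolvedTable contextResolvedTable,
            QueryOperation child,
            DynamicTableSink dynamicTableSink) {
        super(contextResolvedTable, child);
        this.dynamicTableSink = dynamicTableSink;
    }

    /** Lets the planner reuse the sink instead of creating it a second time. */
    public DynamicTableSink getDynamicTableSink() {
        return dynamicTableSink;
    }
}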

Current non-atomic CTAS implementations

Flink currently supports non-atomic CTAS: for a CreateTableASOperation, the target table is created first, and then the insert operation is compiled and executed.

The current program has the following shortcomings:

First: if the insert operation fails, whether due to a compile failure or a job execution failure, Flink will not drop the created target table;

Second: before the job finishes, the target table already exists, but no data can be read from it.

Atomic CTAS demo

Implementing the atomic CTAS operation then requires only two steps:

1: DynamicTableSink implements the interface SupportsStaging;

2: Introduce the implementation class of the StagedTable interface.

Hive demo

HiveTableSink implements the applyStaging API:

Code Block
languagejava
@Override
public StagedTable applyStaging(StagingContext context) {
    Table hiveTable =
            HiveTableUtil.instantiateHiveTable(
                    identifier.toObjectPath(),
                    catalogTable,
                    HiveConfUtils.create(jobConf),
                    false);

    hiveStagedTable =
            new HiveStagedTable(
                    hiveVersion,
                    new JobConfWrapper(jobConf),
                    hiveTable,
                    context.getStagingPurpose()
                            == SupportsStaging.StagingPurpose.CREATE_TABLE_AS_IF_NOT_EXISTS);

    return hiveStagedTable;
}

HiveStagedTable implements the core logic

Code Block
languagejava
/** An implementation of {@link StagedTable} for Hive to support atomic ctas. */
public class HiveStagedTable implements StagedTable {

    private static final long serialVersionUID = 1L;

    @Nullable private final String hiveVersion;
    private final JobConfWrapper jobConfWrapper;

    private final Table table;

    private final boolean ignoreIfExists;

    private transient HiveMetastoreClientWrapper client;

    private FileSystemFactory fsFactory;
    private TableMetaStoreFactory msFactory;
    private boolean overwrite;
    private Path tmpPath;
    private String[] partitionColumns;
    private boolean dynamicGrouped;
    private LinkedHashMap<String, String> staticPartitions;
    private ObjectIdentifier identifier;
    private PartitionCommitPolicyFactory partitionCommitPolicyFactory;

    public HiveStagedTable(
            String hiveVersion,
            JobConfWrapper jobConfWrapper,
            Table table,
            boolean ignoreIfExists) {
        this.hiveVersion = hiveVersion;
        this.jobConfWrapper = jobConfWrapper;
        this.table = table;
        this.ignoreIfExists = ignoreIfExists;
    }

    @Override
    public void begin() {
        // init hive metastore client
        client =
                HiveMetastoreClientFactory.create(
                        HiveConfUtils.create(jobConfWrapper.conf()), hiveVersion);
    }

    @Override
    public void commit() {
        try {
            // create table first
            client.createTable(table);

            try {
                List<PartitionCommitPolicy> policies = Collections.emptyList();
                if (partitionCommitPolicyFactory != null) {
                    policies =
                            partitionCommitPolicyFactory.createPolicyChain(
                                    Thread.currentThread().getContextClassLoader(),
                                    () -> {
                                        try {
                                            return fsFactory.create(tmpPath.toUri());
                                        } catch (IOException e) {
                                            throw new RuntimeException(e);
                                        }
                                    });
                }

                FileSystemCommitter committer =
                        new FileSystemCommitter(
                                fsFactory,
                                msFactory,
                                overwrite,
                                tmpPath,
                                partitionColumns.length,
                                false,
                                identifier,
                                staticPartitions,
                                policies);
                committer.commitPartitions();
            } catch (Exception e) {
                throw new TableException("Exception in two phase commit", e);
            } finally {
                try {
                    fsFactory.create(tmpPath.toUri()).delete(tmpPath, true);
                } catch (IOException ignore) {
                }
            }
        } catch (AlreadyExistsException alreadyExistsException) {
            if (!ignoreIfExists) {
                throw new FlinkHiveException(alreadyExistsException);
            }
        } catch (Exception e) {
            throw new FlinkHiveException(e);
        } finally {
            client.close();
        }
    }

    @Override
    public void abort() {
        // do nothing
    }

    public void setFsFactory(FileSystemFactory fsFactory) {
        this.fsFactory = fsFactory;
    }

    public void setMsFactory(TableMetaStoreFactory msFactory) {
        this.msFactory = msFactory;
    }

    public void setOverwrite(boolean overwrite) {
        this.overwrite = overwrite;
    }

    public void setTmpPath(Path tmpPath) {
        this.tmpPath = tmpPath;
    }

    public void setPartitionColumns(String[] partitionColumns) {
        this.partitionColumns = partitionColumns;
    }

    public void setDynamicGrouped(boolean dynamicGrouped) {
        this.dynamicGrouped = dynamicGrouped;
    }

    public void setStaticPartitions(LinkedHashMap<String, String> staticPartitions) {
        this.staticPartitions = staticPartitions;
    }

    public void setIdentifier(ObjectIdentifier identifier) {
        this.identifier = identifier;
    }

    public void setPartitionCommitPolicyFactory(
            PartitionCommitPolicyFactory partitionCommitPolicyFactory) {
        this.partitionCommitPolicyFactory = partitionCommitPolicyFactory;
    }

    public Table getTable() {
        return table;
    }
}

Jdbc Demo

JdbcTableSink implements the applyStaging API:

Code Block
languagejava
@Override
public StagedTable applyStaging(StagingContext context) {
    ... ...
    StagedTable stagedTable = new JdbcStagedTable(
            new ObjectPath(tablePath.getDatabaseName(), tablePath.getObjectName() + "_" + System.currentTimeMillis()),
            tablePath,
            tableSchema,
            jdbcUrl,
            jdbcUserName,
            jdbcPassword);

    return stagedTable;
}


JdbcStagedTable implements the core logic

Code Block
languagejava
/** An implementation of {@link StagedTable} for Jdbc to support atomic ctas. */
public class JdbcStagedTable implements StagedTable {

    private final ObjectPath tmpTablePath;
    private final ObjectPath finalTablePath;
    private final Map<String, String> schema;
    private final String jdbcUrl;
    private final String userName;
    private final String password;

    public JdbcStagedTable(
            ObjectPath tmpTablePath,
            ObjectPath finalTablePath,
            Map<String, String> schema,
            String jdbcUrl,
            String userName,
            String password) {
        this.tmpTablePath = tmpTablePath;
        this.finalTablePath = finalTablePath;
        this.schema = schema;
        this.jdbcUrl = jdbcUrl;
        this.userName = userName;
        this.password = password;
    }

    @Override
    public void begin() {
        // create tmp table, writing data to the tmp table
        Connection connection = getConnection();
        connection
                .prepareStatement("create table " + tmpTablePath.getFullName() + "( ... ... )")
                .execute();
    }

    @Override
    public void commit() {
        // Rename the tmp table to the final table name
        Connection connection = getConnection();
        connection
                .prepareStatement(
                        "rename table "
                                + tmpTablePath.getFullName()
                                + " to "
                                + finalTablePath.getFullName())
                .execute();
    }

    @Override
    public void abort() {
        // drop tmp table
        Connection connection = getConnection();
        connection.prepareStatement("drop table " + tmpTablePath.getFullName()).execute();
    }

    private Connection getConnection() {
        // get jdbc connection
        return JDBCDriver.getConnection();
    }
}
