Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

This page is meant as a template for writing a FLIP. To create a FLIP choose Tools->Copy on this page and modify with your content and replace the heading with the next FLIP number and a description of your issue. Replace anything in italics with your own description.

Status

Current state:  ["Under Discussion"] Accepted

Discussion thread:  here (<- link to https://mail-archiveslists.apache.org/mod_mbox/flink-dev/)
thread/n6nsvbwhs5kwlj5kjgv24by2tk5mh9xd

VOTE thread: JIRAhere (<- link to https://issueslists.apache.org/jira/browse/FLINK-XXXX)

Released: 1.18

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

CREATE TABLE AS SELECT(CTAS) statement has been support by FLIP-218, but it's not atomic. It will create the table first before job running. If the job execution fails, or is cancelled, the table will not be dropped.

We want Flink to support atomic CTAS, where only the table is created when the Job succeeds. 

we refer to FLIP-218: Support SELECT clause in CREATE TABLE(CTAS) , use the existing JobStatusHook mechanism and extend Catalog's new API to implement atomic CTAS capabilities.

Public Interfaces

thread/8c0wlp72kq0dhcbpy08nl1kb28q17kv3

JIRA:

Jira
serverASF JIRA
columnIdsissuekey,summary,issuetype,created,updated,duedate,assignee,reporter,customfield_12311032,customfield_12311037,customfield_12311022,customfield_12311027,priority,status,resolution
columnskey,summary,type,created,updated,due,assignee,reporter,Priority,Priority,Priority,Priority,priority,status,resolution
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyFLINK-32580

Released: 1.18

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

CREATE TABLE AS SELECT(CTAS) statement has been support by FLIP-218, but it's not atomic. It will create the table first before job running. If the job execution fails, or is cancelled, the table will not be dropped.

We want Flink to support atomic CTAS, where only the table is created when the Job succeeds. 

we refer to FLIP-218: Support SELECT clause in CREATE TABLE(CTAS) , use the existing JobStatusHook mechanism and extend Catalog's new API to implement atomic CTAS capabilities.

Public Interfaces


Introduce interface SupportsStaging , which provided applyStaging API. If DynamicTableSink implements the interface SupportsStaging, it indicates that it supports atomic operations.

Code Block
languagejava
/**
 * Enables different staged operations to ensure atomicity in a {@link DynamicTableSink}.
 *
 * <p>By default, if this interface is not implemented, indicating that atomic operations are not
 * supported, then a non-atomic implementation is used.
 */
@PublicEvolving
public interface SupportsStaging {

    /**
     * Provides a {@link StagedTable} that provided transaction abstraction. StagedTable will be
     * combined with {@link JobStatusHook} to achieve atomicity support in the Flink framework. Call
     * the relevant API of StagedTable when the Job state is switched.
     *
     * <p>This method will be called at the compile stage.
     *
     * @param StagingContext Tell DynamicTableSink, the operation type of this StagedTable,
     *     expandable
     * @return {@link StagedTable} that can be serialized and provides atomic operations
     */
    StagedTable applyStaging(StagingContext context);

    /**
     * The context is intended to tell DynamicTableSink the type of this operation. In this way,
     * DynamicTableSink can return the corresponding implementation of StagedTable according to the
     * specific operation. More types of operations can be extended in the future.
     */
    interface StagingContext {
        StagingPurpose getStagingPurpose();
    }

    enum StagingPurpose {
        CREATE_TABLE_AS,
        CREATE_TABLE_AS_IF_NOT_EXISTS
    }
}


Introduce StagedTable interface that support Introduce interface SupportsStaging , which provided applyStaging API. If DynamicTableSink implements the interface SupportsStaging, it indicates that it supports atomic operations.

Code Block
languagejava
/**
 * A Enables{@link differentStagedTable} stagedfor operationsatomic tosemantics ensureusing atomicitya intwo-phase acommit {@link DynamicTableSink}.
 *protocol, combined with
 * <p>By default, if this interface is not implemented, indicating that atomic operations are not{@link JobStatusHook} for atomic CTAS. {@link StagedTable} will be a member variable of
 * supported,CtasJobStatusHook thenand acan non-atomic implementation is used.be serialized;
 */
@PublicEvolving
public interface* SupportsStaging<p>CtasJobStatusHook#onCreated {

will call the begin /**
method of  StagedTable;
  * ProvidesCtasJobStatusHook#onFinished awill {@linkcall StagedTable}the thatcommit provided transaction abstraction.method of StagedTable;
 will be
     * combined with {@link JobStatusHook} to achieve atomicity support in the Flink framework. Call* CtasJobStatusHook#onFailed and CtasJobStatusHook#onCanceled will call the abort method of StagedTable;
 */
@PublicEvolving
public interface StagedTable extends Serializable {

    /**
     * theThis relevantmethod APIwill ofbe StagedTablecalled when the Job statejob is switched.
     *started. Similar to what it means to open a
     * <p>This method will be called at the compile stage.
     *transaction in a relational database; In Flink's atomic CTAS scenario, it is used to do some
     * @paraminitialization StagingContextwork; TellFor DynamicTableSinkexample, initializing the operationclient typeof ofthe thisunderlying StagedTableservice, the tmp
     * path of the underlying storage, expandable
or even call the start *transaction @returnAPI {@linkof StagedTable}the thatunderlying
 can be serialized and provides* atomicservice, operationsetc.
     */
    StagedTablevoid applyStagingbegin(StagingContext context);

    /**
     * The context This method will be called when the job is intendedsucceeds. Similar to tell DynamicTableSink the type of this operation. In this way, what it means to commit the
     * transaction in a relational database; In Flink's atomic CTAS scenario, it is used to do some
     * DynamicTableSinkdata visibility canrelated returnwork; theFor correspondingexample, implementationmoving ofthe StagedTableunderlying accordingdata to the target
     * directory, writing specificbuffer operation.data Moreto typesthe ofunderlying operationsstorage canservice, beor extendedeven incall the future.commit
     */
 transaction API of the interfaceunderlying StagingContextservice, {etc.
     */
    StagingPurposevoid getStagingPurposecommit();

    }/**

     enum* StagingPurposeThis {
method will be called when the job  CREATE_TABLE_AS,
        CREATE_TABLE_AS_IF_NOT_EXISTSis failed or canceled. Similar to what it means to
    }
}

Introduce StagedTable interface that support atomic operations.

Code Block
languagejava
/**
 * Arollback {@linkthe StagedTable} for atomic semantics using a two-phase commit protocol, combined with
 * {@link JobStatusHook} fortransaction in a relational database; In Flink's atomic CTAS. {@link StagedTable} will be a member variable of
 * CtasJobStatusHook and can be serialized;
 *
 * <p>CtasJobStatusHook#onCreated will call the begin method of StagedTable;
 * CtasJobStatusHook#onFinished will call the commit method of StagedTable;
 * CtasJobStatusHook#onFailed and CtasJobStatusHook#onCanceled will call the abort method of StagedTable;
 */
@PublicEvolving
public interface StagedTable extends Serializable {

    /**scenario, it is
     * used to do some data cleaning; For example, delete the data in tmp directory, delete the
     * temporary data in the underlying storage service, or even call the rollback transaction API
     * of the underlying service, etc.
     */
 This method will be called when the job is started. Similar to what it means to open avoid abort();
}

TableConfigOptions

Add table.ctas.atomicity-enabled option to allow users to enable atomicity when using create table as select syntax.

Code Block
languagejava
@PublicEvolving
public class TableConfigOptions {
    @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING)
    public *static transactionfinal in a relational database; In Flink's atomic CTAS scenario, it is used to do some
     * initialization work; For example, initializing the client of the underlying service, the tmp
ConfigOption<Boolean> TABLE_CTAS_ATOMICITY_ENABLED =
            key("table.ctas.atomicity-enabled")
                  * path of.booleanType()
 the underlying storage, or even call the start transaction API of the underlying
     * service, etc.defaultValue(false)
     */
      void begin();

    /**
    .withDescription(
 * This method will be called when the job is succeeds. Similar to what it means to commit the
     * transaction in a relational"Specifies database;if Inthe Flink'screate atomictable CTASas scenario,select itoperation is usedexecuted toatomically. do"
 some
     * data visibility related work; For example, moving the underlying data to the target
     * directory, writing buffer data to the underlying storage service, or+ even"By calldefault, the commit
operation is non-atomic. The target *table transaction APIis ofcreated thein underlyingClient serviceside, etc.
and it will not be */
dropped even though the void commit();

    /**job fails or is cancelled. "
     * This method will be called when the job is failed or canceled. Similar to what it means to
     * rollback the transaction in a relational+ database;"If Inset Flink'sthis atomicoption CTASto scenario,true itand is
DynamicTableSink implements the SupportsStaging interface, *the usedcreate totable doas someselect dataoperation cleaning;is Forexpected example,to deletebe theexecuted dataatomically, in"
 tmp directory, delete the
     * temporary data in the underlying storage service, or even call the rollback transaction API
     * of the underlying service, etc.
 + "the behavior of */
which depends on the void abort(actual DynamicTableSink.");
}

Proposed Changes

First we need to have a Table interface that can be combined with the abstract transaction capability, so we introduce StagedTable, which can perform start transaction, commit transaction, and abort transaction operations.

...

Therefore, we introduce the interface SupportsStaging, which, if implemented by DynamicTableSink, indicates that it supports atomic operations, otherwise it does not support atomic operations.

Flink framework can determine whether DynamicTableSink supports atomicity CTAS by whether it implements the interface SupportsStaging, and if it does not support atomic operations.

Flink framework can determine whether DynamicTableSink supports atomicity CTAS by whether it implements the interface SupportsStaging, and if it does, get the StagedTable object through the applyStaging API, otherwise use the non-atomic CTAS implementation.

Identification of atomic CTAS

, get the StagedTable object through the applyStaging API, otherwise use the non-atomic CTAS implementation.

Identification of atomic CTAS

Normally, in stream mode, we consider the job to be LONG RUNNING, and even if it fails, it needs to resume afterwards, so atomic CTAS semantics are usually not needed.

In addition, there are probably many flink jobs that already use non-atomic CTAS functions, especially Stream jobs, in order to ensure the consistency of flink behavior, and to give the user maximum flexibility, in time DynamicTableSink implements the SupportsStaging interface, users can still choose non-atomic implementation according to business needs.

So, we can infer in the TableEnvironmentImpl whether atomic CTAS is used based on whether the user has enabled it and whether DynamicTableSink implements the SupportsStaging interfaceWe can identify whether a DynamicTableSink supports atomic Ctas by determining its type in TableEnvironmentImpl, like the following:

Code Block
languagejava
boolean isAtomicCtas = tableConfig.get(TableConfigOptions.TABLE_CTAS_ATOMICITY_ENABLED) && dynamicTableSink instanceof SupportsStaging;

...

Code Block
languagejava
Optional<DynamicTableSink> dynamicTableSinkOptional =
        getDynamicTableSink(
                catalogTable,
                tableIdentifier,
                createTableOperation.isTemporary(),
                catalogManager.getCatalog(catalogName)); 
if (tableConfig.get(TableConfigOptions.TABLE_CTAS_ATOMICITY_ENABLED)
        && dynamicTableSinkOptional.isPresent()
        && dynamicTableSinkOptional.get() instanceof SupportsStaging) {
    DynamicTableSink  DynamicTableSink dynamicTableSink = dynamicTableSinkOptional.get();
    StagedTable stagedTable =
            ((SupportsStaging) dynamicTableSink)
                    .applyStaging(
                            new SupportsStaging.StagingContext() {
                                @Override
                                public SupportsStaging.StagingPurpose
                                        getStagingPurpose() {
                                    if (createTableOperation
                                            .isIgnoreIfExists()) {
                                        return SupportsStaging.StagingPurpose
                                                .CREATE_TABLE_AS_IF_NOT_EXISTS;
                                    }
                                    return SupportsStaging.StagingPurpose
                                            .CREATE_TABLE_AS;
                                }
                            });
    CtasJobStatusHook ctasJobStatusHook = new CtasJobStatusHook(stagedTable);
    mapOperations.add(
            ctasOperation.toStagedSinkModifyOperation(
                    createTableOperation.getTableIdentifier(),
                    catalogTable,
                    ctasCatalog,
                    dynamicTableSink));
    jobStatusHookList.add(ctasJobStatusHook);
} else {
    // execute CREATE TABLE first for non-atomic CTAS statements
    executeInternal(ctasOperation.getCreateTableOperation());
    mapOperations.add(ctasOperation.toSinkModifyOperation(catalogManager));
}

...

Then implementation of the atomic CTAS operation requires only two steps :

1: DynamicTableSink implements the interface SupportsStaging;

2: Introduce the implementation class of the StagedTable interface.

Hive demo

HiveTableSink implements the applyStaging API:

...