...

Code Block
languagejava
/**
 * A {@link StagedTable} provides atomic semantics through a two-phase commit protocol and is
 * combined with {@link JobStatusHook} to implement atomic CTAS. The {@link StagedTable} is a
 * member variable of CtasJobStatusHook and must therefore be serializable.
 *
 * <p>CtasJobStatusHook#onCreated calls {@link #begin()}; CtasJobStatusHook#onFinished calls
 * {@link #commit()}; CtasJobStatusHook#onFailed and CtasJobStatusHook#onCanceled call
 * {@link #abort()}.
 */
@PublicEvolving
public interface StagedTable extends Serializable {

    /**
     * Called when the job is started, similar to opening a transaction in a relational
     * database. In Flink's atomic CTAS scenario, it performs initialization work, for example
     * initializing the client of the underlying service, preparing the tmp path of the
     * underlying storage, or calling the start-transaction API of the underlying service.
     */
    void begin();

    /**
     * Called when the job succeeds, similar to committing a transaction in a relational
     * database. In Flink's atomic CTAS scenario, it makes the written data visible, for example
     * by moving the underlying data to the target directory, flushing buffered data to the
     * underlying storage service, or calling the commit-transaction API of the underlying
     * service.
     */
    void commit();

    /**
     * Called when the job fails or is canceled, similar to rolling back a transaction in a
     * relational database. In Flink's atomic CTAS scenario, it cleans up data, for example by
     * deleting the data in the tmp directory, deleting temporary data in the underlying storage
     * service, or calling the rollback-transaction API of the underlying service.
     */
    void abort();
}
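
The lifecycle described in the Javadoc above can be sketched as follows. This is only an illustration, not the proposed implementation: the StagedTable and JobStatusHook interfaces here are simplified stand-ins so the example compiles without Flink (the real hook callbacks are assumed to take additional arguments such as the job id, omitted here), and RecordingStagedTable and CtasHookDemo are hypothetical helpers that only record which methods were invoked.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Stand-in mirroring the proposed interface so the sketch compiles without Flink.
interface StagedTable extends Serializable {
    void begin();
    void commit();
    void abort();
}

// Simplified stand-in for Flink's JobStatusHook (assumption: extra arguments omitted).
interface JobStatusHook extends Serializable {
    void onCreated();
    void onFinished();
    void onFailed(Throwable cause);
    void onCanceled();
}

// Delegates each job status callback to the matching StagedTable method.
class CtasJobStatusHook implements JobStatusHook {
    private static final long serialVersionUID = 1L;
    private final StagedTable stagedTable;

    CtasJobStatusHook(StagedTable stagedTable) {
        this.stagedTable = stagedTable;
    }

    @Override
    public void onCreated() {
        stagedTable.begin();
    }

    @Override
    public void onFinished() {
        stagedTable.commit();
    }

    @Override
    public void onFailed(Throwable cause) {
        stagedTable.abort();
    }

    @Override
    public void onCanceled() {
        stagedTable.abort();
    }
}

// Hypothetical StagedTable that only records the order of calls.
class RecordingStagedTable implements StagedTable {
    private static final long serialVersionUID = 1L;
    final List<String> calls = new ArrayList<>();

    @Override
    public void begin() {
        calls.add("begin");
    }

    @Override
    public void commit() {
        calls.add("commit");
    }

    @Override
    public void abort() {
        calls.add("abort");
    }
}

class CtasHookDemo {
    // Simulates one job run that ends as FINISHED, FAILED, or CANCELED.
    static List<String> simulate(String outcome) {
        RecordingStagedTable table = new RecordingStagedTable();
        JobStatusHook hook = new CtasJobStatusHook(table);
        hook.onCreated();
        switch (outcome) {
            case "FINISHED":
                hook.onFinished();
                break;
            case "FAILED":
                hook.onFailed(new RuntimeException("simulated failure"));
                break;
            case "CANCELED":
                hook.onCanceled();
                break;
            default:
                throw new IllegalArgumentException("Unknown outcome: " + outcome);
        }
        return table.calls;
    }

    public static void main(String[] args) {
        System.out.println(simulate("FINISHED"));
        System.out.println(simulate("FAILED"));
        System.out.println(simulate("CANCELED"));
    }
}
```

In this sketch, both failure and cancellation end in abort, so a sink implementation only has to provide one cleanup path.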

TableConfigOptions

Add the table.ctas.atomicity-enabled option so that users can enable atomicity for CREATE TABLE AS SELECT (CTAS) statements.

Code Block
languagejava
@PublicEvolving
public class TableConfigOptions {
    @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING)
    public static final ConfigOption<Boolean> TABLE_CTAS_ATOMICITY_ENABLED =
            key("table.ctas.atomicity-enabled")
                    .booleanType()
                    .defaultValue(false)
                    .withDescription(
                            "Specifies if the create table as select operation is executed atomically. "
                                  + "By default, the operation is non-atomic: the target table is created on the client side and is not dropped even if the job fails or is canceled. "
                                  + "If this option is set to true and the DynamicTableSink implements the SupportsStaging interface, the target table is created on the JobManager (JM) side and is dropped when the job fails or is canceled.");
}

Proposed Changes

First, we need a table abstraction that carries transaction capability, so we introduce StagedTable, which can begin, commit, and abort a transaction.
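
As a rough illustration of what such a table could look like for a file-based sink, the sketch below stages data in a temporary directory and makes it visible with a single rename on commit. It is an assumption-laden example rather than part of the proposal: FileStagedTable, StagedTableDemo, and the directory names are hypothetical, and the StagedTable interface is redeclared locally so the code compiles without Flink.

```java
import java.io.IOException;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Stand-in mirroring the proposed interface so the sketch compiles without Flink.
interface StagedTable extends Serializable {
    void begin();
    void commit();
    void abort();
}

// Hypothetical file-based staged table: write to a staging dir, rename it on commit.
class FileStagedTable implements StagedTable {
    private static final long serialVersionUID = 1L;

    // Stored as Strings because java.nio.file.Path is not Serializable.
    private final String stagingDir;
    private final String targetDir;

    FileStagedTable(String stagingDir, String targetDir) {
        this.stagingDir = stagingDir;
        this.targetDir = targetDir;
    }

    @Override
    public void begin() {
        try {
            Files.createDirectories(Paths.get(stagingDir));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @Override
    public void commit() {
        try {
            // Assumes both directories are on the same file system and the target is absent.
            Files.move(
                    Paths.get(stagingDir), Paths.get(targetDir), StandardCopyOption.ATOMIC_MOVE);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @Override
    public void abort() {
        Path staging = Paths.get(stagingDir);
        if (!Files.exists(staging)) {
            return;
        }
        try (Stream<Path> walk = Files.walk(staging)) {
            // Reverse order visits children before their parent directories.
            List<Path> paths = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
            for (Path p : paths) {
                Files.delete(p);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}

class StagedTableDemo {
    // Runs one begin/write/(commit|abort) cycle and reports whether the data became visible.
    static boolean run(boolean jobSucceeds) {
        try {
            Path root = Files.createTempDirectory("ctas-demo");
            Path staging = root.resolve("_staging");
            Path target = root.resolve("target_table");
            StagedTable table = new FileStagedTable(staging.toString(), target.toString());

            table.begin();
            Files.write(staging.resolve("part-0"), "a,1\n".getBytes(StandardCharsets.UTF_8));
            if (jobSucceeds) {
                table.commit();
            } else {
                table.abort();
            }
            return Files.exists(target.resolve("part-0"));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("visible after commit: " + run(true));
        System.out.println("visible after abort: " + run(false));
    }
}
```

A rename is used here because it keeps the target invisible until the job has finished. A sink backed by a transactional service would presumably call that service's begin, commit, and rollback APIs instead.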

...

The Flink framework determines whether a DynamicTableSink supports atomic CTAS by checking whether it implements the SupportsStaging interface. If it does, the framework obtains the StagedTable object through the applyStaging API; otherwise it uses the non-atomic CTAS implementation.

Identification of atomic CTAS

At present, a large number of Flink jobs may already use non-atomic CTAS, especially streaming jobs. To keep Flink's behavior consistent and to give users maximum flexibility, users can still choose the non-atomic implementation according to their business needs, even if the DynamicTableSink implements the SupportsStaging interface.

So, in TableEnvironmentImpl we can infer whether atomic CTAS is used based on whether the user has enabled it and whether the DynamicTableSink implements the SupportsStaging interface, like the following:

Code Block
languagejava
boolean isAtomicCtas = tableConfig.get(TableConfigOptions.TABLE_CTAS_ATOMICITY_ENABLED) && dynamicTableSink instanceof SupportsStaging;
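
To make the four possible combinations explicit, the check above can be exercised in isolation. The sketch below is illustrative only: DynamicTableSink and SupportsStaging are empty stand-ins for the Flink interfaces, PlainSink, StagingSink, and AtomicCtasDecision are hypothetical names, and the config lookup is replaced by a plain boolean parameter.

```java
// Empty stand-ins for the Flink interfaces so the sketch compiles without Flink.
interface DynamicTableSink {}

interface SupportsStaging {}

// Hypothetical sink without staging support.
class PlainSink implements DynamicTableSink {}

// Hypothetical sink that opts in to staging.
class StagingSink implements DynamicTableSink, SupportsStaging {}

class AtomicCtasDecision {
    // atomicityEnabled stands in for the value of table.ctas.atomicity-enabled.
    static boolean isAtomicCtas(boolean atomicityEnabled, DynamicTableSink sink) {
        return atomicityEnabled && sink instanceof SupportsStaging;
    }

    public static void main(String[] args) {
        DynamicTableSink[] sinks = {new PlainSink(), new StagingSink()};
        for (DynamicTableSink sink : sinks) {
            for (boolean enabled : new boolean[] {false, true}) {
                System.out.println(
                        sink.getClass().getSimpleName()
                                + ", enabled=" + enabled
                                + " -> atomic=" + isAtomicCtas(enabled, sink));
            }
        }
    }
}
```

Only the combination of an enabled option and a sink implementing SupportsStaging selects the atomic path; every other combination falls back to non-atomic CTAS.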

...