
Status

Current state: "Under Discussion"

Discussion thread: here (<- link to https://mail-archives.apache.org/mod_mbox/flink-dev/)

JIRA: here (<- link to https://issues.apache.org/jira/browse/FLINK-XXXX)

Released: 1.18

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The CREATE TABLE AS SELECT (CTAS) statement has been supported by FLIP-218, but it is not atomic: the table is created before the job runs, and if the job execution fails or is cancelled, the table is not dropped.

We want Flink to support atomic CTAS, where the table is created only when the job succeeds.

Following FLIP-218: Support SELECT clause in CREATE TABLE(CTAS), we use the existing JobStatusHook mechanism and extend Catalog with a new API to implement atomic CTAS.

Public Interfaces


Introduce the createTwoPhaseCatalogTable API on Catalog.

@PublicEvolving
public interface Catalog {

    /**
     * Create a {@link TwoPhaseCatalogTable} that provides a transaction abstraction.
     * TwoPhaseCatalogTable will be combined with {@link JobStatusHook} to achieve atomicity
     * support in the Flink framework. The default implementation returns empty, indicating that
     * atomic operations are not supported; the non-atomic implementation is then used.
     *
     * <p>The framework will make sure to call this method with fully validated {@link
     * ResolvedCatalogTable}.
     *
     * @param tablePath path of the table to be created
     * @param table the table definition
     * @param ignoreIfExists flag to specify behavior when a table or view already exists at the
     *     given path: if set to false, it throws a TableAlreadyExistException, if set to true, do
     *     nothing.
     * @param isStreamingMode a flag that tells whether the current table is in stream mode;
     *     different modes can have different implementations of atomicity support.
     * @return {@link TwoPhaseCatalogTable} that can be serialized and provides atomic
     *     operations
     * @throws TableAlreadyExistException if table already exists and ignoreIfExists is false
     * @throws DatabaseNotExistException if the database in tablePath doesn't exist
     * @throws CatalogException in case of any runtime exception
     */
    default Optional<TwoPhaseCatalogTable> createTwoPhaseCatalogTable(
            ObjectPath tablePath,
            CatalogBaseTable table,
            boolean ignoreIfExists,
            boolean isStreamingMode)
            throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
        return Optional.empty();
    }

}


Introduce the TwoPhaseCatalogTable interface, which supports atomic operations.

/**
 * A {@link CatalogTable} for atomic semantics using a two-phase commit protocol, combined with
 * {@link JobStatusHook} for atomic CTAS. {@link TwoPhaseCatalogTable} will be a member
 * variable of CtasJobStatusHook and can be serialized.
 *
 * <p>CtasJobStatusHook#onCreated will call the beginTransaction method of TwoPhaseCatalogTable;
 * CtasJobStatusHook#onFinished will call the commit method of TwoPhaseCatalogTable;
 * CtasJobStatusHook#onFailed and CtasJobStatusHook#onCanceled will call the abort method of
 * TwoPhaseCatalogTable.
 */
@PublicEvolving
public interface TwoPhaseCatalogTable extends CatalogTable, Serializable {

    /**
     * This method will be called when the job is started. It is similar to opening a
     * transaction in a relational database. In Flink's atomic CTAS scenario, it is used for
     * initialization work, for example initializing the client of the underlying service,
     * creating the tmp path in the underlying storage, or even calling the start-transaction
     * API of the underlying service.
     */
    void beginTransaction();

    /**
     * This method will be called when the job succeeds. It is similar to committing a
     * transaction in a relational database. In Flink's atomic CTAS scenario, it is used for
     * data-visibility work, for example moving the underlying data to the target directory,
     * writing buffered data to the underlying storage service, or even calling the
     * commit-transaction API of the underlying service.
     */
    void commit();

    /**
     * This method will be called when the job has failed or been canceled. It is similar to
     * rolling back a transaction in a relational database. In Flink's atomic CTAS scenario, it
     * is used for data cleanup, for example deleting the data in the tmp directory, deleting
     * the temporary data in the underlying storage service, or even calling the
     * rollback-transaction API of the underlying service.
     */
    void abort();
}


Proposed Changes

First, we need a table interface that carries an abstract transaction capability, so we introduce TwoPhaseCatalogTable, which can begin, commit, and abort a transaction.

TwoPhaseCatalogTable has three corresponding APIs:

beginTransaction : Similar to opening a transaction. It can do preparatory work such as initializing the client, the data, or the directory.

commit : Similar to committing a transaction. It can write data, make data visible, create the table, etc.

abort : Similar to aborting a transaction. It can clean up data, restore data, etc.
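
As a rough illustration of how these three methods could divide the work, here is a minimal sketch for a file-based table that stages output in a temporary directory. Everything in it is an assumption made for illustration: TwoPhaseTableSketch is a hypothetical stand-in for the proposed interface, and StagingDirectoryTable with its write helper is not part of this FLIP or of any existing connector.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Comparator;
import java.util.stream.Stream;

/** Hypothetical stand-in for the three transaction methods of TwoPhaseCatalogTable. */
interface TwoPhaseTableSketch {
    void beginTransaction();

    void commit();

    void abort();
}

/** Illustrative only: stages output in a temporary directory and publishes it on commit. */
class StagingDirectoryTable implements TwoPhaseTableSketch {

    private final Path targetDir;
    private Path stagingDir;

    StagingDirectoryTable(Path targetDir) {
        this.targetDir = targetDir.toAbsolutePath();
    }

    @Override
    public void beginTransaction() {
        try {
            // Prep work: create a staging directory next to the target directory.
            stagingDir = Files.createTempDirectory(targetDir.getParent(), ".staging-");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Stands in for the job writing its result; data lands in the staging directory only. */
    void write(String fileName, String content) {
        try {
            Files.write(stagingDir.resolve(fileName), content.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @Override
    public void commit() {
        try {
            // Visibility: the target directory appears only after the job has succeeded.
            Files.move(stagingDir, targetDir, StandardCopyOption.ATOMIC_MOVE);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @Override
    public void abort() {
        if (stagingDir == null || !Files.exists(stagingDir)) {
            return;
        }
        // Cleanup: delete children before their parent directory.
        try (Stream<Path> paths = Files.walk(stagingDir)) {
            paths.sorted(Comparator.reverseOrder()).forEach(StagingDirectoryTable::delete);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private static void delete(Path path) {
        try {
            Files.delete(path);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

In this sketch the target directory does not exist until commit runs, and abort leaves nothing behind. A real catalog would additionally register or drop table metadata in the same methods.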

Note: TwoPhaseCatalogTable must be serializable, because it is used on the JobManager (JM).
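
The serializability requirement shapes how an implementation holds its resources. The sketch below assumes the object travels by plain Java serialization and shows one common pattern: serializable settings are kept in final fields, while a non-serializable client is marked transient and created in beginTransaction. MetastoreClientSketch, SerializableTableSketch, and the endpoint string are hypothetical names used only for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

/** Hypothetical client that is deliberately not Serializable. */
class MetastoreClientSketch {
    final String endpoint;

    MetastoreClientSketch(String endpoint) {
        this.endpoint = endpoint;
    }
}

/** Illustrative table: serializable settings plus a transient client. */
class SerializableTableSketch implements Serializable {

    private static final long serialVersionUID = 1L;

    // Plain settings survive serialization.
    private final String endpoint;

    // The client is skipped during serialization and rebuilt where the table is used.
    private transient MetastoreClientSketch client;

    SerializableTableSketch(String endpoint) {
        this.endpoint = endpoint;
    }

    void beginTransaction() {
        client = new MetastoreClientSketch(endpoint);
    }

    boolean hasClient() {
        return client != null;
    }

    /** Java-serialization round trip, standing in for shipping the object to the JM. */
    static SerializableTableSketch roundTrip(SerializableTableSketch table) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(table);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (SerializableTableSketch) in.readObject();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

After the round trip the copy has no client until beginTransaction is called, which is why client initialization belongs in beginTransaction rather than in the constructor.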

Next, we need a place to create the TwoPhaseCatalogTable. Different Catalogs need to perform different operations to implement atomic CTAS: for example, HiveCatalog needs to access the Hive Metastore, and JDBCCatalog needs to access the back-end database. We therefore introduce the createTwoPhaseCatalogTable API on the Catalog interface.

The definition of the createTwoPhaseCatalogTable API is similar to that of the createTable API, extended with the isStreamingMode parameter so that a Catalog can provide a different atomicity implementation for each mode.

Integrate atomic CTAS

Introduce CtasJobStatusHook, which implements the JobStatusHook interface and holds a TwoPhaseCatalogTable as a member variable.

It calls the TwoPhaseCatalogTable APIs as follows:

/**
 * This Hook is used to implement the Flink CTAS atomicity semantics, calling the corresponding API
 * of {@link TwoPhaseCatalogTable} at different stages of the job.
 */
public class CtasJobStatusHook implements JobStatusHook {

    private final TwoPhaseCatalogTable twoPhaseCatalogTable;

    public CtasJobStatusHook(TwoPhaseCatalogTable twoPhaseCatalogTable) {
        this.twoPhaseCatalogTable = twoPhaseCatalogTable;
    }

    @Override
    public void onCreated(JobID jobId) {
        twoPhaseCatalogTable.beginTransaction();
    }

    @Override
    public void onFinished(JobID jobId) {
        twoPhaseCatalogTable.commit();
    }

    @Override
    public void onFailed(JobID jobId, Throwable throwable) {
        twoPhaseCatalogTable.abort();
    }

    @Override
    public void onCanceled(JobID jobId) {
        twoPhaseCatalogTable.abort();
    }
}
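
To make the resulting call order concrete, the following self-contained sketch drives a simplified hook through each terminal job state and records which transaction methods run. It uses hypothetical stand-ins throughout: JobStatusHookSketch omits the JobID parameter, RecordingTable only logs calls, and HookLifecycleDemo simulates the job rather than running Flink.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical, simplified stand-in for JobStatusHook (job id omitted). */
interface JobStatusHookSketch {
    void onCreated();

    void onFinished();

    void onFailed(Throwable cause);

    void onCanceled();
}

/** Records the transaction calls it receives instead of touching a real catalog. */
class RecordingTable {
    final List<String> calls = new ArrayList<>();

    void beginTransaction() {
        calls.add("begin");
    }

    void commit() {
        calls.add("commit");
    }

    void abort() {
        calls.add("abort");
    }
}

/** Same mapping as CtasJobStatusHook, expressed against the stand-in types. */
class CtasHookSketch implements JobStatusHookSketch {
    private final RecordingTable table;

    CtasHookSketch(RecordingTable table) {
        this.table = table;
    }

    @Override
    public void onCreated() {
        table.beginTransaction();
    }

    @Override
    public void onFinished() {
        table.commit();
    }

    @Override
    public void onFailed(Throwable cause) {
        table.abort();
    }

    @Override
    public void onCanceled() {
        table.abort();
    }
}

class HookLifecycleDemo {
    enum Outcome { FINISHED, FAILED, CANCELED }

    /** Simulates one job run and returns the transaction calls in order. */
    static List<String> run(Outcome outcome) {
        RecordingTable table = new RecordingTable();
        JobStatusHookSketch hook = new CtasHookSketch(table);
        hook.onCreated();
        switch (outcome) {
            case FINISHED:
                hook.onFinished();
                break;
            case FAILED:
                hook.onFailed(new RuntimeException("simulated failure"));
                break;
            case CANCELED:
                hook.onCanceled();
                break;
        }
        return table.calls;
    }
}
```

A finished job yields begin followed by commit; a failed or canceled job yields begin followed by abort, so the table is never committed unless the job succeeds.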

Compatibility with existing non-atomic CTAS

The return value of Catalog#createTwoPhaseCatalogTable is an Optional, and whether it is empty determines whether atomicity semantics are supported:

empty : atomicity semantics are not supported, and the existing code logic is used;

not empty : atomicity semantics are supported; a CtasJobStatusHook is created and the JobStatusHook mechanism is used to implement atomicity semantics, as described in the code in the previous section.

Optional<TwoPhaseCatalogTable> twoPhaseCatalogTableOptional =
        ctasCatalog.createTwoPhaseCatalogTable(
                objectPath,
                catalogTable,
                createTableOperation.isIgnoreIfExists(),
                isStreamingMode);

if (twoPhaseCatalogTableOptional.isPresent()) {
    // use TwoPhaseCatalogTable for atomic CTAS statements
    TwoPhaseCatalogTable twoPhaseCatalogTable =
            twoPhaseCatalogTableOptional.get();
    CtasJobStatusHook ctasJobStatusHook =
            new CtasJobStatusHook(twoPhaseCatalogTable);
    mapOperations.add(
            ctasOperation.toSinkModifyOperation(
                    createTableOperation.getTableIdentifier(),
                    createTableOperation.getCatalogTable(),
                    twoPhaseCatalogTable,
                    ctasCatalog,
                    catalogManager));
    jobStatusHookList.add(ctasJobStatusHook);
} else {
    // execute CREATE TABLE first for non-atomic CTAS statements
    executeInternal(ctasOperation.getCreateTableOperation());
    mapOperations.add(ctasOperation.toSinkModifyOperation(catalogManager));
}

Atomicity support in Stream and Batch mode

We usually think of Stream mode jobs as long running, i.e. they never stop, so atomicity semantics do not apply. However, Flink is now a unified stream and batch computing engine, so we introduce isStreamingMode in the definition of Catalog#createTwoPhaseCatalogTable and let the Catalog decide whether to provide atomicity support.

In production environments, some user-defined streaming sources do finish, and the job then finishes as well (no more input data). Using the atomic implementation in this case improves the user experience; the decision is left to the Catalog implementation.
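
The per-catalog decision can be sketched as follows. This is a simplified illustration under stated assumptions: CatalogSketch reduces the proposed method to its isStreamingMode parameter, and BatchOnlyCatalog, AnyModeCatalog, LegacyCatalog, and CtasPlannerSketch are hypothetical names, not classes from Flink.

```java
import java.util.Optional;

/** Hypothetical marker type standing in for TwoPhaseCatalogTable. */
interface AtomicTableSketch {}

/** Hypothetical, reduced form of the proposed Catalog method. */
interface CatalogSketch {
    // Default: atomic CTAS is not supported.
    default Optional<AtomicTableSketch> createTwoPhaseCatalogTable(boolean isStreamingMode) {
        return Optional.empty();
    }
}

/** Supports atomic CTAS for batch jobs only. */
class BatchOnlyCatalog implements CatalogSketch {
    @Override
    public Optional<AtomicTableSketch> createTwoPhaseCatalogTable(boolean isStreamingMode) {
        if (isStreamingMode) {
            return Optional.empty();
        }
        return Optional.of(new AtomicTableSketch() {});
    }
}

/** Supports atomic CTAS in both modes, e.g. for streaming jobs with finite sources. */
class AnyModeCatalog implements CatalogSketch {
    @Override
    public Optional<AtomicTableSketch> createTwoPhaseCatalogTable(boolean isStreamingMode) {
        return Optional.of(new AtomicTableSketch() {});
    }
}

/** An existing catalog that does not override the new method. */
class LegacyCatalog implements CatalogSketch {}

class CtasPlannerSketch {
    /** Returns which CTAS path the framework would take for this catalog and mode. */
    static String plan(CatalogSketch catalog, boolean isStreamingMode) {
        return catalog.createTwoPhaseCatalogTable(isStreamingMode).isPresent()
                ? "atomic"
                : "non-atomic";
    }
}
```

Because the default implementation returns an empty Optional, catalogs that do not override the method keep the existing non-atomic behavior in both modes.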

HiveCatalog implementation of atomic CTAS demo

Implementing the atomic CTAS operation then requires only two steps:

  1. Catalog implements the createTwoPhaseCatalogTable method;
  2. Introduce the implementation class of the TwoPhaseCatalogTable interface.

HiveCatalog implements the createTwoPhaseCatalogTable API:

    @Override
    public Optional<TwoPhaseCatalogTable> createTwoPhaseCatalogTable(
            ObjectPath tablePath,
            CatalogBaseTable table,
            boolean ignoreIfExists,
            boolean isStreamingMode)
            throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {

        if (isStreamingMode) {
            // HiveCatalog does not support atomicity semantics in stream mode
            return Optional.empty();
        }

        checkNotNull(tablePath, "tablePath cannot be null");
        checkArgument(table instanceof ResolvedCatalogBaseTable, "table should be resolved");

        ResolvedCatalogBaseTable<?> resolvedTable = (ResolvedCatalogBaseTable<?>) table;
        if (!databaseExists(tablePath.getDatabaseName())) {
            throw new DatabaseNotExistException(getName(), tablePath.getDatabaseName());
        }
        if (!ignoreIfExists && tableExists(tablePath)) {
            throw new TableAlreadyExistException(getName(), tablePath);
        }

        boolean managedTable = ManagedTableListener.isManagedTable(this, resolvedTable);

        Table hiveTable =
                HiveTableUtil.instantiateHiveTable(
                        tablePath, resolvedTable, hiveConf, managedTable);

        TwoPhaseCatalogTable twoPhaseCatalogTable = new HiveTwoPhaseCatalogTable(
                getHiveVersion(),
                new JobConfWrapper(JobConfUtils.createJobConfWithCredentials(hiveConf)),
                hiveTable,
                ignoreIfExists);

        return Optional.of(twoPhaseCatalogTable);
    }

HiveTwoPhaseCatalogTable implements the core logic:

/**
 * An implementation of {@link TwoPhaseCatalogTable} for Hive to
 * support atomic CTAS.
 */
public class HiveTwoPhaseCatalogTable implements TwoPhaseCatalogTable {

    private static final long serialVersionUID = 1L;

    @Nullable private final String hiveVersion;
    private final JobConfWrapper jobConfWrapper;

    private final Table table;
    private final boolean ignoreIfExists;

    private transient HiveMetastoreClientWrapper client;

    public HiveTwoPhaseCatalogTable(
            String hiveVersion,
            JobConfWrapper jobConfWrapper,
            Table table,
            boolean ignoreIfExists) {
        this.hiveVersion = hiveVersion;
        this.jobConfWrapper = jobConfWrapper;
        this.table = table;
        this.ignoreIfExists = ignoreIfExists;
    }

    @Override
    public void beginTransaction() {
        // init hive metastore client
        client =
                HiveMetastoreClientFactory.create(
                        HiveConfUtils.create(jobConfWrapper.conf()), hiveVersion);
    }

    @Override
    public void commit() {
        try {
            client.createTable(table);
        } catch (AlreadyExistsException alreadyExistsException) {
            if (!ignoreIfExists) {
                throw new FlinkHiveException(alreadyExistsException);
            }
        } catch (Exception e) {
            throw new FlinkHiveException(e);
        } finally {
            client.close();
        }
    }

    @Override
    public void abort() {
        // do nothing
    }
}


Compatibility, Deprecation, and Migration Plan

It is a new feature with no implication for backwards compatibility.

Test Plan

Changes will be verified by unit tests (UT).




