...

Code Block
languagejava
@PublicEvolving
public interface SupportsStaged {

    /**
     * Create a {@link StagedTable} that provides a transaction abstraction.
     * StagedTable will be combined with {@link JobStatusHook} to achieve atomicity
     * support in the Flink framework.
     *
     * <p>The framework will make sure to call this method with fully validated {@link
     * ResolvedCatalogTable}.
     *
     * @param tablePath path of the table to be created
     * @param table the table definition
     * @param context extended information beyond the basic table definition; extensible
     * @return {@link StagedTable} that can be serialized and provides atomic operations
     */
    StagedTable getStagedTable(
            ObjectPath tablePath,
            CatalogBaseTable table,
            SupportsStagedContext context);

    /**
     * Extended information for {@link StagedTable}.
     */
    public static class SupportsStagedContext {

        private final boolean ignoreIfExists;
        private final boolean managedTable;

        public SupportsStagedContext(boolean ignoreIfExists, boolean managedTable) {
            this.ignoreIfExists = ignoreIfExists;
            this.managedTable = managedTable;
        }

        public boolean isIgnoreIfExists() {
            return ignoreIfExists;
        }

        public boolean isManagedTable() {
            return managedTable;
        }
    }
}


Introduce the StagedTable interface, which supports atomic operations.

Code Block
languagejava
/**
 * A {@link StagedTable} for atomic semantics using a two-phase commit protocol, combined with
 * {@link JobStatusHook} for atomic CTAS. {@link StagedTable} will be a member variable of
 * CtasJobStatusHook and can be serialized;
 *
 * <p>CtasJobStatusHook#onCreated will call the begin method of StagedTable;
 * CtasJobStatusHook#onFinished will call the commit method of StagedTable;
 * CtasJobStatusHook#onFailed and CtasJobStatusHook#onCanceled will call the abort method of
 * StagedTable;
 */
@PublicEvolving
public interface StagedTable extends Serializable {

    /**
     * This method will be called when the job is started. Similar to what it means to open a
     * transaction in a relational database; In Flink's atomic CTAS scenario, it is used to do some
     * initialization work; For example, initializing the client of the underlying service, the tmp
     * path of the underlying storage, or even call the start transaction API of the underlying
     * service, etc.
     */
    void begin();

    /**
     * This method will be called when the job succeeds. Similar to what it means to commit the
     * transaction in a relational database; In Flink's atomic CTAS scenario, it is used to do some
     * data visibility related work; For example, moving the underlying data to the target
     * directory, writing buffer data to the underlying storage service, or even call the commit
     * transaction API of the underlying service, etc.
     */
    void commit();

    /**
     * This method will be called when the job fails or is canceled. Similar to what it means to
     * rollback the transaction in a relational database; In Flink's atomic CTAS scenario, it is
     * used to do some data cleaning; For example, delete the data in tmp directory, delete the
     * temporary data in the underlying storage service, or even call the rollback transaction API
     * of the underlying service, etc.
     */
    void abort();
}
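
As an illustration of the begin / commit / abort contract, here is a minimal sketch that stages data in a local directory and publishes it with a single move. It is not part of the proposal: `DirectoryStagedTable` and `StagedTableDemo` are hypothetical names, the `StagedTable` interface is redeclared only so the example compiles on its own, and the sketch assumes the staging and target directories live on the same file system.

```java
import java.io.IOException;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.Comparator;
import java.util.stream.Stream;

/** Stand-in for the proposed interface, redeclared so the sketch compiles on its own. */
interface StagedTable extends Serializable {
    void begin();
    void commit();
    void abort();
}

/** Hypothetical staged table: data goes to a staging directory and is published by a move. */
class DirectoryStagedTable implements StagedTable {
    private static final long serialVersionUID = 1L;

    // Kept as strings because java.nio.file.Path does not extend Serializable.
    private final String stagingDir;
    private final String targetDir;

    DirectoryStagedTable(String stagingDir, String targetDir) {
        this.stagingDir = stagingDir;
        this.targetDir = targetDir;
    }

    @Override
    public void begin() {
        try {
            Files.createDirectories(Paths.get(stagingDir));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @Override
    public void commit() {
        try {
            // Assumes the target does not exist yet and shares a file system with staging.
            Files.move(Paths.get(stagingDir), Paths.get(targetDir), StandardCopyOption.ATOMIC_MOVE);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @Override
    public void abort() {
        Path root = Paths.get(stagingDir);
        if (!Files.exists(root)) {
            return;
        }
        // Reverse order visits children before their parent directories.
        try (Stream<Path> paths = Files.walk(root)) {
            paths.sorted(Comparator.reverseOrder()).forEach(DirectoryStagedTable::delete);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private static void delete(Path path) {
        try {
            Files.delete(path);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}

class StagedTableDemo {
    /** Simulates one job and reports whether the target directory became visible. */
    static boolean targetVisibleAfter(boolean jobSucceeds) {
        try {
            Path base = Files.createTempDirectory("ctas-demo");
            Path staging = base.resolve("staging");
            Path target = base.resolve("target");
            StagedTable table = new DirectoryStagedTable(staging.toString(), target.toString());
            table.begin();
            Files.write(staging.resolve("part-0"), "row".getBytes(StandardCharsets.UTF_8));
            if (jobSucceeds) {
                table.commit();
            } else {
                table.abort();
            }
            return Files.exists(target);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

In this sketch the target directory only appears after commit; after abort neither the staging data nor the target remains.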

...

First we need a table abstraction that can be combined with transaction capabilities, so we introduce StagedTable, which can begin, commit, and abort a transaction.

The three APIs of StagedTable:

begin : Similar to opening a transaction; we can do some prep work, such as initializing the client, initializing the data, initializing the directory, etc.

...

abort : Similar to aborting a transaction; we can do some data cleaning, data restoration, etc.

Note: StagedTable must be serializable, because it is used on the JM.
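
The serializability requirement has a practical consequence: any non-serializable resource, such as a service client, has to be marked transient and created in begin(). The following sketch shows this with plain Java serialization. `ClientBackedStagedTable` and `SerializationDemo` are hypothetical names, the `StringBuilder` is only a stand-in for a real client, and the `StagedTable` interface is redeclared so the example is self-contained.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Stand-in for the proposed interface, redeclared so the sketch compiles on its own. */
interface StagedTable extends Serializable {
    void begin();
    void commit();
    void abort();
}

/** Hypothetical staged table that keeps only configuration in its serialized form. */
class ClientBackedStagedTable implements StagedTable {
    private static final long serialVersionUID = 1L;

    private final String endpoint;
    // Stand-in for a client object; transient fields are skipped by Java serialization.
    private transient StringBuilder client;

    ClientBackedStagedTable(String endpoint) {
        this.endpoint = endpoint;
    }

    @Override
    public void begin() {
        // The client is created here, after the object has been shipped and deserialized.
        client = new StringBuilder("connected:" + endpoint);
    }

    @Override
    public void commit() {
        client.append("|committed");
    }

    @Override
    public void abort() {
        if (client != null) {
            client.append("|aborted");
        }
    }

    String clientState() {
        return client == null ? "not-initialized" : client.toString();
    }
}

class SerializationDemo {
    /** Serializes and deserializes a value in memory, as shipping it to another process would. */
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```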

Then we need somewhere to create the StagedTable. Different TableSinks need to perform different operations to implement atomic CTAS: for example, HiveTableSink needs to access the Hive Metastore and write to HDFS (OSS, etc.), while JDBCTableSink needs to access the back-end database. Therefore we introduce the interface SupportsStaged; a DynamicTableSink that implements it supports atomic operations, and one that does not implement it does not.

The Flink framework can determine whether a DynamicTableSink supports atomic CTAS by checking whether it implements the interface SupportsStaged. If it does, the framework gets the StagedTable object through the getStagedTable API; otherwise it uses the non-atomic CTAS implementation.

Identification of atomic CTAS

We can identify whether a DynamicTableSink supports atomic CTAS by checking its type in TableEnvironmentImpl, like the following:

Code Block
languagejava
boolean isAtomicCtas = dynamicTableSink instanceof SupportsStaged;
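
The type check above can be wrapped in a small helper that returns the StagedTable only when the sink opts in. This is a sketch under simplifying assumptions, not framework code: `DynamicTableSink` is reduced to a marker interface, `SupportsStaged#getStagedTable` takes a plain string instead of the proposed parameters, and `PlainSink`, `StagingSink`, `NoOpStagedTable` and `AtomicCtasDetector` are hypothetical names.

```java
import java.io.Serializable;
import java.util.Optional;

/** Stand-in for the proposed interface, redeclared so the sketch compiles on its own. */
interface StagedTable extends Serializable {
    void begin();
    void commit();
    void abort();
}

/** Stand-in for Flink's DynamicTableSink, reduced to a marker for this sketch. */
interface DynamicTableSink {}

/** Simplified stand-in: the proposed method also takes the table definition and a context. */
interface SupportsStaged {
    StagedTable getStagedTable(String tablePath);
}

class NoOpStagedTable implements StagedTable {
    private static final long serialVersionUID = 1L;

    @Override
    public void begin() {}

    @Override
    public void commit() {}

    @Override
    public void abort() {}
}

/** A sink without staging support: CTAS falls back to the non-atomic path. */
class PlainSink implements DynamicTableSink {}

/** A sink that opts in to atomic CTAS by implementing SupportsStaged. */
class StagingSink implements DynamicTableSink, SupportsStaged {
    @Override
    public StagedTable getStagedTable(String tablePath) {
        return new NoOpStagedTable();
    }
}

class AtomicCtasDetector {
    /** Returns the staged table if the sink supports staging, otherwise an empty Optional. */
    static Optional<StagedTable> stagedTableFor(DynamicTableSink sink, String tablePath) {
        if (sink instanceof SupportsStaged) {
            return Optional.of(((SupportsStaged) sink).getStagedTable(tablePath));
        }
        return Optional.empty();
    }
}
```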

Integrate atomicity CTAS

...

The implementation of the API related to the call to StagedTable is as follows:

Code Block
languagejava
/**
 * This Hook is used to implement the Flink CTAS atomicity semantics, calling the corresponding API
 * of {@link StagedTable} at different stages of the job.
 */
public class CtasJobStatusHook implements JobStatusHook {

    private final StagedTable stagedTable;

    public CtasJobStatusHook(StagedTable stagedTable) {
        this.stagedTable = stagedTable;
    }

    @Override
    public void onCreated(JobID jobId) {
        stagedTable.begin();
    }

    @Override
    public void onFinished(JobID jobId) {
        stagedTable.commit();
    }

    @Override
    public void onFailed(JobID jobId, Throwable throwable) {
        stagedTable.abort();
    }

    @Override
    public void onCanceled(JobID jobId) {
        stagedTable.abort();
    }
}
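
To make the call order concrete, the sketch below drives a hook of the same shape through a simulated job and records which StagedTable methods are invoked. It is self-contained and therefore uses stand-ins: `JobStatusHook` is redeclared with a `String` job id instead of Flink's `JobID`, and `RecordingStagedTable` and `JobLifecycleDemo` are hypothetical helpers.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

/** Stand-in for the proposed interface, redeclared so the sketch compiles on its own. */
interface StagedTable extends Serializable {
    void begin();
    void commit();
    void abort();
}

/** Stand-in for Flink's JobStatusHook; a String replaces JobID in this sketch. */
interface JobStatusHook {
    void onCreated(String jobId);
    void onFinished(String jobId);
    void onFailed(String jobId, Throwable throwable);
    void onCanceled(String jobId);
}

/** Hypothetical staged table that only records the calls it receives. */
class RecordingStagedTable implements StagedTable {
    private static final long serialVersionUID = 1L;

    final List<String> calls = new ArrayList<>();

    @Override
    public void begin() {
        calls.add("begin");
    }

    @Override
    public void commit() {
        calls.add("commit");
    }

    @Override
    public void abort() {
        calls.add("abort");
    }
}

/** Same mapping as the hook above: created -> begin, finished -> commit, else abort. */
class CtasJobStatusHook implements JobStatusHook {
    private final StagedTable stagedTable;

    CtasJobStatusHook(StagedTable stagedTable) {
        this.stagedTable = stagedTable;
    }

    @Override
    public void onCreated(String jobId) {
        stagedTable.begin();
    }

    @Override
    public void onFinished(String jobId) {
        stagedTable.commit();
    }

    @Override
    public void onFailed(String jobId, Throwable throwable) {
        stagedTable.abort();
    }

    @Override
    public void onCanceled(String jobId) {
        stagedTable.abort();
    }
}

class JobLifecycleDemo {
    /** Simulates a job that either finishes or fails and returns the recorded calls. */
    static List<String> run(boolean jobSucceeds) {
        RecordingStagedTable table = new RecordingStagedTable();
        JobStatusHook hook = new CtasJobStatusHook(table);
        hook.onCreated("job-1");
        if (jobSucceeds) {
            hook.onFinished("job-1");
        } else {
            hook.onFailed("job-1", new IllegalStateException("simulated failure"));
        }
        return table.calls;
    }
}
```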

Compatibility with existing non-atomic CTAS

We can infer atomic CTAS support from whether the DynamicTableSink implements the interface SupportsStaged:

no : atomicity semantics are not supported, and the existing code logic is used;

yes : atomicity semantics are supported; create a CtasJobStatusHook and use the JobStatusHook mechanism to implement atomicity semantics, as described in the code in the previous section.

Code Block
languagejava
Optional<DynamicTableSink> dynamicTableSinkOptional =
        getDynamicTableSink(
                catalogTable,
                tableIdentifier,
                createTableOperation.isTemporary(),
                catalogManager.getCatalog(catalogName));
ObjectPath objectPath = tableIdentifier.toObjectPath();
if (dynamicTableSinkOptional.isPresent()
        && dynamicTableSinkOptional.get() instanceof SupportsStaged) {
    // use StagedTable for atomic CTAS statements
    DynamicTableSink dynamicTableSink = dynamicTableSinkOptional.get();
    StagedTable stagedTable =
            ((SupportsStaged) dynamicTableSink)
                    .getStagedTable(
                            objectPath,
                            catalogTable,
                            new SupportsStaged.SupportsStagedContext(
                                    createTableOperation.isIgnoreIfExists(),
                                    ManagedTableListener.isManagedTable(
                                            ctasCatalog, catalogTable)));
    CtasJobStatusHook ctasJobStatusHook = new CtasJobStatusHook(stagedTable);
    mapOperations.add(
            ctasOperation.toStagedSinkModifyOperation(
                    createTableOperation.getTableIdentifier(),
                    catalogTable,
                    ctasCatalog,
                    dynamicTableSink));
    jobStatusHookList.add(ctasJobStatusHook);
} else {
    // execute CREATE TABLE first for non-atomic CTAS statements
    executeInternal(ctasOperation.getCreateTableOperation());
    mapOperations.add(ctasOperation.toSinkModifyOperation(catalogManager));
}

To avoid creating the DynamicTableSink a second time, we need to construct a StagedSinkModifyOperation that extends SinkModifyOperation and adds a DynamicTableSink member variable.
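
A rough sketch of that idea follows. The classes here are simplified stand-ins rather than Flink's actual operation classes: `SinkModifyOperation` is reduced to a table identifier, and `SinkResolver` is a hypothetical helper that counts how often a new sink has to be created.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Stand-in for Flink's DynamicTableSink, reduced to a marker for this sketch. */
interface DynamicTableSink {}

/** Simplified stand-in for the existing operation; the real class carries more state. */
class SinkModifyOperation {
    private final String tableIdentifier;

    SinkModifyOperation(String tableIdentifier) {
        this.tableIdentifier = tableIdentifier;
    }

    String getTableIdentifier() {
        return tableIdentifier;
    }
}

/** Carries the sink that was already created while building the StagedTable. */
class StagedSinkModifyOperation extends SinkModifyOperation {
    private final DynamicTableSink dynamicTableSink;

    StagedSinkModifyOperation(String tableIdentifier, DynamicTableSink dynamicTableSink) {
        super(tableIdentifier);
        this.dynamicTableSink = dynamicTableSink;
    }

    DynamicTableSink getDynamicTableSink() {
        return dynamicTableSink;
    }
}

/** Hypothetical resolver: reuses the carried sink, otherwise creates a new one. */
class SinkResolver {
    final AtomicInteger createdSinks = new AtomicInteger();

    DynamicTableSink resolve(SinkModifyOperation operation) {
        if (operation instanceof StagedSinkModifyOperation) {
            return ((StagedSinkModifyOperation) operation).getDynamicTableSink();
        }
        createdSinks.incrementAndGet();
        return new DynamicTableSink() {};
    }
}
```

Reusing the instance matters because the StagedTable was obtained from that specific sink object, so the sink that writes the data and the StagedTable that commits it stay paired.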

Atomic CTAS demo

The implementation of the atomic CTAS operation requires only two steps:

  1. DynamicTableSink implements the interface SupportsStaged;
  2. Introduce the implementation class of the StagedTable interface.

HiveCatalog demo

HiveTableSink implements the interface SupportsStaged:

Code Block
languagejava
@Override
public StagedTable getStagedTable(
        ObjectPath tablePath, CatalogBaseTable table, SupportsStagedContext context) {

    checkNotNull(tablePath, "tablePath cannot be null");
    checkArgument(table instanceof ResolvedCatalogBaseTable, "table should be resolved");

    ResolvedCatalogBaseTable<?> resolvedTable = (ResolvedCatalogBaseTable<?>) table;

    Table hiveTable =
            HiveTableUtil.instantiateHiveTable(
                    tablePath,
                    resolvedTable,
                    HiveConfUtils.create(jobConf),
                    context.isManagedTable());

    HiveStagedTable hiveStagedTable =
            new HiveStagedTable(
                    hiveVersion,
                    new JobConfWrapper(jobConf),
                    hiveTable,
                    context.isIgnoreIfExists());

    return hiveStagedTable;
}

HiveStagedTable implements the core logic

Code Block
languagejava
/** An implementation of {@link StagedTable} for Hive to support atomic ctas. */
public class HiveStagedTable implements StagedTable {

    private static final long serialVersionUID = 1L;

    @Nullable private final String hiveVersion;
    private final JobConfWrapper jobConfWrapper;

    private final Table table;

    private final boolean ignoreIfExists;

    private transient HiveMetastoreClientWrapper client;

    //
    private FileSystemFactory fsFactory;
    private TableMetaStoreFactory msFactory;
    private boolean overwrite;
    private Path tmpPath;
    private String[] partitionColumns;
    private boolean dynamicGrouped;
    private LinkedHashMap<String, String> staticPartitions;
    private ObjectIdentifier identifier;
    private PartitionCommitPolicyFactory partitionCommitPolicyFactory;

    public HiveStagedTable(
            String hiveVersion,
            JobConfWrapper jobConfWrapper,
            Table table,
            boolean ignoreIfExists) {
        this.hiveVersion = hiveVersion;
        this.jobConfWrapper = jobConfWrapper;
        this.table = table;
        this.ignoreIfExists = ignoreIfExists;
    }

    @Override
    public void begin() {
        // init hive metastore client
        client =
                HiveMetastoreClientFactory.create(
                        HiveConfUtils.create(jobConfWrapper.conf()), hiveVersion);
    }

    @Override
    public void commit() {
        try {
            // create table first
            client.createTable(table);

            try {
                List<PartitionCommitPolicy> policies = Collections.emptyList();
                if (partitionCommitPolicyFactory != null) {
                    policies =
                            partitionCommitPolicyFactory.createPolicyChain(
                                    Thread.currentThread().getContextClassLoader(),
                                    () -> {
                                        try {
                                            return fsFactory.create(tmpPath.toUri());
                                        } catch (IOException e) {
                                            throw new RuntimeException(e);
                                        }
                                    });
                }

                FileSystemCommitter committer =
                        new FileSystemCommitter(
                                fsFactory,
                                msFactory,
                                overwrite,
                                tmpPath,
                                partitionColumns.length,
                                false,
                                identifier,
                                staticPartitions,
                                policies);
                committer.commitPartitions();
            } catch (Exception e) {
                throw new TableException("Exception in two phase commit", e);
            } finally {
                try {
                    fsFactory.create(tmpPath.toUri()).delete(tmpPath, true);
                } catch (IOException ignore) {
                }
            }
        } catch (AlreadyExistsException alreadyExistsException) {
            if (!ignoreIfExists) {
                throw new FlinkHiveException(alreadyExistsException);
            }
        } catch (Exception e) {
            throw new FlinkHiveException(e);
        } finally {
            client.close();
        }
    }

    @Override
    public void abort() {
        // do nothing
    }

    public void setFsFactory(FileSystemFactory fsFactory) {
        this.fsFactory = fsFactory;
    }

    public void setMsFactory(TableMetaStoreFactory msFactory) {
        this.msFactory = msFactory;
    }

    public void setOverwrite(boolean overwrite) {
        this.overwrite = overwrite;
    }

    public void setTmpPath(Path tmpPath) {
        this.tmpPath = tmpPath;
    }

    public void setPartitionColumns(String[] partitionColumns) {
        this.partitionColumns = partitionColumns;
    }

    public void setDynamicGrouped(boolean dynamicGrouped) {
        this.dynamicGrouped = dynamicGrouped;
    }

    public void setStaticPartitions(LinkedHashMap<String, String> staticPartitions) {
        this.staticPartitions = staticPartitions;
    }

    public void setIdentifier(ObjectIdentifier identifier) {
        this.identifier = identifier;
    }

    public void setPartitionCommitPolicyFactory(
            PartitionCommitPolicyFactory partitionCommitPolicyFactory) {
        this.partitionCommitPolicyFactory = partitionCommitPolicyFactory;
    }

    public Table getTable() {
        return table;
    }
}

JdbcCatalog Demo

The JDBC table sink implements the getStagedTable API:

Code Block
languagejava
@Override
public StagedTable getStagedTable(
        ObjectPath tablePath, CatalogBaseTable table, SupportsStagedContext context) {

    ... ...

    StagedTable stagedTable =
            new JdbcStagedTable(
                    new ObjectPath(
                            tablePath.getDatabaseName(),
                            tablePath.getObjectName() + "_" + System.currentTimeMillis()),
                    tablePath,
                    tableSchema,
                    jdbcUrl,
                    jdbcUserName,
                    jdbcPassword);

    return stagedTable;
}

JdbcStagedTable implements the core logic

Code Block
languagejava
/** An implementation of {@link StagedTable} for Jdbc to support atomic ctas. */
public class JdbcStagedTable implements StagedTable {

    private static final long serialVersionUID = 1L;

    private final ObjectPath tmpTablePath;
    private final ObjectPath finalTablePath;
    private final Map<String, String> schema;
    private final String jdbcUrl;
    private final String userName;
    private final String password;

    public JdbcStagedTable(
            ObjectPath tmpTablePath,
            ObjectPath finalTablePath,
            Map<String, String> schema,
            String jdbcUrl,
            String userName,
            String password) {
        this.tmpTablePath = tmpTablePath;
        this.finalTablePath = finalTablePath;
        this.schema = schema;
        this.jdbcUrl = jdbcUrl;
        this.userName = userName;
        this.password = password;
    }

    @Override
    public void begin() {
        // create the tmp table; the job writes its data to the tmp table
        executeSql("create table " + tmpTablePath.getFullName() + "( ... ... )");
    }

    @Override
    public void commit() {
        // rename the tmp table to the final table name
        executeSql(
                "rename table "
                        + tmpTablePath.getFullName()
                        + " to "
                        + finalTablePath.getFullName());
    }

    @Override
    public void abort() {
        // drop the tmp table
        executeSql("drop table " + tmpTablePath.getFullName());
    }

    private void executeSql(String sql) {
        // close the statement and the connection after each call
        try (Connection connection = getConnection();
                PreparedStatement statement = connection.prepareStatement(sql)) {
            statement.execute();
        } catch (SQLException e) {
            throw new TableException("Failed to execute: " + sql, e);
        }
    }

    private Connection getConnection() throws SQLException {
        // get jdbc connection
        return DriverManager.getConnection(jdbcUrl, userName, password);
    }
}

...