...
Public Interfaces
Introduce the twoPhaseCreateTable API for Catalog.
```java
@PublicEvolving
public interface Catalog {

    /**
     * Create a {@link TwoPhaseCatalogTable} that provides a transaction abstraction.
     * TwoPhaseCatalogTable will be combined with {@link JobStatusHook} to achieve atomicity
     * support in the Flink framework. The default implementation returns empty, indicating that
     * atomic operations are not supported; the non-atomic implementation is then used.
     *
     * <p>The framework will make sure to call this method with fully validated {@link
     * ResolvedCatalogTable}.
     *
     * @param tablePath path of the table to be created
     * @param table the table definition
     * @param ignoreIfExists flag to specify behavior when a table or view already exists at the
     *     given path: if set to false, it throws a TableAlreadyExistException, if set to true, do
     *     nothing.
     * @param isStreamingMode a flag that tells whether the current table is in streaming mode;
     *     different modes can have different implementations of atomicity support.
     * @return {@link TwoPhaseCatalogTable} that can be serialized and provides atomic operations
     * @throws TableAlreadyExistException if table already exists and ignoreIfExists is false
     * @throws DatabaseNotExistException if the database in tablePath doesn't exist
     * @throws CatalogException in case of any runtime exception
     */
    default Optional<TwoPhaseCatalogTable> twoPhaseCreateTable(
            ObjectPath tablePath,
            CatalogBaseTable table,
            boolean ignoreIfExists,
            boolean isStreamingMode)
            throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
        return Optional.empty();
    }
}
```
Introduce the TwoPhaseCatalogTable interface, which supports atomic operations.
```java
/**
 * A {@link CatalogTable} for atomic semantics using a two-phase commit protocol, combined with
 * {@link JobStatusHook} for atomic CTAS. {@link TwoPhaseCatalogTable} will be a member variable
 * of CtasJobStatusHook and can be serialized.
 *
 * <p>CtasJobStatusHook#onCreated will call the begin method of TwoPhaseCatalogTable;
 * CtasJobStatusHook#onFinished will call the commit method of TwoPhaseCatalogTable;
 * CtasJobStatusHook#onFailed and CtasJobStatusHook#onCanceled will call the abort method of
 * TwoPhaseCatalogTable.
 */
@PublicEvolving
public interface TwoPhaseCatalogTable extends CatalogTable, Serializable {

    /**
     * This method will be called when the job is started. It is similar to opening a transaction
     * in a relational database. In Flink's atomic CTAS scenario, it is used to do some
     * initialization work, for example initializing the client of the underlying service,
     * preparing the tmp path of the underlying storage, or even calling the start transaction
     * API of the underlying service.
     */
    void begin();

    /**
     * This method will be called when the job succeeds. It is similar to committing a
     * transaction in a relational database. In Flink's atomic CTAS scenario, it is used to do
     * some data visibility related work, for example moving the underlying data to the target
     * directory, writing buffered data to the underlying storage service, or even calling the
     * commit transaction API of the underlying service.
     */
    void commit();

    /**
     * This method will be called when the job fails or is canceled. It is similar to rolling
     * back a transaction in a relational database. In Flink's atomic CTAS scenario, it is used
     * to do some data cleaning, for example deleting the data in the tmp directory, deleting the
     * temporary data in the underlying storage service, or even calling the rollback
     * transaction API of the underlying service.
     */
    void abort();
}
```
...
First we need a table interface that can be combined with an abstract transaction capability, so we introduce TwoPhaseCatalogTable, which can begin, commit, and abort a transaction.
TwoPhaseCatalogTable has three corresponding APIs:
begin : Similar to opening a transaction; we can do some preparation work, such as initializing the client, the data, or the directory.
...
abort : Similar to aborting a transaction; we can do some data cleaning, data restoration, etc.
Note: TwoPhaseCatalogTable must be serializable, because it is used on the JM.
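The serializability requirement has a practical consequence for implementations: state that cannot be serialized, such as a client for the underlying service, has to live in a transient field and be created in begin rather than in the constructor. The following is a minimal sketch of that pattern using plain JDK serialization. SketchTwoPhaseTable and StagingTable are hypothetical stand-ins rather than Flink classes, and the StringBuilder only plays the role of a client.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

class SerializationSketch {

    /** Hypothetical stand-in for TwoPhaseCatalogTable; not the Flink interface. */
    interface SketchTwoPhaseTable extends Serializable {
        void begin();

        void commit();

        void abort();
    }

    /** Hypothetical implementation that keeps only serializable state in its fields. */
    static class StagingTable implements SketchTwoPhaseTable {
        private static final long serialVersionUID = 1L;

        private final String tableName;
        // Stands in for a metastore or JDBC client; transient, so it is never shipped.
        private transient StringBuilder client;

        StagingTable(String tableName) {
            this.tableName = tableName;
        }

        @Override
        public void begin() {
            // The "client" is created only after deserialization, on the receiving side.
            client = new StringBuilder("begin:" + tableName);
        }

        @Override
        public void commit() {
            client.append(",commit");
        }

        @Override
        public void abort() {
            client.append(",abort");
        }

        boolean hasClient() {
            return client != null;
        }

        String log() {
            return client.toString();
        }
    }

    /** Serializes and deserializes the table, as shipping it to another process would. */
    static StagingTable roundTrip(StagingTable table) throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(table);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (StagingTable) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        StagingTable shipped = roundTrip(new StagingTable("db.t"));
        System.out.println(shipped.hasClient()); // false: the transient field was not shipped
        shipped.begin();
        shipped.commit();
        System.out.println(shipped.log()); // begin:db.t,commit
    }
}
```

In this sketch the table name survives the round trip while the client does not, which is why the initialization work belongs in begin.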
Then we need somewhere to create the TwoPhaseCatalogTable. Different Catalogs need to perform different operations to implement atomic CTAS:
for example, HiveCatalog needs to access the Hive Metastore and JdbcCatalog needs to access the back-end database, so we introduce the twoPhaseCreateTable API on the Catalog interface.
The definition of the twoPhaseCreateTable API is similar to that of the createTable API, extended with the isStreamingMode parameter so that a Catalog can provide a different atomicity implementation in each mode.
...
Introduce CtasJobStatusHook (which implements the JobStatusHook interface); TwoPhaseCatalogTable is its member variable.
The hook calls the TwoPhaseCatalogTable APIs as follows:
```java
/**
 * This Hook is used to implement the Flink CTAS atomicity semantics, calling the corresponding API
 * of {@link TwoPhaseCatalogTable} at different stages of the job.
 */
public class CtasJobStatusHook implements JobStatusHook {

    private final TwoPhaseCatalogTable twoPhaseCatalogTable;

    public CtasJobStatusHook(TwoPhaseCatalogTable twoPhaseCatalogTable) {
        this.twoPhaseCatalogTable = twoPhaseCatalogTable;
    }

    @Override
    public void onCreated(JobID jobId) {
        twoPhaseCatalogTable.begin();
    }

    @Override
    public void onFinished(JobID jobId) {
        twoPhaseCatalogTable.commit();
    }

    @Override
    public void onFailed(JobID jobId, Throwable throwable) {
        twoPhaseCatalogTable.abort();
    }

    @Override
    public void onCanceled(JobID jobId) {
        twoPhaseCatalogTable.abort();
    }
}
```
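To make the resulting call order concrete, here is a small self-contained simulation of the hook. It is only a sketch: SketchTable, RecordingTable, and SketchHook are hypothetical stand-ins (the job ID is a plain String rather than Flink's JobID), and runJob imitates what the framework is described as doing rather than calling any Flink code.

```java
import java.util.ArrayList;
import java.util.List;

class HookLifecycleSketch {

    /** Hypothetical stand-in for TwoPhaseCatalogTable. */
    interface SketchTable {
        void begin();

        void commit();

        void abort();
    }

    /** Records which methods were called, in order. */
    static class RecordingTable implements SketchTable {
        final List<String> calls = new ArrayList<>();

        @Override
        public void begin() {
            calls.add("begin");
        }

        @Override
        public void commit() {
            calls.add("commit");
        }

        @Override
        public void abort() {
            calls.add("abort");
        }
    }

    /** Simplified hook that mirrors the delegation in CtasJobStatusHook. */
    static class SketchHook {
        private final SketchTable table;

        SketchHook(SketchTable table) {
            this.table = table;
        }

        void onCreated(String jobId) {
            table.begin();
        }

        void onFinished(String jobId) {
            table.commit();
        }

        void onFailed(String jobId, Throwable throwable) {
            table.abort();
        }

        void onCanceled(String jobId) {
            table.abort();
        }
    }

    /** Drives the hook the way a job would: created first, then finished or failed. */
    static List<String> runJob(boolean succeeds) {
        RecordingTable table = new RecordingTable();
        SketchHook hook = new SketchHook(table);
        hook.onCreated("job-1");
        if (succeeds) {
            hook.onFinished("job-1");
        } else {
            hook.onFailed("job-1", new RuntimeException("simulated failure"));
        }
        return table.calls;
    }

    public static void main(String[] args) {
        System.out.println(runJob(true)); // [begin, commit]
        System.out.println(runJob(false)); // [begin, abort]
    }
}
```

Every run starts with begin and ends with exactly one of commit or abort, which is the property the atomic CTAS design relies on.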
Compatibility with existing non-atomic CTAS
The return value of Catalog#twoPhaseCreateTable is an Optional, and we can determine whether atomicity semantics are supported based on whether it is empty:
...
not empty : atomicity semantics are supported; create a CtasJobStatusHook and use the JobStatusHook mechanism to implement atomicity semantics, as described in the code in the previous section.
```java
Optional<TwoPhaseCatalogTable> twoPhaseCatalogTableOptional =
        ctasCatalog.twoPhaseCreateTable(
                objectPath,
                catalogTable,
                createTableOperation.isIgnoreIfExists(),
                isStreamingMode);

if (twoPhaseCatalogTableOptional.isPresent()) {
    // use TwoPhaseCatalogTable for atomic CTAS statements
    TwoPhaseCatalogTable twoPhaseCatalogTable = twoPhaseCatalogTableOptional.get();
    CtasJobStatusHook ctasJobStatusHook = new CtasJobStatusHook(twoPhaseCatalogTable);
    mapOperations.add(
            ctasOperation.toSinkModifyOperation(
                    createTableOperation.getTableIdentifier(),
                    createTableOperation.getCatalogTable(),
                    twoPhaseCatalogTable,
                    ctasCatalog,
                    catalogManager));
    jobStatusHookList.add(ctasJobStatusHook);
} else {
    // execute CREATE TABLE first for non-atomic CTAS statements
    executeInternal(ctasOperation.getCreateTableOperation());
    mapOperations.add(ctasOperation.toSinkModifyOperation(catalogManager));
}
```
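The fallback can also be tried out in isolation. The sketch below reduces the planner logic to the Optional check alone; SketchCatalog, LegacyCatalog, BatchAtomicCatalog, and the step strings are all hypothetical and only illustrate how the default method keeps existing catalogs on the non-atomic path, and how a catalog can use isStreamingMode to opt out per mode.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

class CtasPlanningSketch {

    /** Hypothetical stand-in for TwoPhaseCatalogTable; methods omitted for brevity. */
    interface SketchTable {}

    /** Hypothetical stand-in for Catalog, with the same "empty by default" contract. */
    interface SketchCatalog {
        default Optional<SketchTable> twoPhaseCreateTable(
                String tablePath, boolean isStreamingMode) {
            return Optional.empty();
        }
    }

    /** A catalog that does not override the method keeps the non-atomic behavior. */
    static class LegacyCatalog implements SketchCatalog {}

    /** A catalog that supports atomic CTAS in batch mode only. */
    static class BatchAtomicCatalog implements SketchCatalog {
        @Override
        public Optional<SketchTable> twoPhaseCreateTable(
                String tablePath, boolean isStreamingMode) {
            if (isStreamingMode) {
                return Optional.empty();
            }
            return Optional.of(new SketchTable() {});
        }
    }

    /** Returns the steps that would be taken for a CTAS statement, in order. */
    static List<String> plan(SketchCatalog catalog, String tablePath, boolean isStreamingMode) {
        List<String> steps = new ArrayList<>();
        Optional<SketchTable> table = catalog.twoPhaseCreateTable(tablePath, isStreamingMode);
        if (table.isPresent()) {
            // Atomic path: the table is created by the hook when the job finishes.
            steps.add("register hook");
            steps.add("insert into " + tablePath);
        } else {
            // Non-atomic path: the table is created before the job runs.
            steps.add("create table " + tablePath);
            steps.add("insert into " + tablePath);
        }
        return steps;
    }

    public static void main(String[] args) {
        System.out.println(plan(new LegacyCatalog(), "db.t", false));
        // [create table db.t, insert into db.t]
        System.out.println(plan(new BatchAtomicCatalog(), "db.t", false));
        // [register hook, insert into db.t]
        System.out.println(plan(new BatchAtomicCatalog(), "db.t", true));
        // [create table db.t, insert into db.t]
    }
}
```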
...
so we introduce isStreamingMode when we define Catalog#twoPhaseCreateTable, and the Catalog can decide whether to provide atomicity semantic support.
...
Then implementing the atomic CTAS operation requires only two steps:
- The Catalog implements the twoPhaseCreateTable method;
- Introduce an implementation class of the TwoPhaseCatalogTable interface.
HiveCatalog demo
HiveCatalog implements the twoPhaseCreateTable API:
```java
@Override
public Optional<TwoPhaseCatalogTable> twoPhaseCreateTable(
        ObjectPath tablePath,
        CatalogBaseTable table,
        boolean ignoreIfExists,
        boolean isStreamingMode)
        throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {

    if (isStreamingMode) {
        // HiveCatalog does not support atomicity semantics in stream mode
        return Optional.empty();
    }
    // ... ...
    TwoPhaseCatalogTable twoPhaseCatalogTable =
            new HiveTwoPhaseCatalogTable(
                    getHiveVersion(),
                    new JobConfWrapper(JobConfUtils.createJobConfWithCredentials(hiveConf)),
                    hiveTable,
                    ignoreIfExists);

    return Optional.of(twoPhaseCatalogTable);
}
```
...
HiveTwoPhaseCatalogTable implements the core logic
```java
public class HiveTwoPhaseCatalogTable implements TwoPhaseCatalogTable {

    private static final long serialVersionUID = 1L;

    @Nullable private final String hiveVersion;
    private final JobConfWrapper jobConfWrapper;

    private final Table table;
    private final boolean ignoreIfExists;

    // Not serializable, so it is created in begin() rather than in the constructor.
    private transient HiveMetastoreClientWrapper client;

    public HiveTwoPhaseCatalogTable(
            String hiveVersion,
            JobConfWrapper jobConfWrapper,
            Table table,
            boolean ignoreIfExists) {
        this.hiveVersion = hiveVersion;
        this.jobConfWrapper = jobConfWrapper;
        this.table = table;
        this.ignoreIfExists = ignoreIfExists;
    }

    @Override
    public void begin() {
        // init hive metastore client
        client =
                HiveMetastoreClientFactory.create(
                        HiveConfUtils.create(jobConfWrapper.conf()), hiveVersion);
    }

    @Override
    public void commit() {
        try {
            client.createTable(table);
        } catch (AlreadyExistsException alreadyExistsException) {
            if (!ignoreIfExists) {
                throw new FlinkHiveException(alreadyExistsException);
            }
        } catch (Exception e) {
            throw new FlinkHiveException(e);
        } finally {
            client.close();
        }
    }

    @Override
    public void abort() {
        // may clear staging dir
        client.close();
    }
}
```
JdbcCatalog Demo
JdbcCatalog implements the twoPhaseCreateTable API:
```java
@Override
public Optional<TwoPhaseCatalogTable> twoPhaseCreateTable(
        ObjectPath tablePath,
        CatalogBaseTable table,
        boolean ignoreIfExists,
        boolean isStreamingMode)
        throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {

    // ... ...
    TwoPhaseCatalogTable twoPhaseCatalogTable =
            new JdbcTwoPhaseCatalogTable(
                    // tmp table path: the final table name plus a timestamp suffix
                    new ObjectPath(
                            tablePath.getDatabaseName(),
                            tablePath.getObjectName() + "_" + System.currentTimeMillis()),
                    tablePath,
                    tableSchema,
                    jdbcUrl,
                    jdbcUserName,
                    jdbcPassword);

    return Optional.of(twoPhaseCatalogTable);
}
```
...
JdbcTwoPhaseCatalogTable implements the core logic
```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.Map;

/** An implementation of {@link TwoPhaseCatalogTable} for Jdbc to support atomic CTAS. */
public class JdbcTwoPhaseCatalogTable implements TwoPhaseCatalogTable {

    private static final long serialVersionUID = 1L;

    private final ObjectPath tmpTablePath;
    private final ObjectPath finalTablePath;
    private final Map<String, String> schema;
    private final String jdbcUrl;
    private final String userName;
    private final String password;

    public JdbcTwoPhaseCatalogTable(
            ObjectPath tmpTablePath,
            ObjectPath finalTablePath,
            Map<String, String> schema,
            String jdbcUrl,
            String userName,
            String password) {
        this.tmpTablePath = tmpTablePath;
        this.finalTablePath = finalTablePath;
        this.schema = schema;
        this.jdbcUrl = jdbcUrl;
        this.userName = userName;
        this.password = password;
    }

    @Override
    public void begin() {
        // create the tmp table; the job writes its data to the tmp table
        execute("create table " + tmpTablePath.getFullName() + "( ... ... )");
    }

    @Override
    public void commit() {
        // rename the tmp table to the final table name
        execute(
                "rename table "
                        + tmpTablePath.getFullName()
                        + " to "
                        + finalTablePath.getFullName());
    }

    @Override
    public void abort() {
        // drop the tmp table
        execute("drop table " + tmpTablePath.getFullName());
    }

    /** Runs one statement and closes the connection, wrapping checked SQL errors. */
    private void execute(String sql) {
        try (Connection connection = getConnection();
                PreparedStatement statement = connection.prepareStatement(sql)) {
            statement.execute();
        } catch (SQLException e) {
            throw new CatalogException("Failed to execute: " + sql, e);
        }
    }

    private Connection getConnection() throws SQLException {
        // get a jdbc connection from the url and credentials passed by the catalog
        return DriverManager.getConnection(jdbcUrl, userName, password);
    }
}
```
...