...
Code Block | ||
---|---|---|
| ||
@PublicEvolving
public interface SupportsStaged {

    /**
     * Create a {@link StagedTable} that provides transaction abstraction.
     * StagedTable will be combined with {@link JobStatusHook} to achieve atomicity
     * support in the Flink framework.
     *
     * <p>The framework will make sure to call this method with fully validated {@link
     * ResolvedCatalogTable}.
     *
     * @param tablePath path of the table to be created
     * @param table the table definition
     * @param context extended information beyond the basic parameters, expandable
     * @return {@link StagedTable} that can be serialized and provides atomic operations
     */
    StagedTable getStagedTable(
            ObjectPath tablePath, CatalogBaseTable table, SupportsStagedContext context);

    /** Extended information for {@link StagedTable}. */
    public static class SupportsStagedContext {
        private final boolean ignoreIfExists;
        private final boolean managedTable;

        public SupportsStagedContext(boolean ignoreIfExists, boolean managedTable) {
            this.ignoreIfExists = ignoreIfExists;
            this.managedTable = managedTable;
        }

        public boolean isIgnoreIfExists() {
            return ignoreIfExists;
        }

        public boolean isManagedTable() {
            return managedTable;
        }
    }
} |
Introduce the StagedTable interface, which supports atomic operations.
Code Block | ||
---|---|---|
| ||
/**
 * A {@link StagedTable} for atomic semantics using a two-phase commit protocol, combined with
 * {@link JobStatusHook} for atomic CTAS. {@link StagedTable} will be a member variable of
 * CtasJobStatusHook and can be serialized;
 *
 * <p>CtasJobStatusHook#onCreated will call the begin method of StagedTable;
 * CtasJobStatusHook#onFinished will call the commit method of StagedTable;
 * CtasJobStatusHook#onFailed and CtasJobStatusHook#onCanceled will call the abort method of
 * StagedTable;
 */
@PublicEvolving
public interface StagedTable extends Serializable {

    /**
     * This method will be called when the job is started. Similar to what it means to open a
     * transaction in a relational database; In Flink's atomic CTAS scenario, it is used to do some
     * initialization work; For example, initializing the client of the underlying service, the tmp
     * path of the underlying storage, or even call the start transaction API of the underlying
     * service, etc.
     */
    void begin();

    /**
     * This method will be called when the job succeeds. Similar to what it means to commit the
     * transaction in a relational database; In Flink's atomic CTAS scenario, it is used to do some
     * data visibility related work; For example, moving the underlying data to the target
     * directory, writing buffer data to the underlying storage service, or even call the commit
     * transaction API of the underlying service, etc.
     */
    void commit();

    /**
     * This method will be called when the job fails or is canceled. Similar to what it means to
     * rollback the transaction in a relational database; In Flink's atomic CTAS scenario, it is
     * used to do some data cleaning; For example, delete the data in tmp directory, delete the
     * temporary data in the underlying storage service, or even call the rollback transaction API
     * of the underlying service, etc.
     */
    void abort();
} |
...
First we need a table interface that can be combined with an abstract transaction capability, so we introduce StagedTable, which can start, commit, and abort a transaction.
The three APIs of StagedTable:
begin : Similar to opening a transaction; we can do some prep work, such as initializing the client, initializing the data, initializing the directory, etc.
...
abort : Similar to aborting a transaction; we can do some data cleaning, data restoration, etc.
Note: StagedTable must be serializable, because it is used on the JM.
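The contract above can be illustrated with a minimal, self-contained sketch. It does not use the Flink types: `StagedTableSketch` and `InMemoryStagedTable` are hypothetical stand-ins introduced only for this example. The sketch assumes that runtime-only state (a client, buffers) is kept in transient fields and re-created in begin(), which is one way to keep the object serializable.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in mirroring the begin/commit/abort contract; not the Flink type. */
interface StagedTableSketch extends Serializable {
    void begin();

    void commit();

    void abort();
}

/** In-memory illustration: rows are staged and become visible only on commit. */
class InMemoryStagedTable implements StagedTableSketch {
    private static final long serialVersionUID = 1L;

    private final String tableName;
    // Runtime-only state is transient; it is created in begin(), after deserialization.
    private transient List<String> staged;
    private transient List<String> visible;

    InMemoryStagedTable(String tableName) {
        this.tableName = tableName;
    }

    @Override
    public void begin() {
        staged = new ArrayList<>();
        visible = new ArrayList<>();
    }

    void write(String row) {
        staged.add(row);
    }

    @Override
    public void commit() {
        visible.addAll(staged);
        staged.clear();
    }

    @Override
    public void abort() {
        staged.clear();
    }

    List<String> visibleRows() {
        return visible;
    }

    String tableName() {
        return tableName;
    }

    /** Serializes and deserializes the object, as happens when it is shipped with the job. */
    static InMemoryStagedTable roundTrip(InMemoryStagedTable table) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(table);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (InMemoryStagedTable) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("serialization round trip failed", e);
        }
    }
}
```

In this sketch a row written after begin() stays invisible until commit() is called, and abort() simply drops whatever was staged.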
Then we need somewhere to create the StagedTable, because different TableSink implementations need to perform different operations to implement atomic CTAS;
for example, HiveTableSink needs to access the Hive Metastore and write to HDFS (OSS, etc.); JDBCTableSink needs to access the back-end database.
Therefore, we introduce the interface SupportsStaged, which, if implemented by DynamicTableSink, indicates that the sink supports atomic operations; otherwise it does not.
The Flink framework can determine whether a DynamicTableSink supports atomic CTAS by checking whether it implements the interface SupportsStaged; if it does, the framework gets the StagedTable object through the getStagedTable API, otherwise it uses the non-atomic CTAS implementation.
Identification of atomic CTAS
We can identify whether a DynamicTableSink supports atomic CTAS by determining its type in TableEnvironmentImpl, like the following:
Code Block | ||
---|---|---|
| ||
boolean isAtomicCtas = dynamicTableSink instanceof SupportsStaged; |
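A small sketch of how such a type check can select between the two code paths. The types here (`SinkSketch`, `SupportsStagedSketch`, `CtasPlannerSketch`) are hypothetical stand-ins, not the Flink classes, and the returned step descriptions are only illustrative.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for DynamicTableSink; not the Flink type. */
interface SinkSketch {}

/** Hypothetical stand-in for the SupportsStaged ability interface. */
interface SupportsStagedSketch {
    String getStagedTable(String tablePath);
}

class PlainSinkSketch implements SinkSketch {}

class StagedSinkSketch implements SinkSketch, SupportsStagedSketch {
    @Override
    public String getStagedTable(String tablePath) {
        return "staged:" + tablePath;
    }
}

class CtasPlannerSketch {
    /** Returns the steps a planner would take for the given sink. */
    static List<String> plan(SinkSketch sink, String tablePath) {
        List<String> steps = new ArrayList<>();
        if (sink instanceof SupportsStagedSketch) {
            // Atomic path: obtain the staged table; table creation is deferred to commit.
            String staged = ((SupportsStagedSketch) sink).getStagedTable(tablePath);
            steps.add("register hook for " + staged);
        } else {
            // Non-atomic path: create the table first, then write into it.
            steps.add("create table " + tablePath);
        }
        steps.add("insert into " + tablePath);
        return steps;
    }
}
```

A sink that does not implement the ability interface keeps the existing behavior without any change on its side.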
Integrate atomicity CTAS
...
The implementation of the API related to the call to StagedTable is as follows:
Code Block | ||
---|---|---|
| ||
/**
 * This Hook is used to implement the Flink CTAS atomicity semantics, calling the corresponding API
 * of {@link StagedTable} at different stages of the job.
 */
public class CtasJobStatusHook implements JobStatusHook {

    private final StagedTable stagedTable;

    public CtasJobStatusHook(StagedTable stagedTable) {
        this.stagedTable = stagedTable;
    }

    @Override
    public void onCreated(JobID jobId) {
        stagedTable.begin();
    }

    @Override
    public void onFinished(JobID jobId) {
        stagedTable.commit();
    }

    @Override
    public void onFailed(JobID jobId, Throwable throwable) {
        stagedTable.abort();
    }

    @Override
    public void onCanceled(JobID jobId) {
        stagedTable.abort();
    }
} |
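To make the call order concrete, here is a toy simulation of the dispatch. Everything in it is an assumption made for illustration: `StagedOpsSketch`, `CtasHookSketch` and `JobDriverSketch` are hypothetical, the JobID parameter is omitted, and the driver only imitates the framework's status notifications for a job modeled as a Runnable.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for the begin/commit/abort contract; not the Flink type. */
interface StagedOpsSketch {
    void begin();

    void commit();

    void abort();
}

/** Records which lifecycle methods were invoked, in order. */
class RecordingStagedOps implements StagedOpsSketch {
    final List<String> calls = new ArrayList<>();

    @Override
    public void begin() {
        calls.add("begin");
    }

    @Override
    public void commit() {
        calls.add("commit");
    }

    @Override
    public void abort() {
        calls.add("abort");
    }
}

/** Simplified hook with the same dispatch as CtasJobStatusHook, without the JobID. */
class CtasHookSketch {
    private final StagedOpsSketch staged;

    CtasHookSketch(StagedOpsSketch staged) {
        this.staged = staged;
    }

    void onCreated() {
        staged.begin();
    }

    void onFinished() {
        staged.commit();
    }

    void onFailed(Throwable cause) {
        staged.abort();
    }

    void onCanceled() {
        staged.abort();
    }
}

/** Toy driver imitating the job status notifications; not how Flink schedules jobs. */
class JobDriverSketch {
    static List<String> run(Runnable job) {
        RecordingStagedOps staged = new RecordingStagedOps();
        CtasHookSketch hook = new CtasHookSketch(staged);
        hook.onCreated();
        try {
            job.run();
        } catch (RuntimeException e) {
            hook.onFailed(e);
            return staged.calls;
        }
        hook.onFinished();
        return staged.calls;
    }
}
```

A job that completes normally yields begin followed by commit; a job that throws yields begin followed by abort, so the table never becomes visible.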
Compatibility with existing non-atomic CTAS
We can infer atomic CTAS support by whether DynamicTableSink implements the interface SupportsStaged:
no : atomicity semantics are not supported and the existing code logic is used;
yes : atomicity semantics are supported; a CtasJobStatusHook is created and the JobStatusHook mechanism is used to implement atomicity semantics, as described in the code in the previous section.
Code Block | ||
---|---|---|
| ||
Optional<DynamicTableSink> dynamicTableSinkOptional =
        getDynamicTableSink(
                catalogTable,
                tableIdentifier,
                createTableOperation.isTemporary(),
                catalogManager.getCatalog(catalogName));
ObjectPath objectPath = tableIdentifier.toObjectPath();
if (dynamicTableSinkOptional.isPresent()
        && dynamicTableSinkOptional.get() instanceof SupportsStaged) {
    // use StagedTable for atomic CTAS statements
    DynamicTableSink dynamicTableSink = dynamicTableSinkOptional.get();
    StagedTable stagedTable =
            ((SupportsStaged) dynamicTableSink)
                    .getStagedTable(
                            objectPath,
                            catalogTable,
                            new SupportsStaged.SupportsStagedContext(
                                    createTableOperation.isIgnoreIfExists(),
                                    ManagedTableListener.isManagedTable(
                                            ctasCatalog, catalogTable)));
    CtasJobStatusHook ctasJobStatusHook = new CtasJobStatusHook(stagedTable);
    mapOperations.add(
            ctasOperation.toStagedSinkModifyOperation(
                    createTableOperation.getTableIdentifier(),
                    catalogTable,
                    ctasCatalog,
                    dynamicTableSink));
    jobStatusHookList.add(ctasJobStatusHook);
} else {
    // execute CREATE TABLE first for non-atomic CTAS statements
    executeInternal(ctasOperation.getCreateTableOperation());
    mapOperations.add(ctasOperation.toSinkModifyOperation(catalogManager));
} |
To avoid creating the DynamicTableSink a second time, we construct a StagedSinkModifyOperation that inherits from SinkModifyOperation and carries the DynamicTableSink as a member variable.
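The idea of carrying the already-created sink inside the operation can be sketched as follows. The classes are hypothetical stand-ins (`SinkFactorySketch`, `SinkModifySketch`, `StagedSinkModifySketch`), not the planner's real types; the counter exists only to show that the factory is not invoked again.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical sink factory that counts how many sinks it has created. */
class SinkFactorySketch {
    static final AtomicInteger created = new AtomicInteger();

    static Object createSink(String table) {
        created.incrementAndGet();
        return new Object();
    }
}

/** Hypothetical base operation: the sink is created when the operation is translated. */
class SinkModifySketch {
    final String table;

    SinkModifySketch(String table) {
        this.table = table;
    }

    Object resolveSink() {
        return SinkFactorySketch.createSink(table);
    }
}

/** Hypothetical staged variant: reuses the sink that produced the staged table. */
class StagedSinkModifySketch extends SinkModifySketch {
    private final Object sink;

    StagedSinkModifySketch(String table, Object sink) {
        super(table);
        this.sink = sink;
    }

    @Override
    Object resolveSink() {
        // No second factory call: the same instance is handed to the translation step.
        return sink;
    }
}
```

Reusing the instance also means that any state set on the sink while building the staged table is the state the write path sees.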
Atomic CTAS demo
The implementation of the atomic CTAS operation then requires only two steps:
- DynamicTableSink implements the interface SupportsStaged;
- Introduce the implementation class of the StagedTable interface.
HiveCatalog demo
HiveTableSink implements the interface SupportsStaged :
Code Block | ||
---|---|---|
| ||
@Override
public StagedTable getStagedTable(
ObjectPath tablePath, CatalogBaseTable table, SupportsStagedContext context) {
checkNotNull(tablePath, "tablePath cannot be null");
checkArgument(table instanceof ResolvedCatalogBaseTable, "table should be resolved");
ResolvedCatalogBaseTable<?> resolvedTable = (ResolvedCatalogBaseTable<?>) table;
Table hiveTable =
HiveTableUtil.instantiateHiveTable(
tablePath,
resolvedTable,
HiveConfUtils.create(jobConf),
context.isManagedTable());
HiveStagedTable hiveStagedTable =
new HiveStagedTable(
hiveVersion,
new JobConfWrapper(jobConf),
hiveTable,
context.isIgnoreIfExists());
return hiveStagedTable;
} |
HiveStagedTable implements the core logic
Code Block | ||
---|---|---|
| ||
/** An implementation of {@link StagedTable} for Hive to support atomic ctas. */
public class HiveStagedTable implements StagedTable {
private static final long serialVersionUID = 1L;
@Nullable private final String hiveVersion;
private final JobConfWrapper jobConfWrapper;
private final Table table;
private final boolean ignoreIfExists;
private transient HiveMetastoreClientWrapper client;
//
private FileSystemFactory fsFactory;
private TableMetaStoreFactory msFactory;
private boolean overwrite;
private Path tmpPath;
private String[] partitionColumns;
private boolean dynamicGrouped;
private LinkedHashMap<String, String> staticPartitions;
private ObjectIdentifier identifier;
private PartitionCommitPolicyFactory partitionCommitPolicyFactory;
public HiveStagedTable(
String hiveVersion,
JobConfWrapper jobConfWrapper,
Table table,
boolean ignoreIfExists) {
this.hiveVersion = hiveVersion;
this.jobConfWrapper = jobConfWrapper;
this.table = table;
this.ignoreIfExists = ignoreIfExists;
}
@Override
public void begin() {
// init hive metastore client
client =
HiveMetastoreClientFactory.create(
HiveConfUtils.create(jobConfWrapper.conf()), hiveVersion);
}
@Override
public void commit() {
try {
// create table first
client.createTable(table);
try {
List<PartitionCommitPolicy> policies = Collections.emptyList();
if (partitionCommitPolicyFactory != null) {
policies =
partitionCommitPolicyFactory.createPolicyChain(
Thread.currentThread().getContextClassLoader(),
() -> {
try {
return fsFactory.create(tmpPath.toUri());
} catch (IOException e) {
throw new RuntimeException(e);
}
});
}
FileSystemCommitter committer =
new FileSystemCommitter(
fsFactory,
msFactory,
overwrite,
tmpPath,
partitionColumns.length,
false,
identifier,
staticPartitions,
policies);
committer.commitPartitions();
} catch (Exception e) {
throw new TableException("Exception in two phase commit", e);
} finally {
try {
fsFactory.create(tmpPath.toUri()).delete(tmpPath, true);
} catch (IOException ignore) {
}
}
} catch (AlreadyExistsException alreadyExistsException) {
if (!ignoreIfExists) {
throw new FlinkHiveException(alreadyExistsException);
}
} catch (Exception e) {
throw new FlinkHiveException(e);
} finally {
client.close();
}
}
@Override
public void abort() {
// do nothing
}
public void setFsFactory(FileSystemFactory fsFactory) {
this.fsFactory = fsFactory;
}
public void setMsFactory(TableMetaStoreFactory msFactory) {
this.msFactory = msFactory;
}
public void setOverwrite(boolean overwrite) {
this.overwrite = overwrite;
}
public void setTmpPath(Path tmpPath) {
this.tmpPath = tmpPath;
}
public void setPartitionColumns(String[] partitionColumns) {
this.partitionColumns = partitionColumns;
}
public void setDynamicGrouped(boolean dynamicGrouped) {
this.dynamicGrouped = dynamicGrouped;
}
public void setStaticPartitions(LinkedHashMap<String, String> staticPartitions) {
this.staticPartitions = staticPartitions;
}
public void setIdentifier(ObjectIdentifier identifier) {
this.identifier = identifier;
}
public void setPartitionCommitPolicyFactory(
PartitionCommitPolicyFactory partitionCommitPolicyFactory) {
this.partitionCommitPolicyFactory = partitionCommitPolicyFactory;
}
public Table getTable() {
return table;
}
} |
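The stage-then-publish pattern behind this kind of commit can be shown on a local file system with only the JDK. This is a simplified sketch under stated assumptions, not Hive's FileSystemCommitter: `StagingDirSketch` is hypothetical, the staging directory and the target sit under the same parent so that the move is a plain rename, and there is no metastore involved.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical local-filesystem illustration of stage-then-publish. */
class StagingDirSketch {
    private final Path stagingDir;
    private final Path targetDir;

    StagingDirSketch(Path stagingDir, Path targetDir) {
        this.stagingDir = stagingDir;
        this.targetDir = targetDir;
    }

    void begin() throws IOException {
        Files.createDirectories(stagingDir);
    }

    void write(String fileName, String content) throws IOException {
        Files.write(stagingDir.resolve(fileName), content.getBytes(StandardCharsets.UTF_8));
    }

    /** Publishes the staged files by renaming the staging directory to the target. */
    void commit() throws IOException {
        Files.move(stagingDir, targetDir);
    }

    /** Discards the staged files; the target directory is never created. */
    void abort() throws IOException {
        deleteRecursively(stagingDir);
    }

    static void deleteRecursively(Path dir) throws IOException {
        if (!Files.exists(dir)) {
            return;
        }
        List<Path> paths;
        try (Stream<Path> walk = Files.walk(dir)) {
            // Reverse order so that children are deleted before their parents.
            paths = walk.sorted(Comparator.reverseOrder()).collect(Collectors.toList());
        }
        for (Path path : paths) {
            Files.delete(path);
        }
    }

    /** Runs one staged write and reports whether the data became visible in the target. */
    static boolean runOnce(boolean jobSucceeds) {
        try {
            Path root = Files.createTempDirectory("ctas-sketch");
            Path target = root.resolve("table");
            StagingDirSketch staged = new StagingDirSketch(root.resolve(".staging"), target);
            staged.begin();
            staged.write("part-0", "hello");
            if (jobSucceeds) {
                staged.commit();
            } else {
                staged.abort();
            }
            boolean visible = Files.exists(target.resolve("part-0"));
            deleteRecursively(root);
            return visible;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Readers of the target path see either nothing or the complete output, which is the property the atomic CTAS commit relies on.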
JdbcCatalog Demo
JDBCTableSink implements the interface SupportsStaged:
Code Block | ||
---|---|---|
| ||
@Override
public StagedTable getStagedTable(
        ObjectPath tablePath, CatalogBaseTable table, SupportsStagedContext context) {
    ... ...
    StagedTable stagedTable =
            new JdbcStagedTable(
                    // tmp table: final table name plus a timestamp suffix
                    new ObjectPath(
                            tablePath.getDatabaseName(),
                            tablePath.getObjectName() + "_" + System.currentTimeMillis()),
                    tablePath,
                    tableSchem,
                    jdbcUrl,
                    jdbcUserName,
                    jdbcPassword);
    return stagedTable;
} |
JdbcStagedTable implements the core logic
Code Block | ||
---|---|---|
| ||
/** An implementation of {@link StagedTable} for Jdbc to support atomic ctas. */
public class JdbcStagedTable implements StagedTable {

    private final ObjectPath tmpTablePath;
    private final ObjectPath finalTablePath;
    private final Map<String, String> schema;
    private final String jdbcUrl;
    private final String userName;
    private final String password;

    public JdbcStagedTable(
            ObjectPath tmpTablePath,
            ObjectPath finalTablePath,
            Map<String, String> schema,
            String jdbcUrl,
            String userName,
            String password) {
        this.tmpTablePath = tmpTablePath;
        this.finalTablePath = finalTablePath;
        this.schema = schema;
        this.jdbcUrl = jdbcUrl;
        this.userName = userName;
        this.password = password;
    }

    @Override
    public void begin() {
        // create tmp table, writing data to the tmp table
        execute("create table " + tmpTablePath.getFullName() + "( ... ... )");
    }

    @Override
    public void commit() {
        // Rename the tmp table to the final table name
        execute(
                "rename table "
                        + tmpTablePath.getFullName()
                        + " to "
                        + finalTablePath.getFullName());
    }

    @Override
    public void abort() {
        // drop tmp table
        execute("drop table " + tmpTablePath.getFullName());
    }

    private void execute(String sql) {
        // open a jdbc connection per statement and close it afterwards
        try (Connection connection = DriverManager.getConnection(jdbcUrl, userName, password);
                Statement statement = connection.createStatement()) {
            statement.execute(sql);
        } catch (SQLException e) {
            throw new RuntimeException("Failed to execute: " + sql, e);
        }
    }
} |
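The statements issued in the three phases depend on the database. The sketch below only builds the SQL strings and does not open a connection; `StagedJdbcSqlSketch` and its `Dialect` enum are hypothetical helpers for illustration. It assumes unqualified table names and covers two dialects: `RENAME TABLE ... TO ...` is MySQL syntax, while PostgreSQL uses `ALTER TABLE ... RENAME TO ...`.

```java
/** Hypothetical helper that builds the SQL for a staged JDBC table; names are illustrative. */
class StagedJdbcSqlSketch {

    enum Dialect {
        MYSQL,
        POSTGRES
    }

    /** Temporary table name: the final name plus a caller-supplied suffix. */
    static String tmpTableName(String finalTable, long suffix) {
        return finalTable + "_" + suffix;
    }

    /** Commit step: make the staged data visible under the final table name. */
    static String renameSql(Dialect dialect, String tmpTable, String finalTable) {
        switch (dialect) {
            case MYSQL:
                return "RENAME TABLE " + tmpTable + " TO " + finalTable;
            case POSTGRES:
                return "ALTER TABLE " + tmpTable + " RENAME TO " + finalTable;
            default:
                throw new IllegalArgumentException("Unsupported dialect: " + dialect);
        }
    }

    /** Abort step: remove the temporary table if it was created. */
    static String dropSql(String tmpTable) {
        return "DROP TABLE IF EXISTS " + tmpTable;
    }
}
```

Using `IF EXISTS` in the abort step keeps it safe to call even when the job failed before the temporary table was created.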
...