Status
Current state: ["Under Discussion"]
Discussion thread: here (<- link to https://mail-archives.apache.org/mod_mbox/flink-dev/)
JIRA: here (<- link to https://issues.apache.org/jira/browse/FLINK-XXXX)
Released: 1.18
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
The CREATE TABLE AS SELECT (CTAS) statement has been supported since FLIP-218, but it is not atomic: the table is created before the job runs, and if the job fails or is cancelled, the table is not dropped.
We want Flink to support atomic CTAS, where the table is created only when the job succeeds.
Following FLIP-218: Support SELECT clause in CREATE TABLE (CTAS), we use the existing JobStatusHook mechanism and introduce new interfaces (SupportsStaged and StagedTable) to implement the atomic CTAS capability.
Public Interfaces
Introduce the interface SupportsStaged, which provides the getStagedTable API. If a DynamicTableSink implements SupportsStaged, it indicates that the sink supports atomic operations.
@PublicEvolving
public interface SupportsStaged {

    /**
     * Create a {@link StagedTable} that provides transaction abstraction. StagedTable will be
     * combined with {@link JobStatusHook} to achieve atomicity support in the Flink framework.
     *
     * <p>The framework will make sure to call this method with fully validated {@link
     * ResolvedCatalogTable}.
     *
     * @param tablePath path of the table to be created
     * @param table the table definition
     * @param context extended information beyond the basic table definition
     * @return {@link StagedTable} that can be serialized and provides atomic operations
     */
    StagedTable getStagedTable(
            ObjectPath tablePath, CatalogBaseTable table, SupportsStagedContext context);

    /** Extended information for {@link StagedTable}. */
    public static class SupportsStagedContext {

        private final boolean ignoreIfExists;
        private final boolean managedTable;

        public SupportsStagedContext(boolean ignoreIfExists, boolean managedTable) {
            this.ignoreIfExists = ignoreIfExists;
            this.managedTable = managedTable;
        }

        public boolean isIgnoreIfExists() {
            return ignoreIfExists;
        }

        public boolean isManagedTable() {
            return managedTable;
        }
    }
}
Introduce the StagedTable interface, which supports atomic operations.
/**
 * A {@link StagedTable} for atomic semantics using a two-phase commit protocol, combined with
 * {@link JobStatusHook} for atomic CTAS. {@link StagedTable} will be a member variable of
 * CtasJobStatusHook and can be serialized.
 *
 * <p>CtasJobStatusHook#onCreated will call the begin method of StagedTable;
 * CtasJobStatusHook#onFinished will call the commit method of StagedTable;
 * CtasJobStatusHook#onFailed and CtasJobStatusHook#onCanceled will call the abort method of
 * StagedTable.
 */
@PublicEvolving
public interface StagedTable extends Serializable {

    /**
     * This method will be called when the job starts. Similar to opening a transaction in a
     * relational database. In Flink's atomic CTAS scenario, it is used to do some initialization
     * work, for example, initializing the client of the underlying service, the tmp path of the
     * underlying storage, or even calling the start-transaction API of the underlying service.
     */
    void begin();

    /**
     * This method will be called when the job succeeds. Similar to committing the transaction in
     * a relational database. In Flink's atomic CTAS scenario, it is used to do some data
     * visibility related work, for example, moving the underlying data to the target directory,
     * writing buffered data to the underlying storage service, or even calling the
     * commit-transaction API of the underlying service.
     */
    void commit();

    /**
     * This method will be called when the job fails or is canceled. Similar to rolling back the
     * transaction in a relational database. In Flink's atomic CTAS scenario, it is used to do
     * some data cleaning, for example, deleting the data in the tmp directory, deleting the
     * temporary data in the underlying storage service, or even calling the rollback-transaction
     * API of the underlying service.
     */
    void abort();
}
Proposed Changes
First, we need a table interface that can be combined with abstract transaction capabilities, so we introduce StagedTable, which can perform start-transaction, commit-transaction, and abort-transaction operations.
The three APIs of StagedTable:
begin : similar to opening a transaction; we can do some preparatory work, such as initializing the client, the data, or the directory.
commit : similar to committing a transaction; we can do data writing, make data visible, create the table, etc.
abort : similar to aborting a transaction; we can do data cleaning, data restoration, etc.
Note: StagedTable must be serializable, because it is used on the JobManager.
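To make the begin/commit/abort lifecycle concrete, here is a minimal, self-contained sketch. It is not Flink code: the StagedTable interface is mirrored locally, and a static in-memory list stands in for the external storage system. It shows the key property of atomic CTAS — staged data only becomes visible on commit, and abort discards it:

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Local mirror of the proposed StagedTable interface (hypothetical, for illustration only).
interface StagedTable extends Serializable {
    void begin();
    void commit();
    void abort();
}

/** Stages rows in a buffer; rows become visible in the "external system" only on commit. */
class InMemoryStagedTable implements StagedTable {
    // Stands in for the external storage/catalog that readers can see.
    static final List<String> VISIBLE = new ArrayList<>();

    private final List<String> buffer = new ArrayList<>();

    @Override
    public void begin() {
        // "open the transaction": start from a clean staging area
        buffer.clear();
    }

    // Called by the running job to write staged (not yet visible) data.
    void write(String row) {
        buffer.add(row);
    }

    @Override
    public void commit() {
        // "commit the transaction": publish staged data atomically
        VISIBLE.addAll(buffer);
        buffer.clear();
    }

    @Override
    public void abort() {
        // "rollback the transaction": discard staged data
        buffer.clear();
    }
}
```

A real implementation would replace the in-memory list with, for example, a temporary HDFS directory that is moved into place on commit and deleted on abort.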
Then we need somewhere to create the StagedTable, because different TableSinks implement atomic CTAS with different operations;
for example, HiveTableSink needs to access the Hive Metastore and write to HDFS (OSS, etc.), while JdbcTableSink needs to access the backing database.
Therefore, we introduce the interface SupportsStaged; if a DynamicTableSink implements it, the sink supports atomic operations, otherwise it does not.
The Flink framework can determine whether a DynamicTableSink supports atomic CTAS by checking whether it implements SupportsStaged; if it does, the framework obtains the StagedTable object through the getStagedTable API, otherwise it falls back to the non-atomic CTAS implementation.
Identification of atomic CTAS
We can identify whether a DynamicTableSink supports atomic CTAS by checking its type in TableEnvironmentImpl, like the following:
boolean isAtomicCtas = dynamicTableSink instanceof SupportsStaged;
Integrate atomicity CTAS
Introduce CtasJobStatusHook (implementing the JobStatusHook interface), with StagedTable as its member variable.
The implementation of the APIs that call into StagedTable is as follows:
/**
 * This hook is used to implement the Flink CTAS atomicity semantics, calling the corresponding
 * API of {@link StagedTable} at different stages of the job.
 */
public class CtasJobStatusHook implements JobStatusHook {

    private final StagedTable stagedTable;

    public CtasJobStatusHook(StagedTable stagedTable) {
        this.stagedTable = stagedTable;
    }

    @Override
    public void onCreated(JobID jobId) {
        stagedTable.begin();
    }

    @Override
    public void onFinished(JobID jobId) {
        stagedTable.commit();
    }

    @Override
    public void onFailed(JobID jobId, Throwable throwable) {
        stagedTable.abort();
    }

    @Override
    public void onCanceled(JobID jobId) {
        stagedTable.abort();
    }
}
Compatibility with existing non-atomic CTAS
We can infer atomic CTAS support from whether the DynamicTableSink implements the interface SupportsStaged:
no : atomicity semantics are not supported, and the existing code path is used;
yes : atomicity semantics are supported; we create a CtasJobStatusHook and use the JobStatusHook mechanism to implement the atomicity semantics, as in the following code.
Optional<DynamicTableSink> dynamicTableSinkOptional =
        getDynamicTableSink(
                catalogTable,
                tableIdentifier,
                createTableOperation.isTemporary(),
                catalogManager.getCatalog(catalogName));
ObjectPath objectPath = tableIdentifier.toObjectPath();
if (dynamicTableSinkOptional.isPresent()
        && dynamicTableSinkOptional.get() instanceof SupportsStaged) {
    DynamicTableSink dynamicTableSink = dynamicTableSinkOptional.get();
    StagedTable stagedTable =
            ((SupportsStaged) dynamicTableSink)
                    .getStagedTable(
                            objectPath,
                            catalogTable,
                            new SupportsStaged.SupportsStagedContext(
                                    createTableOperation.isIgnoreIfExists(),
                                    ManagedTableListener.isManagedTable(
                                            ctasCatalog, catalogTable)));
    CtasJobStatusHook ctasJobStatusHook = new CtasJobStatusHook(stagedTable);
    mapOperations.add(
            ctasOperation.toStagedSinkModifyOperation(
                    createTableOperation.getTableIdentifier(),
                    catalogTable,
                    ctasCatalog,
                    dynamicTableSink));
    jobStatusHookList.add(ctasJobStatusHook);
} else {
    // execute CREATE TABLE first for non-atomic CTAS statements
    executeInternal(ctasOperation.getCreateTableOperation());
    mapOperations.add(ctasOperation.toSinkModifyOperation(catalogManager));
}
To avoid generating the DynamicTableSink a second time, we construct a StagedSinkModifyOperation that inherits from SinkModifyOperation and adds a DynamicTableSink member variable.
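A sketch of what such an operation could look like. SinkModifyOperation and DynamicTableSink are stand-in stubs here, not the real Flink classes; the point is only that the already-created sink is carried along so the planner can reuse it:

```java
// Stub for org.apache.flink.table.connector.sink.DynamicTableSink (illustration only).
class DynamicTableSink {}

// Stub for org.apache.flink.table.operations.SinkModifyOperation (illustration only);
// the real class carries the resolved table, query operation, etc.
class SinkModifyOperation {}

/** Carries the already-created DynamicTableSink so it is not generated a second time. */
class StagedSinkModifyOperation extends SinkModifyOperation {

    private final DynamicTableSink dynamicTableSink;

    StagedSinkModifyOperation(DynamicTableSink dynamicTableSink) {
        this.dynamicTableSink = dynamicTableSink;
    }

    // The planner reads the sink back instead of re-resolving it from the catalog.
    DynamicTableSink getDynamicTableSink() {
        return dynamicTableSink;
    }
}
```

The real operation would additionally pass through the constructor arguments of SinkModifyOperation (table identifier, resolved table, query), which are omitted in this stub.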
Atomic CTAS demo
The implementation of an atomic CTAS operation then requires only two steps:
- DynamicTableSink implements the interface SupportsStaged;
- Introduce the implementation class of the StagedTable interface.
HiveCatalog demo
HiveTableSink implements the interface SupportsStaged:
@Override
public StagedTable getStagedTable(
        ObjectPath tablePath, CatalogBaseTable table, SupportsStagedContext context) {
    checkNotNull(tablePath, "tablePath cannot be null");
    checkArgument(table instanceof ResolvedCatalogBaseTable, "table should be resolved");
    ResolvedCatalogBaseTable<?> resolvedTable = (ResolvedCatalogBaseTable<?>) table;

    Table hiveTable =
            HiveTableUtil.instantiateHiveTable(
                    tablePath,
                    resolvedTable,
                    HiveConfUtils.create(jobConf),
                    context.isManagedTable());

    HiveStagedTable hiveStagedTable =
            new HiveStagedTable(
                    hiveVersion,
                    new JobConfWrapper(jobConf),
                    hiveTable,
                    context.isIgnoreIfExists());

    return hiveStagedTable;
}
HiveStagedTable implements the core logic:
/** An implementation of {@link StagedTable} for Hive to support atomic CTAS. */
public class HiveStagedTable implements StagedTable {

    private static final long serialVersionUID = 1L;

    @Nullable private final String hiveVersion;
    private final JobConfWrapper jobConfWrapper;
    private final Table table;
    private final boolean ignoreIfExists;

    private transient HiveMetastoreClientWrapper client;

    private FileSystemFactory fsFactory;
    private TableMetaStoreFactory msFactory;
    private boolean overwrite;
    private Path tmpPath;
    private String[] partitionColumns;
    private boolean dynamicGrouped;
    private LinkedHashMap<String, String> staticPartitions;
    private ObjectIdentifier identifier;
    private PartitionCommitPolicyFactory partitionCommitPolicyFactory;

    public HiveStagedTable(
            String hiveVersion,
            JobConfWrapper jobConfWrapper,
            Table table,
            boolean ignoreIfExists) {
        this.hiveVersion = hiveVersion;
        this.jobConfWrapper = jobConfWrapper;
        this.table = table;
        this.ignoreIfExists = ignoreIfExists;
    }

    @Override
    public void begin() {
        // init hive metastore client
        client =
                HiveMetastoreClientFactory.create(
                        HiveConfUtils.create(jobConfWrapper.conf()), hiveVersion);
    }

    @Override
    public void commit() {
        try {
            // create table first
            client.createTable(table);

            try {
                List<PartitionCommitPolicy> policies = Collections.emptyList();
                if (partitionCommitPolicyFactory != null) {
                    policies =
                            partitionCommitPolicyFactory.createPolicyChain(
                                    Thread.currentThread().getContextClassLoader(),
                                    () -> {
                                        try {
                                            return fsFactory.create(tmpPath.toUri());
                                        } catch (IOException e) {
                                            throw new RuntimeException(e);
                                        }
                                    });
                }

                FileSystemCommitter committer =
                        new FileSystemCommitter(
                                fsFactory,
                                msFactory,
                                overwrite,
                                tmpPath,
                                partitionColumns.length,
                                false,
                                identifier,
                                staticPartitions,
                                policies);
                committer.commitPartitions();
            } catch (Exception e) {
                throw new TableException("Exception in two phase commit", e);
            } finally {
                try {
                    fsFactory.create(tmpPath.toUri()).delete(tmpPath, true);
                } catch (IOException ignore) {
                }
            }
        } catch (AlreadyExistsException alreadyExistsException) {
            if (!ignoreIfExists) {
                throw new FlinkHiveException(alreadyExistsException);
            }
        } catch (Exception e) {
            throw new FlinkHiveException(e);
        } finally {
            client.close();
        }
    }

    @Override
    public void abort() {
        // do nothing
    }

    public void setFsFactory(FileSystemFactory fsFactory) {
        this.fsFactory = fsFactory;
    }

    public void setMsFactory(TableMetaStoreFactory msFactory) {
        this.msFactory = msFactory;
    }

    public void setOverwrite(boolean overwrite) {
        this.overwrite = overwrite;
    }

    public void setTmpPath(Path tmpPath) {
        this.tmpPath = tmpPath;
    }

    public void setPartitionColumns(String[] partitionColumns) {
        this.partitionColumns = partitionColumns;
    }

    public void setDynamicGrouped(boolean dynamicGrouped) {
        this.dynamicGrouped = dynamicGrouped;
    }

    public void setStaticPartitions(LinkedHashMap<String, String> staticPartitions) {
        this.staticPartitions = staticPartitions;
    }

    public void setIdentifier(ObjectIdentifier identifier) {
        this.identifier = identifier;
    }

    public void setPartitionCommitPolicyFactory(
            PartitionCommitPolicyFactory partitionCommitPolicyFactory) {
        this.partitionCommitPolicyFactory = partitionCommitPolicyFactory;
    }

    public Table getTable() {
        return table;
    }
}
JdbcCatalog Demo
JdbcTableSink implements the getStagedTable API:
@Override
public StagedTable getStagedTable(
        ObjectPath tablePath, CatalogBaseTable table, SupportsStagedContext context) {
    ... ...
    StagedTable stagedTable =
            new JdbcStagedTable(
                    new ObjectPath(
                            tablePath.getDatabaseName(),
                            tablePath.getObjectName() + "_" + System.currentTimeMillis()),
                    tablePath,
                    tableSchema,
                    jdbcUrl,
                    jdbcUserName,
                    jdbcPassword);
    return stagedTable;
}
JdbcStagedTable implements the core logic:
/** An implementation of {@link StagedTable} for JDBC to support atomic CTAS. */
public class JdbcStagedTable implements StagedTable {

    private final ObjectPath tmpTablePath;
    private final ObjectPath finalTablePath;
    private final Map<String, String> schema;
    private final String jdbcUrl;
    private final String userName;
    private final String password;

    public JdbcStagedTable(
            ObjectPath tmpTablePath,
            ObjectPath finalTablePath,
            Map<String, String> schema,
            String jdbcUrl,
            String userName,
            String password) {
        this.tmpTablePath = tmpTablePath;
        this.finalTablePath = finalTablePath;
        this.schema = schema;
        this.jdbcUrl = jdbcUrl;
        this.userName = userName;
        this.password = password;
    }

    @Override
    public void begin() {
        // create the tmp table; the job writes data to the tmp table
        Connection connection = getConnection();
        connection
                .prepareStatement("create table " + tmpTablePath.getFullName() + "( ... ... )")
                .execute();
    }

    @Override
    public void commit() {
        // rename the tmp table to the final table name
        Connection connection = getConnection();
        connection
                .prepareStatement(
                        "rename table "
                                + tmpTablePath.getFullName()
                                + " to "
                                + finalTablePath.getFullName())
                .execute();
    }

    @Override
    public void abort() {
        // drop the tmp table
        Connection connection = getConnection();
        connection.prepareStatement("drop table " + tmpTablePath.getFullName()).execute();
    }

    private Connection getConnection() {
        // get jdbc connection
        return JDBCDriver.getConnection();
    }
}
Compatibility, Deprecation, and Migration Plan
It is a new feature with no implication for backwards compatibility.
Test Plan
Changes will be verified by unit tests.