...

Public Interfaces


Introduce the interface SupportsStaging, which provides the applyStaging API. If a DynamicTableSink implements SupportsStaging, it indicates that the sink supports atomic operations.

Code Block
languagejava
/**
 * Enables different staged operations to ensure atomicity in a {@link DynamicTableSink}.
 *
 * <p>By default, if this interface is not implemented, indicating that atomic operations are not
 * supported, then a non-atomic implementation is used.
 */
@PublicEvolving
public interface SupportsStaging {

    /**
     * Provides a {@link StagedTable} that provides a transaction abstraction. StagedTable will be
     * combined with {@link JobStatusHook} to achieve atomicity support in the Flink framework. The
     * framework calls the relevant API of StagedTable when the job state is switched.
     *
     * <p>This method will be called at the compile stage.
     *
     * @param context tells the DynamicTableSink the operation type of this StagedTable; more
     *     types can be added in the future
     * @return {@link StagedTable} that can be serialized and provides atomic operations
     */
    StagedTable applyStaging(StagingContext context);

    /**
     * The context is intended to tell DynamicTableSink the type of this operation. In this way,
     * DynamicTableSink can return the corresponding implementation of StagedTable according to the
     * specific operation. More types of operations can be extended in the future.
     */
    interface StagingContext {
        StagingPurpose getStagingPurpose();
    }

    enum StagingPurpose {
        CREATE_TABLE_AS,
        CREATE_TABLE_AS_IF_NOT_EXISTS
    }
}


Introduce the StagedTable interface, which supports atomic operations.

...

Therefore, we introduce the interface SupportsStaging. A DynamicTableSink that implements it supports atomic operations; one that does not implement it does not.

The Flink framework determines whether a DynamicTableSink supports atomic CTAS by checking whether it implements SupportsStaging. If it does, the framework obtains the StagedTable object through the applyStaging API; otherwise it uses the non-atomic CTAS implementation.

...

Code Block
languagejava
boolean isAtomicCtas = dynamicTableSink instanceof SupportsStaging;
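
The check above can be tried out without Flink on the classpath. The following is a minimal sketch, assuming simplified stand-in types that only share their names with the interfaces in this proposal. `tryStage`, `purposeFor`, and `RecordingSink` are hypothetical helpers introduced purely for illustration.

```java
import java.util.Optional;

// Stand-in types only: simplified copies of the interfaces discussed in this
// proposal, not the Flink classes themselves.
final class AtomicCtasDetectionSketch {

    interface StagedTable {
        void begin();
        void commit();
        void abort();
    }

    interface DynamicTableSink {}

    enum StagingPurpose {
        CREATE_TABLE_AS,
        CREATE_TABLE_AS_IF_NOT_EXISTS
    }

    interface StagingContext {
        StagingPurpose getStagingPurpose();
    }

    interface SupportsStaging {
        StagedTable applyStaging(StagingContext context);
    }

    /** Hypothetical helper: maps the IF NOT EXISTS flag to a staging purpose. */
    static StagingPurpose purposeFor(boolean ignoreIfExists) {
        return ignoreIfExists
                ? StagingPurpose.CREATE_TABLE_AS_IF_NOT_EXISTS
                : StagingPurpose.CREATE_TABLE_AS;
    }

    /** Returns a staged table for sinks that support staging, or empty for the non-atomic path. */
    static Optional<StagedTable> tryStage(DynamicTableSink sink, boolean ignoreIfExists) {
        if (!(sink instanceof SupportsStaging)) {
            return Optional.empty();
        }
        StagingPurpose purpose = purposeFor(ignoreIfExists);
        return Optional.of(((SupportsStaging) sink).applyStaging(() -> purpose));
    }

    /** Hypothetical sink that supports staging and records the purpose it was asked for. */
    static final class RecordingSink implements DynamicTableSink, SupportsStaging {
        StagingPurpose lastPurpose;

        @Override
        public StagedTable applyStaging(StagingContext context) {
            lastPurpose = context.getStagingPurpose();
            // A no-op staged table is enough to demonstrate the detection logic.
            return new StagedTable() {
                @Override
                public void begin() {}

                @Override
                public void commit() {}

                @Override
                public void abort() {}
            };
        }
    }
}
```

An empty `Optional` corresponds to the non-atomic branch, in which the table is created before the job runs.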

Integrate atomicity CTAS

Introduce CtasJobStatusHook, which implements the JobStatusHook interface and holds the StagedTable as a member variable.

The hook calls the StagedTable API as follows:

...
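
The hook implementation itself is elided above. As a rough illustration of the idea only, the sketch below assumes a natural mapping of job states to StagedTable calls: begin when the job is created, commit when it finishes, abort when it fails or is canceled. The `JobStatusHook` and `StagedTable` types here are simplified stand-ins (the callbacks take no job ID in this sketch), and `RecordingStagedTable` is a hypothetical helper used to observe the calls.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in types only; this is not the hook implementation from the proposal.
final class CtasJobStatusHookSketch {

    interface StagedTable {
        void begin();
        void commit();
        void abort();
    }

    /** Simplified stand-in for a job status hook; callbacks carry no job ID here. */
    interface JobStatusHook {
        void onCreated();
        void onFinished();
        void onFailed(Throwable cause);
        void onCanceled();
    }

    /** Forwards job state changes to the staged table (assumed mapping). */
    static final class CtasJobStatusHook implements JobStatusHook {
        private final StagedTable stagedTable;

        CtasJobStatusHook(StagedTable stagedTable) {
            this.stagedTable = stagedTable;
        }

        @Override
        public void onCreated() {
            stagedTable.begin();
        }

        @Override
        public void onFinished() {
            stagedTable.commit();
        }

        @Override
        public void onFailed(Throwable cause) {
            stagedTable.abort();
        }

        @Override
        public void onCanceled() {
            stagedTable.abort();
        }
    }

    /** Hypothetical staged table that records which methods were called, in order. */
    static final class RecordingStagedTable implements StagedTable {
        final List<String> calls = new ArrayList<>();

        @Override
        public void begin() {
            calls.add("begin");
        }

        @Override
        public void commit() {
            calls.add("commit");
        }

        @Override
        public void abort() {
            calls.add("abort");
        }
    }
}
```

With this mapping, a successful job results in begin followed by commit, while a failed or canceled job results in begin followed by abort, so the target table is never published with partial data.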

We can infer atomic CTAS support from whether the DynamicTableSink implements the interface SupportsStaging:

not: atomicity semantics are not supported, and the existing code logic is used;

...

Code Block
languagejava
Optional<DynamicTableSink> dynamicTableSinkOptional =
        getDynamicTableSink(
                catalogTable,
                tableIdentifier,
                createTableOperation.isTemporary(),
                catalogManager.getCatalog(catalogName));
ObjectPath objectPath = tableIdentifier.toObjectPath();
if (dynamicTableSinkOptional.isPresent()
        && dynamicTableSinkOptional.get() instanceof SupportsStaging) {
    DynamicTableSink dynamicTableSink = dynamicTableSinkOptional.get();
    StagedTable stagedTable =
            ((SupportsStaging) dynamicTableSink)
                    .applyStaging(
                            new SupportsStaging.StagingContext() {
                                @Override
                                public SupportsStaging.StagingPurpose getStagingPurpose() {
                                    if (createTableOperation.isIgnoreIfExists()) {
                                        return SupportsStaging.StagingPurpose
                                                .CREATE_TABLE_AS_IF_NOT_EXISTS;
                                    }
                                    return SupportsStaging.StagingPurpose.CREATE_TABLE_AS;
                                }
                            });
    CtasJobStatusHook ctasJobStatusHook = new CtasJobStatusHook(stagedTable);
    mapOperations.add(
            ctasOperation.toStagedSinkModifyOperation(
                    createTableOperation.getTableIdentifier(),
                    catalogTable,
                    ctasCatalog,
                    dynamicTableSink));
    jobStatusHookList.add(ctasJobStatusHook);
} else {
    // execute CREATE TABLE first for non-atomic CTAS statements
    executeInternal(ctasOperation.getCreateTableOperation());
    mapOperations.add(ctasOperation.toSinkModifyOperation(catalogManager));
}

To avoid creating the DynamicTableSink a second time, we construct a StagedSinkModifyOperation that extends SinkModifyOperation and holds the DynamicTableSink as a member variable.

Atomic CTAS demo

Implementing the atomic CTAS operation then requires only two steps:

  1. DynamicTableSink implements the interface SupportsStaging;
  2. Introduce the implementation class of the StagedTable interface.
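
Before the connector-specific demos, the two steps can be illustrated with a small in-memory sketch. It assumes simplified stand-in interfaces rather than the Flink ones, and it models the catalog as a plain map from table name to rows. `InMemorySink`, `InMemoryStagedTable`, and `write` are hypothetical names used only for this illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Stand-in types only; a toy model of the two steps, not a real connector.
final class InMemoryAtomicCtasSketch {

    interface StagedTable {
        void begin();
        void commit();
        void abort();
    }

    enum StagingPurpose {
        CREATE_TABLE_AS,
        CREATE_TABLE_AS_IF_NOT_EXISTS
    }

    interface StagingContext {
        StagingPurpose getStagingPurpose();
    }

    interface SupportsStaging {
        StagedTable applyStaging(StagingContext context);
    }

    /** Step 1: the sink implements SupportsStaging and hands out a staged table. */
    static final class InMemorySink implements SupportsStaging {
        private final Map<String, List<String>> catalog;
        private final String tableName;

        InMemorySink(Map<String, List<String>> catalog, String tableName) {
            this.catalog = catalog;
            this.tableName = tableName;
        }

        @Override
        public InMemoryStagedTable applyStaging(StagingContext context) {
            boolean ignoreIfExists =
                    context.getStagingPurpose() == StagingPurpose.CREATE_TABLE_AS_IF_NOT_EXISTS;
            return new InMemoryStagedTable(catalog, tableName, ignoreIfExists);
        }
    }

    /** Step 2: the StagedTable implementation buffers rows and publishes them on commit. */
    static final class InMemoryStagedTable implements StagedTable {
        private final Map<String, List<String>> catalog;
        private final String tableName;
        private final boolean ignoreIfExists;
        private final List<String> buffer = new ArrayList<>();

        InMemoryStagedTable(
                Map<String, List<String>> catalog, String tableName, boolean ignoreIfExists) {
            this.catalog = catalog;
            this.tableName = tableName;
            this.ignoreIfExists = ignoreIfExists;
        }

        /** Rows written by the job stay in the buffer until commit. */
        void write(String row) {
            buffer.add(row);
        }

        @Override
        public void begin() {
            buffer.clear();
        }

        @Override
        public void commit() {
            if (catalog.containsKey(tableName)) {
                if (ignoreIfExists) {
                    return; // IF NOT EXISTS: leave the existing table untouched
                }
                throw new IllegalStateException("Table already exists: " + tableName);
            }
            // The table and its data become visible in a single step.
            catalog.put(tableName, new ArrayList<>(buffer));
        }

        @Override
        public void abort() {
            buffer.clear(); // nothing was published, so only staged rows are dropped
        }
    }
}
```

The point of the design is that the target table only appears in the catalog at commit time, so an aborted job leaves no trace.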

HiveCatalog demo

HiveTableSink implements the applyStaging API:

Code Block
languagejava
@Override
public StagedTable applyStaging(StagingContext context) {
    Table hiveTable =
            HiveTableUtil.instantiateHiveTable(
                    identifier.toObjectPath(),
                    catalogTable,
                    HiveConfUtils.create(jobConf),
                    false);

    HiveStagedTable hiveStagedTable =
            new HiveStagedTable(
                    hiveVersion,
                    new JobConfWrapper(jobConf),
                    hiveTable,
                    context.getStagingPurpose()
                            == SupportsStaging.StagingPurpose.CREATE_TABLE_AS_IF_NOT_EXISTS);

    return hiveStagedTable;
}

...

Code Block
languagejava
/** An implementation of {@link StagedTable} for Hive to support atomic CTAS. */
public class HiveStagedTable implements StagedTable {

    private static final long serialVersionUID = 1L;

    @Nullable private final String hiveVersion;
    private final JobConfWrapper jobConfWrapper;

    private final Table table;

    private final boolean ignoreIfExists;

    private transient HiveMetastoreClientWrapper client;

    // Fields below are populated through the setters further down.
    private FileSystemFactory fsFactory;
    private TableMetaStoreFactory msFactory;
    private boolean overwrite;
    private Path tmpPath;
    private String[] partitionColumns;
    private boolean dynamicGrouped;
    private LinkedHashMap<String, String> staticPartitions;
    private ObjectIdentifier identifier;
    private PartitionCommitPolicyFactory partitionCommitPolicyFactory;

    public HiveStagedTable(
            String hiveVersion,
            JobConfWrapper jobConfWrapper,
            Table table,
            boolean ignoreIfExists) {
        this.hiveVersion = hiveVersion;
        this.jobConfWrapper = jobConfWrapper;
        this.table = table;
        this.ignoreIfExists = ignoreIfExists;
    }

    @Override
    public void begin() {
        // init hive metastore client
        client =
                HiveMetastoreClientFactory.create(
                        HiveConfUtils.create(jobConfWrapper.conf()), hiveVersion);
    }

    @Override
    public void commit() {
        try {
            // create table first
            client.createTable(table);

            try {
                List<PartitionCommitPolicy> policies = Collections.emptyList();
                if (partitionCommitPolicyFactory != null) {
                    policies =
                            partitionCommitPolicyFactory.createPolicyChain(
                                    Thread.currentThread().getContextClassLoader(),
                                    () -> {
                                        try {
                                            return fsFactory.create(tmpPath.toUri());
                                        } catch (IOException e) {
                                            throw new RuntimeException(e);
                                        }
                                    });
                }

                FileSystemCommitter committer =
                        new FileSystemCommitter(
                                fsFactory,
                                msFactory,
                                overwrite,
                                tmpPath,
                                partitionColumns.length,
                                false,
                                identifier,
                                staticPartitions,
                                policies);
                committer.commitPartitions();
            } catch (Exception e) {
                throw new TableException("Exception in two phase commit", e);
            } finally {
                try {
                    fsFactory.create(tmpPath.toUri()).delete(tmpPath, true);
                } catch (IOException ignore) {
                    // best-effort cleanup of the temporary path
                }
            }
        } catch (AlreadyExistsException alreadyExistsException) {
            if (!ignoreIfExists) {
                throw new FlinkHiveException(alreadyExistsException);
            }
        } catch (Exception e) {
            throw new FlinkHiveException(e);
        } finally {
            client.close();
        }
    }

    @Override
    public void abort() {
        // do nothing
    }

    public void setFsFactory(FileSystemFactory fsFactory) {
        this.fsFactory = fsFactory;
    }

    public void setMsFactory(TableMetaStoreFactory msFactory) {
        this.msFactory = msFactory;
    }

    public void setOverwrite(boolean overwrite) {
        this.overwrite = overwrite;
    }

    public void setTmpPath(Path tmpPath) {
        this.tmpPath = tmpPath;
    }

    public void setPartitionColumns(String[] partitionColumns) {
        this.partitionColumns = partitionColumns;
    }

    public void setDynamicGrouped(boolean dynamicGrouped) {
        this.dynamicGrouped = dynamicGrouped;
    }

    public void setStaticPartitions(LinkedHashMap<String, String> staticPartitions) {
        this.staticPartitions = staticPartitions;
    }

    public void setIdentifier(ObjectIdentifier identifier) {
        this.identifier = identifier;
    }

    public void setPartitionCommitPolicyFactory(
            PartitionCommitPolicyFactory partitionCommitPolicyFactory) {
        this.partitionCommitPolicyFactory = partitionCommitPolicyFactory;
    }

    public Table getTable() {
        return table;
    }
}

...

Jdbc Demo

...

JdbcTableSink implements the applyStaging API:

Code Block
languagejava
@Override
public StagedTable applyStaging(StagingContext context) {
    ... ...
    StagedTable stagedTable =
            new JdbcStagedTable(
                    // tmp table name: final table name plus a timestamp suffix
                    new ObjectPath(
                            tablePath.getDatabaseName(),
                            tablePath.getObjectName() + "_" + System.currentTimeMillis()),
                    tablePath,
                    tableSchema,
                    jdbcUrl,
                    jdbcUserName,
                    jdbcPassword);

    return stagedTable;
}

...

JdbcStagedTable implements the core logic:

Code Block
languagejava
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Map;

import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.StagedTable;

/** An implementation of {@link StagedTable} for JDBC to support atomic CTAS. */
public class JdbcStagedTable implements StagedTable {

    private static final long serialVersionUID = 1L;

    private final ObjectPath tmpTablePath;
    private final ObjectPath finalTablePath;
    private final Map<String, String> schema;
    private final String jdbcUrl;
    private final String userName;
    private final String password;

    public JdbcStagedTable(
            ObjectPath tmpTablePath,
            ObjectPath finalTablePath,
            Map<String, String> schema,
            String jdbcUrl,
            String userName,
            String password) {
        this.tmpTablePath = tmpTablePath;
        this.finalTablePath = finalTablePath;
        this.schema = schema;
        this.jdbcUrl = jdbcUrl;
        this.userName = userName;
        this.password = password;
    }

    @Override
    public void begin() {
        // Create the tmp table; the job writes its data to the tmp table.
        execute("create table " + tmpTablePath.getFullName() + "( ... ... )");
    }

    @Override
    public void commit() {
        // Rename the tmp table to the final table name.
        execute(
                "rename table "
                        + tmpTablePath.getFullName()
                        + " to "
                        + finalTablePath.getFullName());
    }

    @Override
    public void abort() {
        // Drop the tmp table.
        execute("drop table " + tmpTablePath.getFullName());
    }

    /** Runs a single DDL statement and closes the connection afterwards. */
    private void execute(String sql) {
        try (Connection connection = getConnection();
                Statement statement = connection.createStatement()) {
            statement.execute(sql);
        } catch (SQLException e) {
            throw new RuntimeException("Failed to execute: " + sql, e);
        }
    }

    private Connection getConnection() throws SQLException {
        // Open a JDBC connection with the configured URL and credentials.
        return DriverManager.getConnection(jdbcUrl, userName, password);
    }
}

...