...

In this case, using the atomic implementation improves the user experience; whether to use it is decided by the Catalog implementation. At this point, the entire execution of atomic CTAS is complete.

HiveCatalog implementation of atomic CTAS demo

Implementing the atomic CTAS operation then requires only two steps:

  1. Implement the createTwoPhaseCatalogTable method in the Catalog;
  2. Provide an implementation class of the TwoPhaseCatalogTable interface.

HiveCatalog implements the createTwoPhaseCatalogTable API:

Code Block
languagejava
    @Override
    public Optional<TwoPhaseCatalogTable> createTwoPhaseCatalogTable(
            ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists, boolean isStreamingMode)
            throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {

        if (isStreamingMode) {
            // HiveCatalog does not support atomic semantics in streaming mode.
            return Optional.empty();
        }

        checkNotNull(tablePath, "tablePath cannot be null");
        checkArgument(table instanceof ResolvedCatalogBaseTable, "table should be resolved");

        ResolvedCatalogBaseTable<?> resolvedTable = (ResolvedCatalogBaseTable<?>) table;
        if (!databaseExists(tablePath.getDatabaseName())) {
            throw new DatabaseNotExistException(getName(), tablePath.getDatabaseName());
        }
        if (!ignoreIfExists && tableExists(tablePath)) {
            throw new TableAlreadyExistException(getName(), tablePath);
        }

        boolean managedTable = ManagedTableListener.isManagedTable(this, resolvedTable);

        Table hiveTable =
                HiveTableUtil.instantiateHiveTable(
                        tablePath, resolvedTable, hiveConf, managedTable);

        TwoPhaseCatalogTable twoPhaseCatalogTable = new HiveTwoPhaseCatalogTable(
                getHiveVersion(),
                new JobConfWrapper(JobConfUtils.createJobConfWithCredentials(hiveConf)),
                hiveTable,
                ignoreIfExists);

        return Optional.of(twoPhaseCatalogTable);
    }
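The method above returns Optional.empty() in streaming mode, which signals that no two-phase table is available. The following is a minimal sketch of how a caller might branch on that result. It assumes that an empty result means falling back to the non-atomic CTAS path. The names TwoPhaseTable, SketchCatalog, NoOpTable, and CtasPlanSketch are hypothetical stand-ins introduced for illustration; they are not Flink classes.

```java
import java.util.Optional;

/** Hypothetical stand-in for the TwoPhaseCatalogTable interface (not the Flink type). */
interface TwoPhaseTable {
    void beginTransaction();
    void commit();
    void abort();
}

/** Hypothetical, simplified catalog exposing only the method under discussion. */
interface SketchCatalog {
    Optional<TwoPhaseTable> createTwoPhaseCatalogTable(String tablePath, boolean isStreamingMode);
}

/** Two-phase table whose callbacks do nothing; enough to exercise the branching. */
final class NoOpTable implements TwoPhaseTable {
    @Override public void beginTransaction() {}
    @Override public void commit() {}
    @Override public void abort() {}
}

final class CtasPlanSketch {

    /** Mimics the HiveCatalog behavior above: no two-phase table in streaming mode. */
    static SketchCatalog hiveLikeCatalog() {
        return (tablePath, isStreamingMode) -> {
            if (isStreamingMode) {
                return Optional.empty();
            }
            return Optional.<TwoPhaseTable>of(new NoOpTable());
        };
    }

    /**
     * Assumption: a present result selects the atomic path, an empty result
     * selects the non-atomic path.
     */
    static String choosePath(SketchCatalog catalog, String tablePath, boolean isStreamingMode) {
        return catalog.createTwoPhaseCatalogTable(tablePath, isStreamingMode)
                .map(table -> "atomic")
                .orElse("non-atomic");
    }
}
```

Returning an Optional keeps the decision inside the catalog: a catalog that cannot offer atomicity for a given mode opts out without throwing an exception.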

HiveTwoPhaseCatalogTable implements the core logic:

Code Block
languagejava
/**
 * An implementation of {@link TwoPhaseCatalogTable} for Hive that
 * supports atomic CTAS.
 */
public class HiveTwoPhaseCatalogTable implements TwoPhaseCatalogTable {

    private static final long serialVersionUID = 1L;

    @Nullable private final String hiveVersion;
    private final JobConfWrapper jobConfWrapper;

    private final Table table;
    private final boolean ignoreIfExists;

    private transient HiveMetastoreClientWrapper client;

    public HiveTwoPhaseCatalogTable(
            String hiveVersion,
            JobConfWrapper jobConfWrapper,
            Table table,
            boolean ignoreIfExists) {
        this.hiveVersion = hiveVersion;
        this.jobConfWrapper = jobConfWrapper;
        this.table = table;
        this.ignoreIfExists = ignoreIfExists;
    }

    @Override
    public void beginTransaction() {
        // Initialize the Hive metastore client that commit() uses to create the table.
        client =
                HiveMetastoreClientFactory.create(
                        HiveConfUtils.create(jobConfWrapper.conf()), hiveVersion);
    }

    @Override
    public void commit() {
        try {
            client.createTable(table);
        } catch (AlreadyExistsException alreadyExistsException) {
            if (!ignoreIfExists) {
                throw new FlinkHiveException(alreadyExistsException);
            }
        } catch (Exception e) {
            throw new FlinkHiveException(e);
        } finally {
            client.close();
        }
    }

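    @Override
    public void abort() {
        // The table is only created in commit(), so there is nothing to roll back.
        // Release the metastore client opened in beginTransaction(), if any.
        if (client != null) {
            client.close();
        }
    }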
}
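The three callbacks only make sense in a fixed order around the insert job. The sketch below illustrates one plausible ordering, assuming that beginTransaction() runs before the job starts, commit() runs after the job succeeds, and abort() runs when the job fails. TwoPhaseCallbacks, RecordingTable, and AtomicCtasDriver are hypothetical names used for illustration; the actual Flink runtime hooks are not shown here.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical mirror of the three callbacks implemented by HiveTwoPhaseCatalogTable. */
interface TwoPhaseCallbacks {
    void beginTransaction();
    void commit();
    void abort();
}

/** Records the callback order instead of talking to a metastore. */
final class RecordingTable implements TwoPhaseCallbacks {
    final List<String> calls = new ArrayList<>();

    @Override public void beginTransaction() { calls.add("begin"); }
    @Override public void commit() { calls.add("commit"); }
    @Override public void abort() { calls.add("abort"); }
}

final class AtomicCtasDriver {

    /**
     * Runs the insert job between beginTransaction() and commit().
     * If the job throws, abort() is called and the table is never created.
     *
     * @return true if the job succeeded and commit() was called
     */
    static boolean run(TwoPhaseCallbacks table, Runnable insertJob) {
        table.beginTransaction();
        try {
            insertJob.run();
        } catch (RuntimeException e) {
            // Assumption: any job failure triggers abort().
            table.abort();
            return false;
        }
        table.commit();
        return true;
    }
}
```

With this ordering, a failed job leaves no table behind in the metastore, because createTable is only reached through commit().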


Compatibility, Deprecation, and Migration Plan

...