THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
In this case, using the atomic-semantics implementation improves the user experience; whether atomic semantics are used is decided by the Catalog implementation. At this point, the entire execution of atomic CTAS is complete.
Demo: implementing atomic CTAS in HiveCatalog
Implementing the atomic CTAS operation requires only two steps:
- The Catalog implements the createTwoPhaseCatalogTable method;
- Introduce an implementation class of the TwoPhaseCatalogTable interface.
HiveCatalog implements the createTwoPhaseCatalogTable API:
Code Block | ||
---|---|---|
| ||
/**
 * Prepares a {@link TwoPhaseCatalogTable} for an atomic CTAS statement.
 *
 * <p>This method only validates the request and builds the Hive table definition. The returned
 * object creates the table in the metastore when it is committed.
 *
 * @param tablePath path of the table to create; must not be null
 * @param table definition of the table; must be a {@link ResolvedCatalogBaseTable}
 * @param ignoreIfExists if true, an already existing table does not cause a failure
 * @param isStreamingMode whether the CTAS job runs in streaming mode
 * @return the two-phase table, or {@link Optional#empty()} in streaming mode
 * @throws TableAlreadyExistException if the table exists and {@code ignoreIfExists} is false
 * @throws DatabaseNotExistException if the target database does not exist
 * @throws CatalogException for other catalog failures
 */
@Override
public Optional<TwoPhaseCatalogTable> createTwoPhaseCatalogTable(
        ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists, boolean isStreamingMode)
        throws TableAlreadyExistException, DatabaseNotExistException, CatalogException {
    // HiveCatalog offers atomicity semantics only for batch jobs; an empty result signals
    // that atomic CTAS is unavailable in stream mode.
    if (isStreamingMode) {
        return Optional.empty();
    }

    checkNotNull(tablePath, "tablePath cannot be null");
    checkArgument(table instanceof ResolvedCatalogBaseTable, "table should be resolved");
    final ResolvedCatalogBaseTable<?> resolved = (ResolvedCatalogBaseTable<?>) table;

    final String databaseName = tablePath.getDatabaseName();
    if (!databaseExists(databaseName)) {
        throw new DatabaseNotExistException(getName(), databaseName);
    }

    // The existence lookup is only needed when an existing table must be rejected.
    if (!ignoreIfExists) {
        if (tableExists(tablePath)) {
            throw new TableAlreadyExistException(getName(), tablePath);
        }
    }

    final Table hiveTable =
            HiveTableUtil.instantiateHiveTable(
                    tablePath,
                    resolved,
                    hiveConf,
                    ManagedTableListener.isManagedTable(this, resolved));

    final String version = getHiveVersion();
    final JobConfWrapper jobConf =
            new JobConfWrapper(JobConfUtils.createJobConfWithCredentials(hiveConf));
    final TwoPhaseCatalogTable twoPhaseTable =
            new HiveTwoPhaseCatalogTable(version, jobConf, hiveTable, ignoreIfExists);
    return Optional.of(twoPhaseTable);
} |
HiveTwoPhaseCatalogTable implements the core logic:
Code Block | ||
---|---|---|
| ||
/**
 * An implementation of {@link TwoPhaseCatalogTable} for Hive to support atomic CTAS.
 *
 * <p>The Hive table definition is prepared up front, but the table is only created in the
 * metastore by {@link #commit()}. The metastore client is opened by {@link #beginTransaction()}
 * (or on demand by {@link #commit()}) and is released by both {@link #commit()} and
 * {@link #abort()}.
 */
public class HiveTwoPhaseCatalogTable implements TwoPhaseCatalogTable {

    private static final long serialVersionUID = 1L;

    @Nullable private final String hiveVersion;
    private final JobConfWrapper jobConfWrapper;
    private final Table table;
    private final boolean ignoreIfExists;

    // Transient, so it is not part of the serialized form and may be null on an instance
    // that was deserialized after beginTransaction(); ensureClient() covers that case.
    private transient HiveMetastoreClientWrapper client;

    /**
     * @param hiveVersion Hive version used to create the metastore client; may be null
     * @param jobConfWrapper configuration used to build the HiveConf for the metastore client
     * @param table the fully prepared Hive table to create on commit
     * @param ignoreIfExists if true, an already existing table is not treated as an error
     */
    public HiveTwoPhaseCatalogTable(
            @Nullable String hiveVersion,
            JobConfWrapper jobConfWrapper,
            Table table,
            boolean ignoreIfExists) {
        this.hiveVersion = hiveVersion;
        this.jobConfWrapper = jobConfWrapper;
        this.table = table;
        this.ignoreIfExists = ignoreIfExists;
    }

    /** Opens the Hive metastore client used by {@link #commit()}. */
    @Override
    public void beginTransaction() {
        ensureClient();
    }

    /**
     * Creates the table in the Hive metastore and then releases the client.
     *
     * @throws FlinkHiveException if the table already exists and {@code ignoreIfExists} is
     *     false, or if creating the table fails for any other reason
     */
    @Override
    public void commit() {
        try {
            ensureClient().createTable(table);
        } catch (AlreadyExistsException alreadyExistsException) {
            // An existing table is only acceptable when ignoreIfExists was requested.
            if (!ignoreIfExists) {
                throw new FlinkHiveException(alreadyExistsException);
            }
        } catch (Exception e) {
            throw new FlinkHiveException(e);
        } finally {
            closeClient();
        }
    }

    /**
     * Releases the metastore client. This implementation creates the table only in
     * {@link #commit()}, so aborting has nothing to roll back in the metastore.
     */
    @Override
    public void abort() {
        closeClient();
    }

    /** Returns the open metastore client, creating it if none is currently open. */
    private HiveMetastoreClientWrapper ensureClient() {
        if (client == null) {
            client =
                    HiveMetastoreClientFactory.create(
                            HiveConfUtils.create(jobConfWrapper.conf()), hiveVersion);
        }
        return client;
    }

    /** Closes the metastore client if one is open, and forgets it so it is never reused. */
    private void closeClient() {
        final HiveMetastoreClientWrapper toClose = client;
        client = null;
        if (toClose != null) {
            toClose.close();
        }
    }
} |
Compatibility, Deprecation, and Migration Plan
...