...
This section describes how other systems can integrate with Flink to support time travel.
Taking Paimon's support for time travel as an example: when implementing the getTable(ObjectPath tablePath, long timestamp) method, the catalog attaches the corresponding consumption timestamp to the table it returns.
```java
/** Catalog for paimon. */
public class FlinkCatalog extends AbstractCatalog {

    ...

    @Override
    public CatalogTable getTable(ObjectPath tablePath)
            throws TableNotExistException, CatalogException {
        Table table;
        try {
            table = catalog.getTable(toIdentifier(tablePath));
        } catch (Catalog.TableNotExistException e) {
            throw new TableNotExistException(getName(), tablePath);
        }

        if (table instanceof FileStoreTable) {
            return toCatalogTable(table);
        } else {
            return new SystemCatalogTable(table);
        }
    }

    @Override
    public CatalogTable getTable(ObjectPath tablePath, long timestamp)
            throws TableNotExistException, CatalogException {
        // Due to Paimon's Catalog not providing an interface for historical Schema,
        // we can only obtain the latest Schema at present.
        CatalogTable catalogTable = this.getTable(tablePath);

        // Attach the specified consumption time parameter.
        Options option = new Options().set(CoreOptions.SCAN_TIMESTAMP_MILLIS, timestamp);
        return catalogTable.copy(option.toMap());
    }

    ....
}
```
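The pattern in the Paimon example (look up the latest table, then return a copy that carries the requested timestamp as a scan option) can be tried without any Flink or Paimon dependency. The following is only a minimal sketch under stated assumptions: `SketchTable`, `SketchCatalog` and `TimeTravelSketch` are hypothetical stand-ins invented for illustration and are not Flink or Paimon classes. The option key `scan.timestamp-millis` is the string key behind Paimon's `CoreOptions.SCAN_TIMESTAMP_MILLIS`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Illustrative stand-ins only; none of these types belong to Flink or Paimon. */
public class TimeTravelSketch {

    /** Hypothetical immutable table: a name plus its options. */
    public static final class SketchTable {
        public final String name;
        public final Map<String, String> options;

        public SketchTable(String name, Map<String, String> options) {
            this.name = name;
            this.options = Collections.unmodifiableMap(new HashMap<>(options));
        }

        /** Returns a new table whose options are the current ones overlaid with {@code extra}. */
        public SketchTable copy(Map<String, String> extra) {
            Map<String, String> merged = new HashMap<>(options);
            merged.putAll(extra);
            return new SketchTable(name, merged);
        }
    }

    /** Hypothetical catalog that only knows the latest definition of each table. */
    public static final class SketchCatalog {
        private final Map<String, SketchTable> tables = new HashMap<>();

        public void register(SketchTable table) {
            tables.put(table.name, table);
        }

        /** Regular lookup: returns the latest table definition. */
        public SketchTable getTable(String name) {
            SketchTable table = tables.get(name);
            if (table == null) {
                throw new IllegalArgumentException("Table not found: " + name);
            }
            return table;
        }

        /** Time travel lookup: latest definition plus the requested scan timestamp. */
        public SketchTable getTable(String name, long timestamp) {
            SketchTable latest = getTable(name);
            return latest.copy(
                    Collections.singletonMap("scan.timestamp-millis", Long.toString(timestamp)));
        }
    }

    public static void main(String[] args) {
        SketchCatalog catalog = new SketchCatalog();
        catalog.register(new SketchTable("orders", Collections.singletonMap("bucket", "4")));

        SketchTable snapshot = catalog.getTable("orders", 1700000000000L);
        // The copy carries the timestamp; the registered table is left unchanged.
        System.out.println(snapshot.options.get("scan.timestamp-millis")); // prints 1700000000000
        System.out.println(catalog.getTable("orders").options.containsKey("scan.timestamp-millis")); // prints false
    }
}
```

Returning a copy rather than mutating the stored table keeps ordinary lookups unaffected, which mirrors the `catalogTable.copy(option.toMap())` call in the Paimon example.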
...