Status
Discussion thread | https://lists.apache.org/thread/185mbcwnpokfop4xcb22r9bgfp2m68fx |
---|---|
Vote thread | https://lists.apache.org/thread/5f9806kw1q01536wzwgx1psh7jhmfmjw |
JIRA | |
Release | 1.18 |
Motivation
A Flink ETL job consumes data from a source and writes its results to a sink, so the job establishes a relationship between that source and that sink. Flink needs a mechanism that lets users report these relationships to external systems, such as the metadata systems DataHub [1] and Atlas [2], and the meta store mentioned in FLIP-276 [3].
...
```java
/**
 * Different events will be fired when a catalog/database/table is modified. The customized listener can get and
 * report specific information from the event according to the event type.
 */
@PublicEvolving
public interface CatalogModificationListener {
    /** The event will be fired when the database/table is modified. */
    void onEvent(CatalogModificationEvent event);
}

/* Basic interface for catalog modification. */
@PublicEvolving
public interface CatalogModificationEvent {
    /* Context for the event. */
    CatalogContext context();
}

/* Context for catalog modification and job lineage events. */
@PublicEvolving
public interface CatalogContext {
    /* The name of catalog. */
    String getCatalogName();

    /* Class of the catalog. Named getClazz() because Object#getClass() is final and cannot be redeclared. */
    Class<? extends Catalog> getClazz();

    /* Identifier for the catalog from catalog factory, such as jdbc/iceberg/paimon. */
    Optional<String> getFactoryIdentifier();

    /* Config for catalog. */
    Configuration getConfiguration();
}

/* The basic class for database related event. */
public interface DatabaseModificationEvent extends CatalogModificationEvent {
    CatalogDatabase database();
}

/* Event for creating database. */
@PublicEvolving
public interface CreateDatabaseEvent extends DatabaseModificationEvent {
    boolean ignoreIfExists();
}

/* Event for altering database. */
@PublicEvolving
public interface AlterDatabaseEvent extends DatabaseModificationEvent {
    CatalogDatabase newDatabase();

    boolean ignoreIfNotExists();
}

/* Event for dropping database. */
@PublicEvolving
public interface DropDatabaseEvent extends DatabaseModificationEvent {
    boolean ignoreIfExists();
}

/**
 * Base table event, provides column list, primary keys, partition keys, watermarks and properties in
 * CatalogBaseTable. The table can be source or sink.
 */
public interface TableModificationEvent extends CatalogModificationEvent {
    ObjectIdentifier identifier();

    CatalogBaseTable table();
}

/* Event for creating table. */
@PublicEvolving
public interface CreateTableEvent extends TableModificationEvent {
    boolean ignoreIfExists();
}

/* Event for altering table, provides all changes for old table. */
@PublicEvolving
public interface AlterTableEvent extends TableModificationEvent {
    List<TableChange> tableChanges();

    boolean ignoreIfExists();
}

/* Event for dropping table. */
@PublicEvolving
public interface DropTableEvent extends TableModificationEvent {
    boolean ignoreIfExists();
}

/* Factory for catalog modification listener. */
@PublicEvolving
public interface CatalogModificationListenerFactory {
    CatalogModificationListener createListener(Context context);

    @PublicEvolving
    public interface Context {
        Configuration getConfiguration();

        ClassLoader getUserClassLoader();

        /*
         * Get an Executor pool for the listener to run async operations that can potentially be IO-heavy.
         */
        Executor getIOExecutor();
    }
}
```
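To illustrate how a customized listener could dispatch on the event type, here is a minimal sketch. It does not depend on Flink: the nested interfaces are simplified stand-ins for the proposed ones (for example, `identifier()` returns a `String` here rather than an `ObjectIdentifier`), and `RecordingListener`, `createTable`, `dropTable` and `demo` are hypothetical names introduced only for this example. A real listener would report to a metadata system instead of collecting strings.

```java
import java.util.ArrayList;
import java.util.List;

public class CatalogListenerSketch {

    // Simplified stand-ins for the proposed interfaces, so the sketch compiles without Flink.
    public interface CatalogContext {
        String getCatalogName();
    }

    public interface CatalogModificationEvent {
        CatalogContext context();
    }

    public interface CreateTableEvent extends CatalogModificationEvent {
        String identifier(); // assumption: a plain String instead of ObjectIdentifier
    }

    public interface DropTableEvent extends CatalogModificationEvent {
        String identifier();
    }

    public interface CatalogModificationListener {
        void onEvent(CatalogModificationEvent event);
    }

    /** Hypothetical listener that records one line per event instead of calling an external system. */
    public static final class RecordingListener implements CatalogModificationListener {
        public final List<String> reported = new ArrayList<>();

        @Override
        public void onEvent(CatalogModificationEvent event) {
            String catalog = event.context().getCatalogName();
            // Pick the information to report according to the concrete event type.
            if (event instanceof CreateTableEvent) {
                reported.add("CREATE TABLE " + catalog + "." + ((CreateTableEvent) event).identifier());
            } else if (event instanceof DropTableEvent) {
                reported.add("DROP TABLE " + catalog + "." + ((DropTableEvent) event).identifier());
            } else {
                reported.add("UNHANDLED EVENT in " + catalog);
            }
        }
    }

    /** Builds a stand-in CreateTableEvent for the demo. */
    public static CreateTableEvent createTable(final String catalog, final String table) {
        return new CreateTableEvent() {
            @Override public CatalogContext context() { return () -> catalog; }
            @Override public String identifier() { return table; }
        };
    }

    /** Builds a stand-in DropTableEvent for the demo. */
    public static DropTableEvent dropTable(final String catalog, final String table) {
        return new DropTableEvent() {
            @Override public CatalogContext context() { return () -> catalog; }
            @Override public String identifier() { return table; }
        };
    }

    /** Fires three events at the listener and returns what it recorded. */
    public static List<String> demo() {
        RecordingListener listener = new RecordingListener();
        listener.onEvent(createTable("my_catalog", "db.orders"));
        listener.onEvent(dropTable("my_catalog", "db.orders"));
        CatalogContext ctx = () -> "my_catalog";
        CatalogModificationEvent other = () -> ctx; // an event type the listener does not handle
        listener.onEvent(other);
        return listener.reported;
    }

    public static void main(String[] args) {
        for (String line : demo()) {
            System.out.println(line);
        }
    }
}
```

Dispatching with `instanceof` keeps the listener to the single `onEvent` entry point in the proposal. Whether unhandled event types should be ignored or logged is left to the implementation.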
...
Users can identify the physical connector from CatalogContext and the options in CatalogBaseTable through the following steps:
...
Users can get the value of the 'connector' option from the options in CatalogBaseTable for temporary tables. If it doesn't exist, users can use the factory identifier from CatalogContext as the connector name. If neither exists, users can define the connector name themselves based on Class<? extends Catalog>.
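The fallback order for the connector name can be sketched as below. This is an illustration, not part of the proposed API: `resolveConnectorName` and `InHouseCatalog` are hypothetical names. The table options are passed as a plain `Map<String, String>`, and deriving a name from the catalog's simple class name is just one possible user-defined convention for the last step.

```java
import java.util.Collections;
import java.util.Locale;
import java.util.Map;
import java.util.Optional;

public class ConnectorNameSketch {

    /** Hypothetical catalog class, used only to exercise the last fallback. */
    public static final class InHouseCatalog {}

    /**
     * Resolves a connector name in three steps.
     *
     * @param tableOptions options of the table (assumed to come from CatalogBaseTable)
     * @param factoryIdentifier factory identifier of the catalog (assumed to come from CatalogContext)
     * @param catalogClass class of the catalog, used for a user-defined name as the last resort
     */
    public static String resolveConnectorName(
            Map<String, String> tableOptions,
            Optional<String> factoryIdentifier,
            Class<?> catalogClass) {
        // Step 1: an explicit 'connector' option on the table wins.
        String connector = tableOptions.get("connector");
        if (connector != null) {
            return connector;
        }
        // Step 2: otherwise fall back to the catalog factory identifier, such as jdbc/iceberg/paimon.
        if (factoryIdentifier.isPresent()) {
            return factoryIdentifier.get();
        }
        // Step 3: otherwise apply a user-defined rule; here (an assumption) the lower-cased class name.
        return catalogClass.getSimpleName().toLowerCase(Locale.ROOT);
    }

    public static void main(String[] args) {
        Map<String, String> withConnector = Collections.singletonMap("connector", "kafka");
        Map<String, String> noOptions = Collections.emptyMap();

        System.out.println(resolveConnectorName(withConnector, Optional.of("jdbc"), InHouseCatalog.class));
        System.out.println(resolveConnectorName(noOptions, Optional.of("jdbc"), InHouseCatalog.class));
        System.out.println(resolveConnectorName(noOptions, Optional.empty(), InHouseCatalog.class));
    }
}
```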
...