Status
| Discussion thread | https://lists.apache.org/thread/185mbcwnpokfop4xcb22r9bgfp2m68fx |
|---|---|
| Vote thread | https://lists.apache.org/thread/5f9806kw1q01536wzwgx1psh7jhmfmjw |
| JIRA | |
| Release | 1.18 |
Motivation
A Flink ETL job consumes data from a source and produces results to a sink; the source is related to the sink through the Flink ETL job. Flink needs a mechanism for users to report these relationships to external systems, such as the meta systems Datahub [1] and Atlas [2], and the meta store we mentioned in FLIP-276 [3].
...
This FLIP focuses on the customized metadata listener; the customized job lineage listener will be introduced in FLIP-314 [4].
Public Interfaces
CatalogModificationListener
DDL operations such as create/alter/drop tables will generate different events and notify CatalogModificationListener. All events for CatalogModificationListener extend the basic CatalogModificationEvent, and listeners can get the catalog context from it. Some general events for database/table are defined as follows, and more events can be implemented based on requirements in the future.
```java
/**
 * Different events will be fired when a catalog/database/table is modified. The customized listener can get and
 * report specific information from the event according to the event type.
 */
@PublicEvolving
public interface CatalogModificationListener {
    /** The event will be fired when the database/table is modified. */
    void onEvent(CatalogModificationEvent event);
}

/* Basic interface for catalog modification. */
@PublicEvolving
public interface CatalogModificationEvent {
    /* Context for the modification event. */
    CatalogContext context();
}

/* Context for catalog modification and job lineage events. */
@PublicEvolving
public interface CatalogContext {
    /* The name of catalog. */
    String getCatalogName();

    /* Class of the catalog. */
    Class<? extends Catalog> getClass();

    /* Identifier for the catalog from catalog factory, such as jdbc/iceberg/paimon. */
    Optional<String> getFactoryIdentifier();

    /* Config for catalog. */
    Configuration getConfiguration();
}

/* The basic interface for database related events. */
public interface DatabaseModificationEvent extends CatalogModificationEvent {
    CatalogDatabase database();
}

/* Event for creating database. */
@PublicEvolving
public interface CreateDatabaseEvent extends DatabaseModificationEvent {
    boolean ignoreIfExists();
}

/* Event for altering database. */
@PublicEvolving
public interface AlterDatabaseEvent extends DatabaseModificationEvent {
    CatalogDatabase newDatabase();
    boolean ignoreIfNotExists();
}

/* Event for dropping database. */
@PublicEvolving
public interface DropDatabaseEvent extends DatabaseModificationEvent {
    boolean ignoreIfNotExists();
}

/**
 * Base table event, provides column list, primary keys, partition keys, watermarks and properties in
 * CatalogBaseTable. The table can be source or sink.
 */
public interface TableModificationEvent extends CatalogModificationEvent {
    ObjectIdentifier identifier();
    CatalogBaseTable table();
}

/* Event for creating table. */
@PublicEvolving
public interface CreateTableEvent extends TableModificationEvent {
    boolean ignoreIfExists();
}

/* Event for altering table, provides all changes for old table. */
@PublicEvolving
public interface AlterTableEvent extends TableModificationEvent {
    List<TableChange> tableChanges();
    boolean ignoreIfNotExists();
}

/* Event for dropping table. */
@PublicEvolving
public interface DropTableEvent extends TableModificationEvent {
    boolean ignoreIfNotExists();
}

/* Factory for catalog modification listener. */
@PublicEvolving
public interface CatalogModificationListenerFactory {
    CatalogModificationListener createListener(Context context);

    @PublicEvolving
    interface Context {
        Configuration getConfiguration();

        ClassLoader getUserClassLoader();

        /*
         * Get an Executor pool for the listener to run async operations that can potentially be IO-heavy.
         */
        Executor getIOExecutor();
    }
}
```
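As an illustration, a listener implementation typically dispatches on the concrete event type inside `onEvent`. The following is a minimal self-contained sketch: the `ReportingListener` name is hypothetical, and the interfaces here are simplified stand-ins for the FLIP interfaces above so the sketch compiles on its own.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins for the FLIP interfaces; in a real project these
// come from the flink-table API, not from this file.
interface CatalogModificationEvent { }

interface CreateTableEvent extends CatalogModificationEvent {
    String tableName(); // stand-in for ObjectIdentifier identifier()
}

interface CatalogModificationListener {
    void onEvent(CatalogModificationEvent event);
}

/** A hypothetical listener that records the names of created tables. */
class ReportingListener implements CatalogModificationListener {
    private final List<String> createdTables = new ArrayList<>();

    @Override
    public void onEvent(CatalogModificationEvent event) {
        // Dispatch on the concrete event type to extract event-specific info.
        if (event instanceof CreateTableEvent) {
            createdTables.add(((CreateTableEvent) event).tableName());
        }
    }

    public List<String> createdTables() {
        return createdTables;
    }
}
```

A real listener would forward this information to an external meta system instead of collecting it in memory.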
Users may create different catalogs on the same physical catalog, for example, creating two hive catalogs named hive_catalog1 and hive_catalog2 for the same metastore. The tables hive_catalog1.my_database.my_table and hive_catalog2.my_database.my_table are the same table in the hive metastore.
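Because of this, a listener that wants to deduplicate tables should identify them by connector information rather than by the Flink catalog name. A minimal sketch of that idea, where the `PhysicalTableKey` helper and the `metastore-uri` option name are assumptions for illustration:

```java
import java.util.Map;

/**
 * Hypothetical helper: derive a physical table key from the factory identifier
 * and catalog config instead of the Flink catalog name.
 */
final class PhysicalTableKey {
    static String of(String factoryIdentifier, Map<String, String> catalogConfig,
                     String database, String table) {
        // Two catalogs over the same metastore share the factory identifier and
        // metastore URI, so they map to the same key regardless of catalog name.
        String uri = catalogConfig.getOrDefault("metastore-uri", "");
        return factoryIdentifier + "://" + uri + "/" + database + "/" + table;
    }
}
```

With this scheme, `hive_catalog1.my_database.my_table` and `hive_catalog2.my_database.my_table` resolve to the same key when both catalogs point at the same metastore.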
...
For sensitive information, users can encode and desensitize it in their customized listeners.
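One simple way to desensitize catalog config before reporting it is to mask the values of well-known sensitive keys. The helper below is a sketch under that assumption; the `ConfigMasker` name and the set of sensitive keys are illustrative, not part of the FLIP.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

/** Hypothetical helper a listener could use to mask sensitive catalog config. */
final class ConfigMasker {
    // Illustrative, not an exhaustive list of sensitive option names.
    private static final Set<String> SENSITIVE_KEYS =
            Set.of("password", "secret", "access-key");

    static Map<String, String> mask(Map<String, String> config) {
        Map<String, String> masked = new LinkedHashMap<>();
        config.forEach((k, v) ->
                masked.put(k, SENSITIVE_KEYS.contains(k) ? "******" : v));
        return masked;
    }
}
```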
Config Customized Listener
Users should add their listeners to the classpath of the client and the Flink cluster, and configure the listener factories in the following option:
```yaml
# Config for catalog modification listeners.
table.catalog-modification.listeners: {table catalog listener factory1},{table catalog listener factory2}
```
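The configured factories create the listeners from the factory context. A minimal self-contained sketch of a factory follows; the `MyListenerFactory` name and the `my-listener.endpoint` option are made up for the example, and the interfaces are simplified stand-ins for the FLIP ones.

```java
import java.util.Map;

// Simplified stand-ins for the FLIP factory interfaces.
interface CatalogModificationListener {
    void onEvent(Object event);
}

interface CatalogModificationListenerFactory {
    CatalogModificationListener createListener(Context context);

    interface Context {
        Map<String, String> getConfiguration(); // Configuration in the real API
    }
}

/** A hypothetical factory that reads its own options from the context config. */
class MyListenerFactory implements CatalogModificationListenerFactory {
    @Override
    public CatalogModificationListener createListener(Context context) {
        String endpoint = context.getConfiguration()
                .getOrDefault("my-listener.endpoint", "http://localhost:8080");
        // The created listener reports every event to the configured endpoint.
        return event -> System.out.println("report " + event + " to " + endpoint);
    }
}
```

In the real API the factory is also discovered via Flink's factory SPI, so it must be registered for service loading in addition to being on the classpath.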
Proposed Changes
Use case of job lineage
Users may create different tables on the same storage, such as the same Kafka topic. Suppose there is one Kafka topic, two Paimon tables and one MySQL table. Users create these tables and submit three Flink SQL jobs as follows.
...
After completing the above operations, we get one Kafka topic, two Paimon tables and one MySQL table, which are identified by connector identifiers. These tables are associated through Flink jobs; as an example, users can report the tables and their relationships to Datahub, as shown below (the job lineage will be supported in FLIP-314).
Changes for CatalogModificationListener
TableEnvironmentImpl creates the customized CatalogModificationListener according to the option table.catalog-modification.listeners, and builds CatalogManager with the listeners. Some other components such as the SQL Gateway can create CatalogManager with the listeners themselves. Currently all table related operations such as create/alter are in CatalogManager, but database operations are not. We can add database modification operations in CatalogManager and notify the specified listeners for tables and databases.
```java
/* Listeners and related operations in the catalog manager. */
public final class CatalogManager {
    private final List<CatalogModificationListener> listeners;

    /* Create catalog manager with listener list. */
    private CatalogManager(
            String defaultCatalogName,
            Catalog defaultCatalog,
            DataTypeFactory typeFactory,
            ManagedTableListener managedTableListener,
            List<CatalogModificationListener> listeners);

    /* Notify the listeners with given catalog event. */
    private void notify(CatalogModificationEvent event) {
        listeners.forEach(listener -> listener.onEvent(event));
    }

    /* Notify listener for tables. */
    public void createTable/dropTable/alterTable(...) {
        ....;
        notify(Create Different Table Modification Event With Context);
    }

    /* Add database ddls and notify listener for databases. */
    public void createDatabase/dropDatabase/alterDatabase(...) {
        ....;
        notify(Create Different Database Modification Event With Context);
    }

    /* Add listeners in Builder for catalog manager. */
    public static final class Builder {
        Builder listeners(List<CatalogModificationListener> listeners);
    }
}
```
Listener Execution
Multiple listeners are independent, and the client/JobManager will notify the listeners synchronously. It is highly recommended NOT to perform any blocking operation inside the listeners. If blocking operations are required, users need to perform asynchronous processing in their customized listeners.
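Since notification is synchronous, a common pattern is to hand IO-heavy work off to an executor (such as the one from `Context.getIOExecutor()`) so that `onEvent` returns immediately. A minimal self-contained sketch of that pattern, with the `AsyncReportingListener` name invented for the example and the event simplified to a string:

```java
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;

/** A hypothetical listener that offloads IO-heavy reporting to an executor. */
class AsyncReportingListener {
    private final Executor ioExecutor;
    final AtomicInteger reported = new AtomicInteger();

    AsyncReportingListener(Executor ioExecutor) {
        // In the real API this could be Context.getIOExecutor() from the factory.
        this.ioExecutor = ioExecutor;
    }

    /** Called synchronously by the client/JobManager; returns immediately. */
    void onEvent(String event) {
        ioExecutor.execute(() -> {
            // Stand-in for an IO-heavy reporting call to an external system.
            reported.incrementAndGet();
        });
    }
}
```

Note that work submitted this way may still be pending when the client exits, so a real listener should also decide how to flush or drop queued events on shutdown.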
...