...
Code Block |
---|
/** * Different events will be fired when a catalog/database/table is modified. The customized listener can get and * report specific information from the event according to the event type. */ @PublicEvolving public interface CatalogModificationListener { /** The event will be fired when the database/table is modified. */ void onEvent(CatalogModificationEvent event, CatalogModificationContext context); } /* Basic class for catalog modification. */ @PublicEvolving public abstract class CatalogModificationEvent { } /* Context for catalog modification listener and job lineage events. */ @PublicEvolving public class CatalogModificationContext { /* The name of catalog. */ String catalogName(); /* Class of the catalog. */ Class<? extends Catalog> clazz(); /* Identifier for the catalog from catalog factory, such as paimon. It will be empty for memory catalog and hive catalogjdbc/iceberg/paimon. */ Optional<String> factoryIdentifier(); } /* The basic class for database related event. */ public abstract class DatabaseModificationEvent extends CatalogModificationEvent { CatalogDatabase database(); } /* Event for creating database. */ @PublicEvolving public class CreateDatabaseEvent extends DatabaseModificationEvent { boolean ignoreIfExists(); } /* Event for altering database. */ @PublicEvolving public class AlterDatabaseEvent extends DatabaseModificationEvent { String oldDatabaseName(); boolean ignoreIfNotExists(); } /* Event for dropping database. */ @PublicEvolving public class DropDatabaseEvent extends DatabaseModificationEvent { boolean ignoreIfExists(); } /** * Base table event, provides column list, primary keys, partition keys, watermarks and properties in * CatalogBaseTable. The table can be source or sink. */ public abstract class TableModificationEvent extends CatalogModificationEvent { ObjectIdentifier identifier(); CatalogBaseTable table(); } /* Event for creating table. */ @PublicEvolving public class CreateTableEvent extends CatalogModificationEvent { boolean ignoreIfExists(); } /* Event for altering table, provides all changes for old table. */ @PublicEvolving public class AlterTableEvent extends CatalogModificationEvent { List<TableChange> tableChanges(); boolean ignoreIfExists(); } /* Event for dropping table. */ @PublicEvolving public class DropTableEvent extends CatalogModificationEvent { boolean ignoreIfExists(); } /* Factory for catalog modification listener. */ @PublicEvolving public interface CatalogModificationListenerFactory { CatalogModificationListener createListener(Configuration config, ClassLoader classloader); } |
...
Users can identify the physical connector by CatalogContext
and options in CatalogBaseTable
through the following processsteps:
1. Get connector name.
Users can get value of option 'connector' from options in CatalogBaseTable
for temporal tables. If it doesn't exist, users can get factory identifier from CatalogContext as connector name. If none of the above exist, users can define the connector name themselves through Class<? extends Catalog>
.
...
For Table/SQL jobs, Flink creates table lineages according to tables for source and sink. There're column lineages in table lineages, and Flink jobs can create the dependencies between source and sink columns. Flink creates these lineages for Table/SQL jobs from job plannerplan, the entire processing has nothing to do with users.
...
Code Block |
---|
/** * Add setLineageEntity method in data stream source. */ @Public public class DataStreamSource { private LineageEntity lineageEntity; public DataStreamSource setLineageEntity(LineageEntity lineageEntity); } /** * Add setLineageEntity and addLineageRelations methods in data stream sink. */ @Public public class DataStreamSink { private LineageEntity lineageEntity; public DataStreamSink setLineageEntity(LineageEntity lineageEntity); /* Add lineage relations for data stream jobs. */ public DataStreamSink addLineageRelations(LineageRelation ... relations); } |
...