This FLIP aims to introduce listener interfaces in Flink that users can implement to report metadata and lineage to external systems. The main information is as follows:


3. Relationships between sources/sinks and jobs, such as the sources and sinks of a job and their column lineages.

4. Job execution status changes, such as the job status and exceptions.

Public Interfaces


DDL operations such as create/alter/drop tables will generate different events and notify CatalogModificationListener. All events for CatalogModificationListener extend the basic CatalogModificationEvent, and listeners can get the catalog from it. Some general events for databases and tables are defined as follows, and more events can be implemented based on requirements in the future.

Code Block
/**
 * Different events will be fired when a catalog/database/table is modified. The customized listener can get and
 * report specific information from the event according to the event type.
 */
public interface CatalogModificationListener {
    /** The event will be fired when the database/table is modified. */
    void onEvent(CatalogModificationEvent event, CatalogModificationContext context);
}

/* The basic class for catalog related event. */
public abstract class CatalogModificationEvent {
    /* The catalog of the event. */
    Catalog catalog();
}

/* Context for catalog modification listener and job lineage. */
public class CatalogModificationContext {
    /* The name of catalog. */
    String catalogName();

    /* Class of the catalog. */
    Class<? extends Catalog> clazz();

    /* The name of catalog from catalog factory, such as paimon. It will be empty for memory catalog and hive catalog. */
    Optional<String> factoryIdentifier();
}

/* The basic class for database related event. */
public abstract class DatabaseModificationEvent extends CatalogModificationEvent {
    CatalogDatabase database();
}

/* Event for creating database. */
public class CreateDatabaseEvent extends DatabaseModificationEvent {
    boolean ignoreIfExists();
}

/* Event for altering database. */
public class AlterDatabaseEvent extends DatabaseModificationEvent {
    String oldDatabaseName();
    boolean ignoreIfNotExists();
}

/* Event for dropping database. */
public class DropDatabaseEvent extends DatabaseModificationEvent {
    boolean ignoreIfExists();
}

/**
 * Base table event, provides column list, primary keys, partition keys, watermarks and properties in
 * CatalogBaseTable. The table can be source or sink.
 */
public abstract class TableModificationEvent extends CatalogModificationEvent {
    ObjectIdentifier identifier();
    CatalogBaseTable table();
}

/* Event for creating table. */
public class CreateTableEvent extends TableModificationEvent {
    boolean ignoreIfExists();
}

/* Event for altering table, provides all changes for old table. */
public class AlterTableEvent extends TableModificationEvent {
    List<TableChange> tableChanges();
    boolean ignoreIfExists();
}

/* Event for dropping table. */
public class DropTableEvent extends TableModificationEvent {
    boolean ignoreIfExists();
}

/* Factory for catalog modification listener. */
public interface CatalogModificationListenerFactory {
    CatalogModificationListener createListener(Configuration config, ClassLoader classloader);
}
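
To illustrate how a customized listener might dispatch on the event type, here is a minimal sketch. All types below are simplified, hypothetical stand-ins declared only so the example compiles on its own; in particular the events carry a plain table path instead of an ObjectIdentifier, and RecordingCatalogListener is an invented name, not part of this FLIP.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical stand-ins for the FLIP types so the sketch compiles on its own.
abstract class CatalogModificationEvent {}

class CatalogModificationContext {
    private final String catalogName;
    CatalogModificationContext(String catalogName) { this.catalogName = catalogName; }
    String catalogName() { return catalogName; }
}

// Assumption: a plain table path replaces the ObjectIdentifier of the FLIP events.
class CreateTableEvent extends CatalogModificationEvent {
    final String tablePath;
    CreateTableEvent(String tablePath) { this.tablePath = tablePath; }
}

class DropTableEvent extends CatalogModificationEvent {
    final String tablePath;
    DropTableEvent(String tablePath) { this.tablePath = tablePath; }
}

interface CatalogModificationListener {
    void onEvent(CatalogModificationEvent event, CatalogModificationContext context);
}

/** Hypothetical listener that records the messages it would send to an external metadata system. */
class RecordingCatalogListener implements CatalogModificationListener {
    private final List<String> reports = new ArrayList<>();

    @Override
    public void onEvent(CatalogModificationEvent event, CatalogModificationContext context) {
        if (event instanceof CreateTableEvent) {
            reports.add("CREATE " + context.catalogName() + "." + ((CreateTableEvent) event).tablePath);
        } else if (event instanceof DropTableEvent) {
            reports.add("DROP " + context.catalogName() + "." + ((DropTableEvent) event).tablePath);
        }
        // Other event types are ignored in this sketch.
    }

    List<String> reports() { return reports; }
}
```

A real listener would replace the in-memory list with calls to the external metadata system.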

Users may create different catalogs on the same physical catalog, for example, two Hive catalogs named hive_catalog1 and hive_catalog2 for the same metastore. The tables hive_catalog1.my_database.my_table and hive_catalog2.my_database.my_table are then the same table in the Hive metastore.

In addition, there are two table types: persistent tables and temporary tables. A persistent table can be identified by the catalog and database above, while a temporary table can only be identified by the properties in its DDL. Different temporary tables with the same connector type and related properties are the same physical table in the external system, such as two tables for the same topic in Kafka.

StorageIdentifier is introduced to address these issues, and listeners can get it from table events.

Code Block
/* Storage identifier for different physical table. */
public class StorageIdentifier {
    /* Storage type such as kafka, hive, iceberg or paimon. */
    String type();

    /* Properties for the storage identifier, users can get value from different keys for different storages, such as server and topic for kafka. */
    Map<String, String> properties();
}

Different storages put their options in the properties, following the options of their dynamic table sources and sinks, and users read the option values by the same keys. Flink Table/SQL jobs can get the options from table properties automatically, while users need to add them manually for DataStream jobs. Flink has many connectors; the Kafka options are given as an example below.

Users can identify the physical connector by CatalogContext and the options in CatalogBaseTable through the following process:

1. Get connector name.

Users can get the value of the option 'connector' from the options in CatalogBaseTable for temporary tables. If it doesn't exist, users can get the factory identifier from CatalogContext as the connector name. If neither exists, users can define the connector name themselves through Class<? extends Catalog>.

2. Users can get different properties based on the connector name from the table options and create a connector identifier. Flink has many connectors; the Kafka options are given as an example below, and users can create a Kafka identifier with servers, group and topic as needed.

Code Block
/* Kafka storage identifier options. */
"properties.bootstrap.servers" for Kafka bootstrap servers
"topic" for Kafka Topic
"" for Kafka group id
"topic-pattern" for Kafka topic pattern

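The two steps above can be sketched as follows. This is only an illustration under stated assumptions: ConnectorIdentifiers and its methods are hypothetical helper names, the catalog's simple class name is assumed as the user-defined fallback, and the Kafka group id is left out because its option key is not shown in the list above.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical helper, not part of the FLIP. */
final class ConnectorIdentifiers {
    private ConnectorIdentifiers() {}

    /**
     * Step 1: prefer the 'connector' table option, then the catalog factory identifier,
     * then a user-defined name derived from the catalog class.
     */
    static String connectorName(
            Map<String, String> tableOptions, Optional<String> factoryIdentifier, Class<?> catalogClass) {
        String connector = tableOptions.get("connector");
        if (connector != null && !connector.isEmpty()) {
            return connector;
        }
        // Assumption: the simple class name serves as the user-defined fallback name.
        return factoryIdentifier.orElseGet(catalogClass::getSimpleName);
    }

    /** Step 2 for Kafka: keep only the options that identify the physical topic. */
    static Map<String, String> kafkaIdentifier(Map<String, String> tableOptions) {
        Map<String, String> identifier = new LinkedHashMap<>();
        identifier.put("connector", "kafka");
        for (String key : new String[] {"properties.bootstrap.servers", "topic", "topic-pattern"}) {
            String value = tableOptions.get(key);
            if (value != null) {
                identifier.put(key, value);
            }
        }
        return identifier;
    }
}
```

Two tables declared with different formats but the same servers and topic produce equal identifier maps, so a listener can treat them as one physical table.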
Users can encode and desensitize sensitive information in their customized listeners.
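
One possible way to desensitize options before reporting them is to replace sensitive values with a one-way hash. The sketch below assumes SHA-256 hashing and a caller-supplied set of sensitive keys; OptionDesensitizer and the key names used with it are hypothetical and not defined by this FLIP.

```java
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

/** Hypothetical helper a customized listener could call before reporting options. */
final class OptionDesensitizer {
    private OptionDesensitizer() {}

    /** Returns a copy of the options in which the values of sensitive keys are replaced by their hash. */
    static Map<String, String> desensitize(Map<String, String> options, Set<String> sensitiveKeys) {
        Map<String, String> result = new TreeMap<>();
        for (Map.Entry<String, String> entry : options.entrySet()) {
            boolean sensitive = sensitiveKeys.contains(entry.getKey());
            result.put(entry.getKey(), sensitive ? sha256Hex(entry.getValue()) : entry.getValue());
        }
        return result;
    }

    /** Hex-encoded SHA-256 digest of the UTF-8 bytes of the value. */
    static String sha256Hex(String value) {
        try {
            byte[] digest = MessageDigest.getInstance("SHA-256").digest(value.getBytes(StandardCharsets.UTF_8));
            StringBuilder hex = new StringBuilder();
            for (byte b : digest) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 is not available", e);
        }
    }
}
```

Hashing keeps equal values comparable across reports without exposing them; whether that is sufficient depends on the user's own security requirements.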


Flink creates events and notifies JobStatusChangedListener when the status of a job changes. There are two types of job status events for the listener: JobCreatedEvent and JobExecutionStatusEvent. JobCreatedEvent is fired when a job is created; it carries the job lineage, and the listener can create lineages for sources and sinks. JobExecutionStatusEvent carries the old and new job statuses at runtime, and the listener can even delete the lineages when the job reaches a terminal state.

Code Block
/**
 * When job is created or its status is changed, Flink will generate job event and notify job status changed listener.
 */
@PublicEvolving
public interface JobStatusChangedListener {
    /** Event will be fired when job status is changed. */
    void onEvent(JobStatusChangedEvent event);
}

/** Basic job status event. */
public abstract class JobStatusChangedEvent {
    JobID jobId();
    String jobName();
}

/** Job created event with job lineage. */
public class JobCreatedEvent extends JobStatusChangedEvent {
    /* Lineage for the current job. */
    JobLineage lineage();

    /* Job type, TableOrSQL or DataStream. */
    String jobType();

    /* Job execution type, BATCH or STREAMING. */
    String executionType();

    /* Job configuration. */
    Map<String, String> config();
}

/** Job status changed event for runtime. */
public class JobExecutionStatusEvent extends JobStatusChangedEvent {
    JobStatus oldStatus();
    JobStatus newStatus();
    @Nullable Throwable exception();
}

/** Factory for job status changed listener. */
public interface JobStatusChangedListenerFactory {
    JobStatusChangedListener createListener(Configuration config, ClassLoader classloader);
}
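
The create-then-delete life cycle described above can be sketched as follows. The types are simplified, hypothetical stand-ins so the example is self-contained: the job id and lineage are plain strings, JobStatus is reduced to a few values with an assumed isTerminal() helper, and LineageStoreListener is an invented name.

```java
import java.util.HashMap;
import java.util.Map;

// Simplified, hypothetical stand-ins; the FLIP events carry JobID and JobLineage instead of strings.
enum JobStatus {
    CREATED, RUNNING, FINISHED, FAILED, CANCELED;

    // Assumption: these three values are treated as terminal in this sketch.
    boolean isTerminal() { return this == FINISHED || this == FAILED || this == CANCELED; }
}

abstract class JobStatusChangedEvent {
    private final String jobId;
    JobStatusChangedEvent(String jobId) { this.jobId = jobId; }
    String jobId() { return jobId; }
}

class JobCreatedEvent extends JobStatusChangedEvent {
    private final String lineage;
    JobCreatedEvent(String jobId, String lineage) { super(jobId); this.lineage = lineage; }
    String lineage() { return lineage; }
}

class JobExecutionStatusEvent extends JobStatusChangedEvent {
    private final JobStatus oldStatus;
    private final JobStatus newStatus;
    JobExecutionStatusEvent(String jobId, JobStatus oldStatus, JobStatus newStatus) {
        super(jobId);
        this.oldStatus = oldStatus;
        this.newStatus = newStatus;
    }
    JobStatus oldStatus() { return oldStatus; }
    JobStatus newStatus() { return newStatus; }
}

interface JobStatusChangedListener {
    void onEvent(JobStatusChangedEvent event);
}

/** Hypothetical listener: stores lineage when a job is created and drops it when the job terminates. */
class LineageStoreListener implements JobStatusChangedListener {
    private final Map<String, String> lineageByJob = new HashMap<>();

    @Override
    public void onEvent(JobStatusChangedEvent event) {
        if (event instanceof JobCreatedEvent) {
            lineageByJob.put(event.jobId(), ((JobCreatedEvent) event).lineage());
        } else if (event instanceof JobExecutionStatusEvent
                && ((JobExecutionStatusEvent) event).newStatus().isTerminal()) {
            lineageByJob.remove(event.jobId());
        }
    }

    Map<String, String> lineageByJob() { return lineageByJob; }
}
```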

Job lineage is divided into two layers: the first layer is a global abstraction for all Flink jobs and connectors, and the second layer defines the lineages for Table/SQL and DataStream jobs independently, based on the first one.

Code Block
/**
 * Job lineage is built according to StreamGraph. Users can get sources, sinks and relationships from lineage.
 */
public class JobLineage {
    /* Source lineage entity list. */
    List<LineageEntity> sources();

    /* Sink lineage entity list. */
    List<LineageEntity> sinks();

    /* Lineage relations from sources to sinks. */
    List<LineageRelation> relations();
}

/** Lineage entity for source and sink. */
public class LineageEntity {
    /* Config for the lineage entity. */
    Map<String, String> config();
}

/** Lineage relation from source to sink. */
public class LineageRelation {
    LineageEntity source();
    LineageEntity sink();
}

Job lineage for Table/SQL job


Code Block
/** Basic table lineage entity which has catalog context and table in it. */
public abstract class TableLineageEntity extends LineageEntity {
    /* The catalog context of the table lineage entity. */
    CatalogContext catalogContext();

    /* The table of the table lineage entity. */
    CatalogBaseTable table();
}

/** Source lineage entity for table. */
public class TableSourceLineageEntity extends TableLineageEntity {
}

/** Sink lineage entity for table. */
public class TableSinkLineageEntity extends TableLineageEntity {
    /* Modify type, INSERT/UPDATE/DELETE. */
    String modifyType();

    /* Update mode, APPEND/RETRACT/UPSERT. */
    String updateMode();
    boolean overwrite();
}

/* Table lineage relations from source table to sink table. */
public class TableLineageRelation extends LineageRelation {
    /* Table column lineage relations from source to sink. */
    List<TableColumnLineageRelation> columnRelations();
}

/* Column lineage from source table to sink table. */
public class TableColumnLineageRelation {
    /* Source table column. */
    String sourceColumn();

    /* Sink table column. */
    String sinkColumn();
}
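
Since several source columns may feed one sink column, a listener will often want the column relations grouped by sink column. The following is a minimal sketch of that grouping; TableColumnLineageRelation is a simplified stand-in with a constructor added for the example, and ColumnLineages is a hypothetical helper name.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified, hypothetical stand-in for the FLIP class of the same name.
class TableColumnLineageRelation {
    private final String sourceColumn;
    private final String sinkColumn;

    TableColumnLineageRelation(String sourceColumn, String sinkColumn) {
        this.sourceColumn = sourceColumn;
        this.sinkColumn = sinkColumn;
    }

    String sourceColumn() { return sourceColumn; }
    String sinkColumn() { return sinkColumn; }
}

/** Hypothetical helper, not part of the FLIP. */
final class ColumnLineages {
    private ColumnLineages() {}

    /** Groups source columns by the sink column they contribute to, keeping encounter order. */
    static Map<String, List<String>> sourcesBySinkColumn(List<TableColumnLineageRelation> relations) {
        Map<String, List<String>> result = new LinkedHashMap<>();
        for (TableColumnLineageRelation relation : relations) {
            result.computeIfAbsent(relation.sinkColumn(), key -> new ArrayList<>())
                    .add(relation.sourceColumn());
        }
        return result;
    }
}
```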

Job lineage for DataStream job

The data structures of connectors in DataStream jobs are much more complex than in Table/SQL jobs; they may be tables, user customized POJO classes or even vectors for ML jobs. We add setLineageEntity to DataStreamSource and DataStreamSink, which allows users to define the job lineage entity using TableSourceLineageEntity and TableSinkLineageEntity if the connectors are tables, or to implement a customized LineageEntity for the source and sink based on their specific requirements.

Code Block
/**
 * Add setLineageEntity method in data stream source.
 */
public class DataStreamSource {
    private LineageEntity lineageEntity;

    public DataStreamSource setLineageEntity(LineageEntity lineageEntity);
}

/**
 * Add setLineageEntity and addLineageRelations methods in data stream sink.
 */
public class DataStreamSink {
    private LineageEntity lineageEntity;

    public DataStreamSink setLineageEntity(LineageEntity lineageEntity);

    public DataStreamSink addLineageRelations(LineageRelation ... relations);
}

Config Customized Listener

Use case of job lineage

1. User customized lineage for DataStream

Users can implement customized lineage entities for the sources and sinks of a DataStream job, for example, KafkaVectorLineageEntity for a Kafka source and sink as follows.

Code Block
/** User defined vector lineage entity for Kafka source and sink. */
public class KafkaVectorLineageEntity extends LineageEntity {
    /* The capacity of the vector. */
    int capacity();

    /* The value type in the vector. */
    String valueType();
}

/* Users can use vector source/sink lineage entities in a DataStream job. */
Map<String, String> kafkaSourceConf = ...;
Map<String, String> kafkaSinkConf = ...;
StreamExecutionEnvironment env = ...;
KafkaVectorLineageEntity sourceLineageEntity = new KafkaVectorLineageEntity(10, "int", kafkaSourceConf);
KafkaVectorLineageEntity sinkLineageEntity = new KafkaVectorLineageEntity(20, "double", kafkaSinkConf);
env.fromSource(...)
        .setLineageEntity(sourceLineageEntity) // Set source lineage entity
        ...
        .sinkTo(...)
        .setLineageEntity(sinkLineageEntity)  // Set sink lineage entity
        .addLineageRelations(                 // Add lineage relations from sources to the current sink
            new LineageRelation(sourceLineageEntity, sinkLineageEntity));

After that, users can cast the source and sink lineage entities to the vector lineage entity and get the capacity and value type in their customized listeners.
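
The cast mentioned above could look like the following inside a listener. This is a sketch with simplified stand-ins: LineageEntity and KafkaVectorLineageEntity are reduced to plain classes with the constructor shape used in the example, and VectorLineageReporter is a hypothetical helper.

```java
import java.util.Map;

// Simplified, hypothetical stand-ins so the sketch compiles on its own.
class LineageEntity {
    private final Map<String, String> config;
    LineageEntity(Map<String, String> config) { this.config = config; }
    Map<String, String> config() { return config; }
}

class KafkaVectorLineageEntity extends LineageEntity {
    private final int capacity;
    private final String valueType;

    KafkaVectorLineageEntity(int capacity, String valueType, Map<String, String> config) {
        super(config);
        this.capacity = capacity;
        this.valueType = valueType;
    }

    int capacity() { return capacity; }
    String valueType() { return valueType; }
}

/** Hypothetical helper a customized listener could use when walking the job lineage. */
final class VectorLineageReporter {
    private VectorLineageReporter() {}

    /** Describes an entity, using the vector details when the entity is a vector lineage entity. */
    static String describe(LineageEntity entity) {
        if (entity instanceof KafkaVectorLineageEntity) {
            KafkaVectorLineageEntity vector = (KafkaVectorLineageEntity) entity;
            return "vector[" + vector.capacity() + "] of " + vector.valueType();
        }
        return "generic entity with " + entity.config().size() + " config entries";
    }
}
```

Checking the type before casting lets the same listener handle jobs whose entities are not vector entities.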

2. Connectors identified by connector identifier

Users may create different tables on the same storage, such as the same Kafka topic. Suppose there is one Kafka topic, two Paimon tables and one MySQL table. Users create these tables and submit three Flink SQL jobs as follows.

a) Create Kafka and Paimon tables for Flink SQL job

Code Block
-- Create a table my_kafka_table1 for kafka topic 'kafka_topic'
CREATE TABLE my_kafka_table1 (
  val1 STRING,
  val2 STRING,
  val3 STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'kafka_topic',
  'properties.bootstrap.servers' = 'kafka_bootstrap_server',
  '' = 'kafka_group',
  'format' = 'json'
);

-- Create a Paimon catalog and table for warehouse 'paimon_path1'
CREATE CATALOG paimon_catalog WITH (
USE CATALOG paimon_catalog;
CREATE TABLE my_paimon_table (

c) Create MySQL table for Flink SQL job

After completing the above operations, we have one Kafka topic, two Paimon tables and one MySQL table, which are identified by their connector identifiers. These tables are associated through Flink jobs, and users can report the tables and their relationships to an external system, for example datahub[1], as shown below.


TableEnvironmentImpl creates the customized CatalogModificationListener instances according to the option lineage.catalog-modification.listeners and builds CatalogManager with the listeners. Other components such as Sql-Gateway can create CatalogManager with the listeners themselves. Currently all table related operations such as create/alter are in CatalogManager, but database operations are not. We can add database modification operations to CatalogManager and notify the specified listeners for both tables and databases.
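
How listeners might be created from the option can be sketched as below. The sketch rests on several assumptions that the text above does not confirm: the option value is taken to be a semicolon-separated list of factory identifiers, factories are looked up in a plain map rather than through Flink's factory discovery, and the listener interface is reduced to a single-argument stand-in. ListenerLoader is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.function.Supplier;

// Simplified, hypothetical stand-in for the listener interface.
interface CatalogModificationListener {
    void onEvent(Object event);
}

/** Hypothetical loader, not the TableEnvironmentImpl implementation. */
final class ListenerLoader {
    static final String OPTION_KEY = "lineage.catalog-modification.listeners";

    private ListenerLoader() {}

    /**
     * Creates one listener per identifier found in the option value.
     * Assumption: identifiers are separated by ';' and resolved against the given factory map.
     */
    static List<CatalogModificationListener> createListeners(
            Map<String, String> config, Map<String, Supplier<CatalogModificationListener>> factories) {
        List<CatalogModificationListener> listeners = new ArrayList<>();
        for (String identifier : config.getOrDefault(OPTION_KEY, "").split(";")) {
            String trimmed = identifier.trim();
            if (trimmed.isEmpty()) {
                continue; // option not set, or an empty list element
            }
            Supplier<CatalogModificationListener> factory = factories.get(trimmed);
            if (factory == null) {
                throw new IllegalArgumentException("Unknown listener factory: " + trimmed);
            }
            listeners.add(factory.get());
        }
        return listeners;
    }
}
```

The resulting list is what would be handed to CatalogManager when it is built.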

Changes for JobStatusChangedListener
