...

This FLIP aims to introduce listener interfaces in Flink that users can implement to report metadata and lineage to external systems. The main information is as follows:

...

3. Relationships between sources/sinks and jobs, such as source and sink tables and their column lineages.

4. Job execution status change information, such as job status and exceptions.

Public Interfaces

CatalogModificationListener

DDL operations such as creating/altering/dropping tables generate different events and notify the CatalogModificationListener. All events for CatalogModificationListener extend the basic CatalogModificationEvent, and listeners can get catalog information from the accompanying context. Some general events for databases/tables are defined as follows, and more events can be added based on future requirements.

Code Block
/**
 * Different events will be fired when a catalog/database/table is modified. The customized listener can get and
 * report specific information from the event according to the event type.
 */
@PublicEvolving
public interface CatalogModificationListener {
    /** The event will be fired when the database/table is modified. */
    void onEvent(CatalogModificationEvent event, CatalogModificationContext context);
}

/* Basic class for catalog modification. */
@PublicEvolving
public abstract class CatalogModificationEvent {
}

/* Context for catalog modification listener and job lineage. */
@PublicEvolving
public class CatalogModificationContext {
    /* The name of catalog. */
    String catalogName();

    /* Class of the catalog. */
    Class<? extends Catalog> clazz();

    /* Identifier of the catalog from the catalog factory, such as paimon. It will be empty for memory catalog and hive catalog. */
    Optional<String> factoryIdentifier();
}

/* The basic class for database related events. */
public abstract class DatabaseModificationEvent extends CatalogModificationEvent {
    CatalogDatabase database();
}

/* Event for creating database. */
@PublicEvolving
public class CreateDatabaseEvent extends DatabaseModificationEvent {
    boolean ignoreIfExists();
}

/* Event for altering database. */
@PublicEvolving
public class AlterDatabaseEvent extends DatabaseModificationEvent {
    String oldDatabaseName();
    boolean ignoreIfNotExists();
}

/* Event for dropping database. */
@PublicEvolving
public class DropDatabaseEvent extends DatabaseModificationEvent {
    boolean ignoreIfExists();
}

/**
 * Base table event, provides column list, primary keys, partition keys, watermarks and properties in
 * CatalogBaseTable. The table can be source or sink.
 */
public abstract class TableModificationEvent extends CatalogModificationEvent {
    ObjectIdentifier identifier();
    CatalogBaseTable table();
}

/* Event for creating table. */
@PublicEvolving
public class CreateTableEvent extends TableModificationEvent {
    boolean ignoreIfExists();
}

/* Event for altering table, provides all changes for old table. */
@PublicEvolving
public class AlterTableEvent extends TableModificationEvent {
    List<TableChange> tableChanges();
    boolean ignoreIfExists();
}

/* Event for dropping table. */
@PublicEvolving
public class DropTableEvent extends TableModificationEvent {
    boolean ignoreIfExists();
}

/* Factory for catalog modification listener. */
@PublicEvolving
public interface CatalogModificationListenerFactory {
    CatalogModificationListener createListener(Configuration config, ClassLoader classloader);
}
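
For example, a customized listener which reports table DDL events to an external metadata service might look like the following sketch, assuming the table events expose identifier() and table() as defined above. The class names and report methods are only for illustration and are not part of this FLIP.

Code Block
/* Illustrative listener which forwards table events to an external metadata service. */
public class MetaReportCatalogListener implements CatalogModificationListener {
    @Override
    public void onEvent(CatalogModificationEvent event, CatalogModificationContext context) {
        if (event instanceof CreateTableEvent) {
            CreateTableEvent createTableEvent = (CreateTableEvent) event;
            // Report the created table together with the catalog name from the context.
            reportTable(context.catalogName(), createTableEvent.identifier(), createTableEvent.table());
        } else if (event instanceof DropTableEvent) {
            DropTableEvent dropTableEvent = (DropTableEvent) event;
            // Mark the table as dropped in the external metadata service.
            removeTable(context.catalogName(), dropTableEvent.identifier());
        }
    }

    private void reportTable(String catalogName, ObjectIdentifier identifier, CatalogBaseTable table) {
        // Send the table metadata to the external service, e.g. via an HTTP client.
    }

    private void removeTable(String catalogName, ObjectIdentifier identifier) {
        // Remove the table metadata from the external service.
    }
}

/* Factory for the listener above, created from the configured listener options. */
public class MetaReportCatalogListenerFactory implements CatalogModificationListenerFactory {
    @Override
    public CatalogModificationListener createListener(Configuration config, ClassLoader classLoader) {
        return new MetaReportCatalogListener();
    }
}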

Users may create different catalogs on the same physical catalog, for example, creating two hive catalogs named hive_catalog1 and hive_catalog2 for the same metastore. The tables hive_catalog1.my_database.my_table and hive_catalog2.my_database.my_table are the same table in the hive metastore.

In addition, there are two table types: persistent table and temporal table. The persistent table can be identified by the catalog and database above, while the temporal table can only be identified by the properties in its DDL. Different temporal tables with the same connector type and related properties are the same physical table in the external system, such as two tables for the same topic in Kafka.

StorageIdentifier is introduced to address these issues; listeners can get it from table events.

Code Block
/* Storage identifier for different physical table. */
@PublicEvolving
public class StorageIdentifier {
    /* Storage type such as kafka, hive, iceberg or paimon. */
    String type();

    /* Properties for the storage identifier, users can get value from different keys for different storages, such as server and topic for kafka. */
    Map<String, String> properties();
}

Different storages put their options into the properties according to their dynamic source and sink implementations, and users read the option values with the same keys. Flink Table/SQL jobs fill the options from table properties automatically, while users need to add them manually for DataStream jobs.

Users can identify the physical connector by the CatalogContext and the options in CatalogBaseTable through the following process:

1. Get connector name.

Users can get the value of the 'connector' option from the options in CatalogBaseTable for temporal tables. If it doesn't exist, users can get the factory identifier from CatalogContext as the connector name. If neither exists, users can determine the connector name themselves through Class<? extends Catalog>.

2. Users can get different properties based on the connector name from the table options and create the connector identifier. Flink has many connectors; we give the example of Kafka options below, and users can create a Kafka identifier with the servers, group and topic as needed.

Code Block
/* Kafka storage identifier options. */
"properties.bootstrap.servers" for Kafka bootstrap servers
"topic" for Kafka Topic
"properties.group.id" for Kafka group id
"topic-pattern" for Kafka topic pattern

For sensitive information, users can encode and desensitize it in their customized listeners.

...

Flink creates events and notifies the JobStatusChangedListener when the job status changes. There are two types of job status events for the listener: JobCreatedEvent and JobExecutionStatusEvent. JobCreatedEvent will be fired when a job is created; it contains the job lineage, and the listener can create lineage for sources and sinks from it. JobExecutionStatusEvent has the old and new job statuses at runtime, and the listener can even delete the lineage when the job terminates.

Code Block
/**
 * When job is created or its status is changed, Flink will generate job event and notify job status changed listener.
 */
@PublicEvolving
public interface JobStatusChangedListener {
    /** Event will be fired when job status is changed. */
    public void onEvent(JobStatusChangedEvent event);
}

/** Basic job status changed event. */
@PublicEvolving
public abstract class JobStatusChangedEvent {
    JobID jobId();
    String jobName();
}

/** Job created event with job lineage. */
@PublicEvolving
public class JobCreatedEvent extends JobStatusChangedEvent {
    /* Lineage for the current job. */
    JobLineage lineage();

    /* Job type, TableOrSQL or DataStream. */
    String jobType();

    /* Job execution type, BATCH or STREAMING. */
    String executionType();

    /* Job configuration. */
    Map<String, String> config();
}

/** Job status changed event for runtime. */
@PublicEvolving
public class JobExecutionStatusEvent extends JobStatusChangedEvent {
    JobStatus oldStatus();
    JobStatus newStatus();
    @Nullable Throwable exception();
}

/** Factory for job status changed listener. */
@PublicEvolving
public interface JobStatusChangedListenerFactory {
    JobStatusChangedListener createListener(Configuration config, ClassLoader classloader);
}
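
For example, a customized job status changed listener which reports lineage when a job is created and cleans it up when the job terminates might look like the following sketch. The class names and report methods are only for illustration and are not part of this FLIP.

Code Block
/* Illustrative listener which reports job lineage to an external system. */
public class LineageReportJobListener implements JobStatusChangedListener {
    @Override
    public void onEvent(JobStatusChangedEvent event) {
        if (event instanceof JobCreatedEvent) {
            JobCreatedEvent createdEvent = (JobCreatedEvent) event;
            // Report sources, sinks and relations of the new job to the external lineage system.
            reportLineage(createdEvent.jobId(), createdEvent.jobName(), createdEvent.lineage());
        } else if (event instanceof JobExecutionStatusEvent) {
            JobExecutionStatusEvent statusEvent = (JobExecutionStatusEvent) event;
            if (statusEvent.newStatus().isGloballyTerminalState()) {
                // Optionally delete or archive the lineage when the job terminates.
                cleanLineage(statusEvent.jobId());
            }
        }
    }

    private void reportLineage(JobID jobId, String jobName, JobLineage lineage) {
        // Send the lineage to the external system.
    }

    private void cleanLineage(JobID jobId) {
        // Remove the lineage of the terminated job.
    }
}

/* Factory for the listener above, created from the configured listener options. */
public class LineageReportJobListenerFactory implements JobStatusChangedListenerFactory {
    @Override
    public JobStatusChangedListener createListener(Configuration config, ClassLoader classLoader) {
        return new LineageReportJobListener();
    }
}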


Job lineage is divided into two layers: the first layer is a global abstraction for all Flink jobs and connectors, and the second layer defines the lineage for Table/SQL and DataStream jobs independently based on the first one.

Code Block
/**
 * Job lineage is built according to StreamGraph. Users can get sources, sinks and relationships from lineage.
 */
@PublicEvolving
public class JobLineage {
    /* Source lineage entity list. */
    List<LineageEntity> sources();

    /* Sink lineage entity list. */
    List<LineageEntity> sinks();

    /* Lineage relations from sources to sinks. */
    List<LineageRelation> relations();
}

@PublicEvolving
public class LineageEntity {
    /* Config for the lineage entity. */
    Map<String, String> config();
}

/** Lineage relation from source to sink. */
@PublicEvolving
public class LineageRelation {
    LineageEntity source();
    LineageEntity sink();
}

Job lineage for Table/SQL job

...

Code Block
/** Basic table lineage entity which has catalog context and table in it. */
public abstract class TableLineageEntity extends LineageEntity {
    /* The catalog context of the table lineage entity. */
    public CatalogContext catalogContext();

    /* The table of the table lineage entity. */
    public CatalogBaseTable table();
}

/** Source lineage entity for table. */
@PublicEvolving
public class TableSourceLineageEntity extends TableLineageEntity {
}

/** Sink lineage entity for table. */
@PublicEvolving
public class TableSinkLineageEntity extends TableLineageEntity {
    /* Modify type, INSERT/UPDATE/DELETE. */
    String modifyType();

    /* Update mode, APPEND/RETRACT/UPSERT. */
    String updateMode();

    boolean overwrite();
}

/* Table lineage relation from source table to sink table. */
@PublicEvolving
public class TableLineageRelation extends LineageRelation {
    /* Table column lineage relations from source to sink. */
    List<TableColumnLineageRelation> columnRelations();
}

/* Column lineage from source table to sink table. */
@PublicEvolving
public class TableColumnLineageRelation {
    /* Source table column. */
    String sourceColumn();

    /* Sink table column. */
    String sinkColumn();
}
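
In a customized job status changed listener, the generic lineage entities and relations from JobCreatedEvent can then be cast to the table specific classes above to read the table and column lineage. A minimal sketch, assuming the report methods are user defined:

Code Block
/* Illustrative sketch: read table and column lineage from the job lineage of a Table/SQL job. */
private void reportTableLineage(JobLineage lineage) {
    for (LineageRelation relation : lineage.relations()) {
        if (relation instanceof TableLineageRelation) {
            TableLineageRelation tableRelation = (TableLineageRelation) relation;
            TableSourceLineageEntity source = (TableSourceLineageEntity) tableRelation.source();
            TableSinkLineageEntity sink = (TableSinkLineageEntity) tableRelation.sink();

            // Table level lineage: which source table feeds which sink table.
            reportTableRelation(source.catalogContext(), source.table(), sink.catalogContext(), sink.table());

            // Column level lineage from source columns to sink columns.
            for (TableColumnLineageRelation columnRelation : tableRelation.columnRelations()) {
                reportColumnRelation(columnRelation.sourceColumn(), columnRelation.sinkColumn());
            }
        }
    }
}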

Job lineage for DataStream job

The data structures of connectors in DataStream jobs are much more complex than in Table/SQL jobs; they may be tables, user customized POJO classes or even vectors for ML jobs. We add setLineageEntity in DataStreamSource and DataStreamSink, which allows users to define the job lineage entity using TableSourceLineageEntity and TableSinkLineageEntity if the connectors are tables, or to implement a customized LineageEntity for source and sink based on their specific requirements.

Code Block
/**
 * Add setLineageEntity method in data stream source.
 */
@Public
public class DataStreamSource {
    private LineageEntity lineageEntity;

    public DataStreamSource setLineageEntity(LineageEntity lineageEntity);
}

/**
 * Add setLineageEntity and addLineageRelations methods in data stream sink.
 */
@Public
public class DataStreamSink {
    private LineageEntity lineageEntity;

    public DataStreamSink setLineageEntity(LineageEntity lineageEntity);

    public DataStreamSink addLineageRelations(LineageRelation... relations);
}

...

Config Customized Listener

Users should add their listeners to the classpath of the client and the Flink cluster, and configure the listener factories in the following options:

Code Block
# Config for catalog modification listeners.
lineage.catalog-modification.listeners: {job catalog listener factory1},{job catalog listener factory2}

# Config for job status changed listeners.
lineage.job-status-changed.listeners: {job status changed listener factory1},{job status changed listener factory2}

Proposed Changes

Use case of job lineage

1. User customized lineage for DataStream

Users can implement customized source and sink lineage entities for DataStream jobs, for example, a KafkaVectorLineageEntity for Kafka sources and sinks as follows.

Code Block
/** User defined vector source and sink lineage entity. */
public class KafkaVectorLineageEntity extends LineageEntity {
    /* The capacity of the vector. */
    int capacity();

    /* The value type in the vector. */
    String valueType();
}

/* Users can use the vector lineage entities in their datastream job. */

Map<String, String> kafkaSourceConf = ...;
Map<String, String> kafkaSinkConf = ...;
StreamExecutionEnvironment env = ...;
KafkaVectorLineageEntity sourceLineageEntity = new KafkaVectorLineageEntity(10, "int", kafkaSourceConf);
KafkaVectorLineageEntity sinkLineageEntity = new KafkaVectorLineageEntity(20, "double", kafkaSinkConf);
env.fromSource(...)
        .setLineageEntity(sourceLineageEntity) // Set source lineage entity
	.map(...).keyBy(..).reduce(..)...
	.sinkTo(...)
        .setLineageEntity(sinkLineageEntity)  // Set sink lineage entity
        .addLineageRelations(                 // Add lineage relations from sources to the current sink
            new LineageRelation(sourceLineageEntity, sinkLineageEntity));
env.execute();

After that, users can cast the source and sink lineage entities to the vector lineage entity and get the capacity and value type in their customized listeners.

2. Connectors identified by connector identifier

Users may create different tables on the same storage, such as the same Kafka topic. Suppose there is one Kafka topic, two Paimon tables and one Mysql table. Users create these tables and submit three Flink SQL jobs as follows.

a) Create Kafka and Paimon tables for Flink SQL job

Code Block
-- Create a table my_kafka_table1 for kafka topic 'kafka_topic'
CREATE TABLE my_kafka_table1 (
  val1 STRING,
  val2 STRING,
  val3 STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'kafka_topic',
  'properties.bootstrap.servers' = 'kafka_bootstrap_server',
  'properties.group.id' = 'kafka_group',
  'format' = 'json'
);

-- Create a Paimon catalog and table for warehouse 'paimon_path1'
CREATE CATALOG paimon_catalog WITH (
    'type'='paimon',
    'warehouse'='paimon_path1'
);
USE CATALOG paimon_catalog;
CREATE TABLE my_paimon_table (
    val1 STRING,
    val2 STRING,
    val3 STRING
) WITH (...);

-- Insert data to Paimon table from Kafka
INSERT INTO my_paimon_table SELECT ... FROM default_catalog.default_database.my_kafka_table1 WHERE ...;

b) Create another Kafka and Paimon tables for Flink SQL job

Code Block
-- Create another table my_kafka_table2 for kafka topic 'kafka_topic' which is same as my_kafka_table1 above
CREATE TABLE my_kafka_table2 (
  val1 STRING,
  val2 STRING,
  val3 STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'kafka_topic',
  'properties.bootstrap.servers' = 'kafka_bootstrap_server',
  'properties.group.id' = 'kafka_group',
  'format' = 'json'
);

-- Create a Paimon catalog with the same name 'paimon_catalog' for different warehouse 'paimon_path2'
CREATE CATALOG paimon_catalog WITH (
    'type'='paimon',
    'warehouse'='paimon_path2'
);
USE CATALOG paimon_catalog;
CREATE TABLE my_paimon_table (
    val1 STRING,
    val2 STRING,
    val3 STRING
) WITH (...);

-- Insert data to Paimon table from Kafka
INSERT INTO my_paimon_table SELECT ... FROM default_catalog.default_database.my_kafka_table2 WHERE ...;

c) Create Mysql table for Flink SQL job

Code Block
-- Create two catalogs for warehouse 'paimon_path1' and 'paimon_path2', there are two different tables 'my_paimon_table'
CREATE CATALOG paimon_catalog1 WITH (
    'type'='paimon',
    'warehouse'='paimon_path1'
);
CREATE CATALOG paimon_catalog2 WITH (
    'type'='paimon',
    'warehouse'='paimon_path2'
);

-- Create mysql table
CREATE TABLE mysql_table (
    val1 STRING,
    val2 STRING,
    val3 STRING
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://HOST:PORT/my_database',
    'table-name' = 'my_mysql_table',
    'username' = '***',
    'password' = '***'
);

-- Insert data to mysql table from two paimon tables
INSERT INTO mysql_table 
    SELECT ... FROM paimon_catalog1.default.my_paimon_table
        JOIN paimon_catalog2.default.my_paimon_table
    ON ... WHERE ...;


After completing the above operations, we get one Kafka topic, two Paimon tables and one Mysql table, which are identified by their connector identifiers. These tables are associated through Flink jobs, and users can report the tables and relationships to DataHub [1] as an example, which is shown below

...

TableEnvironmentImpl creates the customized CatalogModificationListener according to the option lineage.catalog-modification.listeners, and builds the CatalogManager with the listeners. Some other components such as Sql-Gateway can create the CatalogManager with the listeners themselves. Currently all table related operations such as create/alter are in CatalogManager, but database operations are not. We can add the database modification operations in CatalogManager and notify the specified listeners for both tables and databases.

Code Block
/* Listeners and related operations in the catalog manager. */
public final class CatalogManager {
    private final List<CatalogModificationListener> listeners;

    /* Create catalog manager with listener list. */
    private CatalogManager(
            String defaultCatalogName,
            Catalog defaultCatalog,
            DataTypeFactory typeFactory,
            ManagedTableListener managedTableListener,
            List<CatalogModificationListener> listeners);

    /* Notify the listeners with the given catalog modification event. */
    private void notify(CatalogModificationEvent event) {
        listeners.forEach(listener -> listener.onEvent(event));
    }

    /* Notify listeners for table related operations. */
    public void createTable/dropTable/alterTable(...) {
        ....;
        notify(Create Different Table Modification Event);
    }

    /* Add database ddls and notify listeners for database related operations. */
    public void createDatabase/dropDatabase/alterDatabase(...) {
        ....;
        notify(Create Different Database Modification Event);
    }

    /* Add listeners in Builder for catalog manager. */
    public static final class Builder {
        Builder listeners(List<CatalogModificationListener> listeners);
    }
}

Changes for JobStatusChangedListener

...