Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Status

Discussion thread
-
https://lists.apache.org/thread/185mbcwnpokfop4xcb22r9bgfp2m68fx
Vote thread
-
https://lists.apache.org/thread/5f9806kw1q01536wzwgx1psh7jhmfmjw
JIRA

Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyFLINK-

31275

32402

Release
-
1.18

Motivation

Flink ETL job consumes data from Source Table and produces result to Sink Table. Source Table creates relationship with Sink Table through Flink ETL job. Flink needs a mechanism for users to report these relationships to external systems, such as meta system Datahub [1], Atlas [2] and meta store we mentioned in FLIP-276 [3].

This FLIP aims We would like to introduce listeners interface listener interfaces in Flink, users can implement them to report the progress of jobs and meta data and lineage to external systems. Flink SQL and Table jobs are supported in the first stage, and DataStream will be consider in the future. The main information is The main information is as follows

1. Source and Sink information, such as table name, fields, partition keys, primary keys, watermarks, configurations

2. Job information, such as job id/name, execution mode, scheduler job type, logical planjob configuration

3. Relationship between Source/Sink and jobs, such as source and sink tables for job, fields relationships in job and vertexand their column lineages.

4. Job execution status changed information, such as job status, checkpoints

Public Interfaces

CatalogEventListener

exception.

This FLIP focuses on customized meta data listener and customized job lineage listener will be introduced in FLIP-314 [4]

Public Interfaces

CatalogModificationListener

DDL operations such as CatalogEventListener listens events generated by ddl such as register catalog, create/alter/drop tables and etcwill generate different events and notify CatalogModificationListener . All events for CatalogEventListener CatalogModificationListener extend the basic interface CatalogEvent CatalogModificationEvent and listeners can get catalog from it. Some general events for catalog/ database/table are defined as follows and more events can be implemented based on the requirements in the future.

Code Block
/**
 * Different events will be fired when a catalog/database/table is changedmodified. The customized listener can receiveget theseand
 events* andreport thenspecific doinformation somefrom specificthe operationsevent according to the event type.
 */
@PublicEvolving
public interface CatalogEventListenerCatalogModificationListener {
    /** Event The event will be fired afterwhen athe catalogdatabase/table is modified. */
    void onEvent(CatalogEventCatalogModificationEvent catalogEventevent);
}

    /* TheBasic basic classinterface for catalog related eventmodification. */
    @PublicEvolving
    public interface class CatalogEventCatalogModificationEvent {
    /* Context for the event. */
    CatalogContext context();
}

/* TheContext for catalog of the eventmodification and job lineage events. */
@PublicEvolving
public interface CatalogContext   {
   Catalog catalog();
        /* The name of catalog. */
        String catalogNamegetCatalogName();
    }

    /* Class Eventof forthe catalog registration. */
    @PublicEvolving
    interface RegisterCatalogEvent Class<? extends CatalogEvent { }
 Catalog> getClass();

    /* EventIdentifier for the catalog unregistration from catalog factory, such as jdbc/iceberg/paimon. */ 
    @PublicEvolving
 Optional<String> getFactoryIdentifier();

  interface UnregisterCatalogEvent extends/* CatalogEventConfig {
for catalog. */
    Configuration   boolean ignoreIfNotExistsgetConfiguration();
    }

/* The basic class /* Event for database creationrelated event. */
public interface DatabaseModificationEvent extends CatalogModificationEvent @PublicEvolving{
    interface CreateDatabaseEvent extends CatalogEvent {
        CatalogDatabase database();
        String databaseName(); 
       CatalogDatabase database();
}

/* Event for creating database. */
@PublicEvolving
public interface CreateDatabaseEvent extends DatabaseModificationEvent {
    boolean ignoreIfExists();
    }

    /* Event for droppingaltering database. */
@PublicEvolving
public    @PublicEvolving
    interface DropDatabaseEventinterface AlterDatabaseEvent extends CatalogEvent DatabaseModificationEvent {
        String databaseNameCatalogDatabase newDatabase(); 
        boolean ignoreIfExistsignoreIfNotExists();
}

/* Event for dropping }database. */
@PublicEvolving
public interface DropDatabaseEvent extends /*DatabaseModificationEvent Base{
 table event, provides column boolean ignoreIfExists();
}

/**
 * Base table event, provides column list, primary keys, partition keys, watermarks and properties in
 * CatalogBaseTable. The table can be source or sink.
 */
public    interface BaseTableEventTableModificationEvent extends CatalogEventCatalogModificationEvent {
        ObjectIdentifierObjectIdentifier identifier();  
        CatalogBaseTableCatalogBaseTable table();
    }

    /* Event for creating table. creation. */
    @PublicEvolving
   public interface CreateTableEvent extends BaseTableEventCatalogModificationEvent {
        boolean ignoreIfExists();
    }

    /* Event for altering table, provides all informationchanges infor old table and new table. */
    @PublicEvolving
  public  interface AlterTableEvent extends BaseTableEventCatalogModificationEvent {
        List<TableChange> tableChanges();
        boolean ignoreIfExists();
    }

    /* Event for dropping table. */
@PublicEvolving
public interface DropTableEvent extends CatalogModificationEvent @PublicEvolving{
    interface DropTableEvent extends BaseTableEvent {
        boolean ignoreIfExists();   boolean ignoreIfExists();
}

/* Factory for catalog modification listener. */
@PublicEvolving
public interface CatalogModificationListenerFactory {
    CatalogModificationListener createListener(Context context);

    @PublicEvolving 
    }
}

JobListener 

There is an existing JobListener which will be notified when job is submitted. Before job submission event should be added to the listener with source/sink list in the job, then users can do their customized validation such as whether a table is written by multiple jobs. JobSubmissionEvent is created for the listener and onJobBeforeSubmitted method is added to the listener  as follows.

Code Block
@PublicEvolving
public interface JobListener {
    /* Event is fired before a job is submitted. */
    void onJobPreSubmitted(JobSubmissionEvent submissionEvent);

    /* Event for job submission. */
    @PublicEvolving
    public interface JobSubmissionEvent {
        JobID jobId();
        String jobName();
        JobLogicalPlan plan();
    }
}

JobExecutionListener

JobExecutionListener listens to the status and checkpoint for running job in JobManager .  There is JobStatusEvent which indicates the status of Flink job in JobStatus , the specific event has been defined as follows and more job status event can be added base on the requirements in the future.

In addition to the job status, the JobExecutionListener also listens for checkpoint events such as checkpoint started/completed/aborted, all checkpoint related events extend CheckpointEvent and more events can be added in the future too.

Code Block
/**
 * When job status is changed in job manager, it will generate job event and notify job execution listener.
 */
@PublicEvolving
public interface JobExecutionListener {
    /* Event fired after job status has been changed. */ 
    void onJobStatusChanged(JobStatusEvent jobStatusEvent);

    /* Event fired when a checkpoint is started/completed/aborted. */
    void onCheckpoint(CheckpointEvent checkpointEvent);

    /* Job status event with plan. */
    @PublicEvolving
    public interface JobStatusEvent {
        JobLogicalPlan plan();
        JobStatus oldStatus();
        JobStatus newStatus();
    }

    /* Event for job checkpoint. */
    @PublicEvolving
    public interface CheckpointEvent {
        /* Snapshot type, checkpoint or savepoint. */
        String snapshotType(); 
        long checkpoint();
        @Nullable String externalSavepointLocation();
        boolean isPeriodic;
        long timestamp();
        Map<String, String> config(); 
        
    }

    /* Checkpoint started/completed/aborted event. */
    @PublicEvolving
    public interface CheckpointStartedEvent extends CheckpointEvent {}
    @PublicEvolving 
    public interface CheckpointCompletedEvent extends CheckpointEvent {}
    @PublicEvolving   
    public interface CheckpointStartedEvent extends CheckpointEvent {} 
}

Job Logical Plan

There is job logical plan in the events for the listeners above. Users can get the plan to report more information about the job, such as source/sink tables in the job, column relation between source/sink tables and vertex in the job. There is JobPlanVertex  which is built on JobVertex in JobGraph and provides basic information. In addition, JobPlanVertex also require additional information, such as schema for source/sink in Table and SQL job. Table source and sink vertexes are defined based on these basic vertexes, and datastream vertexes can be defined on them in the future.

public interface Context {
        Configuration getConfiguration();
        ClassLoader getUserClassLoader();
    	/*
    	* Get an Executor pool for the listener to run async operations that can potentially be IO-heavy.
    	*/
    	Executor getIOExecutor();
     }
}

Users may create different catalogs on the same physical catalog, for example, create two hive catalog named hive_catalog1  and hive_catalog2  for the same metastore. The tables hive_catalog1.my_database.my_table  and hive_catalog2.my_database.my_table  are the same table in hive metastore.

In addition, there are two table types: persistent table  and temporal table . The persistent table  can be identified by catalog and database above, while the temporal table  can only be identified by properties in ddl. Different temporal tables with the same connector type and related properties are the same physical table in external system, such as two tables for the same topic in Kafka.

Users can identify the physical connector by CatalogContext and options in CatalogBaseTable through the following steps:

1. Get connector name.

Users can get value of option 'connector' from options in CatalogBaseTable  for temporal tables. If it doesn't exist, users can get factory identifier from CatalogContext as connector name. If none of the above exist, users can define the connector name themselves through Class<? extends Catalog> .

2. Uses can get different properties based on connector name from table options and create connector identifier. Flink has many connectors, and we given the example of kafka options below, users can create kafka identifier with servers, group and topic as needed.

Code Block
/* Kafka storage identifier options. */
"properties.bootstrap.servers" for Kafka bootstrap servers
"topic" for Kafka Topic
"properties.group.id" for Kafka group id
"topic-pattern" for Kafka topic pattern

For some sensitive information, users can encode and desensitize them in their customized listeners.

Config Customized Listener

Users should add their listeners to the classpath of client and flink cluster, and config the listener factory in the following options

Code Block
# Config for catalog modification listeners.
table.catalog-modification.listeners: {table catalog listener factory1},{table catalog listener factory2}

Proposed Changes

Use case of job lineage

Users may create different tables on a same storage, such as the same Kafka topic. Suppose there's one Kafka topic, two Paimon tables and one Mysql table. Users create these tables and submit three Flink SQL jobs as follows.

a) Create Kafka and Paimon tables for Flink SQL job

Code Block
-- Create a table my_kafka_table1 for kafka topic 'kafka_topic'
CREATE TABLE my_kafka_table1 (
  val1 STRING,
  val2 STRING,
  val3 STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'kafka_topic',
  'properties.bootstrap.servers' = 'kafka_bootstrap_server',
  'properties.group.id' = 'kafka_group',
  'format' = 'json'
);

-- Create a Paimon catalog and table for warehouse 'paimon_path1'
CREATE CATALOG paimon_catalog WITH (
    'type'='paimon',
    'warehouse'='paimon_path1'
);
USE CATALOG paimon_catalog;
CREATE TABLE my_paimon_table (
    val1 STRING,
    val2 STRING,
    val3 STRING
) WITH (...);

-- Insert data to Paimon table from Kafka
INSERT INTO my_paimon_table SELECT ... FROM default_catalog.default_database.my_kafka_table1 WHERE ...;

b) Create another Kafka and Paimon tables for Flink SQL job

Code Block
-- Create another table my_kafka_table2 for kafka topic 'kafka_topic' which is same as my_kafka_table1 above
CREATE TABLE my_kafka_table2 (
  val1 STRING,
  val2 STRING,
  val3 STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'kafka_topic',
  'properties.bootstrap.servers' = 'kafka_bootstrap_server',
  'properties.group.id' = 'kafka_group',
  'format' = 'json'
);

-- Create a Paimon catalog with the same name 'paimon_catalog' for different warehouse 'paimon_path2'
CREATE CATALOG paimon_catalog WITH (
    'type'='paimon',
    'warehouse'='paimon_path2'
);
USE CATALOG paimon_catalog;
CREATE TABLE my_paimon_table (
    val1 STRING,
    val2 STRING,
    val3 STRING
) WITH (...);

-- Insert data to Paimon table from Kafka
INSERT INTO my_paimon_table SELECT ... FROM default_catalog.default_database.my_kafka_table2 WHERE ...;

c) Create Mysql table for Flink SQL job

Code Block
-- Create two catalogs for warehouse 'paimon_path1' and 'paimon_path2', there are two different tables 'my_paimon_table'
CREATE CATALOG paimon_catalog1 WITH (
    'type'='paimon',
    'warehouse'='paimon_path1'
);
CREATE CATALOG paimon_catalog2 WITH (
    'type'='paimon',
    'warehouse'='paimon_path2'
);

-- Create mysql table
CREATE TABLE mysql_table (
    val1 STRING,
    val2 STRING,
    val3 STRING
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://HOST:PORT/my_database',
    'table-name' = 'my_mysql_table',
    'username' = '***',
    'password' = '***'
);

-- Insert data to mysql table from two paimon tales
INSERT INTO mysql_table 
    SELECT ... FROM paimon_catalog1.default.my_paimon_table
        JOIN paimon_catalog2.default.my_paimon_table
    ON ... WHERE ...;

After completing the above operations, we got one Kafka topic, two Paimon tables and one Mysql table which are identified by connector identifier. These tables are associated through Flink jobs, users can report the tables and relationships to datahub as an example which is shown below (The job lineage will be supported in FLIP-314)

Image Added

Changes for CatalogModificationListener

TableEnvironmentImpl creates customized CatalogModificationListener according to the option lineage.catalog-modification.listeners , and build CatalogManager with the listeners. Some other components such as Sql-Gateway can create CatalogManager with the listeners themselves. Currently all table related operations such as create/alter are in CatalogManager , but database operations are not. We can add database modification operations in CatalogManager  and notify the specified listeners for tables and databases.

Code Block
/* Listeners and related operations in the catalog manager. */
public final class CatalogManager {
    private final List<CatalogModificationListener> listeners;

    /* Create catalog manager with listener list. */
    private CatalogManager(
            String defaultCatalogName,
            Catalog defaultCatalog,
            DataTypeFactory typeFactory,
            ManagedTableListener managedTableListener,
            List<CatalogModificationListener> listeners
Code Block
/**
 * Job logical plan is built according to JobGraph. Users can get sources, sinks and the relationship between nodes from plan.
 */
@PublicEvolvig
public interface JobLogicalPlan {
    JobID jobId();
    String jobName();

    /* Scheduler type such as Default/Adaptive/AdaptiveBatch. */
    String scheduler();

    /* Job execution mode, PIPELINED/PIPELINED_FORCED/BATCH/BATCH_FORCED. */
    ExecutionMode executionMode();

    /* Job type, BATCH or STREAMING. */
    String jobType();

    /* Source vertex list. */
    List<JobPlanVertex> sources();

    /* Sink vertex list. */
    List<JobPlanVertex> sinks();

    /* Get all vertex list. */
    List<JobPlanVertex> getVerticesSortedTopologicallyFromSources();

    /* Get specific vertex by id. */
    JobPlanVertex vertex(String id); 

    /* Vertex in job logical plan based on JobVertex. */
    @PublicEvolving
    public interface JobPlanVertex {
        String id();
        String name();
        String operatorName();
        String operatorDescription();
        int parallelism(); 
        String invokableClassName();
        boolean supportsConcurrentExecutionAttempts();
        List<JobPlanEdge> inputs();
    }

    /* Edge between vertexes in the logical plan. */
    @PublicEvolving
    public interface JobPlanEdge {
        JobPlanVertex source();
        JobPlanVertex target();
        String distribution();
        String shipStrategyName();
        boolean isBroadcast();
        boolean isForward();
    }
}

/* Table scan source and sink base interface, datastream source/sink vertexes can be added based on the requirements in the future. */
public interface JobPlanTableVertex extends JobPlanVertex {
    /* `catalog`.`database`.`table` for scan source. */
    ObjectIdentifier table();

    /* ForNotify Scan source, the type is Values or Table; for sink, the type is CollectSink or ModifySink. */
    String type();

    /* Table optionsthe listeners with given catalog event. */
    private void notify(CatalogModificationEvent event) {
        listeners.forEach(listener -> listener.onEvent(event));
    }

    /* Notify listener for tables. */
    public  Map<String, String> config();
void createTable/dropTable/alterTable(...) {
    /* For scan source, column list consumed by job; for sink, column list produced by job. */
    List<JobTableColumn> columns();
 ....;
        notify(Create Different Table Modification Event With Context);
    /*}

 Column with name and type in the table. */
    public interface JobTableColumn extends Serializable {
        String name();
        LogicalType type();
    }
 
    /* Table scan source vertex. */
	@PublicEvolving
	public interface JobPlanTableSourceVertex extends JobPlanTableVertex {}
 
    /* Table sink vertex. */
	@PublicEvolving
	public interface JobPlanTableSinkVertex extends JobPlanTableVertex {
        /* Modify type, INSERT/UPDATE/DELETE. */
        String modifyType();

        /* Update mode, APPEND/RETRACT/UPSERT. */
        String updateMode();
        boolean overwrite();
        Map<String, String> staticPartitions();
    } 
}

Config Customized Listener

Users should add their listeners to the classpath of client and flink cluster, and config them in the following options

Code Block
# Config catalog event listeners.
table.catalog.listeners: {job catalog listener class1},{job catalog listener class2}

# Existing config job submission listeners.
execution.job-listeners: {job submission listener class1},{job submission listener class2}

# Config job execution listeners.
jobmanager.execution.listeners: {job execution listener class1},{job execution listener class2}

Proposed Changes

Changes for JobDeploymentListener

TableEnvironmentImpl creates customized JobDeploymentListener according to the option table.job.deployment.listeners , and put the listener into CatalogManager and AbstractCatalog. TableEnvironmentImpl can receive existing listeners in constructor with CatalogManager  too, which can be used in some other classes such sql gateway. When DDL related operations are executed in CatalogManager and AbstractCatalog , they should notify the listener.

TableEnvironmentImpl will submit the job after it is created, it notifies the listener before and after the job is submitted.

Changes for JobExecutionListener

Flink sql or table jobs are created from Planner which contains exec nodes, then it is converted to Operation , Transformation and StreamGraph. Finally, the jobs are submitted as JobGraph and job managers create ExecutionGraph from it. The operations of source/sink list are as follows.

...

SourceScan in Planner contains source information such as table name, fields and configurations. But these information is hidden in the Source which is an interface when the SourceScan  is converted to Transformation. We should add source information in the conversion of SourceScan->Operation->Transformation->StreamNode.

Similar to sources, Sink and DataStreamSink contain sink information such as table names and configuration. We should add sink information in the conversion of Sink->Operation->Transformation->StreamNode, then we can add Map<JobVertexID, JobSinkVertexInfo> sources in JobGraph and ExecutionGraph too.

After completing the above changes, JobManager can create JobLogicalPlanInfo  from JobGraph  for JobExecutionListener . When the status of job is changed, DefaultExecutionGraph  in JobManager  will notify the listener. At the same time, this listener will also listen to the execution of checkpoint. When CheckpointCoordinator starts/completes/aborts a specific checkpoint, it will notify the listener too.

Plan For The Future

...

Source/Sink relationships for SQL/Table jobs are supported, DataStream  jobs will be supported later.

...

Currently we only supports scan source, lookup join source should be supported later.

...

 /* Add database ddls and notify listener for databases. */
    public void createDatabase/dropDatabase/alterDatabase(...) {
        ....;
        notify(Create Different Database Modification Event With Context); 
    }

    /* Add listeners in Builder for catalog manager. */
    public static final class Builder {
        Builder listeners(List<CatalogModificationListener> listeners);
    }
}

Listener Execution

Multiple listeners are independent, and client/JobManager will notify the listeners synchronously. It is highly recommended NOT to perform any blocking operation inside the listeners. If blocked operations are required, users need to perform asynchronous processing in their customized listeners.


[1] https://datahub.io/

[2] https://atlas.apache.org/#/

[3] FLIP-276: Data Consistency of Streaming and Batch ETL in Flink and Table Store

[4]FLIP-314: Support Customized Job Lineage Listener