Versions Compared

Status

Discussion thread: https://lists.apache.org/thread/185mbcwnpokfop4xcb22r9bgfp2m68fx
Vote thread: https://lists.apache.org/thread/5f9806kw1q01536wzwgx1psh7jhmfmjw
JIRA: FLINK-32402
Release: 1.18

Motivation

A Flink ETL job consumes data from a source table and, after computation, writes the result to a sink table. The job therefore creates a relationship between the source table and the sink table. Flink needs a mechanism that lets users report these relationships to external systems through customized listeners, such as the metadata systems Datahub [1] and Atlas [2], or the meta store mentioned in FLIP-276 [3].

This FLIP aims to introduce listener interfaces in Flink, so that users can implement them to report metadata and lineage to external systems. The main information is as follows

...

2. Job information, such as job id/name, job type, job configuration

3. Relationship between Source/Sink and jobs, such as the source and sink tables of a job and their column lineages.

4. Job execution information, such as job status changes and exceptions.

This FLIP focuses on the customized metadata listener; the customized job lineage listener will be introduced in FLIP-314 [4].

Public Interfaces

JobDeploymentListener

JobDeploymentListener receives events for DDL operations and job submission. It only supports SQL/Table jobs in this FLIP.

CatalogModificationListener

DDL operations such as create/alter/drop table generate different events and notify CatalogModificationListener. All events for CatalogModificationListener extend the basic CatalogModificationEvent, and listeners can get the catalog context from it. Some general events for databases and tables are defined as follows; more events can be added in the future as requirements arise.

Code Block
/**
 * Different events will be fired when a catalog/database/table is modified. The customized listener can get and
 * report specific information from the event according to the event type.
 */
@PublicEvolving
public interface CatalogModificationListener {
    /** The event will be fired when the database/table is modified. */
    void onEvent(CatalogModificationEvent event);
}

/* Basic interface for catalog modification. */
@PublicEvolving
public interface CatalogModificationEvent {
    /* Context for the event. */
    CatalogContext context();
}

/* Context for catalog modification and job lineage events. */
@PublicEvolving
public interface CatalogContext {
    /* The name of catalog. */
    String getCatalogName();

    /* Class of the catalog. */
    Class<? extends Catalog> getClass();

    /* Identifier for the catalog from catalog factory, such as jdbc/iceberg/paimon. */
    Optional<String> getFactoryIdentifier();

    /* Config for catalog. */
    Configuration getConfiguration();
}

/* The basic class for database related event. */
public interface DatabaseModificationEvent extends CatalogModificationEvent {
    CatalogDatabase database();
}

/* Event for creating database. */
@PublicEvolving
public interface CreateDatabaseEvent extends DatabaseModificationEvent {
    boolean ignoreIfExists();
}

/* Event for altering database. */
@PublicEvolving
public interface AlterDatabaseEvent extends DatabaseModificationEvent {
    CatalogDatabase newDatabase();
    boolean ignoreIfNotExists();
}

/* Event for dropping database. */
@PublicEvolving
public interface DropDatabaseEvent extends DatabaseModificationEvent {
    boolean ignoreIfExists();
}

/**
 * Base table event, provides column list, primary keys, partition keys, watermarks and properties in
 * CatalogBaseTable. The table can be source or sink.
 */
public interface TableModificationEvent extends CatalogModificationEvent {
    ObjectIdentifier identifier();
    CatalogBaseTable table();
}

/* Event for creating table. */
@PublicEvolving
public interface CreateTableEvent extends CatalogModificationEvent {
    boolean ignoreIfExists();
}

/* Event for altering table, provides all changes for old table. */
@PublicEvolving
public interface AlterTableEvent extends CatalogModificationEvent {
    List<TableChange> tableChanges();
    boolean ignoreIfExists();
}

/* Event for dropping table. */
@PublicEvolving
public interface DropTableEvent extends CatalogModificationEvent {
    boolean ignoreIfExists();
}

/* Factory for catalog modification listener. */
@PublicEvolving
public interface CatalogModificationListenerFactory {
    CatalogModificationListener createListener(Context context);

    @PublicEvolving
    public interface Context {
        Configuration getConfiguration();
        ClassLoader getUserClassLoader();

        /*
         * Get an Executor pool for the listener to run async operations that can potentially be IO-heavy.
         */
        Executor getIOExecutor();
    }
}
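
As an illustration only, the sketch below shows how a customized listener might dispatch on the event type it receives through a single onEvent method. The nested interfaces are simplified stand-ins declared locally so that the example compiles on its own; they are not the Flink classes. RecordingListener, tableIdentifier() and the report strings are hypothetical names chosen for this example.

```java
import java.util.ArrayList;
import java.util.List;

class ListenerExample {
    // Simplified stand-ins for the interfaces in this FLIP (assumptions, not the Flink types).
    interface CatalogModificationEvent {
        String catalogName();
    }

    interface CreateTableEvent extends CatalogModificationEvent {
        String tableIdentifier();
        boolean ignoreIfExists();
    }

    interface DropTableEvent extends CatalogModificationEvent {
        String tableIdentifier();
    }

    interface CatalogModificationListener {
        void onEvent(CatalogModificationEvent event);
    }

    /** Hypothetical listener that turns events into report lines for an external meta system. */
    static final class RecordingListener implements CatalogModificationListener {
        final List<String> reports = new ArrayList<>();

        @Override
        public void onEvent(CatalogModificationEvent event) {
            // Dispatch on the concrete event type. Unknown event types are ignored,
            // so event types added later do not break this listener.
            if (event instanceof CreateTableEvent) {
                reports.add("CREATE " + ((CreateTableEvent) event).tableIdentifier());
            } else if (event instanceof DropTableEvent) {
                reports.add("DROP " + ((DropTableEvent) event).tableIdentifier());
            }
        }
    }

    /** Builds a hypothetical create-table event for the given catalog and table path. */
    static CreateTableEvent createTable(String catalog, String table) {
        return new CreateTableEvent() {
            @Override public String catalogName() { return catalog; }
            @Override public String tableIdentifier() { return catalog + "." + table; }
            @Override public boolean ignoreIfExists() { return false; }
        };
    }

    public static void main(String[] args) {
        RecordingListener listener = new RecordingListener();
        listener.onEvent(createTable("paimon_catalog", "default.my_paimon_table"));
        listener.onEvent(() -> "other_catalog"); // a plain event type that the listener ignores
        System.out.println(listener.reports);
    }
}
```

A single onEvent entry point keeps the listener interface stable when new event types are introduced; the listener decides which events it cares about.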

JobExecutionListener

JobExecutionListener listens to status changes in the job. JobManager creates a JobEvent each time the status changes and notifies the corresponding method in JobExecutionListener. Users can implement different listeners according to their needs, such as a Datahub listener or an Atlas listener.

Code Block
/**
 * When job status is changed in job manager, it will generate job event to notify job execution listener.
 */
@PublicEvolving
public interface JobExecutionListener extends AutoCloseable {
    /* Start the listener with job config. */
    void start(Map<String, String> config) throws Exception;
  
    /* Event fired after job has been created. */ 
    void onCreated(JobCreatedEvent createdEvent);

    /* Event fired after job has been finished. */
    void onFinished(JobFinishedEvent finishedEvent);

    /* Event fired after job has been canceled. */
    void onCanceled(JobCanceledEvent canceledEvent);

    /* Event fired after job has failed. */
    void onFailed(JobFailedEvent failedEvent);

    /* Event fired after checkpoint has been started. */
    void onCheckpointStarted(CheckpointEvent checkpointEvent);

    /* Event fired after checkpoint has been completed. */
    void onCheckpointCompleted(CheckpointEvent checkpointEvent);

    /* Event fired after checkpoint has been aborted. */
    void onCheckpointAborted(CheckpointEvent checkpointEvent);

    /* Vertex in job plan, provides id, name, parallelism, input edges and output column names. */
    @PublicEvolving
    interface JobVertexInfo {
        String id();
        String name();
        String operatorName();
        String operatorDescription();
        int parallelism(); 
        String invokableClassName();
        boolean supportsConcurrentExecutionAttempts();
        List<JobEdgeInfo> inputs();
        List<String> outputColumns();
    }
 
    /* Edge in job plan, provides source/target vertex, input columns, distribution, isBroadcast and isForward. */
    @PublicEvolving 
    interface JobEdgeInfo {
        JobVertexInfo source();
        JobVertexInfo target();
        /* Input column names of the edge. */
        List<String> inputColumns();
        String distribution();
        String shipStrategyName();
        boolean isBroadcast();
        boolean isForward();
        String preProcessingOperationName();
        String operatorLevelCachingDescription();
    }

    /* Job scan source vertex, provides source table name, input table columns and source type. */ 
    @PublicEvolving 
    interface JobScanSourceVertexInfo extends JobVertexInfo {
        /* `Catalog.Database.Table` format name. */
        String sourceName();

        /* Values or Table. */
        String type();

        /* Columns from source table, detailed information such as column type is provided in {@code JobDeploymentListener#onCreateTable}. */
        List<String> columns();

        /* Source config provides options in source such as source type. */
        Map<String, String> config();
    }

    /* Job sink vertex, provides sink table name, sink table columns and sink type. */
    @PublicEvolving 
    interface JobSinkVertexInfo extends JobVertexInfo {
        /* `Catalog.Database.Table` format name. */ 
        String sinkName();

        /* CollectSink or ModifySink. */
        String type();

        /* INSERT/UPDATE/DELETE. */
        String modifyType(); 

        /* APPEND/RETRACT/UPSERT. */
        String updateMode();

        /* Columns from sink table, detailed information such as column type is provided in {@code JobDeploymentListener#onCreateTable}. */
        List<String> columns();  
        boolean overwrite();
        Map<String, String> staticPartitions(); 
        Map<String, String> config();
    }

    /* Event for job status is changed. */
    interface JobBaseEvent {
        JobID jobId();
        String jobName();

        /* Scheduler type such as Default/Adaptive/AdaptiveBatch. */
        String scheduler();

        /* Job execution mode, PIPELINED/PIPELINED_FORCED/BATCH/BATCH_FORCED. */
        ExecutionMode executionMode();

        /* BATCH or STREAMING. */
        String jobType();
        
        /* Timestamp for current job status. */
        long timestamp();
    }

    @PublicEvolving
    interface JobLogicalPlanInfo {
        /* Source list. */
        List<JobScanSourceVertexInfo> sources();

        /* Sink list. */
        List<JobSinkVertexInfo> sinks();

        List<JobVertexInfo> getVerticesSortedTopologicallyFromSources();
        JobVertexInfo vertex(String id);  
    }

    /* Event for job is created. */
    @PublicEvolving
    interface JobCreatedEvent extends JobBaseEvent {
        JobLogicalPlanInfo plan();
    }

    /* Event for job is finished. */
    @PublicEvolving
    interface JobFinishedEvent extends JobBaseEvent { }

    /* Event for job is canceled. */
    @PublicEvolving
    interface JobCanceledEvent extends JobBaseEvent { }

    /* Event for job is failed. */
    @PublicEvolving
    interface JobFailedEvent extends JobBaseEvent {
        Throwable exception();
    }

    /* Event for job checkpoint. */
    @PublicEvolving
    interface CheckpointEvent extends JobBaseEvent {
        long checkpoint();
        String externalSavepointLocation();
        boolean isPeriodic();
        long timestamp();
        
        /* checkpoint or savepoint. */
        String snapshotType();
        Map<String, String> config();
    }
}

Config Customized Listener

Users should add their listeners to the classpath of the client and the Flink cluster, and use the following options to register them

Code Block
table.job.deployment.listeners: {job deployment listener class1},{job deployment listener class2}

jobmanager.execution.listeners: {job execution listener class1},{job execution listener class2}
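
The following is a minimal sketch of how such a comma-separated option value could be turned into listener instances. It assumes that each entry is a fully qualified class name with a public no-argument constructor, which the text above does not state explicitly. The Listener interface, DatahubListener and AtlasListener are hypothetical stand-ins declared only for this example.

```java
import java.util.ArrayList;
import java.util.List;

class ListenerLoadingExample {
    /** Stand-in for a listener interface (assumption for this example). */
    interface Listener {}

    /** Hypothetical listener implementations used only for this example. */
    public static class DatahubListener implements Listener {}

    public static class AtlasListener implements Listener {}

    /** Splits a comma-separated option value and instantiates each class via its no-arg constructor. */
    static List<Listener> load(String optionValue, ClassLoader classLoader) {
        List<Listener> listeners = new ArrayList<>();
        for (String name : optionValue.split(",")) {
            String className = name.trim();
            if (className.isEmpty()) {
                continue; // tolerate trailing commas and blank entries
            }
            try {
                Class<?> clazz = Class.forName(className, true, classLoader);
                listeners.add((Listener) clazz.getDeclaredConstructor().newInstance());
            } catch (ReflectiveOperationException | ClassCastException e) {
                throw new IllegalStateException("Cannot create listener " + className, e);
            }
        }
        return listeners;
    }

    public static void main(String[] args) {
        String value = DatahubListener.class.getName() + ", " + AtlasListener.class.getName();
        List<Listener> listeners = load(value, ListenerLoadingExample.class.getClassLoader());
        System.out.println(listeners.size());
    }
}
```

Loading through an explicit ClassLoader matters because user listeners usually live in the user classpath rather than in the classpath of the framework itself.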

Proposed Changes

Changes for JobDeploymentListener

TableEnvironmentImpl creates the customized JobDeploymentListener according to the option table.job.deployment.listeners and puts the listener into CatalogManager and AbstractCatalog. When DDL-related operations are executed in CatalogManager and AbstractCatalog, they should notify the listener.

TableEnvironmentImpl submits the job after it is created, and it notifies the listener before and after the job is submitted.

Changes for JobExecutionListener

Flink SQL or Table jobs are created by the Planner, which contains exec nodes; these are then converted to Operation, Transformation and StreamGraph. Finally, the jobs are submitted as JobGraph and the job managers create ExecutionGraph from it. The operations of source/sink list are as follows.

...

SourceScan in Planner contains source information such as table name, fields and configurations. However, this information is hidden in the Source, which is an interface, when the SourceScan is converted to Transformation. We should add source information in the conversion of SourceScan->Operation->Transformation->StreamNode.

Similar to sources, Sink and DataStreamSink contain sink information such as table names and configuration. We should add sink information in the conversion of Sink->Operation->Transformation->StreamNode, then we can add Map<JobVertexID, JobSinkVertexInfo> sinks to JobGraph and ExecutionGraph too.

After completing the above changes, JobManager can create JobLogicalPlanInfo from JobGraph for JobExecutionListener. When the status of the job changes, DefaultExecutionGraph in JobManager will notify the listener. The listener also listens to the execution of checkpoints: when CheckpointCoordinator starts, completes or aborts a specific checkpoint, it will notify the listener too.

Plan For The Future

...

Source/Sink relationships for SQL/Table jobs are supported; DataStream jobs will be supported later.

...

Currently only scan sources are supported; lookup join sources should be supported later.

...


Users may create different catalogs on the same physical catalog, for example, two Hive catalogs named hive_catalog1 and hive_catalog2 for the same metastore. The tables hive_catalog1.my_database.my_table and hive_catalog2.my_database.my_table are then the same table in the Hive metastore.

In addition, there are two table types: persistent tables and temporary tables. A persistent table can be identified by the catalog and database as above, while a temporary table can only be identified by the properties in its DDL. Different temporary tables with the same connector type and related properties refer to the same physical table in the external system, such as two tables for the same Kafka topic.

Users can identify the physical connector by CatalogContext and options in CatalogBaseTable through the following steps:

1. Get connector name.

Users can get the value of the 'connector' option from the options in CatalogBaseTable for temporary tables. If it does not exist, users can take the factory identifier from CatalogContext as the connector name. If neither exists, users can define the connector name themselves based on Class<? extends Catalog>.

2. Users can get different properties based on the connector name from the table options and create a connector identifier. Flink has many connectors; the Kafka options below are given as an example, and users can create a Kafka identifier from the servers, group and topic as needed.

Code Block
/* Kafka storage identifier options. */
"properties.bootstrap.servers" for Kafka bootstrap servers
"topic" for Kafka Topic
"properties.group.id" for Kafka group id
"topic-pattern" for Kafka topic pattern

Users can encode or desensitize sensitive information in their customized listeners.
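
The two steps above can be sketched as follows. This is only an illustration under stated assumptions: the method names and the "kafka://servers/topic" identifier format are hypothetical, and plain maps stand in for the options of CatalogBaseTable and the values from CatalogContext.

```java
import java.util.Map;
import java.util.Optional;

class ConnectorIdentifierExample {
    /**
     * Step 1: resolve the connector name. Prefer the 'connector' table option, then the
     * catalog factory identifier, and finally fall back to the simple name of the catalog class.
     */
    static String connectorName(
            Map<String, String> tableOptions,
            Optional<String> factoryIdentifier,
            Class<?> catalogClass) {
        String connector = tableOptions.get("connector");
        if (connector != null) {
            return connector;
        }
        return factoryIdentifier.orElseGet(catalogClass::getSimpleName);
    }

    /**
     * Step 2 for Kafka: build an identifier from the bootstrap servers and the topic,
     * using the topic pattern when no fixed topic is configured. The format is an assumption.
     */
    static String kafkaIdentifier(Map<String, String> tableOptions) {
        String servers = tableOptions.get("properties.bootstrap.servers");
        String topic = tableOptions.getOrDefault("topic", tableOptions.get("topic-pattern"));
        return "kafka://" + servers + "/" + topic;
    }

    public static void main(String[] args) {
        Map<String, String> table1 = Map.of(
                "connector", "kafka",
                "topic", "kafka_topic",
                "properties.bootstrap.servers", "kafka_bootstrap_server");
        Map<String, String> table2 = Map.of(
                "connector", "kafka",
                "topic", "kafka_topic",
                "properties.bootstrap.servers", "kafka_bootstrap_server",
                "properties.group.id", "another_group");

        // Two Flink tables over the same topic resolve to the same physical identifier.
        System.out.println(connectorName(table1, Optional.empty(), Object.class));
        System.out.println(kafkaIdentifier(table1).equals(kafkaIdentifier(table2)));
    }
}
```

Whether the group id belongs in the identifier depends on what the external system should treat as one physical entity; this sketch leaves it out so that tables over the same topic collapse into one node.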

Config Customized Listener

Users should add their listeners to the classpath of the client and the Flink cluster, and configure the listener factories with the following option

Code Block
# Config for catalog modification listeners.
table.catalog-modification.listeners: {table catalog listener factory1},{table catalog listener factory2}

Proposed Changes

Use case of job lineage

Users may create different tables on the same storage, such as the same Kafka topic. Suppose there is one Kafka topic, two Paimon tables and one MySQL table. Users create these tables and submit three Flink SQL jobs as follows.

a) Create Kafka and Paimon tables for Flink SQL job

Code Block
-- Create a table my_kafka_table1 for kafka topic 'kafka_topic'
CREATE TABLE my_kafka_table1 (
  val1 STRING,
  val2 STRING,
  val3 STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'kafka_topic',
  'properties.bootstrap.servers' = 'kafka_bootstrap_server',
  'properties.group.id' = 'kafka_group',
  'format' = 'json'
);

-- Create a Paimon catalog and table for warehouse 'paimon_path1'
CREATE CATALOG paimon_catalog WITH (
    'type'='paimon',
    'warehouse'='paimon_path1'
);
USE CATALOG paimon_catalog;
CREATE TABLE my_paimon_table (
    val1 STRING,
    val2 STRING,
    val3 STRING
) WITH (...);

-- Insert data to Paimon table from Kafka
INSERT INTO my_paimon_table SELECT ... FROM default_catalog.default_database.my_kafka_table1 WHERE ...;

b) Create another Kafka table and another Paimon table for Flink SQL job

Code Block
-- Create another table my_kafka_table2 for kafka topic 'kafka_topic' which is same as my_kafka_table1 above
CREATE TABLE my_kafka_table2 (
  val1 STRING,
  val2 STRING,
  val3 STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'kafka_topic',
  'properties.bootstrap.servers' = 'kafka_bootstrap_server',
  'properties.group.id' = 'kafka_group',
  'format' = 'json'
);

-- Create a Paimon catalog with the same name 'paimon_catalog' for different warehouse 'paimon_path2'
CREATE CATALOG paimon_catalog WITH (
    'type'='paimon',
    'warehouse'='paimon_path2'
);
USE CATALOG paimon_catalog;
CREATE TABLE my_paimon_table (
    val1 STRING,
    val2 STRING,
    val3 STRING
) WITH (...);

-- Insert data to Paimon table from Kafka
INSERT INTO my_paimon_table SELECT ... FROM default_catalog.default_database.my_kafka_table2 WHERE ...;

c) Create MySQL table for Flink SQL job

Code Block
-- Create two catalogs for warehouse 'paimon_path1' and 'paimon_path2', there are two different tables 'my_paimon_table'
CREATE CATALOG paimon_catalog1 WITH (
    'type'='paimon',
    'warehouse'='paimon_path1'
);
CREATE CATALOG paimon_catalog2 WITH (
    'type'='paimon',
    'warehouse'='paimon_path2'
);

-- Create mysql table
CREATE TABLE mysql_table (
    val1 STRING,
    val2 STRING,
    val3 STRING
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://HOST:PORT/my_database',
    'table-name' = 'my_mysql_table',
    'username' = '***',
    'password' = '***'
);

-- Insert data to mysql table from two paimon tables
-- (`default` is quoted because DEFAULT is a reserved keyword in Flink SQL)
INSERT INTO mysql_table 
    SELECT ... FROM paimon_catalog1.`default`.my_paimon_table
        JOIN paimon_catalog2.`default`.my_paimon_table
    ON ... WHERE ...;

After completing the above operations, we have one Kafka topic, two Paimon tables and one MySQL table, each identified by its connector identifier. These tables are associated through Flink jobs, and users can report the tables and their relationships to an external system such as Datahub, as shown in the example below (the job lineage will be supported in FLIP-314).

[Image: tables and their relationships as reported to Datahub]
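
The relationships among the three jobs above can be sketched as a small graph keyed by connector identifier. This is an illustrative sketch, not part of the proposed interfaces: LineageGraph and the identifier strings are hypothetical, and how a listener obtains the sources and sinks of a job is left to FLIP-314.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

class LineageExample {
    /** Directed lineage edges keyed by physical identifier: source -> set of sinks. */
    static final class LineageGraph {
        private final Map<String, Set<String>> edges = new TreeMap<>();

        /** Records one job that reads from the given sources and writes to the given sink. */
        void addJob(List<String> sources, String sink) {
            for (String source : sources) {
                edges.computeIfAbsent(source, key -> new TreeSet<>()).add(sink);
            }
        }

        /** All physical entities that appear as a source or a sink. */
        Set<String> nodes() {
            Set<String> nodes = new TreeSet<>(edges.keySet());
            edges.values().forEach(nodes::addAll);
            return nodes;
        }

        Map<String, Set<String>> edges() {
            return edges;
        }
    }

    public static void main(String[] args) {
        // Hypothetical identifiers for the Kafka topic, the two Paimon tables and the MySQL table.
        String kafka = "kafka://kafka_bootstrap_server/kafka_topic";
        String paimon1 = "paimon://paimon_path1/default.my_paimon_table";
        String paimon2 = "paimon://paimon_path2/default.my_paimon_table";
        String mysql = "jdbc:mysql://HOST:PORT/my_database/my_mysql_table";

        LineageGraph graph = new LineageGraph();
        graph.addJob(List.of(kafka), paimon1);           // job a: my_kafka_table1 -> Paimon
        graph.addJob(List.of(kafka), paimon2);           // job b: my_kafka_table2 -> Paimon
        graph.addJob(List.of(paimon1, paimon2), mysql);  // job c: both Paimon tables -> MySQL

        System.out.println(graph.nodes().size());
        graph.edges().forEach((source, sinks) -> System.out.println(source + " -> " + sinks));
    }
}
```

Because my_kafka_table1 and my_kafka_table2 map to the same identifier, the Kafka topic appears as a single node with two outgoing edges.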

Changes for CatalogModificationListener

TableEnvironmentImpl creates the customized CatalogModificationListener instances according to the option table.catalog-modification.listeners and builds CatalogManager with the listeners. Other components such as Sql-Gateway can create CatalogManager with the listeners themselves. Currently all table-related operations such as create/alter are in CatalogManager, but database operations are not. We can add database modification operations to CatalogManager and notify the specified listeners for both tables and databases.

Code Block
/* Listeners and related operations in the catalog manager. */
public final class CatalogManager {
    private final List<CatalogModificationListener> listeners;

    /* Create catalog manager with listener list. */
    private CatalogManager(
            String defaultCatalogName,
            Catalog defaultCatalog,
            DataTypeFactory typeFactory,
            ManagedTableListener managedTableListener,
            List<CatalogModificationListener> listeners);

    /* Notify the listeners with given catalog event. */
    private void notify(CatalogModificationEvent event) {
        listeners.forEach(listener -> listener.onEvent(event));
    }

    /* Notify listener for tables. */
    public void createTable/dropTable/alterTable(...) {
        ....;
        notify(Create Different Table Modification Event With Context);
    }

    /* Add database ddls and notify listener for databases. */
    public void createDatabase/dropDatabase/alterDatabase(...) {
        ....;
        notify(Create Different Database Modification Event With Context); 
    }

    /* Add listeners in Builder for catalog manager. */
    public static final class Builder {
        Builder listeners(List<CatalogModificationListener> listeners);
    }
}

Listener Execution

Multiple listeners are independent of each other, and the client/JobManager notifies them synchronously. It is highly recommended NOT to perform any blocking operation inside a listener. If blocking operations are required, users need to process them asynchronously in their customized listeners.
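
One way to keep onEvent non-blocking is to hand the slow work to an executor, for example the one returned by Context#getIOExecutor() in the listener factory. The sketch below is a minimal illustration under that assumption; the nested interfaces are local stand-ins rather than the Flink types, and AsyncReportingListener and its report method are hypothetical.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class AsyncListenerExample {
    // Simplified stand-ins for the interfaces in this FLIP (assumptions, not the Flink types).
    interface CatalogModificationEvent {}

    interface CatalogModificationListener {
        void onEvent(CatalogModificationEvent event);
    }

    /** Hypothetical listener that returns from onEvent immediately and reports on an executor. */
    static final class AsyncReportingListener implements CatalogModificationListener {
        private final Executor ioExecutor;
        private final List<String> reported = new CopyOnWriteArrayList<>();

        AsyncReportingListener(Executor ioExecutor) {
            this.ioExecutor = ioExecutor;
        }

        @Override
        public void onEvent(CatalogModificationEvent event) {
            // Only enqueue the work here; the caller thread is not blocked by the report.
            ioExecutor.execute(() -> report(event));
        }

        /** Placeholder for a potentially slow call to an external system. */
        private void report(CatalogModificationEvent event) {
            reported.add(event.getClass().getName());
        }

        List<String> reported() {
            return reported;
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        AsyncReportingListener listener = new AsyncReportingListener(pool);
        listener.onEvent(new CatalogModificationEvent() {});

        // Wait for the queued report to finish before reading the result.
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
        System.out.println(listener.reported().size());
    }
}
```

With this approach a failure in the external system surfaces on the executor thread, so the listener should handle or log exceptions inside the submitted task.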


[1] https://datahub.io/

[2] https://atlas.apache.org/#/

[3] FLIP-276: Data Consistency of Streaming and Batch ETL in Flink and Table Store

[4] FLIP-314: Support Customized Job Lineage Listener