Status

Discussion thread: -
Vote thread: -
JIRA: FLINK-31275
Release: -

Motivation

A Flink ETL job consumes data from source tables and writes its results to sink tables, which establishes a relationship between those tables. Flink needs a mechanism for users to report these relationships to external systems. Examples include the metadata systems DataHub [1] and Atlas [2], and the meta store mentioned in FLIP-276 [3].

This FLIP introduces listener interfaces in Flink. Users can implement them to report job progress and metadata to external systems. Flink SQL and Table jobs are supported in the first stage, and DataStream jobs will be considered in the future. The reported information mainly includes:

1. Source and Sink information, such as table name, fields, partition keys, primary keys, watermarks, configurations

2. Job information, such as job id/name, execution mode, scheduler type, logical plan

3. Relationships between sources/sinks and jobs, such as the source and sink tables of a job, and field relationships within the job and its vertices

4. Job execution information, such as job status, checkpoints

Public Interfaces

CatalogEventListener

CatalogEventListener listens to events generated by DDL statements, such as registering a catalog and creating, altering or dropping tables. All events for CatalogEventListener extend the basic interface CatalogEvent, from which listeners can get the catalog. Some general events for catalogs, databases and tables are defined as follows. More events can be added based on requirements in the future.

Code Block
import java.util.List;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogDatabase;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.TableChange;

/**
 * Different events will be fired when a catalog/database/table is changed. A customized listener can receive these events and perform specific operations according to the event type.
 */
@PublicEvolving
public interface CatalogEventListener {
    /* Event fired after a catalog is modified. */
    void onEvent(CatalogEvent catalogEvent);

    /* The basic interface for catalog related events. */
    @PublicEvolving
    interface CatalogEvent {
        /* The catalog of the event. */
        Catalog catalog();

        /* The name of the catalog. */
        String catalogName();
    }

    /* Event for catalog registration. */
    @PublicEvolving
    interface RegisterCatalogEvent extends CatalogEvent {}

    /* Event for catalog unregistration. */
    @PublicEvolving
    interface UnregisterCatalogEvent extends CatalogEvent {
        boolean ignoreIfNotExists();
    }

    /* Event for database creation. */
    @PublicEvolving
    interface CreateDatabaseEvent extends CatalogEvent {
        CatalogDatabase database();
        String databaseName();
        boolean ignoreIfExists();
    }

    /* Event for dropping a database. */
    @PublicEvolving
    interface DropDatabaseEvent extends CatalogEvent {
        String databaseName();
        boolean ignoreIfNotExists();
    }

    /* Base table event, provides the column list, primary keys, partition keys, watermarks and properties in CatalogBaseTable. The table can be a source or a sink. */
    @PublicEvolving
    interface BaseTableEvent extends CatalogEvent {
        ObjectIdentifier identifier();
        CatalogBaseTable table();
    }

    /* Event for table creation. */
    @PublicEvolving
    interface CreateTableEvent extends BaseTableEvent {
        boolean ignoreIfExists();
    }

    /* Event for altering a table, provides information about both the old and the new table. */
    @PublicEvolving
    interface AlterTableEvent extends BaseTableEvent {
        List<TableChange> tableChanges();
        boolean ignoreIfNotExists();
    }

    /* Event for dropping a table. */
    @PublicEvolving
    interface DropTableEvent extends BaseTableEvent {
        boolean ignoreIfNotExists();
    }
}
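The following is a minimal sketch of how a listener might consume these events. It keeps an in-memory registry of existing tables, which a reporter could later push to DataHub or Atlas. The sketch needs to compile without Flink on the classpath, so it does not use the proposed CatalogEvent hierarchy. Instead it uses simplified, hypothetical stand-ins: `TableEvent`, `CreateTable`, `DropTable` and `TableRegistryListener`, with each table identified by a plain `catalog.database.table` string.

```java
import java.util.Collections;
import java.util.SortedSet;
import java.util.TreeSet;

// Hypothetical, simplified stand-in for the proposed BaseTableEvent.
interface TableEvent {
    /** Fully qualified table name, e.g. "cat.db.orders". */
    String identifier();
}

// Hypothetical stand-in for CreateTableEvent.
final class CreateTable implements TableEvent {
    private final String identifier;
    CreateTable(String identifier) { this.identifier = identifier; }
    public String identifier() { return identifier; }
}

// Hypothetical stand-in for DropTableEvent.
final class DropTable implements TableEvent {
    private final String identifier;
    DropTable(String identifier) { this.identifier = identifier; }
    public String identifier() { return identifier; }
}

/** Hypothetical listener keeping an in-memory view of the tables that currently exist. */
final class TableRegistryListener {
    private final SortedSet<String> tables = new TreeSet<>();

    /** Mirrors CatalogEventListener#onEvent: dispatch on the concrete event type. */
    void onEvent(TableEvent event) {
        if (event instanceof CreateTable) {
            tables.add(event.identifier());
        } else if (event instanceof DropTable) {
            tables.remove(event.identifier());
        }
        // Any other event type is ignored by this listener.
    }

    /** Returns a read-only snapshot of the known tables. */
    SortedSet<String> tables() {
        return Collections.unmodifiableSortedSet(new TreeSet<>(tables));
    }
}
```

A real listener would also handle AlterTableEvent and forward each change to the external system. The dispatch-by-event-type pattern stays the same.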

JobSubmissionListener

JobSubmissionListener is notified before and after the client submits a job. JobSubmissionEvent provides the listener with the job id, job name and logical plan. Users can get the source/sink list from the plan and perform customized validation, such as checking whether a table is written by multiple jobs.

Code Block
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.JobID;

/**
 * Job submission listener will be notified before and after a job is submitted. Users can get the job id, job name, source list and sink list from the submission event.
 */
@PublicEvolving
public interface JobSubmissionListener {
    /* Event fired before and after a job is submitted. */
    void onEvent(JobSubmissionEvent submissionEvent);

    /* Event for job submission. */
    @PublicEvolving
    interface JobSubmissionEvent {
        JobID jobId();
        String jobName();
        JobLogicalPlan plan();
    }

    /* Event fired before a job is submitted. */
    @PublicEvolving
    interface BeforeJobSubmissionEvent extends JobSubmissionEvent {}

    /* Event fired after a job is submitted successfully. */
    @PublicEvolving
    interface SuccessJobSubmissionEvent extends JobSubmissionEvent {}

    /* Event fired after a job submission fails. */
    @PublicEvolving
    interface FailedJobSubmissionEvent extends JobSubmissionEvent {
        Throwable exception();
    }
}
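The validation mentioned above rejects a job whose sink table is already written by another job. It could look roughly like the sketch below, which rests on two assumptions:

- The sink table names have already been extracted from the plan into a plain list.
- Throwing from the listener on a before-submission event aborts the submission.

The class `ExclusiveSinkValidator` and its methods are hypothetical.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical validator: each sink table may be written by at most one job. */
final class ExclusiveSinkValidator {
    // sink table identifier -> id of the job that currently writes it
    private final Map<String, String> writers = new HashMap<>();

    /** Handles a before-submission event; throws if another job already owns a sink. */
    synchronized void beforeSubmission(String jobId, List<String> sinkTables) {
        for (String table : sinkTables) {
            String owner = writers.get(table);
            if (owner != null && !owner.equals(jobId)) {
                throw new IllegalStateException(
                        "Table " + table + " is already written by job " + owner);
            }
        }
        // Claim the sinks only after all of them passed the check.
        for (String table : sinkTables) {
            writers.put(table, jobId);
        }
    }

    /** Handles a failed-submission event: releases every sink claimed by the job. */
    synchronized void submissionFailed(String jobId) {
        writers.values().removeIf(jobId::equals);
    }

    /** Returns the job currently writing the table, or null if none. */
    synchronized String writerOf(String table) {
        return writers.get(table);
    }
}
```

In practice the sinks would also have to be released when a job terminates. That signal comes from JobExecutionListener rather than from the submission events.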

JobExecutionListener

JobExecutionListener listens to the status and checkpoints of running jobs in the JobManager. JobStatusEvent indicates the status of a Flink job as a JobStatus. The specific event is defined as follows, and more job status events can be added based on requirements in the future.

In addition to the job status, JobExecutionListener also listens for checkpoint events, such as checkpoint started, completed and aborted. All checkpoint related events extend CheckpointEvent, and more events can be added in the future too.

Code Block
import java.util.Map;

import javax.annotation.Nullable;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.JobStatus;

/**
 * When the job status changes in the job manager, a job event is generated and the job execution listener is notified.
 */
@PublicEvolving
public interface JobExecutionListener {
    /* Event fired after the job status has changed. */
    void onJobStatusChanged(JobStatusEvent jobStatusEvent);

    /* Event fired when a checkpoint is started/completed/aborted. */
    void onCheckpoint(CheckpointEvent checkpointEvent);

    /* Job status event with plan. */
    @PublicEvolving
    interface JobStatusEvent {
        JobLogicalPlan plan();
        JobStatus oldStatus();
        JobStatus newStatus();
    }

    /* Event for job checkpoint. */
    @PublicEvolving
    interface CheckpointEvent {
        /* Snapshot type, checkpoint or savepoint. */
        String snapshotType();
        long checkpoint();
        @Nullable
        String externalSavepointLocation();
        boolean isPeriodic();
        long timestamp();
        Map<String, String> config();
    }

    /* Checkpoint started/completed/aborted events. */
    @PublicEvolving
    interface CheckpointStartedEvent extends CheckpointEvent {}

    @PublicEvolving
    interface CheckpointCompletedEvent extends CheckpointEvent {}

    @PublicEvolving
    interface CheckpointAbortedEvent extends CheckpointEvent {}
}
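The sketch below shows one listener consuming both callbacks. It tracks a job's current status, the ID of its latest completed checkpoint, and the number of aborted checkpoints, which a reporter could publish periodically. To stay self-contained it uses hypothetical stand-ins:

- `Status` is a plain enum holding a subset of the values of Flink's `JobStatus`.
- Each checkpoint event is reduced to a `CheckpointKind` and an ID.
- `JobProgressTracker` is an illustrative name.

```java
/** Hypothetical subset of Flink's JobStatus values, redefined so the sketch is self-contained. */
enum Status { CREATED, RUNNING, FAILING, FAILED, CANCELED, FINISHED }

/** Hypothetical stand-in for the started/completed/aborted checkpoint events. */
enum CheckpointKind { STARTED, COMPLETED, ABORTED }

/** Hypothetical listener tracking one job's status and checkpoint progress. */
final class JobProgressTracker {
    private Status status = Status.CREATED;
    private long latestCompletedCheckpoint = -1L; // -1 means "no completed checkpoint yet"
    private int abortedCheckpoints;

    /** Mirrors onJobStatusChanged(JobStatusEvent); only the new status is stored. */
    void onJobStatusChanged(Status oldStatus, Status newStatus) {
        status = newStatus;
    }

    /** Mirrors onCheckpoint(CheckpointEvent). */
    void onCheckpoint(CheckpointKind kind, long checkpointId) {
        switch (kind) {
            case COMPLETED:
                // Keep the highest ID in case completions are reported out of order.
                latestCompletedCheckpoint = Math.max(latestCompletedCheckpoint, checkpointId);
                break;
            case ABORTED:
                abortedCheckpoints++;
                break;
            default:
                // STARTED: nothing to record in this sketch.
                break;
        }
    }

    Status status() { return status; }
    long latestCompletedCheckpoint() { return latestCompletedCheckpoint; }
    int abortedCheckpoints() { return abortedCheckpoints; }
}
```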

Job Logical Plan

The events of the listeners above carry a job logical plan. Users can get the plan to report more information about the job, such as:

- the source/sink tables in the job
- the column relationships between source/sink tables and the vertices in the job

JobPlanVertex is built on JobVertex in the JobGraph and provides basic information. JobPlanVertex also requires additional information, such as the schema of sources/sinks in Table and SQL jobs. Table source and sink vertices are defined on top of these basic vertices, and DataStream vertices can be defined on them in the future.

Code Block
import java.io.Serializable;
import java.util.List;
import java.util.Map;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionMode;
import org.apache.flink.api.common.JobID;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.types.logical.LogicalType;

/**
 * Job logical plan is built according to the JobGraph. Users can get sources, sinks and the relationships between nodes from the plan.
 */
@PublicEvolving
public interface JobLogicalPlan {
    JobID jobId();
    String jobName();

    /* Scheduler type such as Default/Adaptive/AdaptiveBatch. */
    String scheduler();

    /* Job execution mode, PIPELINED/PIPELINED_FORCED/BATCH/BATCH_FORCED. */
    ExecutionMode executionMode();

    /* Job type, BATCH or STREAMING. */
    String jobType();

    /* Source vertex list. */
    List<JobPlanVertex> sources();

    /* Sink vertex list. */
    List<JobPlanVertex> sinks();

    /* Get all vertices, sorted topologically from the sources. */
    List<JobPlanVertex> getVerticesSortedTopologicallyFromSources();

    /* Get a specific vertex by id. */
    JobPlanVertex vertex(String id);

    /* Vertex in the job logical plan, based on JobVertex. */
    @PublicEvolving
    interface JobPlanVertex {
        String id();
        String name();
        String operatorName();
        String operatorDescription();
        int parallelism();
        String invokableClassName();
        boolean supportsConcurrentExecutionAttempts();
        List<JobPlanEdge> inputs();
    }

    /* Edge between vertices in the logical plan. */
    @PublicEvolving
    interface JobPlanEdge {
        JobPlanVertex source();
        JobPlanVertex target();
        String distribution();
        String shipStrategyName();
        boolean isBroadcast();
        boolean isForward();
    }
}

/* Base interface for table scan source and sink vertices; DataStream source/sink vertices can be added based on requirements in the future. */
@PublicEvolving
public interface JobPlanTableVertex extends JobLogicalPlan.JobPlanVertex {
    /* `catalog`.`database`.`table` of the scan source or sink. */
    ObjectIdentifier table();

    /* For a scan source, the type is Values or Table; for a sink, the type is CollectSink or ModifySink. */
    String type();

    /* Table options. */
    Map<String, String> config();

    /* For a scan source, the columns consumed by the job; for a sink, the columns produced by the job. */
    List<JobTableColumn> columns();

    /* Column with name and type in the table. */
    @PublicEvolving
    interface JobTableColumn extends Serializable {
        String name();
        LogicalType type();
    }

    /* Table scan source vertex. */
    @PublicEvolving
    interface JobPlanTableSourceVertex extends JobPlanTableVertex {}

    /* Table sink vertex. */
    @PublicEvolving
    interface JobPlanTableSinkVertex extends JobPlanTableVertex {
        /* Modify type, INSERT/UPDATE/DELETE. */
        String modifyType();

        /* Update mode, APPEND/RETRACT/UPSERT. */
        String updateMode();
        boolean overwrite();
        Map<String, String> staticPartitions();
    }
}
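Column-level relationships are not available yet, but a listener can still derive table-level lineage from the plan. For each sink vertex, it follows the `inputs()` edges upstream and collects every reachable source table. The sketch below shows that traversal. In place of `JobPlanVertex` and `JobPlanEdge` it uses a minimal, hypothetical `Vertex` class that keeps only an ID, an optional table name and its upstream vertices. `TableLineage` is also an illustrative name.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

/** Hypothetical, minimal stand-in for JobPlanVertex plus its input edges. */
final class Vertex {
    final String id;
    final String table; // table identifier for source/sink vertices, null otherwise
    final List<Vertex> inputs = new ArrayList<>(); // upstream vertices (edge sources)

    Vertex(String id, String table) {
        this.id = id;
        this.table = table;
    }
}

final class TableLineage {
    /** Maps each sink table to the source tables it reads from, directly or transitively. */
    static Map<String, Set<String>> lineage(List<Vertex> sources, List<Vertex> sinks) {
        Set<String> sourceIds = new HashSet<>();
        for (Vertex s : sources) {
            sourceIds.add(s.id);
        }
        Map<String, Set<String>> result = new TreeMap<>();
        for (Vertex sink : sinks) {
            Set<String> upstreamTables = new TreeSet<>();
            Set<String> visited = new HashSet<>();
            Deque<Vertex> stack = new ArrayDeque<>();
            stack.push(sink);
            // Depth-first walk against the edge direction, from the sink towards the sources.
            while (!stack.isEmpty()) {
                Vertex v = stack.pop();
                if (!visited.add(v.id)) {
                    continue; // already reached through another path
                }
                if (sourceIds.contains(v.id)) {
                    upstreamTables.add(v.table);
                }
                for (Vertex in : v.inputs) {
                    stack.push(in);
                }
            }
            result.computeIfAbsent(sink.table, k -> new TreeSet<>()).addAll(upstreamTables);
        }
        return result;
    }
}
```

For example, a join of `cat.db.orders` and `cat.db.users` written into `cat.db.user_orders` maps that sink to both source tables.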

Configuring Customized Listeners

Users should add their listener implementations to the classpath of both the client and the Flink cluster, and configure them with the following options:

Code Block
# Configure job deployment listeners.
table.job.deployment.listeners: {job deployment listener class1},{job deployment listener class2}

# Configure job execution listeners.
jobmanager.execution.listeners: {job execution listener class1},{job execution listener class2}
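Both options take a comma-separated list of fully qualified class names. One plausible way to turn such a value into listener instances is reflection, as sketched below. `ListenerLoader` is a hypothetical helper, and requiring a public no-argument constructor is an assumption of this sketch rather than something the FLIP specifies.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical helper that instantiates listeners from a comma-separated option value. */
final class ListenerLoader {
    static <T> List<T> load(String optionValue, Class<T> listenerType, ClassLoader classLoader) {
        List<T> listeners = new ArrayList<>();
        if (optionValue == null || optionValue.trim().isEmpty()) {
            return listeners; // option not set: no listeners
        }
        for (String name : optionValue.split(",")) {
            String className = name.trim();
            if (className.isEmpty()) {
                continue; // tolerate stray commas
            }
            try {
                Class<?> clazz = Class.forName(className, true, classLoader);
                if (!listenerType.isAssignableFrom(clazz)) {
                    throw new IllegalArgumentException(
                            className + " does not implement " + listenerType.getName());
                }
                // Assumption of this sketch: listeners have a public no-argument constructor.
                listeners.add(listenerType.cast(clazz.getDeclaredConstructor().newInstance()));
            } catch (ReflectiveOperationException e) {
                throw new IllegalArgumentException("Cannot instantiate listener " + className, e);
            }
        }
        return listeners;
    }
}
```

With this helper, the value of `jobmanager.execution.listeners` would be passed as `optionValue` together with `JobExecutionListener.class`. Failing fast on a wrong class name surfaces configuration errors at startup instead of at the first event.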

Proposed Changes

Changes for JobDeploymentListener

TableEnvironmentImpl creates the customized JobDeploymentListener instances according to the option table.job.deployment.listeners, and puts them into CatalogManager and AbstractCatalog. TableEnvironmentImpl can also receive existing listeners in its constructor together with CatalogManager. Other components, such as the SQL Gateway, can use this constructor. When DDL related operations are executed in CatalogManager and AbstractCatalog, they notify the listeners.

After a job is created, TableEnvironmentImpl submits it and notifies the listeners before and after the submission.
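The before/after notification around job submission can be sketched as a small wrapper. It fires a before event and runs the submission. It then fires either a success event or a failure event that carries the exception. The following names are hypothetical simplifications of the real submission path in TableEnvironmentImpl:

- `SubmissionNotifier`
- the `Phase` enum
- the `SubmissionListener` interface
- the `Supplier`-based submit call

Rethrowing the original exception after notifying is a design choice of this sketch.

```java
import java.util.List;
import java.util.function.Supplier;

/** Hypothetical simplification of the before/success/failed submission events. */
enum Phase { BEFORE, SUCCESS, FAILED }

/** Hypothetical stand-in for JobSubmissionListener#onEvent. */
interface SubmissionListener {
    void onEvent(Phase phase, String jobName, Throwable error);
}

final class SubmissionNotifier {
    private final List<SubmissionListener> listeners;

    SubmissionNotifier(List<SubmissionListener> listeners) {
        this.listeners = listeners;
    }

    /** Runs the submission and notifies all listeners before and after it. */
    <T> T submit(String jobName, Supplier<T> submission) {
        fire(Phase.BEFORE, jobName, null);
        T result;
        try {
            result = submission.get();
        } catch (RuntimeException e) {
            fire(Phase.FAILED, jobName, e);
            throw e; // the caller still sees the original failure
        }
        // Fired outside the try block so a listener error here is not reported as FAILED.
        fire(Phase.SUCCESS, jobName, null);
        return result;
    }

    private void fire(Phase phase, String jobName, Throwable error) {
        for (SubmissionListener listener : listeners) {
            listener.onEvent(phase, jobName, error);
        }
    }
}
```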

Changes for JobExecutionListener

Flink SQL and Table jobs are created by the Planner, which contains exec nodes. The plan is then converted to Operation, Transformation and StreamGraph. Finally, the job is submitted as a JobGraph, from which the JobManager creates an ExecutionGraph. The conversion of the source/sink list is as follows.

[draw.io diagram: flow]

SourceScan in the Planner contains source information such as the table name, fields and configurations. However, when the SourceScan is converted to a Transformation, this information becomes hidden inside the Source, which is only exposed as an interface. We should add source information in the conversion SourceScan->Operation->Transformation->StreamNode.

Similar to sources, Sink and DataStreamSink contain sink information such as table names and configurations. We should add sink information in the conversion Sink->Operation->Transformation->StreamNode. Then we can add a Map<JobVertexID, JobSinkVertexInfo> sinks to JobGraph and ExecutionGraph as well.
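Suppose sink information is attached to each StreamNode. Building the per-job-vertex map then amounts to grouping that information by the job vertex each stream node is chained into. The sketch below shows this grouping with plain types: strings stand in for `JobVertexID` and `JobSinkVertexInfo`, and integers stand in for stream node IDs. The `chainedInto` mapping is a hypothetical input that the JobGraph generator would know after operator chaining, and `SinkInfoByVertex` is an illustrative name. The sketch keeps a list per vertex because several sink nodes can be chained into one job vertex.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper grouping per-stream-node sink information by job vertex. */
final class SinkInfoByVertex {
    /**
     * @param sinkInfoByNode sink information attached to stream nodes (node id -> info)
     * @param chainedInto    id of the job vertex each stream node was chained into
     * @return sink information grouped per job vertex
     */
    static Map<String, List<String>> group(
            Map<Integer, String> sinkInfoByNode, Map<Integer, String> chainedInto) {
        Map<String, List<String>> result = new TreeMap<>();
        for (Map.Entry<Integer, String> e : sinkInfoByNode.entrySet()) {
            String vertexId = chainedInto.get(e.getKey());
            if (vertexId == null) {
                throw new IllegalArgumentException(
                        "Stream node " + e.getKey() + " is not part of any job vertex");
            }
            result.computeIfAbsent(vertexId, k -> new ArrayList<>()).add(e.getValue());
        }
        return result;
    }
}
```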

After completing the above changes, the JobManager can create a JobLogicalPlanInfo from the JobGraph for JobExecutionListener. When the job status changes, DefaultExecutionGraph in the JobManager notifies the listener. The same listener also observes checkpoint execution: when CheckpointCoordinator starts, completes or aborts a checkpoint, it notifies the listener too.
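DefaultExecutionGraph and CheckpointCoordinator would call into user code here, so a failing listener should probably not affect job execution. A minimal sketch of such a guarded fan-out is shown below. `ListenerDispatcher` is hypothetical. Recording listener exceptions instead of propagating them is an assumed design choice, not something the FLIP prescribes. The sketch is also not thread-safe.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical dispatcher that isolates the caller from listener failures (not thread-safe). */
final class ListenerDispatcher<L> {
    private final List<L> listeners;
    private final List<Throwable> failures = new ArrayList<>();

    ListenerDispatcher(List<L> listeners) {
        this.listeners = new ArrayList<>(listeners);
    }

    /** Invokes the callback on every listener; one failing listener does not skip the others. */
    void dispatch(Consumer<L> callback) {
        for (L listener : listeners) {
            try {
                callback.accept(listener);
            } catch (RuntimeException e) {
                // A real implementation would log this; the sketch only records it.
                failures.add(e);
            }
        }
    }

    /** Returns a copy of the exceptions thrown by listeners so far. */
    List<Throwable> failures() {
        return new ArrayList<>(failures);
    }
}
```

For a status change, the JobManager side would call something like `dispatcher.dispatch(l -> l.onJobStatusChanged(event))`.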

Plan For The Future

  1. We plan to add column relationships between job vertices to JobLogicalPlanInfo, but this is not supported in Flink at present. We'd like to implement it in the next FLIP.

  2. Source/Sink relationships are supported for SQL/Table jobs; DataStream jobs will be supported later.

  3. Currently only scan sources are supported; lookup join sources should be supported later.

  4. Add a job vertex listener for batch mode, covering the scheduling and execution status of vertices, the execution status of subtasks, etc.


[1] https://datahub.io/

[2] https://atlas.apache.org/#/

[3] FLIP-276: Data Consistency of Streaming and Batch ETL in Flink and Table Store