...

A Flink ETL job consumes data from a Source Table and, after computation, produces results to a Sink Table; the job thereby establishes a relationship between the two tables. Flink needs a mechanism for users to report these relationships to external systems through customized listeners, such as the metadata systems DataHub [1] and Atlas [2], and the meta store mentioned in FLIP-276 [3].

This FLIP aims to introduce listeners in Flink that users can implement to report job progress and metadata to external systems. Flink SQL and the Table API are supported in the first stage; DataStream will be considered in the future. The main information is as follows:

...

4. Job execution information, such as job status changes, vertex status changes, checkpoints

Public Interfaces

...

JobDeploymentListener is used to receive events for DDL operations and job submission; it only supports SQL/Table jobs in this FLIP.

Code Block
/**
 * Job deployment listener. The client creates a specific event and notifies this listener when it submits a job.
 */
@PublicEvolving
public interface JobDeploymentListener {
	/* Start the listener. */
	void start(Map<String, String> config);

    /* Event fired after a catalog has been registered. */
    void onRegisterCatalog(RegisterCatalogEvent catalogEvent);

    /* Event fired after a catalog has been unregistered. */
    void onUnregisterCatalog(UnregisterCatalogEvent catalogEvent);

    /* Event fired after a database has been created. */
    void onCreateDatabase(CreateDatabaseEvent databaseEvent);

    /* Event fired after a database has been dropped. */
    void onDropDatabase(DropDatabaseEvent databaseEvent);

    /* Event fired after a table has been created. */
    void onCreateTable(CreateTableEvent tableEvent);

    /* Event fired after a table has been changed. */
    void onAlterTable(AlterTableEvent tableEvent);

    /* Event fired after a table has been dropped. */
    void onDropTable(DropTableEvent tableEvent); 

    /* Event fired before a job is submitted to do some validations. */
    void onJobPreSubmission(JobSubmissionEvent submitEvent); 

    /* Event fired after a job is submitted. */
    void onJobSubmission(JobSubmissionEvent submitEvent); 
 
    /* Event for catalog registration, provides catalog name, default database, database list and properties in the Catalog. */
    @PublicEvolving
    interface RegisterCatalogEvent {
        Catalog catalog();
    }
 
    /* Event for catalog unregistration. */ 
    @PublicEvolving
    interface UnregisterCatalogEvent {
        String catalog();
        boolean ignoreIfNotExists();
    }

    /* Event for database creation, provides catalog name, database name, comment and properties of the database. */
    @PublicEvolving
    interface CreateDatabaseEvent {
        String catalog();
        String name();
        CatalogDatabase database();
        boolean ignoreIfExists();
    }

    /* Event for dropping database. */
    @PublicEvolving
    interface DropDatabaseEvent {
        String catalog();
        String name();
        boolean ignoreIfNotExists();
    }

    /* Table information event, provides column list, primary keys, partition keys, watermarks and properties in the table. The table can be source or sink. */
    @PublicEvolving
    interface TableEvent {
        ObjectIdentifier identifier();  
        CatalogBaseTable table();
    }

    /* Event for table creation. */
    @PublicEvolving
    interface CreateTableEvent extends TableEvent {
        boolean ignoreIfExists();
    }

    /* Event for altering table, provides all information in old table and new table. */
    @PublicEvolving
    interface AlterTableEvent extends TableEvent {
        CreateTableEvent newTable();
        boolean ignoreIfNotExists();
    }

    /* Event for dropping table. */
    @PublicEvolving
    interface DropTableEvent {
        ObjectIdentifier identifier();
        boolean ignoreIfNotExists();
    }

    /* Event for before and after job is submitted, provides source tables and sink tables. */
    @PublicEvolving
    interface JobSubmissionEvent {
        JobID jobId();
        String jobName();
        List<TableEvent> sources();
        List<TableEvent> sinks();
    }
}
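For illustration, the sketch below shows how such a listener might collect table and lineage information for an external metadata system. The event types here are simplified stand-ins for the interfaces above (not the real Flink types), and all class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Illustrative sketch only: simplified stand-ins for the FLIP's event types,
 * showing how a JobDeploymentListener implementation could collect table
 * lineage for an external metadata system. All names are hypothetical.
 */
public class DemoDeploymentListener {

    /** Simplified stand-in for CreateTableEvent. */
    public interface CreateTableEvent {
        /* Full table name, e.g. "catalog.database.table". */
        String identifier();
    }

    /** Simplified stand-in for JobSubmissionEvent. */
    public interface JobSubmissionEvent {
        String jobName();
        List<String> sources();
        List<String> sinks();
    }

    private final List<String> createdTables = new ArrayList<>();
    private final List<String> lineage = new ArrayList<>();

    /* Record each created table so it can be reported to the meta store. */
    public void onCreateTable(CreateTableEvent event) {
        createdTables.add(event.identifier());
    }

    /* On job submission, derive source -> sink lineage edges for reporting. */
    public void onJobSubmission(JobSubmissionEvent event) {
        for (String source : event.sources()) {
            for (String sink : event.sinks()) {
                lineage.add(source + " -> " + sink);
            }
        }
    }

    public List<String> createdTables() {
        return createdTables;
    }

    public List<String> lineage() {
        return lineage;
    }
}
```

A real implementation would push the collected identifiers and edges to the external system instead of keeping them in memory.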

JobExecutionListener

JobExecutionListener listens to status changes in the job. The JobManager creates a JobEvent for each status change of a job and notifies JobExecutionListener. Users can implement different listeners according to their needs, such as a DataHub listener or an Atlas listener.

Code Block
/**
 * When the job status changes in the job manager, it will generate a job event and notify the job execution listener.
 */
@PublicEvolving
public interface JobExecutionListener extends AutoCloseable {
    /* Start the listener with job config. */
    void start(Map<String, String> config) throws Exception;
  
    /* Event fired after job has been created. */
    void onCreated(JobCreatedEvent createdEvent);

    /* Event fired after job has been finished. */
    void onFinished(JobFinishedEvent finishedEvent);

    /* Event fired after job has been canceled. */
    void onCanceled(JobCanceledEvent canceledEvent);

    /* Event fired after job has failed. */
    void onFailed(JobFailedEvent failedEvent);

    /* Event fired after checkpoint has been started. */
    void onCheckpointStarted(CheckpointEvent checkpointEvent);

    /* Event fired after checkpoint has been completed. */
    void onCheckpointCompleted(CheckpointEvent checkpointEvent);

    /* Event fired after checkpoint has been aborted. */
    void onCheckpointAborted(CheckpointEvent checkpointEvent);

    /* Column information in vertex inputs and outputs. */
    @PublicEvolving
    interface VertexColumn {
        /* The column name. */
        String name();
        /* The column type such as INT/BIGINT/FLOAT/DOUBLE and etc. */
        String type();
    }

    /* Vertex in job plan, provides id, name, parallelism, input edges and output columns. */
    @PublicEvolving
    interface JobVertexInfo {
        String id();
        String name();
        String operatorName();
        String operatorDescription();
        int parallelism(); 
        String invokableClassName();
        boolean supportsConcurrentExecutionAttempts();
        List<JobEdgeInfo> inputs();
        List<VertexColumn> outputColumns();
    }
 
    /* Edge in job plan, provides source/target vertex, input columns, distribution, isBroadcast and isForward. */
    @PublicEvolving 
    interface JobEdgeInfo {
        JobVertexInfo source();
        JobVertexInfo target();
        /* Input columns of the edge. */
        List<VertexColumn> inputColumns();
        String distribution();
        String shipStrategyName();
        boolean isBroadcast();
        boolean isForward();
        String preProcessingOperationName();
        String operatorLevelCachingDescription();
    }

    /* Job scan source vertex, provides source table name, input table columns and source type. */ 
    @PublicEvolving 
    interface JobScanSourceVertexInfo extends JobVertexInfo {
        /* `Catalog.Database.Table` format name. */
        String sourceName();

        /* Values or Table. */
        String type();

        /* Columns from source table for the source vertex. */
        List<VertexColumn> columns();

        /* Source config provides options in source such as source type. */
        Map<String, String> config();
    }

    /* Job sink vertex, provides sink table name, output columns and sink type. */
    @PublicEvolving 
    interface JobSinkVertexInfo extends JobVertexInfo {
        /* `Catalog.Database.Table` format name. */ 
        String sinkName();

        /* CollectSink or ModifySink. */
        String type();

        /* INSERT/UPDATE/DELETE. */
        String modifyType(); 

        /* APPEND/RETRACT/UPSERT. */
        String updateMode();

        /* Columns to the sink table from the sink vertex. */
        List<VertexColumn> columns();
        boolean overwrite();
        Map<String, String> staticPartitions(); 
        Map<String, String> config();
    }

    /* Event for job status changes. */
    @PublicEvolving
    interface JobBaseEvent {
        JobID jobId();
        String jobName();

        /* Scheduler type such as Default/Adaptive/AdaptiveBatch. */
        String scheduler();

        /* Job execution mode, PIPELINED/PIPELINED_FORCED/BATCH/BATCH_FORCED. */
        ExecutionMode executionMode();

        /* BATCH or STREAMING. */
        String jobType();
        
        /* Timestamp for current job status. */
        long timestamp();
    }

    @PublicEvolving
    interface JobLogicalPlanInfo {
        /* Source list. */
        List<JobScanSourceVertexInfo> sources();

        /* Sink list. */
        List<JobSinkVertexInfo> sinks();

        List<JobVertexInfo> getVerticesSortedTopologicallyFromSources();
        JobVertexInfo vertex(String id);  
    }

    /* Event for job is created. */
    @PublicEvolving
    interface JobCreatedEvent extends JobBaseEvent {
        JobLogicalPlanInfo plan();
    }

    /* Event for job is finished. */
    @PublicEvolving
    interface JobFinishedEvent extends JobBaseEvent { }

    /* Event for job is canceled. */
    @PublicEvolving
    interface JobCanceledEvent extends JobBaseEvent { }

    /* Event for job is failed. */
    @PublicEvolving
    interface JobFailedEvent extends JobBaseEvent {
        Throwable exception();
    }

    /* Event for job checkpoint. */
    @PublicEvolving
    interface CheckpointEvent extends JobBaseEvent {
        long checkpoint();
        String externalSavepointLocation();
        boolean isPeriodic();
        /* Checkpoint or savepoint. */
        String snapshotType();
        Map<String, String> config();
    }
}
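As a sketch of how such a listener might be implemented, the example below records job status transitions before forwarding them. `JobEvent` is a simplified stand-in for the JobBaseEvent above (not the real Flink type), and all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Illustrative sketch only: shows how a JobExecutionListener implementation
 * could record job status transitions for forwarding to a system such as
 * DataHub or Atlas. All names are hypothetical.
 */
public class DemoExecutionListener {

    /** Simplified stand-in for JobBaseEvent. */
    public interface JobEvent {
        String jobName();
        long timestamp();
    }

    private final List<String> transitions = new ArrayList<>();

    public void onCreated(JobEvent event) {
        record("CREATED", event);
    }

    public void onFinished(JobEvent event) {
        record("FINISHED", event);
    }

    public void onFailed(JobEvent event) {
        record("FAILED", event);
    }

    private void record(String status, JobEvent event) {
        // A real listener would push this entry to the external meta system.
        transitions.add(event.jobName() + ":" + status + "@" + event.timestamp());
    }

    public List<String> transitions() {
        return transitions;
    }
}
```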

...

Users should add their listeners to the classpaths of the client and the Flink cluster, and configure them with the following options:

Code Block
# Config job deployment listeners.
table.job.deployment.listeners: {job deployment listener class1},{job deployment listener class2}

# Config job execution listeners.
jobmanager.execution.listeners: {job execution listener class1},{job execution listener class2}
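As a concrete illustration, assuming hypothetical listener implementations `com.example.DatahubDeploymentListener` and `com.example.DatahubExecutionListener` are on the classpath, the configuration could look like:

```yaml
# Hypothetical listener class names for illustration only.
table.job.deployment.listeners: com.example.DatahubDeploymentListener
jobmanager.execution.listeners: com.example.DatahubExecutionListener
```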

...