Status

Discussion thread-
Vote thread-
JIRA

Release-

Motivation

A Flink ETL job consumes data from a source table and, after computation, writes its results to a sink table. The job thereby establishes a relationship between the source table and the sink table. Flink needs a mechanism that lets users report these relationships through customized listeners to other systems, such as the metadata systems Datahub [1] and Atlas [2] and the meta store mentioned in FLINK-276 [3].

This FLIP introduces listeners that users can implement to report job progress and metadata to external systems. The main information is as follows:

1. Source and Sink information, such as table name, fields, partition keys, primary keys, watermarks, configurations

2. Job information, such as job id/name, execution mode, scheduler type, logical plan

3. Relationships between Source/Sink and jobs, such as the source and sink tables of a job and the field relationships in jobs and vertices

4. Job execution information, such as job status changes, vertex status changes, checkpoints

Public Interfaces

JobCreationListener

JobCreationListener receives events for DDL operations and job submission. In this FLIP it supports only SQL/Table jobs.

/**
 * Job creation listener, client will create specific event and notify this listener.
 */
@PublicEvolving
public interface JobCreationListener {
    /* Start the listener. */
    void start(Map<String, String> config);

    /* Event fired after a catalog has been registered. */
    void onRegisterCatalog(CatalogEvent catalogEvent);

    /* Event fired after a catalog has been unregistered. */
    void onUnregisterCatalog(UnregisterCatalogEvent catalogEvent);

    /* Event fired after a database has been created. */
    void onCreateDatabase(DatabaseEvent databaseEvent);

    /* Event fired after a database has been dropped. */
    void onDropDatabase(DropDatabaseEvent databaseEvent);

    /* Event fired after a table has been created. */
    void onCreateTable(CreateTableEvent tableEvent);

    /* Event fired after a table has been changed. */
    void onAlterTable(AlterTableEvent tableEvent);

    /* Event fired after a table has been dropped. */
    void onDropTable(DropTableEvent tableEvent); 

    /* Event fired before a job is submitted to do some validations. */
    void onJobPreSubmission(JobPreSubmitEvent submitEvent);

    @PublicEvolving
    interface CatalogEvent {
        String name();
        Catalog catalog();
    }

    @PublicEvolving
    interface UnregisterCatalogEvent {
        String catalog();
        boolean ignoreIfNotExists();
    }

    @PublicEvolving
    interface DatabaseEvent {
        String catalog();
        String name();
        CatalogDatabase database();
        boolean ignoreIfExists();
    }

    @PublicEvolving
    interface DropDatabaseEvent {
        String catalog();
        String name(); 
        boolean ignoreIfNotExists();
    }

    @PublicEvolving
    interface TableEvent {
        ObjectIdentifier identifier();  
        CatalogBaseTable table();
    }

    @PublicEvolving
    interface CreateTableEvent extends TableEvent {
        boolean ignoreIfExists();
    }

    @PublicEvolving
    interface AlterTableEvent extends TableEvent {
        CreateTableEvent newTable();
        boolean ignoreIfNotExists();
    }

    @PublicEvolving
    interface DropTableEvent {
        ObjectIdentifier identifier();
        boolean ignoreIfNotExists();
    }

    @PublicEvolving
    interface JobPreSubmitEvent {
        List<TableEvent> sources();
        List<TableEvent> sinks();
    }
}
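As an illustration of how a user might use the pre-submission hook, the following is a minimal sketch and not part of the proposed API. It assumes simplified stand-in types (SketchTableEvent, SketchJobPreSubmitEvent) in which table identifiers are plain strings instead of ObjectIdentifier, so that it compiles without Flink. It also assumes that throwing an exception from onJobPreSubmission is how a listener rejects a job; the FLIP does not specify this.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical stand-in for TableEvent: the identifier is a plain String here.
interface SketchTableEvent {
    String identifier();
}

// Hypothetical stand-in for JobPreSubmitEvent.
interface SketchJobPreSubmitEvent {
    List<SketchTableEvent> sources();
    List<SketchTableEvent> sinks();
}

// Hypothetical listener: rejects jobs that write to a protected table and
// records one "source -> sink" lineage edge per source/sink pair otherwise.
class LineageValidationListener {
    private final Set<String> protectedTables;
    private final List<String> edges = new ArrayList<>();

    LineageValidationListener(Set<String> protectedTables) {
        this.protectedTables = protectedTables;
    }

    void onJobPreSubmission(SketchJobPreSubmitEvent event) {
        // Validate first, so a rejected job leaves no lineage behind.
        for (SketchTableEvent sink : event.sinks()) {
            if (protectedTables.contains(sink.identifier())) {
                // Assumption: an exception is how a listener vetoes the submission.
                throw new IllegalStateException(
                        "Writing to " + sink.identifier() + " is not allowed");
            }
        }
        for (SketchTableEvent source : event.sources()) {
            for (SketchTableEvent sink : event.sinks()) {
                edges.add(source.identifier() + " -> " + sink.identifier());
            }
        }
    }

    List<String> edges() {
        return edges;
    }
}
```

A real implementation would forward the recorded edges to an external system such as Datahub or Atlas instead of keeping them in memory.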

JobExecutionListener

JobExecutionListener listens to status changes of a job. The job creates a JobEvent for each status change and notifies the corresponding method in JobExecutionListener. Users can implement different listeners according to their needs, such as a Datahub listener or an Atlas listener.

/**
 * When the job status changes in the job manager, a job event is generated to notify the job execution listener.
 */
@PublicEvolving
public interface JobExecutionListener extends AutoCloseable {
    /* Start the listener with job configuration. */
    void start(Configuration configuration) throws Exception;

    /* Notify listener when job is created, it will be notified once. */
    void onCreated(JobCreatedEvent createdEvent);

    /* Notify listener when job is finished. */
    void onFinished(JobFinishedEvent finishedEvent);

    /* Notify listener when job is canceled. */
    void onCanceled(JobCanceledEvent canceledEvent);

    /* Notify listener when job is failed. */
    void onFailed(JobFailedEvent failedEvent);

    /* Event for job status is changed. */
    interface JobEvent {
        /* Job id. */
        JobID jobId();
        /* Job name. */
        String jobName();
        /* Timestamp for current job status. */
        long timestamp();
    }
    
    /* Source/Sink information. */
    @PublicEvolving
    interface SourceSinkInformation {
        /* Use catalog.database.table for table api and use source/sink name for datastream. */
        String name();
        /* Source/Sink operator name. */
        String operatorName();
        /* Configuration for source/sink. */
        Configuration configuration();
    }

    /* Event for job is created. */
    @PublicEvolving
    interface JobCreatedEvent extends JobEvent {
        /* Scan source list. */
        List<SourceSinkInformation> scanSources();

        /* Sink list. */
        List<SourceSinkInformation> sinks();
    }

    /* Event for job is finished. */
    @PublicEvolving
    interface JobFinishedEvent extends JobEvent { }

    /* Event for job is canceled. */
    @PublicEvolving
    interface JobCanceledEvent extends JobEvent { }

    /* Event for job is failed. */
    @PublicEvolving
    interface JobFailedEvent extends JobEvent {
        Throwable exception();
    }
}
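The following is a minimal sketch of what a user-side listener could look like. It is not part of the proposal: the stand-in event types (SketchJobEvent, SketchJobCreatedEvent, SketchJobFailedEvent) replace JobID with a String and SourceSinkInformation with just its name, so that the example compiles without Flink. The class name RecordingJobExecutionListener is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for JobEvent (JobID replaced by String).
interface SketchJobEvent {
    String jobId();
    String jobName();
    long timestamp();
}

// Hypothetical stand-in for JobCreatedEvent (sources/sinks reduced to names).
interface SketchJobCreatedEvent extends SketchJobEvent {
    List<String> scanSources();
    List<String> sinks();
}

// Hypothetical stand-in for JobFailedEvent.
interface SketchJobFailedEvent extends SketchJobEvent {
    Throwable exception();
}

// Collects one report line per event; a real listener would send these
// to a metadata system instead of keeping them in memory.
class RecordingJobExecutionListener implements AutoCloseable {
    private final List<String> reports = new ArrayList<>();

    void onCreated(SketchJobCreatedEvent event) {
        reports.add(event.jobName() + " CREATED "
                + event.scanSources() + " -> " + event.sinks());
    }

    void onFailed(SketchJobFailedEvent event) {
        reports.add(event.jobName() + " FAILED " + event.exception().getMessage());
    }

    List<String> reports() {
        return reports;
    }

    @Override
    public void close() {
        // A real listener would flush and release its client connection here.
    }
}
```

Because onCreated carries the source and sink lists, a listener of this shape has enough information to report table-level lineage as soon as the job is created, and to update the job state later from the terminal events.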

Config Customized Listener

Users should add their listener to the classpath of the Flink cluster and enable it with the following option in JobManagerOptions:

jobmanager.execution.listener: {user's listener class}
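One way the configured class could be instantiated is sketched below. This is only an illustration under stated assumptions: the FLIP does not say how the class is loaded, so the reflective lookup, the requirement of a no-argument constructor, and the names SketchListener, NoOpSketchListener and SketchListenerLoader are all hypothetical. The configuration is modelled as a plain Map instead of Flink's Configuration.

```java
import java.util.Map;

// Hypothetical stand-in for the listener contract (only start() is modelled).
interface SketchListener {
    void start(Map<String, String> config);
}

// Example user listener with the no-argument constructor this sketch relies on.
class NoOpSketchListener implements SketchListener {
    boolean started;

    @Override
    public void start(Map<String, String> config) {
        started = true;
    }
}

final class SketchListenerLoader {
    static final String OPTION_KEY = "jobmanager.execution.listener";

    private SketchListenerLoader() {}

    // Returns null when the option is not set; fails fast when the class
    // cannot be found or instantiated.
    static SketchListener load(Map<String, String> config, ClassLoader classLoader) {
        String className = config.get(OPTION_KEY);
        if (className == null || className.trim().isEmpty()) {
            return null;
        }
        try {
            Class<? extends SketchListener> clazz =
                    Class.forName(className.trim(), true, classLoader)
                            .asSubclass(SketchListener.class);
            SketchListener listener = clazz.getDeclaredConstructor().newInstance();
            listener.start(config);
            return listener;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot create listener " + className, e);
        }
    }
}
```

Failing fast on a misconfigured class name is a design choice of this sketch; an implementation could equally log a warning and run the job without a listener.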

Proposed Changes

Basic information such as the job id and name is available in ExecutionGraph, but the source and sink lists are not. They should be added to ExecutionGraph so that JobExecutionListener can report them.

Flink jobs are created from the Planner (SQL and Table API) or from DataStream, and are then converted to Transformation and StreamGraph. Finally, jobs are submitted as JobGraph, and the job manager creates an ExecutionGraph from it. The source and sink lists are handled as follows.

SourceScan in the Planner and DataStreamSource in DataStream contain source information such as the table name and source configuration. However, this information is hidden behind the Source interface when SourceScan and DataStreamSource are converted to Transformation. We should carry the source information through the conversion SourceScan/DataStreamSource->Transformation->StreamNode, and then add Map<JobVertexID, SourceSinkInformation> sources to JobGraph and ExecutionGraph.

Similar to sources, Sink and DataStreamSink contain sink information such as table names and configuration. We should carry the sink information through the conversion Sink/DataStreamSink->Transformation->StreamNode, and then add Map<JobVertexID, SourceSinkInformation> sinks to JobGraph and ExecutionGraph as well.

The JobManager creates instances of the user's JobExecutionListener and gets the source/sink information from ExecutionGraph. When the job status changes, the JobManager creates the corresponding job event and notifies the JobExecutionListener in DefaultExecutionGraph.transitionState.
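The dispatch from a status transition to the listener callbacks could look roughly like the sketch below. It is not the actual DefaultExecutionGraph code: SketchJobStatus, SketchStatusListener and SketchStateMachine are hypothetical, events are reduced to the job name, and onCreated is left out for brevity. Swallowing listener exceptions so that a faulty listener cannot fail the job is an assumption of this sketch, not something the FLIP specifies.

```java
import java.util.List;

// Reduced set of statuses; Flink's JobStatus has more values.
enum SketchJobStatus { CREATED, RUNNING, FINISHED, CANCELED, FAILED }

// Hypothetical stand-in for the terminal callbacks of JobExecutionListener.
interface SketchStatusListener {
    void onFinished(String jobName);
    void onCanceled(String jobName);
    void onFailed(String jobName, Throwable cause);
}

final class SketchStateMachine {
    private final String jobName;
    private final List<SketchStatusListener> listeners;
    private SketchJobStatus state = SketchJobStatus.CREATED;

    SketchStateMachine(String jobName, List<SketchStatusListener> listeners) {
        this.jobName = jobName;
        this.listeners = listeners;
    }

    SketchJobStatus state() {
        return state;
    }

    // Moves from 'expected' to 'next' and notifies listeners; returns false
    // without notifying anyone if the job is not in the expected status.
    boolean transitionState(SketchJobStatus expected, SketchJobStatus next, Throwable cause) {
        if (state != expected) {
            return false;
        }
        state = next;
        for (SketchStatusListener listener : listeners) {
            try {
                switch (next) {
                    case FINISHED:
                        listener.onFinished(jobName);
                        break;
                    case CANCELED:
                        listener.onCanceled(jobName);
                        break;
                    case FAILED:
                        listener.onFailed(jobName, cause);
                        break;
                    default:
                        // Non-terminal statuses have no callback in this sketch.
                        break;
                }
            } catch (RuntimeException e) {
                // Assumption: a failing listener is ignored so it cannot fail the job.
            }
        }
        return true;
    }
}
```

Guarding on the expected status means each terminal callback fires at most once per job, even if several components attempt the same transition.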

Plan For The Future

  1. Add a DDL listener and a submit listener to report the fields in tables and to support customized validations for tables and jobs.

  2. This FLIP only supports scan sources; lookup join sources can be supported later.

  3. Add a job vertex listener for the scheduling and execution status of vertices, the execution status of subtasks, etc.
