Status

Discussion thread-
Vote thread-
JIRA-
Release-

Motivation

A Flink ETL job consumes data from source tables and writes its results to sink tables after computation. The job thereby creates a relationship between the source tables and the sink tables. Flink needs a mechanism that lets users report these relationships to other systems through customized listeners, for example meta systems such as Datahub [1] and Atlas [2], or the meta store mentioned in FLINK-276 [3].
This FLIP introduces JobExecutionListener. Users can implement the listener to receive job events when a job is created, finished, canceled, or failed. The information in the job events includes

  1. JobID, the unique identifier of the job

  2. Job name, the name of the job

  3. Source table list of the job

  4. Sink table list of the job

Public Interfaces

Job Execution Listener


The new JobExecutionListener listens for status changes of a job. When the job status changes, the job creates a JobEvent for the new status and calls the corresponding method of JobExecutionListener. Users can implement different listeners according to their needs, such as a Datahub listener or an Atlas listener.

Code Block
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;

import java.util.List;

/**
 * Listener for job status changes. When the status of a job changes in the
 * job manager, a job event is generated and passed to this listener.
 */
@PublicEvolving
public interface JobExecutionListener extends AutoCloseable {
    /** Starts the listener with the job configuration. */
    void start(Configuration configuration) throws Exception;

    /** Called when the job is created. It is called only once per job. */
    void onCreated(JobCreatedEvent createdEvent);

    /** Called when the job has finished. */
    void onFinished(JobFinishedEvent finishedEvent);

    /** Called when the job has been canceled. */
    void onCanceled(JobCanceledEvent canceledEvent);

    /** Called when the job has failed. */
    void onFailed(JobFailedEvent failedEvent);

    /** Base event for a job status change. */
    interface JobEvent {
        /** Job id. */
        JobID jobId();
        /** Job name. */
        String jobName();
        /** Timestamp of the current job status. */
        long timestamp();
    }

    /** Source/sink information. */
    @PublicEvolving
    interface SourceSinkInformation {
        /** catalog.database.table for the Table API, the source/sink name for DataStream. */
        String name();
        /** Source/sink operator name. */
        String operatorName();
        /** Configuration of the source/sink. */
        Configuration configuration();
    }

    /** Event emitted when the job is created. */
    @PublicEvolving
    interface JobCreatedEvent extends JobEvent {
        /** Scan source list. */
        List<SourceSinkInformation> scanSources();

        /** Sink list. */
        List<SourceSinkInformation> sinks();
    }

    /** Event emitted when the job has finished. */
    @PublicEvolving
    interface JobFinishedEvent extends JobEvent { }

    /** Event emitted when the job has been canceled. */
    @PublicEvolving
    interface JobCanceledEvent extends JobEvent { }

    /** Event emitted when the job has failed. */
    @PublicEvolving
    interface JobFailedEvent extends JobEvent {
        /** The exception that caused the failure. */
        Throwable exception();
    }
}
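
To make the intended use more concrete, here is a minimal sketch of what a user listener might do with a created event: record one relationship per source/sink pair. It is deliberately standalone, so `SimpleJobCreatedEvent` and `LineageCollectingListener` are hypothetical stand-ins assumed for this example (a `String` replaces `JobID`, and plain table names replace `SourceSinkInformation`); a real listener would implement the proposed `JobExecutionListener` instead.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for JobCreatedEvent: String replaces JobID and plain
// table names replace SourceSinkInformation so the sketch compiles on its own.
class SimpleJobCreatedEvent {
    final String jobId;
    final String jobName;
    final List<String> scanSources;
    final List<String> sinks;

    SimpleJobCreatedEvent(
            String jobId, String jobName, List<String> scanSources, List<String> sinks) {
        this.jobId = jobId;
        this.jobName = jobName;
        this.scanSources = scanSources;
        this.sinks = sinks;
    }
}

// Hypothetical listener that records one relationship per source/sink pair.
class LineageCollectingListener {
    private final List<String> edges = new ArrayList<>();

    // Plays the role of JobExecutionListener#onCreated in the proposal.
    void onCreated(SimpleJobCreatedEvent event) {
        for (String source : event.scanSources) {
            for (String sink : event.sinks) {
                edges.add(source + " -> " + sink + " (job " + event.jobName + ")");
            }
        }
    }

    List<String> edges() {
        return edges;
    }
}
```

A Datahub or Atlas listener would presumably send these pairs to the external system rather than keep them in memory.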

Config Customized Listener


Users should add their listener to the classpath of the Flink cluster and enable it with the following option in JobManagerOptions:

Code Block
jobmanager.execution.listener: {user's listener class}
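
One plausible way for the job manager to turn this option into a listener instance is reflective instantiation. The sketch below is an assumption about the mechanism, not something the proposal specifies: `ListenerLoader` and `NoOpListener` are hypothetical names, the interface is reduced to the methods needed here, and a plain `Map<String, String>` stands in for Flink's `Configuration`.

```java
import java.util.Map;

// Stand-in for the proposed interface, reduced to what this sketch needs.
interface JobExecutionListener extends AutoCloseable {
    void start(Map<String, String> configuration) throws Exception;
}

// Hypothetical user listener; in practice it would come from a user jar
// on the cluster classpath.
class NoOpListener implements JobExecutionListener {
    boolean started;

    @Override
    public void start(Map<String, String> configuration) {
        started = true;
    }

    @Override
    public void close() {}
}

// Hypothetical loader: reads the class name from the option and starts the listener.
class ListenerLoader {
    static final String LISTENER_KEY = "jobmanager.execution.listener";

    // Returns null when the option is unset.
    static JobExecutionListener load(Map<String, String> config) throws Exception {
        String className = config.get(LISTENER_KEY);
        if (className == null || className.isEmpty()) {
            return null;
        }
        Class<? extends JobExecutionListener> clazz =
                Class.forName(className).asSubclass(JobExecutionListener.class);
        JobExecutionListener listener = clazz.getDeclaredConstructor().newInstance();
        listener.start(config);
        return listener;
    }
}
```

This approach requires the listener class to have a no-argument constructor, which is why the interface has a separate `start(Configuration)` method for initialization.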

Proposed Changes


Basic information such as the job ID and name is already available in ExecutionGraph, but the source and sink lists are not. They should be added to ExecutionGraph so that JobExecutionListener can report them.

Flink jobs are created from the Planner (SQL and Table API) or from DataStream and are then converted to Transformation and StreamGraph. Finally, the jobs are submitted as JobGraph, from which the job manager creates the ExecutionGraph. The source and sink lists are handled as follows.

[Diagram: flow]

SourceScan in the Planner and DataStreamSource in DataStream contain source information such as the table name and the source configuration, but this information is hidden behind the Source interface once they are converted to Transformation. We should carry the source information through the conversion SourceScan/DataStreamSource->Transformation->StreamNode, and then add Map<JobVertexID, SourceSinkInformation> sources to JobGraph and ExecutionGraph.

Similarly, Sink and DataStreamSink contain sink information such as table names and configuration. We should carry the sink information through the conversion Sink/DataStreamSink->Transformation->StreamNode, and then add Map<JobVertexID, SourceSinkInformation> sinks to JobGraph and ExecutionGraph as well.
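
As a rough illustration of the bookkeeping described above, the sketch below keeps source and sink information in two maps keyed by vertex ID and flattens them into the lists that a created event would expose. `VertexLineage` and `SourceSinkInfo` are simplified stand-ins assumed for this example (a `String` replaces `JobVertexID`, and the configuration is omitted); they are not the actual JobGraph or ExecutionGraph fields.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified stand-in for SourceSinkInformation (configuration omitted).
class SourceSinkInfo {
    final String name;
    final String operatorName;

    SourceSinkInfo(String name, String operatorName) {
        this.name = name;
        this.operatorName = operatorName;
    }
}

// Hypothetical holder for the per-vertex maps; String replaces JobVertexID.
class VertexLineage {
    private final Map<String, SourceSinkInfo> sources = new LinkedHashMap<>();
    private final Map<String, SourceSinkInfo> sinks = new LinkedHashMap<>();

    void addSource(String vertexId, SourceSinkInfo info) {
        sources.put(vertexId, info);
    }

    void addSink(String vertexId, SourceSinkInfo info) {
        sinks.put(vertexId, info);
    }

    // Copies of the map values in insertion order, as an event could expose them.
    List<SourceSinkInfo> scanSources() {
        return new ArrayList<>(sources.values());
    }

    List<SourceSinkInfo> sinks() {
        return new ArrayList<>(sinks.values());
    }
}
```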

The JobManager creates an instance of the user's JobExecutionListener and obtains the source and sink information from ExecutionGraph. When the job status changes, the JobManager creates the corresponding job event and notifies the JobExecutionListener.
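
The notification step could look roughly like the following sketch, which maps each reported status to the matching callback. All names here are hypothetical and simplified for illustration: `ReportedStatus` covers only the four statuses this FLIP reports (Flink's own job status enum has more values), and the callbacks take a plain job ID where the proposal uses typed events.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for the four statuses this FLIP reports.
enum ReportedStatus { CREATED, FINISHED, CANCELED, FAILED }

// Simplified listener: the proposed callbacks take typed events instead of a job ID.
interface SimpleJobListener {
    void onCreated(String jobId);
    void onFinished(String jobId);
    void onCanceled(String jobId);
    void onFailed(String jobId, Throwable cause);
}

// Hypothetical dispatcher that maps a status change to the matching callback.
class JobStatusDispatcher {
    private final SimpleJobListener listener;

    JobStatusDispatcher(SimpleJobListener listener) {
        this.listener = listener;
    }

    void jobStatusChanged(String jobId, ReportedStatus newStatus, Throwable cause) {
        switch (newStatus) {
            case CREATED:
                listener.onCreated(jobId);
                break;
            case FINISHED:
                listener.onFinished(jobId);
                break;
            case CANCELED:
                listener.onCanceled(jobId);
                break;
            case FAILED:
                listener.onFailed(jobId, cause);
                break;
        }
    }
}

// Listener that records which callbacks fired, useful for checking the dispatch.
class RecordingListener implements SimpleJobListener {
    final List<String> calls = new ArrayList<>();

    @Override public void onCreated(String jobId) { calls.add("created:" + jobId); }
    @Override public void onFinished(String jobId) { calls.add("finished:" + jobId); }
    @Override public void onCanceled(String jobId) { calls.add("canceled:" + jobId); }
    @Override public void onFailed(String jobId, Throwable cause) {
        calls.add("failed:" + jobId + ":" + cause.getMessage());
    }
}
```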

Plan For The Future

  1. Add a DDL listener and a submit listener to report the fields in tables and to support customized validation of tables and jobs.

  2. This FLIP only supports scan sources; lookup join sources can be supported later.

  3. Add a job vertex listener for events such as the scheduling and execution status of a vertex, the execution status of a subtask, etc.