Status
| Discussion thread | - |
| --- | --- |
| Vote thread | - |
| JIRA | - |
| Release | - |
Motivation
A Flink ETL job consumes data from a source table and, after computation, produces results to a sink table; the source table is thus related to the sink table through the Flink ETL job. Flink needs a mechanism for users to report these relationships with customized listeners to other systems, such as the meta systems Datahub [1] and Atlas [2], and the meta store we mentioned in FLINK-276 [3].
This FLIP aims to introduce listeners in Flink that users can implement to report the progress of jobs and metadata to external systems. The main information is as follows:
1. Source and sink information, such as table name, fields, partition keys, primary keys, watermarks and configurations
2. Job information, such as job id/name, execution mode, scheduler type and logical plan
3. Relationships between sources/sinks and jobs, such as the source and sink tables of a job, and field relationships in the job and its vertices
4. Job execution information, such as job status changes, vertex status changes and checkpoints
Public Interfaces
JobCreationListener
`JobCreationListener` is used to receive events of DDL operations and job submission; in this FLIP it only supports SQL/Table jobs.
```java
/**
 * Job creation listener, the client will create specific events and notify this listener.
 */
@PublicEvolving
public interface JobCreationListener {
    /* Start the listener. */
    void start(Map<String, String> config);

    /* Event fired after a catalog has been registered. */
    void onRegisterCatalog(CatalogEvent catalogEvent);

    /* Event fired after a catalog has been unregistered. */
    void onUnregisterCatalog(UnregisterCatalogEvent catalogEvent);

    /* Event fired after a database has been created. */
    void onCreateDatabase(DatabaseEvent databaseEvent);

    /* Event fired after a database has been dropped. */
    void onDropDatabase(DropDatabaseEvent databaseEvent);

    /* Event fired after a table has been created. */
    void onCreateTable(CreateTableEvent tableEvent);

    /* Event fired after a table has been changed. */
    void onAlterTable(AlterTableEvent tableEvent);

    /* Event fired after a table has been dropped. */
    void onDropTable(DropTableEvent tableEvent);

    /* Event fired before a job is submitted, to perform validations. */
    void onJobPreSubmission(JobSubmissionEvent submitEvent);

    /* Event fired after a job is submitted. */
    void onJobSubmission(JobSubmissionEvent submitEvent);

    /* Event for catalog registration, provides catalog name, default database, database list and properties in the Catalog. */
    @PublicEvolving
    interface CatalogEvent {
        String catalogName();
        Catalog catalog();
    }

    /* Event for catalog unregistration. */
    @PublicEvolving
    interface UnregisterCatalogEvent {
        String catalog();
        boolean ignoreIfNotExists();
    }

    /* Event for database creation, provides catalog name, database name, comment and properties of the database. */
    @PublicEvolving
    interface DatabaseEvent {
        String catalog();
        String name();
        CatalogDatabase database();
        boolean ignoreIfExists();
    }

    /* Event for dropping a database. */
    @PublicEvolving
    interface DropDatabaseEvent {
        String catalog();
        String name();
        boolean ignoreIfNotExists();
    }

    /* Table information event, provides column list, primary keys, partition keys, watermarks and properties in the table. The table can be source or sink. */
    @PublicEvolving
    interface TableEvent {
        ObjectIdentifier identifier();
        CatalogBaseTable table();
    }

    /* Event for table creation. */
    @PublicEvolving
    interface CreateTableEvent extends TableEvent {
        boolean ignoreIfExists();
    }

    /* Event for altering a table, provides all information of the old table and the new table. */
    @PublicEvolving
    interface AlterTableEvent extends TableEvent {
        CreateTableEvent newTable();
        boolean ignoreIfNotExists();
    }

    /* Event for dropping a table. */
    @PublicEvolving
    interface DropTableEvent {
        ObjectIdentifier identifier();
        boolean ignoreIfNotExists();
    }

    /* Event fired before and after a job is submitted, provides source tables and sink tables. */
    @PublicEvolving
    interface JobSubmissionEvent {
        List<TableEvent> sources();
        List<TableEvent> sinks();
    }
}
```
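For illustration, a customized listener could forward DDL and submission events to an external meta service. This is a minimal sketch, assuming a hypothetical `ExternalCatalogClient`; only the `JobCreationListener` interface itself comes from this FLIP.

```java
import java.util.Map;

/* A sketch of a customized listener; ExternalCatalogClient is hypothetical. */
public class ReportingJobCreationListener implements JobCreationListener {

    private ExternalCatalogClient client; // hypothetical client for an external meta system

    @Override
    public void start(Map<String, String> config) {
        // Create the client from the listener config, e.g. an endpoint address.
        client = new ExternalCatalogClient(config.get("listener.endpoint"));
    }

    @Override
    public void onCreateTable(CreateTableEvent tableEvent) {
        // Report the new table and its schema to the external system.
        client.reportTable(tableEvent.identifier().asSummaryString(), tableEvent.table());
    }

    @Override
    public void onJobSubmission(JobSubmissionEvent submitEvent) {
        // Report the source -> sink relationships of the submitted job.
        submitEvent.sources().forEach(s -> client.addJobSource(s.identifier().asSummaryString()));
        submitEvent.sinks().forEach(s -> client.addJobSink(s.identifier().asSummaryString()));
    }

    // The remaining events are ignored in this sketch.
    @Override public void onRegisterCatalog(CatalogEvent catalogEvent) {}
    @Override public void onUnregisterCatalog(UnregisterCatalogEvent catalogEvent) {}
    @Override public void onCreateDatabase(DatabaseEvent databaseEvent) {}
    @Override public void onDropDatabase(DropDatabaseEvent databaseEvent) {}
    @Override public void onAlterTable(AlterTableEvent tableEvent) {}
    @Override public void onDropTable(DropTableEvent tableEvent) {}
    @Override public void onJobPreSubmission(JobSubmissionEvent submitEvent) {}
}
```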
JobExecutionListener
The new `JobExecutionListener` listens to status changes of the job. The `JobManager` creates a `JobEvent` for each status change and notifies the corresponding method in `JobExecutionListener`. Users can implement different listeners according to their needs, such as a Datahub listener or an Atlas listener.
```java
/**
 * When the job status changes in the job manager, it generates a job event to notify the job execution listener.
 */
@PublicEvolving
public interface JobExecutionListener extends AutoCloseable {
    /* Start the listener with the job config. */
    void start(Map<String, String> config) throws Exception;

    /* Event fired after the job has been created. */
    void onCreated(JobCreatedEvent createdEvent);

    /* Event fired after the job has finished. */
    void onFinished(JobFinishedEvent finishedEvent);

    /* Event fired after the job has been canceled. */
    void onCanceled(JobCanceledEvent canceledEvent);

    /* Event fired after the job has failed. */
    void onFailed(JobFailedEvent failedEvent);

    /* Vertex in the job plan, provides id, name, parallelism, input edges and output column names. */
    @PublicEvolving
    interface JobVertexInfo {
        String id();
        String name();
        int parallelism();
        List<JobEdgeInfo> inputs();
        List<String> outputColumns();
    }

    /* Edge in the job plan, provides source/target vertex, input columns, distribution, isBroadcast and isForward. */
    @PublicEvolving
    interface JobEdgeInfo {
        JobVertexInfo source();
        JobVertexInfo target();
        /* Input column names of the edge. */
        List<String> inputColumns();
        String distribution();
        boolean isBroadcast();
        boolean isForward();
    }

    /* Job source vertex, provides source table name, input table columns and source type. */
    @PublicEvolving
    interface JobSourceVertex extends JobVertexInfo {
        /* `Catalog.Database.Table` format name. */
        String sourceName();
        /* Scan or lookup. */
        String type();
        /* Columns from the source table; detailed information such as column type is provided in {@code JobCreationListener#onCreateTable}. */
        List<String> columns();
        /* Source config provides options in the source such as the source type. */
        Map<String, String> config();
    }

    /* Job sink vertex, provides sink table name, output table columns and sink config. */
    @PublicEvolving
    interface JobSinkVertex extends JobVertexInfo {
        /* `Catalog.Database.Table` format name. */
        String sinkName();
        /* Columns from the sink table; detailed information such as column type is provided in {@code JobCreationListener#onCreateTable}. */
        List<String> columns();
        /* Sink config provides options in the sink such as the sink type. */
        Map<String, String> config();
    }

    /* Event fired when the job status changes. */
    @PublicEvolving
    interface JobEvent {
        JobID jobId();
        String jobName();
        /* Scheduler type such as Default/Adaptive/AdaptiveBatch. */
        String scheduler();
        /* Job execution mode. */
        ExecutionMode executionMode();
        /* BATCH or STREAMING. */
        String jobType();
        /* Timestamp of the current job status. */
        long timestamp();
    }

    /* Event fired when the job is created. */
    @PublicEvolving
    interface JobCreatedEvent extends JobEvent {
        /* Source list. */
        List<JobSourceVertex> sources();
        /* Sink list. */
        List<JobSinkVertex> sinks();
    }

    /* Event fired when the job is finished. */
    @PublicEvolving
    interface JobFinishedEvent extends JobEvent {}

    /* Event fired when the job is canceled. */
    @PublicEvolving
    interface JobCanceledEvent extends JobEvent {}

    /* Event fired when the job has failed. */
    @PublicEvolving
    interface JobFailedEvent extends JobEvent {
        Throwable exception();
    }
}
```
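Similarly, a status-reporting implementation could push job lineage and status to an external system. A minimal sketch, assuming a hypothetical `LineageReporter` client:

```java
import java.util.Map;

/* A sketch of a customized execution listener; LineageReporter is hypothetical. */
public class LineageJobExecutionListener implements JobExecutionListener {

    private LineageReporter reporter; // hypothetical client for an external system

    @Override
    public void start(Map<String, String> config) throws Exception {
        reporter = new LineageReporter(config.get("listener.endpoint"));
    }

    @Override
    public void onCreated(JobCreatedEvent createdEvent) {
        // Report source table -> job -> sink table edges once the job is created.
        for (JobSourceVertex source : createdEvent.sources()) {
            reporter.reportEdge(source.sourceName(), createdEvent.jobName());
        }
        for (JobSinkVertex sink : createdEvent.sinks()) {
            reporter.reportEdge(createdEvent.jobName(), sink.sinkName());
        }
    }

    @Override
    public void onFinished(JobFinishedEvent finishedEvent) {
        reporter.reportStatus(finishedEvent.jobId(), "FINISHED", finishedEvent.timestamp());
    }

    @Override
    public void onCanceled(JobCanceledEvent canceledEvent) {
        reporter.reportStatus(canceledEvent.jobId(), "CANCELED", canceledEvent.timestamp());
    }

    @Override
    public void onFailed(JobFailedEvent failedEvent) {
        reporter.reportStatus(failedEvent.jobId(), "FAILED", failedEvent.timestamp());
    }

    @Override
    public void close() throws Exception {
        reporter.close();
    }
}
```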
Config Customized Listener
Users should add their listener to the classpath of the Flink cluster and configure it with the following option in `JobManagerOptions`:

```yaml
jobmanager.execution.listener: {user's listener class}
```
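The FLIP does not spell out the loading mechanism; one plausible approach is reflection-based instantiation on the JobManager side. In the sketch below, the `EXECUTION_LISTENER` option constant and the surrounding fields are assumptions, not part of this proposal:

```java
// Sketch of listener instantiation in the JobManager; the option constant,
// class loader and error handling are assumptions.
String className = configuration.getString(JobManagerOptions.EXECUTION_LISTENER); // hypothetical option
if (className != null) {
    JobExecutionListener listener =
            (JobExecutionListener)
                    Class.forName(className, true, userCodeClassLoader)
                            .getDeclaredConstructor()
                            .newInstance();
    listener.start(configuration.toMap());
}
```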
Proposed Changes
Basic information such as the job id and name are in `ExecutionGraph`, but the source and sink lists are not. They should be added to `ExecutionGraph` for `JobExecutionListener` too.
Flink jobs are created from `Planner` (SQL and Table API) or `DataStream`, then converted to `Transformation` and `StreamGraph`. Finally, the jobs are submitted as a `JobGraph`, from which the job manager creates an `ExecutionGraph`. The handling of the source/sink lists is as follows.
`SourceScan` in `Planner` and `DataStreamSource` in `DataStream` contain source information such as the table name and source configuration. But this information is hidden behind the `Source` interface once `SourceScan` and `DataStreamSource` are converted to `Transformation`. We should carry the source information through the conversion of `SourceScan`/`DataStreamSource` -> `Transformation` -> `StreamNode`, then we can add `Map<JobVertexID, SourceSinkInformation> sources` to `JobGraph` and `ExecutionGraph`.
Similar to sources, `Sink` and `DataStreamSink` contain sink information such as table names and configuration. We should carry the sink information through the conversion of `Sink`/`DataStreamSink` -> `Transformation` -> `StreamNode`, then we can add `Map<JobVertexID, SourceSinkInformation> sinks` to `JobGraph` and `ExecutionGraph` too.
The JobManager creates instances of the user's `JobExecutionListener` and gets the source/sink information from the `ExecutionGraph`. When the status of the job changes, the JobManager creates the specific job events and notifies the `JobExecutionListener` in `DefaultExecutionGraph.transitionState`.
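A sketch of that notification point: after `transitionState` switches the job status, the JobManager can fan out the matching event. The listener field and the `create*Event` helper methods below are placeholders, not part of the current codebase:

```java
// Sketch: called from DefaultExecutionGraph.transitionState after the job
// status has changed; jobExecutionListeners and the create*Event helpers
// are placeholders for this FLIP's implementation.
private void notifyJobExecutionListeners(JobStatus newStatus, Throwable error) {
    for (JobExecutionListener listener : jobExecutionListeners) {
        switch (newStatus) {
            case FINISHED:
                listener.onFinished(createJobFinishedEvent());
                break;
            case CANCELED:
                listener.onCanceled(createJobCanceledEvent());
                break;
            case FAILED:
                listener.onFailed(createJobFailedEvent(error));
                break;
            default:
                // Other transitions are not reported in this FLIP.
                break;
        }
    }
}
```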
Plan For The Future
1. Add a DDL listener and a submission listener to report the fields in tables and support customized validations for tables and jobs.
2. This FLIP only supports scan sources; lookup join sources can be supported later.
3. Add a job vertex listener for scheduling and execution status of vertices, execution status of subtasks, etc.