Status
| Discussion thread | - |
|---|---|
| Vote thread | - |
| JIRA | |
| Release | - |
Motivation
A Flink ETL job consumes data from source tables and produces results to sink tables, so the job establishes a relationship between its source and sink tables. Flink needs a mechanism for users to report these relationships to external systems, such as the metadata systems Datahub [1] and Atlas [2], and the meta store mentioned in FLIP-276 [3].
This FLIP introduces listener interfaces in Flink that users can implement to report job progress and metadata to external systems. Flink SQL and Table jobs are supported in the first stage, and DataStream jobs will be considered in the future. The main information is as follows:
1. Source and Sink information, such as table name, fields, partition keys, primary keys, watermarks, configurations
2. Job information, such as job id/name, execution mode, scheduler type, logical plan
3. Relationship between Source/Sink and jobs, such as the source and sink tables of a job and the table columns used by the job
4. Job execution information, such as job status, checkpoints
Public Interfaces
CatalogEventListener
DDL operations such as registering a catalog and creating, altering or dropping tables generate different events and notify CatalogEventListener. All events for CatalogEventListener extend the basic interface CatalogEvent, from which listeners can get the catalog. Some general events for catalog/database/table are defined as follows, and more events can be added based on future requirements.
```java
import java.util.List;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogDatabase;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.catalog.TableChange;

/**
 * Different events will be fired when a catalog/database/table is modified. The customized
 * listener can receive these events and then do some specific operations according to the
 * event type.
 */
@PublicEvolving
public interface CatalogEventListener {

    /** Event fired after a catalog/database/table is modified. */
    void onEvent(CatalogEvent catalogEvent);

    /** The basic interface for catalog related events. */
    @PublicEvolving
    interface CatalogEvent {
        /** The catalog of the event. */
        Catalog catalog();

        /** The name of the catalog. */
        String catalogName();
    }

    /** Event for catalog registration. */
    @PublicEvolving
    interface RegisterCatalogEvent extends CatalogEvent {}

    /** Event for catalog unregistration. */
    @PublicEvolving
    interface UnregisterCatalogEvent extends CatalogEvent {
        boolean ignoreIfNotExists();
    }

    /** Event for database creation. */
    @PublicEvolving
    interface CreateDatabaseEvent extends CatalogEvent {
        CatalogDatabase database();
        String databaseName();
        boolean ignoreIfExists();
    }

    /** Event for dropping a database. */
    @PublicEvolving
    interface DropDatabaseEvent extends CatalogEvent {
        String databaseName();
        boolean ignoreIfNotExists();
    }

    /**
     * Base table event, provides column list, primary keys, partition keys, watermarks and
     * properties in CatalogBaseTable. The table can be a source or a sink.
     */
    @PublicEvolving
    interface BaseTableEvent extends CatalogEvent {
        ObjectIdentifier identifier();
        CatalogBaseTable table();
    }

    /** Event for table creation. */
    @PublicEvolving
    interface CreateTableEvent extends BaseTableEvent {
        boolean ignoreIfExists();
    }

    /** Event for altering a table, provides all information in the old table and the new table. */
    @PublicEvolving
    interface AlterTableEvent extends BaseTableEvent {
        List<TableChange> tableChanges();
        boolean ignoreIfNotExists();
    }

    /** Event for dropping a table. */
    @PublicEvolving
    interface DropTableEvent extends BaseTableEvent {
        boolean ignoreIfNotExists();
    }
}
```
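To illustrate how a listener might dispatch on the event type, here is a minimal, self-contained sketch that mirrors table DDL into an in-memory map standing in for an external meta system. It does not depend on Flink: CatalogEvent, CreateTableEvent and DropTableEvent below are simplified stand-ins for the proposed interfaces (a plain table name replaces ObjectIdentifier, and an options map replaces CatalogBaseTable), and MetadataReportingListener is a hypothetical name.

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified stand-in for CatalogEvent (assumption: no Flink types). */
interface CatalogEvent {
    String catalogName();
}

/** Simplified stand-in for CreateTableEvent: a plain table name replaces ObjectIdentifier. */
interface CreateTableEvent extends CatalogEvent {
    String tableName();

    /** Stands in for the options of CatalogBaseTable. */
    Map<String, String> options();
}

/** Simplified stand-in for DropTableEvent. */
interface DropTableEvent extends CatalogEvent {
    String tableName();
}

interface CatalogEventListener {
    void onEvent(CatalogEvent event);
}

/** Hypothetical listener that mirrors table DDL into an in-memory "meta system". */
class MetadataReportingListener implements CatalogEventListener {
    /** Key: "catalog.table"; value: the table options reported for that table. */
    final Map<String, Map<String, String>> reportedTables = new HashMap<>();

    @Override
    public void onEvent(CatalogEvent event) {
        if (event instanceof CreateTableEvent) {
            CreateTableEvent create = (CreateTableEvent) event;
            reportedTables.put(key(create.catalogName(), create.tableName()), create.options());
        } else if (event instanceof DropTableEvent) {
            DropTableEvent drop = (DropTableEvent) event;
            reportedTables.remove(key(drop.catalogName(), drop.tableName()));
        }
        // Other event types (catalog/database events, ALTER TABLE) are ignored in this sketch.
    }

    private static String key(String catalog, String table) {
        return catalog + "." + table;
    }
}
```

A real listener would typically forward the event to the external system instead of keeping it in memory; the instanceof dispatch is the part that carries over.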
JobListener
There is an existing JobListener which is notified when a job is submitted. Before a job is submitted, Flink should create an event with the source/sink list of the job and notify the listener, so that users can run customized validation, such as checking whether a table is written by multiple jobs. JobSubmissionEvent is introduced for the listener, and an onEvent method is added to the listener as follows.
```java
import javax.annotation.Nullable;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.execution.JobClient;

@PublicEvolving
public interface JobListener {

    /**
     * Fired for job submission events: before the job is submitted (JobPreSubmitEvent), after
     * it is submitted (JobSubmittedEvent) and after it is executed (JobExecutedEvent).
     */
    void onEvent(JobSubmissionEvent submissionEvent);

    /** Event for job submission. */
    @PublicEvolving
    interface JobSubmissionEvent {
        JobID jobId();
        String jobName();
        JobLogicalPlan plan();
    }

    /** @deprecated Use JobSubmittedEvent instead. */
    @Deprecated
    void onJobSubmitted(@Nullable JobClient jobClient, @Nullable Throwable throwable);

    /** @deprecated Use JobExecutedEvent instead. */
    @Deprecated
    void onJobExecuted(
            @Nullable JobExecutionResult jobExecutionResult, @Nullable Throwable throwable);

    @PublicEvolving
    interface JobPreSubmitEvent extends JobSubmissionEvent {}

    @PublicEvolving
    interface JobSubmittedEvent extends JobSubmissionEvent {
        @Nullable
        JobClient jobClient();

        @Nullable
        Throwable throwable();
    }

    @PublicEvolving
    interface JobExecutedEvent extends JobSubmissionEvent {
        @Nullable
        JobExecutionResult jobExecutionResult();

        @Nullable
        Throwable throwable();
    }
}
```
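The validation use case mentioned above (rejecting a job that writes a table another job already writes) could look roughly like the following self-contained sketch. It is hypothetical in two ways: JobPreSubmitEvent is reduced to a job ID plus the sink table names a real listener would read from JobLogicalPlan.sinks(), and rejecting the job by throwing an exception is an assumption, since the FLIP does not specify how a listener vetoes a submission.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/**
 * Simplified stand-in for JobPreSubmitEvent (assumption): sink table names are given directly
 * instead of being read from JobLogicalPlan.sinks().
 */
interface JobPreSubmitEvent {
    String jobId();
    Set<String> sinkTables();
}

/** Hypothetical validator enforcing "at most one writing job per table". */
class SingleWriterValidator {
    /** Sink table name -> ID of the job that writes it. */
    private final Map<String, String> writers = new HashMap<>();

    synchronized void onEvent(JobPreSubmitEvent event) {
        // Check every sink table first, so a rejected job leaves no partial registration.
        for (String table : event.sinkTables()) {
            String owner = writers.get(table);
            if (owner != null && !owner.equals(event.jobId())) {
                throw new IllegalStateException(
                        "Table " + table + " is already written by job " + owner);
            }
        }
        // All checks passed: record this job as the writer of its sink tables.
        for (String table : event.sinkTables()) {
            writers.put(table, event.jobId());
        }
    }

    synchronized Map<String, String> writers() {
        return Collections.unmodifiableMap(new HashMap<>(writers));
    }
}
```

A complete implementation would also release a table when its writing job terminates, for example by reacting to JobExecutedEvent.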
JobExecutionListener
The JobManager generates events when the job status changes or a checkpoint is started, and notifies JobExecutionListener. JobStatusEvent describes a change of the Flink job's JobStatus, with the old status, the new status and the job logical plan.
In addition to status changes, the JobManager generates a CheckpointEvent when a checkpoint is started, completed or aborted, and notifies JobExecutionListener. All checkpoint-related events extend CheckpointEvent, and more events can be added in the future.
```java
import java.util.Map;

import javax.annotation.Nullable;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.JobStatus;

/**
 * When the job status changes in the JobManager, it generates a job event and notifies the
 * job execution listener.
 */
@PublicEvolving
public interface JobExecutionListener {

    /** Event fired after the job status has changed. */
    void onJobStatusChanged(JobStatusEvent jobStatusEvent);

    /** Event fired when a checkpoint is started/completed/aborted. */
    void onCheckpoint(CheckpointEvent checkpointEvent);

    /** Job status event with plan. */
    @PublicEvolving
    interface JobStatusEvent {
        JobLogicalPlan plan();
        JobStatus oldStatus();
        JobStatus newStatus();
    }

    /** Event for job checkpoint. */
    @PublicEvolving
    interface CheckpointEvent {
        /** Snapshot type, checkpoint or savepoint. */
        String snapshotType();
        long checkpoint();
        @Nullable
        String externalSavepointLocation();
        boolean isPeriodic();
        long timestamp();
        Map<String, String> config();
    }

    /** Checkpoint started event. */
    @PublicEvolving
    interface CheckpointStartedEvent extends CheckpointEvent {}

    /** Checkpoint completed event. */
    @PublicEvolving
    interface CheckpointCompletedEvent extends CheckpointEvent {}

    /** Checkpoint aborted event. */
    @PublicEvolving
    interface CheckpointAbortedEvent extends CheckpointEvent {}
}
```
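As a rough illustration of consuming these events, the following self-contained sketch records status transitions and checkpoint progress. It uses simplified stand-ins rather than the proposed interfaces: JobStatus is a reduced copy of Flink's enum, the events omit the plan and most checkpoint fields, checkpoint() is assumed to return the checkpoint ID, and ProgressTrackingListener is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

/** Stand-in for org.apache.flink.api.common.JobStatus; the real enum has more states. */
enum JobStatus { CREATED, RUNNING, FAILING, FAILED, CANCELED, FINISHED }

/** Simplified JobStatusEvent without the plan. */
interface JobStatusEvent {
    JobStatus oldStatus();
    JobStatus newStatus();
}

/** Simplified CheckpointEvent; checkpoint() is assumed to be the checkpoint ID. */
interface CheckpointEvent {
    long checkpoint();
}

interface CheckpointStartedEvent extends CheckpointEvent {}

interface CheckpointCompletedEvent extends CheckpointEvent {}

interface CheckpointAbortedEvent extends CheckpointEvent {}

/** Hypothetical listener that records status transitions and checkpoint progress. */
class ProgressTrackingListener {
    final List<String> transitions = new ArrayList<>();
    long lastCompletedCheckpoint = -1L;
    int abortedCheckpoints = 0;

    void onJobStatusChanged(JobStatusEvent event) {
        transitions.add(event.oldStatus() + "->" + event.newStatus());
    }

    void onCheckpoint(CheckpointEvent event) {
        if (event instanceof CheckpointCompletedEvent) {
            // Keep the highest ID in case notifications arrive out of order.
            lastCompletedCheckpoint = Math.max(lastCompletedCheckpoint, event.checkpoint());
        } else if (event instanceof CheckpointAbortedEvent) {
            abortedCheckpoints++;
        }
        // CheckpointStartedEvent is ignored in this sketch.
    }
}
```

Because every checkpoint event shares the CheckpointEvent base type, a single onCheckpoint method can distinguish the phases with instanceof, and new event subtypes can be added without changing the listener interface.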
Job Logical Plan
The events for the different listeners above carry a JobLogicalPlan. Users can get the plan to report more information about the job, such as the source/sink tables in the job, the column relations between source/sink tables, and the topology of the job. JobPlanVertex in the logical plan is built on JobVertex in JobGraph and provides basic information.
In addition, some vertices need more information, such as the schema of sources/sinks in Table and SQL jobs. JobPlanTableVertex for Table and SQL jobs extends the basic JobPlanVertex, and a vertex type for DataStream jobs can be added in the future.
```java
import java.io.Serializable;
import java.util.List;
import java.util.Map;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.ExecutionMode;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.catalog.ObjectIdentifier;
import org.apache.flink.table.types.logical.LogicalType;

/**
 * Job logical plan is built according to JobGraph. Users can get sources, sinks and the
 * relationship between nodes from the plan.
 */
@PublicEvolving
public interface JobLogicalPlan {
    JobID jobId();
    String jobName();
    /** Scheduler type such as Default/Adaptive/AdaptiveBatch. */
    String scheduler();
    /** Job execution mode, PIPELINED/PIPELINED_FORCED/BATCH/BATCH_FORCED. */
    ExecutionMode executionMode();
    /** Job type, BATCH or STREAMING. */
    String jobType();
    /** Source vertex list. */
    List<JobPlanVertex> sources();
    /** Sink vertex list. */
    List<JobPlanVertex> sinks();
    /** Get the list of all vertices. */
    List<JobPlanVertex> getVerticesSortedTopologicallyFromSources();
    /** Get a specific vertex by ID. */
    JobPlanVertex vertex(String id);
    /** Job configuration. */
    Configuration jobConfiguration();

    /** Vertex in the job logical plan, based on JobVertex. */
    @PublicEvolving
    interface JobPlanVertex {
        String id();
        String name();
        String operatorName();
        String operatorDescription();
        int parallelism();
        String invokableClassName();
        boolean supportsConcurrentExecutionAttempts();
        List<JobPlanEdge> inputs();
    }

    /** Edge between vertices in the logical plan. */
    @PublicEvolving
    interface JobPlanEdge {
        JobPlanVertex source();
        JobPlanVertex target();
        String distribution();
        String shipStrategyName();
        boolean isBroadcast();
        boolean isForward();
    }
}

// Declared in its own source file (JobPlanTableVertex.java) with the same imports.
/**
 * Base interface for table scan source and sink vertices; DataStream source/sink vertices can
 * be added based on the requirements in the future.
 */
@PublicEvolving
public interface JobPlanTableVertex extends JobLogicalPlan.JobPlanVertex {
    /** `catalog`.`database`.`table` for scan source. */
    ObjectIdentifier table();
    /** For scan source, the type is Values or Table; for sink, the type is CollectSink or ModifySink. */
    String type();
    /** Table options. */
    Map<String, String> config();
    /** For scan source, columns consumed by the source; for sink, columns produced by the sink. */
    List<JobTableColumn> columns();

    /** Column with name and type in the table. */
    interface JobTableColumn extends Serializable {
        String name();
        LogicalType type();
    }

    /** Table scan source vertex. */
    @PublicEvolving
    interface JobPlanTableSourceVertex extends JobPlanTableVertex {}

    /** Table sink vertex. */
    @PublicEvolving
    interface JobPlanTableSinkVertex extends JobPlanTableVertex {
        /** Modify type, INSERT/UPDATE/DELETE. */
        String modifyType();
        /** Update mode, APPEND/RETRACT/UPSERT. */
        String updateMode();
        boolean overwrite();
        Map<String, String> staticPartitions();
    }
}
```
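One way a listener could derive table-level lineage from the plan is to walk upstream from every sink vertex until it reaches source vertices. The sketch below shows that traversal on a simplified, self-contained model: PlanVertex and TableLineage are hypothetical names, the table field stands in for JobPlanTableVertex.table(), and the upstream list plays the role of inputs() mapped through JobPlanEdge.source().

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

/** Simplified plan vertex (assumption): table is non-null only for table source/sink vertices. */
class PlanVertex {
    final String id;
    final String table;
    /** Upstream vertices, standing in for inputs() mapped through JobPlanEdge.source(). */
    final List<PlanVertex> upstream = new ArrayList<>();

    PlanVertex(String id, String table) {
        this.id = id;
        this.table = table;
    }
}

class TableLineage {
    /** Returns "sourceTable -> sinkTable" pairs by walking upstream from each sink vertex. */
    static Set<String> extract(List<PlanVertex> sources, List<PlanVertex> sinks) {
        Set<String> sourceIds = new HashSet<>();
        for (PlanVertex source : sources) {
            sourceIds.add(source.id);
        }
        Set<String> lineage = new LinkedHashSet<>();
        for (PlanVertex sink : sinks) {
            // Iterative depth-first search over the DAG, visiting each vertex once per sink.
            Deque<PlanVertex> stack = new ArrayDeque<>();
            Set<String> visited = new HashSet<>();
            stack.push(sink);
            while (!stack.isEmpty()) {
                PlanVertex vertex = stack.pop();
                if (!visited.add(vertex.id)) {
                    continue; // already reached through another path
                }
                if (sourceIds.contains(vertex.id)) {
                    lineage.add(vertex.table + " -> " + sink.table);
                }
                for (PlanVertex up : vertex.upstream) {
                    stack.push(up);
                }
            }
        }
        return lineage;
    }
}
```

The sink vertex itself is also checked against the source IDs, which covers the case where a source and a sink are chained into the same JobVertex.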
Config Customized Listener
Users should add their listeners to the classpath of both the client and the Flink cluster, and configure them with the following options:
```yaml
# Config catalog event listeners.
table.catalog.listeners: {job catalog listener class1},{job catalog listener class2}

# Existing config for job submission listeners.
execution.job-listeners: {job submission listener class1},{job submission listener class2}

# Config job execution listeners.
jobmanager.execution.listeners: {job execution listener class1},{job execution listener class2}
```
Proposed Changes
Changes for CatalogEventListener
TableEnvironmentImpl creates the customized CatalogEventListener instances according to the option table.catalog.listeners and passes them to CatalogManager and AbstractCatalog. Alternatively, external components such as SQL Gateway can create the listeners and the CatalogManager themselves and create TableEnvironmentImpl with them. When DDL-related operations are executed in CatalogManager and AbstractCatalog, they notify the listeners.
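On the notifying side, the ordering matters: the event should be fired only after the catalog has actually been modified. The sketch below models that with a hypothetical NotifyingCatalogManager, a heavily simplified stand-in for CatalogManager in which tables are plain names and events are plain strings. Skipping the notification when ignoreIfExists/ignoreIfNotExists turns the call into a no-op is an assumption, not something the FLIP specifies.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Consumer;

/**
 * Hypothetical, heavily simplified stand-in for CatalogManager: it owns the listeners created
 * from table.catalog.listeners and notifies them after each successful DDL operation.
 */
class NotifyingCatalogManager {
    private final Set<String> tables = new HashSet<>();
    /** Each listener receives a textual event such as "CREATE db.t" (simplification). */
    private final List<Consumer<String>> listeners;

    NotifyingCatalogManager(List<Consumer<String>> listeners) {
        this.listeners = new ArrayList<>(listeners);
    }

    void createTable(String name, boolean ignoreIfExists) {
        if (!tables.add(name)) {
            if (ignoreIfExists) {
                return; // nothing was modified, so no event (assumption)
            }
            throw new IllegalArgumentException("Table " + name + " already exists");
        }
        // Notify only after the catalog has actually been modified.
        notifyListeners("CREATE " + name);
    }

    void dropTable(String name, boolean ignoreIfNotExists) {
        if (!tables.remove(name)) {
            if (ignoreIfNotExists) {
                return; // nothing was modified, so no event (assumption)
            }
            throw new IllegalArgumentException("Table " + name + " does not exist");
        }
        notifyListeners("DROP " + name);
    }

    private void notifyListeners(String event) {
        for (Consumer<String> listener : listeners) {
            listener.accept(event);
        }
    }
}
```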
Changes for JobExecutionListener
Flink SQL and Table jobs are created by the Planner, which contains exec nodes; the plan is then converted to Operation, Transformation and StreamGraph. Finally, the job is submitted as a JobGraph, from which the JobManager creates an ExecutionGraph. The changes needed to collect the source/sink lists are as follows.
SourceScan in the Planner contains source information such as the table name, fields and configurations. However, this information is hidden inside Source, which is an interface, once SourceScan is converted to Transformation. We should therefore carry the source information through the conversion SourceScan -> Operation -> Transformation -> StreamNode.
Similarly, Sink and DataStreamSink contain sink information such as table names and configurations. We should carry the sink information through the conversion Sink -> Operation -> Transformation -> StreamNode; then we can add Map<JobVertexID, JobSinkVertexInfo> sinks to JobGraph and ExecutionGraph as well.
After these changes, the JobManager can create a JobLogicalPlan from the JobGraph for JobExecutionListener. When the job status changes, DefaultExecutionGraph in the JobManager notifies the listener. The same listener also observes checkpoint execution: when CheckpointCoordinator starts, completes or aborts a checkpoint, it notifies the listener as well.
Listener Construction and Execution
While the current JobListener is created through an empty constructor, all the customized listeners above can be created through either a constructor that takes a Configuration or an empty constructor. Flink prefers the constructor with Configuration if it exists.
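The construction rule above can be sketched with plain Java reflection: split the comma-separated option value, load each class, try a public constructor taking Configuration, and fall back to the public empty constructor. This is a self-contained illustration, not Flink's implementation: Configuration here is a minimal stand-in for org.apache.flink.configuration.Configuration, and Listener, NoArgListener, BothCtorsListener and ListenerFactory are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

/** Minimal stand-in for org.apache.flink.configuration.Configuration (assumption). */
class Configuration {
    private final Map<String, String> values;

    Configuration(Map<String, String> values) {
        this.values = values;
    }

    String get(String key) {
        return values.get(key);
    }
}

/** Marker interface standing in for any of the listener types above. */
interface Listener {}

/** Hypothetical example listener with only an empty constructor. */
class NoArgListener implements Listener {
    public NoArgListener() {}
}

/** Hypothetical example listener with both constructors; the Configuration one should win. */
class BothCtorsListener implements Listener {
    final boolean createdWithConfiguration;

    public BothCtorsListener() {
        this.createdWithConfiguration = false;
    }

    public BothCtorsListener(Configuration conf) {
        this.createdWithConfiguration = true;
    }
}

class ListenerFactory {
    /**
     * Instantiates every class listed under optionKey (comma-separated), preferring a public
     * constructor that takes Configuration and falling back to the public empty constructor.
     */
    static <T> List<T> create(
            Configuration conf, String optionKey, Class<T> type, ClassLoader loader)
            throws ReflectiveOperationException {
        List<T> listeners = new ArrayList<>();
        String value = conf.get(optionKey);
        if (value == null || value.trim().isEmpty()) {
            return listeners;
        }
        for (String className : value.split(",")) {
            Class<? extends T> clazz =
                    Class.forName(className.trim(), true, loader).asSubclass(type);
            T listener;
            try {
                listener = clazz.getConstructor(Configuration.class).newInstance(conf);
            } catch (NoSuchMethodException e) {
                listener = clazz.getConstructor().newInstance();
            }
            listeners.add(listener);
        }
        return listeners;
    }
}
```

Loading through an explicit class loader matters here because, as noted in the configuration section, the listener classes must be on the classpath of both the client and the cluster.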
Multiple listeners are independent, and the client/JobManager notifies them synchronously. It is highly recommended NOT to perform any blocking operation inside a listener. If blocking operations are required, users should process them asynchronously in their customized listeners.
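The following self-contained sketch shows both sides of this rule. ListenerNotifier calls listeners one after another on the caller's thread and catches their exceptions; treating "independent" as "one failing listener does not stop the others" is an assumption, since the FLIP does not spell out failure handling. AsyncListener is the kind of wrapper a user could put inside a customized listener so that a blocking report (for example, a remote call to a meta system) runs on its own thread. Both class names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical notifier: synchronous calls, with failures isolated per listener. */
class ListenerNotifier<E> {
    private final List<Consumer<E>> listeners;
    final List<Throwable> failures = new ArrayList<>();

    ListenerNotifier(List<Consumer<E>> listeners) {
        this.listeners = new ArrayList<>(listeners);
    }

    void fire(E event) {
        for (Consumer<E> listener : listeners) {
            try {
                listener.accept(event);
            } catch (RuntimeException e) {
                failures.add(e); // a real implementation would log the failure
            }
        }
    }
}

/** Hypothetical wrapper that hands blocking work to a single background thread. */
class AsyncListener<E> implements Consumer<E> {
    private final Consumer<E> blockingDelegate;
    // One thread keeps events in the order they were received.
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    AsyncListener(Consumer<E> blockingDelegate) {
        this.blockingDelegate = blockingDelegate;
    }

    @Override
    public void accept(E event) {
        executor.execute(() -> blockingDelegate.accept(event));
    }

    /** Stops accepting events and waits up to 10 seconds for pending reports. */
    void close() throws InterruptedException {
        executor.shutdown();
        executor.awaitTermination(10, TimeUnit.SECONDS);
    }
}
```

With the wrapper, fire() returns as soon as the event is queued, so a slow external system delays only the report, not the client or the JobManager.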
Plan For The Future
- Column relationships between job vertices in JobLogicalPlan are not supported in Flink at present. We'd like to implement them in the next FLIP.
- Source/sink relationships are supported for SQL/Table jobs; DataStream jobs will be supported later. Currently only scan sources are supported; lookup join sources should be supported later.
- Add a job vertex listener for batch mode, covering for example the scheduling and execution status of vertices and the execution status of subtasks.
[2] https://atlas.apache.org/#/
[3] FLIP-276: Data Consistency of Streaming and Batch ETL in Flink and Table Store