Status
Discussion thread | - |
---|---|
Vote thread | - |
JIRA |
|
Release | - |
Motivation
Flink ETL job consumes data from Source Table and produces result to Sink Table. Source Table creates relationship with Sink Table through Flink ETL job. Flink needs a mechanism for users to report these relationships to external systems, such as meta system Datahub [1], Atlas [2] and meta store we mentioned in FLIP-276 [3].
This FLIP aims to introduce listeners interface in Flink, users can implement them to report the progress of jobs and meta data to external systems. Flink SQL and Table jobs are supported in the first stage, and DataStream will be consider in the future. The main information is as follows
1. Source and Sink information, such as table name, fields, partition keys, primary keys, watermarks, configurations
2. Job information, such as job id/name, execution mode, scheduler type, logical plan
3. Relationship between Source/Sink and jobs, such as source and sink tables for job, fields relationships in job and vertex
4. Job execution information, such as job status, checkpoints
Public Interfaces
JobDeploymentListener
JobDeploymentListener
is used to collect events of ddl and job submission, it only supports sql/table jobs in this FLIP.
/** * Job deployment listener, client will create specific event and notify this listener when it submits a job. */ @PublicEvolving public interface JobDeploymentListener { /* Event fired after a catalog has been registered. */ void onRegisterCatalog(RegisterCatalogEvent catalogEvent); /* Event fired after a catalog has been unregistered. */ void onUnregisterCatalog(UnregisterCatalogEvent catalogEvent); /* Event fired after a database has been created. */ void onCreateDatabase(CreateDatabaseEvent databaseEvent); /* Event fired after a database has been dropped. */ void onDropDatabase(DropDatabaseEvent databaseEvent); /* Event fired after a table has been created. */ void onCreateTable(CreateTableEvent tableEvent); /* Event fired after a table has been changed. */ void onAlterTable(AlterTableEvent tableEvent); /* Event fired after a table has been dropped. */ void onDropTable(DropTableEvent tableEvent); /* Event fired before a job is submitted to do some validations. */ void onJobPreSubmission(JobSubmissionEvent submitEvent); /* Event fired after a job is submitted. */ void onJobSubmission(JobSubmissionEvent submitEvent); /* Event for catalog registration, provides catalog name, default database, database list and properties in the Catalog. */ @PublicEvolving interface RegisterCatalogEvent { String catalog(); Catalog catalog(); } /* Event for catalog unregistration. */ @PublicEvolving interface UnregisterCatalogEvent { String catalog(); boolean ignoreIfNotExists(); } /* Event for database creation, provides catalog name, database name, comment and properties of the database. */ @PublicEvolving interface CreateDatabaseEvent { String catalog(); String name(); CatalogDatabase database(); boolean ignoreIfExists(); } /* Event for dropping database. */ @PublicEvolving interface DropDatabaseEvent { String catalog(); String name(); boolean ignoreIfExists(); } /* Table information event, provides column list, primary keys, partition keys, watermarks and properties in the table. The table can be source or sink. */ @PublicEvolving interface TableEvent { ObjectIdentifier identifier(); CatalogBaseTable table(); } /* Event for table creation. */ @PublicEvolving interface CreateTableEvent extends TableEvent { boolean ignoreIfExists(); } /* Event for altering table, provides all information in old table and new table. */ @PublicEvolving interface AlterTableEvent extends TableEvent { CreateTableEvent newTable(); boolean ignoreIfExists(); } /* Event for dropping table. */ @PublicEvolving interface DropTableEvent { ObjectIdentifier identifier(); boolean ignoreIfExists(); } /* Event for before and after job is submitted, provides source tables and sink tables. */ @PublicEvolving interface JobSubmissionEvent { JobID jobId(); String jobName(); List<TableEvent> sources(); List<TableEvent> sinks(); } }
JobExecutionListener
JobExecutionListener
listens to the changed status in the job. JobManager
creates event for each status of job, and notifies JobExecutionListener
. Users can implement different listeners according to their needs, such as Datahub listener or Atlas listener.
/** * When job status is changed in job manager, it will generate job event and notify job execution listener. */ @PublicEvolving public interface JobExecutionListener { /* Event fired after job has been created. */ void onCreated(JobCreatedEvent createdEvent); /* Event fired after job has beed finished. */ void onFinished(JobFinishedEvent finishedEvent); /* Event fired after job has beed canceled. */ void onCanceled(JobCanceledEvent canceledEvent); /* Event fired after job has beed failed. */ void onFailed(JobFailedEvent failedEvent); /* Event fired after checkpoint has been started. */ void onCheckpointStarted(CheckpointEvent checkpointEvent); /* Event fired after checkpoint has been completed. */ void onCheckpointCompleted(CheckpointEvent checkpointEvent); /* Event fired after checkpoint has been aborted. */ void onCheckpointAborted(CheckpointEvent checkpointEvent); /* Column information for vertex inputs and outputs. */ @PublicEvolving interface VertexColumn { /* The column name. */ String name(); /* The column type such as INT/BIGINT/FLOAT/DOUBLE and etc. */ String type(); } /* Vertex in job plan, provides id, name, parallelism, input edges and output columns. */ @PublicEvolving interface JobVertexInfo { String id(); String name(); String operatorName(); String operatorDescription(); int parallelism(); String invokableClassName(); boolean supportsConcurrentExecutionAttempts(); List<JobEdgeInfo> inputs(); List<VertexColumn> outputColumns(); } /* Edge in job plan, provides source/target vertex, input columns, distribution, isBroadcast and isForward. */ @PublicEvolving interface JobEdgeInfo { JobVertexInfo source(); JobVertexInfo target(); /* Input columns for the edge. */ List<VertexColumn> inputColumns(); String distribution(); String shipStrategyName(); boolean isBroadcast(); boolean isForward(); String preProcessingOperationName(); String operatorLevelCachingDescription(); } /* Job scan source vertex, provides source table name, input table columns and source type. */ @PublicEvolving interface JobScanSourceVertexInfo extends JobVertexInfo { /* `Catalog.Database.Table` format name. */ String sourceName(); /* Scan source type, Values or Table. */ String type(); /* Columns from source table in the source vertex. */ List<VertexColumn> sourceColumns(); /* Options for the source. */ Map<String, String> config(); } /* */ @PublicEvolving interface JobSinkVertexInfo extends JobVertexInfo { /* `Catalog.Database.Table` format name. */ String sinkName(); /* Sink type, CollectSink or ModifySink. */ String type(); /* Modify type, INSERT/UPDATE/DELETE. */ String modifyType(); /* Update mode, APPEND/RETRACT/UPSERT. */ String updateMode(); /* Columns to sink table in the sink vertex. */ List<VertexColumn> sinkColumns(); boolean overwrite(); Map<String, String> staticPartitions(); Map<String, String> config(); } /* Event for job status is changed. */ interface JobBaseEvent { JobID jobId(); String jobName(); /* Scheduler type such as Default/Adaptive/AdaptiveBatch. */ String scheduler(); /* Job execution mode, PIPELINED/PIPELINED_FORCED/BATCH/BATCH_FORCED. */ ExecutionMode executionMode(); /* Job type, BATCH or STREAMING. */ String jobType(); /* Timestamp for current job status. */ long timestamp(); } @PublicEvolving interface JobLogicalPlanInfo { /* Source list. */ List<JobSourceVertex> sources(); /* Sink list. */ List<JobSinkVertex> sinks(); /* Get all vertex list. */ List<JobVertexInfo> getVerticesSortedTopologicallyFromSources(); /* Get specific vertex by id. */ JobVertexInfo vertex(String id); } /* Event for job is created. */ @PublicEvolving interface JobCreatedEvent extends JobBaseEvent { JobLogicalPlanInfo plan(); } /* Event for job is finished. */ @PublicEvolving interface JobFinishedEvent extends JobBaseEvent { } /* Event for job is canceled. */ @PublicEvolving interface JobCanceledEvent extends JobBaseEvent { } /* Event for job is failed. */ @PublicEvolving interface JobFailedEvent extends JobBaseEvent { Throwable exception(); } /* Event for job checkpoint. */ @PublicEvolving interface CheckpointEvent extends JobBaseEvent { /* Snapshot type, checkpoint or savepoint. */ String snapshotType(); long checkpoint(); @Nullable String externalSavepointLocation(); boolean isPeriodic; long timestamp(); Map<String, String> config(); } }
Config Customized Listener
Users should add their listeners to the classpath of client and flink cluster, and config them in the following options
# Config job deployment listeners. table.job.deployment.listeners: {job deployment listener class1},{job deployment listener class2} # Config job execution listeners. jobmanager.execution.listeners: {job execution listener class1},{job execution listener class2}
Proposed Changes
Changes for JobDeploymentListener
TableEnvironmentImpl
creates customized JobDeploymentListener
according to the option table.job.deployment.listeners
, and put the listener into CatalogManager
and AbstractCatalog
. TableEnvironmentImpl
can receive existing listeners in constructor with CatalogManager
too, which can be used in some other classes such sql gateway. When DDL related operations are executed in CatalogManager
and AbstractCatalog
, they should notify the listener.
TableEnvironmentImpl
will submit the job after it is created, it notifies the listener before and after the job is submitted.
Changes for JobExecutionListener
Flink sql or table jobs are created from Planner
which contains exec nodes, then it is converted to Operation
, Transformation
and StreamGraph
. Finally, the jobs are submitted as JobGraph
and job managers create ExecutionGraph
from it. The operations of source/sink list are as follows.
SourceScan
in Planner
contains source information such as table name, fields and configurations. But these information is hidden in the Source
which is an interface when the SourceScan
is converted to Transformation
. We should add source information in the conversion of SourceScan
->Operation->Transformation
->StreamNode
.
Similar to sources, Sink
and DataStreamSink
contain sink information such as table names and configuration. We should add sink information in the conversion of Sink
->Operation->Transformation
->StreamNode
, then we can add Map<JobVertexID, JobSinkVertexInfo> sources
in JobGraph
and ExecutionGraph
too.
After completing the above changes, JobManager
can create JobLogicalPlanInfo
from JobGraph
for JobExecutionListener
. When the status of job is changed, DefaultExecutionGraph
in JobManager
will notify the listener. At the same time, this listener will also listen to the execution of checkpoint. When CheckpointCoordinator
starts/completes/aborts a specific checkpoint, it will notify the listener too.
Plan For The Future
- We add column relationships between job vertex in JobLogicalPlanInfo, but it is not supported in Flink at present. We'd like to implement them in the next FLIP.
Source/Sink relationships for SQL/Table jobs are supported,
DataStream
jobs will be supported later.Currently we only supports scan source, lookup join source should be supported later.
Add Job vertex listener for batch mode, such as scheduling and execution status of vertex, execution status of subtask, etc.
[2] https://atlas.apache.org/#/
[3] FLIP-276: Data Consistency of Streaming and Batch ETL in Flink and Table Store