...
4. Job execution status change information, such as the job status and exceptions.
This FLIP focuses on the customized metadata listener in Flink; the job lineage related listener will be covered in FLIP-314 [4].
Public Interfaces
CatalogModificationListener
...
For sensitive information, users can encode and desensitize it in their customized listeners.
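For example, a customized listener can mask sensitive option values before forwarding them to an external system. The following is a minimal, illustrative sketch only; the helper class, the key names, and the idea of masking with '***' are assumptions, not part of this proposal.

Code Block

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

/** Sketch of a masking helper that a customized listener can apply to
 *  connector options before reporting them; the key names are examples. */
public final class SensitiveOptions {
    private static final Set<String> SENSITIVE_KEYS = Set.of("password", "secret", "access-key");

    /** Returns a copy of the options with sensitive values replaced by '***'. */
    public static Map<String, String> desensitize(Map<String, String> options) {
        Map<String, String> masked = new HashMap<>(options);
        for (String key : SENSITIVE_KEYS) {
            masked.computeIfPresent(key, (k, v) -> "***");
        }
        return masked;
    }
}

A customized listener would call such a helper on the table options carried by the event before sending them to any external system.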
Config Customized Listener
Users should add their listeners to the classpath of the client and the Flink cluster, and configure the listener factories in the following option:
Code Block
# Config for catalog modification listeners.
table.catalog-modification.listeners: {table catalog listener factory1},{table catalog listener factory2}
JobStatusChangedListener
Flink creates events and notifies JobStatusChangedListener when the status of a job changes. There are two types of job status events for the listener: JobCreatedEvent and JobExecutionStatusEvent. JobCreatedEvent is fired when a job is created; it carries the job lineage, and the listener can create lineages for sources and sinks from it. JobExecutionStatusEvent contains the old and new job statuses at runtime, and the listener can even delete the lineages when the job reaches a terminal state.
Code Block
/**
 * When a job is created or its status changes, Flink will generate a job event and notify the job status changed listener.
 */
@PublicEvolving
public interface JobStatusChangedListener {
    /* Fired when the job status changes. */
    void onEvent(JobStatusChangedEvent event);
}
/** Basic job status event. */
@PublicEvolving
public abstract class JobStatusChangedEvent {
    JobID jobId();
    String jobName();
}

/** Job created event with job lineage. */
@PublicEvolving
public class JobCreatedEvent extends JobStatusChangedEvent {
    /* Lineage for the current job. */
    JobLineage lineage();

    /* Job type, TableOrSQL or DataStream. */
    String jobType();

    /* Job execution type, BATCH or STREAMING. */
    String executionType();

    /* Job configuration. */
    Map<String, String> config();
}

/** Job status changed event for runtime. */
@PublicEvolving
public class JobExecutionStatusEvent extends JobStatusChangedEvent {
    JobStatus oldStatus();
    JobStatus newStatus();
    @Nullable Throwable exception();
}

/** Factory for job status changed listener. */
@PublicEvolving
public interface JobStatusChangedListenerFactory {
    JobStatusChangedListener createListener(Configuration config, ClassLoader classloader);
}
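As an illustration, a factory and listener pair built against these interfaces might look as follows. This is a minimal sketch; MyLineageListener, the registration helpers, and the dispatch logic are illustrative, not part of the proposal.

Code Block

import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;

/** Illustrative factory; Flink would create it from the configured factory class. */
public class MyLineageListenerFactory implements JobStatusChangedListenerFactory {
    @Override
    public JobStatusChangedListener createListener(Configuration config, ClassLoader classloader) {
        return new MyLineageListener();
    }
}

/** Illustrative listener that dispatches on the two event types defined above. */
public class MyLineageListener implements JobStatusChangedListener {
    @Override
    public void onEvent(JobStatusChangedEvent event) {
        if (event instanceof JobCreatedEvent) {
            // Register source/sink lineage when the job is created.
            JobCreatedEvent created = (JobCreatedEvent) event;
            registerLineage(created.jobId(), created.lineage());
        } else if (event instanceof JobExecutionStatusEvent) {
            // Clean up lineage when the job reaches a terminal state.
            JobExecutionStatusEvent status = (JobExecutionStatusEvent) event;
            if (status.newStatus().isTerminalState()) {
                removeLineage(status.jobId());
            }
        }
    }

    private void registerLineage(JobID jobId, JobLineage lineage) { /* report to an external system */ }

    private void removeLineage(JobID jobId) { /* remove from the external system */ }
}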
Job lineage is divided into two layers: the first layer is a global abstraction for all Flink jobs and connectors, and the second layer defines the lineages for Table/SQL and DataStream jobs independently, based on the first one.
Code Block
/**
 * Job lineage is built according to StreamGraph. Users can get sources, sinks and relationships from the lineage.
 */
@PublicEvolving
public class JobLineage {
    /* Source lineage entity list. */
    List<LineageEntity> sources();

    /* Sink lineage entity list. */
    List<LineageEntity> sinks();

    /* Lineage relations from sources to sinks. */
    List<LineageRelation> relations();
}
@PublicEvolving
public class LineageEntity {
    /* Config for the lineage entity. */
    Map<String, String> config();
}

/** Lineage relation from source to sink. */
@PublicEvolving
public class LineageRelation {
    LineageEntity source();
    LineageEntity sink();
}
Job lineage for Table/SQL job
For Table/SQL jobs, Flink creates table lineages according to the source and sink tables. Table lineages contain column lineages, which describe the dependencies between source and sink columns. Flink creates these lineages for Table/SQL jobs from the job plan; the entire process is transparent to users.
Code Block
/** Basic table lineage entity which has catalog context and table in it. */
@PublicEvolving
public class TableLineageEntity extends LineageEntity {
/* The catalog context of the table lineage entity. */
public CatalogContext catalogContext();
/* The table of the table lineage entity. */
public CatalogBaseTable table();
}
/** Source lineage entity for table. */
@PublicEvolving
public class TableSourceLineageEntity extends TableLineageEntity {
}
/** Sink lineage entity for table. */
@PublicEvolving
public class TableSinkLineageEntity extends TableLineageEntity {
/* Modify type, INSERT/UPDATE/DELETE. */
String modifyType();
/* Update mode, APPEND/RETRACT/UPSERT. */
String updateMode();
boolean overwrite();
}
/* Table lineage relations from source table to sink table. */
@PublicEvolving
public class TableLineageRelation extends LineageRelation {
/* Table column lineage relations from source to sink. */
List<TableColumnLineageRelation> columnRelations();
}
/* Column lineage from source table to sink table. */
@PublicEvolving
public class TableColumnLineageRelation {
/* Source table column. */
String sourceColumn();
/* Sink table column. */
String sinkColumn();
}
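For instance, a listener that receives a JobCreatedEvent for a Table/SQL job could walk these relations to obtain column-level dependencies. A minimal sketch against the classes above:

Code Block

/** Sketch: print the column-level lineage of one table lineage relation. */
public static void printColumnLineage(TableLineageRelation relation) {
    for (TableColumnLineageRelation column : relation.columnRelations()) {
        // sourceColumn() and sinkColumn() are defined in the classes above.
        System.out.println(column.sourceColumn() + " -> " + column.sinkColumn());
    }
}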
Job lineage for DataStream job
The data structures of connectors in DataStream jobs are much more complex than in Table/SQL jobs: they may be tables, user-customized POJO classes, or even vectors for ML jobs. We add a setLineageEntity method to DataStreamSource and DataStreamSink, which allows users to define the job lineage entity with TableSourceLineageEntity and TableSinkLineageEntity if the connectors are tables, or to implement a customized LineageEntity for source and sink based on their specific requirements.
Code Block
/**
* Add setLineageEntity method in data stream source.
*/
@Public
public class DataStreamSource {
private LineageEntity lineageEntity;
public DataStreamSource setLineageEntity(LineageEntity lineageEntity);
}
/**
* Add setLineageEntity and addLineageRelations methods in data stream sink.
*/
@Public
public class DataStreamSink {
private LineageEntity lineageEntity;
public DataStreamSink setLineageEntity(LineageEntity lineageEntity);
/* Add lineage relations for data stream jobs. */
public DataStreamSink addLineageRelations(LineageRelation ... relations);
}
Config Customized Listener
Users should add their listeners to the classpath of the client and the Flink cluster, and configure the listener factories in the following options:
Code Block
# Config for catalog modification listeners.
lineage.catalog-modification.listeners: {job catalog listener factory1},{job catalog listener factory2}
# Config for job status changed listeners.
lineage.job-status-changed.listeners: {job status changed listener factory1},{job status changed listener factory2}
Proposed Changes
Use case of job lineage
1. User customized lineage for DataStream
Users can implement customized source and sink lineage entities for a DataStream job, for example, a KafkaVectorLineageEntity for Kafka sources and sinks as follows.
Code Block
/** User defined vector source and sink lineage entity. */
public class KafkaVectorLineageEntity extends LineageEntity {
/* The capacity of source lineage. */
int capacity();
/* The value type in the vector. */
String valueType();
}
/* User can use vector source/sink lineages in datastream job. */
Map<String, String> kafkaSourceConf = ...;
Map<String, String> kafkaSinkConf = ...;
StreamExecutionEnvironment env = ...;
KafkaVectorLineageEntity sourceLineageEntity = new KafkaVectorLineageEntity(10, "int", kafkaSourceConf);
KafkaVectorLineageEntity sinkLineageEntity = new KafkaVectorLineageEntity(20, "double", kafkaSinkConf);
env.fromSource(...)
.setLineageEntity(sourceLineageEntity) // Set source lineage entity
.map(...).keyBy(..).reduce(..)...
.sinkTo(...)
.setLineageEntity(sinkLineageEntity) // Set sink lineage entity
.addLineageRelations( // Add lineage relations from sources to the current sink
new LineageRelation(sourceLineageEntity, sinkLineageEntity));
env.execute();
After that, users can cast the source and sink lineage entities to the vector lineage entity and get the capacity and value type in their customized listeners.
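Inside such a listener, reading the entities back might look like the following fragment of a customized JobStatusChangedListener; the handling and reporting steps are illustrative.

Code Block

/* Fragment of a customized JobStatusChangedListener; the handling is illustrative. */
@Override
public void onEvent(JobStatusChangedEvent event) {
    if (event instanceof JobCreatedEvent) {
        for (LineageEntity entity : ((JobCreatedEvent) event).lineage().sources()) {
            if (entity instanceof KafkaVectorLineageEntity) {
                KafkaVectorLineageEntity vector = (KafkaVectorLineageEntity) entity;
                int capacity = vector.capacity();      // 10 for the source above
                String valueType = vector.valueType(); // "int" for the source above
                // report the capacity and value type to an external system ...
            }
        }
    }
}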
2. Connectors identified by connector identifier
Users may create different tables on the same storage, such as the same Kafka topic. Suppose there is one Kafka topic, two Paimon tables and one MySQL table. Users create these tables and submit three Flink SQL jobs as follows.
a) Create Kafka and Paimon tables for Flink SQL job
Code Block
-- Create a table my_kafka_table1 for kafka topic 'kafka_topic'
CREATE TABLE my_kafka_table1 (
val1 STRING,
val2 STRING,
val3 STRING
) WITH (
'connector' = 'kafka',
'topic' = 'kafka_topic',
'properties.bootstrap.servers' = 'kafka_bootstrap_server',
'properties.group.id' = 'kafka_group',
'format' = 'json'
);
-- Create a Paimon catalog and table for warehouse 'paimon_path1'
CREATE CATALOG paimon_catalog WITH (
'type'='paimon',
'warehouse'='paimon_path1'
);
USE CATALOG paimon_catalog;
CREATE TABLE my_paimon_table (
val1 STRING,
val2 STRING,
val3 STRING
) WITH (...);
-- Insert data to Paimon table from Kafka
INSERT INTO my_paimon_table SELECT ... FROM default_catalog.default_database.my_kafka_table1 WHERE ...;
b) Create another Kafka and Paimon tables for Flink SQL job
Code Block
-- Create another table my_kafka_table2 for kafka topic 'kafka_topic', which is the same as my_kafka_table1 above
CREATE TABLE my_kafka_table2 (
val1 STRING,
val2 STRING,
val3 STRING
) WITH (
'connector' = 'kafka',
'topic' = 'kafka_topic',
'properties.bootstrap.servers' = 'kafka_bootstrap_server',
'properties.group.id' = 'kafka_group',
'format' = 'json'
);
-- Create a Paimon catalog with the same name 'paimon_catalog' for different warehouse 'paimon_path2'
CREATE CATALOG paimon_catalog WITH (
'type'='paimon',
'warehouse'='paimon_path2'
);
USE CATALOG paimon_catalog;
CREATE TABLE my_paimon_table (
val1 STRING,
val2 STRING,
val3 STRING
) WITH (...);
-- Insert data to Paimon table from Kafka
INSERT INTO my_paimon_table SELECT ... FROM default_catalog.default_database.my_kafka_table2 WHERE ...;
c) Create Mysql table for Flink SQL job
Code Block
-- Create two catalogs for warehouses 'paimon_path1' and 'paimon_path2'; there are two different tables named 'my_paimon_table'
CREATE CATALOG paimon_catalog1 WITH (
'type'='paimon',
'warehouse'='paimon_path1'
);
CREATE CATALOG paimon_catalog2 WITH (
'type'='paimon',
'warehouse'='paimon_path2'
);
-- Create MySQL table
CREATE TABLE mysql_table (
val1 STRING,
val2 STRING,
val3 STRING
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://HOST:PORT/my_database',
'table-name' = 'my_mysql_table',
'username' = '***',
'password' = '***'
);
-- Insert data into the MySQL table from the two Paimon tables
INSERT INTO mysql_table
SELECT ... FROM paimon_catalog1.default.my_paimon_table
JOIN paimon_catalog2.default.my_paimon_table
ON ... WHERE ...;
After completing the above operations, we get one Kafka topic, two Paimon tables and one MySQL table, which are identified by their connector identifiers. These tables are associated through Flink jobs; as an example, users can report the tables and relationships to DataHub [1], as shown below.
Changes for CatalogModificationListener
TableEnvironmentImpl creates the customized CatalogModificationListener according to the option lineage.catalog-modification.listeners and builds the CatalogManager with the listeners. Some other components such as the SQL Gateway can create the CatalogManager with the listeners themselves. Currently all table related operations such as create/alter are in CatalogManager, but database operations are not. We can add database modification operations to CatalogManager and notify the specified listeners for tables and databases.
Code Block
/* Listeners and related operations in the catalog manager. */
public final class CatalogManager {
private final List<CatalogModificationListener> listeners;
/* Create catalog manager with listener list. */
private CatalogManager(
String defaultCatalogName,
Catalog defaultCatalog,
DataTypeFactory typeFactory,
ManagedTableListener managedTableListener,
List<CatalogModificationListener> listeners);
/* Notify the listeners with given catalog event. */
private void notify(CatalogEvent event) {
listeners.forEach(listener -> listener.onEvent(event));
}
/* Notify listener for tables. */
public void createTable/dropTable/alterTable(...) {
....;
notify(Create Different Table Modification Event);
}
/* Add database ddls and notify listener for databases. */
public void createDatabase/dropDatabase/alterDatabase(...) {
....;
notify(Create Different Database Modification Event);
}
/* Add listeners in Builder for catalog manager. */
public static final class Builder {
Builder listeners(List<CatalogModificationListener> listeners);
}
}
Changes for JobStatusChangedListener
Build JobLogicalPlan in StreamGraph
Flink creates a Planner for SQL and table jobs, which contains the exec nodes; the planner is then converted to Transformation and StreamGraph. DataStream jobs are similar to SQL: Flink creates the DataStream and converts it to Transformation and StreamGraph. The job conversion is shown as follows.
(draw.io diagram: job conversion flow)
There is a graph structure in StreamGraph, so we can easily create the JobLogicalPlan based on StreamGraph.
For table and SQL jobs, Flink translates the source and sink exec nodes in the planner to Transformation in ExecNodeBase.translateToPlanInternal. There are resolved tables in the source and sink nodes, so Flink can create the source and sink lineages based on the tables and save them in the source/sink transformations.
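A rough sketch of what this could look like during translation follows; the helper and the setLineageEntity hook on the transformation are hypothetical, not existing Flink API.

Code Block

/* Hypothetical sketch inside ExecNodeBase.translateToPlanInternal: build a
 * table source lineage entity from the resolved table and attach it to the
 * source transformation. setLineageEntity is a hook proposed by this FLIP. */
TableSourceLineageEntity sourceLineage = createSourceLineageEntity(resolvedTable); // hypothetical helper
sourceTransformation.setLineageEntity(sourceLineage);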
For DataStream jobs, DataStreamSource sets the source lineage on the source transformation in the setLineageEntity method, and DataStreamSink does the same. Finally, the source and sink nodes are translated from the transformations and added to StreamGraph in SourceTransformationTranslator.translateInternal and SinkTransformationTranslator.translateInternal, where their lineage information can be added to StreamGraph as well.
Create and notify listeners in RestClusterClient
RestClusterClient can read the option lineage.job-status-changed.listeners from the Configuration and create the JobStatusChangedListener. RestClusterClient.submitJob is used in AbstractSessionClusterExecutor to submit a job, which converts the Pipeline to a JobGraph. AbstractSessionClusterExecutor can set the StreamGraph in RestClusterClient before calling the submitJob method; then RestClusterClient can get the StreamGraph and notify the listeners before the JobGraph is submitted.
Code Block
/* Set the pipeline to the client before it submits the job graph. */
public class AbstractSessionClusterExecutor {
public CompletableFuture<JobClient> execute(
@Nonnull final Pipeline pipeline,
@Nonnull final Configuration configuration,
@Nonnull final ClassLoader userCodeClassloader)
throws Exception {
....
final ClusterClientProvider<ClusterID> clusterClientProvider =
clusterDescriptor.retrieve(clusterID);
ClusterClient<ClusterID> clusterClient = clusterClientProvider.getClusterClient();
// Set the pipeline to the cluster client before submitting the job graph
clusterClient.setPipeline(pipeline);
return clusterClient
.submitJob(jobGraph)
....
}
}
/* Create a job submission event and notify the listeners before the job graph is submitted. */
public class RestClusterClient {
private final List<JobStatusChangedListener> listeners;
private Pipeline pipeline;
@Override
public CompletableFuture<JobID> submitJob(@Nonnull JobGraph jobGraph) {
// Create event and notify listeners before the job graph is submitted.
JobSubmissionEvent event = createEventFromPipeline(pipeline);
if (event != null) {
listeners.forEach(listener -> listener.onEvent(event));
}
....;
}
}
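The createEventFromPipeline call above is pseudocode; one possible shape is sketched below, assuming the pipeline is a StreamGraph carrying the lineage collected during translation. The getLineage accessor and the JobSubmissionEvent constructor are hypothetical.

Code Block

/* Sketch of createEventFromPipeline; getLineage() and the event constructor are hypothetical. */
private JobSubmissionEvent createEventFromPipeline(Pipeline pipeline) {
    if (pipeline instanceof StreamGraph) {
        StreamGraph streamGraph = (StreamGraph) pipeline;
        // Build the submission event from the job name and the collected lineage.
        return new JobSubmissionEvent(streamGraph.getJobName(), streamGraph.getLineage());
    }
    return null; // nothing to report for pipelines without lineage information
}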
Job status changed in job manager
JobManager can create the JobStatusChangedListener in DefaultExecutionGraph according to the option lineage.job-status-changed.listeners in the job configuration. Currently JobManager calls DefaultExecutionGraph.transitionState when the job status changes; the JobStatusChangedListener can be notified in that method as follows.
Code Block
/* Notify the job status changed listeners when the job status transitions. */
public class DefaultExecutionGraph {
private final List<JobStatusChangedListener> executionListeners;
private boolean transitionState(JobStatus current, JobStatus newState, Throwable error) {
....;
notifyJobStatusHooks(newState, error);
// notify job execution listeners
notifyJobStatusChangedListeners(current, newState, error);
....;
}
private void notifyJobStatusChangedListeners(JobStatus current, JobStatus newState, Throwable error) {
JobExecutionStatusEvent event = create job execution status event;
executionListeners.forEach(listener -> listener.onEvent(event));
}
}
Listener Execution
Multiple listeners are independent of each other, and the client/JobManager notifies them synchronously. It is highly recommended NOT to perform any blocking operations inside the listeners. If blocking operations are required, users need to perform asynchronous processing in their customized listeners, for example as sketched below.
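A minimal sketch of such a non-blocking listener; the reporting call is illustrative.

Code Block

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/** Sketch of a non-blocking listener that processes events asynchronously. */
public class AsyncReportingListener implements JobStatusChangedListener {
    // A single background thread preserves event order while freeing the caller.
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    @Override
    public void onEvent(JobStatusChangedEvent event) {
        // Return immediately; slow reporting happens on the background thread.
        executor.submit(() -> reportToExternalSystem(event));
    }

    private void reportToExternalSystem(JobStatusChangedEvent event) {
        // e.g. an HTTP call to a lineage/metadata service (illustrative)
    }
}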
Rejected Alternatives
Report lineage in JobListener
Currently there is a JobListener in Flink; users can implement custom listeners and obtain the JobClient. We could add lineage information to JobListener and let users report it. However, JobListener was added in FLINK-14992 (https://issues.apache.org/jira/browse/FLINK-14992) and is used by Zeppelin and Notebook to manage jobs. The lineage information of a Flink job is static, and we don't need the JobClient in the listener. So we reject this proposal and create the JobStatusChangedListener for lineage.
Pull job status in JobListener
We could pull the job status in JobListener. As discussed in the previous thread [5], we do not want to do such polling in our services either.
Submit job with lineage and Report it in JobMaster
In this FLIP the JobStatusChangedListener will be in the client and the JobMaster, which report lineage information and job status independently. Another proposal is to put the lineage information into the JobGraph and then report it in the JobMaster. Considering that the lineage information may be relatively large and would affect the JobGraph, we will report separately in the first phase and consider merging them in the future.
Plan For The Future
- We add column lineages to the job lineage, but they are not supported in Flink at present. We'd like to implement them in the next FLIP.
- Currently we only support scan sources in this FLIP; lookup join sources should be supported later.
- Add a job vertex listener for batch mode, covering the scheduling and execution status of vertices, the execution status of subtasks, etc.
- Currently the client notifies the listeners to report job lineage; it should be reported in the JobManager and supported in the REST API in the future.
[1] DataHub: https://datahubproject.io/
[2] Apache Atlas: https://atlas.apache.org/#/
[3] FLIP-276: Data Consistency of Streaming and Batch ETL in Flink and Table Store
[4] FLIP-314: Support Customized Job Lineage Listener
[5] https://lists.apache.org/thread/ttsc6155bzrtkh8wkkxg2dj689jtvh4t