Status
Discussion thread | -
Vote thread | -
JIRA | -
Release | -
Motivation
A Flink ETL job consumes data from a Source and produces results to a Sink, which creates a relationship between the Source and the Sink. Flink needs a mechanism for users to report these relationships to external systems, such as the metadata systems DataHub [1] and Atlas [2], or the meta store mentioned in FLIP-276 [3].
This FLIP aims to introduce listener interfaces in Flink that users can implement to report metadata to external systems. The main information is as follows:
1. Source and Sink information, such as table name, fields, partition keys, primary keys, watermarks, configurations
2. Job information, such as job id/name, job type, job configuration
3. Relationships between Sources/Sinks and jobs, such as the source and sink tables of a job and their column lineages.
4. Job execution information, such as job status
Public Interfaces
CatalogModificationListener
DDL operations such as create/alter/drop table will generate different events and notify the CatalogModificationListener. All events for CatalogModificationListener extend the basic CatalogEvent, and listeners can get the catalog from it. Some general events for databases/tables are defined as follows, and more events can be added based on future requirements.
/** The basic listener for lineage in Flink, all lineage related listeners should extend this interface. */
public interface LineageListener {}

/**
 * Different events will be fired when a catalog/database/table is modified. The customized listener
 * can get and report specific information from the event according to the event type.
 */
@PublicEvolving
public interface CatalogModificationListener extends LineageListener {
    /* Event fired after a catalog/database/table is modified. */
    void onEvent(CatalogEvent catalogEvent);

    /* The basic class for catalog related events. */
    public abstract class CatalogEvent {
        /* The catalog of the event. */
        Catalog catalog();

        /* The name of the catalog. */
        String catalogName();
    }

    /* The basic class for database related events. */
    public abstract class BaseDatabaseEvent extends CatalogEvent {
        String databaseName();
    }

    /* Event for creating a database. */
    @PublicEvolving
    public class CreateDatabaseEvent extends BaseDatabaseEvent {
        CatalogDatabase database();
        boolean ignoreIfExists();
    }

    /* Event for altering a database. */
    @PublicEvolving
    public class AlterDatabaseEvent extends BaseDatabaseEvent {
        CatalogDatabase newDatabase();
        boolean ignoreIfNotExists();
    }

    /* Event for dropping a database. */
    @PublicEvolving
    public class DropDatabaseEvent extends BaseDatabaseEvent {
        boolean ignoreIfExists();
    }

    /* Base table event, provides column list, primary keys, partition keys, watermarks and properties
     * in CatalogBaseTable. The table can be a source or a sink. */
    public abstract class BaseTableEvent extends CatalogEvent {
        ObjectIdentifier identifier();
        CatalogBaseTable table();

        /* Get the storage identifier for the table. */
        StorageIdentifier storage();
    }

    /* Event for creating a table. */
    @PublicEvolving
    public class CreateTableEvent extends BaseTableEvent {
        boolean ignoreIfExists();
    }

    /* Event for altering a table, provides all changes to the old table. */
    @PublicEvolving
    public class AlterTableEvent extends BaseTableEvent {
        List<TableChange> tableChanges();
        boolean ignoreIfExists();
    }

    /* Event for dropping a table. */
    @PublicEvolving
    public class DropTableEvent extends BaseTableEvent {
        boolean ignoreIfExists();
    }
}

/* Factory for the catalog modification listener. */
@PublicEvolving
public interface CatalogModificationListenerFactory {
    CatalogModificationListener createListener(Configuration configuration, ClassLoader classLoader);
}
Users may create different catalogs on the same physical catalog, for example, create two Hive catalogs named hive_catalog1 and hive_catalog2 for the same metastore. The tables hive_catalog1.my_database.my_table and hive_catalog2.my_database.my_table are the same table in the Hive metastore.
In addition, there are two table types: persistent tables and temporary tables. A persistent table can be identified by the catalog and database above, while a temporary table can only be identified by the properties in its DDL. Different temporary tables with the same connector type and related properties are the same physical table in the external system, for example, two tables for the same Kafka topic.
StorageIdentifier is introduced to address these issues; listeners can get it from the table related events.
/* Storage identifier for different physical tables. */
@PublicEvolving
public class StorageIdentifier {
    /* Storage type such as kafka, hive, iceberg or paimon. */
    String type();

    /* Properties for the storage identifier, users can get values with different keys for different
     * storages, such as broker and topic for kafka. */
    Map<String, String> properties();
}
Different storages put their options in the properties according to their dynamic source and sink, and users read the option values with the same keys. Flink Table/SQL jobs can get the options from table properties automatically, while users need to add them manually for DataStream jobs. Flink has many connectors; the Kafka options are given as an example here.
/* Kafka storage identifier options. */
"properties.bootstrap.servers"  for Kafka bootstrap servers
"topic"                         for Kafka topic
"properties.group.id"           for Kafka group id
"topic-pattern"                 for Kafka topic pattern
For sensitive information, users can encode and desensitize it in their own listener implementations, as sketched below.
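As an illustration, the following is a minimal sketch of a customized listener that reports created tables and masks a sensitive option before reporting; MyMetaStoreClient, its reportTable method and the masked "password" key are hypothetical placeholders for a user's own reporting logic and sensitive properties.

/* Sketch of a customized CatalogModificationListener that desensitizes properties before reporting. */
public class ReportingCatalogModificationListener implements CatalogModificationListener {
    private final MyMetaStoreClient client = new MyMetaStoreClient();   // hypothetical external client

    @Override
    public void onEvent(CatalogEvent catalogEvent) {
        if (catalogEvent instanceof CreateTableEvent) {
            CreateTableEvent event = (CreateTableEvent) catalogEvent;
            Map<String, String> properties = new HashMap<>(event.storage().properties());
            // Mask sensitive options before sending them to the external meta system.
            properties.computeIfPresent("password", (key, value) -> "******");
            client.reportTable(event.catalogName(), event.identifier(), event.table(), properties);
        }
    }
}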
JobStatusChangedListener
Flink creates events and notifies the JobStatusChangedListener when the job status changes. There are two types of job status events for the listener: JobCreatedEvent and JobExecutionStatusEvent. JobCreatedEvent is fired when a job is created; it carries the job lineage, and the listener can create lineage for sources and sinks from it. JobExecutionStatusEvent contains the old and new job statuses at runtime, and the listener can even delete the lineage when the job reaches a terminal state.
/**
 * When the job status is changed, Flink will generate a job event and notify the job status changed listener.
 */
@PublicEvolving
public interface JobStatusChangedListener {
    /* Event fired after the job status has been changed. */
    void onEvent(JobStatusEvent jobStatusEvent);

    /** Basic job status event. */
    abstract class JobStatusEvent {
        JobID jobId();
        String jobName();
    }

    /** Job created event with job lineage. */
    @PublicEvolving
    class JobCreatedEvent extends JobStatusEvent {
        /* Lineage for the current job. */
        JobLineage lineage();

        /* Job type, TableOrSQL or DataStream. */
        String jobType();

        /* Job execution type, BATCH or STREAMING. */
        String executionType();

        /* Job configuration. */
        Map<String, String> config();
    }

    /** Job status changed event for runtime. */
    @PublicEvolving
    class JobExecutionStatusEvent extends JobStatusEvent {
        JobStatus oldStatus();
        JobStatus newStatus();
        @Nullable Throwable exception();
    }
}

/* Factory for the job status changed listener. */
@PublicEvolving
public interface JobStatusChangedListenerFactory {
    JobStatusChangedListener createListener(Configuration configuration, ClassLoader classLoader);
}
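As an illustration, a minimal sketch of a customized job status listener is shown below; LineageReporter and its methods are hypothetical placeholders for a user's own client of an external lineage service.

/* Sketch of a customized JobStatusChangedListener that reports and cleans up lineage. */
public class LineageReportingJobStatusChangedListener implements JobStatusChangedListener {
    private final LineageReporter reporter = new LineageReporter();   // hypothetical external client

    @Override
    public void onEvent(JobStatusEvent jobStatusEvent) {
        if (jobStatusEvent instanceof JobCreatedEvent) {
            // Report the source/sink lineage when the job is created.
            JobCreatedEvent event = (JobCreatedEvent) jobStatusEvent;
            reporter.reportLineage(event.jobId(), event.jobName(), event.lineage());
        } else if (jobStatusEvent instanceof JobExecutionStatusEvent) {
            JobExecutionStatusEvent event = (JobExecutionStatusEvent) jobStatusEvent;
            if (event.newStatus().isTerminalState()) {
                // Optionally delete the lineage when the job reaches a terminal status.
                reporter.removeLineage(event.jobId());
            }
        }
    }
}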
Job lineage is divided into two layers: the first layer is a global abstraction for all Flink jobs and connectors, and the second layer defines the lineage of Table/SQL jobs and DataStream jobs independently, based on the first one.
/**
 * Job lineage is built according to the StreamGraph. Users can get sources, sinks and the relationship
 * between nodes from the lineage.
 */
@PublicEvolving
public interface JobLineage {
    /* Source lineage list. */
    List<SourceLineage> sources();

    /* Sink lineage list. */
    List<SinkLineage> sinks();
}

/** Base connector lineage interface. */
public interface ConnectorLineage {
    StorageIdentifier connector();
}

/** Base source lineage. */
@PublicEvolving
public interface SourceLineage extends ConnectorLineage {}

/** Base sink lineage. */
@PublicEvolving
public interface SinkLineage extends ConnectorLineage {
    List<SourceLineage> sources();
}
Job lineage for Table/SQL job
For Table/SQL jobs, Flink creates table lineage from the source and sink tables. Table lineage contains column lineage, so Flink jobs can create dependencies between source and sink columns. Flink creates these lineages for Table/SQL jobs in the job planner, and the entire process is transparent to users.
/** Basic table lineage, listeners can get the catalog and table from it. */
public abstract class TableLineage implements ConnectorLineage {
    /* The catalog of the table lineage. */
    public Catalog catalog();

    /* The table of the table lineage. */
    public CatalogBaseTable table();
}

/** Source lineage for table. */
@PublicEvolving
public class TableSourceLineage extends TableLineage implements SourceLineage {
    public StorageIdentifier connector();

    /* Output columns for the source; the detailed column information such as data type is in the table. */
    public List<String> columns();
}

/** Sink lineage for table. */
@PublicEvolving
public class TableSinkLineage extends TableLineage implements SinkLineage {
    public StorageIdentifier connector();

    /* Modify type, INSERT/UPDATE/DELETE. */
    String modifyType();

    /* Update mode, APPEND/RETRACT/UPSERT. */
    String updateMode();

    boolean overwrite();

    /* The source lineages for the sink table. */
    public List<SourceLineage> sources();

    /* The output columns for the sink table. */
    public List<String> columns();

    /* The source column lineages for each target column in the sink table, this will be supported by Flink in the future. */
    public Map<String, List<TableColumnLineage>> columnLineages();

    /* Source table and columns for a target column in the sink lineage. Multiple source table columns
     * may generate one sink column. */
    @PublicEvolving
    public class TableColumnLineage {
        /* The source table for the column lineage. */
        public TableSourceLineage source();

        /* The columns in the source lineage for the column lineage. */
        public List<String> columns();
    }
}
Job lineage for DataStream job
The data structures of connectors in DataStream jobs are much more complex than in Table/SQL jobs: they may be tables, user customized POJO classes, or even vectors for ML jobs. We add lineage related methods to DataStreamSource and DataStreamSink, allowing users to define job lineage using TableSourceLineage and TableSinkLineage if the connectors are tables, or to implement customized SourceLineage and SinkLineage based on their specific requirements.
/**
 * Add a set source lineage method in the data stream source.
 */
@Public
public class DataStreamSource {
    private SourceLineage lineage;

    public DataStreamSource setLineage(SourceLineage lineage);
}

/**
 * Add a set sink lineage method in the data stream sink.
 */
@Public
public class DataStreamSink {
    private SinkLineage lineage;

    public DataStreamSink setLineage(SinkLineage lineage);
}
Config Customized Listener
Users should add their listeners to the classpath of the client and the Flink cluster, and configure the listener factories in the following options:
# Config for catalog modification listeners.
lineage.catalog-modification.listeners: {job catalog listener factory1},{job catalog listener factory2}

# Config for job status changed listeners.
lineage.job-status-changed.listeners: {job status changed listener factory1},{job status changed listener factory2}
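For example, a factory for the catalog modification listener sketched earlier could look like the following; the class names are hypothetical, and whether the option references the factory's fully qualified class name or an SPI identifier is an assumption of this sketch.

/* Hypothetical factory for the customized catalog modification listener. */
public class ReportingCatalogModificationListenerFactory implements CatalogModificationListenerFactory {
    @Override
    public CatalogModificationListener createListener(Configuration configuration, ClassLoader classLoader) {
        // Options for the listener itself could be read from the Flink configuration here.
        return new ReportingCatalogModificationListener();
    }
}

# Example configuration, assuming factories are referenced by class name.
lineage.catalog-modification.listeners: com.example.lineage.ReportingCatalogModificationListenerFactory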
Proposed Changes
Use case of job lineage
1. User customized lineage for DataStream
Users can implement customized source and sink lineages for a DataStream job, for example, implement VectorSourceLineage and VectorSinkLineage for a Kafka source and sink as follows.
/** User defined vector source lineage. */
public class VectorSourceLineage implements SourceLineage {
    /* The capacity of the source lineage. */
    int capacity();

    /* The value type in the vector. */
    String valueType();
}

/** User defined vector sink lineage. */
public class VectorSinkLineage implements SinkLineage {
    List<SourceLineage> sources();
    int capacity();
    String valueType();
}

/* Users can use the vector source/sink lineages in a DataStream job. */
Map<String, String> kafkaSourceConf = ...;
Map<String, String> kafkaSinkConf = ...;
StreamExecutionEnvironment env = ...;
VectorSourceLineage sourceLineage =
        new VectorSourceLineage(new StorageIdentifier("kafka", kafkaSourceConf), 10, "int");
env.fromSource(...).setLineage(sourceLineage)
    .map(...).keyBy(...)
    .sinkTo(...)
    .setLineage(new VectorSinkLineage(new StorageIdentifier("kafka", kafkaSinkConf), 20, "int", Arrays.asList(sourceLineage)));
env.execute();
After that, users can cast the source and sink lineages to the vector lineage types and read the capacity and value type in their customized listeners, as sketched below.
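A minimal sketch of such a listener, assuming the VectorSourceLineage above is used; the report method stands in for a user's own reporting logic.

/* Sketch of a listener that reads the user defined vector lineage from JobCreatedEvent. */
public class VectorLineageListener implements JobStatusChangedListener {
    @Override
    public void onEvent(JobStatusEvent jobStatusEvent) {
        if (jobStatusEvent instanceof JobCreatedEvent) {
            JobLineage lineage = ((JobCreatedEvent) jobStatusEvent).lineage();
            for (SourceLineage source : lineage.sources()) {
                if (source instanceof VectorSourceLineage) {
                    // Cast to the user defined lineage to access vector specific information.
                    VectorSourceLineage vectorSource = (VectorSourceLineage) source;
                    report(vectorSource.capacity(), vectorSource.valueType());
                }
            }
        }
    }

    private void report(int capacity, String valueType) {
        // Hypothetical reporting logic, e.g. sending the vector information to an external meta system.
    }
}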
2. Connectors identified by StorageIdentifier
Users may create different tables on the same storage, such as the same Kafka topic. Suppose there is one Kafka topic, two Paimon tables and one MySQL table. Users create these tables and submit three Flink jobs as follows.
a) Create Kafka and Paimon tables for Flink SQL job
-- Create a table my_kafka_table1 for kafka topic 'kafka_topic'
CREATE TABLE my_kafka_table1 (
    val1 STRING,
    val2 STRING,
    val3 STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'kafka_topic',
    'properties.bootstrap.servers' = 'kafka_bootstrap_server',
    'properties.group.id' = 'kafka_group',
    'format' = 'json'
);

-- Create a Paimon catalog and table for warehouse 'paimon_path1'
CREATE CATALOG paimon_catalog WITH (
    'type' = 'paimon',
    'warehouse' = 'paimon_path1'
);
USE CATALOG paimon_catalog;
CREATE TABLE my_paimon_table (
    val1 STRING,
    val2 STRING,
    val3 STRING
) WITH (...);

-- Insert data to the Paimon table from Kafka
INSERT INTO my_paimon_table SELECT ... FROM default_catalog.default_database.my_kafka_table1 WHERE ...;
b) Create another Kafka table and Paimon table for a Flink SQL job
-- Create another table my_kafka_table2 for kafka topic 'kafka_topic', which is the same as my_kafka_table1 above
CREATE TABLE my_kafka_table2 (
    val1 STRING,
    val2 STRING,
    val3 STRING
) WITH (
    'connector' = 'kafka',
    'topic' = 'kafka_topic',
    'properties.bootstrap.servers' = 'kafka_bootstrap_server',
    'properties.group.id' = 'kafka_group',
    'format' = 'json'
);

-- Create a Paimon catalog with the same name 'paimon_catalog' for a different warehouse 'paimon_path2'
CREATE CATALOG paimon_catalog WITH (
    'type' = 'paimon',
    'warehouse' = 'paimon_path2'
);
USE CATALOG paimon_catalog;
CREATE TABLE my_paimon_table (
    val1 STRING,
    val2 STRING,
    val3 STRING
) WITH (...);

-- Insert data to the Paimon table from Kafka
INSERT INTO my_paimon_table SELECT ... FROM default_catalog.default_database.my_kafka_table2 WHERE ...;
c) Create a MySQL table for a Flink SQL job
-- Create two catalogs for warehouses 'paimon_path1' and 'paimon_path2'; there are two different tables named 'my_paimon_table'
CREATE CATALOG paimon_catalog1 WITH (
    'type' = 'paimon',
    'warehouse' = 'paimon_path1'
);
CREATE CATALOG paimon_catalog2 WITH (
    'type' = 'paimon',
    'warehouse' = 'paimon_path2'
);

-- Create a MySQL table
CREATE TABLE mysql_table (
    val1 STRING,
    val2 STRING,
    val3 STRING
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://HOST:PORT/my_database',
    'table-name' = 'my_mysql_table',
    'username' = '***',
    'password' = '***'
);

-- Insert data to the MySQL table from the two Paimon tables
INSERT INTO mysql_table
    SELECT ... FROM paimon_catalog1.default.my_paimon_table
        JOIN paimon_catalog2.default.my_paimon_table ON ...
    WHERE ...;
After completing the above operations, we have one Kafka topic, two Paimon tables and one MySQL table, each identified by a StorageIdentifier. These tables are associated through the Flink jobs, and users can report the tables and relationships to DataHub [1], as shown in the example below.
Changes for CatalogModificationListener
TableEnvironmentImpl creates the customized CatalogModificationListeners according to the option lineage.catalog-modification.listeners, and builds the CatalogManager with the listeners. Some other components such as the SQL Gateway can create the CatalogManager with the listeners themselves. The database related operations are in Catalog, so the listeners are added to AbstractCatalog and users can notify them after database related operations in their customized catalogs.
/* Listeners and related operations in the catalog manager. */
public final class CatalogManager {
    private final List<CatalogModificationListener> listeners;

    /* Create the catalog manager with the listener list. */
    private CatalogManager(
            String defaultCatalogName,
            Catalog defaultCatalog,
            DataTypeFactory typeFactory,
            ManagedTableListener managedTableListener,
            List<CatalogModificationListener> listeners);

    /* Notify the listeners with the given catalog event. */
    private void notify(CatalogEvent event) {
        listeners.forEach(listener -> listener.onEvent(event));
    }

    /* Notify the listeners for tables. */
    public void createTable/dropTable/alterTable(...) {
        ....;
        notify(Create Different Table Event);
    }

    /* Builder for the catalog manager. */
    public static final class Builder {
        Builder listeners(List<CatalogModificationListener> listeners);
    }
}

/* Listeners and related operations in AbstractCatalog. */
public abstract class AbstractCatalog implements Catalog {
    private final List<CatalogModificationListener> listeners;

    /* Create the catalog with listeners. */
    public AbstractCatalog(String name, String defaultDatabase, List<CatalogModificationListener> listeners);

    /**
     * Notify the listeners with the given database event. After the customized implementation of
     * AbstractCatalog creates/alters/drops a database, it can create the specific event and call
     * the notify method.
     */
    protected void notify(BaseDatabaseEvent event) {
        for (CatalogModificationListener listener : listeners) {
            listener.onEvent(event);
        }
    }
}

/* Create the default catalog context with listeners. */
public class DefaultCatalogContext {
    public DefaultCatalogContext(
            String name,
            Map<String, String> options,
            ReadableConfig configuration,
            ClassLoader classLoader,
            List<CatalogModificationListener> listeners);

    /* Get catalog event listeners from the context. */
    public List<CatalogModificationListener> listeners() {
        return listeners;
    }
}
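As an illustration, a customized catalog extending AbstractCatalog could notify the listeners after creating a database as sketched below; MyCatalog is a hypothetical implementation, and the event construction is elided.

/* Sketch of a customized catalog notifying listeners after a database is created. */
public class MyCatalog extends AbstractCatalog {
    public MyCatalog(String name, String defaultDatabase, List<CatalogModificationListener> listeners) {
        super(name, defaultDatabase, listeners);
    }

    @Override
    public void createDatabase(String name, CatalogDatabase database, boolean ignoreIfExists)
            throws DatabaseAlreadyExistException, CatalogException {
        // Create the database in the external system first.
        ...
        // Then build the specific event and notify the configured listeners.
        notify(new CreateDatabaseEvent(...));
    }
}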
Changes for JobStatusChangedListener
Build JobLogicalPlan in StreamGraph
Flink creates a Planner for SQL and Table jobs which contains the exec nodes, and then the planner is converted to Transformation and StreamGraph. DataStream jobs are similar to SQL jobs: Flink creates the datastream from the environment and converts it to Transformation and StreamGraph. The job conversion is shown as follows.
There is a graph structure in StreamGraph, so we can easily create the JobLogicalPlan based on StreamGraph.
For Table and SQL jobs, Flink translates the planner exec nodes of sources and sinks to Transformation in the method ExecNodeBase.translateToPlanInternal. The source and sink nodes contain the resolved table, so Flink can create source and sink lineage based on the table and save it in the source/sink transformations.
For DataStream jobs, DataStreamSource sets the source lineage on the source transformation in its setLineage method, and DataStreamSink does the same for the sink.
Finally, the source and sink transformations are translated and added to the StreamGraph by SourceTransformationTranslator.translateInternal and SinkTransformationTranslator.translateInternal, where the source and sink lineage information can be added to the StreamGraph as well.
Create and notify listeners in RestClusterClient
RestClusterClient can read the option lineage.job-status-changed.listeners from the Configuration and create the JobStatusChangedListeners. RestClusterClient.submitJob is used in AbstractSessionClusterExecutor to submit a job and converts the Pipeline to a JobGraph. AbstractSessionClusterExecutor can set the StreamGraph in the RestClusterClient before calling its submitJob method, so the RestClusterClient can get the StreamGraph and notify the listeners before the JobGraph is submitted.
/* Set the pipeline to the client before it submits the job graph. */
public class AbstractSessionClusterExecutor {
    public CompletableFuture<JobClient> execute(
            @Nonnull final Pipeline pipeline,
            @Nonnull final Configuration configuration,
            @Nonnull final ClassLoader userCodeClassloader)
            throws Exception {
        ....
        final ClusterClientProvider<ClusterID> clusterClientProvider = clusterDescriptor.retrieve(clusterID);
        ClusterClient<ClusterID> clusterClient = clusterClientProvider.getClusterClient();

        // Set the pipeline to the cluster client before submitting the job graph
        clusterClient.setPipeline(pipeline);
        return clusterClient
                .submitJob(jobGraph)
        ....
    }
}

/* Create a job submission event and notify listeners before the job graph is submitted. */
public class RestClusterClient {
    private final List<JobStatusChangedListener> listeners;
    private Pipeline pipeline;

    @Override
    public CompletableFuture<JobID> submitJob(@Nonnull JobGraph jobGraph) {
        // Create an event and notify listeners before the job graph is submitted.
        JobSubmissionEvent event = createEventFromPipeline(pipeline);
        if (event != null) {
            listeners.forEach(listener -> listener.onEvent(event));
        }
        ....;
    }
}
Job status changed in job manager
The JobManager can create the JobStatusChangedListeners in DefaultExecutionGraph according to the option lineage.job-status-changed.listeners in the job configuration. Currently the JobManager calls DefaultExecutionGraph.transitionState when the job status changes, so the JobStatusChangedListener can be notified in that method as follows.
/* Notify the job status changed listeners when the execution graph transitions to a new state. */
public class DefaultExecutionGraph {
    private final List<JobStatusChangedListener> executionListeners;

    private boolean transitionState(JobStatus current, JobStatus newState, Throwable error) {
        ....;
        notifyJobStatusHooks(newState, error);
        // Notify job status changed listeners
        notifyJobStatusChangedListeners(current, newState, error);
        ....;
    }

    private void notifyJobStatusChangedListeners(JobStatus current, JobStatus newState, Throwable error) {
        JobStatusEvent event = create job status event;
        executionListeners.forEach(listener -> listener.onEvent(event));
    }
}
Listener Execution
Multiple listeners are independent of each other, and the client/JobManager notifies them synchronously. It is highly recommended NOT to perform any blocking operation inside a listener. If blocking operations are required, users need to perform asynchronous processing in their customized listeners.
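For example, a listener can hand events off to a single-thread executor so that reporting does not block the notifying thread; the following is a minimal sketch where reportToExternalSystem stands in for a potentially blocking call such as an HTTP request.

/* Sketch of a non-blocking listener that reports events asynchronously. */
public class AsyncReportingJobStatusChangedListener implements JobStatusChangedListener, AutoCloseable {
    // A single background thread preserves event order while keeping the notifying thread unblocked.
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    @Override
    public void onEvent(JobStatusEvent jobStatusEvent) {
        executor.execute(() -> reportToExternalSystem(jobStatusEvent));
    }

    private void reportToExternalSystem(JobStatusEvent event) {
        // Hypothetical blocking call, e.g. an HTTP request to a lineage service.
    }

    @Override
    public void close() {
        executor.shutdown();
    }
}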
Plan For The Future
- We add column lineage to the job lineage, but it is not supported in Flink at present. We would like to implement it in a follow-up FLIP.
- Currently we only support scan sources; lookup join sources should be supported later.
- Add a job vertex listener for batch mode, covering e.g. the scheduling and execution status of vertices and the execution status of subtasks.
- Save and report job lineage in the JobManager, and support pulling job lineage from the REST API.
[1] https://datahubproject.io/
[2] https://atlas.apache.org/#/
[3] FLIP-276: Data Consistency of Streaming and Batch ETL in Flink and Table Store