Status

Discussion thread: https://lists.apache.org/thread/185mbcwnpokfop4xcb22r9bgfp2m68fx
Vote thread: https://lists.apache.org/thread/5f9806kw1q01536wzwgx1psh7jhmfmjw
JIRA:
Release: 1.18
Motivation
Flink ETL jobs consume data from a Source Table and produce results to a Sink Table, so the Source Table is related to the Sink Table through the Flink ETL job. Flink needs a mechanism for users to report these relationships to external systems, such as the meta systems Datahub [1] and Atlas [2], and the meta store we mentioned in FLIP-276 [3].
We would like to introduce listener interfaces in Flink; users can implement them to report meta data and lineage to external systems. Flink SQL and Table jobs are supported in the first stage, and DataStream jobs will be considered in the future. The main information is as follows:
1. Source and Sink information, such as table name, fields, partition keys, primary keys, watermarks and configurations.
2. Job information, such as job id/name, job type and job configuration.
3. Relationships between Sources/Sinks and jobs, such as source and sink tables and their column lineages.
4. Job execution status changes, such as job status and checkpoints.
This FLIP focuses on the customized meta data listener; the customized job lineage listener will be introduced in FLIP-314 [4].
Public Interfaces
CatalogModificationListener
DDL operations such as create/alter/drop tables will generate different events and notify CatalogModificationListener. All events for CatalogModificationListener extend the basic CatalogModificationEvent, and listeners can get the catalog context from it. Some general events for database/table are defined as follows, and more events can be implemented based on requirements in the future.
Code Block

/**
 * Different events will be fired when a catalog/database/table is modified. The customized listener can get
 * and report specific information from the event according to the event type.
 */
@PublicEvolving
public interface CatalogModificationListener {
    /* The event will be fired when the catalog/database/table is modified. */
    void onEvent(CatalogModificationEvent event);
}

/* Basic interface for catalog modification. */
@PublicEvolving
public interface CatalogModificationEvent {
    /* Context for the event. */
    CatalogContext context();
}

/* Context for catalog modification and job lineage events. */
@PublicEvolving
public interface CatalogContext {
    /* The name of catalog. */
    String getCatalogName();

    /* Class of the catalog. */
    Class<? extends Catalog> getClass();

    /* Identifier for the catalog from catalog factory, such as jdbc/iceberg/paimon. */
    Optional<String> getFactoryIdentifier();

    /* Config for catalog. */
    Configuration getConfiguration();
}

/* The basic interface for database related events. */
public interface DatabaseModificationEvent extends CatalogModificationEvent {
    CatalogDatabase database();
}

/* Event for creating a database. */
@PublicEvolving
public interface CreateDatabaseEvent extends DatabaseModificationEvent {
    boolean ignoreIfExists();
}

/* Event for altering a database. */
@PublicEvolving
public interface AlterDatabaseEvent extends DatabaseModificationEvent {
    CatalogDatabase newDatabase();
    boolean ignoreIfNotExists();
}

/* Event for dropping a database. */
@PublicEvolving
public interface DropDatabaseEvent extends DatabaseModificationEvent {
    boolean ignoreIfExists();
}

/**
 * Base table event, provides column list, primary keys, partition keys, watermarks and properties in
 * CatalogBaseTable. The table can be source or sink.
 */
public interface TableModificationEvent extends CatalogModificationEvent {
    ObjectIdentifier identifier();
    CatalogBaseTable table();
}

/* Event for creating a table. */
@PublicEvolving
public interface CreateTableEvent extends TableModificationEvent {
    boolean ignoreIfExists();
}

/* Event for altering a table, provides all changes for the old table. */
@PublicEvolving
public interface AlterTableEvent extends TableModificationEvent {
    List<TableChange> tableChanges();
    boolean ignoreIfExists();
}

/* Event for dropping a table. */
@PublicEvolving
public interface DropTableEvent extends TableModificationEvent {
    boolean ignoreIfExists();
}

/* Factory for catalog modification listener. */
@PublicEvolving
public interface CatalogModificationListenerFactory {
    CatalogModificationListener createListener(Context context);

    @PublicEvolving
    interface Context {
        Configuration getConfiguration();
        ClassLoader getUserClassLoader();
        /*
         * Get an Executor pool for the listener to run async operations that can
         * potentially be IO-heavy.
         */
        Executor getIOExecutor();
    }
}
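To make the contract concrete, the following is a minimal sketch of a customized listener and its factory under the interfaces above. The class names and the reporting logic (plain logging) are illustrative assumptions, not part of this FLIP.

Code Block

/* Sketch of a customized listener that reports table events; names and logging are illustrative. */
public class LoggingCatalogModificationListener implements CatalogModificationListener {
    @Override
    public void onEvent(CatalogModificationEvent event) {
        String catalogName = event.context().getCatalogName();
        if (event instanceof CreateTableEvent) {
            CreateTableEvent createTable = (CreateTableEvent) event;
            // Report the created table to an external meta system.
            System.out.printf("Created table %s in catalog %s%n", createTable.identifier(), catalogName);
        } else if (event instanceof DropTableEvent) {
            DropTableEvent dropTable = (DropTableEvent) event;
            System.out.printf("Dropped table %s in catalog %s%n", dropTable.identifier(), catalogName);
        }
    }
}

/* Factory that is configured in the listener option and creates the listener above. */
public class LoggingCatalogModificationListenerFactory implements CatalogModificationListenerFactory {
    @Override
    public CatalogModificationListener createListener(Context context) {
        // context.getIOExecutor() could be used for asynchronous reporting if needed.
        return new LoggingCatalogModificationListener();
    }
}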
Users may create different catalogs on the same physical catalog, for example, create two hive catalogs named hive_catalog1 and hive_catalog2 for the same metastore. The tables hive_catalog1.my_database.my_table and hive_catalog2.my_database.my_table are the same table in the hive metastore.
In addition, there are two table types: persistent tables and temporary tables. A persistent table can be identified by the catalog and database above, while a temporary table can only be identified by the properties in its DDL. Different temporary tables with the same connector type and related properties are the same physical table in the external system, such as two tables for the same topic in Kafka.
StorageIdentifier is introduced to address these issues; users can get it from CatalogTable. StorageIdentifierFactory creates the StorageIdentifier for a catalog table.
Code Block
/* Storage identifier for different physical table. */
@PublicEvolving
public class StorageIdentifier {
/* External storage information such as kafka, hive, iceberg or paimon. */
String type();
/* Identifier which identifies the unique physical table. */
String identifier();
/* Properties for the storage identifier, users can get value from different keys for different storages. */
Map<String, String> properties();
}
/* Storage identifier factory is loaded with a specific connector type and creates {@link StorageIdentifier}. */
@PublicEvolving
public interface StorageIdentifierFactory {
/* Create storage identifier for different connector type. */
StorageIdentifier createDynamicTableStorage(Configuration config);
}
@PublicEvolving
public interface CatalogTable {
/* Get physical storage identifier for the table. */
StorageIdentifier storage();
}
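As a sketch of how the two APIs combine, a listener could read the StorageIdentifier of a newly created table to recognize that differently named catalog tables share the same physical storage. The listener class here is an illustrative assumption.

Code Block

/* Sketch: resolve the physical storage of a created table via StorageIdentifier. */
public class StorageAwareListener implements CatalogModificationListener {
    @Override
    public void onEvent(CatalogModificationEvent event) {
        if (event instanceof CreateTableEvent) {
            CatalogBaseTable baseTable = ((CreateTableEvent) event).table();
            if (baseTable instanceof CatalogTable) {
                StorageIdentifier storage = ((CatalogTable) baseTable).storage();
                // Tables with equal type() and identifier() point to the same physical
                // table, e.g. two catalog tables backed by one Kafka topic.
                System.out.printf("storage type=%s identifier=%s%n", storage.type(), storage.identifier());
            }
        }
    }
}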
JobSubmissionListener
Before a job is submitted, Flink can create the logical plan for the job and notify the listener. We add JobSubmissionListener for this, and users can create relationships between source/sink tables in it. The logical plan of a job is static information which may contain much data, and Flink only needs to report it once when the job is submitted. Therefore, this listener is on the client side. The RestClusterClient is the entry point for all jobs such as sql/table/datastream, and even developers who build jobs themselves submit them with this client.
Code Block
/**
* This listener will be notified before a job is submitted in {@link RestClusterClient}.
*/
@PublicEvolving
public interface JobSubmissionListener {
/* Event is fired before a job is submitted. */
void onEvent(JobSubmissionEvent submissionEvent);
/* Event for job submission. */
@PublicEvolving
public class JobSubmissionEvent {
JobID jobId();
String jobName();
JobLogicalPlan plan();
}
}
/* Factory for job submission listener. */
@PublicEvolving
public interface JobSubmissionListenerFactory {
public JobSubmissionListener createListener(Configuration configuration, ClassLoader classLoader);
}
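A sketch of the factory wiring follows; the option key 'my.lineage.endpoint' and the listener behavior are hypothetical assumptions for illustration only.

Code Block

/* Sketch of a submission listener and its factory; the endpoint option is hypothetical. */
public class ReportingJobSubmissionListener implements JobSubmissionListener {
    private final String endpoint;

    public ReportingJobSubmissionListener(String endpoint) {
        this.endpoint = endpoint;
    }

    @Override
    public void onEvent(JobSubmissionEvent submissionEvent) {
        // Actual transport to the external system is elided in this sketch.
        System.out.printf("Reporting job %s (%s) to %s%n",
                submissionEvent.jobName(), submissionEvent.jobId(), endpoint);
    }
}

public class ReportingJobSubmissionListenerFactory implements JobSubmissionListenerFactory {
    @Override
    public JobSubmissionListener createListener(Configuration configuration, ClassLoader classLoader) {
        // Read a hypothetical option for the external meta system endpoint.
        String endpoint = configuration.getString("my.lineage.endpoint", "http://localhost:8080");
        return new ReportingJobSubmissionListener(endpoint);
    }
}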
There is a JobLogicalPlan in JobSubmissionEvent which describes detailed job information such as the relationships between source/sink tables and column dependencies. Users can get the plan to report more information about the job.
Code Block
/**
* Job logical plan is built according to the StreamGraph in the client. Users can get sources, sinks and the relationship between nodes from the plan.
*/
@PublicEvolving
public interface JobLogicalPlan {
/* Job type, BATCH or STREAMING. */
String jobType();
/* Source lineage list. */
List<JobSourceLineage> sources();
/* Sink lineage list. */
List<JobSinkLineage> sinks();
/* Returns true if it is a table or sql job, otherwise false. */
boolean tableOrSqlJob();
/* Get sink table descriptor for table/sql job with given sink identifier. */
SinkTableDescriptor sinkTable(String identifier);
/* Job configuration. */
Map<String, String> config();
}
/* Source info of the job plan. */
@PublicEvolving
public class JobSourceLineage {
StorageIdentifier identifier();
/* Source column name list. */
List<String> columns();
}
/* Sink info of the job plan. */
@PublicEvolving
public class JobSinkLineage {
StorageIdentifier identifier();
/* Sink column name list. */
List<String> columns();
/* Source column lineages, the key is the column in the sink and the value is the source columns list. */
Map<String, List<SourceColumnLineage>> columnLineages();
}
/* Source column list for sink vertex. */
@PublicEvolving
public class SourceColumnLineage {
/* Source identifier. */
String identifier();
/* Source column name list for the given sink. */
List<String> columns();
}
@PublicEvolving
public class SinkTableDescriptor {
/* Modify type, INSERT/UPDATE/DELETE. */
String modifyType();
/* Update mode, APPEND/RETRACT/UPSERT. */
String updateMode();
boolean overwrite();
}
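A sketch of walking the plan, for example inside JobSubmissionListener#onEvent, to emit column-level lineage; the printing is a stand-in for reporting to an external system.

Code Block

/* Sketch: traverse JobLogicalPlan and print table and column lineage. */
public static void reportLineage(JobLogicalPlan plan) {
    for (JobSourceLineage source : plan.sources()) {
        System.out.printf("source %s columns=%s%n",
                source.identifier().identifier(), source.columns());
    }
    for (JobSinkLineage sink : plan.sinks()) {
        System.out.printf("sink %s columns=%s%n",
                sink.identifier().identifier(), sink.columns());
        // Each sink column maps to the source columns it is derived from.
        sink.columnLineages().forEach((sinkColumn, sourceLineages) -> {
            for (SourceColumnLineage lineage : sourceLineages) {
                System.out.printf("  %s <- %s%s%n",
                        sinkColumn, lineage.identifier(), lineage.columns());
            }
        });
    }
}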
Flink creates JobSourceLineage and JobSinkLineage for table/sql jobs; for DataStream jobs, users need to set them manually via the setLineage methods as follows.
Code Block
/**
* Add set source lineage method in data stream source.
*/
@Public
public class DataStreamSource {
private JobSourceLineage lineage;
public DataStreamSource setLineage(JobSourceLineage lineage);
}
/**
* Add set sink lineage method in data stream sink.
*/
@Public
public class DataStreamSink {
private JobSinkLineage lineage;
public DataStreamSink setLineage(JobSinkLineage lineage);
}
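A usage sketch for a DataStream job follows; kafkaSource, kafkaSink and the two lineage objects are assumed to be built elsewhere, since this FLIP does not define constructors for JobSourceLineage/JobSinkLineage.

Code Block

/* Sketch: attach lineage to a DataStream job manually. */
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// kafkaSource and sourceLineage are assumed to exist; the lineage describes the input topic.
DataStreamSource<String> source =
        env.fromSource(kafkaSource, WatermarkStrategy.noWatermarks(), "kafka-source");
source.setLineage(sourceLineage);

// kafkaSink and sinkLineage are assumed to exist; the lineage describes the output topic.
DataStreamSink<String> sink = source.sinkTo(kafkaSink);
sink.setLineage(sinkLineage);

env.execute("datastream-lineage-demo");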
JobExecutionListener
JobManager generates events when the job status changes or a checkpoint is started, and notifies the JobExecutionListener. JobStatusEvent indicates the status change of the Flink job from the old status to the new status.
Code Block
/**
* When the job status changes in the job manager, it will generate a job event and notify the job execution listeners.
*/
@PublicEvolving
public interface JobExecutionListener {
/* Event fired after job status has been changed. */
void onEvent(JobStatusEvent jobStatusEvent);
/* Job status event. */
@PublicEvolving
public class JobStatusEvent {
JobID jobId();
String jobName();
JobStatus oldStatus();
JobStatus newStatus();
/* Exception for job when it is failed. */
@Nullable Throwable exception();
}
}
/* Factory for job execution listener. */
@PublicEvolving
public interface JobExecutionListenerFactory {
public JobExecutionListener createListener(Configuration configuration, ClassLoader classLoader);
}
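A sketch of an execution listener that reports terminal job states; the class name and logging are illustrative assumptions.

Code Block

/* Sketch: report terminal job states with their failure cause, if any. */
public class TerminalStateJobExecutionListener implements JobExecutionListener {
    @Override
    public void onEvent(JobStatusEvent jobStatusEvent) {
        JobStatus newStatus = jobStatusEvent.newStatus();
        if (newStatus.isGloballyTerminalState()) {
            // FINISHED, FAILED or CANCELED: report once with the optional exception.
            System.out.printf("Job %s: %s -> %s%n",
                    jobStatusEvent.jobId(), jobStatusEvent.oldStatus(), newStatus);
            if (jobStatusEvent.exception() != null) {
                jobStatusEvent.exception().printStackTrace();
            }
        }
    }
}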
Config Customized Listener
Users should add their listeners to the classpath of the client and the Flink cluster, and configure the listener factories in the following options:
Code Block
# Config catalog modification listeners.
table.catalog-modification.listeners: {table catalog listener factory1},{table catalog listener factory2}
# Config job submission listeners (existing option).
execution.job-submission-listeners: {job submission listener factory1},{job submission listener factory2}
# Config job execution listeners.
jobmanager.execution.listeners: {job execution listener factory1},{job execution listener factory2}
Proposed Changes
Changes for CatalogModificationListener
TableEnvironmentImpl creates the customized CatalogModificationListener according to the option table.catalog-modification.listeners, and builds CatalogManager with the listeners. Some other components such as Sql-Gateway can create CatalogManager with the listeners themselves too. Currently all table related operations such as create/alter are in CatalogManager, but database operations are not. We can add database modification operations in CatalogManager and notify the specified listeners for both tables and databases.
Code Block

/* Listeners and related operations in the catalog manager. */
public final class CatalogManager {
    private final List<CatalogModificationListener> listeners;

    /* Create catalog manager with listener list. */
    private CatalogManager(
            String defaultCatalogName,
            Catalog defaultCatalog,
            DataTypeFactory typeFactory,
            ManagedTableListener managedTableListener,
            List<CatalogModificationListener> listeners);

    /* Notify the listeners with given catalog event. */
    private void notify(CatalogModificationEvent event) {
        listeners.forEach(listener -> listener.onEvent(event));
    }

    /* Notify listeners for tables. */
    public void createTable/dropTable/alterTable(...) {
        ....;
        notify(Create Different Table Modification Event With Context);
    }

    /* Add database ddls and notify listeners for databases. */
    public void createDatabase/dropDatabase/alterDatabase(...) {
        ....;
        notify(Create Different Database Modification Event With Context);
    }

    /* Add listeners in Builder for catalog manager. */
    public static final class Builder {
        Builder listeners(List<CatalogModificationListener> listeners);
    }
}
Changes for JobSubmissionListener
Build JobLogicalPlan in StreamGraph
Flink creates a Planner for sql and table jobs which contains exec nodes; the planner is then converted to Operation, Transformation and StreamGraph. DataStream jobs are similar to SQL: Flink creates a datastream from the environment and converts it to Transformation and StreamGraph. The job conversion is shown as follows.
...
There is a graph structure in StreamGraph, so we can create the JobLogicalPlan based on StreamGraph easily.
For table and SQL jobs, Flink translates the exec nodes of the planner to Transformation in the method ExecNodeBase.translateToPlanInternal for sources and sinks. There is a resolved table in the source and sink nodes, so Flink can create source and sink lineage based on the table and save them in the source/sink transformations.
...
For DataStream jobs, DataStreamSource sets the source lineage on the source transformation in its setLineage method, and DataStreamSink does the same.
Finally, source and sink transformations are translated and added to the StreamGraph by SourceTransformationTranslator.translateInternal and SinkTransformationTranslator.translateInternal, where the source and sink lineage information can be added to the StreamGraph too.
...
Create and notify listeners in RestClusterClient
RestClusterClient can read the listener option from Configuration and create the JobSubmissionListener. But there is no StreamGraph in RestClusterClient and its submitJob method. RestClusterClient.submitJob is used in AbstractSessionClusterExecutor, which converts the Pipeline to a JobGraph and submits it. AbstractSessionClusterExecutor can set the StreamGraph in RestClusterClient before calling its submitJob method; then the RestClusterClient can get the StreamGraph and notify the listener before the JobGraph is submitted.
Code Block
/* Set pipeline to the client before it submits the job graph. */
public class AbstractSessionClusterExecutor {
public CompletableFuture<JobClient> execute(
@Nonnull final Pipeline pipeline,
@Nonnull final Configuration configuration,
@Nonnull final ClassLoader userCodeClassloader)
throws Exception {
....
final ClusterClientProvider<ClusterID> clusterClientProvider =
clusterDescriptor.retrieve(clusterID);
ClusterClient<ClusterID> clusterClient = clusterClientProvider.getClusterClient();
        // Set pipeline to the cluster client before submitting the job graph
clusterClient.setPipeline(pipeline);
return clusterClient
.submitJob(jobGraph)
....
}
}
/* Create job submission event and notify listeners before the job graph is submitted. */
public class RestClusterClient {
private final List<JobSubmissionListener> listeners;
private Pipeline pipeline;
@Override
public CompletableFuture<JobID> submitJob(@Nonnull JobGraph jobGraph) {
// Create event and notify listeners before the job graph is submitted.
JobSubmissionEvent event = createEventFromPipeline(pipeline);
if (event != null) {
listeners.forEach(listener -> listener.onEvent(event));
}
....;
}
}
Changes for JobExecutionListener
JobManager can create the JobExecutionListener in DefaultExecutionGraph according to options in the job configuration. Currently JobManager calls DefaultExecutionGraph.transitionState when the job status changes; JobExecutionListener can be notified in this method as follows.
Code Block
/* Notify job execution listeners when the job status changes. */
public class DefaultExecutionGraph {
private final List<JobExecutionListener> executionListeners;
private boolean transitionState(JobStatus current, JobStatus newState, Throwable error) {
....;
notifyJobStatusHooks(newState, error);
// notify job execution listeners
notifyJobExecutionListeners(current, newState, error);
....;
}
private void notifyJobExecutionListeners(JobStatus current, JobStatus newState, Throwable error) {
JobStatusEvent event = create job status event;
executionListeners.forEach(listener -> listener.onEvent(event));
}
}
Listener Construction and Execution
Multiple listeners are independent, and the client/JobManager will notify the listeners synchronously. It is highly recommended NOT to perform any blocking operations inside the listeners. If blocking operations are required, users need to perform asynchronous processing in their customized listeners.
Plan For The Future
...
Currently we only support scan sources; lookup join sources should be supported later.
...
Users can identify the physical connector by CatalogContext and the options in CatalogBaseTable through the following steps:
1. Get the connector name. Users can get the value of the option 'connector' from the options in CatalogBaseTable for temporary tables. If it doesn't exist, users can get the factory identifier from CatalogContext as the connector name. If none of the above exists, users can determine the connector name themselves through Class<? extends Catalog>.
2. Users can get different properties based on the connector name from the table options and create the connector identifier. Flink has many connectors; we give the example of Kafka options below, and users can create a Kafka identifier from servers, group and topic as needed, as shown in the sketch after the option list.
Code Block
/* Kafka storage identifier options. */
"properties.bootstrap.servers" for Kafka bootstrap servers
"topic" for Kafka Topic
"properties.group.id" for Kafka group id
"topic-pattern" for Kafka topic pattern |
For sensitive information, users can encode and desensitize it in their customized listeners.
Use case of job lineage
Users may create different tables on the same storage, such as the same Kafka topic. Suppose there is one Kafka topic, two Paimon tables and one Mysql table. Users create these tables and submit three Flink SQL jobs as follows.
a) Create Kafka and Paimon tables for the first Flink SQL job
Code Block
-- Create a table my_kafka_table1 for kafka topic 'kafka_topic'
CREATE TABLE my_kafka_table1 (
val1 STRING,
val2 STRING,
val3 STRING
) WITH (
'connector' = 'kafka',
'topic' = 'kafka_topic',
'properties.bootstrap.servers' = 'kafka_bootstrap_server',
'properties.group.id' = 'kafka_group',
'format' = 'json'
);
-- Create a Paimon catalog and table for warehouse 'paimon_path1'
CREATE CATALOG paimon_catalog WITH (
'type'='paimon',
'warehouse'='paimon_path1'
);
USE CATALOG paimon_catalog;
CREATE TABLE my_paimon_table (
val1 STRING,
val2 STRING,
val3 STRING
) WITH (...);
-- Insert data to Paimon table from Kafka
INSERT INTO my_paimon_table SELECT ... FROM default_catalog.default_database.my_kafka_table1 WHERE ...;
b) Create another Kafka table and Paimon table for the second Flink SQL job
Code Block
-- Create another table my_kafka_table2 for kafka topic 'kafka_topic', which is the same as my_kafka_table1 above
CREATE TABLE my_kafka_table2 (
val1 STRING,
val2 STRING,
val3 STRING
) WITH (
'connector' = 'kafka',
'topic' = 'kafka_topic',
'properties.bootstrap.servers' = 'kafka_bootstrap_server',
'properties.group.id' = 'kafka_group',
'format' = 'json'
);
-- Create a Paimon catalog with the same name 'paimon_catalog' for different warehouse 'paimon_path2'
CREATE CATALOG paimon_catalog WITH (
'type'='paimon',
'warehouse'='paimon_path2'
);
USE CATALOG paimon_catalog;
CREATE TABLE my_paimon_table (
val1 STRING,
val2 STRING,
val3 STRING
) WITH (...);
-- Insert data to Paimon table from Kafka
INSERT INTO my_paimon_table SELECT ... FROM default_catalog.default_database.my_kafka_table2 WHERE ...;
c) Create a Mysql table for the third Flink SQL job
Code Block
-- Create two catalogs for warehouses 'paimon_path1' and 'paimon_path2'; there are two different tables named 'my_paimon_table'
CREATE CATALOG paimon_catalog1 WITH (
'type'='paimon',
'warehouse'='paimon_path1'
);
CREATE CATALOG paimon_catalog2 WITH (
'type'='paimon',
'warehouse'='paimon_path2'
);
-- Create mysql table
CREATE TABLE mysql_table (
val1 STRING,
val2 STRING,
val3 STRING
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://HOST:PORT/my_database',
'table-name' = 'my_mysql_table',
'username' = '***',
'password' = '***'
);
-- Insert data to mysql table from the two paimon tables
INSERT INTO mysql_table
SELECT ... FROM paimon_catalog1.default.my_paimon_table
JOIN paimon_catalog2.default.my_paimon_table
ON ... WHERE ...;
After completing the above operations, we get one Kafka topic, two Paimon tables and one Mysql table, identified by their connector identifiers. These tables are associated through the Flink jobs, and users can report the tables and their relationships to Datahub as an example (the job lineage will be supported in FLIP-314).
[1] https://datahubproject.io/
[2] https://atlas.apache.org/#/
[3] FLIP-276: Data Consistency of Streaming and Batch ETL in Flink and Table Store
[4] FLIP-314: Support Customized Job Lineage Listener