Status
Discussion thread | https://lists.apache.org/thread/185mbcwnpokfop4xcb22r9bgfp2m68fx
Vote thread | https://lists.apache.org/thread/5f9806kw1q01536wzwgx1psh7jhmfmjw
JIRA |
Release | 1.18
Motivation
A Flink ETL job consumes data from a Source Table and produces results to a Sink Table after computation, so the Source Table is related to the Sink Table through the Flink ETL job. Flink needs a mechanism for users to report these relationships to external systems with customized listeners, such as the meta data systems Datahub [1] and Atlas [2], and the meta store mentioned in FLIP-276 [3].
We would like to introduce listener interfaces in Flink that users can implement to report meta data and lineage to external systems. The main information is as follows:
...
2. Job information, such as job id/name, job type and job configuration.
3. Relationship between Source/Sink and jobs, such as the source and sink tables for a job and their column lineages.
4. Job execution information, such as job status changes, vertex status changes and checkpoints.
This FLIP focuses on the customized meta data listener; the customized job lineage listener will be introduced in FLIP-314 [4].
Public Interfaces
CatalogModificationListener
DDL operations such as create/alter/drop tables will generate different events and notify the CatalogModificationListener. All events for CatalogModificationListener extend the basic CatalogModificationEvent, and listeners can get catalog information from it. Some general events for databases/tables are defined as follows, and more events can be implemented based on future requirements.
Code Block |
---|
/**
 * Different events will be fired when a catalog/database/table is modified. The customized listener can get and
 * report specific information from the event according to the event type.
 */
@PublicEvolving
public interface CatalogModificationListener {
    /** The event will be fired when the database/table is modified. */
    void onEvent(CatalogModificationEvent event);
}

/* Basic interface for catalog modification. */
@PublicEvolving
public interface CatalogModificationEvent {
    /* Context for the event. */
    CatalogContext context();
}

/* Context for catalog modification and job lineage events. */
@PublicEvolving
public interface CatalogContext {
    /* The name of the catalog. */
    String getCatalogName();

    /* Class of the catalog. */
    Class<? extends Catalog> getClass();

    /* Identifier for the catalog from the catalog factory, such as jdbc/iceberg/paimon. */
    Optional<String> getFactoryIdentifier();

    /* Config for the catalog. */
    Configuration getConfiguration();
}

/* The basic interface for database related events. */
public interface DatabaseModificationEvent extends CatalogModificationEvent {
    CatalogDatabase database();
}

/* Event for creating a database. */
@PublicEvolving
public interface CreateDatabaseEvent extends DatabaseModificationEvent {
    boolean ignoreIfExists();
}

/* Event for altering a database. */
@PublicEvolving
public interface AlterDatabaseEvent extends DatabaseModificationEvent {
    CatalogDatabase newDatabase();
    boolean ignoreIfNotExists();
}

/* Event for dropping a database. */
@PublicEvolving
public interface DropDatabaseEvent extends DatabaseModificationEvent {
    boolean ignoreIfExists();
}

/**
 * Base table event, provides the column list, primary keys, partition keys, watermarks and properties in
 * CatalogBaseTable. The table can be a source or a sink.
 */
public interface TableModificationEvent extends CatalogModificationEvent {
    ObjectIdentifier identifier();
    CatalogBaseTable table();
}

/* Event for creating a table. */
@PublicEvolving
public interface CreateTableEvent extends TableModificationEvent {
    boolean ignoreIfExists();
}

/* Event for altering a table, provides all changes for the old table. */
@PublicEvolving
public interface AlterTableEvent extends TableModificationEvent {
    List<TableChange> tableChanges();
    boolean ignoreIfExists();
}

/* Event for dropping a table. */
@PublicEvolving
public interface DropTableEvent extends TableModificationEvent {
    boolean ignoreIfExists();
}

/* Factory for the catalog modification listener. */
@PublicEvolving
public interface CatalogModificationListenerFactory {
    CatalogModificationListener createListener(Context context);

    @PublicEvolving
    public interface Context {
        Configuration getConfiguration();
        ClassLoader getUserClassLoader();
        /*
         * Get an Executor pool for the listener to run async operations that can potentially be IO-heavy.
         */
        Executor getIOExecutor();
    }
} |
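To make the contract concrete, here is a minimal sketch of what a user-defined listener and its factory could look like, assuming the interfaces above; the class names and the reporting helper are hypothetical and only for illustration.
Code Block |
---|
/* Hypothetical listener that reports table DDLs to an external meta system; names are illustrative. */
public class MyMetaStoreListener implements CatalogModificationListener {
    @Override
    public void onEvent(CatalogModificationEvent event) {
        // The catalog name, catalog class, factory identifier and options are available from the context.
        String catalogName = event.context().getCatalogName();
        if (event instanceof CreateTableEvent) {
            CreateTableEvent createTable = (CreateTableEvent) event;
            // identifier() and table() are inherited from the base table event.
            reportCreatedTable(catalogName, createTable.identifier(), createTable.table());
        } else if (event instanceof DropTableEvent) {
            // report the dropped table; database events, AlterTableEvent, ... can be handled similarly
        }
    }

    private void reportCreatedTable(String catalogName, ObjectIdentifier identifier, CatalogBaseTable table) {
        // send catalog name, table identifier, schema and options to the external system (omitted)
    }
}

/* Hypothetical factory that would be configured via table.catalog-modification.listeners. */
public class MyMetaStoreListenerFactory implements CatalogModificationListenerFactory {
    @Override
    public CatalogModificationListener createListener(Context context) {
        // The context provides the Flink configuration, the user class loader and an IO executor.
        return new MyMetaStoreListener();
    }
} |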
Users may create different catalogs on the same physical catalog, for example, creating two Hive catalogs named hive_catalog1 and hive_catalog2 for the same metastore. The tables hive_catalog1.my_database.my_table and hive_catalog2.my_database.my_table are then the same table in the Hive metastore.
In addition, there are two table types: persistent tables and temporary tables. A persistent table can be identified by the catalog and database above, while a temporary table can only be identified by the properties in its DDL. Different temporary tables with the same connector type and related properties are the same physical table in the external system, such as two tables for the same Kafka topic.
Users can identify the physical connector by CatalogContext and the options in CatalogBaseTable through the following steps:
1. Get the connector name. For temporary tables, users can get the value of the 'connector' option from the options in CatalogBaseTable. If it doesn't exist, users can use the factory identifier from CatalogContext as the connector name. If neither exists, users can derive a connector name themselves from Class<? extends Catalog>.
2. Users can then get connector-specific properties from the table options based on the connector name and create a connector identifier. Flink has many connectors; we give the Kafka options as an example below, and users can create a Kafka identifier from the servers, group and topic as needed (see the sketch after the Kafka options).
Code Block |
---|
/* Kafka storage identifier options. */
"properties.bootstrap.servers" for Kafka bootstrap servers
"topic" for Kafka Topic
"properties.group.id" for Kafka group id
"topic-pattern" for Kafka topic pattern |
For sensitive information, users can encode and mask it in their customized listeners.
Config Customized Listener
Users should add their listeners to the classpath of the client and the Flink cluster, and configure the listener factory with the following option:
Code Block |
---|
# Config for catalog modification listeners.
table.catalog-modification.listeners: {table catalog listener factory1},{table catalog listener factory2} |
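For example, assuming a factory identified as my_listener_factory, the same option could also be set on the configuration used to create the table environment; the identifier is illustrative and the exact value format follows the example above.
Code Block |
---|
// Illustrative only: "my_listener_factory" is a placeholder for the user's listener factory.
Configuration configuration = new Configuration();
configuration.setString("table.catalog-modification.listeners", "my_listener_factory");
TableEnvironment tableEnv =
        TableEnvironment.create(EnvironmentSettings.newInstance().withConfiguration(configuration).build()); |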
Proposed Changes
Use case of job lineage
Users may create different tables on the same storage, such as the same Kafka topic. Suppose there is one Kafka topic, two Paimon tables and one MySQL table. Users create these tables and submit three Flink SQL jobs as follows.
a) Create Kafka and Paimon tables for a Flink SQL job
Code Block |
---|
-- Create a table my_kafka_table1 for kafka topic 'kafka_topic'
CREATE TABLE my_kafka_table1 (
val1 STRING,
val2 STRING,
val3 STRING
) WITH (
'connector' = 'kafka',
'topic' = 'kafka_topic',
'properties.bootstrap.servers' = 'kafka_bootstrap_server',
'properties.group.id' = 'kafka_group',
'format' = 'json'
);
-- Create a Paimon catalog and table for warehouse 'paimon_path1'
CREATE CATALOG paimon_catalog WITH (
'type'='paimon',
'warehouse'='paimon_path1'
);
USE CATALOG paimon_catalog;
CREATE TABLE my_paimon_table (
val1 STRING,
val2 STRING,
val3 STRING
) WITH (...);
-- Insert data to Paimon table from Kafka
INSERT INTO my_paimon_table SELECT ... FROM default_catalog.default_database.my_kafka_table1 WHERE ...; |
b) Create another Kafka table and another Paimon table for a Flink SQL job
Code Block |
---|
-- Create another table my_kafka_table2 for the kafka topic 'kafka_topic', which is the same as my_kafka_table1 above
CREATE TABLE my_kafka_table2 (
val1 STRING,
val2 STRING,
val3 STRING
) WITH (
'connector' = 'kafka',
'topic' = 'kafka_topic',
'properties.bootstrap.servers' = 'kafka_bootstrap_server',
'properties.group.id' = 'kafka_group',
'format' = 'json'
);
-- Create a Paimon catalog with the same name 'paimon_catalog' for different warehouse 'paimon_path2'
CREATE CATALOG paimon_catalog WITH (
'type'='paimon',
'warehouse'='paimon_path2'
);
USE CATALOG paimon_catalog;
CREATE TABLE my_paimon_table (
val1 STRING,
val2 STRING,
val3 STRING
) WITH (...);
-- Insert data to Paimon table from Kafka
INSERT INTO my_paimon_table SELECT ... FROM default_catalog.default_database.my_kafka_table2 WHERE ...; |
c) Create a MySQL table for a Flink SQL job
Code Block |
---|
-- Create two catalogs for the warehouses 'paimon_path1' and 'paimon_path2'; they contain two different tables named 'my_paimon_table'
CREATE CATALOG paimon_catalog1 WITH (
'type'='paimon',
'warehouse'='paimon_path1'
);
CREATE CATALOG paimon_catalog2 WITH (
'type'='paimon',
'warehouse'='paimon_path2'
);
-- Create mysql table
CREATE TABLE mysql_table (
val1 STRING,
val2 STRING,
val3 STRING
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://HOST:PORT/my_database',
'table-name' = 'my_mysql_table',
'username' = '***',
'password' = '***'
);
-- Insert data to the MySQL table from the two Paimon tables
INSERT INTO mysql_table
SELECT ... FROM paimon_catalog1.default.my_paimon_table
JOIN paimon_catalog2.default.my_paimon_table
ON ... WHERE ...; |
After completing the above operations, we get one Kafka topic, two Paimon tables and one MySQL table, which are identified by their connector identifiers. These tables are associated through Flink jobs; users can, for example, report the tables and their relationships to Datahub (the job lineage itself will be supported in FLIP-314).
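For instance, with an identifier scheme like the one sketched earlier (connector name plus its key properties, purely illustrative), the physical tables above would be distinguished roughly as follows:
Code Block |
---|
kafka.kafka_bootstrap_server.kafka_topic                -- my_kafka_table1 and my_kafka_table2 (same topic)
paimon.paimon_path1.default.my_paimon_table             -- my_paimon_table in paimon_catalog / paimon_catalog1
paimon.paimon_path2.default.my_paimon_table             -- my_paimon_table in paimon_catalog / paimon_catalog2
jdbc.jdbc:mysql://HOST:PORT/my_database.my_mysql_table  -- mysql_table |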
Changes for CatalogModificationListener
TableEnvironmentImpl creates the customized CatalogModificationListener according to the option table.catalog-modification.listeners, and builds the CatalogManager with the listeners. Some other components such as the SQL Gateway can create the CatalogManager with the listeners themselves. Currently all table related operations such as create/alter are in CatalogManager, but database operations are not. We can add database modification operations to CatalogManager and notify the specified listeners for tables and databases.
Code Block |
---|
/* Listeners and related operations in the catalog manager. */
public final class CatalogManager {
    private final List<CatalogModificationListener> listeners;

    /* Create catalog manager with listener list. */
    private CatalogManager(
            String defaultCatalogName,
            Catalog defaultCatalog,
            DataTypeFactory typeFactory,
            ManagedTableListener managedTableListener,
            List<CatalogModificationListener> listeners);

    /* Notify the listeners with the given catalog event. */
    private void notify(CatalogModificationEvent event) {
        listeners.forEach(listener -> listener.onEvent(event));
    }

    /* Notify the listeners for table ddls. */
    public void createTable/dropTable/alterTable(...) {
        ....;
        notify(Create Different Table Modification Event With Context);
    }

    /* Add database ddls and notify the listeners for databases. */
    public void createDatabase/dropDatabase/alterDatabase(...) {
        ....;
        notify(Create Different Database Modification Event With Context);
    }

    /* Add listeners in the Builder for the catalog manager. */
    public static final class Builder {
        Builder listeners(List<CatalogModificationListener> listeners);
    }
} |
Listener Execution
Multiple listeners are independent, and the client will notify them synchronously. It is highly recommended NOT to perform any blocking operations inside the listeners. If blocking operations are required, users need to perform asynchronous processing in their customized listeners, for example on the executor provided by CatalogModificationListenerFactory.Context#getIOExecutor() (see the sketch below).
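A minimal sketch of that pattern, assuming the factory interface above; the factory name and the reporting body are hypothetical:
Code Block |
---|
/* Illustrative only: run the potentially IO-heavy reporting on the executor from the factory context. */
public class AsyncReportingListenerFactory implements CatalogModificationListenerFactory {
    @Override
    public CatalogModificationListener createListener(Context context) {
        Executor ioExecutor = context.getIOExecutor();
        return event -> ioExecutor.execute(() -> {
            // call the external meta system with the event here; handle failures inside the listener
        });
    }
} |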
[1] https://datahubproject.io/
[2] https://atlas.apache.org/#/
[3] FLIP-276: Data Consistency of Streaming and Batch ETL in Flink and Table Store
[4] FLIP-314: Support Customized Job Lineage Listener