You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 54 Next »

Status

Discussion thread-
Vote thread-
JIRA

Unable to render Jira issues macro, execution error.

Release-

Motivation

Flink ETL job consumes data from Source and produces result to Sink. Source creates relationship with Sink through Flink ETL job. Flink needs a mechanism for users to report these relationships to external systems, such as meta system Datahub [1], Atlas [2] and meta store we mentioned in FLIP-276 [3].

This FLIP aims to introduce listener interfaces in Flink, users can implement them to report the meta data and lineage to external systems. The main information is as follows

1. Source and Sink information, such as table name, fields, partition keys, primary keys, watermarks, configurations

2. Job information, such as job id/name, job type, job configuration

3. Relationship between Source/Sink and jobs, such as source and sink and their column lineages.

4. Job execution status changed information, such as job status, exception.

This FLIP focuses on customized metadata listener in Flink and job lineage related listener will be in FLIP-314 [4]

Public Interfaces

CatalogModificationListener

DDL operations such as create/alter/drop tables will generate different events and notify CatalogModificationListener . All events for CatalogModificationListener extend the basic CatalogModificationEvent and listeners can get catalog from it. Some general events for database/table are defined as follows and more events can be implemented based on the requirements in the future.

/**
 * Different events will be fired when a catalog/database/table is modified. The customized listener can get and
 * report specific information from the event according to the event type.
 */
@PublicEvolving
public interface CatalogModificationListener {
    /** The event will be fired when the database/table is modified. */
    void onEvent(CatalogModificationEvent event, CatalogModificationContext context);
}

/* Basic class for catalog modification. */
@PublicEvolving
public abstract class CatalogModificationEvent {
}

/* Context for catalog modification and job lineage events. */
@PublicEvolving
public class CatalogModificationContext {
    /* The name of catalog. */
    String catalogName();

    /* Class of the catalog. */
    Class<? extends Catalog> clazz();

    /* Identifier for the catalog from catalog factory, such as jdbc/iceberg/paimon. */
    Optional<String> factoryIdentifier();

    /* Config for catalog. */
    Map<String, String> config();
}

/* The basic class for database related event. */
public abstract class DatabaseModificationEvent extends CatalogModificationEvent {
    CatalogDatabase database();
}

/* Event for creating database. */
@PublicEvolving
public class CreateDatabaseEvent extends DatabaseModificationEvent {
    boolean ignoreIfExists();
}

/* Event for altering database. */
@PublicEvolving
public class AlterDatabaseEvent extends DatabaseModificationEvent {
    String oldDatabaseName();
    boolean ignoreIfNotExists();
}

/* Event for dropping database. */
@PublicEvolving
public class DropDatabaseEvent extends DatabaseModificationEvent {
    boolean ignoreIfExists();
}

/**
 * Base table event, provides column list, primary keys, partition keys, watermarks and properties in
 * CatalogBaseTable. The table can be source or sink.
 */
public abstract class TableModificationEvent extends CatalogModificationEvent {
    ObjectIdentifier identifier();
    CatalogBaseTable table();
}

/* Event for creating table. */
@PublicEvolving
public class CreateTableEvent extends CatalogModificationEvent {
    boolean ignoreIfExists();
}

/* Event for altering table, provides all changes for old table. */
@PublicEvolving
public class AlterTableEvent extends CatalogModificationEvent {
    List<TableChange> tableChanges();
    boolean ignoreIfExists();
}

/* Event for dropping table. */
@PublicEvolving
public class DropTableEvent extends CatalogModificationEvent {
    boolean ignoreIfExists();
}

/* Factory for catalog modification listener. */
@PublicEvolving
public interface CatalogModificationListenerFactory {
    CatalogModificationListener createListener(Configuration config, ClassLoader classloader);
}

Users may create different catalogs on the same physical catalog, for example, create two hive catalog named hive_catalog1  and hive_catalog2  for the same metastore. The tables hive_catalog1.my_database.my_table  and hive_catalog2.my_database.my_table  are the same table in hive metastore.

In addition, there are two table types: persistent table  and temporal table . The persistent table  can be identified by catalog and database above, while the temporal table  can only be identified by properties in ddl. Different temporal tables with the same connector type and related properties are the same physical table in external system, such as two tables for the same topic in Kafka.

Users can identify the physical connector by CatalogContext and options in CatalogBaseTable through the following steps:

1. Get connector name.

Users can get value of option 'connector' from options in CatalogBaseTable  for temporal tables. If it doesn't exist, users can get factory identifier from CatalogContext as connector name. If none of the above exist, users can define the connector name themselves through Class<? extends Catalog> .

2. Uses can get different properties based on connector name from table options and create connector identifier. Flink has many connectors, and we given the example of kafka options below, users can create kafka identifier with servers, group and topic as needed.

/* Kafka storage identifier options. */
"properties.bootstrap.servers" for Kafka bootstrap servers
"topic" for Kafka Topic
"properties.group.id" for Kafka group id
"topic-pattern" for Kafka topic pattern

For some sensitive information, users can encode and desensitize them in their customized listeners.

Config Customized Listener

Users should add their listeners to the classpath of client and flink cluster, and config the listener factory in the following options

# Config for catalog modification listeners.
table.catalog-modification.listeners: {table catalog listener factory1},{table catalog listener factory2}

Proposed Changes

Changes for CatalogModificationListener

TableEnvironmentImpl creates customized CatalogModificationListener according to the option lineage.catalog-modification.listeners , and build CatalogManager with the listeners. Some other components such as Sql-Gateway can create CatalogManager with the listeners themselves. Currently all table related operations such as create/alter are in CatalogManager , but database operations are not. We can add database modification operations in CatalogManager  and notify the specified listeners for tables and databases.

/* Listeners and related operations in the catalog manager. */
public final class CatalogManager {
    private final List<CatalogModificationListener> listeners;

    /* Create catalog manager with listener list. */
    private CatalogManager(
            String defaultCatalogName,
            Catalog defaultCatalog,
            DataTypeFactory typeFactory,
            ManagedTableListener managedTableListener,
            List<CatalogModificationListener> listeners);

    /* Notify the listeners with given catalog event. */
    private void notify(CatalogEvent event) {
        listeners.forEach(listener -> listener.onEvent(event));
    }

    /* Notify listener for tables. */
    public void createTable/dropTable/alterTable(...) {
        ....;
        notify(Create Different Table Modification Event);
    }

    /* Add database ddls and notify listener for databases. */
    public void createDatabase/dropDatabase/alterDatabase(...) {
        ....;
        notify(Create Different Database Modification Event); 
    }

    /* Add listeners in Builder for catalog manager. */
    public static final class Builder {
        Builder listeners(List<CatalogModificationListener> listeners);
    }
}

Listener Execution

Multiple listeners are independent, and client/JobManager will notify the listeners synchronously. It is highly recommended NOT to perform any blocking operation inside the listeners. If blocked operations are required, users need to perform asynchronous processing in their customized listeners.


[1] https://datahub.io/

[2] https://atlas.apache.org/#/

[3] FLIP-276: Data Consistency of Streaming and Batch ETL in Flink and Table Store

[4] FLIP-314: Support Customized Job Lineage Listener



  • No labels