Status

Discussion thread: https://lists.apache.org/thread/185mbcwnpokfop4xcb22r9bgfp2m68fx
Vote thread: https://lists.apache.org/thread/5f9806kw1q01536wzwgx1psh7jhmfmjw
JIRA: FLINK-32402

Release: 1.18

Motivation

A Flink ETL job consumes data from a source and writes its results to a sink, so the job establishes a relationship between the source and the sink. Flink needs a mechanism that lets users report these relationships to external systems, such as the metadata systems DataHub [1] and Atlas [2], and the meta store mentioned in FLIP-276 [3].

...

Code Block
/**
 * Different events will be fired when a catalog/database/table is modified. The customized listener can get and
 * report specific information from the event according to the event type.
 */
@PublicEvolving
public interface CatalogModificationListener {
    /** The event will be fired when the database/table is modified. */
    void onEvent(CatalogModificationEvent event);
}

/** Basic interface for catalog modification. */
@PublicEvolving
public interface CatalogModificationEvent {
    /** Context for the event. */
    CatalogContext context();
}

/** Context for catalog modification and job lineage events. */
@PublicEvolving
public interface CatalogContext {
    
    /* The name of catalog. */
    String getCatalogName();

    /* Class of the catalog. */
    Class<? extends Catalog> getClass();

    /* Identifier for the catalog from catalog factory, such as jdbc/iceberg/paimon. */
    Optional<String> getFactoryIdentifier();

    /* Config for catalog. */
    Configuration getConfiguration();
}

/** The basic interface for database related events. */
@PublicEvolving
public interface DatabaseModificationEvent extends CatalogModificationEvent {
    CatalogDatabase database();
}

/* Event for creating database. */
@PublicEvolving
public interface CreateDatabaseEvent extends DatabaseModificationEvent {
    boolean ignoreIfExists();
}

/* Event for altering database. */
@PublicEvolving
public interface AlterDatabaseEvent extends DatabaseModificationEvent {
    CatalogDatabase newDatabase();
    boolean ignoreIfNotExists();
}

/* Event for dropping database. */
@PublicEvolving
public interface DropDatabaseEvent extends DatabaseModificationEvent {
    boolean ignoreIfNotExists();
}

/**
 * Base table event. Provides the column list, primary keys, partition keys, watermarks and properties in
 * CatalogBaseTable. The table can be a source or a sink.
 */
@PublicEvolving
public interface TableModificationEvent extends CatalogModificationEvent {
    ObjectIdentifier identifier();
    CatalogBaseTable table();
}

/* Event for creating table. */
@PublicEvolving
public interface CreateTableEvent extends TableModificationEvent {
    boolean ignoreIfExists();
}

/* Event for altering table, provides all changes for old table. */
@PublicEvolving
public interface AlterTableEvent extends TableModificationEvent {
    List<TableChange> tableChanges();
    boolean ignoreIfNotExists();
}

/* Event for dropping table. */
@PublicEvolving
public interface DropTableEvent extends TableModificationEvent {
    boolean ignoreIfNotExists();
}

/* Factory for catalog modification listener. */
@PublicEvolving
public interface CatalogModificationListenerFactory {
    CatalogModificationListener createListener(Context context);

    @PublicEvolving
    public interface Context {
        Configuration getConfiguration();

        ClassLoader getUserClassLoader();

        /**
         * Get an Executor pool for the listener to run async operations that can potentially be IO-heavy.
         */
        Executor getIOExecutor();
    }
}
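
To illustrate how a custom listener and its factory might fit together, here is a minimal sketch. It does not use the Flink classes: the interfaces at the top are simplified stand-ins for the ones proposed above (for example, `identifier()` returns a `String` instead of `ObjectIdentifier`, and table events are assumed to share the `TableModificationEvent` base). `RecordingListener`, `RecordingListenerFactory` and `ListenerSketch` are hypothetical names, and the listener only records a line per event where a real one would call an external metadata service.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;

// Simplified stand-ins for the proposed interfaces (assumptions, not the Flink classes).
interface CatalogContext { String getCatalogName(); }
interface CatalogModificationEvent { CatalogContext context(); }
interface TableModificationEvent extends CatalogModificationEvent { String identifier(); }
interface CreateTableEvent extends TableModificationEvent { boolean ignoreIfExists(); }
interface CatalogModificationListener { void onEvent(CatalogModificationEvent event); }
interface CatalogModificationListenerFactory {
    CatalogModificationListener createListener(Context context);
    interface Context { Executor getIOExecutor(); }
}

/** Hypothetical listener that records one line per event. */
final class RecordingListener implements CatalogModificationListener {
    private final Executor ioExecutor;
    private final List<String> reports = new CopyOnWriteArrayList<>();

    RecordingListener(Executor ioExecutor) { this.ioExecutor = ioExecutor; }

    @Override
    public void onEvent(CatalogModificationEvent event) {
        String report = describe(event);
        // Hand the potentially IO-heavy reporting step to the executor from the factory context.
        ioExecutor.execute(() -> reports.add(report));
    }

    // Dispatch on the concrete event type to decide what to report.
    private static String describe(CatalogModificationEvent event) {
        String catalog = event.context().getCatalogName();
        if (event instanceof CreateTableEvent) {
            return "CREATE TABLE " + catalog + "." + ((CreateTableEvent) event).identifier();
        }
        return "OTHER " + catalog;
    }

    List<String> reports() { return reports; }
}

/** Hypothetical factory that passes the IO executor on to the listener. */
final class RecordingListenerFactory implements CatalogModificationListenerFactory {
    @Override
    public CatalogModificationListener createListener(Context context) {
        return new RecordingListener(context.getIOExecutor());
    }
}

final class ListenerSketch {
    // Builds a stand-in CreateTableEvent for the demo.
    static CreateTableEvent createTable(String catalog, String table) {
        return new CreateTableEvent() {
            @Override public CatalogContext context() { return () -> catalog; }
            @Override public String identifier() { return table; }
            @Override public boolean ignoreIfExists() { return false; }
        };
    }

    public static void main(String[] args) {
        // A same-thread executor keeps the demo deterministic.
        CatalogModificationListenerFactory.Context ctx = () -> Runnable::run;
        RecordingListener listener =
                (RecordingListener) new RecordingListenerFactory().createListener(ctx);
        listener.onEvent(createTable("my_catalog", "db.orders"));
        System.out.println(listener.reports());
    }
}
```

Building the report string before handing work to the executor keeps event inspection on the calling thread and moves only the reporting step to the IO pool. This is one possible split, not a requirement of the proposal.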

...

Users can identify the physical connector from CatalogContext and the options in CatalogBaseTable through the following steps:

...

Users can get the value of the 'connector' option from the options in CatalogBaseTable for temporal tables. If it doesn't exist, users can use the factory identifier from CatalogContext as the connector name. If neither exists, users can define the connector name themselves based on Class<? extends Catalog>.
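
The fallback order described above can be sketched as follows. This is a minimal illustration under stated assumptions, not part of the proposed API: `ConnectorNames.resolve` is a hypothetical helper, the table options are passed as a plain `Map<String, String>`, and using the catalog class's simple name as the last resort is only one possible user-defined mapping.

```java
import java.util.Map;
import java.util.Optional;

/** Hypothetical helper that resolves a connector name using the fallback order described above. */
final class ConnectorNames {
    private ConnectorNames() {}

    static String resolve(
            Map<String, String> tableOptions,
            Optional<String> factoryIdentifier,
            Class<?> catalogClass) {
        // 1. Prefer the explicit 'connector' option of the table.
        String connector = tableOptions.get("connector");
        if (connector != null) {
            return connector;
        }
        // 2. Otherwise use the catalog factory identifier, such as jdbc/iceberg/paimon.
        // 3. Last resort (assumption): derive a name from the catalog class.
        return factoryIdentifier.orElseGet(catalogClass::getSimpleName);
    }
}
```

A listener could call such a helper with the options of the table carried by the event and the values exposed by the catalog context.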

...