...

Code Block
/**
 * Listener for catalog modifications. A different event is fired for each kind of
 * catalog/database/table modification, and a custom listener can extract and report
 * specific information from the event according to its type.
 */
@PublicEvolving
public interface CatalogModificationListener {
    /** Called when a database or table is modified. */
    void onEvent(CatalogModificationEvent event, CatalogModificationContext context);
}

/** Base class for catalog modification events. */
@PublicEvolving
public abstract class CatalogModificationEvent {
}

/** Context for catalog modification listener and job lineage events. */
@PublicEvolving
public class CatalogModificationContext {
    /** The name of the catalog. */
    String catalogName();

    /** The class of the catalog. */
    Class<? extends Catalog> clazz();

    /** Identifier of the catalog from the catalog factory, such as jdbc/iceberg/paimon. */
    Optional<String> factoryIdentifier();
}

/** Base class for database-related events. */
public abstract class DatabaseModificationEvent extends CatalogModificationEvent {
    CatalogDatabase database();
}

/** Event fired when a database is created. */
@PublicEvolving
public class CreateDatabaseEvent extends DatabaseModificationEvent {
    boolean ignoreIfExists();
}

/** Event fired when a database is altered. */
@PublicEvolving
public class AlterDatabaseEvent extends DatabaseModificationEvent {
    String oldDatabaseName();
    boolean ignoreIfNotExists();
}

/** Event fired when a database is dropped. */
@PublicEvolving
public class DropDatabaseEvent extends DatabaseModificationEvent {
    boolean ignoreIfNotExists();
}

/**
 * Base class for table events. It provides the column list, primary keys, partition keys,
 * watermarks, and properties of the table through CatalogBaseTable. The table can be a source
 * or a sink.
 */
public abstract class TableModificationEvent extends CatalogModificationEvent {
    ObjectIdentifier identifier();
    CatalogBaseTable table();
}

/** Event fired when a table is created. */
@PublicEvolving
public class CreateTableEvent extends TableModificationEvent {
    boolean ignoreIfExists();
}

/** Event fired when a table is altered; provides all changes applied to the old table. */
@PublicEvolving
public class AlterTableEvent extends TableModificationEvent {
    List<TableChange> tableChanges();
    boolean ignoreIfNotExists();
}

/** Event fired when a table is dropped. */
@PublicEvolving
public class DropTableEvent extends TableModificationEvent {
    boolean ignoreIfNotExists();
}

/** Factory for catalog modification listeners. */
@PublicEvolving
public interface CatalogModificationListenerFactory {
    CatalogModificationListener createListener(Configuration config, ClassLoader classloader);
}
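To illustrate how a listener might consume these events, here is a minimal self-contained sketch. It uses simplified, hypothetical stand-ins for the proposed types (a plain string instead of ObjectIdentifier, only two table events, and a hypothetical ReportingListener) so that it compiles without Flink. It shows the dispatch-on-event-type pattern only and is not the Flink implementation; registration through CatalogModificationListenerFactory is not shown.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Simplified, hypothetical stand-ins for the proposed types; they are not the Flink classes.
abstract class CatalogModificationEvent {}

final class CatalogModificationContext {
    private final String catalogName;
    private final Optional<String> factoryIdentifier;

    CatalogModificationContext(String catalogName, Optional<String> factoryIdentifier) {
        this.catalogName = catalogName;
        this.factoryIdentifier = factoryIdentifier;
    }

    String catalogName() {
        return catalogName;
    }

    Optional<String> factoryIdentifier() {
        return factoryIdentifier;
    }
}

abstract class TableModificationEvent extends CatalogModificationEvent {
    // A plain string stands in for ObjectIdentifier.
    private final String identifier;

    TableModificationEvent(String identifier) {
        this.identifier = identifier;
    }

    String identifier() {
        return identifier;
    }
}

final class CreateTableEvent extends TableModificationEvent {
    CreateTableEvent(String identifier) {
        super(identifier);
    }
}

final class DropTableEvent extends TableModificationEvent {
    DropTableEvent(String identifier) {
        super(identifier);
    }
}

interface CatalogModificationListener {
    void onEvent(CatalogModificationEvent event, CatalogModificationContext context);
}

// Hypothetical listener that records one report line per table event, dispatching on the event type.
final class ReportingListener implements CatalogModificationListener {
    final List<String> reports = new ArrayList<>();

    @Override
    public void onEvent(CatalogModificationEvent event, CatalogModificationContext context) {
        String origin = context.catalogName() + "/" + context.factoryIdentifier().orElse("unknown");
        if (event instanceof CreateTableEvent) {
            reports.add("CREATE TABLE " + ((CreateTableEvent) event).identifier() + " @ " + origin);
        } else if (event instanceof DropTableEvent) {
            reports.add("DROP TABLE " + ((DropTableEvent) event).identifier() + " @ " + origin);
        }
        // Any other event type is ignored by this example listener.
    }
}
```

Dispatching with `instanceof` keeps the listener interface to a single method, so new event types can be added without breaking existing listeners; a listener simply ignores the types it does not handle.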

...

Users can identify the physical connector from the CatalogContext and the options in CatalogBaseTable through the following steps:

1. Get the connector name.

Users can read the value of the 'connector' option from the options in CatalogBaseTable for temporal tables. If that option does not exist, users can use the factory identifier from CatalogContext as the connector name. If neither exists, users can define the connector name themselves based on the catalog class (Class<? extends Catalog>).

...
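The connector-name lookup in step 1 can be sketched as a simple fallback chain. This is a hypothetical helper, not part of the proposed API: ConnectorNameResolver and the user-supplied `userMapping` function are illustrative names, the options are modelled as a plain `Map<String, String>`, and `Class<?>` stands in for `Class<? extends Catalog>` so the sketch compiles without Flink.

```java
import java.util.Map;
import java.util.Optional;
import java.util.function.Function;

// Hypothetical helper for step 1; not a Flink API.
final class ConnectorNameResolver {
    private ConnectorNameResolver() {}

    static String resolve(
            Map<String, String> tableOptions,
            Optional<String> factoryIdentifier,
            Class<?> catalogClass,
            Function<Class<?>, String> userMapping) {
        // First choice: the 'connector' option stored in the table options.
        String connector = tableOptions.get("connector");
        if (connector != null) {
            return connector;
        }
        // Second choice: the identifier of the factory that created the catalog.
        // Last resort: a user-defined name derived from the catalog class.
        return factoryIdentifier.orElseGet(() -> userMapping.apply(catalogClass));
    }
}
```

For example, with no 'connector' option and an empty factory identifier, a mapping such as `c -> c.getSimpleName().toLowerCase()` would name a catalog class `MyCatalog` as `mycatalog`.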

For Table/SQL jobs, Flink creates table lineages from the source and sink tables. Table lineages contain column lineages, which capture the dependencies between source and sink columns. Flink derives these lineages for Table/SQL jobs from the job plan, so the whole process is transparent to users.
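As a rough illustration of what "dependencies between source and sink columns" means, the following hypothetical sketch models each plan step as a map from an output column to the input columns it reads, and composes two steps to obtain sink-to-source column lineage. ColumnLineageSketch and `compose` are illustrative names; this is not how the Flink planner derives lineage internally.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical model: output column -> set of input columns it is computed from.
final class ColumnLineageSketch {
    private ColumnLineageSketch() {}

    // Composes downstream (output -> intermediate) with upstream (intermediate -> source).
    static Map<String, Set<String>> compose(
            Map<String, Set<String>> downstream, Map<String, Set<String>> upstream) {
        Map<String, Set<String>> result = new LinkedHashMap<>();
        for (Map.Entry<String, Set<String>> entry : downstream.entrySet()) {
            Set<String> origins = new LinkedHashSet<>();
            for (String intermediate : entry.getValue()) {
                // An intermediate column unknown to upstream contributes no source origin.
                origins.addAll(upstream.getOrDefault(intermediate, Collections.emptySet()));
            }
            result.put(entry.getKey(), origins);
        }
        return result;
    }
}
```

For a query such as `INSERT INTO sink SELECT id AS order_id, amount AS total FROM (SELECT id, price * qty AS amount FROM orders)`, composing the outer projection with the inner one maps `total` to `orders.price` and `orders.qty`.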

...

Code Block
/**
 * Adds a setLineageEntity method to DataStreamSource.
 */
@Public
public class DataStreamSource<T> {
    private LineageEntity lineageEntity;

    public DataStreamSource<T> setLineageEntity(LineageEntity lineageEntity);
}

/**
 * Adds setLineageEntity and addLineageRelations methods to DataStreamSink.
 */
@Public
public class DataStreamSink<T> {
    private LineageEntity lineageEntity;

    public DataStreamSink<T> setLineageEntity(LineageEntity lineageEntity);

    /** Adds lineage relations for DataStream jobs. */
    public DataStreamSink<T> addLineageRelations(LineageRelation... relations);
}

...