...

A Flink ETL job consumes data from a Source Table and produces results to a Sink Table; the Flink ETL job thus creates a relationship between the Source Table and the Sink Table. Flink needs a mechanism for users to report these relationships to external systems, such as the meta systems Datahub [1] and Atlas [2], and the meta store we mentioned in FLIP-276 [3].

This FLIP aims to introduce listener interfaces in Flink; users can implement them to report the progress of jobs and metadata to external systems. Flink SQL and Table jobs are supported in the first stage, and DataStream will be considered in the future. The main information is as follows:

...

2. Job information, such as job id/name, execution mode, job type and logical plan

3. Relationship between Source/Sink and jobs, such as source and sink tables and their column lineages.

4. Job execution information, such as job status, checkpoints

Public Interfaces

CatalogEventListener

DDL operations such as create/alter/drop tables etc. will generate different events and notify CatalogEventListener . All events for CatalogEventListener extend the basic CatalogEvent and listeners can get the catalog from it. Some general events for database/table are defined as follows, and more events can be implemented based on the requirements in the future.

Code Block
/**
 * Different events will be fired when a catalog/database/table is modified. The customized listener can get and report specific information from the event according to the event type.
 */
@PublicEvolving
public interface CatalogEventListener {
    /* Event fired after a catalog/database/table is modified. */
    void onEvent(CatalogEvent catalogEvent);

    /* The basic class for catalog related event. */
    @PublicEvolving
    public abstract class CatalogEvent {
        /* The catalog of the event. */
        Catalog catalog();
        /* The name of catalog. */
        String catalogName();
    }

    /* The basic class for database related event. */
    public abstract class BaseDatabaseEvent extends CatalogEvent {
        String databaseName();  
    }

    /* Event for database creation. */
    @PublicEvolving
    public class CreateDatabaseEvent extends BaseDatabaseEvent {
        CatalogDatabase database();
        boolean ignoreIfExists();
    }

    /* Event for alter database. */
    @PublicEvolving
    public class AlterDatabaseEvent extends BaseDatabaseEvent {
        CatalogDatabase newDatabase();
        boolean ignoreIfNotExists();
    }

    /* Event for dropping database. */
    @PublicEvolving
    public class DropDatabaseEvent extends BaseDatabaseEvent {
        boolean ignoreIfExists();
    }

    /* Base table event, provides column list, primary keys, partition keys, watermarks and properties in CatalogBaseTable. The table can be source or sink. */
    public abstract class BaseTableEvent extends CatalogEvent {
        ObjectIdentifier identifier();  
        CatalogBaseTable table();
    }

    /* Event for table creation. */
    @PublicEvolving
    public class CreateTableEvent extends BaseTableEvent {
        boolean ignoreIfExists();
    }

    /* Event for altering table, provides all information in old table and new table. */
    @PublicEvolving
    public class AlterTableEvent extends BaseTableEvent {
        List<TableChange> tableChanges();
        boolean ignoreIfExists();
    }

    /* Event for dropping table. */
    @PublicEvolving
    public class DropTableEvent extends BaseTableEvent {
        boolean ignoreIfExists();   
    }
}

/* Factory for catalog listener. */
@PublicEvolving
public interface CatalogEventListenerFactory {
    public CatalogEventListener createListener(Configuration configuration, ClassLoader classLoader);
}

/* Add listeners in the catalog context. */
@PublicEvolving
public interface CatalogFactory {
    /** Add listeners in the context. */
    @PublicEvolving
    interface Context {
        /* Get the listeners from the context if they exist. */
        List<CatalogEventListener> listeners();
    }
}
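The interfaces above are a proposal, not a released API, so the following sketch restates minimal local stand-ins for them to stay self-contained. It shows how a custom listener might filter for table-creation events and collect qualified table names for reporting to a meta system; all class bodies here are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.List;

public class CatalogListenerSketch {
    /* Minimal local stand-ins mirroring the proposed interfaces. */
    interface CatalogEvent { String catalogName(); }

    static class CreateTableEvent implements CatalogEvent {
        private final String catalogName;
        private final String tableIdentifier;
        CreateTableEvent(String catalogName, String tableIdentifier) {
            this.catalogName = catalogName;
            this.tableIdentifier = tableIdentifier;
        }
        public String catalogName() { return catalogName; }
        String identifier() { return tableIdentifier; }
    }

    interface CatalogEventListener { void onEvent(CatalogEvent event); }

    /* A listener that records created tables, e.g. for pushing to a meta system. */
    static class ReportingListener implements CatalogEventListener {
        final List<String> reported = new ArrayList<>();
        public void onEvent(CatalogEvent event) {
            if (event instanceof CreateTableEvent) {
                CreateTableEvent e = (CreateTableEvent) event;
                reported.add(e.catalogName() + "." + e.identifier());
            }
        }
    }

    public static void main(String[] args) {
        ReportingListener listener = new ReportingListener();
        listener.onEvent(new CreateTableEvent("hive_catalog1", "my_database.my_table"));
        System.out.println(listener.reported);
    }
}
```

A real implementation would replace the in-memory list with a call to the external system's ingestion API.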

Users may create different catalogs on the same physical catalog, for example, create two hive catalogs named hive_catalog1  and hive_catalog2  for the same metastore. The tables hive_catalog1.my_database.my_table  and hive_catalog2.my_database.my_table  are the same table in the hive metastore.

...

StorageIdentifier  is introduced to address these issues; listeners can get it from CatalogTable . StorageIdentifierFactory creates a StorageIdentifier for a catalog table.

Code Block
/* Storage identifier for different physical table. */
@PublicEvolving
public class StorageIdentifier {
    /* Storage type information such as kafka, hive, iceberg or paimon. */
    String type();

    /* Identifier which identifies the unique physical table. */
    String identifier();

    /* Properties for the storage identifier, users can get value from different keys for different storages, such as broker and topic for kafka. */
    Map<String, String> properties();
}

/* Storage identifier factory is loaded with a specific connector type and creates {@link StorageIdentifier}. */
@PublicEvolving
public interface StorageIdentifierFactory {
    /* Create storage identifier for different connector type. */
    StorageIdentifier createDynamicTableStorage(Configuration config, ClassLoader classLoader);
}

@PublicEvolving
public interface CatalogTable {
    /* Get physical storage identifier for the table. */
    StorageIdentifier storage();
}
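To make the identity-vs-catalog-name distinction concrete, here is a hedged sketch of how a StorageIdentifier might be filled for a kafka-backed table. The class is restated locally, and the property keys `bootstrap.servers` and `topic` are illustrative assumptions, not keys mandated by the proposal.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class StorageIdentifierSketch {
    /* Minimal value class mirroring the proposed StorageIdentifier. */
    static class StorageIdentifier {
        private final String type;
        private final String identifier;
        private final Map<String, String> properties;
        StorageIdentifier(String type, String identifier, Map<String, String> properties) {
            this.type = type;
            this.identifier = identifier;
            this.properties = properties;
        }
        String type() { return type; }
        String identifier() { return identifier; }
        Map<String, String> properties() { return properties; }
    }

    /* Hypothetical builder for a kafka table; the property keys are illustrative. */
    static StorageIdentifier kafkaTable(String brokers, String topic) {
        Map<String, String> props = new LinkedHashMap<>();
        props.put("bootstrap.servers", brokers);
        props.put("topic", topic);
        /* The physical identity is broker list + topic, independent of which
           catalog (hive_catalog1, hive_catalog2, ...) references the table. */
        return new StorageIdentifier("kafka", brokers + "/" + topic, props);
    }

    public static void main(String[] args) {
        StorageIdentifier id = kafkaTable("broker1:9092", "orders");
        System.out.println(id.type() + " -> " + id.identifier());
    }
}
```

Two catalogs pointing at the same brokers and topic would produce equal identifiers, which is exactly what lets external systems deduplicate the physical table.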

JobSubmissionListener 

JobSubmissionListener is introduced in this FLIP to report a job submission event with the job logical plan, which contains source/sink lineage information. Source and sink lineages are static information for a job, so they can be reported once before the job is submitted. Therefore, the listeners are on the client side. The RestClusterClient is the gateway for all jobs such as sql/table/datastream jobs, and even for developers who build jobs themselves and submit them with the client. RestClusterClient creates the JobSubmissionListeners and notifies them before the specific job is submitted.

Code Block
/**
 * This listener will be notified before job is submitted in {@link RestClusterClient}.
 */
@PublicEvolving
public interface JobSubmissionListener {
    /* Event is fired before a job is submitted. */
    void onEvent(JobSubmissionEvent submissionEvent);

    /* Event for job submission. */
    @PublicEvolving
    public class JobSubmissionEvent {
        JobID jobId();
        String jobName();
        JobLogicalPlan plan();
    }
}

/* Factory for job submission listener. */
@PublicEvolving
public interface JobSubmissionListenerFactory {
    public JobSubmissionListener createListener(Configuration configuration, ClassLoader classLoader);
}
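As a sketch of the client-side flow, the snippet below restates minimal local versions of the proposed event and listener and shows a listener formatting the report it would send to an external system before submission. All bodies are assumptions for illustration.

```java
public class SubmissionListenerSketch {
    /* Minimal stand-ins mirroring the proposed types. */
    static class JobSubmissionEvent {
        private final String jobId;
        private final String jobName;
        JobSubmissionEvent(String jobId, String jobName) {
            this.jobId = jobId;
            this.jobName = jobName;
        }
        String jobId() { return jobId; }
        String jobName() { return jobName; }
    }

    interface JobSubmissionListener { void onEvent(JobSubmissionEvent event); }

    /* A listener that would forward the submission event to an external meta
       system; here it only formats the message it would send. */
    static class MetaReportingListener implements JobSubmissionListener {
        String lastReport;
        public void onEvent(JobSubmissionEvent event) {
            lastReport = "submitted " + event.jobId() + " (" + event.jobName() + ")";
        }
    }

    public static void main(String[] args) {
        MetaReportingListener listener = new MetaReportingListener();
        /* In the proposal, RestClusterClient fires this event before submission. */
        listener.onEvent(new JobSubmissionEvent("a1b2", "etl-job"));
        System.out.println(listener.lastReport);
    }
}
```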

There is a JobLogicalPlan in JobSubmissionEvent which describes detailed job information such as source/sink lineages and column lineages. Users can get the plan from the event to report more information about the job.

Code Block
/**
 * Job logical plan is built according to the StreamGraph in the client. Users can get sources, sinks and the relationship between nodes from the plan.
 */
@PublicEvolving
public interface JobLogicalPlan {
    /* Job type, TableOrSQL or DataStream. */
    String jobType(); 

    /* Job execution type, BATCH or STREAMING. */
    String executionType();

    /* Source lineage list. */
    List<JobSourceLineage> sources(); 

    /* Sink lineage list. */
    List<JobSinkLineage> sinks(); 

    /* Get the sink table descriptor for a table/sql job with the given sink identifier; it always returns null for DataStream jobs. */
    SinkTableDescriptor sinkTable(String identifier);

    /* Job configuration. */
    Map<String, String> config(); 
}
 
/* Source info of the job plan. */
@PublicEvolving
public class JobSourceLineage {
    StorageIdentifier identifier();

    /* Source column name list. */
    List<String> columns();
}
 
/* Sink info of the job plan. */
@PublicEvolving 
public class JobSinkLineage {
    StorageIdentifier identifier();

    /* Sink column name list. */
    List<String> columns();

    /* Source column lineages, the key is the column in the sink and the value is the source columns list. */
    Map<String, List<SourceColumnLineage>> columnLineages();
}
 
/* Source column list for sink vertex. */
@PublicEvolving  
public class SourceColumnLineage {
    /* Source identifier. */
    String identifier();

    /* Source column name list for the given sink. */
    List<String> columns();
}

@PublicEvolving
public class SinkTableDescriptor {
    /* Modify type, INSERT/UPDATE/DELETE. */
    String modifyType();

    /* Update mode, APPEND/RETRACT/UPSERT. */
    String updateMode();
    boolean overwrite();
}
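The nesting of JobSinkLineage and SourceColumnLineage can be made concrete with a small traversal sketch: it walks a sink's columnLineages map and renders one "sinkColumn <- source(columns)" line per source. The local SourceColumnLineage class and the sample data are illustrative assumptions.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ColumnLineageSketch {
    /* Minimal stand-in for the proposed SourceColumnLineage. */
    static class SourceColumnLineage {
        private final String identifier;
        private final List<String> columns;
        SourceColumnLineage(String identifier, List<String> columns) {
            this.identifier = identifier;
            this.columns = columns;
        }
        String identifier() { return identifier; }
        List<String> columns() { return columns; }
    }

    /* Walk a sink's columnLineages map: key is the sink column, value is the
       list of source column groups it derives from. */
    static List<String> render(Map<String, List<SourceColumnLineage>> columnLineages) {
        List<String> lines = new ArrayList<>();
        for (Map.Entry<String, List<SourceColumnLineage>> e : columnLineages.entrySet()) {
            for (SourceColumnLineage src : e.getValue()) {
                lines.add(e.getKey() + " <- " + src.identifier() + src.columns());
            }
        }
        return lines;
    }

    public static void main(String[] args) {
        Map<String, List<SourceColumnLineage>> lineages = new LinkedHashMap<>();
        /* Hypothetical: sink column "total" derives from source columns price and quantity. */
        lineages.put("total", Arrays.asList(
                new SourceColumnLineage("kafka/orders", Arrays.asList("price", "quantity"))));
        render(lineages).forEach(System.out::println);
    }
}
```

An external lineage system would consume exactly this mapping to draw column-level edges between the source and sink tables.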

Flink creates JobSourceLineage and JobSinkLineage for table/sql jobs; for DataStream jobs, users need to create the lineages themselves and set them manually via setXXX methods as follows.

...

JobExecutionListener

JobManager generates events and notifies JobExecutionListener when the job status is changed. JobStatusEvent indicates the status of a Flink job in JobStatus, with the old status, new status and the exception when the job fails.

Code Block
/**
 * When the job status is changed in the job manager, it will generate a job event and notify the job execution listeners.
 */
@PublicEvolving
public interface JobExecutionListener {
    /* Event fired after job status has been changed. */ 
    void onEvent(JobStatusEvent jobStatusEvent);

    /* Job status event with plan. */
    @PublicEvolving
    public class JobStatusEvent {
        JobID jobId();
        String jobName();
        JobStatus oldStatus();
        JobStatus newStatus();
        /* Exception for job when it is failed. */
        @Nullable Throwable exception();
    }
}

/* Factory for job execution listener. */
@PublicEvolving
public interface JobExecutionListenerFactory {
    public JobExecutionListener createListener(Configuration configuration, ClassLoader classLoader);
}
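A listener on the JobManager side would typically react to status transitions, e.g. capturing the failure cause when the job enters FAILED. The sketch below restates minimal local versions of the proposed event and listener; the JobStatus enum values and all bodies are illustrative assumptions.

```java
public class ExecutionListenerSketch {
    /* Minimal stand-ins mirroring the proposed event; values are illustrative. */
    enum JobStatus { CREATED, RUNNING, FAILED, FINISHED }

    static class JobStatusEvent {
        private final JobStatus oldStatus;
        private final JobStatus newStatus;
        private final Throwable exception;
        JobStatusEvent(JobStatus oldStatus, JobStatus newStatus, Throwable exception) {
            this.oldStatus = oldStatus;
            this.newStatus = newStatus;
            this.exception = exception;
        }
        JobStatus oldStatus() { return oldStatus; }
        JobStatus newStatus() { return newStatus; }
        Throwable exception() { return exception; }
    }

    interface JobExecutionListener { void onEvent(JobStatusEvent event); }

    /* A listener that tracks transitions and captures the failure cause. */
    static class StatusTrackingListener implements JobExecutionListener {
        String lastTransition;
        String failureCause;
        public void onEvent(JobStatusEvent event) {
            lastTransition = event.oldStatus() + " -> " + event.newStatus();
            if (event.newStatus() == JobStatus.FAILED && event.exception() != null) {
                failureCause = event.exception().getMessage();
            }
        }
    }

    public static void main(String[] args) {
        StatusTrackingListener listener = new StatusTrackingListener();
        listener.onEvent(new JobStatusEvent(JobStatus.RUNNING, JobStatus.FAILED,
                new RuntimeException("checkpoint timeout")));
        System.out.println(listener.lastTransition + ": " + listener.failureCause);
    }
}
```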

...