Status

Discussion thread-
Vote thread-
JIRA

Release-

Motivation

A Flink ETL job consumes data from source tables and writes results to sink tables, which establishes a relationship between those tables through the job. Flink needs a mechanism that lets users report these relationships to external systems, such as the metadata systems DataHub [1] and Atlas [2], and the meta store mentioned in FLIP-276 [3].

This FLIP aims to introduce listener interfaces in Flink that users can implement to report job progress and metadata to external systems. Flink SQL and Table jobs are supported in the first stage, and DataStream jobs will be considered in the future. The main information is as follows:

1. Source and Sink information, such as table name, fields, partition keys, primary keys, watermarks, configurations

2. Job information, such as job id/name, execution mode, scheduler type, logical plan

3. Relationship between Source/Sink and jobs, such as source and sink tables, columns in tables for job

4. Job execution information, such as job status, checkpoints

Public Interfaces

CatalogEventListener

DDL operations such as creating, altering, and dropping tables generate different events and notify CatalogEventListener. All events for CatalogEventListener extend the basic BaseCatalogEvent, and listeners can get the catalog from it. Some general events for databases and tables are defined as follows; more events can be added in the future as requirements arise.

/**
 * Different events will be fired when a catalog/database/table is modified. The customized listener can receive these events and then do some specific operations according to the event type.
 */
@PublicEvolving
public interface CatalogEventListener {
    /* Event fired after a catalog/database/table is modified. */
    void onEvent(BaseCatalogEvent catalogEvent);

    /* The basic class for catalog related event. */
    @PublicEvolving
    public abstract class BaseCatalogEvent {
        /* The catalog of the event. */
        Catalog catalog();
        /* The name of catalog. */
        String catalogName();
    }

    /* Event for database creation. */
    @PublicEvolving
    public class CreateDatabaseEvent extends BaseCatalogEvent {
        CatalogDatabase database();
        String databaseName(); 
        boolean ignoreIfExists();
    }

    /* Event for dropping database. */
    @PublicEvolving
    public class DropDatabaseEvent extends BaseCatalogEvent  {
        String databaseName(); 
        boolean ignoreIfExists();
    }

    /* Base table event, provides column list, primary keys, partition keys, watermarks and properties in CatalogBaseTable. The table can be source or sink. */
    public abstract class BaseTableEvent extends BaseCatalogEvent {
        ObjectIdentifier identifier();  
        CatalogBaseTable table();
    }

    /* Event for table creation. */
    @PublicEvolving
    public class CreateTableEvent extends BaseTableEvent {
        boolean ignoreIfExists();
    }

    /* Event for altering table, provides all information in old table and new table. */
    @PublicEvolving
    public class AlterTableEvent extends BaseTableEvent {
        List<TableChange> tableChanges();
        boolean ignoreIfExists();
    }

    /* Event for dropping table. */
    @PublicEvolving
    public class DropTableEvent extends BaseTableEvent {
        boolean ignoreIfExists();   
    }
}
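As an illustration of how such a listener might be used, the following is a minimal sketch that records table creation and drop events in memory. The types are simplified stand-ins for the interfaces proposed above, not the actual Flink classes: `Event`, `CreateTable`, `DropTable`, `Listener` and `RecordingListener` are hypothetical names, and the table identifier is reduced to a plain string.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch only: simplified stand-ins for the proposed event types, not the Flink API. */
class CatalogListenerSketch {

    /** Stand-in for BaseCatalogEvent; it only carries the catalog name. */
    abstract static class Event {
        final String catalogName;

        Event(String catalogName) {
            this.catalogName = catalogName;
        }
    }

    /** Stand-in for CreateTableEvent. */
    static final class CreateTable extends Event {
        final String identifier;

        CreateTable(String catalogName, String identifier) {
            super(catalogName);
            this.identifier = identifier;
        }
    }

    /** Stand-in for DropTableEvent. */
    static final class DropTable extends Event {
        final String identifier;

        DropTable(String catalogName, String identifier) {
            super(catalogName);
            this.identifier = identifier;
        }
    }

    /** Stand-in for CatalogEventListener. */
    interface Listener {
        void onEvent(Event event);
    }

    /** Dispatches on the concrete event type and ignores events it does not know. */
    static final class RecordingListener implements Listener {
        final List<String> messages = new ArrayList<>();

        @Override
        public void onEvent(Event event) {
            if (event instanceof CreateTable) {
                messages.add("CREATE " + ((CreateTable) event).identifier);
            } else if (event instanceof DropTable) {
                messages.add("DROP " + ((DropTable) event).identifier);
            }
        }
    }

    public static void main(String[] args) {
        RecordingListener listener = new RecordingListener();
        String table = "hive_catalog1.my_database.my_table";
        listener.onEvent(new CreateTable("hive_catalog1", table));
        listener.onEvent(new DropTable("hive_catalog1", table));
        System.out.println(listener.messages);
    }
}
```

A real listener would replace the in-memory list with a call to the external metadata system.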

Users can create different catalogs on the same physical catalog, for example two Hive catalogs named hive_catalog1 and hive_catalog2 for the same metastore. The tables hive_catalog1.my_database.my_table and hive_catalog2.my_database.my_table are then the same table in the Hive metastore.

In addition, there are two table types: persistent tables and temporary tables. A persistent table can be identified by its catalog and database as described above, while a temporary table can only be identified by the properties in its DDL. Different temporary tables with the same connector type and related properties refer to the same physical table in the external system, such as two tables for the same topic in Kafka.

TableStorage is added to CatalogTable to identify different Flink tables that map to the same physical table. TableStorage is created by DynamicTableStorageFactory, which is loaded according to the connector type.

/* Table storage for different physical table. */
@PublicEvolving
public class TableStorage {
    /* Table type, such as kafka, hive, iceberg or paimon. */
    String type();
    /* Physical location which identify the unique physical table. */
    String location();
}

/* Table storage factory is loaded with specific connector type and create {@link TableStorage}. */
@PublicEvolving
public interface DynamicTableStorageFactory extends DynamicTableFactory {
    /* Create table storage for different table type. */
    TableStorage createDynamicTableStorage(Context context);
}

@PublicEvolving
public interface CatalogTable {
    /* Get physical storage for the table. */
    TableStorage tableStorage();
}
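To make the role of TableStorage concrete, here is a minimal sketch that groups Flink table identifiers by their physical storage. `Storage` is a hypothetical stand-in for the proposed TableStorage with value equality on type and location; `groupByStorage` and the location strings are made up for illustration and are not part of the proposal.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/** Sketch only: shows how (type, location) could identify a physical table. */
class TableStorageSketch {

    /** Stand-in for TableStorage; two instances are equal if type and location match. */
    static final class Storage {
        final String type;
        final String location;

        Storage(String type, String location) {
            this.type = type;
            this.location = location;
        }

        @Override
        public boolean equals(Object o) {
            if (!(o instanceof Storage)) {
                return false;
            }
            Storage other = (Storage) o;
            return type.equals(other.type) && location.equals(other.location);
        }

        @Override
        public int hashCode() {
            return Objects.hash(type, location);
        }
    }

    /** Groups Flink table identifiers that point at the same physical table. */
    static Map<Storage, List<String>> groupByStorage(Map<String, Storage> tables) {
        Map<Storage, List<String>> grouped = new LinkedHashMap<>();
        for (Map.Entry<String, Storage> entry : tables.entrySet()) {
            grouped.computeIfAbsent(entry.getValue(), k -> new ArrayList<>()).add(entry.getKey());
        }
        return grouped;
    }

    public static void main(String[] args) {
        // The location value is a placeholder, not a real metastore address.
        Map<String, Storage> tables = new LinkedHashMap<>();
        tables.put("hive_catalog1.my_database.my_table", new Storage("hive", "my_database.my_table"));
        tables.put("hive_catalog2.my_database.my_table", new Storage("hive", "my_database.my_table"));
        groupByStorage(tables).forEach((s, ids) -> System.out.println(s.type + ":" + s.location + " <- " + ids));
    }
}
```

With this kind of grouping, an external system can treat both catalog entries as one dataset.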

JobSubmissionListener 

Before a job is submitted, Flink can create the logical plan for the job and notify the listener. We add JobSubmissionListener for this purpose, and users can create relationships between source and sink tables in it. The logical plan of a job is static information that may contain a lot of data, and Flink only needs to report it once, when the job is submitted. Therefore, this listener is on the client side. RestClusterClient is the entry point for all jobs, including SQL, Table, and DataStream jobs, and even for developers who build jobs themselves and submit them with the client.

/**
 * This listener will be notified before job is submitted in {@link RestClusterClient}.
 */
@PublicEvolving
public interface JobSubmissionListener {
    /* Event is fired before a job is submitted. */
    void onEvent(JobSubmissionEvent submissionEvent);

    /* Event for job submission. */
    @PublicEvolving
    public class JobSubmissionEvent {
        JobID jobId();
        String jobName();
        JobLogicalPlan plan();
    }
}


JobSubmissionEvent contains a JobLogicalPlan, which describes the job in detail, such as the relationships between source and sink tables and the column dependencies. Users can read the plan to report more information about the job.

/**
 * Job logical plan is built according to JobGraph in the client. Users can get sources, sinks and the relationship between nodes from plan.
 */
@PublicEvolving
public interface JobLogicalPlan {
    JobID jobId();
    String jobName();

    /* Job type, BATCH or STREAMING. */
    String jobType();

    /* Source vertex list. */
    List<JobSourceVertex> sources();

    /* Sink vertex list. */
    List<JobSinkVertex> sinks();

    /* Job configuration. */
    Map<String, String> config();

    /* Source vertex in the job plan. */
    @PublicEvolving
    public class JobSourceVertex {
        String sourceName();
        /* Collect/Table/DataStreamSource. */
        String sourceType();
        /* Source column name list. */
        List<String> columns();
        Map<String, String> config();
    }
 
    /* Sink vertex in the job plan. */
    @PublicEvolving 
    public class JobSinkVertex {
        String sinkName();
        /* Sink column name list. */
        List<String> columns();
        /* Modify type, INSERT/UPDATE/DELETE. */
        String modifyType();
        /* Update mode, APPEND/RETRACT/UPSERT. */
        String updateMode();
        boolean overwrite();
        /* Source name -> source column list. */
        Map<String, SourceColumn> sourceColumns();
        Map<String, String> config();
    }
 
    /* Source column list for sink vertex. */
    @PublicEvolving  
    public class SourceColumn {
        String sourceName();
        List<String> columns();
    }
}
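As a rough illustration of what a JobSubmissionListener could do with the plan, the sketch below derives table-level lineage edges from sink vertices. `SinkVertex` is a hypothetical, reduced stand-in for JobSinkVertex that keeps only the sink name and a map from source name to source columns; `tableEdges` and the table names are made up for this example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Sketch only: derives "source -> sink" edges from a reduced plan model. */
class LineageSketch {

    /** Stand-in for JobSinkVertex with only the fields needed here. */
    static final class SinkVertex {
        final String sinkName;
        /** Source name -> columns of that source that feed this sink. */
        final Map<String, List<String>> sourceColumns;

        SinkVertex(String sinkName, Map<String, List<String>> sourceColumns) {
            this.sinkName = sinkName;
            this.sourceColumns = sourceColumns;
        }
    }

    /** Returns one edge per (source, sink) pair, in the iteration order of the inputs. */
    static List<String> tableEdges(List<SinkVertex> sinks) {
        List<String> edges = new ArrayList<>();
        for (SinkVertex sink : sinks) {
            for (String source : sink.sourceColumns.keySet()) {
                edges.add(source + " -> " + sink.sinkName);
            }
        }
        return edges;
    }

    public static void main(String[] args) {
        Map<String, List<String>> sources = new LinkedHashMap<>();
        sources.put("orders", Arrays.asList("order_id", "amount"));
        sources.put("customers", Arrays.asList("customer_id"));
        SinkVertex sink = new SinkVertex("order_summary", sources);
        tableEdges(Arrays.asList(sink)).forEach(System.out::println);
    }
}
```

A listener could send these edges to a metadata system once per submission, since the plan does not change afterwards.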

JobExecutionListener

JobManager generates events when the status of a job changes or a checkpoint is started, and notifies JobExecutionListener. JobStatusEvent indicates the status of a Flink job as a JobStatus, with the old status, the new status, and the job logical plan.

In addition to status events, JobManager generates a CheckpointEvent when a checkpoint is started, completed, or aborted, and notifies JobExecutionListener. All checkpoint-related events extend CheckpointEvent, and more events can be added in the future.

/**
 * When job status is changed in job manager, it will generate job event and notify job execution listener.
 */
@PublicEvolving
public interface JobExecutionListener {
    /* Event fired after job status has been changed. */ 
    void onJobStatusChanged(JobStatusEvent jobStatusEvent);

    /* Job status event with plan. */
    @PublicEvolving
    public class JobStatusEvent {
        JobID jobId();
        String jobName();
        JobStatus oldStatus();
        JobStatus newStatus();
        /* Exception for job when it is failed. */
        @Nullable Throwable exception();
    }
}
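The following minimal sketch shows one way a listener might react to status changes, here by reporting only failures. `Status`, `StatusEvent` and `FailureReporter` are hypothetical stand-ins; in particular, `Status` lists only a few values and is not the full Flink JobStatus enum.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch only: reacts to job status changes using simplified stand-in types. */
class JobStatusListenerSketch {

    /** Reduced stand-in for JobStatus; the real enum has more values. */
    enum Status { CREATED, RUNNING, FAILED, FINISHED, CANCELED }

    /** Stand-in for JobStatusEvent. */
    static final class StatusEvent {
        final String jobName;
        final Status oldStatus;
        final Status newStatus;
        /** May be null, mirroring the nullable exception in the proposed event. */
        final Throwable exception;

        StatusEvent(String jobName, Status oldStatus, Status newStatus, Throwable exception) {
            this.jobName = jobName;
            this.oldStatus = oldStatus;
            this.newStatus = newStatus;
            this.exception = exception;
        }
    }

    /** Records a message for every transition into FAILED and ignores the rest. */
    static final class FailureReporter {
        final List<String> reports = new ArrayList<>();

        void onJobStatusChanged(StatusEvent event) {
            if (event.newStatus == Status.FAILED) {
                String cause = event.exception == null ? "unknown cause" : event.exception.getMessage();
                reports.add(event.jobName + " failed: " + cause);
            }
        }
    }

    public static void main(String[] args) {
        FailureReporter reporter = new FailureReporter();
        reporter.onJobStatusChanged(new StatusEvent("etl", Status.CREATED, Status.RUNNING, null));
        reporter.onJobStatusChanged(
                new StatusEvent("etl", Status.RUNNING, Status.FAILED, new RuntimeException("disk full")));
        System.out.println(reporter.reports);
    }
}
```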

Config Customized Listener

Users should add their listeners to the classpath of the client and the Flink cluster, and configure them with the following options:

# Config catalog event listeners.
table.catalog.listeners: {job catalog listener class1},{job catalog listener class2}

# Existing config job submission listeners.
execution.job-submission-listeners: {job submission listener class1},{job submission listener class2}

# Config job execution listeners.
jobmanager.execution.listeners: {job execution listener class1},{job execution listener class2}

Proposed Changes

Changes for CatalogEventListener

TableEnvironmentImpl creates the customized CatalogEventListener according to the option table.catalog.listeners, and puts the listener into CatalogManager and AbstractCatalog. External components such as SQL Gateway can also create the listeners and CatalogManager themselves, and then create TableEnvironmentImpl with them. When DDL-related operations are executed in CatalogManager and AbstractCatalog, they should notify the listeners.

Currently there are 

Changes for JobExecutionListener

Flink SQL and Table jobs are created from the Planner, which contains exec nodes, and are then converted to Operation, Transformation and StreamGraph. Finally, the jobs are submitted as a JobGraph, and job managers create an ExecutionGraph from it. The handling of the source and sink lists is as follows.

SourceScan in the Planner contains source information such as the table name, fields and configuration. However, this information is hidden inside Source, which is an interface, once the SourceScan is converted to a Transformation. We should carry the source information through the conversion SourceScan->Operation->Transformation->StreamNode.

Similar to sources, Sink and DataStreamSink contain sink information such as table names and configuration. We should carry the sink information through the conversion Sink->Operation->Transformation->StreamNode; then we can add Map<JobVertexID, JobSinkVertexInfo> sources in JobGraph and ExecutionGraph too.

After the above changes are complete, JobManager can create a JobLogicalPlan from the JobGraph for JobExecutionListener. When the status of a job changes, DefaultExecutionGraph in JobManager will notify the listener. This listener also observes checkpoint execution: when CheckpointCoordinator starts, completes, or aborts a specific checkpoint, it will notify the listener too.

Listener Construction and Execution

While the current JobListener is created by an empty constructor, all customized listeners above can be created either by a constructor that takes a Configuration or by an empty constructor. Flink prefers the constructor with Configuration if it exists.
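The constructor preference described above could be implemented with reflection roughly as follows. This is a sketch under stated assumptions, not the actual Flink loading code: `Conf` is a hypothetical stand-in for Flink's Configuration, and `instantiate`, `WithConf` and `NoArg` are names invented for the example.

```java
import java.util.HashMap;
import java.util.Map;

/** Sketch only: prefer a constructor taking a configuration, else use the empty one. */
class ListenerFactorySketch {

    /** Stand-in for Flink's Configuration. */
    static final class Conf {
        final Map<String, String> values = new HashMap<>();
    }

    /** Creates a listener, trying the (Conf) constructor before the no-arg constructor. */
    static Object instantiate(Class<?> clazz, Conf conf) {
        try {
            try {
                return clazz.getDeclaredConstructor(Conf.class).newInstance(conf);
            } catch (NoSuchMethodException e) {
                // No constructor with Conf, fall back to the empty constructor.
                return clazz.getDeclaredConstructor().newInstance();
            }
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot instantiate " + clazz.getName(), e);
        }
    }

    /** Example listener that offers both constructors. */
    public static final class WithConf {
        final Conf conf;

        public WithConf() {
            this.conf = null;
        }

        public WithConf(Conf conf) {
            this.conf = conf;
        }
    }

    /** Example listener that only offers the empty constructor. */
    public static final class NoArg {
        public NoArg() {}
    }

    public static void main(String[] args) {
        Conf conf = new Conf();
        WithConf a = (WithConf) instantiate(WithConf.class, conf);
        Object b = instantiate(NoArg.class, conf);
        System.out.println((a.conf == conf) + " " + b.getClass().getSimpleName());
    }
}
```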

Multiple listeners are independent of each other, and the client or JobManager notifies them synchronously. It is highly recommended NOT to perform any blocking operation inside a listener. If blocking operations are required, users need to process them asynchronously in their customized listeners.
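One way to keep a listener non-blocking is to hand each event to a background thread and return immediately. The sketch below assumes a single-threaded executor so that events are reported in arrival order; `AsyncReporter` and its `report` method are hypothetical, and events are reduced to plain strings.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Sketch only: a listener that offloads slow reporting to a background thread. */
class AsyncListenerSketch {

    static final class AsyncReporter implements AutoCloseable {
        private final ExecutorService executor = Executors.newSingleThreadExecutor();
        final List<String> delivered = new CopyOnWriteArrayList<>();

        /** Called on the notifying thread; only enqueues the work and returns. */
        void onEvent(String event) {
            executor.execute(() -> report(event));
        }

        /** Placeholder for a slow call to an external system. */
        private void report(String event) {
            delivered.add(event);
        }

        /** Stops accepting events and waits briefly for queued reports to finish. */
        @Override
        public void close() {
            executor.shutdown();
            try {
                if (!executor.awaitTermination(10, TimeUnit.SECONDS)) {
                    executor.shutdownNow();
                }
            } catch (InterruptedException e) {
                executor.shutdownNow();
                Thread.currentThread().interrupt();
            }
        }
    }

    public static void main(String[] args) {
        AsyncReporter reporter = new AsyncReporter();
        reporter.onEvent("job submitted");
        reporter.onEvent("job running");
        reporter.close();
        System.out.println(reporter.delivered);
    }
}
```

An unbounded queue keeps the notifying thread fast but can grow without limit if the external system is slow, so a production listener would likely bound the queue and decide what to drop.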

Plan For The Future

  1. We add column relationships between job vertices in JobLogicalPlanInfo, but they are not supported in Flink at present. We would like to implement them in the next FLIP.

  2. Source/Sink relationships are supported for SQL/Table jobs; DataStream jobs will be supported later.

  3. Currently only scan sources are supported; lookup join sources should be supported later.

  4. Add a job vertex listener for batch mode, covering for example the scheduling and execution status of vertices and the execution status of subtasks.


[1] https://datahub.io/

[2] https://atlas.apache.org/#/

[3] FLIP-276: Data Consistency of Streaming and Batch ETL in Flink and Table Store


