...

In addition, there are two table types: persistent tables and temporal tables. A persistent table can be identified by the catalog and database above, while a temporal table can only be identified by the properties in its DDL. Different temporal tables with the same connector type and related properties are the same physical table in the external system, such as two tables for the same topic in Kafka.

StorageIdentifier is introduced to address these issues; users can get it from CatalogTable. StorageIdentifierFactory creates the StorageIdentifier for a catalog table and is loaded with the specific connector type.

Code Block
/* Storage identifier for different physical tables. */
@PublicEvolving
public class StorageIdentifier {
    /* Storage information such as kafka, hive, iceberg or paimon. */
    String type();

    /* Identifier which identifies the unique physical table. */
    String identifier();

    /* Properties for the storage identifier; users can get values from different keys for different storages. */
    Map<String, String> properties();
}

/* Storage identifier factory is loaded with the specific connector type and creates {@link StorageIdentifier}. */
@PublicEvolving
public interface StorageIdentifierFactory {
    /* Create storage identifier for different connector types. */
    StorageIdentifier createDynamicTableStorage(Configuration config);
}

@PublicEvolving
public interface CatalogTable {
    /* Get physical storage identifier for the table. */
    StorageIdentifier storage();
}
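
For illustration, the sketch below shows how downstream code might use StorageIdentifier to decide that two Flink tables map to the same physical table; the helper class and the Kafka-style identifier format are assumptions, not part of the proposal.

Code Block
/* Hypothetical helper: two catalog tables refer to the same physical table
 * when the type and identifier of their storage identifiers match. */
public final class StorageIdentifiers {
    private StorageIdentifiers() {}

    public static boolean samePhysicalTable(CatalogTable left, CatalogTable right) {
        StorageIdentifier a = left.storage();
        StorageIdentifier b = right.storage();
        // For example, type "kafka" with identifier "<bootstrap-servers>/<topic>".
        return a.type().equals(b.type()) && a.identifier().equals(b.identifier());
    }
}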

JobSubmissionListener 

...

Code Block
/**
 * Job logical plan is built according to the JobGraph in the client. Users can get sources, sinks and the relationship between nodes from the plan.
 */
@PublicEvolving
public interface JobLogicalPlan {
    /* Job type, BATCH or STREAMING. */
    String jobType();

    /* Source lineage list. */
    List<JobSourceLineage> sources();

    /* Sink lineage list. */
    List<JobSinkLineage> sinks();

    /* True if it's a table or sql job, otherwise return False. */
    boolean tableOrSqlJob();

    /* Get the sink table descriptor for the table/sql job with the given sink identifier. */
    SinkTableDescriptor sinkTable(String identifier);

    /* Job configuration. */
    Map<String, String> config();
}

/* Source info of the job plan. */
@PublicEvolving
public class JobSourceLineage {
    StorageIdentifier identifier();

    /* Source column name list. */
    List<String> columns();
}

/* Sink info of the job plan. */
@PublicEvolving
public class JobSinkLineage {
    StorageIdentifier identifier();

    /* Sink column name list. */
    List<String> columns();

    /* Source column lineages; the key is the column in the sink and the value is the source columns list. */
    Map<String, List<SourceColumnLineage>> columnLineages();
}

/* Source column list for sink vertex. */
@PublicEvolving
public class SourceColumnLineage {
    /* Source identifier. */
    String identifier();

    /* Source name list for the given sink. */
    List<String> columns();
}

@PublicEvolving
public class SinkTableDescriptor {
    /* Modify type, INSERT/UPDATE/DELETE. */
    String modifyType();

    /* Update mode, APPEND/RETRACT/UPSERT. */
    String updateMode();

    boolean overwrite();
}
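
As an illustration, a submission listener could walk this plan as sketched below; the listener body is an example only, and it assumes the JobSubmissionEvent (elided above) exposes the plan via a plan() getter.

Code Block
/* Sketch of a lineage-reporting submission listener; JobSubmissionEvent.plan()
 * is an assumption about the elided event definition. */
public class LineageReportingListener implements JobSubmissionListener {
    @Override
    public void onEvent(JobSubmissionEvent event) {
        JobLogicalPlan plan = event.plan();
        for (JobSourceLineage source : plan.sources()) {
            System.out.println("source: " + source.identifier().identifier());
        }
        for (JobSinkLineage sink : plan.sinks()) {
            // Column-level lineage: sink column -> list of source column lineages.
            sink.columnLineages().forEach(
                    (column, sources) -> System.out.println(column + " <- " + sources));
        }
    }
}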

Flink creates JobSourceLineage and JobSinkLineage for table/sql jobs; for DataStream jobs, users need to set them manually via the setLineage methods as follows.

Code Block
/**
 * Add set source lineage method in data stream source.
 */
@Public
public class DataStreamSource {
    private JobSourceLineage lineage;

    public DataStreamSource setLineage(JobSourceLineage lineage);
}

/**
 * Add set sink lineage method in data stream sink.
 */
@Public
public class DataStreamSink {
    private JobSinkLineage lineage;

    public DataStreamSink setLineage(JobSinkLineage lineage);
}
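
A hedged usage sketch follows; buildSourceLineage and buildSinkLineage are hypothetical helpers, since the FLIP does not specify how lineage instances are constructed.

Code Block
/* Usage sketch for the setLineage methods; the lineage construction helpers
 * are hypothetical and left unimplemented here. */
public class DataStreamLineageExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStreamSource<Long> source = env.fromSequence(1, 100);
        source.setLineage(buildSourceLineage("kafka", "broker:9092/orders")); // hypothetical helper

        DataStreamSink<Long> sink = source.print();
        sink.setLineage(buildSinkLineage("print", "stdout")); // hypothetical helper

        env.execute("lineage-example");
    }
}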

JobExecutionListener

The JobManager generates events when the job status changes or a checkpoint is started, and notifies the JobExecutionListener. JobStatusEvent indicates the status of a Flink job as a JobStatus, with the old status, the new status and the job logical plan.

Code Block
/**
 * When the job status changes in the job manager, it generates a job status event and notifies the job execution listeners.
 */
@PublicEvolving
public interface JobExecutionListener {
    /* Event fired after job status has been changed. */ 
    void onEvent(JobStatusEvent jobStatusEvent);

    /* Job status event with plan. */
    @PublicEvolving
    public class JobStatusEvent {
        JobID jobId();
        String jobName();
        JobStatus oldStatus();
        JobStatus newStatus();
        /* Exception for job when it is failed. */
        @Nullable Throwable exception();
    }
}

/* Factory for job execution listener. */
@PublicEvolving
public interface JobExecutionListenerFactory {
    public JobExecutionListener createListener(Configuration configuration, ClassLoader classLoader);
}
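
For example, a customized listener and its factory might look like the following sketch; the logging behavior is illustrative only.

Code Block
/* Example listener that logs job status transitions. */
public class LoggingJobExecutionListener implements JobExecutionListener {
    private static final Logger LOG =
            LoggerFactory.getLogger(LoggingJobExecutionListener.class);

    @Override
    public void onEvent(JobStatusEvent event) {
        LOG.info("job {} ({}) switched from {} to {}",
                event.jobId(), event.jobName(), event.oldStatus(), event.newStatus());
        if (event.exception() != null) {
            LOG.warn("job failed", event.exception());
        }
    }
}

/* Factory that Flink loads to create the listener. */
public class LoggingJobExecutionListenerFactory implements JobExecutionListenerFactory {
    @Override
    public JobExecutionListener createListener(Configuration configuration, ClassLoader classLoader) {
        return new LoggingJobExecutionListener();
    }
}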

...

Changes for JobSubmissionListener

Build JobLogicalPlan in StreamGraph

Flink creates a Planner for SQL and table jobs, which contains exec nodes; the planner is then converted to Operation, Transformation and StreamGraph. DataStream jobs are similar to SQL: Flink creates a datastream from the environment and converts it to Transformation and StreamGraph. The job conversion is shown as follows.

[draw.io diagram: flow]

There is a graph structure in StreamGraph, so we can easily create the JobLogicalPlan based on the StreamGraph.
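
A hedged sketch of that construction is shown below; getStreamNodes, getSourceIDs and getSinkIDs are existing StreamGraph accessors, while the lineage lookups and DefaultJobLogicalPlan are assumptions.

Code Block
/* Sketch: derive a JobLogicalPlan from the StreamGraph. */
public static JobLogicalPlan fromStreamGraph(StreamGraph streamGraph) {
    List<JobSourceLineage> sources = new ArrayList<>();
    List<JobSinkLineage> sinks = new ArrayList<>();
    for (StreamNode node : streamGraph.getStreamNodes()) {
        // Lineage is assumed to have been attached to the stream graph
        // during translation; the lookup helpers are hypothetical.
        if (streamGraph.getSourceIDs().contains(node.getId())) {
            sources.add(lookupSourceLineage(streamGraph, node)); // hypothetical
        } else if (streamGraph.getSinkIDs().contains(node.getId())) {
            sinks.add(lookupSinkLineage(streamGraph, node)); // hypothetical
        }
    }
    return new DefaultJobLogicalPlan(sources, sinks); // hypothetical implementation
}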

Changes for JobExecutionListener

Flink SQL or table jobs are created from the Planner, which contains exec nodes; these are then converted to Operation, Transformation and StreamGraph. Finally, the jobs are submitted as a JobGraph, from which the job managers create the ExecutionGraph. The operations on the source/sink lists are as follows.

SourceScan in the Planner contains source information such as table name, fields and configurations. But this information is hidden in the Source interface when the SourceScan is converted to a Transformation. We should add source information in the conversion of SourceScan->Operation->Transformation->StreamNode.

Similar to sources, Sink and DataStreamSink contain sink information such as table names and configuration. We should add sink information in the conversion of Sink->Operation->Transformation->StreamNode; then we can add a Map<JobVertexID, JobSinkVertexInfo> in JobGraph and ExecutionGraph too.

After completing the above changes, the JobManager can create the JobLogicalPlan from the JobGraph for the JobExecutionListener. When the job status changes, the DefaultExecutionGraph in the JobManager will notify the listener. At the same time, this listener will also listen to the execution of checkpoints: when the CheckpointCoordinator starts/completes/aborts a specific checkpoint, it notifies the listener too.

Listener Construction and Execution

While the current JobListener is created by an empty constructor, all the customized listeners above can be created by a constructor with Configuration or by an empty constructor. Flink gives precedence to the constructor with Configuration if it exists.
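
A minimal sketch of that instantiation rule, assuming reflection-based loading (the helper name is illustrative):

Code Block
/* Prefer the constructor with Configuration; fall back to the empty one. */
static <T> T instantiateListener(Class<T> clazz, Configuration configuration) throws Exception {
    try {
        return clazz.getConstructor(Configuration.class).newInstance(configuration);
    } catch (NoSuchMethodException e) {
        return clazz.getConstructor().newInstance();
    }
}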

For table and SQL jobs, Flink translates the exec nodes of the planner to Transformations for sources and sinks in the method ExecNodeBase.translateToPlanInternal. There is a resolved table in the source and sink nodes, so Flink can create the source and sink lineage based on that table and save it in the source/sink transformations.

[draw.io diagram: execnode]

For DataStream jobs, DataStreamSource sets the source lineage on the source transformation in its setLineage method, and DataStreamSink does the same thing, as sketched below.
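
A hedged sketch of that change follows; setSourceLineage on the underlying Transformation is a hypothetical addition, shown only to illustrate where the lineage is stored.

Code Block
/* Sketch of the DataStreamSource change; getTransformation() comes from
 * DataStream, setSourceLineage is a hypothetical accessor. */
@Public
public class DataStreamSource<T> {
    private JobSourceLineage lineage;

    public DataStreamSource<T> setLineage(JobSourceLineage lineage) {
        this.lineage = lineage;
        getTransformation().setSourceLineage(lineage); // hypothetical accessor
        return this;
    }
}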

Finally, the source and sink transformations are translated and added to the StreamGraph by SourceTransformationTranslator.translateInternal and SinkTransformationTranslator.translateInternal, where the source and sink lineage information can be added to the StreamGraph too.

[draw.io diagram: savestreamgraph]

Create and notify listeners in RestClusterClient

RestClusterClient can read the listener options from the Configuration and create the JobSubmissionListeners. But there is no StreamGraph in RestClusterClient or its submitJob method. RestClusterClient.submitJob is used in AbstractSessionClusterExecutor, which converts the Pipeline to a JobGraph and submits it. AbstractSessionClusterExecutor can set the StreamGraph on the RestClusterClient before calling its submitJob method; the specific RestClusterClient can then get the StreamGraph and notify the listeners before the JobGraph is submitted.

Code Block
/* Set the pipeline on the client before it submits the job graph. */
public class AbstractSessionClusterExecutor {
    public CompletableFuture<JobClient> execute(
            @Nonnull final Pipeline pipeline,
            @Nonnull final Configuration configuration,
            @Nonnull final ClassLoader userCodeClassloader)
            throws Exception {
        ....

        final ClusterClientProvider<ClusterID> clusterClientProvider =
                    clusterDescriptor.retrieve(clusterID);
        ClusterClient<ClusterID> clusterClient = clusterClientProvider.getClusterClient();

        // Set pipeline to the cluster client before submit job graph
        clusterClient.setPipeline(pipeline); 

        return clusterClient
                    .submitJob(jobGraph)
                    ....  
    }
}

/* Create job submission event and notify listeners before it submit job graph. */
public class RestClusterClient {
    private final List<JobSubmissionListener> listeners;
    private Pipeline pipeline;

    @Override
    public CompletableFuture<JobID> submitJob(@Nonnull JobGraph jobGraph) {
        // Create event and notify listeners before the job graph is submitted.
        JobSubmissionEvent event = createEventFromPipeline(pipeline);
        if (event != null) {
            listeners.forEach(listener -> listener.onEvent(event));
        }
        
        ....;
    }
}

Changes for JobExecutionListener

The JobManager can create the JobExecutionListeners in DefaultExecutionGraph according to the options in the job configuration. Currently the JobManager calls DefaultExecutionGraph.transitionState when the job status changes, so the JobExecutionListeners can be notified in that method as follows.

Code Block
/* Notify job execution listeners when the job status changes. */
public class DefaultExecutionGraph {
    private final List<JobExecutionListener> executionListeners;

    private boolean transitionState(JobStatus current, JobStatus newState, Throwable error) {
        ....;
        notifyJobStatusHooks(newState, error);
        // notify job execution listeners
        notifyJobExecutionListeners(current, newState, error);
        ....;
    }

    private void notifyJobExecutionListeners(JobStatus current, JobStatus newState, Throwable error) {
        JobStatusEvent event = ....; // create the job status event from the transition
        executionListeners.forEach(listener -> listener.onEvent(event));
    }
}

Listener Construction and Execution

Multiple listeners are independent, and the client/JobManager notifies them synchronously. It is highly recommended NOT to perform any blocking operations inside the listeners. If blocking operations are required, users need to perform asynchronous processing in their customized listeners.
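
For example, a listener that needs to do slow work could offload it to its own executor, as in the following sketch:

Code Block
/* Sketch: offload event handling to a dedicated executor so the synchronous
 * notification from the client/JobManager returns immediately. */
public class AsyncReportingListener implements JobExecutionListener {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();

    @Override
    public void onEvent(JobStatusEvent event) {
        executor.execute(() -> report(event)); // slow work runs off the notifying thread
    }

    private void report(JobStatusEvent event) {
        // E.g. push the status change to an external monitoring system.
    }
}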

...

  1. We add column relationships between job vertices in JobLogicalPlan, but this is not supported in Flink at present. We'd like to implement it in the next FLIP. Source/sink relationships for SQL/Table jobs are supported; DataStream jobs will be supported later.
  2. Currently we only support scan sources; lookup join sources should be supported later.
  3. Add a job vertex listener for batch mode, such as the scheduling and execution status of vertices, the execution status of subtasks, etc.

...