...
In addition, there are two table types: persistent table and temporal table. The persistent table can be identified by the catalog and database above, while the temporal table can only be identified by the properties in its DDL. Different temporal tables with the same connector type and related properties are the same physical table in the external system, such as two tables for the same topic in Kafka. StorageIdentifier is introduced in CatalogTable to identify different Flink tables on the same physical table, and users can get it from CatalogTable. StorageIdentifier is created by StorageIdentifierFactory, which is loaded for the specific connector type.
Code Block |
---|
/* Storage identifier for different physical table. */
@PublicEvolving
public interface StorageIdentifier {
    /* External storage information such as kafka, hive, iceberg or paimon. */
    String type();

    /* Identifier which identify the unique physical table. */
    String identifier();

    /* Properties for the storage identifier, users can get value from different keys for different storages. */
    Map<String, String> properties();
}

/* Storage identifier factory is loaded with specific connector type and create {@link StorageIdentifier}. */
@PublicEvolving
public interface StorageIdentifierFactory {
    /* Create storage identifier for different connector type. */
    StorageIdentifier createDynamicTableStorage(Configuration config);
}

@PublicEvolving
public interface CatalogTable {
    /* Get physical storage identifier for the table. */
    StorageIdentifier storage();
} |
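To illustrate how two Flink tables declared over the same Kafka topic could resolve to the same identifier, here is a minimal sketch. It uses a local stand-in for the proposed StorageIdentifier interface so that it compiles on its own. KafkaStorageIdentifier, the fromOptions helper, the identifier format and the choice of option keys are assumptions made for this example, not part of the proposal.

```java
import java.util.Map;
import java.util.Objects;

/** Local stand-in for the proposed interface, so that the sketch compiles on its own. */
interface StorageIdentifier {
    String type();

    String identifier();

    Map<String, String> properties();
}

/** Hypothetical identifier for Kafka tables: the physical table is the (servers, topic) pair. */
final class KafkaStorageIdentifier implements StorageIdentifier {
    private final String servers;
    private final String topic;

    KafkaStorageIdentifier(String servers, String topic) {
        this.servers = Objects.requireNonNull(servers, "servers");
        this.topic = Objects.requireNonNull(topic, "topic");
    }

    /** Hypothetical helper: builds the identifier from the table options given in the DDL. */
    static KafkaStorageIdentifier fromOptions(Map<String, String> options) {
        return new KafkaStorageIdentifier(
                options.get("properties.bootstrap.servers"), options.get("topic"));
    }

    @Override
    public String type() {
        return "kafka";
    }

    @Override
    public String identifier() {
        // Assumed format: options such as the data format do not change the physical table.
        return servers + "/" + topic;
    }

    @Override
    public Map<String, String> properties() {
        return Map.of("properties.bootstrap.servers", servers, "topic", topic);
    }

    @Override
    public boolean equals(Object other) {
        return other instanceof KafkaStorageIdentifier
                && ((KafkaStorageIdentifier) other).identifier().equals(identifier());
    }

    @Override
    public int hashCode() {
        return identifier().hashCode();
    }
}

class StorageIdentifierDemo {
    public static void main(String[] args) {
        // Two table definitions that differ only in options unrelated to the physical table.
        StorageIdentifier first = KafkaStorageIdentifier.fromOptions(Map.of(
                "properties.bootstrap.servers", "broker:9092", "topic", "orders", "format", "json"));
        StorageIdentifier second = KafkaStorageIdentifier.fromOptions(Map.of(
                "properties.bootstrap.servers", "broker:9092", "topic", "orders", "format", "avro"));
        System.out.println("same physical table: " + first.equals(second));
    }
}
```

Deriving equality from the identifier alone lets a listener treat both tables as one node in a lineage graph, whatever their table names are in Flink.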
JobSubmissionListener
...
Code Block |
---|
/**
 * Job logical plan is built according to JobGraph in the client. Users can get sources, sinks and the relationship between nodes from plan.
 */
@PublicEvolving
public interface JobLogicalPlan {
    /* Job type, BATCH or STREAMING. */
    String jobType();

    /* Source lineage list. */
    List<JobSourceLineage> sources();

    /* Sink lineage list. */
    List<JobSinkLineage> sinks();

    /* True if it's a table or sql job, otherwise return False. */
    boolean tableOrSqlJob();

    /* Get sink table descriptor for the table/sql job with given sink identifier. */
    SinkTableDescriptor sinkTable(String identifier);

    /* Job configuration. */
    Map<String, String> config();
}

/* Source info of the job plan. */
@PublicEvolving
public class JobSourceLineage {
    StorageIdentifier identifier();

    /* Source column name list. */
    List<String> columns();
}

/* Sink info of the job plan. */
@PublicEvolving
public class JobSinkLineage {
    StorageIdentifier identifier();

    /* Sink column name list. */
    List<String> columns();

    /* Source column lineages, the key is the column in the sink and the value is the source columns list. */
    Map<String, List<SourceColumnLineage>> columnLineages();
}

/* Source column list for sink vertex. */
@PublicEvolving
public class SourceColumnLineage {
    /* Source identifier. */
    String identifier();

    /* Source name list for the given sink. */
    List<String> columns();
}

@PublicEvolving
public class SinkTableDescriptor {
    /* Modify type, INSERT/UPDATE/DELETE. */
    String modifyType();

    /* Update mode, APPEND/RETRACT/UPSERT. */
    String updateMode();

    boolean overwrite();
} |
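As a reading aid for the columnLineages structure, the following sketch flattens the lineage of one sink into "source column -> sink column" edges. The classes are simplified stand-ins for the proposed ones (plain fields and a string identifier instead of StorageIdentifier), and ColumnLineagePrinter and the sample table names are hypothetical.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Simplified stand-in for the proposed SourceColumnLineage. */
final class SourceColumnLineage {
    final String identifier;
    final List<String> columns;

    SourceColumnLineage(String identifier, List<String> columns) {
        this.identifier = identifier;
        this.columns = columns;
    }
}

/** Simplified stand-in for the proposed JobSinkLineage, using a plain string identifier. */
final class JobSinkLineage {
    final String identifier;
    final Map<String, List<SourceColumnLineage>> columnLineages;

    JobSinkLineage(String identifier, Map<String, List<SourceColumnLineage>> columnLineages) {
        this.identifier = identifier;
        this.columnLineages = columnLineages;
    }
}

/** Hypothetical helper that turns the nested lineage map into a flat edge list. */
final class ColumnLineagePrinter {
    static List<String> edges(JobSinkLineage sink) {
        List<String> result = new ArrayList<>();
        // Key: column in the sink. Value: the source columns it is computed from.
        for (Map.Entry<String, List<SourceColumnLineage>> entry : sink.columnLineages.entrySet()) {
            for (SourceColumnLineage source : entry.getValue()) {
                for (String column : source.columns) {
                    result.add(source.identifier + "." + column
                            + " -> " + sink.identifier + "." + entry.getKey());
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, List<SourceColumnLineage>> lineages = new LinkedHashMap<>();
        lineages.put("total",
                List.of(new SourceColumnLineage("kafka:orders", List.of("price", "quantity"))));
        lineages.put("user_name",
                List.of(new SourceColumnLineage("hive:users", List.of("name"))));
        JobSinkLineage sink = new JobSinkLineage("paimon:order_summary", lineages);
        edges(sink).forEach(System.out::println);
    }
}
```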
Flink creates JobSourceLineage and JobSinkLineage for table/SQL jobs. For DataStream jobs, users need to set them manually with the setLineage methods as follows.
Code Block |
---|
/**
 * Add set source lineage method in data stream source.
 */
@Public
public class DataStreamSource {
    private JobSourceLineage lineage;

    public DataStreamSource setLineage(JobSourceLineage lineage);
}

/**
 * Add set sink lineage method in data stream sink.
 */
@Public
public class DataStreamSink {
    private JobSinkLineage lineage;

    public DataStreamSink setLineage(JobSinkLineage lineage);
} |
JobExecutionListener
JobManager generates events when the status of a job changes or a checkpoint is started, and notifies JobExecutionListener. JobStatusEvent describes a status change of a Flink job, with the old status and new status (both as JobStatus) and the job logical plan.
Code Block |
---|
/**
 * When job status is changed in job manager, it will generate job event and notify job execution listener.
 */
@PublicEvolving
public interface JobExecutionListener {
    /* Event fired after job status has been changed. */
    void onEvent(JobStatusEvent jobStatusEvent);

    /* Job status event with plan. */
    @PublicEvolving
    public class JobStatusEvent {
        JobID jobId();
        String jobName();
        JobStatus oldStatus();
        JobStatus newStatus();
        /* Exception for job when it is failed. */
        @Nullable
        Throwable exception();
    }
}

/* Factory for job execution listener. */
@PublicEvolving
public interface JobExecutionListenerFactory {
    public JobExecutionListener createListener(Configuration configuration, ClassLoader classLoader);
} |
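A customized listener could look roughly like the sketch below, which records each status transition and appends the failure message when a job fails. JobStatus, JobStatusEvent and JobExecutionListener are reduced local stand-ins (the enum lists only a subset of states), and RecordingListener is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

/** Stand-in with a subset of job states, enough for the example. */
enum JobStatus { CREATED, RUNNING, FAILED, FINISHED }

/** Simplified stand-in for the proposed JobStatusEvent. */
final class JobStatusEvent {
    final String jobName;
    final JobStatus oldStatus;
    final JobStatus newStatus;
    final Throwable exception; // null unless the job failed

    JobStatusEvent(String jobName, JobStatus oldStatus, JobStatus newStatus, Throwable exception) {
        this.jobName = jobName;
        this.oldStatus = oldStatus;
        this.newStatus = newStatus;
        this.exception = exception;
    }
}

/** Stand-in for the proposed listener interface. */
interface JobExecutionListener {
    void onEvent(JobStatusEvent event);
}

/** Hypothetical listener that keeps a readable history of status transitions. */
final class RecordingListener implements JobExecutionListener {
    private final List<String> transitions = new ArrayList<>();

    @Override
    public void onEvent(JobStatusEvent event) {
        String line = event.jobName + ": " + event.oldStatus + " -> " + event.newStatus;
        if (event.newStatus == JobStatus.FAILED && event.exception != null) {
            line += " (" + event.exception.getMessage() + ")";
        }
        transitions.add(line);
    }

    List<String> transitions() {
        return transitions;
    }
}

class RecordingListenerDemo {
    public static void main(String[] args) {
        RecordingListener listener = new RecordingListener();
        listener.onEvent(new JobStatusEvent("etl", JobStatus.CREATED, JobStatus.RUNNING, null));
        listener.onEvent(new JobStatusEvent(
                "etl", JobStatus.RUNNING, JobStatus.FAILED, new RuntimeException("task lost")));
        listener.transitions().forEach(System.out::println);
    }
}
```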
...
Changes for JobSubmissionListener
Build JobLogicalPlan in StreamGraph
Flink creates a Planner for SQL and Table jobs which contains exec nodes; the planner is then converted to Operation, Transformation and StreamGraph. DataStream jobs are similar to SQL jobs: Flink creates a DataStream from the environment and converts it to Transformation and StreamGraph. The job conversion is shown as follows.
[draw.io diagram: flow]
StreamGraph already has a graph structure, so we can easily create JobLogicalPlan based on it.
Changes for JobExecutionListener
Flink SQL or Table jobs are created from a Planner which contains exec nodes; the planner is then converted to Operation, Transformation and StreamGraph. Finally, the jobs are submitted as JobGraph and job managers create ExecutionGraph from it. The operations on the source/sink list are as follows.
SourceScan in Planner contains source information such as table name, fields and configurations. But this information is hidden in the Source, which is an interface, when the SourceScan is converted to Transformation. We should add source information in the conversion of SourceScan->Operation->Transformation->StreamNode.
Similar to sources, Sink and DataStreamSink contain sink information such as table names and configuration. We should add sink information in the conversion of Sink->Operation->Transformation->StreamNode, then we can add Map<JobVertexID, JobSinkVertexInfo> sources in JobGraph and ExecutionGraph too.
After completing the above changes, JobManager can create JobLogicalPlan from JobGraph for JobExecutionListener. When the status of a job changes, DefaultExecutionGraph in JobManager will notify the listener. This listener also listens to checkpoint execution: when CheckpointCoordinator starts/completes/aborts a specific checkpoint, it notifies the listener too.
Listener Construction and Execution
While the current JobListener is created by an empty constructor, all customized listeners above can be created by a constructor with Configuration or an empty constructor. Flink prefers the constructor with Configuration if it exists.
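The constructor lookup order described above can be sketched with plain reflection. This is only an illustration under stated assumptions: Configuration is a minimal stand-in for Flink's class, and ListenerLoader, the two sample listeners and the `listener.endpoint` key are hypothetical names introduced for the example.

```java
import java.lang.reflect.Constructor;
import java.util.Map;

/** Minimal stand-in for Flink's Configuration. */
final class Configuration {
    private final Map<String, String> values;

    Configuration(Map<String, String> values) {
        this.values = values;
    }

    String get(String key, String defaultValue) {
        return values.getOrDefault(key, defaultValue);
    }
}

/** Hypothetical loader: prefers a constructor taking Configuration, else the empty one. */
final class ListenerLoader {
    static <T> T create(Class<T> type, Configuration configuration) {
        try {
            Constructor<T> constructor;
            Object[] arguments;
            try {
                constructor = type.getDeclaredConstructor(Configuration.class);
                arguments = new Object[] {configuration};
            } catch (NoSuchMethodException noConfigurationConstructor) {
                // Fall back to the empty constructor.
                constructor = type.getDeclaredConstructor();
                arguments = new Object[0];
            }
            constructor.setAccessible(true);
            return constructor.newInstance(arguments);
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot create listener " + type.getName(), e);
        }
    }
}

/** Sample listener with both constructors; the Configuration one should win. */
final class ConfiguredListener {
    final String endpoint;

    ConfiguredListener() {
        this.endpoint = "unconfigured";
    }

    ConfiguredListener(Configuration configuration) {
        this.endpoint = configuration.get("listener.endpoint", "unconfigured");
    }
}

/** Sample listener with only an empty constructor. */
final class PlainListener {
    PlainListener() {}
}

class ListenerLoaderDemo {
    public static void main(String[] args) {
        Configuration configuration =
                new Configuration(Map.of("listener.endpoint", "http://lineage.example"));
        ConfiguredListener configured = ListenerLoader.create(ConfiguredListener.class, configuration);
        PlainListener plain = ListenerLoader.create(PlainListener.class, configuration);
        System.out.println(configured.endpoint + ", " + plain.getClass().getSimpleName());
    }
}
```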
For Table and SQL jobs, Flink translates the exec nodes of the planner to Transformation in the method ExecNodeBase.translateToPlanInternal for sources and sinks. The source and sink nodes hold the resolved table, so Flink can create source and sink lineage based on the table and save them in the source/sink transformation.
[draw.io diagram]
For DataStream jobs, DataStreamSource sets the source lineage on the source transformation in its setLineage method, and DataStreamSink does the same for the sink.
Finally, source and sink transformations are translated and added to StreamGraph by SourceTransformationTranslator.translateInternal and SinkTransformationTranslator.translateInternal, where source and sink lineage information can be added to StreamGraph too.
[draw.io diagram]
Create and notify listeners in RestClusterClient
RestClusterClient can read the listener option from Configuration and create JobSubmissionListener. But there is no StreamGraph in RestClusterClient or its submitJob method. RestClusterClient.submitJob is used in AbstractSessionClusterExecutor, which converts the Pipeline to a JobGraph and submits it. AbstractSessionClusterExecutor can set the StreamGraph in RestClusterClient before calling its submitJob method; the specific RestClusterClient can then get the StreamGraph and notify the listener before the JobGraph is submitted.
Code Block |
---|
/* Set pipeline to client before it submits the job graph. */
public class AbstractSessionClusterExecutor {
    public CompletableFuture<JobClient> execute(
            @Nonnull final Pipeline pipeline,
            @Nonnull final Configuration configuration,
            @Nonnull final ClassLoader userCodeClassloader)
            throws Exception {
        ....
        final ClusterClientProvider<ClusterID> clusterClientProvider =
                clusterDescriptor.retrieve(clusterID);
        ClusterClient<ClusterID> clusterClient = clusterClientProvider.getClusterClient();

        // Set pipeline to the cluster client before submitting the job graph
        clusterClient.setPipeline(pipeline);

        return clusterClient
                .submitJob(jobGraph)
                ....
    }
}

/* Create job submission event and notify listeners before it submits the job graph. */
public class RestClusterClient {
    private final List<JobSubmissionListener> listeners;
    private Pipeline pipeline;

    @Override
    public CompletableFuture<JobID> submitJob(@Nonnull JobGraph jobGraph) {
        // Create event and notify listeners before the job graph is submitted.
        JobSubmissionEvent event = createEventFromPipeline(pipeline);
        if (event != null) {
            listeners.forEach(listener -> listener.onEvent(event));
        }
        ....;
    }
} |
Changes for JobExecutionListener
JobManager can create JobExecutionListener in DefaultExecutionGraph according to options in the job configuration. Currently JobManager calls DefaultExecutionGraph.transitionState when the job status changes, so JobExecutionListener can be notified in that method as follows.
Code Block |
---|
/* Notify job execution listeners when the job status changes. */
public class DefaultExecutionGraph {
    private final List<JobExecutionListener> executionListeners;

    private boolean transitionState(JobStatus current, JobStatus newState, Throwable error) {
        ....;
        notifyJobStatusHooks(newState, error);
        // notify job execution listeners
        notifyJobExecutionListeners(current, newState, error);
        ....;
    }

    private void notifyJobExecutionListeners(JobStatus current, JobStatus newState, Throwable error) {
        // Create the job status event from the old status, the new status and the error.
        JobStatusEvent event = ....;
        executionListeners.forEach(listener -> listener.onEvent(event));
    }
} |
Listener Construction and Execution
Multiple listeners are independent, and the client/JobManager notifies the listeners synchronously. It is highly recommended NOT to perform any blocking operation inside the listeners. If blocking operations are required, users need to perform asynchronous processing in their customized listeners.
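One way to keep a listener non-blocking is to hand each event to an executor owned by the listener, as in the sketch below. The event and listener types are reduced stand-ins, AsyncReportingListener is a hypothetical name, and the report method is a placeholder for whatever slow call (HTTP request, database write) a real listener would make.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Reduced stand-in for the proposed event type. */
final class JobStatusEvent {
    final String jobName;
    final String newStatus;

    JobStatusEvent(String jobName, String newStatus) {
        this.jobName = jobName;
        this.newStatus = newStatus;
    }
}

/** Stand-in for the proposed listener interface. */
interface JobExecutionListener {
    void onEvent(JobStatusEvent event);
}

/** Hypothetical listener that returns immediately and reports on its own thread. */
final class AsyncReportingListener implements JobExecutionListener, AutoCloseable {
    // A single worker thread keeps events in the order they were received.
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private final List<String> reported = new CopyOnWriteArrayList<>();

    @Override
    public void onEvent(JobStatusEvent event) {
        // Only enqueue here, so the caller (client or JobManager) is not blocked.
        executor.submit(() -> report(event));
    }

    private void report(JobStatusEvent event) {
        // Placeholder for a slow, blocking call to an external system.
        reported.add(event.jobName + " -> " + event.newStatus);
    }

    List<String> reported() {
        return reported;
    }

    @Override
    public void close() {
        executor.shutdown();
        try {
            // Give queued reports a bounded amount of time to finish.
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

class AsyncReportingListenerDemo {
    public static void main(String[] args) {
        AsyncReportingListener listener = new AsyncReportingListener();
        listener.onEvent(new JobStatusEvent("etl", "RUNNING"));
        listener.onEvent(new JobStatusEvent("etl", "FINISHED"));
        listener.close();
        listener.reported().forEach(System.out::println);
    }
}
```

A bounded queue or a drop policy may be preferable in practice, since an unbounded executor queue can grow if the external system is slow.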
...
- We add column relationships between job vertices in JobLogicalPlan, but they are not supported in Flink at present. We'd like to implement them in the next FLIP. Source/sink relationships for SQL/Table jobs are supported; DataStream jobs will be supported later. Currently we only support scan sources; lookup join sources should be supported later.
- Add a job vertex listener for batch mode, covering the scheduling and execution status of vertices, the execution status of subtasks, etc.
...