...
In addition, there are two table types: persistent table
and temporal table
. The persistent table
can be identified by catalog and database above, while the temporal table
can only be identified by properties in ddl. Different temporal tables with the same connector type and related properties are the same physical table in external system, such as two tables for the same topic in Kafka
.
TableStorage
ExternalStorage
is added in CatalogTable
to identify different Flink tables on the same physical table. TableStorage is created by DynamicTableStorageFactory
ExternalStorageFactory
, which is loaded with specific connector type.
Code Block |
---|
/* TableExternal storage for different physical table. */ @PublicEvolving public classinterface TableStorageExternalStorage { /* External Tablestorage type,information such as kafka, hive, iceberg or paimon. */ String type(); /* Physical location which identify the unique physical table. */ String location(); } /* TableExternal storage factory is loaded with specific connector type and create {@link TableStorageExternalStorage}. */ @PublicEvolving public interface DynamicTableStorageFactory extendsExternalStorageFactory DynamicTableFactory { /* Create tableexternal storage for different tableconnector type. */ TableStorageDynamicTableStorage createDynamicTableStorage(ContextConfiguration contextconfig); } @PublicEvolving public interface CatalogTable { /* Get physical storage for the table. */ TableStorageExternalStorage tableStoragestorage(); } |
JobSubmissionListener
...
Code Block |
---|
/** * Job logical plan is built according to JobGraph in the client. Users can get sources, sinks and the relationship between nodes from plan. */ @PublicEvolvig public interface JobLogicalPlan { /* Job type, BATCH or STREAMING. */ String jobType(); /* Source info list. */ List<JobSourceInfo> sources(); /* Sink info list. */ List<JobSinkInfo> sinks(); /* Job configuration. */ Map<String, String> config(); /* Source and column lineages for the sink, the key is sink name and the value is source and column lineages. */ Map<String, SourceColumnLineage> columnLineages(); } /* Source info of the job plan. */ @PublicEvolving public class JobSourceInfo { StringExternalStorage sourceNamesource(); /* Collect/Table/DataStreamSource. */ String sourceType(); /* Source column name list. */ List<String> columns(); Map<String, String> config(); } /* Sink info of the job plan. */ @PublicEvolving public class JobSinkInfo { StringExternalStorage sinkNamesink(); /* Sink column name list. */ List<String> columns(); /* Modify type, INSERT/UPDATE/DELETE. */ String modifyType(); /* Update mode, APPEND/RETRACT/UPSERT. */ String updateMode(); boolean overwrite(); Map<String, String> config(); } /* Source column list for sink vertex. */ @PublicEvolving public class SourceColumnLineage { /* Sink name. */ String sinkName(); /* Source name list for the given sink. */ List<String> sourceNames(); /* Column lineages, the key is the column in the sink and the value is source column list. */ Map<String, ColumnLineage> columnLineages(); } /* Source columns which are used to generate sink column. */ @PublicEvolving public class ColumnLineage { /* The sink column. */ String sinkColumn(); /* Source Name -> Source Columns. */ Map<String, List<String>> sourceColumns(); } |
...
JobManager
generates events when status of job is changed or checkpoint is started and notify JobExecutionListener
. JobStatusEvent
indicates the status of Flink job in JobStatus
with old status, new status and job logical plan.In addition to status, JobManager
would generate CheckpointEvent
for checkpoint when it is started/completed/aborted and notify JobExecutionListener
. All checkpoint related events extend CheckpointEvent
and more events can be added in the future.
Code Block |
---|
/** * When job status is changed in job manager, it will generate job event and notify job execution listener. */ @PublicEvolving public interface JobExecutionListener { /* Event fired after job status has been changed. */ void onJobStatusChanged(JobStatusEvent jobStatusEvent); /* Job status event with plan. */ @PublicEvolving public class JobStatusEvent { JobID jobId(); String jobName(); JobStatus oldStatus(); JobStatus newStatus(); /* Exception for job when it is failed. */ @Nullable Throwable exception(); } } /* Factory for job execution listener. */ @PublicEvolving public interface JobExecutionListenerFactory { public JobExecutionListener createListener(Configuration configuration, ClassLoader classLoader); } |
...
Code Block |
---|
/* Listeners related operations in the catalog manager. */ public final class CatalogManager { /* Create catalog manager with listener list. */ private CatalogManager( String defaultCatalogName, Catalog defaultCatalog, DataTypeFactory typeFactory, ManagedTableListener managedTableListener, List<CatalogEventListener> listeners); /* Notify the listeners with given catalog event. */ private void notify(CatalogEvent event) { for (CatalogEventListener listener : listeners) { listener.onEvent(event); } } /* Notify listener for tables. */ public void createTable/dropTable/alterTable(...) { ....; notify(Create Different Table Event); } /* Builder for catalog manager. */ public static final class Builder { Builder listeners(List<CatalogEventListener> listeners); } } /* Listeners related operations in AbstractCatalog. */ public abstract class AbstractCatalog implements Catalog { /* Create the catalog with listeners. */ public AbstractCatalog(String name, String defaultDatabase, List<CatalogEventListener> listeners); /** * Notify the listeners with given database event, after the customized implementation of AbstractCatalog create/alter/drop a database, * it can create the specific event and call the notify method. */ protected void notify(BaseDatabaseEvent event) { for (CatalogEventListener listener : listeners) { listener.onEvent(event); } } } /* Create default catalog context with listeners. */ public DefaultCatalogContext { public DefaultCatalogContext( String name, Map<String, String> options, ReadableConfig configuration, ClassLoader classLoader, List<CatalogEventListener> listeners); /* Get catalog event listeners from the context. */ public List<CatalogEventListener> listeners() { return listeners; } } |
...
draw.io Diagram border true diagramName flow simpleViewer false width links auto tbstyle top lbox true diagramWidth 13911141 revision 1416
There is a graph structure in StreamGraph
, we can create JobLogicalPlan
based on StreamGraph
easily.
Changes for JobExecutionListener
...