...

In addition, there are two table types: persistent tables and temporary tables. A persistent table can be identified by the catalog and database described above, while a temporary table can only be identified by the properties in its DDL. Different temporary tables with the same connector type and related properties refer to the same physical table in the external system, for example two tables defined on the same Kafka topic.

ExternalStorage is added to CatalogTable to identify different Flink tables that refer to the same physical table. ExternalStorage is created by ExternalStorageFactory, which is loaded according to the specific connector type.

Code Block
/* External storage for different physical tables. */
@PublicEvolving
public interface ExternalStorage {
    /* External storage type, such as kafka, hive, iceberg or paimon. */
    String type();
    /* Physical location which identifies the unique physical table. */
    String location();
}

/* External storage factory, loaded with the specific connector type, which creates {@link ExternalStorage}. */
@PublicEvolving
public interface ExternalStorageFactory {
    /* Create external storage for the given connector type. */
    ExternalStorage createDynamicTableStorage(Configuration config);
}

@PublicEvolving
public interface CatalogTable {
    /* Get physical storage for the table. */
    ExternalStorage storage();
}
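To illustrate how ExternalStorage lets two temporary tables resolve to the same physical table, the sketch below derives a storage from Kafka DDL options. It is a minimal sketch under stated assumptions: ExternalStorage is re-declared locally without `@PublicEvolving`, a plain `Map<String, String>` stands in for Flink's `Configuration`, and `KafkaStorageResolver`, `resolve`, `sameStorage` and the `kafka://<servers>/<topic>` location format are hypothetical. Only the option keys `topic` and `properties.bootstrap.servers` come from the Kafka SQL connector.

```java
import java.util.Map;
import java.util.Objects;

// Local mirror of the proposed ExternalStorage interface (@PublicEvolving omitted).
interface ExternalStorage {
    String type();

    String location();
}

// Hypothetical helper, not part of the proposal: derives the physical Kafka table
// from DDL options and ignores options that do not change the target topic.
final class KafkaStorageResolver {
    ExternalStorage resolve(Map<String, String> options) {
        String servers = Objects.requireNonNull(
                options.get("properties.bootstrap.servers"), "missing properties.bootstrap.servers");
        String topic = Objects.requireNonNull(options.get("topic"), "missing topic");
        // Hypothetical location format: kafka://<bootstrap servers>/<topic>.
        String location = "kafka://" + servers + "/" + topic;
        return new ExternalStorage() {
            @Override
            public String type() {
                return "kafka";
            }

            @Override
            public String location() {
                return location;
            }
        };
    }

    // Two tables point at the same physical table when type and location both match.
    static boolean sameStorage(ExternalStorage a, ExternalStorage b) {
        return a.type().equals(b.type()) && a.location().equals(b.location());
    }
}
```

Because options such as `scan.startup.mode` are not part of the location, two DDLs that read the same topic with different startup modes map to one storage. The sketch handles only a single `topic` and compares the bootstrap server string literally; a real connector would likely need to normalize both.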

JobSubmissionListener 

...

Code Block
/**
 * Job logical plan is built according to JobGraph in the client. Users can get sources, sinks and the relationship between nodes from the plan.
 */
@PublicEvolving
public interface JobLogicalPlan {
    /* Job type, BATCH or STREAMING. */
    String jobType();

    /* Source info list. */
    List<JobSourceInfo> sources();

    /* Sink info list. */
    List<JobSinkInfo> sinks();

    /* Job configuration. */
    Map<String, String> config();

    /* Source and column lineages for each sink: the key is the sink name and the value is its source and column lineages. */
    Map<String, SourceColumnLineage> columnLineages();
}
 
/* Source info of the job plan. */
@PublicEvolving
public class JobSourceInfo {
    /* External storage of the source. */
    ExternalStorage source();

    /* Source type: Collect/Table/DataStreamSource. */
    String sourceType();

    /* Source column name list. */
    List<String> columns();

    /* Source configuration. */
    Map<String, String> config();
}
 
/* Sink info of the job plan. */
@PublicEvolving 
public class JobSinkInfo {
    /* External storage of the sink. */
    ExternalStorage sink();

    /* Sink column name list. */
    List<String> columns();

    /* Modify type, INSERT/UPDATE/DELETE. */
    String modifyType();

    /* Update mode, APPEND/RETRACT/UPSERT. */
    String updateMode();
    /* Whether the sink overwrites existing data. */
    boolean overwrite();

    /* Sink configuration. */
    Map<String, String> config();
}
 
/* Source column list for sink vertex. */
@PublicEvolving  
public class SourceColumnLineage {
    /* Sink name. */
    String sinkName();

    /* Source name list for the given sink. */
    List<String> sourceNames();

    /* Column lineages: the key is the column in the sink and the value is its source columns. */
    Map<String, ColumnLineage> columnLineages();
}

/* Source columns which are used to generate sink column. */
@PublicEvolving
public class ColumnLineage {
    /* The sink column. */
    String sinkColumn();

    /* Source Name -> Source Columns. */
    Map<String, List<String>> sourceColumns();
}
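As an example of how a consumer of the plan might use the column lineage, the sketch below walks `columnLineages()` to find which source columns feed a given sink column. It assumes simplified local records in place of the proposed classes (Java 16+ records generate the `sinkColumn()`, `sourceColumns()` and `columnLineages()` accessors), and the `LineageQueries.upstreamOf` helper and its `source.column` output format are hypothetical.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Simplified local mirrors of the proposed classes (annotations omitted).
record ColumnLineage(String sinkColumn, Map<String, List<String>> sourceColumns) {}

record SourceColumnLineage(String sinkName, List<String> sourceNames,
                           Map<String, ColumnLineage> columnLineages) {}

// Hypothetical query helper over the column lineage map of a job plan.
final class LineageQueries {
    private LineageQueries() {}

    /* Returns sorted "source.column" entries feeding the given sink column, or an empty list. */
    static List<String> upstreamOf(Map<String, SourceColumnLineage> lineages,
                                   String sinkName, String sinkColumn) {
        SourceColumnLineage sink = lineages.get(sinkName);
        if (sink == null) {
            return List.of();
        }
        ColumnLineage column = sink.columnLineages().get(sinkColumn);
        if (column == null) {
            return List.of();
        }
        List<String> result = new ArrayList<>();
        // Flatten "source name -> source columns" into qualified column names.
        for (Map.Entry<String, List<String>> entry : column.sourceColumns().entrySet()) {
            for (String sourceColumn : entry.getValue()) {
                result.add(entry.getKey() + "." + sourceColumn);
            }
        }
        Collections.sort(result);
        return result;
    }
}
```

For a sink `report` whose `total` column is computed from `orders.price` and `orders.qty`, `upstreamOf(lineages, "report", "total")` would return both qualified columns, which is the kind of answer a lineage or impact-analysis tool would store.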

...

JobManager generates events when the status of a job changes or a checkpoint is started, and notifies JobExecutionListener. JobStatusEvent describes the status of a Flink job using JobStatus, with the old status, the new status and the job logical plan. In addition to status changes, JobManager generates a CheckpointEvent when a checkpoint is started, completed or aborted, and notifies JobExecutionListener. All checkpoint-related events extend CheckpointEvent, and more events can be added in the future.

Code Block
/**
 * When the job status changes in the JobManager, it generates a job event and notifies the job execution listeners.
 */
@PublicEvolving
public interface JobExecutionListener {
    /* Event fired after job status has been changed. */ 
    void onJobStatusChanged(JobStatusEvent jobStatusEvent);

    /* Job status event with plan. */
    @PublicEvolving
    public class JobStatusEvent {
        JobID jobId();
        String jobName();
        JobStatus oldStatus();
        JobStatus newStatus();
        /* Exception that caused the job to fail; null if the job has not failed. */
        @Nullable Throwable exception();
    }
}

/* Factory for job execution listener. */
@PublicEvolving
public interface JobExecutionListenerFactory {
    /* Create a listener with the given configuration and class loader. */
    JobExecutionListener createListener(Configuration configuration, ClassLoader classLoader);
}
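To make the listener contract concrete, the sketch below implements a listener that records terminal status transitions, for example to feed an alerting system. It is a self-contained sketch: `JobStatus` is a local subset of Flink's enum, a `String` replaces `JobID`, `JobStatusEvent` is a simplified record, and `TerminalStatusListener` is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Local subset of Flink's JobStatus values, for illustration only.
enum JobStatus { CREATED, RUNNING, FAILING, FAILED, CANCELED, FINISHED }

// Simplified stand-in for the proposed event; exception is null unless the job failed.
record JobStatusEvent(String jobId, String jobName, JobStatus oldStatus,
                      JobStatus newStatus, Throwable exception) {}

interface JobExecutionListener {
    void onJobStatusChanged(JobStatusEvent jobStatusEvent);
}

// Hypothetical listener that keeps a message for each terminal transition.
final class TerminalStatusListener implements JobExecutionListener {
    private final List<String> messages = new ArrayList<>();

    @Override
    public void onJobStatusChanged(JobStatusEvent event) {
        JobStatus status = event.newStatus();
        if (status == JobStatus.FAILED) {
            // Use the nullable exception to report the failure cause when present.
            String cause = event.exception() == null ? "unknown" : event.exception().getMessage();
            messages.add(event.jobName() + " failed: " + cause);
        } else if (status == JobStatus.FINISHED || status == JobStatus.CANCELED) {
            messages.add(event.jobName() + " " + event.oldStatus() + " -> " + status);
        }
        // Non-terminal transitions such as CREATED -> RUNNING are ignored.
    }

    List<String> messages() {
        return List.copyOf(messages);
    }
}
```

Since the callback is presumably invoked inside the JobManager, a production listener would likely keep `onJobStatusChanged` cheap and hand events off asynchronously rather than doing blocking I/O inline.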

...

Code Block
/* Listener-related operations in the catalog manager. */
public final class CatalogManager {
    /* Create catalog manager with listener list. */
    private CatalogManager(
            String defaultCatalogName,
            Catalog defaultCatalog,
            DataTypeFactory typeFactory,
            ManagedTableListener managedTableListener,
            List<CatalogEventListener> listeners);

    /* Notify the listeners with given catalog event. */
    private void notify(CatalogEvent event) {
        for (CatalogEventListener listener : listeners) {
            listener.onEvent(event);
        }
    }

    /* Notify the listeners after a table is created, dropped or altered. */
    public void createTable/dropTable/alterTable(...) {
        // ... existing table operation ...
        notify(/* the corresponding create/drop/alter table event */);
    }

    /* Builder for catalog manager. */
    public static final class Builder {
        Builder listeners(List<CatalogEventListener> listeners);
    }
}

/* Listener-related operations in AbstractCatalog. */
public abstract class AbstractCatalog implements Catalog {
    /* Create the catalog with listeners. */
    public AbstractCatalog(String name, String defaultDatabase, List<CatalogEventListener> listeners); 

    /**
     * Notify the listeners with the given database event. After a custom implementation of AbstractCatalog
     * creates, alters or drops a database, it can create the specific event and call this method.
     */
    protected void notify(BaseDatabaseEvent event) {
        for (CatalogEventListener listener : listeners) {
            listener.onEvent(event);
        }
    }
}

/* Create default catalog context with listeners. */
public class DefaultCatalogContext {
    public DefaultCatalogContext(
            String name,
            Map<String, String> options,
            ReadableConfig configuration,
            ClassLoader classLoader,
            List<CatalogEventListener> listeners);

    /* Get catalog event listeners from the context. */
    public List<CatalogEventListener> listeners() {
        return listeners;
    }
}
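The notification pattern above can be sketched end to end with a custom catalog that calls `notify` only after a database operation succeeds. This is a minimal, self-contained sketch: `CatalogEvent` and `CatalogEventListener` are simplified local stand-ins, and `CreateDatabaseEvent` and `InMemoryCatalog` are hypothetical names, not the proposal's event classes.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins for the proposed event and listener types.
interface CatalogEvent {}

// Hypothetical database event carrying the catalog and database names.
record CreateDatabaseEvent(String catalogName, String databaseName) implements CatalogEvent {}

interface CatalogEventListener {
    void onEvent(CatalogEvent event);
}

// Hypothetical in-memory catalog showing where notify() is called.
final class InMemoryCatalog {
    private final String name;
    private final List<CatalogEventListener> listeners;
    private final List<String> databases = new ArrayList<>();

    InMemoryCatalog(String name, List<CatalogEventListener> listeners) {
        this.name = name;
        this.listeners = List.copyOf(listeners);
    }

    void createDatabase(String databaseName) {
        if (databases.contains(databaseName)) {
            throw new IllegalArgumentException("Database already exists: " + databaseName);
        }
        databases.add(databaseName);
        // Notify only after the change succeeded, so listeners never see failed operations.
        notify(new CreateDatabaseEvent(name, databaseName));
    }

    // Overloads Object.notify() with a different parameter list, as in the proposal.
    private void notify(CatalogEvent event) {
        for (CatalogEventListener listener : listeners) {
            listener.onEvent(event);
        }
    }
}
```

Notifying after the operation keeps listener state consistent with the catalog; the trade-off is that a listener which throws would surface its exception to the caller after the change has already been applied.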

...

[Diagram: flow]

StreamGraph already has a graph structure, so JobLogicalPlan can easily be created from the StreamGraph.
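To show why the graph structure makes this straightforward, the sketch below finds sources (nodes without inputs), sinks (nodes without outputs) and the sources that reach a given sink by walking edges upstream. It is an illustration only: `SimpleGraph` and its methods are hypothetical and deliberately simplified, not Flink's `StreamGraph` API.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified stand-in for a stream graph (not Flink's StreamGraph).
final class SimpleGraph {
    private final Map<Integer, String> names = new LinkedHashMap<>();
    private final Map<Integer, List<Integer>> inputs = new LinkedHashMap<>();
    private final Map<Integer, List<Integer>> outputs = new LinkedHashMap<>();

    void addNode(int id, String name) {
        names.put(id, name);
        inputs.put(id, new ArrayList<>());
        outputs.put(id, new ArrayList<>());
    }

    void addEdge(int from, int to) {
        outputs.get(from).add(to);
        inputs.get(to).add(from);
    }

    // Sources have no upstream nodes; returned in insertion order.
    List<String> sources() {
        List<String> result = new ArrayList<>();
        inputs.forEach((id, in) -> { if (in.isEmpty()) result.add(names.get(id)); });
        return result;
    }

    // Sinks have no downstream nodes; returned in insertion order.
    List<String> sinks() {
        List<String> result = new ArrayList<>();
        outputs.forEach((id, out) -> { if (out.isEmpty()) result.add(names.get(id)); });
        return result;
    }

    // Depth-first walk upstream from a sink, collecting the sorted names of reachable sources.
    List<String> sourcesFeeding(int sinkId) {
        List<String> found = new ArrayList<>();
        Deque<Integer> stack = new ArrayDeque<>(List.of(sinkId));
        Set<Integer> visited = new HashSet<>();
        while (!stack.isEmpty()) {
            int id = stack.pop();
            if (!visited.add(id)) {
                continue;
            }
            List<Integer> in = inputs.get(id);
            if (in.isEmpty()) {
                found.add(names.get(id));
            } else {
                in.forEach(stack::push);
            }
        }
        Collections.sort(found);
        return found;
    }
}
```

The per-sink source lists are the kind of information that the `sources()`, `sinks()` and lineage maps of JobLogicalPlan would summarize; the visited set keeps the walk linear even when branches rejoin.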


Changes for JobExecutionListener

...