Status

Discussion thread-
Vote thread-
JIRA


Release-

Motivation

A Flink ETL job consumes data from a Source and writes its results to a Sink, so the job establishes a relationship between the Source and the Sink. Flink needs a mechanism for users to report these relationships to external systems, such as the metadata systems DataHub [1] and Atlas [2], and the meta store mentioned in FLIP-276 [3].

This FLIP aims to introduce listener interfaces in Flink that users can implement to report metadata to external systems. The main information is as follows:

1. Source and Sink information, such as table name, fields, partition keys, primary keys, watermarks, configurations

2. Job information, such as job id/name, job type, logical plan

3. Relationships between Sources/Sinks and jobs, such as source and sink lineages and their column lineages.

4. Job execution information, such as job status

Public Interfaces

CatalogEventListener

DDL operations such as create/alter/drop table generate different events and notify CatalogEventListener. All events for CatalogEventListener extend the basic CatalogEvent, and listeners can get the catalog from it. Some general events for databases and tables are defined as follows; more events can be added in the future as requirements arise.

/**
 * Different events will be fired when a catalog/database/table is modified. The customized listener can get and report specific information from the event according to the event type.
 */
@PublicEvolving
public interface CatalogEventListener {
    /* Event fired after a catalog/database/table is modified. */
    void onEvent(CatalogEvent catalogEvent);

    /* The basic class for catalog related event. */
    @PublicEvolving
    public abstract class CatalogEvent {
        /* The catalog of the event. */
        Catalog catalog();
        /* The name of catalog. */
        String catalogName();
    }

    /* The basic class for database related event. */
    public abstract class BaseDatabaseEvent extends CatalogEvent {
        String databaseName();  
    }

    /* Event for database creation. */
    @PublicEvolving
    public class CreateDatabaseEvent extends BaseDatabaseEvent {
        CatalogDatabase database();
        boolean ignoreIfExists();
    }

    /* Event for alter database. */
    @PublicEvolving
    public class AlterDatabaseEvent extends BaseDatabaseEvent {
        CatalogDatabase newDatabase();
        boolean ignoreIfNotExists();
    }

    /* Event for dropping database. */
    @PublicEvolving
    public class DropDatabaseEvent extends BaseDatabaseEvent {
        boolean ignoreIfNotExists();
    }

    /* Base table event, provides column list, primary keys, partition keys, watermarks and properties in CatalogBaseTable. The table can be source or sink. */
    public abstract class BaseTableEvent extends CatalogEvent {
        ObjectIdentifier identifier();  
        CatalogBaseTable table();
    }

    /* Event for table creation. */
    @PublicEvolving
    public class CreateTableEvent extends BaseTableEvent {
        boolean ignoreIfExists();
    }

    /* Event for altering table, provides all information in old table and new table. */
    @PublicEvolving
    public class AlterTableEvent extends BaseTableEvent {
        List<TableChange> tableChanges();
        boolean ignoreIfNotExists();
    }

    /* Event for dropping table. */
    @PublicEvolving
    public class DropTableEvent extends BaseTableEvent {
        boolean ignoreIfNotExists();
    }
}

/* Factory for catalog listener. */
@PublicEvolving
public interface CatalogEventListenerFactory {
    public CatalogEventListener createListener(Configuration configuration, ClassLoader classLoader);
}

/* Add listeners in the catalog context. */
@PublicEvolving
public interface CatalogFactory {
    /** Add listeners in the context. */
    @PublicEvolving
    interface Context {
        /* Get the listeners from the context if they exist. */
        List<CatalogEventListener> listeners();
    }
}
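
As an illustration only, a customized listener might dispatch on the event type as sketched below. The types here are simplified local stand-ins for the interfaces proposed above, not the real Flink classes: the events carry plain strings instead of Catalog and CatalogBaseTable objects, and `RecordingListener` and its message format are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class CatalogListenerSketch {
    /** Stand-in for the proposed CatalogEvent; only the catalog name is kept. */
    abstract static class CatalogEvent {
        private final String catalogName;

        CatalogEvent(String catalogName) {
            this.catalogName = catalogName;
        }

        String catalogName() {
            return catalogName;
        }
    }

    /** Stand-in for BaseTableEvent; the identifier is a plain string here. */
    abstract static class BaseTableEvent extends CatalogEvent {
        private final String identifier;

        BaseTableEvent(String catalogName, String identifier) {
            super(catalogName);
            this.identifier = identifier;
        }

        String identifier() {
            return identifier;
        }
    }

    static final class CreateTableEvent extends BaseTableEvent {
        CreateTableEvent(String catalogName, String identifier) {
            super(catalogName, identifier);
        }
    }

    static final class DropTableEvent extends BaseTableEvent {
        DropTableEvent(String catalogName, String identifier) {
            super(catalogName, identifier);
        }
    }

    interface CatalogEventListener {
        void onEvent(CatalogEvent event);
    }

    /** Hypothetical listener that records what it would send to a meta system. */
    static final class RecordingListener implements CatalogEventListener {
        final List<String> reported = new ArrayList<>();

        @Override
        public void onEvent(CatalogEvent event) {
            if (event instanceof CreateTableEvent) {
                reported.add("CREATE " + ((CreateTableEvent) event).identifier());
            } else if (event instanceof DropTableEvent) {
                reported.add("DROP " + ((DropTableEvent) event).identifier());
            }
            // Other event types are ignored, so new events can be added later.
        }
    }

    public static void main(String[] args) {
        RecordingListener listener = new RecordingListener();
        listener.onEvent(new CreateTableEvent("hive_catalog1", "hive_catalog1.my_database.my_table"));
        listener.onEvent(new DropTableEvent("hive_catalog1", "hive_catalog1.my_database.my_table"));
        listener.reported.forEach(System.out::println);
    }
}
```

Ignoring unknown event types is one possible design choice: it keeps such a listener working when further events are introduced later.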

Users may create different catalogs on the same physical catalog, for example, two Hive catalogs named hive_catalog1 and hive_catalog2 for the same metastore. The tables hive_catalog1.my_database.my_table and hive_catalog2.my_database.my_table are then the same table in the Hive metastore.

In addition, there are two table types: persistent tables and temporary tables. A persistent table can be identified by its catalog and database as above, while a temporary table can only be identified by the properties in its DDL. Different temporary tables with the same connector type and related properties are the same physical table in the external system, such as two tables for the same topic in Kafka.

StorageIdentifier is introduced to address these issues; listeners can get it from CatalogTable. StorageIdentifierFactory creates the StorageIdentifier for a catalog table.

/* Storage identifier for different physical table. */
@PublicEvolving
public class StorageIdentifier {
    /* Storage type such as kafka, hive, iceberg or paimon. */
    String type();

    /* Identifier that uniquely identifies the physical table. */
    String identifier();

    /* Properties for the storage identifier, users can get value from different keys for different storages, such as broker and topic for kafka. */
    Map<String, String> properties();
}

/* Storage identifier factory is loaded for a specific connector type and creates {@link StorageIdentifier}. */
@PublicEvolving
public interface StorageIdentifierFactory {
    /* Create storage identifier for different connector type. */
    StorageIdentifier createDynamicTableStorage(Configuration config, ClassLoader classLoader);
}

@PublicEvolving
public interface CatalogTable {
    /* Get physical storage identifier for the table. */
    StorageIdentifier storage();
}
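
To make the idea concrete, the sketch below shows one way a Kafka-specific factory could derive the identifier so that two temporary tables on the same topic resolve to the same physical table. It is an assumption-laden illustration: `StorageIdentifier` is a local stand-in, the `forKafka` and `kafkaOptions` helpers and the `brokers/topic` identifier format are hypothetical, and a plain options map replaces the `Configuration` used by the proposed factory. The option keys `topic`, `properties.bootstrap.servers` and `properties.group.id` are those of the Flink Kafka SQL connector.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

final class StorageIdentifierSketch {
    /** Simplified stand-in for the proposed StorageIdentifier. */
    static final class StorageIdentifier {
        private final String type;
        private final String identifier;
        private final Map<String, String> properties;

        StorageIdentifier(String type, String identifier, Map<String, String> properties) {
            this.type = type;
            this.identifier = identifier;
            this.properties = Collections.unmodifiableMap(new HashMap<>(properties));
        }

        String type() {
            return type;
        }

        String identifier() {
            return identifier;
        }

        Map<String, String> properties() {
            return properties;
        }
    }

    /** Hypothetical derivation: brokers plus topic name the physical Kafka table. */
    static StorageIdentifier forKafka(Map<String, String> tableOptions) {
        String brokers = tableOptions.get("properties.bootstrap.servers");
        String topic = tableOptions.get("topic");
        if (brokers == null || topic == null) {
            throw new IllegalArgumentException("Kafka table needs brokers and a topic");
        }
        Map<String, String> properties = new HashMap<>();
        properties.put("broker", brokers);
        properties.put("topic", topic);
        return new StorageIdentifier("kafka", brokers + "/" + topic, properties);
    }

    /** Builds the DDL options of a Kafka table for the example. */
    static Map<String, String> kafkaOptions(String brokers, String topic, String groupId) {
        Map<String, String> options = new HashMap<>();
        options.put("connector", "kafka");
        options.put("properties.bootstrap.servers", brokers);
        options.put("topic", topic);
        options.put("properties.group.id", groupId);
        return options;
    }

    public static void main(String[] args) {
        // Two temporary tables that differ only in consumer group share one identifier.
        StorageIdentifier first = forKafka(kafkaOptions("broker:9092", "orders", "group_a"));
        StorageIdentifier second = forKafka(kafkaOptions("broker:9092", "orders", "group_b"));
        System.out.println(first.identifier().equals(second.identifier()));
    }
}
```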

JobSubmissionListener 

JobSubmissionListener is introduced in this FLIP to report the job submission event together with the job's logical plan, which contains source/sink lineage information. Source and sink lineages are static information for a job, so they can be reported once before the job is submitted. Therefore, the listeners are on the client side. The RestClusterClient is the gateway for all jobs, including SQL, Table and DataStream jobs, and even jobs that developers build themselves and submit with the client. RestClusterClient will create the JobSubmissionListeners and notify them before the specific job is submitted.

/**
 * This listener will be notified before job is submitted in {@link RestClusterClient}.
 */
@PublicEvolving
public interface JobSubmissionListener {
    /* Event is fired before a job is submitted. */
    void onEvent(JobSubmissionEvent submissionEvent);

    /* Event for job submission. */
    @PublicEvolving
    public class JobSubmissionEvent {
        JobID jobId();
        String jobName();
        JobLogicalPlan plan();
    }
}

/* Factory for job submission listener. */
@PublicEvolving
public interface JobSubmissionListenerFactory {
    public JobSubmissionListener createListener(Configuration configuration, ClassLoader classLoader);
}

JobSubmissionEvent contains a JobLogicalPlan, which describes detailed job information such as source/sink lineages and column lineages. Users can get the plan from the event to report more information about the job.

/**
 * Job logical plan is built according to StreamGraph in the client. Users can get sources, sinks and the relationship between nodes from plan.
 */
@PublicEvolving
public interface JobLogicalPlan {
    /* Job type, TableOrSQL or DataStream. */
    String jobType();

    /* Job execution type, BATCH or STREAMING. */
    String executionType();

    /* Source lineage list. */
    List<JobSourceLineage> sources();

    /* Sink lineage list. */
    List<JobSinkLineage> sinks(); 

    /* Get the sink table descriptor for a table/sql job with the given sink identifier; always returns null for DataStream jobs. */
    SinkTableDescriptor sinkTable(String identifier);

    /* Job configuration. */
    Map<String, String> config(); 
}
 
/* Source info of the job plan. */
@PublicEvolving
public class JobSourceLineage {
    StorageIdentifier identifier();

    /* Source column name list. */
    List<String> columns();
}
 
/* Sink info of the job plan. */
@PublicEvolving 
public class JobSinkLineage {
    StorageIdentifier identifier();

    /* Sink column name list. */
    List<String> columns();

    /* Source column lineages, the key is the column in the sink and the value is the source columns list. */
    Map<String, List<SourceColumnLineage>> columnLineages();
}
 
/* Source column list for sink vertex. */
@PublicEvolving  
public class SourceColumnLineage {
    /* Source identifier. */
    String identifier();

    /* Source column name list for the given sink column. */
    List<String> columns();
}

@PublicEvolving
public class SinkTableDescriptor {
    /* Modify type, INSERT/UPDATE/DELETE. */
    String modifyType();

    /* Update mode, APPEND/RETRACT/UPSERT. */
    String updateMode();
    boolean overwrite();
}
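
The following sketch illustrates how a listener might flatten the column lineages of one sink into source-to-sink edges before reporting them. The classes are reduced local stand-ins for the types above (the sink identifier is a plain string rather than a StorageIdentifier), and the `toEdges` helper and its edge format are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

final class ColumnLineageSketch {
    /** Stand-in for SourceColumnLineage: source columns feeding one sink column. */
    static final class SourceColumnLineage {
        final String identifier;
        final List<String> columns;

        SourceColumnLineage(String identifier, List<String> columns) {
            this.identifier = identifier;
            this.columns = columns;
        }
    }

    /** Stand-in for JobSinkLineage with a string identifier. */
    static final class JobSinkLineage {
        final String identifier;
        final Map<String, List<SourceColumnLineage>> columnLineages;

        JobSinkLineage(String identifier, Map<String, List<SourceColumnLineage>> columnLineages) {
            this.identifier = identifier;
            this.columnLineages = columnLineages;
        }
    }

    /** Produces one "source.column -> sink.column" edge per source column. */
    static List<String> toEdges(JobSinkLineage sink) {
        List<String> edges = new ArrayList<>();
        for (Map.Entry<String, List<SourceColumnLineage>> entry : sink.columnLineages.entrySet()) {
            String sinkColumn = entry.getKey();
            for (SourceColumnLineage source : entry.getValue()) {
                for (String sourceColumn : source.columns) {
                    edges.add(source.identifier + "." + sourceColumn
                            + " -> " + sink.identifier + "." + sinkColumn);
                }
            }
        }
        return edges;
    }

    public static void main(String[] args) {
        // The sink column "total" is computed from two columns of the "orders" source.
        Map<String, List<SourceColumnLineage>> lineages = new LinkedHashMap<>();
        lineages.put("total", Arrays.asList(
                new SourceColumnLineage("orders", Arrays.asList("price", "quantity"))));
        JobSinkLineage sink = new JobSinkLineage("report", lineages);
        toEdges(sink).forEach(System.out::println);
    }
}
```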

Flink creates JobSourceLineage and JobSinkLineage for Table/SQL jobs. For DataStream jobs, users need to create the lineages themselves and set them manually with the setLineage methods as follows.

/**
 * Add set source lineage method in data stream source.
 */
@Public
public class DataStreamSource {
    private JobSourceLineage lineage;

    public DataStreamSource setLineage(JobSourceLineage lineage);
}

/**
 * Add set sink lineage method in data stream sink.
 */
@Public
public class DataStreamSink {
    private JobSinkLineage lineage;

    public DataStreamSink setLineage(JobSinkLineage lineage);
}

JobExecutionListener

JobManager generates events and notifies JobExecutionListener when the job status changes. JobStatusEvent describes a status change of a Flink job with the old JobStatus, the new JobStatus and, if the job failed, the exception.

/**
 * When job status is changed in job manager, it will generate job event and notify job execution listener.
 */
@PublicEvolving
public interface JobExecutionListener {
    /* Event fired after job status has been changed. */ 
    void onEvent(JobStatusEvent jobStatusEvent);

    /* Job status event. */
    @PublicEvolving
    public class JobStatusEvent {
        JobID jobId();
        String jobName();
        JobStatus oldStatus();
        JobStatus newStatus();
        /* Exception of the job when it has failed. */
        @Nullable Throwable exception();
    }
}

/* Factory for job execution listener. */
@PublicEvolving
public interface JobExecutionListenerFactory {
    public JobExecutionListener createListener(Configuration configuration, ClassLoader classLoader);
}
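
As a hedged example of how such a listener might be used, the sketch below reports only transitions into a failed state. The `JobStatus` enum is a reduced stand-in for Flink's own enum, `JobStatusEvent` uses plain fields instead of accessor methods, and `FailureReporter` with its alert format is hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class JobStatusListenerSketch {
    /** Reduced stand-in for Flink's JobStatus enum. */
    enum JobStatus { CREATED, RUNNING, FAILED, FINISHED }

    /** Stand-in for the proposed JobStatusEvent. */
    static final class JobStatusEvent {
        final String jobName;
        final JobStatus oldStatus;
        final JobStatus newStatus;
        final Throwable exception; // null unless the job failed

        JobStatusEvent(String jobName, JobStatus oldStatus, JobStatus newStatus, Throwable exception) {
            this.jobName = jobName;
            this.oldStatus = oldStatus;
            this.newStatus = newStatus;
            this.exception = exception;
        }
    }

    interface JobExecutionListener {
        void onEvent(JobStatusEvent event);
    }

    /** Hypothetical listener that only reports transitions into FAILED. */
    static final class FailureReporter implements JobExecutionListener {
        final List<String> alerts = new ArrayList<>();

        @Override
        public void onEvent(JobStatusEvent event) {
            if (event.newStatus != JobStatus.FAILED) {
                return;
            }
            String cause = event.exception == null ? "unknown" : event.exception.getMessage();
            alerts.add(event.jobName + " failed after " + event.oldStatus + ": " + cause);
        }
    }

    public static void main(String[] args) {
        FailureReporter reporter = new FailureReporter();
        reporter.onEvent(new JobStatusEvent("etl", JobStatus.CREATED, JobStatus.RUNNING, null));
        reporter.onEvent(new JobStatusEvent(
                "etl", JobStatus.RUNNING, JobStatus.FAILED, new RuntimeException("sink unavailable")));
        reporter.alerts.forEach(System.out::println);
    }
}
```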

Config Customized Listener

Users should add their listeners to the classpath of the client and the Flink cluster, and configure the listener factories with the following options:

# Config catalog event listeners.
table.catalog.listeners: {job catalog listener factory1},{job catalog listener factory2}

# Existing config job submission listeners.
execution.job-submission-listeners: {job submission listener factory1},{job submission listener factory2}

# Config job execution listeners.
jobmanager.execution.listeners: {job execution listener factory1},{job execution listener factory2}
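
The options above hold comma-separated factory names. This FLIP does not spell out how those names are resolved, so the sketch below simply assumes they are fully qualified class names instantiated by reflection. The interfaces are local stand-ins (a string replaces JobSubmissionEvent and a map replaces `Configuration`), and `PrintingListenerFactory` and `loadListeners` are hypothetical.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

final class ListenerLoadingSketch {
    /** Stand-in listener; the event is a plain string in this sketch. */
    interface JobSubmissionListener {
        void onEvent(String event);
    }

    /** Stand-in factory; a Map replaces Flink's Configuration. */
    interface JobSubmissionListenerFactory {
        JobSubmissionListener createListener(Map<String, String> configuration, ClassLoader classLoader);
    }

    /** Hypothetical factory; a real one would build a client for the meta system. */
    static final class PrintingListenerFactory implements JobSubmissionListenerFactory {
        @Override
        public JobSubmissionListener createListener(
                Map<String, String> configuration, ClassLoader classLoader) {
            return event -> System.out.println("submitted: " + event);
        }
    }

    /** Assumption: each configured name is a class with a no-argument constructor. */
    static List<JobSubmissionListener> loadListeners(
            Map<String, String> configuration, ClassLoader classLoader) {
        List<JobSubmissionListener> listeners = new ArrayList<>();
        String value = configuration.getOrDefault("execution.job-submission-listeners", "");
        for (String name : value.split(",")) {
            String className = name.trim();
            if (className.isEmpty()) {
                continue; // tolerate an unset option or stray commas
            }
            try {
                JobSubmissionListenerFactory factory =
                        Class.forName(className, true, classLoader)
                                .asSubclass(JobSubmissionListenerFactory.class)
                                .getDeclaredConstructor()
                                .newInstance();
                listeners.add(factory.createListener(configuration, classLoader));
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Cannot create listener factory " + className, e);
            }
        }
        return listeners;
    }

    public static void main(String[] args) {
        Map<String, String> configuration = new HashMap<>();
        configuration.put(
                "execution.job-submission-listeners", PrintingListenerFactory.class.getName());
        ClassLoader classLoader = ListenerLoadingSketch.class.getClassLoader();
        for (JobSubmissionListener listener : loadListeners(configuration, classLoader)) {
            listener.onEvent("my_job");
        }
    }
}
```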

Proposed Changes

Changes for CatalogEventListener

TableEnvironmentImpl creates the customized CatalogEventListeners according to the option table.catalog.listeners and builds CatalogManager with them. Other components such as Sql-Gateway can also create CatalogManager with the listeners. Database-related operations are in Catalog; the listeners are added to AbstractCatalog, and users can notify them after database operations in their customized catalog.

/* Listeners related operations in the catalog manager. */
public final class CatalogManager {
    /* Create catalog manager with listener list. */
    private CatalogManager(
            String defaultCatalogName,
            Catalog defaultCatalog,
            DataTypeFactory typeFactory,
            ManagedTableListener managedTableListener,
            List<CatalogEventListener> listeners);

    /* Notify the listeners with given catalog event. */
    private void notify(CatalogEvent event) {
        for (CatalogEventListener listener : listeners) {
            listener.onEvent(event);
        }
    }

    /* Notify listener for tables. */
    public void createTable/dropTable/alterTable(...) {
        ....;
        notify(Create Different Table Event);
    }

    /* Builder for catalog manager. */
    public static final class Builder {
        Builder listeners(List<CatalogEventListener> listeners);
    }
}

/* Listeners related operations in AbstractCatalog. */
public abstract class AbstractCatalog implements Catalog {
    /* Create the catalog with listeners. */
    public AbstractCatalog(String name, String defaultDatabase, List<CatalogEventListener> listeners); 

    /**
     * Notify the listeners with the given database event. After a customized implementation of AbstractCatalog
     * creates/alters/drops a database, it can create the specific event and call this method.
     */
    protected void notify(BaseDatabaseEvent event) {
        for (CatalogEventListener listener : listeners) {
            listener.onEvent(event);
        }
    }
}

/* Create default catalog context with listeners. */
public class DefaultCatalogContext {
    public DefaultCatalogContext(
            String name,
            Map<String, String> options,
            ReadableConfig configuration,
            ClassLoader classLoader,
            List<CatalogEventListener> listeners);

    /* Get catalog event listeners from the context. */
    public List<CatalogEventListener> listeners() {
        return listeners;
    }
}

Changes for JobSubmissionListener

Build JobLogicalPlan in StreamGraph

Flink creates a Planner for SQL and Table jobs, which contains exec nodes; the plan is then converted to Operation, Transformation and StreamGraph. DataStream jobs are similar: Flink creates a DataStream from the environment and converts it to Transformation and StreamGraph. The job conversion is shown as follows.

StreamGraph has a graph structure, so JobLogicalPlan can easily be created from it.

For Table and SQL jobs, Flink translates the exec nodes of the planner to Transformations in the method ExecNodeBase.translateToPlanInternal for sources and sinks. The source and sink nodes contain the resolved table, so Flink can create the source and sink lineages based on the table and save them in the source/sink transformations.

For DataStream jobs, DataStreamSource sets the source lineage on the source transformation in its setLineage method, and DataStreamSink does the same for the sink.

Finally, the source and sink transformations are translated and added to StreamGraph by SourceTransformationTranslator.translateInternal and SinkTransformationTranslator.translateInternal, where the source and sink lineage information can be added to StreamGraph too.

Create and notify listeners in RestClusterClient

RestClusterClient can read the listener option from Configuration and create the JobSubmissionListeners. However, there is no StreamGraph in RestClusterClient or in its submitJob method. RestClusterClient.submitJob is used in AbstractSessionClusterExecutor, which converts the Pipeline to a JobGraph and submits it. AbstractSessionClusterExecutor can set the StreamGraph in RestClusterClient before calling its submitJob method; the RestClusterClient can then get the StreamGraph and notify the listeners before the JobGraph is submitted.

/* Set the pipeline on the client before it submits the job graph. */
public class AbstractSessionClusterExecutor {
    public CompletableFuture<JobClient> execute(
            @Nonnull final Pipeline pipeline,
            @Nonnull final Configuration configuration,
            @Nonnull final ClassLoader userCodeClassloader)
            throws Exception {
        ....

        final ClusterClientProvider<ClusterID> clusterClientProvider =
                    clusterDescriptor.retrieve(clusterID);
        ClusterClient<ClusterID> clusterClient = clusterClientProvider.getClusterClient();

        // Set the pipeline on the cluster client before submitting the job graph
        clusterClient.setPipeline(pipeline); 

        return clusterClient
                    .submitJob(jobGraph)
                    ....  
    }
}

/* Create the job submission event and notify the listeners before the job graph is submitted. */
public class RestClusterClient {
    private final List<JobSubmissionListener> listeners;
    private Pipeline pipeline;

    @Override
    public CompletableFuture<JobID> submitJob(@Nonnull JobGraph jobGraph) {
        // Create event and notify listeners before the job graph is submitted.
        JobSubmissionEvent event = createEventFromPipeline(pipeline);
        if (event != null) {
            listeners.forEach(listener -> listener.onEvent(event));
        }
        
        ....;
    }
}

Changes for JobExecutionListener

JobManager can create the JobExecutionListeners in DefaultExecutionGraph according to the options in the job configuration. Currently JobManager calls DefaultExecutionGraph.transitionState when the job status changes, so JobExecutionListener can be notified in that method as follows.

/* Notify the job execution listeners when the job status changes. */
public class DefaultExecutionGraph {
    private final List<JobExecutionListener> executionListeners;

    private boolean transitionState(JobStatus current, JobStatus newState, Throwable error) {
        ....;
        notifyJobStatusHooks(newState, error);
        // notify job execution listeners
        notifyJobExecutionListeners(current, newState, error);
        ....;
    }

    private void notifyJobExecutionListeners(JobStatus current, JobStatus newState, Throwable error) {
        JobStatusEvent event = create job status event;
        executionListeners.forEach(listener -> listener.onEvent(event));
    }
}

Listener Construction and Execution

Multiple listeners are independent of each other, and the client/JobManager notifies them synchronously. It is highly recommended NOT to perform any blocking operation inside the listeners. If blocking operations are required, users need to process them asynchronously in their customized listeners.
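
One possible way to follow this recommendation is to wrap a blocking listener so that `onEvent` only enqueues the event and returns. The sketch below is an illustration under stated assumptions rather than part of the proposed API: `Listener` is a generic stand-in for the listener interfaces above, and `AsyncListener` with its `close` method is hypothetical.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

final class AsyncListenerSketch {
    /** Generic stand-in for the listener interfaces in this FLIP. */
    interface Listener<E> {
        void onEvent(E event);
    }

    /** Hands events to a single worker thread so the caller is never blocked. */
    static final class AsyncListener<E> implements Listener<E> {
        private final Listener<E> delegate;
        private final ExecutorService executor = Executors.newSingleThreadExecutor();

        AsyncListener(Listener<E> delegate) {
            this.delegate = delegate;
        }

        @Override
        public void onEvent(E event) {
            // Returns immediately; the delegate runs on the worker thread in arrival order.
            executor.execute(() -> {
                try {
                    delegate.onEvent(event);
                } catch (RuntimeException e) {
                    // A failed report must not affect later events.
                    System.err.println("Listener failed: " + e);
                }
            });
        }

        /** Stops accepting events and waits for queued ones; returns false on timeout. */
        boolean close(long timeoutMillis) {
            executor.shutdown();
            try {
                return executor.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }

    public static void main(String[] args) {
        List<String> received = new CopyOnWriteArrayList<>();
        AsyncListener<String> listener = new AsyncListener<>(received::add);
        listener.onEvent("job submitted");
        listener.onEvent("job running");
        listener.close(5000);
        received.forEach(System.out::println);
    }
}
```

A single worker thread keeps events in arrival order, which matters when status changes are reported; the trade-off is that one slow report delays the ones queued behind it.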

Plan For The Future

  1. We add column relationships between job vertices in JobLogicalPlan, but they are not supported in Flink at present. We'd like to implement them in the next FLIP.
  2. Currently only scan sources are supported; lookup join sources should be supported later.
  3. Add a job vertex listener for batch mode, covering the scheduling and execution status of vertices, the execution status of subtasks, etc.


[1] https://datahub.io/

[2] https://atlas.apache.org/#/

[3] FLIP-276: Data Consistency of Streaming and Batch ETL in Flink and Table Store


