...

Code Block
/**
 * Different events will be fired when a catalog/database/table is changed. The customized listener can receive these events and then do some specific operations according to the event type.
 */
@PublicEvolving
public interface CatalogEventListener {
    /* Event fired after a catalog is modified. */
    void onEvent(CatalogEvent catalogEvent);

    /* The base interface for catalog-related events. */
    @PublicEvolving
    public interface CatalogEvent {
        /* The catalog of the event. */
        Catalog catalog();
        /* The name of catalog. */
        String catalogName();
    }

    /* Event for catalog registration. */
    @PublicEvolving
    interface RegisterCatalogEvent extends CatalogEvent { }
 
    /* Event for catalog unregistration. */ 
    @PublicEvolving
    interface UnregisterCatalogEvent extends CatalogEvent {
        boolean ignoreIfNotExists();
    }

    /* Event for database creation. */
    @PublicEvolving
    interface CreateDatabaseEvent extends CatalogEvent {
        CatalogDatabase database();
        String databaseName(); 
        boolean ignoreIfExists();
    }

    /* Event for dropping database. */
    @PublicEvolving
    interface DropDatabaseEvent extends CatalogEvent  {
        String databaseName(); 
        boolean ignoreIfExists();
    }

    /* Base table event, provides column list, primary keys, partition keys, watermarks and properties in CatalogBaseTable. The table can be source or sink. */
    interface BaseTableEvent extends CatalogEvent {
        ObjectIdentifier identifier();  
        CatalogBaseTable table();
    }

    /* Event for table creation. */
    @PublicEvolving
    interface CreateTableEvent extends BaseTableEvent {
        boolean ignoreIfExists();
    }

    /* Event for altering table, provides all information in old table and new table. */
    @PublicEvolving
    interface AlterTableEvent extends BaseTableEvent {
        List<TableChange> tableChanges();
        boolean ignoreIfExists();
    }

    /* Event for dropping table. */
    @PublicEvolving
    interface DropTableEvent extends BaseTableEvent {
        boolean ignoreIfExists();   
    }
}
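
Because the proposed listener exposes a single onEvent method, an implementation has to dispatch on the concrete event type. The following is a minimal sketch of such a listener, not the proposal's implementation. The nested interfaces are simplified stand-ins for the ones above (the Catalog and CatalogDatabase accessors are omitted so the example compiles without Flink), and AuditListener, createDatabase and dropDatabase are hypothetical names introduced only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch only: the nested interfaces are simplified stand-ins, not Flink classes. */
final class CatalogAuditExample {

    /** Stand-in for the proposed CatalogEvent; the Catalog accessor is omitted. */
    interface CatalogEvent {
        String catalogName();
    }

    /** Stand-in for CreateDatabaseEvent; the CatalogDatabase accessor is omitted. */
    interface CreateDatabaseEvent extends CatalogEvent {
        String databaseName();
        boolean ignoreIfExists();
    }

    /** Stand-in for DropDatabaseEvent. */
    interface DropDatabaseEvent extends CatalogEvent {
        String databaseName();
        boolean ignoreIfExists();
    }

    /** Stand-in for the proposed listener interface. */
    interface CatalogEventListener {
        void onEvent(CatalogEvent catalogEvent);
    }

    /** Hypothetical listener that keeps one audit line per received event. */
    static final class AuditListener implements CatalogEventListener {
        private final List<String> auditLog = new ArrayList<>();

        @Override
        public void onEvent(CatalogEvent event) {
            // Dispatch on the concrete event type, since there is only one callback.
            if (event instanceof CreateDatabaseEvent) {
                CreateDatabaseEvent create = (CreateDatabaseEvent) event;
                auditLog.add("CREATE DATABASE " + create.catalogName() + "." + create.databaseName());
            } else if (event instanceof DropDatabaseEvent) {
                DropDatabaseEvent drop = (DropDatabaseEvent) event;
                auditLog.add("DROP DATABASE " + drop.catalogName() + "." + drop.databaseName());
            } else {
                // Event types this listener does not know are recorded, not rejected.
                auditLog.add("OTHER EVENT in " + event.catalogName());
            }
        }

        List<String> auditLog() {
            return auditLog;
        }
    }

    /** Hypothetical factory for a create-database event, used only to drive the sketch. */
    static CreateDatabaseEvent createDatabase(String catalog, String database) {
        return new CreateDatabaseEvent() {
            @Override public String catalogName() { return catalog; }
            @Override public String databaseName() { return database; }
            @Override public boolean ignoreIfExists() { return false; }
        };
    }

    /** Hypothetical factory for a drop-database event, used only to drive the sketch. */
    static DropDatabaseEvent dropDatabase(String catalog, String database) {
        return new DropDatabaseEvent() {
            @Override public String catalogName() { return catalog; }
            @Override public String databaseName() { return database; }
            @Override public boolean ignoreIfExists() { return false; }
        };
    }

    public static void main(String[] args) {
        AuditListener listener = new AuditListener();
        listener.onEvent(createDatabase("hive", "sales"));
        listener.onEvent(dropDatabase("hive", "sales"));
        listener.auditLog().forEach(System.out::println);
    }
}
```

A real listener would more likely forward these events to an external metadata or lineage service than keep them in memory; the in-memory list only keeps the sketch self-contained.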

JobListener

There is an existing JobListener, which is notified when a job is submitted by the client. A before-submission event should be added to the listener, carrying the source/sink list of the job, so that users can perform customized validation such as checking whether a table is written by multiple jobs. JobSubmissionEvent is created for the listener, and an onJobPreSubmitted method is added to the listener as follows.

Code Block
@PublicEvolving
public interface JobListener {
    /* Event fired before a job is submitted. */
    void onJobPreSubmitted(JobSubmissionEvent submissionEvent);

    /* Event for job submission. */
    @PublicEvolving
    public interface JobSubmissionEvent {
        JobID jobId();
        String jobName();
        JobLogicalPlan plan();
    }
}
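
The validation mentioned above, checking whether a table is written by multiple jobs, could be sketched as follows. This is an illustration under stated assumptions rather than the proposed implementation: the proposal does not spell out the shape of JobLogicalPlan, so the sinks() accessor is hypothetical, String stands in for JobID, and the listener interface is reduced to the pre-submission callback. Whether an exception thrown from the callback actually aborts the submission is also not specified here and is assumed for the sake of the example.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Sketch only: simplified stand-ins for the proposed interfaces, not Flink classes. */
final class SinkConflictExample {

    /** Assumed shape: sinks() is hypothetical, the proposal does not define this type. */
    interface JobLogicalPlan {
        List<String> sinks();
    }

    /** Stand-in for the proposed JobSubmissionEvent; String replaces JobID. */
    interface JobSubmissionEvent {
        String jobId();
        String jobName();
        JobLogicalPlan plan();
    }

    /** Reduced to the pre-submission callback that the proposal adds. */
    interface JobListener {
        void onJobPreSubmitted(JobSubmissionEvent submissionEvent);
    }

    /** Hypothetical validator that allows at most one writing job per sink table. */
    static final class SingleWriterValidator implements JobListener {
        private final Map<String, String> writerBySink = new HashMap<>();

        @Override
        public void onJobPreSubmitted(JobSubmissionEvent event) {
            // Check every sink first so that a rejected job claims no table.
            for (String sink : event.plan().sinks()) {
                String owner = writerBySink.get(sink);
                if (owner != null && !owner.equals(event.jobId())) {
                    throw new IllegalStateException(
                            "Table " + sink + " is already written by job " + owner);
                }
            }
            // All sinks are free or already owned by this job: record ownership.
            for (String sink : event.plan().sinks()) {
                writerBySink.put(sink, event.jobId());
            }
        }
    }

    /** Hypothetical factory for a submission event, used only to drive the sketch. */
    static JobSubmissionEvent event(String jobId, String jobName, String... sinks) {
        List<String> sinkList = Arrays.asList(sinks);
        JobLogicalPlan plan = () -> sinkList;
        return new JobSubmissionEvent() {
            @Override public String jobId() { return jobId; }
            @Override public String jobName() { return jobName; }
            @Override public JobLogicalPlan plan() { return plan; }
        };
    }

    public static void main(String[] args) {
        SingleWriterValidator validator = new SingleWriterValidator();
        validator.onJobPreSubmitted(event("job-1", "load-orders", "db.orders"));
        try {
            validator.onJobPreSubmitted(event("job-2", "backfill-orders", "db.orders"));
        } catch (IllegalStateException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```

The in-memory map only works within a single client process; validating across clients would need a shared store, which is outside the scope of this sketch.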

JobExecutionListener

...

Users should add their listeners to the classpath of the client and the Flink cluster, and configure them with the following options:

Code Block
# Config catalog event listeners.
table.catalog.listeners: {catalog listener class1},{catalog listener class2}

# Existing config job submission listeners.
execution.job-listeners: {job submission listener class1},{job submission listener class2}

# Config job execution listeners.
jobmanager.execution.listeners: {job execution listener class1},{job execution listener class2}
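
Since each option holds a comma-separated list of class names, the listeners have to be on the classpath so they can be instantiated by name. The sketch below shows one way such a value could be turned into listener instances. It is an assumption about the mechanism, not a description of how Flink loads listeners: loadListeners, LoggingListener and the reduced CatalogEventListener interface are hypothetical names, and a public no-argument constructor is assumed for every listener class.

```java
import java.util.ArrayList;
import java.util.List;

/** Sketch only: one possible way to instantiate listeners from a config value. */
final class ListenerLoadingExample {

    /** Reduced stand-in for the proposed listener interface. */
    interface CatalogEventListener {
        void onEvent(Object catalogEvent);
    }

    /** Hypothetical listener with the no-argument constructor the loader assumes. */
    static final class LoggingListener implements CatalogEventListener {
        public LoggingListener() {}

        @Override
        public void onEvent(Object catalogEvent) {
            System.out.println("event: " + catalogEvent);
        }
    }

    /** Parses a comma-separated class list and instantiates each class reflectively. */
    static List<CatalogEventListener> loadListeners(String configValue) {
        List<CatalogEventListener> listeners = new ArrayList<>();
        for (String rawName : configValue.split(",")) {
            String className = rawName.trim();
            if (className.isEmpty()) {
                continue; // tolerate trailing commas and blank entries
            }
            try {
                Class<? extends CatalogEventListener> clazz =
                        Class.forName(className).asSubclass(CatalogEventListener.class);
                listeners.add(clazz.getDeclaredConstructor().newInstance());
            } catch (ReflectiveOperationException | ClassCastException e) {
                throw new IllegalArgumentException("Cannot load listener " + className, e);
            }
        }
        return listeners;
    }

    public static void main(String[] args) {
        // The value that would be configured under table.catalog.listeners.
        String configValue = LoggingListener.class.getName();
        for (CatalogEventListener listener : loadListeners(configValue)) {
            listener.onEvent("example event");
        }
    }
}
```

A class that is missing from the classpath or does not implement the listener interface fails fast with an IllegalArgumentException, which makes a misconfigured option visible at startup instead of silently dropping events.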

...