...

  • Streaming mode requires the table to be created first so that downstream jobs can consume it in real time.
  • In most cases, streaming jobs do not need cleanup even if the job fails.
  • Flink has a rich connector ecosystem, and the capabilities of the external storage systems differ; Flink needs to behave consistently across them.
  • Batch jobs should try to ensure atomicity of the final result.

Implementation Plan

We will introduce a new concept: the stage table.

Stage table: created through the Catalog#createStageTable API, recorded only in the Catalog's memory, and visible during the SQL compilation stage.

The Catalog keeps a collection of stage tables; a stage table is created in the Catalog's backend storage when the job status becomes CREATED, and dropped from the backend storage when the job status becomes FAILED or CANCELED.
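As a rough illustration, the new API could look like the sketch below. The exact signature is an assumption that mirrors the existing Catalog#createTable method; it is not the final design.

import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;

public interface Catalog {

    /**
     * Creates a stage table that is recorded only in the Catalog's memory and
     * is visible during the SQL compilation stage; it is persisted to the
     * backend storage later, once the job status becomes CREATED.
     *
     * <p>Sketch only: the signature is assumed to mirror Catalog#createTable.
     */
    void createStageTable(ObjectPath tablePath, CatalogBaseTable table, boolean ignoreIfExists)
            throws TableAlreadyExistException, DatabaseNotExistException, CatalogException;
}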


Based on the research summary and analysis above, the overall implementation process is as follows:

Execution Flow

[Figure: overall execution flow]

The overall execution process is shown in the figure above.

[Figure: complete execution process]

Steps:

  1. Create the sink table (stage table) in the catalog using the new API (createStageTable) according to the schema of the query result; the table is visible but only recorded in memory.
  2. Submit the job to the cluster.
  3. After the JobManager starts, create the table through the Catalog.
  4. If the job executes successfully, make the data visible.
  5. If the job execution fails, drop the sink table or delete its data. (This capability requires runtime support, such as a hook, and SQL passes the relevant parameters to the runtime module.)
  6. Create the sink table (stage table) through the Catalog's new API createStageTable.
  7. Construct a CTASJobStatusHook with the Catalog as a constructor parameter; CTASJobStatusHook is an implementation of the JobStatusHook interface (a sketch is given under "Hook design" below).
  8. Register the CTASJobStatusHook with the StreamGraph; it is then passed to the JobGraph and serialized (this requires implementing serialization/deserialization for the Catalog and the JobStatusHook).
  9. When the job starts and its status becomes CREATED, the runtime module calls JobStatusHook#onCreated, and we call Catalog#createTable in CTASJobStatusHook#onCreated (see the dispatch sketch after this list).
  10. When the final status of the job is FAILED, the runtime module calls JobStatusHook#onFailed, and we call Catalog#dropTable in CTASJobStatusHook#onFailed.
  11. When the final status of the job is CANCELED, the runtime module calls JobStatusHook#onCanceled, and we call Catalog#dropTable in CTASJobStatusHook#onCanceled.
  12. When the final status of the job is FINISHED, the runtime module calls JobStatusHook#onFinished, and no additional operations are needed.
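Steps 9 to 12 assume that the runtime dispatches job status transitions to the registered hooks. The sketch below only illustrates that wiring; the method and field names are assumptions, not the actual scheduler code.

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;

    /** Illustrative dispatch of job status transitions to registered hooks. */
    private void notifyJobStatusHooks(JobID jobId, JobStatus newStatus, Throwable cause) {
        for (JobStatusHook hook : jobStatusHooks) {
            switch (newStatus) {
                case CREATED:
                    hook.onCreated(jobId);
                    break;
                case FINISHED:
                    hook.onFinished(jobId);
                    break;
                case FAILED:
                    hook.onFailed(jobId, cause);
                    break;
                case CANCELED:
                    hook.onCanceled(jobId);
                    break;
                default:
                    // Other transitions (e.g. RUNNING) do not trigger hooks.
                    break;
            }
        }
    }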

Hook design

Definition of JobStatusHook (the internal JM hook):

/**
 * Hooks provided by users on job status changes.
 */
public interface JobStatusHook {

    /** When the job reaches CREATED status. It will only be called once. */
    default void onCreated(JobID jobId) {}

    /** When the job finishes successfully. */
    default void onFinished(JobID jobId) {}

    /** When the job finally fails. */
    default void onFailed(JobID jobId, Throwable throwable) {}

    /** When the job is canceled by the user. */
    default void onCanceled(JobID jobId) {}
}
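For CTAS, a CTASJobStatusHook implements this interface and maps the status callbacks to catalog operations, as described in steps 9 to 12 above. A minimal sketch, assuming the hook holds the Catalog, the table path, and the table definition (field names and error handling are illustrative, and the serialization required by step 8 is omitted):

import org.apache.flink.api.common.JobID;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.ObjectPath;

public class CTASJobStatusHook implements JobStatusHook {

    private final Catalog catalog;
    private final ObjectPath tablePath;
    private final CatalogBaseTable table;

    public CTASJobStatusHook(Catalog catalog, ObjectPath tablePath, CatalogBaseTable table) {
        this.catalog = catalog;
        this.tablePath = tablePath;
        this.table = table;
    }

    @Override
    public void onCreated(JobID jobId) {
        // Persist the stage table to the catalog backend once the job is CREATED.
        try {
            catalog.createTable(tablePath, table, false);
        } catch (Exception e) {
            throw new RuntimeException("Failed to create table " + tablePath, e);
        }
    }

    @Override
    public void onFinished(JobID jobId) {
        // Nothing to do: the table remains and the data becomes visible.
    }

    @Override
    public void onFailed(JobID jobId, Throwable throwable) {
        dropTable();
    }

    @Override
    public void onCanceled(JobID jobId) {
        dropTable();
    }

    private void dropTable() {
        try {
            // ignoreIfNotExists = true: the table may not have been created yet.
            catalog.dropTable(tablePath, true);
        } catch (Exception e) {
            throw new RuntimeException("Failed to drop table " + tablePath, e);
        }
    }
}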

Register JobStatusHook with StreamGraph

public class StreamGraph implements Pipeline {

    ...

    /** Registers the JobStatusHook. */
    void setJobStatusHook(JobStatusHook hook) {
        ...
    }
}
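The planner could then wire the pieces together when translating a CTAS statement. The helper below is hypothetical and only shows the intended call sequence; its name and parameters are not part of the proposal.

    /** Hypothetical planner-side wiring for a CTAS statement. */
    static void attachCtasHook(
            StreamGraph streamGraph,
            Catalog catalog,
            ObjectPath tablePath,
            CatalogBaseTable table) {
        JobStatusHook hook = new CTASJobStatusHook(catalog, tablePath, table);
        // The hook travels with the StreamGraph into the JobGraph, so the
        // Catalog and the hook must be serializable (step 8 above).
        streamGraph.setJobStatusHook(hook);
    }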


Supported Job Mode

Both streaming and batch modes are supported.

...