...
- In streaming mode, the sink table must be created before the job starts so that downstream jobs can consume it in real time.
- In most cases, streaming jobs do not need to be cleaned up even if the job fails.
- Flink has a rich connector ecosystem, and the capabilities provided by external storage systems differ; Flink needs to behave consistently across them.
- Batch jobs should try to ensure final atomicity.
Implementation Plan
We will introduce a new concept: the Stage Table.
Stage Table: created through the new Catalog#createStageTable API and held in the Catalog's memory, so it is visible during the SQL compilation stage.
The Catalog keeps a collection that records its stage tables; a stage table is created in the Catalog's backend storage when the job status becomes CREATED,
and dropped from the Catalog's backend storage when the job status becomes FAILED or CANCELED.
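To make the stage-table lifecycle concrete, here is a minimal, self-contained Java sketch. It is illustrative only: the class name, the two maps, and all methods except createStageTable are stand-ins, not Flink's actual Catalog API. The point is that a stage table is visible (in memory) at compile time, is persisted to backend storage only on CREATED, and is removed from backend storage on FAILED or CANCELED.

```java
import java.util.HashMap;
import java.util.Map;

/** Illustrative sketch only; not Flink's real Catalog interface. */
class StageTableCatalog {

    // Stage tables: visible to the planner at SQL compile time, in memory only.
    private final Map<String, String> stageTables = new HashMap<>();

    // Stand-in for the catalog's backend storage (e.g. a metastore).
    private final Map<String, String> backendStorage = new HashMap<>();

    /** Corresponds to the proposed Catalog#createStageTable API. */
    void createStageTable(String name, String schema) {
        stageTables.put(name, schema); // in memory only; not yet persisted
    }

    /** Invoked when the job status becomes CREATED: persist the table. */
    void onJobCreated(String name) {
        backendStorage.put(name, stageTables.get(name));
    }

    /** Invoked when the job status becomes FAILED or CANCELED: drop the table. */
    void onJobFailedOrCanceled(String name) {
        backendStorage.remove(name);
        stageTables.remove(name);
    }

    boolean existsInBackend(String name) {
        return backendStorage.containsKey(name);
    }
}
```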
Based on the research summary and analysis, the overall implementation process is as follows:
Execution Flow
The overall execution process is shown in the figure above.
Complete execution process
Steps:
- Create the sink table in the Catalog using the new API (createStageTable) according to the schema of the query result; the table is visible but recorded only in memory.
- Submit the job to the cluster.
- After the JM starts, create the table (stage table) through the Catalog's new API, createStageTable.
- If the job executes successfully, make the data visible.
- If the job execution fails, drop the sink table or delete its data. (This capability requires runtime-module support, such as a hook; SQL passes the relevant parameters to the runtime module.)
- Construct a CTASJobStatusHook with the Catalog as a constructor parameter; CTASJobStatusHook is an implementation of the JobStatusHook interface.
- Register the CTASJobStatusHook with the StreamGraph; it is then passed to the JobGraph and serialized (this requires implementing serialization/deserialization of the Catalog and the JobStatusHook).
- When the job starts and its status becomes CREATED, the runtime module calls JobStatusHook#onCreated, and we call Catalog#createTable in CTASJobStatusHook#onCreated.
- When the final status of the job is FAILED, the runtime module calls JobStatusHook#onFailed, and we call Catalog#dropTable in CTASJobStatusHook#onFailed.
- When the final status of the job is CANCELED, the runtime module calls JobStatusHook#onCanceled, and we call Catalog#dropTable in CTASJobStatusHook#onCanceled.
- When the final status of the job is FINISHED, the runtime module calls JobStatusHook#onFinished, and no additional action is needed.
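The hook flow above can be sketched as follows. The callback names come from the steps above; the minimal Catalog surface (createTable/dropTable taking a table path string) and the String job ID are simplifications for illustration, not the real Flink signatures.

```java
import java.io.Serializable;

/** Sketch of the internal hook called by the runtime on job status transitions. */
interface JobStatusHook extends Serializable {
    void onCreated(String jobId);
    void onFinished(String jobId);
    void onFailed(String jobId, Throwable cause);
    void onCanceled(String jobId);
}

/** Minimal catalog surface used by the hook (illustrative). */
interface Catalog extends Serializable {
    void createTable(String tablePath);
    void dropTable(String tablePath);
}

/**
 * Sketch of CTASJobStatusHook: creates the sink table when the job status
 * becomes CREATED, and drops it on FAILED or CANCELED. On FINISHED nothing
 * needs to be done, so the data simply stays visible.
 */
class CTASJobStatusHook implements JobStatusHook {
    private final Catalog catalog;
    private final String tablePath;

    CTASJobStatusHook(Catalog catalog, String tablePath) {
        this.catalog = catalog;
        this.tablePath = tablePath;
    }

    @Override
    public void onCreated(String jobId) {
        catalog.createTable(tablePath);
    }

    @Override
    public void onFinished(String jobId) {
        // Success: the table and its data remain; no extra action required.
    }

    @Override
    public void onFailed(String jobId, Throwable cause) {
        catalog.dropTable(tablePath);
    }

    @Override
    public void onCanceled(String jobId) {
        catalog.dropTable(tablePath);
    }
}
```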
Hook design
Definition of JobStatusHook (internal JM hook):

```java
/** Hooks on job status changes, called by the JM at runtime. */
public interface JobStatusHook extends Serializable {

    /** Called when the job transitions to the CREATED status. */
    void onCreated(JobID jobId);

    /** Called when the job finishes successfully. */
    void onFinished(JobID jobId);

    /** Called when the job finally fails. */
    void onFailed(JobID jobId, Throwable throwable);

    /** Called when the job is canceled. */
    void onCanceled(JobID jobId);
}
```
Register JobStatusHook with StreamGraph
```java
public class StreamGraph implements Pipeline {

    /** Registers the JobStatusHook. */
    void registerJobStatusHook(JobStatusHook hook) {
        // ...
    }
}
```
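Because the hook travels from the StreamGraph into the JobGraph, it (and the Catalog it carries) must survive Java serialization. The round trip the JobGraph relies on can be sketched standalone; DemoHook and all names below are stand-ins, not Flink classes:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Illustrative only: the Java-serialization round trip a shipped hook must survive. */
class HookSerializationDemo {

    /** Stand-in for a JobStatusHook; every field it holds must be serializable. */
    static class DemoHook implements Serializable {
        final String tablePath;

        DemoHook(String tablePath) {
            this.tablePath = tablePath;
        }
    }

    /** Serialize a value to bytes, as the JobGraph does when shipping to the JM. */
    static byte[] serialize(Serializable value) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        }
        return bytes.toByteArray();
    }

    /** Deserialize the hook back on the JM side. */
    static DemoHook deserialize(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return (DemoHook) in.readObject();
        }
    }
}
```

If the Catalog held by the real hook is not serializable, shipping the hook fails at job submission time, which is why the plan calls out serialization/deserialization support for both the Catalog and the JobStatusHook.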
Supported Job Mode
Both streaming and batch modes are supported.
...