...

  1. The Flink client compiles the SQL and generates an execution plan. During this step, the hook to be executed on the JM side is generated, and the hook, Catalog, and CatalogBaseTable are serialized.
  2. The client submits the job to the cluster. In detached mode, the client can then exit.
  3. When the job starts, the hook, Catalog, and CatalogBaseTable are deserialized, and the hook calls Catalog#createTable to create the CatalogBaseTable.
  4. Task execution starts.
  5. If the final status of the job is FAILED or CANCELED, the hook calls Catalog#dropTable to drop the created CatalogBaseTable.
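
Steps 1 and 3 depend on the hook state surviving a trip from the client to the JM. The following is a minimal sketch of that round trip using plain Java serialization. HookPayload and HookSerde are hypothetical names introduced only for illustration; they are not Flink classes, and the planner may use a different serialization mechanism.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical payload: what the client ships so the JM can rebuild the catalog and table. */
class HookPayload implements Serializable {
    private static final long serialVersionUID = 1L;

    final String catalogName;
    final Map<String, String> catalogOptions;
    final String tablePath;

    HookPayload(String catalogName, Map<String, String> catalogOptions, String tablePath) {
        this.catalogName = catalogName;
        this.catalogOptions = new HashMap<>(catalogOptions); // defensive copy
        this.tablePath = tablePath;
    }
}

/** Assumption: plain Java serialization stands in for whatever the planner really uses. */
final class HookSerde {
    private HookSerde() {}

    /** Client side (step 1): turn the payload into bytes that travel with the job. */
    static byte[] serialize(HookPayload payload) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(payload);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** JM side (step 3): rebuild the payload before the hook is invoked. */
    static HookPayload deserialize(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return (HookPayload) in.readObject();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException("Payload class missing on the JM classpath", e);
        }
    }
}
```

Only the catalog name, its options, and the table path are shipped in this sketch, so every field must itself be serializable; a live catalog connection would have to be re-created on the JM side rather than serialized.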

 In streaming mode, data is usually written and made visible in real time, so streaming mode does not provide an atomicity guarantee and does not need the JobStatusHook mechanism.

Batch mode requires the use of the JobStatusHook mechanism to ensure atomicity.

Planner

Provide a method for the planner to register a JobStatusHook with the StreamGraph.

...

For example, suppose our JobStatusHook implementation is called CTASJobStatusHook and the catalog is JdbcCatalog. How the planner serializes the JdbcCatalog was covered in the previous section and is not repeated here.

We can deserialize the catalog name and properties, and then use the FactoryUtil#createCatalog method to obtain a new JdbcCatalog instance. When the job status changes, the corresponding CTASJobStatusHook method is called:

  • When the job status is CREATED, the runtime module calls the CTASJobStatusHook#onCreated method, which calls the JdbcCatalog#createTable method to create the table.
  • When the final status of the job is FAILED, the runtime module calls the CTASJobStatusHook#onFailed method, which calls the JdbcCatalog#dropTable method to drop the table.
  • When the final status of the job is CANCELED, the runtime module calls the CTASJobStatusHook#onCanceled method, which calls the JdbcCatalog#dropTable method to drop the table.
  • When the final status of the job is FINISHED, the runtime module calls the CTASJobStatusHook#onFinished method, and no additional operation is needed.
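
The four callbacks above can be sketched as follows. This is a simplified illustration, not the proposed implementation: SimpleJobStatusHook is a hypothetical stand-in for JobStatusHook (the real interface may carry additional arguments such as a job identifier), InMemoryCatalog is a hypothetical stand-in for JdbcCatalog, and CtasHookSketch plays the role of CTASJobStatusHook.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical stand-in for a catalog; it only tracks which table paths exist. */
class InMemoryCatalog {
    private final Set<String> tables = new HashSet<>();

    void createTable(String tablePath) {
        if (!tables.add(tablePath)) {
            throw new IllegalStateException("Table already exists: " + tablePath);
        }
    }

    void dropTable(String tablePath) {
        tables.remove(tablePath); // dropping a missing table is a no-op in this sketch
    }

    boolean tableExists(String tablePath) {
        return tables.contains(tablePath);
    }
}

/** Hypothetical, simplified version of the hook callbacks described above. */
interface SimpleJobStatusHook {
    void onCreated();
    void onFinished();
    void onFailed(Throwable cause);
    void onCanceled();
}

/** Creates the target table when the job is created and drops it if the job does not succeed. */
class CtasHookSketch implements SimpleJobStatusHook {
    private final InMemoryCatalog catalog;
    private final String tablePath;

    CtasHookSketch(InMemoryCatalog catalog, String tablePath) {
        this.catalog = catalog;
        this.tablePath = tablePath;
    }

    @Override
    public void onCreated() {
        catalog.createTable(tablePath);
    }

    @Override
    public void onFinished() {
        // Job succeeded: the table stays, nothing else to do.
    }

    @Override
    public void onFailed(Throwable cause) {
        catalog.dropTable(tablePath);
    }

    @Override
    public void onCanceled() {
        catalog.dropTable(tablePath);
    }
}
```

In this sketch, dropTable is idempotent, so a repeated onFailed or onCanceled call would not raise an error; whether the real catalog behaves that way is an assumption.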

Atomicity & Data Visibility

Atomicity

...


Data Visibility

Data visibility is determined by the TableSink and the runtime mode:

Streaming mode:

If the external storage system supports transactions or two-phase commit, data visibility is tied to the checkpoint cycle. Otherwise, data is visible immediately after it is written, which is consistent with current Flink behavior.

Batch mode:

  • FileSystem sink: Data is first written to a temporary directory and becomes visible only after the job finishes successfully (final visibility).
  • Two-phase commit sink: Data becomes visible after the job finishes successfully (final visibility).
  • Transactional sink: Either commit the transaction after the job finishes successfully (final visibility), or commit transactions periodically or after a fixed number of records (incremental visibility).
  • Other sinks: Data is visible immediately after it is written (write-visible).
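
To illustrate final visibility for the FileSystem sink case, here is a minimal sketch of the staging pattern on a local file system. StagedDirectoryWriter and its method names are hypothetical and are not part of any Flink sink; the sketch assumes a flat staging directory and a file system where moving a file within the same root is cheap.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Collections;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical writer: files land in a staging directory and are published only on commit. */
class StagedDirectoryWriter {
    private final Path stagingDir;
    private final Path finalDir;

    StagedDirectoryWriter(Path stagingDir, Path finalDir) {
        this.stagingDir = stagingDir;
        this.finalDir = finalDir;
    }

    /** Convenience factory for experiments: both directories live under a fresh temp root. */
    static StagedDirectoryWriter underTempRoot(String tableName) {
        try {
            Path root = Files.createTempDirectory("ctas-sketch");
            return new StagedDirectoryWriter(root.resolve(".staging"), root.resolve(tableName));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** While the job runs: write into the staging directory, which readers do not see. */
    void write(String fileName, String content) {
        try {
            Files.createDirectories(stagingDir);
            Files.write(stagingDir.resolve(fileName), content.getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Job finished successfully: move every staged file into the final directory. */
    void commit() {
        try {
            Files.createDirectories(finalDir);
            for (Path file : stagedFiles()) {
                Files.move(file, finalDir.resolve(file.getFileName()),
                        StandardCopyOption.REPLACE_EXISTING);
            }
            Files.deleteIfExists(stagingDir);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Job failed or was canceled: discard staged files so nothing becomes visible. */
    void abort() {
        try {
            for (Path file : stagedFiles()) {
                Files.delete(file);
            }
            Files.deleteIfExists(stagingDir);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    boolean isVisible(String fileName) {
        return Files.exists(finalDir.resolve(fileName));
    }

    private List<Path> stagedFiles() throws IOException {
        if (!Files.isDirectory(stagingDir)) {
            return Collections.emptyList();
        }
        try (Stream<Path> files = Files.list(stagingDir)) {
            return files.collect(Collectors.toList());
        }
    }
}
```

Note that commit moves files one by one, so a reader could observe a partially published directory while it runs; a real sink would need a stronger publish step if that matters.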

...