
...

and the user can decide what information needs to be saved during serialization, so that it can be used during deserialization.

Proposed Changes


For details, see Implementation Plan.


Implementation Plan

Based on the research summary and analysis in the appendix, the current status of CTAS in the big-data field is:

  • Flink: the Flink dialect does not support CTAS. ==> LEVEL-1
  • Spark DataSource v1: atomic (can roll back), but not isolated. ==> LEVEL-2
  • Spark DataSource v2: guarantees atomicity and isolation. ==> LEVEL-3
  • Hive MR: guarantees atomicity and isolation. ==> LEVEL-3

Combining the current state of Flink with the needs of our business, we choose a LEVEL-2 implementation for Flink.

[Figure: overall execution process of CTAS]

The overall execution process is shown in the figure above.

Because the client process may exit soon (for example, in detached mode), we choose to create/drop the table on the JM side; both create table and drop table are executed through the Catalog.

Therefore, a new Hook mechanism and a Catalog serialization/deserialization solution need to be introduced.

The overall execution process is as follows:

  1. The Flink client compiles the SQL and generates an execution plan. In this process, the hook that needs to be executed on the JM side is generated, and the hook, Catalog, and CatalogBaseTable are serialized.
  2. The job is submitted to the cluster; in detached mode, the client can then exit.
  3. When the job starts, the hooks, Catalog, and CatalogBaseTable are deserialized; the hook then calls Catalog#createTable to create the CatalogBaseTable.
  4. Task execution starts.
  5. If the final status of the job is FAILED or CANCELED, the created CatalogBaseTable is dropped by the hook calling Catalog#dropTable.

Planner

Provide a method for the planner to register a JobStatusHook with the StreamGraph.

public class StreamGraph implements Pipeline {

    ... ...

    /** Registers the JobStatusHook. */
    void addJobStatusHook(JobStatusHook hook) {
        ...
    }
}

The final tasks of the job are all generated by the planner. We want to complete create table/drop table through a hook on the JM side, so we need an API to register the hook on the JM side.
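As an illustration only, a minimal sketch of how the planner could use this API (the helper class and the CTASJobStatusHook constructor shape are assumptions; CTASJobStatusHook itself is described later in this document):

import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.ObjectPath;

public final class CtasPlannerSketch {

    private CtasPlannerSketch() {}

    /** Attach the CTAS hook so that it travels with the JobGraph to the JM. */
    public static void attachCtasHook(
            StreamGraph streamGraph,
            Catalog catalog,
            ObjectPath tablePath,
            CatalogBaseTable table) {
        // addJobStatusHook is package-private in the sketch above; a public
        // planner-facing entry point is assumed here for illustration.
        streamGraph.addJobStatusHook(new CTASJobStatusHook(catalog, tablePath, table));
    }
}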

...

InMemoryCatalog#serialize only needs to save the catalogName and defaultDatabase, and InMemoryCatalog#deserialize can re-initialize an InMemoryCatalog object from these two parameters.


The tables in the InMemoryCatalog already exist in the external system. The metadata held in the InMemoryCatalog is used only by the job itself and lives only in memory. Therefore, the metadata in the InMemoryCatalog does not need to be serialized and passed to the JM; the JM only needs to initialize a new InMemoryCatalog.
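For illustration, a minimal sketch of what this could look like, assuming a map-based serialize/deserialize shape (the class name and signatures here are assumptions; the actual serialization API is proposed elsewhere in this FLIP):

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.table.catalog.GenericInMemoryCatalog;

public class SerializableInMemoryCatalog extends GenericInMemoryCatalog {

    public SerializableInMemoryCatalog(String name, String defaultDatabase) {
        super(name, defaultDatabase);
    }

    /** Only the identity of the catalog needs to survive the client process. */
    public Map<String, String> serialize() {
        Map<String, String> state = new HashMap<>();
        state.put("catalogName", getName());
        state.put("default-database", getDefaultDatabase());
        return state;
    }

    /** Re-initialize a fresh, empty catalog on the JM from the two saved values. */
    public static SerializableInMemoryCatalog deserialize(Map<String, String> state) {
        return new SerializableInMemoryCatalog(
                state.get("catalogName"), state.get("default-database"));
    }
}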

Runtime

Provide a job-status-change hook mechanism on the JM side.

/**
 * Hooks provided by users on job status changing.
 */
@Internal
public interface JobStatusHook {

    /** When the job becomes CREATED status. It will only be called one time. */
    default void onCreated(JobID jobId) {}

    /** When the job finishes successfully. */
    default void onFinished(JobID jobId) {}

    /** When the job finally fails. */
    default void onFailed(JobID jobId, Throwable throwable) {}

    /** When the job gets canceled by users. */
    default void onCanceled(JobID jobId) {}
}

On the JM side, the JobStatusHook is deserialized, and:

When the job status transitions to CREATED, call JobStatusHook#onCreated;

When the job status transitions to FAILED, call JobStatusHook#onFailed;

When the job status transitions to CANCELED, call JobStatusHook#onCanceled;

When the job status transitions to FINISHED, call JobStatusHook#onFinished.

We integrate create table into the onCreated method, and drop table into the onFailed and onCanceled methods.
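To make the integration concrete, here is a minimal sketch of such a hook, assuming the Catalog, ObjectPath, and CatalogBaseTable have already been deserialized on the JM (the fields, constructor, and error handling are illustrative, not the final implementation):

import org.apache.flink.api.common.JobID;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.ObjectPath;

public class CTASJobStatusHook implements JobStatusHook {

    private final Catalog catalog;
    private final ObjectPath tablePath;
    private final CatalogBaseTable table;

    public CTASJobStatusHook(Catalog catalog, ObjectPath tablePath, CatalogBaseTable table) {
        this.catalog = catalog;
        this.tablePath = tablePath;
        this.table = table;
    }

    @Override
    public void onCreated(JobID jobId) {
        try {
            // Create the target table before any task starts writing to it.
            catalog.createTable(tablePath, table, false);
        } catch (Exception e) {
            throw new RuntimeException("CTAS failed to create table " + tablePath, e);
        }
    }

    @Override
    public void onFailed(JobID jobId, Throwable throwable) {
        dropTableQuietly();
    }

    @Override
    public void onCanceled(JobID jobId) {
        dropTableQuietly();
    }

    private void dropTableQuietly() {
        // Best-effort cleanup of the table created in onCreated; the job is
        // already in a terminal state, so cleanup errors are ignored here.
        try {
            catalog.dropTable(tablePath, true);
        } catch (Exception ignored) {
        }
    }
}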

Submit the job to the cluster for execution. The runtime will then call the corresponding JobStatusHook method when the job status changes:

When the job starts and the status is CREATED, the runtime module will call the JobStatusHook#onCreated method, which then calls Catalog#createTable to create the table.

When the final status of the job is FAILED, the runtime module will call the JobStatusHook#onFailed method, which then calls Catalog#dropTable to drop the table.

When the final status of the job is CANCELED, the runtime module will call the JobStatusHook#onCanceled method, which then calls Catalog#dropTable to drop the table.

When the final status of the job is FINISHED, the runtime module will call the JobStatusHook#onFinished method; no additional operations are needed.


Flink's current hook design cannot meet the needs of CTAS. For example, the JobListener runs on the client side, and the JobStatusListener runs on the JM side but cannot be serialized. Thus we tend to add a new interface, JobStatusHook, which can be attached to the JobGraph and executed in the JobMaster. The interface will also be marked as Internal.

When the task starts, the JobGraph is deserialized, and the JobStatusHook along with it. Through the serialization/deserialization approach for Catalog and CatalogBaseTable described earlier, deserializing the JobStatusHook also deserializes the Catalog and CatalogBaseTable.

The CatalogBaseTable is deserialized using the CatalogPropertiesUtil#deserializeCatalogTable method.

When deserializing a Catalog, first read the Catalog class name, then use the Objenesis framework to get an empty instance of the Catalog, and finally call the Catalog#deserialize method to get a valid Catalog instance.
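A sketch of this deserialization path (Catalog#deserialize is the method proposed by this FLIP; its map-based signature and return value here are assumptions of the sketch):

import java.util.Map;

import org.apache.flink.table.catalog.Catalog;
import org.objenesis.ObjenesisStd;

public final class CatalogDeserializationSketch {

    private CatalogDeserializationSketch() {}

    @SuppressWarnings("unchecked")
    public static Catalog deserializeCatalog(String className, Map<String, String> options)
            throws ClassNotFoundException {
        // 1. Load the Catalog class by the name written by the planner.
        Class<? extends Catalog> clazz = (Class<? extends Catalog>) Class.forName(className);
        // 2. Use Objenesis to create an empty instance without invoking a constructor.
        Catalog empty = new ObjenesisStd().newInstance(clazz);
        // 3. Let the catalog restore its own state from the serialized options.
        return empty.deserialize(options);
    }
}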

When the job starts and the job status changes, the JobStatusHook methods will be called.

For example, suppose our JobStatusHook implementation is called CTASJobStatusHook and uses a JdbcCatalog. The JdbcCatalog serialized by the planner looks like this:

org.apache.flink.connector.jdbc.catalog.JdbcCatalog

'default-database' = '...',
'username' = '...',
'password' = '...',
'base-url' = '...'

We can get an empty instance of JdbcCatalog through the Objenesis framework, then read the default-database, username, password, and base-url in the JdbcCatalog#deserialize method, so that a valid JdbcCatalog can be initialized.
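A sketch of what JdbcCatalog#deserialize could look like under this proposal (the static shape, the map-based signature, and passing the catalog name separately are assumptions; the JdbcCatalog constructor shown is the one available at the time of writing):

import java.util.Map;

import org.apache.flink.connector.jdbc.catalog.JdbcCatalog;
import org.apache.flink.table.catalog.Catalog;

public class JdbcCatalogDeserializeSketch {

    /** Rebuild a valid JdbcCatalog from the options written by the planner. */
    public static Catalog deserialize(String catalogName, Map<String, String> options) {
        return new JdbcCatalog(
                catalogName,
                options.get("default-database"),
                options.get("username"),
                options.get("password"),
                options.get("base-url"));
    }
}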

Then, when the job status changes, the CTASJobStatusHook methods are called:

When the job status is CREATED, the runtime module will call the CTASJobStatusHook#onCreated method, which calls JdbcCatalog#createTable to create the table.

When the final status of the job is FAILED, the runtime module will call the CTASJobStatusHook#onFailed method, which calls JdbcCatalog#dropTable to drop the table.

When the final status of the job is CANCELED, the runtime module will call the CTASJobStatusHook#onCanceled method, which calls JdbcCatalog#dropTable to drop the table.

When the final status of the job is FINISHED, the runtime module will call the CTASJobStatusHook#onFinished method; no additional operations are needed.

Atomicity & Data Visibility

Atomicity

CTAS does not provide strict atomicity: we create the table first, and the final atomicity is determined by the cleanUp implementation of the TableSink.

This requires runtime module support, as described in the Execution Flow.

Data Visibility

Data visibility is determined by the TableSink and the runtime mode:

Stream mode:

If the external storage system supports transactions or two-phase commit, data visibility is tied to the checkpoint cycle; otherwise, data is visible immediately after writing, which is consistent with current Flink behavior.

Batch mode:
  • FileSystem sink: data is written to a temporary directory first and becomes visible after the job finally succeeds (final visibility).
  • Two-phase-commit sink: data becomes visible after the job finally succeeds (final visibility).
  • Transactional sink: transactions are committed after the job finally succeeds (final visibility), or committed periodically or after a fixed number of records (incremental visibility).
  • Other sinks: data is visible immediately after writing (write visibility).

Catalog

We can think of Flink as having two types of catalogs: in-memory catalogs and external catalogs.

In-memory catalog:

  1. Metadata is a copy of the external system's metadata, and the user must ensure that the entity exists in the external system and that the metadata is consistent; otherwise, an exception is thrown at runtime. CTAS needs to create the table first, so it is hard to ensure that the entity exists in the external system and that the metadata is consistent.
  2. The user needs to configure the parameters of the external system through the WITH syntax; Flink cannot obtain them through the in-memory catalog.

For example, for a Kafka table, the user must tell us the address of the Kafka server, the topic name, and the data serialization format; otherwise the Flink job will fail.

External catalog:

  1. Metadata refers directly to the external system, so there is no consistency problem. Create table also calls the external system directly, so it is naturally guaranteed that the entity exists in the external system.
  2. The WITH syntax parameters are optional; Flink can obtain the necessary parameters through the external catalog.

For example, for a Hive table, we can obtain the table information required by the Flink engine through the HiveCatalog.


Both in-memory and external catalogs will support CTAS. If the CTAS command is executed against an in-memory catalog and the target store does not exist in the external system, the Flink job will fail, which is consistent with current Flink behavior.

With an in-memory catalog, the table options depend entirely on user input.

Managed Table

For Managed Table, please refer to FLIP-188. Table options that do not contain the 'connector' key and value represent a managed table. CTAS also follows this principle.

For details, please refer to the Table Store docs: https://nightlies.apache.org/flink/flink-table-store-docs-master/docs/development/create-table/

CTAS supports Managed Table and Non-Managed Table.

Users need to be clear about their business needs and set the table options correctly.

The Catalog#inferTableOptions API can also automatically infer whether to add the 'connector' option, based on whether the Catalog supports managed tables.
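A hedged sketch of how such an inference could behave (the interface name and signature below are assumptions for illustration, not the final API): a catalog that supports managed tables can accept options without a 'connector' key, while other catalogs must fill one in or reject the table.

import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.ObjectPath;

public interface InferTableOptionsSketch {

    /** Infer/complete the table options for CTAS; by default, reject managed tables. */
    default CatalogBaseTable inferTableOptions(ObjectPath tablePath, CatalogBaseTable table) {
        // Catalogs that support managed tables can return `table` unchanged;
        // others should add a default 'connector' option here or throw.
        throw new UnsupportedOperationException(
                "This catalog cannot infer table options for CTAS; "
                        + "please specify a 'connector' option explicitly.");
    }
}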

Compatibility, Deprecation, and Migration Plan

It is a new feature with no implication for backwards compatibility.

Test Plan

Changes will be verified by unit tests.

Rejected Alternatives

Catalog serialize

A simple way to serialize the catalog is to save the options of the catalog in the CatalogManager, so that the JM side only needs to use these options to re-initialize the catalog.

For example, a catalog created using DDL:

CREATE CATALOG my_catalog WITH (
    'type' = 'jdbc',
    'default-database' = '...',
    'username' = '...',
    'password' = '...',
    'base-url' = '...'
);

We just need to save the default-database, username, password, and base-url options, and then re-initialize the JdbcCatalog on the JM.

The advantage of this solution is that the implementation is simple and convenient, and does not require complex serialization and deserialization tools.
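For illustration, a sketch of this rejected approach, assuming the JM can reuse the same factory discovery that CREATE CATALOG uses (the FactoryUtil call below reflects that assumption):

import java.util.Map;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.factories.FactoryUtil;

public final class CatalogOptionsReinitSketch {

    private CatalogOptionsReinitSketch() {}

    /** Re-create the catalog on the JM purely from the saved DDL options. */
    public static Catalog reinitialize(String catalogName, Map<String, String> options) {
        // 'type' = 'jdbc' in the options selects the JdbcCatalog factory.
        return FactoryUtil.createCatalog(
                catalogName,
                options,
                new Configuration(),
                Thread.currentThread().getContextClassLoader());
    }
}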

...