
...

The complexity of serializing the Catalog is high, so we need to introduce a new serialization mechanism, which is described in a separate section.

Catalog Serialization Solutions:

Option 1: Serialize the options in the Create Catalog DDL

We need to serialize the catalog name and the options used in the CREATE CATALOG DDL; the JM side can then use these options to re-initialize the catalog through Flink's ServiceLoader mechanism (using FactoryUtil#createCatalog to get the catalog). The InMemoryCatalog is a special case: since the tables in the InMemoryCatalog already exist in the external system, its metadata is only used by the job itself and is only stored in memory. However, the database-related information in the InMemoryCatalog needs to be serialized and passed to the JM, otherwise the database may not exist when the JM creates the table. Other objects do not need to be serialized.
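For the InMemoryCatalog special case, a minimal sketch of what serializing the existing database information could look like is given below. Only Catalog#listDatabases, Catalog#getDatabase and CatalogDatabase#getProperties are existing APIs; the surrounding variable names are illustrative.

// Sketch only: collect the databases already registered in the InMemoryCatalog into a plain,
// serializable map so that the JM side can re-create them before creating the CTAS target table.
Map<String, Map<String, String>> databaseProperties = new HashMap<>();
for (String dbName : inMemoryCatalog.listDatabases()) {
    try {
        databaseProperties.put(dbName, inMemoryCatalog.getDatabase(dbName).getProperties());
    } catch (DatabaseNotExistException e) {
        // cannot happen here: the name was just listed from the same catalog instance
    }
}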

Here we give an example of the catalog serialization process for a catalog created via DDL.


CREATE CATALOG my_catalog WITH (
    'type' = 'jdbc',
    'default-database' = '...',
    'username' = '...',
    'password' = '...',
    'base-url' = '...'
);

1) The Planner registers the catalog to the CatalogManager and also registers the properties from the WITH clause with it.

2) When serializing the catalog, we only need to serialize and save the catalog name (my_catalog) and its properties, like this:

my_catalog

{'type'='jdbc', 'default-database'='...', 'username'='...', 'password'='...', 'base-url'='...'}
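For illustration only (not part of the proposed API surface), the JM side could rebuild the catalog from exactly this serialized name and property map using the existing FactoryUtil#createCatalog helper, which resolves the CatalogFactory through the ServiceLoader mechanism mentioned above:

// Sketch: re-initialize the catalog on the JM side from the serialized catalog name and options.
// FactoryUtil#createCatalog discovers the matching CatalogFactory ('type' = 'jdbc') via ServiceLoader.
Map<String, String> options = new HashMap<>();
options.put("type", "jdbc");
options.put("default-database", "...");
options.put("username", "...");
options.put("password", "...");
options.put("base-url", "...");

Catalog catalog =
        FactoryUtil.createCatalog(
                "my_catalog",
                options,
                new Configuration(),
                Thread.currentThread().getContextClassLoader());
catalog.open();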

The advantages of this solution are its simple design, ease of compatibility, and reduced implementation complexity for users; it does not require complex serialization and deserialization tools. The disadvantage is that it does not cover the usage scenario of TableEnvironment#registerCatalog.

Regarding the disadvantage, we can introduce a CatalogDescriptor (similar to TableDescriptor) for the Table API in the future so that Flink can obtain the catalog properties through the CatalogDescriptor as well. The interface pseudo-code in TableEnvironment would be as follows:

void registerCatalog(String catalogName, CatalogDescriptor catalogDescriptor);
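A rough sketch of how this could look from the Table API is given below. CatalogDescriptor is only a future proposal here, so CatalogDescriptor.of and its argument shape are assumptions, not an existing Flink API:

// Hypothetical sketch: CatalogDescriptor does not exist yet, so its shape is an assumption.
Configuration options = new Configuration();
options.setString("type", "jdbc");
options.setString("default-database", "...");
options.setString("username", "...");
options.setString("password", "...");
options.setString("base-url", "...");

// With such a descriptor, the planner could obtain the catalog properties even for catalogs
// registered programmatically, so CTAS could serialize them the same way as in the DDL case.
tableEnv.registerCatalog("my_catalog", CatalogDescriptor.of(options));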

Note: this solution only works if the catalog is created via DDL, because only then can we obtain the catalog properties from the WITH clause. If a catalog is registered via the TableEnvironment#registerCatalog method, we cannot get these properties. Therefore, CTAS does not currently support jobs that use TableEnvironment#registerCatalog.

The following issues require attention:

  1. HiveCatalog:
    1. If hive-conf-dir is specified: since hive-conf-dir is a local path, make sure that all nodes in the cluster place the Hive configuration files under the same path; otherwise the JM will not find the files and the job will fail. The current application mode of Flink has the same problem.
    2. If hive-conf-dir is not specified: HiveCatalog looks for hive-site.xml on the Java classpath, so we have to solve the hive-site.xml upload problem and make sure it can be found on the classpath of both the Flink client and the JM in all modes; otherwise the job will fail.
  2. InMemoryCatalog:
    1. The database information that already exists in the InMemoryCatalog needs to be additionally serialized; otherwise creating the table on the JM side may fail because the corresponding database cannot be found.

Runtime

Provide a job status change hook mechanism on the JM side.

/**
 * Hooks provided by users on job status changing.
 */
@Internal
public interface JobStatusHook extends Serializable {

    /** When the job becomes CREATED status. It would only be called one time. */
    void onCreated(JobID jobId);

    /** When the job finished successfully. */
    void onFinished(JobID jobId);

    /** When the job failed finally. */
    void onFailed(JobID jobId, Throwable throwable);

    /** When the job gets canceled by users. */
    void onCanceled(JobID jobId);
}

Flink's current hook mechanisms cannot meet the needs of CTAS. For example, the JobListener is executed on the client side, and the JobStatusListener runs on the JM side but cannot be serialized. Thus we propose a new interface, JobStatusHook, which can be attached to the JobGraph and executed in the JobMaster. The interface will also be marked as Internal.

The process of CTAS in runtime

  1. When the job starts, the JobGraph is deserialized, and the JobStatusHook is deserialized along with it.
  2. When deserializing the JobStatusHook, the Catalog and CatalogBaseTable are also deserialized.
    • The CatalogBaseTable is deserialized using the CatalogPropertiesUtil#deserializeCatalogTable method.
    • When deserializing the Catalog, first read the catalog name and properties, then use FactoryUtil#createCatalog to get the catalog instance.
  3. When the job starts and its status changes, the corresponding JobStatusHook method is called:

For example, suppose our JobStatusHook implementation is called CTASJobStatusHook and uses a JdbcCatalog. How the JdbcCatalog is serialized by the Planner has been covered in the previous section and will not be repeated here.

We can deserialize the catalog name and properties and then use the FactoryUtil#createCatalog method to get the JdbcCatalog instance. Then, when the job status changes, the corresponding CTASJobStatusHook method is called (a condensed sketch follows the list below):

  • When the job status is CREATED, the runtime module will call the CTASJobStatusHook#onCreated method, which calls JdbcCatalog#createTable to create the table.
  • When the final status of the job is FAILED, the runtime module will call the CTASJobStatusHook#onFailed method, which calls JdbcCatalog#dropTable to drop the table.
  • When the final status of the job is CANCELED, the runtime module will call the CTASJobStatusHook#onCanceled method, which calls JdbcCatalog#dropTable to drop the table.
  • When the final status of the job is FINISHED, the runtime module will call the CTASJobStatusHook#onFinished method, and we do not need to do any additional operations.
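Putting the pieces above together, a condensed sketch of what such a CTASJobStatusHook could look like is shown below. The class shape, constructor, and field names are illustrative assumptions (error handling and imports are trimmed); only the Catalog, CatalogPropertiesUtil, and FactoryUtil calls are existing APIs.

/** Illustrative sketch only; not the final implementation. */
public class CTASJobStatusHook implements JobStatusHook {

    private final String catalogName;                    // e.g. "my_catalog"
    private final Map<String, String> catalogOptions;    // options from the CREATE CATALOG DDL
    private final ObjectPath tablePath;                  // database + table name of the CTAS sink
    private final Map<String, String> serializedTable;   // produced via CatalogPropertiesUtil#serializeCatalogTable

    private transient Catalog catalog;                   // re-created on the JM, never serialized itself

    public CTASJobStatusHook(String catalogName, Map<String, String> catalogOptions,
            ObjectPath tablePath, Map<String, String> serializedTable) {
        this.catalogName = catalogName;
        this.catalogOptions = catalogOptions;
        this.tablePath = tablePath;
        this.serializedTable = serializedTable;
    }

    @Override
    public void onCreated(JobID jobId) {
        try {
            catalog = FactoryUtil.createCatalog(catalogName, catalogOptions,
                    new Configuration(), Thread.currentThread().getContextClassLoader());
            catalog.open();
            CatalogTable table = CatalogPropertiesUtil.deserializeCatalogTable(serializedTable);
            catalog.createTable(tablePath, table, false);   // ignoreIfExists = false
        } catch (Exception e) {
            throw new RuntimeException("CTAS failed to create table for job " + jobId, e);
        }
    }

    @Override
    public void onFinished(JobID jobId) {
        // success: keep the created table, nothing to clean up
    }

    @Override
    public void onFailed(JobID jobId, Throwable throwable) {
        dropTableQuietly();
    }

    @Override
    public void onCanceled(JobID jobId) {
        dropTableQuietly();
    }

    private void dropTableQuietly() {
        if (catalog == null) {
            return; // onCreated never ran, nothing to clean up
        }
        try {
            catalog.dropTable(tablePath, true); // ignoreIfNotExists = true, best-effort cleanup
        } catch (Exception ignored) {
            // best-effort cleanup only
        }
    }
}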

Data Visibility

Regarding data visibility, it is determined by the TableSink and the runtime mode:

Stream mode:

If the external storage system supports transactions or two-phase commit, data visibility is tied to the checkpoint cycle. Otherwise, data is visible immediately after writing, which is consistent with the current Flink behavior.

Batch mode:

  • FileSystem sink: data is written to a temporary directory first and becomes visible after the job finally succeeds (final visibility).
  • Two-phase-commit sink: data becomes visible after the job finally succeeds (final visibility).
  • Transactional sink: transactions are committed after the job finally succeeds (final visibility), or committed periodically or after a fixed number of records (incremental visibility).
  • Other sinks: data is visible immediately after writing (write visibility).

Managed Table

For Managed Table, please refer to FLIP-188. Table options that do not contain the ‘connector’ key and value represent a managed table; CTAS also follows this principle. For details, please refer to the Table Store docs: https://nightlies.apache.org/flink/flink-table-store-docs-master/docs/development/create-table.

CTAS supports both Managed Table and Non-Managed Table; users need to be clear about their business needs and set the table options correctly. The Catalog#inferTableOptions API can also automatically infer whether to add the connector attribute, based on whether the Catalog supports Managed Table.

Compatibility, Deprecation, and Migration Plan

It is a new feature with no implication for backwards compatibility.

Test Plan

The changes will be verified by unit tests.

Rejected Alternatives

Catalog serialization

Option 1: Add serialize/deserialize API to catalog

If we add serialize and deserialize APIs, each Catalog must implement its own serialization and deserialization. We save the class name of the Catalog together with the serialized content, like this:

Catalog ClassName
Catalog serialized data

Since a Catalog class may not have a parameterless constructor, we cannot use Class#newInstance to create an instance; instead, we can use the Objenesis framework. After using Objenesis to obtain an empty Catalog instance, we get the real Catalog instance through the Catalog#deserialize API. This solves the serialization/deserialization problem of the Catalog.
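A minimal sketch of this instantiation path is given below. The Objenesis calls are the real library API; Catalog#deserialize is the API proposed (and later rejected) by this option, and serializedData stands for the bytes stored next to the class name:

// Sketch only: instantiate the Catalog class without calling a constructor, then let the
// proposed (rejected) Catalog#deserialize API restore its real state from the saved bytes.
Objenesis objenesis = new ObjenesisStd();
ObjectInstantiator<JdbcCatalog> instantiator = objenesis.getInstantiatorOf(JdbcCatalog.class);
JdbcCatalog emptyCatalog = instantiator.newInstance();      // empty instance, no fields initialized

Catalog catalog = emptyCatalog.deserialize(serializedData); // hypothetical API from this option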

For example, JdbcCatalog#serialize can save catalogName, defaultDatabase, username, pwd, baseUrl, and JdbcCatalog#deserialize can re-initialize a JdbcCatalog object through these parameters; HiveCatalog#serialize can save catalogName, defaultDatabase, hiveConf, hiveVersion, and HiveCatalog#deserialize can re-initialize a HiveCatalog object through these parameters; InMemoryCatalog#serialize only needs to save the catalogName and defaultDatabase, and InMemoryCatalog#deserialize can re-initialize an InMemoryCatalog object through these two parameters.

The tables in the InMemoryCatalog already exist in the external system. The metadata held in the InMemoryCatalog is only used by the job itself and is held only in memory. Therefore, under this option, the metadata in the InMemoryCatalog does not need to be serialized and passed to the JM; on the JM side, we only need to initialize a new InMemoryCatalog.

The serialization tooling for this solution is complex to implement, and it is expensive for user-defined catalogs to implement, so it was abandoned.



References

  1. Support SELECT clause in CREATE TABLE(CTAS)
  2. MySQL CTAS syntax
  3. Microsoft Azure Synapse CTAS
  4. LanguageManual DDL#Create/Drop/ReloadFunction
  5. Spark Create Table Syntax

...