...

The planner registers the JobStatusHook with the StreamGraph. The hook is then serialized as part of the JobGraph and passed to the JM. The JobGraph is serialized with ObjectOutputStream, but CatalogBaseTable and Catalog cannot be serialized directly with ObjectOutputStream, so JobStatusHook implementations need to implement the Externalizable interface. Internally, they use CatalogPropertiesUtil, a utility Flink already provides, to serialize and deserialize the CatalogBaseTable.
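
A minimal sketch of the Externalizable approach follows. It assumes the table has already been flattened into a Map<String, String>, which is the job CatalogPropertiesUtil does in Flink. The class CreateTableHookSketch and its fields are hypothetical. Because writeExternal writes only strings, ObjectOutputStream never has to handle a CatalogBaseTable directly.

```java
import java.io.*;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical hook that carries a table as a flat property map instead of a CatalogBaseTable. */
public class CreateTableHookSketch implements Externalizable {
    private String tableIdentifier;
    private Map<String, String> tableProperties;

    /** Externalizable requires a public no-arg constructor; Java serialization calls it first. */
    public CreateTableHookSketch() {}

    public CreateTableHookSketch(String tableIdentifier, Map<String, String> tableProperties) {
        this.tableIdentifier = tableIdentifier;
        this.tableProperties = new LinkedHashMap<>(tableProperties);
    }

    @Override
    public void writeExternal(ObjectOutput out) throws IOException {
        // Only plain strings are written, so nothing non-serializable reaches the stream.
        out.writeUTF(tableIdentifier);
        out.writeInt(tableProperties.size());
        for (Map.Entry<String, String> e : tableProperties.entrySet()) {
            out.writeUTF(e.getKey());
            out.writeUTF(e.getValue());
        }
    }

    @Override
    public void readExternal(ObjectInput in) throws IOException {
        tableIdentifier = in.readUTF();
        int size = in.readInt();
        tableProperties = new LinkedHashMap<>();
        for (int i = 0; i < size; i++) {
            tableProperties.put(in.readUTF(), in.readUTF());
        }
    }

    public String getTableIdentifier() { return tableIdentifier; }

    public Map<String, String> getTableProperties() { return tableProperties; }

    /** Serializes and deserializes a hook with ObjectOutputStream, as a JobGraph field would be. */
    public static CreateTableHookSketch roundTrip(CreateTableHookSketch hook) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(hook);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (CreateTableHookSketch) in.readObject();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

The sketch declares the public no-argument constructor alongside the regular one because Externalizable requires it.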

For Catalog, we add serialize and deserialize APIs, and each Catalog implementation serializes the properties it needs. We save the class name of the Catalog together with the serialized content, like this:

Catalog ClassName
Catalog serialized data
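
Saving the class name together with the serialized content can be sketched with DataOutputStream. CatalogEnvelope and its methods are hypothetical names. The length prefix before the payload is an assumption of this sketch, not a format fixed by the proposal.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

/** Hypothetical envelope: a Catalog's class name followed by its serialized bytes. */
public final class CatalogEnvelope {
    private final String catalogClassName;
    private final byte[] serializedData;

    public CatalogEnvelope(String catalogClassName, byte[] serializedData) {
        this.catalogClassName = catalogClassName;
        this.serializedData = serializedData.clone();
    }

    public String catalogClassName() { return catalogClassName; }

    public byte[] serializedData() { return serializedData.clone(); }

    /** Writes the class name, then the catalog's own bytes with a length prefix. */
    public byte[] toBytes() {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(catalogClassName);
            out.writeInt(serializedData.length);
            out.write(serializedData);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    /** Reads the class name first, so the JM knows which Catalog class to instantiate. */
    public static CatalogEnvelope fromBytes(byte[] bytes) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            String className = in.readUTF();
            byte[] data = new byte[in.readInt()];
            in.readFully(data);
            return new CatalogEnvelope(className, data);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```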

...

 


A Catalog class may not have a parameterless constructor, so we cannot use Class#newInstance to create the object. Instead, we use the Objenesis library. Objenesis gives us an empty Catalog instance, and the Catalog#deserialize API then restores it into the real Catalog instance. This solves the serialization/deserialization problem for both CatalogBaseTable and Catalog.
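
The instantiate-then-deserialize step can be sketched as follows. To avoid third-party dependencies, the example does not call Objenesis. It uses sun.reflect.ReflectionFactory from the JDK's jdk.unsupported module instead, which is the kind of JDK-internal hook that Objenesis builds on. DeserializableCatalog, DemoCatalog and restore are hypothetical stand-ins, not Flink's Catalog API.

```java
import java.lang.reflect.Constructor;
import java.nio.charset.StandardCharsets;
import sun.reflect.ReflectionFactory;

public class CatalogInstantiator {

    /** Hypothetical stand-in for a Catalog that offers the proposed deserialize API. */
    public interface DeserializableCatalog {
        void deserialize(byte[] data);
    }

    /** Has no parameterless constructor, so Class#newInstance cannot create it. */
    public static class DemoCatalog implements DeserializableCatalog {
        private String catalogName;

        public DemoCatalog(String catalogName) {
            this.catalogName = catalogName;
        }

        @Override
        public void deserialize(byte[] data) {
            this.catalogName = new String(data, StandardCharsets.UTF_8);
        }

        public String getCatalogName() {
            return catalogName;
        }
    }

    /**
     * Creates an empty instance of the named class without running any of its own
     * constructors (only Object's constructor runs), then lets it restore its state.
     */
    public static DeserializableCatalog restore(String className, byte[] data) {
        try {
            Class<?> clazz = Class.forName(className);
            Constructor<?> ctor = ReflectionFactory.getReflectionFactory()
                    .newConstructorForSerialization(clazz, Object.class.getDeclaredConstructor());
            DeserializableCatalog catalog = (DeserializableCatalog) ctor.newInstance();
            catalog.deserialize(data);
            return catalog;
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot restore catalog " + className, e);
        }
    }
}
```

In practice Objenesis would hide these JVM-specific details. The sketch only shows that an object can be created without calling any of its own constructors and then filled in by deserialize.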

For example:

  • JdbcCatalog#serialize can save catalogName, defaultDatabase, username, pwd and baseUrl. JdbcCatalog#deserialize can then re-initialize a JdbcCatalog object from these parameters.
  • HiveCatalog#serialize can save catalogName, defaultDatabase, hiveConf and hiveVersion. HiveCatalog#deserialize can then re-initialize a HiveCatalog object from these parameters.
  • InMemoryCatalog#serialize only needs to save catalogName and defaultDatabase. InMemoryCatalog#deserialize can then re-initialize an InMemoryCatalog object from these two parameters.
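
As a rough illustration of what such serialize/deserialize pairs might write, the hypothetical classes below persist only the parameters listed above. The class names, the field layout and the use of DataOutputStream#writeUTF are assumptions of this sketch. The in-memory variant writes only its name and default database, so the JM-side copy starts with no table metadata.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.HashMap;
import java.util.Map;

public class CatalogSerializationSketch {

    /** Hypothetical JDBC-like catalog: serializes exactly the parameters needed to rebuild it. */
    public static class JdbcLikeCatalog {
        String catalogName, defaultDatabase, username, pwd, baseUrl;

        public JdbcLikeCatalog(String catalogName, String defaultDatabase,
                               String username, String pwd, String baseUrl) {
            this.catalogName = catalogName;
            this.defaultDatabase = defaultDatabase;
            this.username = username;
            this.pwd = pwd;
            this.baseUrl = baseUrl;
        }

        public byte[] serialize() {
            return writeStrings(catalogName, defaultDatabase, username, pwd, baseUrl);
        }

        public void deserialize(byte[] data) {
            String[] f = readStrings(data);
            catalogName = f[0];
            defaultDatabase = f[1];
            username = f[2];
            pwd = f[3];
            baseUrl = f[4];
        }
    }

    /** Hypothetical in-memory catalog: table metadata is deliberately not serialized. */
    public static class InMemoryLikeCatalog {
        String catalogName, defaultDatabase;
        Map<String, String> tables = new HashMap<>();

        public InMemoryLikeCatalog(String catalogName, String defaultDatabase) {
            this.catalogName = catalogName;
            this.defaultDatabase = defaultDatabase;
        }

        public byte[] serialize() {
            return writeStrings(catalogName, defaultDatabase);
        }

        public void deserialize(byte[] data) {
            String[] f = readStrings(data);
            catalogName = f[0];
            defaultDatabase = f[1];
            // Start empty; this also covers instances created without running a constructor,
            // where the field initializer above would not have executed.
            tables = new HashMap<>();
        }
    }

    static byte[] writeStrings(String... values) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(values.length);
            for (String v : values) {
                out.writeUTF(v);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    static String[] readStrings(byte[] data) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            String[] values = new String[in.readInt()];
            for (int i = 0; i < values.length; i++) {
                values[i] = in.readUTF();
            }
            return values;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```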

The tables in an InMemoryCatalog already exist in the external system. The metadata the InMemoryCatalog holds is used only by the job itself and lives only in memory. Therefore, none of the InMemoryCatalog's metadata needs to be serialized and passed to the JM; the JM only needs to initialize a new InMemoryCatalog.

Runtime

Provide a job status change hook mechanism on the JM side.

import java.io.Serializable;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.JobID;

/** Hooks provided by users that are called on job status changes. */
@Internal
public interface JobStatusHook extends Serializable {

    /** Called when the job enters the CREATED status. It is called only once. */
    default void onCreated(JobID jobId) {}

    /** Called when the job finishes successfully. */
    default void onFinished(JobID jobId) {}

    /** Called when the job fails terminally. */
    default void onFailed(JobID jobId, Throwable throwable) {}

    /** Called when the job is canceled by the user. */
    default void onCanceled(JobID jobId) {}
}
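
One plausible way a CTAS job could use these callbacks is shown below. The hook creates the target table when the job is created and drops it if the job fails or is canceled. To stay self-contained, the sketch re-declares a simplified copy of the interface that takes a String job ID instead of Flink's JobID and omits Serializable. CreateTableAsSelectHook and its createTable/dropTable callbacks are hypothetical.

```java
import java.util.function.Consumer;

public class CtasHookSketch {

    /** Simplified copy of JobStatusHook: String job IDs instead of JobID, not Serializable. */
    public interface JobStatusHook {
        default void onCreated(String jobId) {}

        default void onFinished(String jobId) {}

        default void onFailed(String jobId, Throwable throwable) {}

        default void onCanceled(String jobId) {}
    }

    /**
     * Hypothetical CTAS hook: creates the target table when the job is created and drops
     * it again if the job fails or is canceled. On success (onFinished) the table is kept.
     */
    public static class CreateTableAsSelectHook implements JobStatusHook {
        private final String tableName;
        private final Consumer<String> createTable;
        private final Consumer<String> dropTable;

        public CreateTableAsSelectHook(
                String tableName, Consumer<String> createTable, Consumer<String> dropTable) {
            this.tableName = tableName;
            this.createTable = createTable;
            this.dropTable = dropTable;
        }

        @Override
        public void onCreated(String jobId) {
            createTable.accept(tableName);
        }

        @Override
        public void onFailed(String jobId, Throwable throwable) {
            dropTable.accept(tableName);
        }

        @Override
        public void onCanceled(String jobId) {
            dropTable.accept(tableName);
        }
    }
}
```

The default methods let a hook override only the transitions it cares about. Here, onFinished needs no action.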

...

Rejected Alternatives

Catalog serialize

A simple way to serialize the catalog is to save the options of the catalog in the CatalogManager, so that the JM side only needs to use these options to re-initialize the catalog.

For example, for a catalog created with DDL:

CREATE CATALOG my_catalog WITH(
'type' = 'jdbc',
'default-database' = '...',
'username' = '...',
'password' = '...',
'base-url' = '...'
);


We just need to save default-database, username, password and base-url, and then re-initialize the JdbcCatalog on the JM.

The advantage of this solution is that it is simple and convenient to implement and does not require complex serialization and deserialization tools.

The disadvantage is that it cannot cover all scenarios. It only works for catalogs created with DDL, because the catalog options can only be obtained from the WITH clause. For a Catalog registered through the TableEnvironment#registerCatalog method, the options cannot be obtained, and many users register catalogs with TableEnvironment#registerCatalog in production. For these reasons, we reject this plan.
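
The limitation can be made concrete with a small sketch of the rejected approach. CatalogManagerSketch and its methods are hypothetical, and Object stands in for a Catalog instance. Options are recorded only when a catalog comes from a WITH clause, so a catalog registered as a ready-made object leaves nothing for the JM to re-initialize from.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

/** Hypothetical catalog manager for the rejected options-based approach. */
public class CatalogManagerSketch {
    // Object stands in for a Catalog instance.
    private final Map<String, Object> catalogs = new HashMap<>();
    private final Map<String, Map<String, String>> catalogOptions = new HashMap<>();

    /** CREATE CATALOG ... WITH (...): the options are known and can be kept for the JM. */
    public void createCatalogFromDdl(String name, Map<String, String> options, Object catalog) {
        catalogs.put(name, catalog);
        catalogOptions.put(name, Map.copyOf(options));
    }

    /** Mirrors TableEnvironment#registerCatalog: only an instance arrives, no options. */
    public void registerCatalog(String name, Object catalog) {
        catalogs.put(name, catalog);
    }

    public boolean hasCatalog(String name) {
        return catalogs.containsKey(name);
    }

    /** Options that would be shipped to the JM to re-initialize the catalog, if any exist. */
    public Optional<Map<String, String>> optionsFor(String name) {
        return Optional.ofNullable(catalogOptions.get(name));
    }
}
```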




...