
...

The planner registers the JobStatusHook with the StreamGraph. The hook is then serialized as part of the JobGraph and shipped to the JM. The JobGraph is serialized with ObjectOutputStream, and neither CatalogBaseTable nor Catalog can be serialized directly with ObjectOutputStream, so JobStatusHook implementations need to implement the Externalizable interface. Internally, they use CatalogPropertiesUtil, a utility Flink already provides, to serialize and deserialize the CatalogBaseTable.
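A minimal sketch of the Externalizable approach follows. It assumes the hook only needs to carry the table definition as the flat string map that a utility such as CatalogPropertiesUtil produces. The class `CtasJobStatusHook` and the property keys are hypothetical. The real CatalogBaseTable is replaced by a `Map<String, String>` so the example runs without Flink on the classpath.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectInputStream;
import java.io.ObjectOutput;
import java.io.ObjectOutputStream;
import java.util.HashMap;
import java.util.Map;

public class ExternalizableHookSketch {

    /** Hypothetical hook; the flat map stands in for a CatalogBaseTable turned into properties. */
    public static class CtasJobStatusHook implements Externalizable {
        private static final long serialVersionUID = 1L;

        private Map<String, String> tableProperties;

        // Externalizable requires a public no-arg constructor; readExternal fills the state.
        public CtasJobStatusHook() {}

        public CtasJobStatusHook(Map<String, String> tableProperties) {
            this.tableProperties = new HashMap<>(tableProperties);
        }

        public Map<String, String> getTableProperties() {
            return tableProperties;
        }

        @Override
        public void writeExternal(ObjectOutput out) throws IOException {
            // Write the entry count, then each key/value pair as UTF strings.
            out.writeInt(tableProperties.size());
            for (Map.Entry<String, String> e : tableProperties.entrySet()) {
                out.writeUTF(e.getKey());
                out.writeUTF(e.getValue());
            }
        }

        @Override
        public void readExternal(ObjectInput in) throws IOException {
            // Read back in exactly the order writeExternal wrote.
            int size = in.readInt();
            tableProperties = new HashMap<>();
            for (int i = 0; i < size; i++) {
                tableProperties.put(in.readUTF(), in.readUTF());
            }
        }
    }

    /** Serializes the hook with ObjectOutputStream and reads it back, mimicking the JobGraph path. */
    public static CtasJobStatusHook roundTrip(CtasJobStatusHook hook)
            throws IOException, ClassNotFoundException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(hook);
        }
        try (ObjectInputStream in =
                new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
            return (CtasJobStatusHook) in.readObject();
        }
    }

    public static void main(String[] args) throws Exception {
        Map<String, String> props = new HashMap<>();
        props.put("schema.0.name", "id");
        props.put("connector", "jdbc");
        CtasJobStatusHook copy = roundTrip(new CtasJobStatusHook(props));
        System.out.println(copy.getTableProperties().get("connector")); // prints "jdbc"
    }
}
```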

For the Catalog, a simple way to serialize it is to save the catalog's options in the CatalogManager. The JM side then only needs these options to re-initialize the catalog (using FactoryUtil#createCatalog).

For example, create a catalog using DDL:

CREATE CATALOG my_catalog WITH(
'type' = 'jdbc',
'default-database' = '...',
'username' = '...',
'password' = '...',
'base-url' = '...'
);
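The option-based approach can be sketched as follows, assuming the catalog is fully described by its DDL options. `SimpleCatalog`, `OptionsCatalog`, and the factory map keyed by `'type'` are hypothetical stand-ins for Flink's Catalog, a concrete catalog such as the JDBC one, and the factory discovery that FactoryUtil#createCatalog performs. The option values are placeholders.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

public class CatalogOptionsSketch {

    /** Hypothetical, minimal stand-in for a catalog; not Flink's Catalog interface. */
    public interface SimpleCatalog {
        String getName();
        String getOption(String key);
    }

    /** Hypothetical catalog that is fully described by its DDL options. */
    public static final class OptionsCatalog implements SimpleCatalog {
        private final String name;
        private final Map<String, String> options;

        public OptionsCatalog(String name, Map<String, String> options) {
            this.name = name;
            this.options = new HashMap<>(options);
        }

        @Override public String getName() { return name; }
        @Override public String getOption(String key) { return options.get(key); }
    }

    // Client side: options captured when CREATE CATALOG runs, keyed by catalog name.
    private final Map<String, Map<String, String>> optionsByCatalog = new HashMap<>();

    public void registerCatalogOptions(String catalogName, Map<String, String> options) {
        optionsByCatalog.put(catalogName, new HashMap<>(options));
    }

    /** A HashMap of strings is Serializable, so it can travel inside the hook as-is. */
    public HashMap<String, String> getCatalogOptions(String catalogName) {
        Map<String, String> options = optionsByCatalog.get(catalogName);
        if (options == null) {
            throw new IllegalArgumentException("Unknown catalog: " + catalogName);
        }
        return new HashMap<>(options);
    }

    /** JM side: rebuild the catalog from its options by dispatching on 'type'. */
    public static SimpleCatalog createCatalog(
            String catalogName,
            Map<String, String> options,
            Map<String, BiFunction<String, Map<String, String>, SimpleCatalog>> factoriesByType) {
        String type = options.get("type");
        BiFunction<String, Map<String, String>, SimpleCatalog> factory = factoriesByType.get(type);
        if (factory == null) {
            throw new IllegalArgumentException("No catalog factory for type: " + type);
        }
        return factory.apply(catalogName, options);
    }

    public static void main(String[] args) {
        Map<String, String> ddlOptions = new HashMap<>();
        ddlOptions.put("type", "jdbc");
        ddlOptions.put("default-database", "mydb"); // placeholder value
        ddlOptions.put("base-url", "jdbc:postgresql://localhost:5432/"); // placeholder value

        CatalogOptionsSketch manager = new CatalogOptionsSketch();
        manager.registerCatalogOptions("my_catalog", ddlOptions);

        Map<String, BiFunction<String, Map<String, String>, SimpleCatalog>> factories = new HashMap<>();
        factories.put("jdbc", OptionsCatalog::new);

        SimpleCatalog restored =
                createCatalog("my_catalog", manager.getCatalogOptions("my_catalog"), factories);
        System.out.println(restored.getOption("default-database")); // prints "mydb"
    }
}
```

The catalog never has to be serialized itself. Only its string options cross the wire, which is why this variant needs no custom serialization tooling.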

...



The advantage of this solution is that the implementation is simple and convenient, and does not require complex serialization and deserialization tools.

...

For the Catalog, we have added serialize and deserialize APIs, and each Catalog implementation decides which of its own properties need to be serialized. We save the class name of the Catalog together with the serialized content, like this:

Catalog ClassName
Catalog serialized data
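One way to lay out this envelope is as a length-prefixed binary record. The format is an assumption, since the section does not specify one. The class name is written first so the JM knows which class to instantiate before reading the catalog's own bytes. `CatalogEnvelopeSketch` is a hypothetical name.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class CatalogEnvelopeSketch {

    /** Decoded envelope: which Catalog class to build, plus that catalog's own bytes. */
    public static final class Envelope {
        public final String className;
        public final byte[] data;

        public Envelope(String className, byte[] data) {
            this.className = className;
            this.data = data;
        }
    }

    /** Layout: UTF class name, then a 4-byte length, then the catalog's serialized bytes. */
    public static byte[] write(String className, byte[] catalogData) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(className);
            out.writeInt(catalogData.length);
            out.write(catalogData);
        }
        return bytes.toByteArray();
    }

    /** Reads the fields back in the same order they were written. */
    public static Envelope read(byte[] envelope) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(envelope))) {
            String className = in.readUTF();
            byte[] data = new byte[in.readInt()];
            in.readFully(data);
            return new Envelope(className, data);
        }
    }
}
```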

A Catalog class may not have a no-argument constructor, so we cannot use Class#newInstance to instantiate it. Instead, we can use the Objenesis library. After Objenesis creates an empty Catalog instance, the real Catalog instance is obtained through the Catalog#deserialize API. This solves the serialization/deserialization problem for both CatalogBaseTable and Catalog.
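A sketch of the restore path, under stated assumptions. `SerializableCatalog` is a hypothetical interface modeling the serialize/deserialize APIs described above. `UrlCatalog` is a hypothetical catalog without a no-argument constructor. To keep the example dependency-free, Objenesis is swapped for the JDK-internal `sun.reflect.ReflectionFactory` (from the `jdk.unsupported` module), which likewise creates an instance without running the class's own constructors. In real code, Objenesis would take this role, for example via `new ObjenesisStd().newInstance(clazz)`.

```java
import java.lang.reflect.Constructor;
import java.nio.charset.StandardCharsets;
import sun.reflect.ReflectionFactory;

public class CatalogRestoreSketch {

    /** Hypothetical serialize/deserialize contract; not an actual Flink API. */
    public interface SerializableCatalog {
        byte[] serialize();

        /** Populates this (possibly empty) instance from bytes and returns the usable catalog. */
        SerializableCatalog deserialize(byte[] data);
    }

    /** Hypothetical catalog that deliberately has no no-arg constructor. */
    public static class UrlCatalog implements SerializableCatalog {
        private String baseUrl; // non-final so deserialize can populate an empty instance

        public UrlCatalog(String baseUrl) {
            if (baseUrl == null) {
                throw new IllegalArgumentException("baseUrl is required");
            }
            this.baseUrl = baseUrl;
        }

        public String getBaseUrl() {
            return baseUrl;
        }

        @Override
        public byte[] serialize() {
            return baseUrl.getBytes(StandardCharsets.UTF_8);
        }

        @Override
        public SerializableCatalog deserialize(byte[] data) {
            this.baseUrl = new String(data, StandardCharsets.UTF_8);
            return this;
        }
    }

    /** Creates an instance without running any constructor declared by clazz (Objenesis-style). */
    static <T> T newEmptyInstance(Class<T> clazz) throws ReflectiveOperationException {
        Constructor<?> objectCtor = Object.class.getDeclaredConstructor();
        Constructor<?> ctor =
                ReflectionFactory.getReflectionFactory().newConstructorForSerialization(clazz, objectCtor);
        ctor.setAccessible(true);
        return clazz.cast(ctor.newInstance());
    }

    /** JM side: class name + serialized data -> restored catalog. */
    public static SerializableCatalog restore(String className, byte[] data)
            throws ReflectiveOperationException {
        Class<? extends SerializableCatalog> clazz =
                Class.forName(className).asSubclass(SerializableCatalog.class);
        SerializableCatalog empty = newEmptyInstance(clazz); // fields are still null here
        return empty.deserialize(data);
    }

    public static void main(String[] args) throws Exception {
        UrlCatalog original = new UrlCatalog("jdbc:postgresql://localhost:5432/");
        SerializableCatalog restored = restore(original.getClass().getName(), original.serialize());
        System.out.println(((UrlCatalog) restored).getBaseUrl());
    }
}
```

Because the empty instance skips every constructor, its fields start at their default values. `deserialize` must therefore populate all the state the catalog needs, and any field it sets cannot be `final`.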

...