...

The Planner registers the JobStatusHook with the StreamGraph; the JobStatusHook is then serialized and passed to the JM as part of the JobGraph. Because the JobGraph is serialized with ObjectOutputStream, and CatalogBaseTable and Catalog cannot be serialized that way directly, JobStatusHook instances need to implement the Externalizable interface. Internally, CatalogPropertiesUtil, a utility Flink already provides, is used to serialize/deserialize the CatalogBaseTable.
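The Externalizable pattern described above can be sketched in plain Java. This is only an illustration of the mechanism: the `PropertiesHook` class and its properties map are hypothetical stand-ins for the real JobStatusHook and for what CatalogPropertiesUtil would produce from a CatalogBaseTable.

```java
import java.io.*;
import java.util.HashMap;
import java.util.Map;

// Hypothetical hook: stores its state as a string-to-string properties map,
// mirroring how CatalogPropertiesUtil turns a CatalogBaseTable into properties.
public class PropertiesHook implements Externalizable {
    private Map<String, String> tableProperties = new HashMap<>();

    // Externalizable requires a public no-arg constructor for deserialization.
    public PropertiesHook() {}

    public PropertiesHook(Map<String, String> props) {
        this.tableProperties = props;
    }

    @Override
    public void writeExternal(ObjectOutput out) throws IOException {
        // In Flink this is where CatalogPropertiesUtil#serializeCatalogTable would run.
        out.writeInt(tableProperties.size());
        for (Map.Entry<String, String> e : tableProperties.entrySet()) {
            out.writeUTF(e.getKey());
            out.writeUTF(e.getValue());
        }
    }

    @Override
    public void readExternal(ObjectInput in) throws IOException {
        int n = in.readInt();
        tableProperties = new HashMap<>();
        for (int i = 0; i < n; i++) {
            tableProperties.put(in.readUTF(), in.readUTF());
        }
    }

    public Map<String, String> getTableProperties() {
        return tableProperties;
    }

    public static void main(String[] args) throws Exception {
        Map<String, String> props = new HashMap<>();
        props.put("connector", "jdbc");
        PropertiesHook hook = new PropertiesHook(props);

        // Round-trip through ObjectOutputStream, as JobGraph serialization does.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        ObjectOutputStream oos = new ObjectOutputStream(bos);
        oos.writeObject(hook);
        oos.flush();
        PropertiesHook restored = (PropertiesHook) new ObjectInputStream(
                new ByteArrayInputStream(bos.toByteArray())).readObject();
        System.out.println(restored.getTableProperties().get("connector"));
    }
}
```

Because the class controls its own wire format in writeExternal/readExternal, it can embed the property-based encoding rather than relying on default Java field serialization.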

For the Catalog, the way to serialize it is to save the catalog's options in the CatalogManager, so that the JM side only needs these options to re-initialize the catalog (using FactoryUtil#createCatalog to get the catalog).

The tables in an InMemoryCatalog already exist in the external system; the metadata in the InMemoryCatalog is used only by the job itself and is stored only in memory. Therefore, none of the metadata in the InMemoryCatalog needs to be serialized and passed to the JM, so this solution also covers the InMemoryCatalog usage scenario.

For example, creating a catalog with DDL:

CREATE CATALOG my_catalog WITH (
  'type' = 'jdbc',
  'default-database' = '...',
  'username' = '...',
  'password' = '...',
  'base-url' = '...'
);

1) The Planner registers the catalog with the CatalogManager, and also registers the properties from the WITH clause in the CatalogManager.

2) When serializing the catalog, only the catalog name (my_catalog) and its properties need to be saved, like this:

my_catalog

{'type'='jdbc', 'default-database'='...', 'username'='...', 'password'='...', 'base-url'='...'}
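The re-initialization step can be sketched in plain Java. The `createCatalog` method here is a minimal stand-in for FactoryUtil#createCatalog: it looks up a factory by the `'type'` option and rebuilds the catalog from the saved properties. The toy factory registry and the `Catalog` interface are illustrative, not Flink's actual API (Flink discovers catalog factories via SPI).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Sketch: rebuild a catalog on the JM side from only its name and saved options.
public class CatalogRecreation {
    interface Catalog { String name(); }

    // Toy factory registry keyed by the 'type' option; Flink uses SPI discovery instead.
    static final Map<String, Function<Map<String, String>, Catalog>> FACTORIES = new HashMap<>();

    static Catalog createCatalog(String catalogName, Map<String, String> options) {
        String type = options.get("type");
        Function<Map<String, String>, Catalog> factory = FACTORIES.get(type);
        if (factory == null) {
            throw new IllegalArgumentException("No catalog factory for type: " + type);
        }
        return factory.apply(options);
    }

    public static void main(String[] args) {
        // Register a toy 'jdbc' factory.
        FACTORIES.put("jdbc", options -> () -> "jdbc-catalog:" + options.get("default-database"));

        // JM side: only the name and options were shipped in the JobGraph.
        Map<String, String> saved = new HashMap<>();
        saved.put("type", "jdbc");
        saved.put("default-database", "db1");

        Catalog catalog = createCatalog("my_catalog", saved);
        System.out.println(catalog.name());
    }
}
```

The key point is that no Catalog instance ever crosses the wire; only the option map does, and the JM reconstructs an equivalent instance locally.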

The advantages of this solution are a simple design, ease of compatibility, and reduced implementation complexity: it does not require complex serialization and deserialization tools.

The disadvantage of this solution is that it does not cover the usage scenario of TableEnvironment#registerCatalog. This can be solved if we introduce a CatalogDescriptor (like TableDescriptor) for the Table API in the future, so that Flink can get the properties of a Catalog through the CatalogDescriptor.

void registerCatalog(String catalogName, CatalogDescriptor catalogDescriptor);

Runtime

Provide a job status change hook mechanism on the JM side.

...

  1. When the job starts, the JobGraph is deserialized, and the JobStatusHook along with it.
  2. Using the serialization/deserialization approach for Catalog and CatalogBaseTable described above, the Catalog and CatalogBaseTable are deserialized together with the JobStatusHook.
    • Deserialize the CatalogBaseTable using the CatalogPropertiesUtil#deserializeCatalogTable method.
    • When deserializing a Catalog, first read the catalog name and properties, then use FactoryUtil#createCatalog to get the catalog instance.
  3. When the job starts and whenever the job status changes, the corresponding JobStatusHook method is called:

...

  • Streaming mode requires the table to be created first (metadata sharing), so downstream jobs can consume it in real time.
  • In most cases, streaming jobs do not need to be cleaned up even if the job fails (for example Redis, which cannot be cleaned up unless every written key is recorded).
  • Batch jobs try to ensure final atomicity (if the job succeeds, the data is visible; otherwise, drop the metadata and delete the temporary data).
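The hook-driven lifecycle above (create the table when the job is created, clean up on failure for batch jobs) can be sketched in plain Java. The interface and the `CtasHook` behavior here are illustrative stand-ins modeled on this design, not Flink's actual JobStatusHook API:

```java
import java.util.ArrayList;
import java.util.List;

public class HookDispatchSketch {
    // Hypothetical status-change callbacks, one per job lifecycle transition.
    interface JobStatusHook {
        void onCreated(String jobId);
        void onFinished(String jobId);
        void onFailed(String jobId, Throwable cause);
        void onCanceled(String jobId);
    }

    // A hook that creates the target table when the job is created and drops it
    // on failure/cancel, matching the batch atomicity behavior described above.
    static class CtasHook implements JobStatusHook {
        final List<String> actions = new ArrayList<>();
        @Override public void onCreated(String jobId) { actions.add("createTable"); }
        @Override public void onFinished(String jobId) { actions.add("commit"); }
        @Override public void onFailed(String jobId, Throwable cause) { actions.add("dropTable"); }
        @Override public void onCanceled(String jobId) { actions.add("dropTable"); }
    }

    public static void main(String[] args) {
        CtasHook hook = new CtasHook();
        // JM side: fire callbacks as the job moves through its lifecycle.
        hook.onCreated("job-1");
        hook.onFailed("job-1", new RuntimeException("task failure"));
        System.out.println(String.join(",", hook.actions));
    }
}
```

A streaming-mode hook would differ mainly in its failure callback, which (per the bullets above) would usually leave the table and its data in place rather than dropping them.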





