...

Considering the current state of Flink and the needs of our business, we choose a Level-2 implementation for Flink in batch execution mode. In streaming mode, however, we do not provide atomicity guarantees, because streaming jobs are long-running. Moreover, there is currently no strong need to guarantee atomicity in streaming mode.

...

The final tasks of the job are all generated by the Planner. We want to complete the create-table/drop-table actions through a Hook on the JM side, so we need an API to register the Hook on the JM side.
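
To make the idea concrete, here is a minimal sketch of what such a hook could look like; the interface and method names below are illustrative assumptions, not a finalized API. The Planner would register an implementation of this interface when generating the job, and the JM would invoke the callbacks on job state transitions:

import java.io.Serializable;
import org.apache.flink.api.common.JobID;

// Sketch only: names are illustrative, not a finalized Flink API.
// A hook that runs on the JM side and reacts to job lifecycle changes,
// e.g. finalizing the created table on success or dropping it on failure.
public interface JobStatusHook extends Serializable {

    // Called when the job starts to run.
    void onCreated(JobID jobId);

    // Called when the job finishes successfully: make the new table visible.
    void onFinished(JobID jobId);

    // Called when the job fails: clean up, e.g. drop the half-created table.
    void onFailed(JobID jobId, Throwable throwable);

    // Called when the job is canceled: clean up as on failure.
    void onCanceled(JobID jobId);
}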

...

For CatalogBaseTable, we use CatalogPropertiesUtil to serialize/deserialize it; this is a utility that Flink already provides.
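
For illustration, a small sketch of how a table could be round-tripped through this utility, assuming the serializeCatalogTable/deserializeCatalogTable helpers in CatalogPropertiesUtil (verify the exact signatures against your Flink version):

import java.util.Map;
import org.apache.flink.table.catalog.CatalogPropertiesUtil;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ResolvedCatalogTable;

// Sketch: the client serializes the resolved table to a flat
// String -> String map, ships the map to the JM, and the JM
// rebuilds a CatalogTable from that map.
final class TableSerializationSketch {

    static Map<String, String> toProperties(ResolvedCatalogTable table) {
        return CatalogPropertiesUtil.serializeCatalogTable(table);
    }

    static CatalogTable fromProperties(Map<String, String> properties) {
        return CatalogPropertiesUtil.deserializeCatalogTable(properties);
    }
}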

For Catalog, we serialize a catalog by saving its name and options (the ones used in the CREATE CATALOG DDL) in the CatalogManager. The JM side can then use these options to re-initialize the catalog through Flink's ServiceLoader mechanism (using FactoryUtil#createCatalog to get the catalog). InMemoryCatalog is a special case: the tables in an InMemoryCatalog already exist in the external system, while the metadata held by the InMemoryCatalog itself is only used by the job and only stored in memory. Therefore, the metadata in InMemoryCatalog does not need to be serialized and passed to the JM, so this solution also covers the InMemoryCatalog usage scenario.

Here is an example of the catalog serialization process for a catalog created via DDL:

CREATE CATALOG my_catalog WITH (
    'type' = 'jdbc',
    'default-database' = '...',
    'username' = '...',
    'password' = '...',
    'base-url' = '...'
);
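
With this DDL, only the catalog name ("my_catalog") and the options map need to be shipped to the JM. Below is a hedged sketch of the JM-side re-initialization, assuming FactoryUtil#createCatalog takes the name, options, configuration, and class loader (check your Flink version for the exact signature):

import java.util.Map;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.factories.FactoryUtil;

// Sketch: the JM receives only the catalog name and the options from the
// CREATE CATALOG DDL, then discovers the matching CatalogFactory through
// the ServiceLoader mechanism and re-instantiates the catalog.
final class CatalogRecreationSketch {

    static Catalog recreate(String catalogName, Map<String, String> options) {
        return FactoryUtil.createCatalog(
                catalogName,
                options,
                new Configuration(),                            // job configuration
                CatalogRecreationSketch.class.getClassLoader());
    }
}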

...

The advantages of this solution are its simple design, ease of compatibility, and reduced implementation complexity for the user; it does not require complex serialization and deserialization tools. The disadvantage is that it does not cover the usage scenario of TableEnvironment#registerCatalog.

This disadvantage could be solved in the future by introducing a CatalogDescriptor (similar to TableDescriptor) for the Table API to register catalogs, so that Flink can obtain the catalog properties through the CatalogDescriptor. The interface pseudo-code in TableEnvironment is as follows:

void registerCatalog(String catalogName, CatalogDescriptor catalogDescriptor);
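
For illustration, a minimal sketch of what such a CatalogDescriptor could look like; this class does not exist in the current proposal, and its shape below (a catalog name plus a Configuration of options, mirroring TableDescriptor) is an assumption:

import org.apache.flink.configuration.Configuration;

// Sketch of a possible future CatalogDescriptor. It captures exactly the
// information the CREATE CATALOG DDL captures today: a catalog name plus the
// options used to instantiate the catalog through its factory, so the same
// name/options pair can be shipped to the JM as in the DDL path.
public final class CatalogDescriptor {

    private final String catalogName;

    // Options such as 'type', 'default-database', etc.
    private final Configuration configuration;

    private CatalogDescriptor(String catalogName, Configuration configuration) {
        this.catalogName = catalogName;
        this.configuration = configuration;
    }

    public static CatalogDescriptor of(String catalogName, Configuration configuration) {
        return new CatalogDescriptor(catalogName, configuration);
    }

    public String getCatalogName() {
        return catalogName;
    }

    public Configuration getConfiguration() {
        return configuration;
    }
}

With this in place, TableEnvironment#registerCatalog(catalogName, catalogDescriptor) would give Flink the same name/options pair that the DDL path serializes today.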

...