...

public class StreamGraph implements Pipeline {

    ...

    /** Registers the JobStatusHook. */
    void addJobStatusHook(JobStatusHook hook) {
        ...
    }
}

All tasks of the final job are generated by the Planner. We want to create/drop the table through a hook that runs on the JM side, so we need an API for registering such a hook so that the JM can execute it.
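To make the registration path concrete, here is a minimal sketch under simplifying assumptions: the nested `JobStatusHook` interface, its callback names (`onCreated`, `onFinished`, `onFailed`, `onCanceled`) and the `String` job id are assumptions for illustration, and `StreamGraphSketch` is a hypothetical StreamGraph reduced to its hook registry.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

class HookRegistrationSketch {

    /** Simplified hook contract (assumed callback names); Serializable so it can be shipped. */
    interface JobStatusHook extends Serializable {
        void onCreated(String jobId);
        void onFinished(String jobId);
        void onFailed(String jobId, Throwable cause);
        void onCanceled(String jobId);
    }

    /** Hypothetical StreamGraph reduced to the hook registry. */
    static class StreamGraphSketch implements Serializable {
        private static final long serialVersionUID = 1L;

        // Hooks registered by the Planner; they travel with the graph to the JM.
        private final List<JobStatusHook> jobStatusHooks = new ArrayList<>();

        /** Registers the JobStatusHook. */
        void addJobStatusHook(JobStatusHook hook) {
            if (hook == null) {
                throw new IllegalArgumentException("hook must not be null");
            }
            jobStatusHooks.add(hook);
        }

        /** Read-only view of the registered hooks, e.g. for copying into the JobGraph. */
        List<JobStatusHook> getJobStatusHooks() {
            return Collections.unmodifiableList(jobStatusHooks);
        }
    }
}
```

Keeping the hook list inside the serializable graph means no separate shipping channel is needed: whatever the Planner registers travels with the graph.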

The CTAS operation is carried out in the Planner as follows:

1) Compile the SQL to generate a CatalogBaseTable (the table to be created) and a CreateTableASOperation.

...

2) Use the Catalog#inferTableOptions API to fill in the options of the CatalogBaseTable. The specific implementation is determined by each Catalog.

If the Catalog does not support ManagedTable and the user does not set the connector information, the execution will fail.

For example, when using JdbcCatalog, if the user does not fill in any table options, JdbcCatalog can set connector to 'jdbc' and fill in username, password and base-url;

When using HiveCatalog, if the user does not fill in any table options, HiveCatalog can set connector to 'hive';

User-implemented catalogs can also use this mechanism to fill in some options;

Note that for InMemoryCatalog, the tables it stores all exist in external systems, so the table options have to be filled in manually by the user; the Catalog cannot infer them automatically.
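Step 2 can be sketched as follows, assuming a hypothetical `OptionInferringCatalog` interface whose `inferTableOptions` takes the user's options and returns the completed map. The option keys follow the text above (`connector`, `username`, `password`, `base-url`) and are not necessarily the JDBC connector's exact keys; `resolveOptions` is a hypothetical planner-side helper applying the failure rule for catalogs without ManagedTable support.

```java
import java.util.HashMap;
import java.util.Map;

class InferTableOptionsSketch {

    /** Hypothetical slice of the Catalog API relevant to option inference. */
    interface OptionInferringCatalog {
        boolean supportsManagedTable();
        Map<String, String> inferTableOptions(Map<String, String> userOptions);
    }

    /** JdbcCatalog-like behavior: fill in connection options the user left out. */
    static class JdbcLikeCatalog implements OptionInferringCatalog {
        private final String username;
        private final String password;
        private final String baseUrl;

        JdbcLikeCatalog(String username, String password, String baseUrl) {
            this.username = username;
            this.password = password;
            this.baseUrl = baseUrl;
        }

        public boolean supportsManagedTable() {
            return false;
        }

        public Map<String, String> inferTableOptions(Map<String, String> userOptions) {
            Map<String, String> options = new HashMap<>(userOptions);
            // putIfAbsent never overwrites an option the user set explicitly.
            options.putIfAbsent("connector", "jdbc");
            options.putIfAbsent("username", username);
            options.putIfAbsent("password", password);
            options.putIfAbsent("base-url", baseUrl);
            return options;
        }
    }

    /** InMemoryCatalog-like behavior: nothing can be inferred. */
    static class InMemoryLikeCatalog implements OptionInferringCatalog {
        public boolean supportsManagedTable() {
            return false;
        }

        public Map<String, String> inferTableOptions(Map<String, String> userOptions) {
            return new HashMap<>(userOptions);
        }
    }

    /** Infer options, then fail if the catalog has no managed tables and no connector is set. */
    static Map<String, String> resolveOptions(
            OptionInferringCatalog catalog, Map<String, String> userOptions) {
        Map<String, String> options = catalog.inferTableOptions(userOptions);
        if (!catalog.supportsManagedTable() && !options.containsKey("connector")) {
            throw new IllegalStateException(
                    "Catalog does not support managed tables and no 'connector' option is set");
        }
        return options;
    }
}
```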



3) Use the CatalogBaseTable and Catalog objects to construct a JobStatusHook, so that the Catalog can be used to create/drop the table when the JobStatusHook executes.

Because the hook code is ultimately executed on the JM side, the CatalogBaseTable has to be created/dropped there through the Catalog;

So the Catalog and the CatalogBaseTable are member variables of the hook, and they also need to be serialized so that they can be passed to the JM.
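A sketch of the hook from step 3, assuming hypothetical minimal types (`MiniCatalog`, `TableDescriptor`) in place of Catalog and CatalogBaseTable, and the same assumed callback names as elsewhere. Which callback creates and which drops the table is also an assumption of this sketch: the table is created when the job is created and dropped if the job fails or is canceled.

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

class CtasHookSketch {

    /** Simplified hook contract; callback names are assumptions. */
    interface JobStatusHook extends Serializable {
        void onCreated(String jobId);
        void onFinished(String jobId);
        void onFailed(String jobId, Throwable cause);
        void onCanceled(String jobId);
    }

    /** Hypothetical minimal Catalog; it must be serializable to reach the JM. */
    interface MiniCatalog extends Serializable {
        void createTable(String path, Map<String, String> options);
        void dropTable(String path);
    }

    /** Hypothetical stand-in for CatalogBaseTable: a path plus its table options. */
    static final class TableDescriptor implements Serializable {
        private static final long serialVersionUID = 1L;
        final String path;
        final HashMap<String, String> options;

        TableDescriptor(String path, Map<String, String> options) {
            this.path = path;
            this.options = new HashMap<>(options);
        }
    }

    /** CTAS hook: catalog and table are members, so both are serialized with the hook. */
    static final class CtasJobStatusHook implements JobStatusHook {
        private static final long serialVersionUID = 1L;
        private final MiniCatalog catalog;
        private final TableDescriptor table;

        CtasJobStatusHook(MiniCatalog catalog, TableDescriptor table) {
            this.catalog = catalog;
            this.table = table;
        }

        @Override
        public void onCreated(String jobId) {
            catalog.createTable(table.path, table.options);
        }

        @Override
        public void onFinished(String jobId) {
            // The job completed; the table is kept.
        }

        @Override
        public void onFailed(String jobId, Throwable cause) {
            catalog.dropTable(table.path);
        }

        @Override
        public void onCanceled(String jobId) {
            catalog.dropTable(table.path);
        }
    }
}
```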


4) The Planner registers the JobStatusHook with the StreamGraph; the JobStatusHook is then serialized and passed to the JM as part of the serialized JobGraph.
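Step 4 relies on the hooks riding along with the graph's serialization. This sketch assumes plain Java serialization and a hypothetical `JobGraphSketch` that holds only the hooks; it shows a stateful hook surviving the round trip from client-side bytes to a JM-side object.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

class HookShippingSketch {

    /** Reduced hook contract (assumed name); Serializable so it can travel with the graph. */
    interface JobStatusHook extends Serializable {
        void onCreated(String jobId);
    }

    /** Hypothetical JobGraph reduced to the hooks copied over from the StreamGraph. */
    static final class JobGraphSketch implements Serializable {
        private static final long serialVersionUID = 1L;
        final List<JobStatusHook> jobStatusHooks = new ArrayList<>();
    }

    /** A hook with state; every field must be serializable for it to survive shipping. */
    static final class TableNameHook implements JobStatusHook {
        private static final long serialVersionUID = 1L;
        final String tableName;

        TableNameHook(String tableName) {
            this.tableName = tableName;
        }

        @Override
        public void onCreated(String jobId) {
            System.out.println("would create " + tableName + " for job " + jobId);
        }
    }

    /** Client side: turn the graph, hooks included, into bytes. */
    static byte[] serialize(JobGraphSketch graph) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(graph);
        }
        return bytes.toByteArray();
    }

    /** JM side: restore the graph and, with it, the hooks. */
    static JobGraphSketch deserialize(byte[] data) throws IOException, ClassNotFoundException {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return (JobGraphSketch) in.readObject();
        }
    }
}
```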

...

Internally, CatalogPropertiesUtil, a utility that Flink already provides, is used to serialize/deserialize the CatalogBaseTable.

For Catalog, we have added serialize and deserialize APIs, and each Catalog implementation decides which of its own properties need to be serialized.

We save the class name of the Catalog together with its serialized content, like this:

Catalog ClassName
Catalog serialized data
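The class-name-plus-data layout above could be written with `DataOutputStream`, as in this sketch. The length-prefixed framing and the `CatalogEnvelopeSketch` name are assumptions; the text only fixes that the class name is stored next to the catalog's own serialized bytes.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

class CatalogEnvelopeSketch {

    /** Decoded form: which class to instantiate, plus that class's own serialized state. */
    static final class Envelope {
        final String className;
        final byte[] payload;

        Envelope(String className, byte[] payload) {
            this.className = className;
            this.payload = payload;
        }
    }

    /** Layout: [class name as modified UTF-8][int payload length][payload bytes]. */
    static byte[] write(String className, byte[] payload) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(className);
            out.writeInt(payload.length);
            out.write(payload);
        }
        return bytes.toByteArray();
    }

    /** Reads the fields back in the same order they were written. */
    static Envelope read(byte[] data) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
            String className = in.readUTF();
            byte[] payload = new byte[in.readInt()];
            in.readFully(payload);
            return new Envelope(className, payload);
        }
    }
}
```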

Since a Catalog class may not have a parameterless constructor, we can't use Class#newInstance to instantiate it; instead, we can use the Objenesis library to solve this.

After using Objenesis to obtain an empty Catalog instance, we restore the real Catalog state through the Catalog#deserialize API.
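The restore path can be sketched as follows, assuming a hypothetical `SerializableCatalog` interface for the proposed `serialize`/`deserialize` pair. Because Objenesis is an external dependency, this self-contained version swaps in the JDK-internal `sun.reflect.ReflectionFactory` (module `jdk.unsupported`), the mechanism Objenesis's standard strategy uses on HotSpot-based JVMs; with Objenesis on the classpath, `new ObjenesisStd().newInstance(cls)` would take the place of `newEmptyInstance`.

```java
import java.lang.reflect.Constructor;
import java.nio.charset.StandardCharsets;
import sun.reflect.ReflectionFactory;

class EmptyInstanceSketch {

    /** Hypothetical stand-in for the proposed Catalog#serialize / Catalog#deserialize pair. */
    interface SerializableCatalog {
        byte[] serialize();
        void deserialize(byte[] data);
        String getName();
    }

    /** A catalog without a parameterless constructor, like most real catalogs. */
    static class DemoCatalog implements SerializableCatalog {
        private String catalogName; // non-final: deserialize() fills it in

        DemoCatalog(String catalogName) {
            this.catalogName = catalogName;
        }

        public byte[] serialize() {
            return catalogName.getBytes(StandardCharsets.UTF_8);
        }

        public void deserialize(byte[] data) {
            this.catalogName = new String(data, StandardCharsets.UTF_8);
        }

        public String getName() {
            return catalogName;
        }
    }

    /** Allocates an instance without running cls's constructors or field initializers. */
    static <T> T newEmptyInstance(Class<T> cls) throws Exception {
        Constructor<Object> objectCtor = Object.class.getDeclaredConstructor();
        Constructor<?> ctor =
                ReflectionFactory.getReflectionFactory().newConstructorForSerialization(cls, objectCtor);
        return cls.cast(ctor.newInstance());
    }

    /** JM side: class name -> empty instance -> state restored via deserialize. */
    static SerializableCatalog restore(String className, byte[] data) throws Exception {
        Class<? extends SerializableCatalog> cls =
                Class.forName(className).asSubclass(SerializableCatalog.class);
        SerializableCatalog empty = newEmptyInstance(cls);
        empty.deserialize(data);
        return empty;
    }
}
```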

This solves the serialization/deserialization problem of CatalogBaseTable and Catalog.

The client side is responsible for serializing the Catalog and the CatalogBaseTable.


For example, JdbcCatalog#serialize can save catalogName, defaultDatabase, username, pwd, baseUrl, and JdbcCatalog#deserialize can re-initialize a JdbcCatalog object through these parameters;

HiveCatalog#serialize can save catalogName, defaultDatabase, hiveConf, hiveVersion, and HiveCatalog#deserialize can re-initialize a HiveCatalog object through these parameters;

InMemoryCatalog#serialize only needs to save the catalogName and defaultDatabase, and InMemoryCatalog#deserialize can re-initialize an InMemoryCatalog object through these two parameters;

Catalog and CatalogBaseTable related objects do not implement the Serializable interface, and most of them do not have parameterless constructors, so the serialization implementation is complex.
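The JdbcCatalog example can be sketched as below, assuming a hypothetical `JdbcLikeCatalog` that writes its five parameters with `DataOutputStream` in a fixed order and reads them back in the same order. The byte format is an assumption; each real Catalog would choose its own.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

class JdbcCatalogSerdeSketch {

    /** Hypothetical stand-in for JdbcCatalog, holding only what it needs to rebuild itself. */
    static class JdbcLikeCatalog {
        // Non-final on purpose: deserialize() fills them in on an existing (e.g. empty) instance.
        String catalogName;
        String defaultDatabase;
        String username;
        String pwd;
        String baseUrl;

        JdbcLikeCatalog(String catalogName, String defaultDatabase,
                        String username, String pwd, String baseUrl) {
            this.catalogName = catalogName;
            this.defaultDatabase = defaultDatabase;
            this.username = username;
            this.pwd = pwd;
            this.baseUrl = baseUrl;
        }

        /** Writes the five parameters in a fixed order (all must be non-null for writeUTF). */
        byte[] serialize() throws IOException {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                out.writeUTF(catalogName);
                out.writeUTF(defaultDatabase);
                out.writeUTF(username);
                out.writeUTF(pwd);
                out.writeUTF(baseUrl);
            }
            return bytes.toByteArray();
        }

        /** Reads the parameters back in the same order and re-initializes this object. */
        void deserialize(byte[] data) throws IOException {
            try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(data))) {
                catalogName = in.readUTF();
                defaultDatabase = in.readUTF();
                username = in.readUTF();
                pwd = in.readUTF();
                baseUrl = in.readUTF();
            }
        }
    }
}
```

In the proposed flow, `deserialize` would be called on the empty instance obtained through Objenesis rather than on an already constructed object.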





Runtime

Provide a job status change hook mechanism on the JM side.
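On the JM side, the mechanism comes down to invoking the registered hooks on job status transitions. A minimal sketch, assuming a hypothetical `HookDispatcher`, a reduced `JobStatus` enum and the same assumed callback names; how the JM actually wires this into its job lifecycle is not specified here.

```java
import java.util.ArrayList;
import java.util.List;

class JobStatusDispatchSketch {

    /** Reduced set of job states for the sketch. */
    enum JobStatus { CREATED, RUNNING, FINISHED, FAILED, CANCELED }

    /** Simplified hook contract; callback names are assumptions. */
    interface JobStatusHook {
        void onCreated(String jobId);
        void onFinished(String jobId);
        void onFailed(String jobId, Throwable cause);
        void onCanceled(String jobId);
    }

    /** Hypothetical JM-side component that fans status changes out to the registered hooks. */
    static final class HookDispatcher {
        private final String jobId;
        private final List<JobStatusHook> hooks;

        HookDispatcher(String jobId, List<JobStatusHook> hooks) {
            this.jobId = jobId;
            this.hooks = new ArrayList<>(hooks);
        }

        /** Calls the matching callback on every hook; cause is only used for FAILED. */
        void onStatusChange(JobStatus newStatus, Throwable cause) {
            for (JobStatusHook hook : hooks) {
                switch (newStatus) {
                    case CREATED:
                        hook.onCreated(jobId);
                        break;
                    case FINISHED:
                        hook.onFinished(jobId);
                        break;
                    case FAILED:
                        hook.onFailed(jobId, cause);
                        break;
                    case CANCELED:
                        hook.onCanceled(jobId);
                        break;
                    default:
                        // Intermediate states such as RUNNING have no callback in this sketch.
                        break;
                }
            }
        }
    }
}
```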

...

Users need to be clear about their business needs and set the table options correctly.

The Catalog#inferTableOptions API can also automatically infer whether to add the connector attribute based on whether the Catalog supports ManagedTable.

Compatibility, Deprecation, and Migration Plan

...