...

public class StreamGraph implements Pipeline {

    private final List<JobStatusHook> jobStatusHooks = new ArrayList<>();

    ... ...

    /** Registers the JobStatusHook. */
    void addJobStatusHook(JobStatusHook hook) {
        ...
    }
}

The final tasks of the job are all generated by the Planner. We want to perform the create table/drop table operations through a hook on the JM side, so we need an API for registering the hook there.

The CTAS operation is introduced in the Planner as follows:

step1:

1) Compile the SQL to generate the CatalogBaseTable (the table to be created) and the CreateTableASOperation.

step2:

2) Use the Catalog#inferTableOptions API to fill in the options of the CatalogBaseTable. The specific implementation is determined by the Catalog.

For example, when using JdbcCatalog, if the user does not fill in any table options, JdbcCatalog can set the connector to 'jdbc' and fill in the username, password, and base-url; when using HiveCatalog, if the user does not fill in any table options, HiveCatalog can set the connector to 'hive'. User-implemented catalogs can also use this mechanism to fill in options.

It should be noted that the tables saved in the InMemoryCatalog all exist in external systems, so the table options have to be filled in manually by the user; the Catalog cannot infer them automatically. If the Catalog does not support ManagedTable and the user does not set the connector option, the execution will fail. A sketch of this inference for a JDBC-style catalog follows.
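Below is a minimal sketch of how a JDBC-backed catalog could implement the proposed Catalog#inferTableOptions API. The method signature, the connection fields, and the option keys follow the existing 'jdbc' connector but are illustrative assumptions, not a final design:

import java.util.HashMap;
import java.util.Map;

import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectPath;

/** Hypothetical option inference for a JDBC-backed catalog. */
public class JdbcCatalogInferExample {

    // Connection settings the catalog already holds (illustrative values).
    private final String baseUrl = "jdbc:postgresql://localhost:5432/";
    private final String username = "flink";
    private final String pwd = "secret";

    /** Sketch of the proposed Catalog#inferTableOptions API. */
    public CatalogBaseTable inferTableOptions(ObjectPath tablePath, CatalogTable table) {
        Map<String, String> options = new HashMap<>(table.getOptions());
        // Only infer options when the user did not provide any.
        if (options.isEmpty()) {
            options.put("connector", "jdbc");
            options.put("url", baseUrl + tablePath.getDatabaseName());
            options.put("username", username);
            options.put("password", pwd);
            options.put("table-name", tablePath.getObjectName());
        }
        // CatalogTable#copy(Map) returns a copy of the table with new options.
        return table.copy(options);
    }
}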

step3:

3) Use the CatalogBaseTable and Catalog objects to construct the JobStatusHook. When the hook code is eventually executed on the JM side, the CatalogBaseTable needs to be created/dropped through the Catalog, so Catalog and CatalogBaseTable are member variables of the hook; they also need to be serializable so they can be passed to the JM.

step4:

4) The Planner registers the JobStatusHook with the StreamGraph; the JobStatusHook is then serialized and passed to the JM through the serialization of the JobGraph. Because the JobGraph is serialized with ObjectOutputStream, and CatalogBaseTable and Catalog cannot be serialized directly that way, JobStatusHook instances need to implement the Externalizable interface and internally use CatalogPropertiesUtil, a tool Flink already provides, to serialize/deserialize the CatalogBaseTable. A sketch of this is shown below.
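A minimal sketch of that Externalizable implementation, assuming the proposed Catalog#serialize API (introduced below) returns a byte[] payload; CatalogPropertiesUtil#serializeCatalogTable and #deserializeCatalogTable are the existing Flink utilities, and CatalogSerdeUtil is a hypothetical helper sketched further below:

import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;
import java.util.Map;

import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogPropertiesUtil;
import org.apache.flink.table.catalog.ResolvedCatalogTable;

/** Sketch of a hook carrying a Catalog and a CatalogBaseTable to the JM. */
public class CtasHookSerializationSketch implements Externalizable {

    private Catalog catalog;        // uses the proposed serialize/deserialize APIs
    private CatalogBaseTable table; // serialized via CatalogPropertiesUtil

    public CtasHookSerializationSketch() {} // required by Externalizable

    @Override
    public void writeExternal(ObjectOutput out) throws IOException {
        // CatalogBaseTable -> flat properties map, using Flink's existing util
        // (assumes the planner hands over a resolved table).
        out.writeObject(CatalogPropertiesUtil.serializeCatalogTable((ResolvedCatalogTable) table));
        // Catalog: class name first, then the catalog-defined payload.
        out.writeUTF(catalog.getClass().getName());
        out.writeObject(catalog.serialize()); // proposed API, assumed to return byte[]
    }

    @Override
    @SuppressWarnings("unchecked")
    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
        Map<String, String> properties = (Map<String, String>) in.readObject();
        this.table = CatalogPropertiesUtil.deserializeCatalogTable(properties);
        String catalogClassName = in.readUTF();
        byte[] payload = (byte[]) in.readObject();
        // Re-creating the Catalog via objenesis + Catalog#deserialize is
        // sketched below.
        this.catalog = CatalogSerdeUtil.deserializeCatalog(catalogClassName, payload);
    }
}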

For Catalog, we have added serialize and deserialize APIs, and each Catalog implementation decides which of its own properties need to be serialized. We save the class name of the Catalog together with the serialized content, like this:

...

Since the Catalog class may not have a parameterless constructor, we can't use Class#newInstance to instantiate it; we can use the objenesis framework to solve this. After using objenesis to get an empty Catalog instance, we get the real Catalog instance through the Catalog#deserialize API. This solves the serialization/deserialization problem of CatalogBaseTable and Catalog, as sketched below.
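A hedged sketch of that instantiation path; CatalogSerdeUtil is the hypothetical helper referenced in the earlier code block, and Catalog#deserialize is the proposed API, assumed here to populate the empty instance in place:

import java.io.IOException;

import org.apache.flink.table.catalog.Catalog;
import org.objenesis.Objenesis;
import org.objenesis.ObjenesisStd;

/** Illustrative helper restoring a Catalog from class name + payload. */
public final class CatalogSerdeUtil {

    public static Catalog deserializeCatalog(String className, byte[] payload)
            throws IOException, ClassNotFoundException {
        Class<?> catalogClass = Class.forName(className);

        // The Catalog may not have a parameterless constructor, so objenesis
        // creates an "empty" instance without invoking any constructor.
        Objenesis objenesis = new ObjenesisStd();
        Catalog emptyInstance = (Catalog) objenesis.newInstance(catalogClass);

        // The proposed Catalog#deserialize API turns the empty instance into
        // a valid catalog by reading back its serialized properties.
        emptyInstance.deserialize(payload);
        return emptyInstance;
    }
}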

For example:

  • JdbcCatalog#serialize can save catalogName, defaultDatabase, username, pwd, and baseUrl, and JdbcCatalog#deserialize can re-initialize a JdbcCatalog object from these parameters (sketched below).
  • HiveCatalog#serialize can save catalogName, defaultDatabase, hiveConf, and hiveVersion, and HiveCatalog#deserialize can re-initialize a HiveCatalog object from these parameters.
  • InMemoryCatalog#serialize only needs to save catalogName and defaultDatabase, and InMemoryCatalog#deserialize can re-initialize an InMemoryCatalog object from these two parameters.
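The JdbcCatalog case could look roughly like the following; the byte-level layout and the serialize/deserialize signatures are assumptions for illustration only:

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

/** Illustrative serialize/deserialize pair for a JDBC-backed catalog. */
public class JdbcCatalogSerdeSketch {

    // The properties the design says JdbcCatalog needs to persist.
    private String catalogName;
    private String defaultDatabase;
    private String username;
    private String pwd;
    private String baseUrl;

    public byte[] serialize() throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeUTF(catalogName);
            out.writeUTF(defaultDatabase);
            out.writeUTF(username);
            out.writeUTF(pwd);
            out.writeUTF(baseUrl);
        }
        return bytes.toByteArray();
    }

    public void deserialize(byte[] payload) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(payload))) {
            this.catalogName = in.readUTF();
            this.defaultDatabase = in.readUTF();
            this.username = in.readUTF();
            this.pwd = in.readUTF();
            this.baseUrl = in.readUTF();
        }
    }
}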

The tables in the InMemoryCatalog already exist in the external system. The metadata held in the InMemoryCatalog is only used by the job itself and is held only in memory. Therefore, none of the metadata in the InMemoryCatalog needs to be serialized and passed to the JM; on the JM side, we only need to initialize a new InMemoryCatalog.

...

/** Hooks provided by users on job status changing. */
@Internal
public interface JobStatusHook extends Serializable {

    /** When the job transitions to CREATED status. It will only be called once. */
    default void onCreated(JobID jobId) {}

    /** When the job finishes successfully. */
    default void onFinished(JobID jobId) {}

    /** When the job finally fails. */
    default void onFailed(JobID jobId, Throwable throwable) {}

    /** When the job is canceled by users. */
    default void onCanceled(JobID jobId) {}
}

Flink's current hook design cannot meet the needs of CTAS. For example, the JobListener runs on the client side, and the JobStatusListener runs on the JM side but cannot be serialized. Thus we propose a new interface, JobStatusHook, which can be attached to the JobGraph and executed in the JobMaster. The interface is marked as Internal.

  1. When the job starts, the JobGraph will be deserialized, and the JobStatusHook can then be deserialized as well.
  2. Using the serialization/deserialization mechanism for Catalog and CatalogBaseTable described above, the Catalog and CatalogBaseTable are restored when the JobStatusHook is deserialized:
    • Deserialize the CatalogBaseTable using the CatalogPropertiesUtil#deserializeCatalogTable method.
    • When deserializing a Catalog, first read the Catalog class name, then use the objenesis framework to get an empty instance of the Catalog, and finally call the Catalog#deserialize method to get a valid Catalog instance.
  3. When the job starts and the job status changes, the JobStatusHook methods will be called:

For example, suppose our JobStatusHook implementation is called CTASJobStatusHook and uses the JdbcCatalog:

...

We can get an empty instance of JdbcCatalog through the objenesis framework, and then read the default-database, username, password, and base-url in the JdbcCatalog#deserialize method, so that a valid JdbcCatalog can be initialized. Then, when the job status changes, the CTASJobStatusHook methods can be called (a sketch follows the list below):

  • When the job status is CREATED, the runtime module will call the CTASJobStatusHook#onCreated method, which calls JdbcCatalog#createTable to create the table.
  • When the final status of the job is FAILED, the runtime module will call the CTASJobStatusHook#onFailed method, which calls JdbcCatalog#dropTable to drop the table.
  • When the final status of the job is CANCELED, the runtime module will call the CTASJobStatusHook#onCanceled method, which calls JdbcCatalog#dropTable to drop the table.
  • When the final status of the job is FINISHED, the runtime module will call the CTASJobStatusHook#onFinished method, and no additional operations are needed.
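Putting the callbacks together, a minimal sketch of CTASJobStatusHook; the field names and error handling are illustrative, while Catalog#createTable and Catalog#dropTable are the existing Flink Catalog APIs:

import org.apache.flink.api.common.JobID;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.ObjectPath;

/** Illustrative CTAS hook; its serialization is sketched above. */
public class CTASJobStatusHook implements JobStatusHook {

    private final Catalog catalog;          // e.g. a JdbcCatalog
    private final ObjectPath tablePath;
    private final CatalogBaseTable table;

    public CTASJobStatusHook(Catalog catalog, ObjectPath tablePath, CatalogBaseTable table) {
        this.catalog = catalog;
        this.tablePath = tablePath;
        this.table = table;
    }

    @Override
    public void onCreated(JobID jobId) {
        try {
            // Create the target table as soon as the job enters CREATED status.
            catalog.createTable(tablePath, table, false);
        } catch (Exception e) {
            throw new RuntimeException("Failed to create table for CTAS job " + jobId, e);
        }
    }

    @Override
    public void onFinished(JobID jobId) {
        // Success: the created table is kept; nothing else to do.
    }

    @Override
    public void onFailed(JobID jobId, Throwable throwable) {
        dropTableQuietly();
    }

    @Override
    public void onCanceled(JobID jobId) {
        dropTableQuietly();
    }

    private void dropTableQuietly() {
        try {
            // Roll back by dropping the table created in onCreated.
            catalog.dropTable(tablePath, true);
        } catch (Exception e) {
            // Best-effort cleanup; a real implementation would log this.
        }
    }
}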

Atomicity & Data Visibility

...

If the external storage system supports transactions or two-phase commit, then data visibility is tied to the checkpoint cycle. Otherwise, data is visible immediately after writing, which is consistent with Flink's current behavior.

...

For Managed Table, please refer to FLIP-188. Table options that do not contain the 'connector' key represent a managed table, and CTAS follows this principle as well. For details, please refer to the Table Store docs: https://nightlies.apache.org/flink/flink-table-store-docs-master/docs/development/create-table/.

CTAS supports both Managed Table and Non-Managed Table. Users need to be clear about their business requirements and set the table options correctly. The Catalog#inferTableOptions API can also automatically infer whether to add the connector option based on whether the Catalog supports ManagedTable.

...

This solution only applies to catalogs created via DDL, because only then can we get the Catalog options through the WITH keyword. If a Catalog is registered via TableEnvironment#registerCatalog, we cannot get its options, and a large number of users register the Catalog with TableEnvironment#registerCatalog in production environments.

...