...

Combining the current state of Flink with the needs of our business, we choose the Level-2 implementation for Flink.


Hook design

Runtime

...

Planner

Provide a method for the planner to register the JobStatusHook with the StreamGraph.

public class StreamGraph implements Pipeline {

    ...

    /** Registers the JobStatusHook. */
    void addJobStatusHook(JobStatusHook hook) {
        ...
    }
}
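
For illustration, the planner could attach a CTAS-specific hook while building the StreamGraph as follows; a sketch of CTASJobStatusHook itself appears in the Runtime section below, and the surrounding variables are illustrative assumptions:

// Illustrative usage inside the planner's CTAS translation path. The
// variables streamGraph, catalog, tablePath and table are assumed to be
// in scope at this point.
JobStatusHook ctasHook = new CTASJobStatusHook(catalog, tablePath, table);
streamGraph.addJobStatusHook(ctasHook);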

The client side needs to implement the serialization of Catalog and CatalogBaseTable.

Catalog- and CatalogBaseTable-related objects do not implement the Serializable interface, and most of them do not have parameterless constructors, so the serialization implementation is complex.

A simple way to serialize the catalog would be to save the catalog's options in the CatalogManager, so that the JM side only needs these options to re-initialize the catalog.

However, this solution only applies to catalogs created via DDL, where the Catalog options can be obtained from the WITH clause. For a Catalog registered via TableEnvironment#registerCatalog, no options are available, and a large number of production users register their Catalog this way. So this solution cannot cover all scenarios.

Runtime

Provide a job status change hook mechanism on the JM side.

/** Hooks provided by users, invoked on job status changes. */
public interface JobStatusHook {

    /** When the job transitions to CREATED status. It will only be called once. */
    default void onCreated(JobID jobId) {}

    /** When the job finishes successfully. */
    default void onFinished(JobID jobId) {}

    /** When the job finally fails. */
    default void onFailed(JobID jobId, Throwable throwable) {}

    /** When the job is canceled by the user. */
    default void onCanceled(JobID jobId) {}
}

On the JM side, deserialize the JobStatusHook and invoke it on status transitions:

When the job status transitions to CREATED, call JobStatusHook#onCreated;

When the job status transitions to FAILED, call JobStatusHook#onFailed;

When the job status transitions to CANCELED, call JobStatusHook#onCanceled;

When the job status transitions to FINISHED, call JobStatusHook#onFinished.
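
A simplified sketch of how the JM side might dispatch these callbacks is shown below; the enclosing class and the way the deserialized hooks reach it are assumptions of this illustration:

import java.util.List;

import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;

// Simplified dispatch sketch. The enclosing class and the way the
// deserialized hooks reach the JM-side scheduler are assumptions.
class JobStatusHookDispatcher {

    private final List<JobStatusHook> hooks;

    JobStatusHookDispatcher(List<JobStatusHook> hooks) {
        this.hooks = hooks;
    }

    void onStatusChange(JobID jobId, JobStatus newStatus, Throwable cause) {
        for (JobStatusHook hook : hooks) {
            switch (newStatus) {
                case CREATED:
                    hook.onCreated(jobId);
                    break;
                case FINISHED:
                    hook.onFinished(jobId);
                    break;
                case FAILED:
                    hook.onFailed(jobId, cause);
                    break;
                case CANCELED:
                    hook.onCanceled(jobId);
                    break;
                default:
                    // Intermediate statuses (RUNNING, RESTARTING, ...) trigger no hook.
                    break;
            }
        }
    }
}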

We can integrate table creation into the onCreated method, and table dropping into the onFailed and onCanceled methods.

If the job is configured with a RestartStrategy, be aware that these methods may be called multiple times.
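
For example, a minimal sketch of such a CTAS hook (call it CTASJobStatusHook); the constructor parameters and error handling are illustrative:

import org.apache.flink.api.common.JobID;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.ObjectPath;

// Minimal sketch: create the target table when the job is created, and drop
// it when the job fails or is canceled. Error handling is illustrative only.
public class CTASJobStatusHook implements JobStatusHook {

    private final Catalog catalog;
    private final ObjectPath tablePath;
    private final CatalogBaseTable table;

    public CTASJobStatusHook(Catalog catalog, ObjectPath tablePath, CatalogBaseTable table) {
        this.catalog = catalog;
        this.tablePath = tablePath;
        this.table = table;
    }

    @Override
    public void onCreated(JobID jobId) {
        try {
            catalog.createTable(tablePath, table, false);
        } catch (Exception e) {
            throw new RuntimeException("Could not create table for CTAS job " + jobId, e);
        }
    }

    @Override
    public void onFailed(JobID jobId, Throwable throwable) {
        dropTableQuietly();
    }

    @Override
    public void onCanceled(JobID jobId) {
        dropTableQuietly();
    }

    // onFinished: nothing to do, the created table is the job's result.

    private void dropTableQuietly() {
        try {
            catalog.dropTable(tablePath, true);
        } catch (Exception e) {
            // Best effort: the job is already in a terminal state.
        }
    }
}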

Execution Flow

[Figure: overall CTAS execution flow]

The overall execution process is shown in the figure above.

Because the client process may exit soon (for example, in detached mode), we choose to create/drop the table on the JM side.

Detailed execution process

Steps:

1) Compile the SQL to generate the CatalogBaseTable (the table to be created) and the CreateTableASOperation.

2) Use the Catalog#inferTableOption API to fill in the options of the CatalogBaseTable. The specific implementation is determined by the Catalog.

If the Catalog does not support ManagedTable and the user has not set the connector information, execution will fail.

For example, when using JDBCCatalog, if the user does not fill in any table options, JDBCCatalog can set the connector to 'jdbc' and fill in the username, password, and base-url.
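
To make this concrete, a sketch of how JDBCCatalog might implement the API; the method name follows the proposed Catalog#inferTableOption above, while filling the options map in place and the exact option keys are assumptions of this sketch:

// Inside JDBCCatalog. Filling the options map in place and the exact option
// keys are assumptions of this sketch.
@Override
public void inferTableOption(ObjectPath tablePath, CatalogBaseTable table) {
    Map<String, String> options = table.getOptions();
    if (!options.containsKey("connector")) {
        options.put("connector", "jdbc");
        options.put("base-url", baseUrl);   // from the catalog's own configuration
        options.put("username", username);  // from the catalog's own configuration
        options.put("password", password);  // from the catalog's own configuration
    }
}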

3) Use the CatalogBaseTable and Catalog objects to construct the JobStatusHook, so that the Catalog can be used to create/drop the table when the JobStatusHook executes.

4) The planner registers the JobStatusHook with the StreamGraph; the JobStatusHook is then serialized and passed to the JM as part of the JobGraph serialization.

Because the JobGraph is serialized using ObjectOutputStream, and CatalogBaseTable and Catalog cannot be serialized directly with ObjectOutputStream, JobStatusHook instances need to implement the Externalizable interface.

Internally, CatalogPropertiesUtil is used to serialize/deserialize the CatalogBaseTable.

For the Catalog, we add serialize and deserialize APIs; each Catalog implementation decides which of its own properties need to be serialized.

We save the classname of the Catalog together with the serialized content, like this:

Catalog ClassName
Catalog serialized data

Since the Catalog class may not have a parameterless constructor, we cannot use Class.newInstance to instantiate it; we can use the Objenesis framework to solve this.

After using Objenesis to get an empty Catalog instance, we obtain the real Catalog instance through the Catalog#deserialize API.

This solves the serialization/deserialization problem of CatalogBaseTable and Catalog.
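
Putting this together, a sketch of the hook's Externalizable implementation (the CTAS fields and callbacks from the sketch above are elided); the byte[] payload type of the proposed Catalog#serialize/#deserialize APIs and the exact wire format are assumptions, while ObjenesisStd is the real Objenesis entry point:

import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectOutput;

import org.apache.flink.table.catalog.Catalog;
import org.objenesis.ObjenesisStd;

// Serialization side of the CTASJobStatusHook sketched earlier. The byte[]
// payload type of the proposed Catalog#serialize/#deserialize APIs and the
// wire format are assumptions; the CatalogBaseTable part (handled via
// CatalogPropertiesUtil) is elided for brevity.
public class CTASJobStatusHook implements JobStatusHook, Externalizable {

    private Catalog catalog;

    // Externalizable requires a public no-arg constructor.
    public CTASJobStatusHook() {}

    @Override
    public void writeExternal(ObjectOutput out) throws IOException {
        // 1) Catalog ClassName
        out.writeUTF(catalog.getClass().getName());
        // 2) Catalog serialized data (proposed Catalog#serialize API)
        byte[] payload = catalog.serialize();
        out.writeInt(payload.length);
        out.write(payload);
    }

    @Override
    public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
        Class<?> catalogClass = Class.forName(in.readUTF());
        // The Catalog class may lack a parameterless constructor, so create an
        // "empty" instance with Objenesis, then let it restore its real state
        // through the proposed Catalog#deserialize API.
        Catalog empty = (Catalog) new ObjenesisStd().newInstance(catalogClass);
        byte[] payload = new byte[in.readInt()];
        in.readFully(payload);
        this.catalog = empty.deserialize(payload);
    }
}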

5) Submit the job to the cluster for execution. Then the runtime will call the corresponding method of JobStatusHook when the job status changes.

When the job starts and the status becomes CREATED, the runtime module will call the JobStatusHook#onCreated method, which calls the Catalog#createTable method to create the table.

When the final status of the job is FAILED, the runtime module will call the JobStatusHook#onFailed method, which calls the Catalog#dropTable method to drop the table.

When the final status of the job is CANCELED, the runtime module will call the JobStatusHook#onCanceled method, which calls the Catalog#dropTable method to drop the table.

When the final status of the job is FINISHED, the runtime module will call the JobStatusHook#onFinished method; no additional operations are needed.

Planner

...

Supported Job Mode

Both streaming and batch execution modes are supported.

...