Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

The overall execution process is shown in the figure above.

Because the client process may exit soon, such as detached mode, choose to create/drop the table on the JM side.

Detailed execution process

Steps:

1) Compile SQL to generate CatalogBaseTable (The table to be created) and CreateTableASOperation.

2) Use Catalog#inferTableOption API to do options filling to CatalogBaseTable.  The specific implementation is determined by the Catalog.

If the Catalog does not support ManagedTable and the user does not set the connector information, the execution will fail.

For example, when using JDBCCatalog, if the user does not fill in any table options, JDBCCatalog can set connector to 'jdbc' and fill username, password and base-url.

3) Use CatalogBaseTable and Catalog objects to construct JobStatusHook so that Catalog can be used to create/drop the table when JobStatusHook executes.

4) Planner registers JobStatusHook with StreamGraph, then the JobStatusHook is also serialized and passed to JM through the serialization of JobGraph.

Because JobGraph uses ObjectOutputStream serialization, since CatalogBaseTable and Catalog cannot be serialized directly using ObjectOutputStream, JobStatusHook instances need to implement interface Externalizable.

Internally use CatalogPropertiesUtil to serialize/deserialize CatalogBaseTable.

For Catalog, we have added serialize and deserialize APIs, and the Catalog implements its own properties that need to be serialized.

We save the classname of the Catalog together with the serialized content, like this:

Catalog ClassName
Catalog serialized data

Since the Catalog class may not have a parameterless constructor, so we can't use Class.newInstance to initialize an object, we can use the framework objenesis to solve.

After using objenesis to get the Catalog object (an empty Catalog instance), get the real Catalog instance through the Catalog#deserialize API.

This solves the serialization/deserialization problem of CatalogBaseTable and Catalog.

5) Submit the job to the cluster for execution. 



  1. Create sink table(stage table) through Catalog's new API createStageTable.
  2. Construct CTASJobStatusHook with Catalog as a construction parameter, CTASJobStatusHook is an implementation of the JobStatusHook interface.
  3. Register CTASJobStatusHook with StreamGraph, then passed to JobGraph and serialized(Need to implement serialization/deserialization of Catalog and JobStatusHook).
  4. When the job starts and the status is CREATED, the runtime module will call the JobStatusHook#onCreated method, and we call the Catalog#createTable method in the CTASJobStatusHook#onCreated method.
  5. When the final status of the job is FAILED, the runtime module will call the JobStatusHook#onFailed method, we call the Catalog#dropTable method in the CTASJobStatusHook#onFailed method.
  6. When the final status of the job is CANCELED, the runtime module will call the JobStatusHook#onCanceled method, we call the Catalog#dropTable method in the CTASJobStatusHook#onCanceled method.
  7. When the final status of the job is FINISH, the runtime module will call the JobStatusHook#onFinished method, and we do not need to do any additional operations.

...