...

The recommended way to use it is:

TablePipeline tablePipeline = table.saveAs("my_ctas_table")
        .option("connector", "filesystem")
        .option("format", "testcsv")
        .option("path", "/tmp/my_ctas_table/")
        .create();
tablePipeline.execute();

The properties set through the option API are stored and then applied to the CatalogBaseTable when the create/createIfNotExist API is executed, so that the TableSink can be generated from them.
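The option-accumulation part of this flow can be sketched as follows. This is a minimal, self-contained illustration, not the real Flink implementation: the class name TablePipelineSketch and the method toTableOptions are hypothetical stand-ins.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch: option() calls accumulate properties that create()
// would later copy into the CatalogBaseTable before the TableSink is built.
class TablePipelineSketch {
    private final String tableName;
    private final Map<String, String> options = new LinkedHashMap<>();

    TablePipelineSketch(String tableName) {
        this.tableName = tableName;
    }

    TablePipelineSketch option(String key, String value) {
        options.put(key, value); // stored until create()/createIfNotExist()
        return this;
    }

    /** Returns the options that would be set on the CatalogBaseTable. */
    Map<String, String> toTableOptions() {
        return new LinkedHashMap<>(options);
    }
}
```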

...

public class StreamGraph implements Pipeline {

    ... ...

    /** Registers the JobStatusHook. */
    void addJobStatusHook(JobStatusHook hook) {
        ...
    }
}

The client side needs to implement serialization for Catalog and CatalogBaseTable.

The Catalog and CatalogBaseTable related objects do not implement the Serializable interface, and most of them do not have parameterless constructors, so implementing serialization is non-trivial.

Runtime

Provide a job-status-change hook mechanism on the JM side.

...



/**
 * Hooks provided by users on job status changes.
 */
public interface JobStatusHook {

    /** Called when the job transitions to the CREATED status. It is called only once. */
    default void onCreated(JobID jobId) {}

    /** Called when the job finishes successfully. */
    default void onFinished(JobID jobId) {}

    /** Called when the job finally fails. */
    default void onFailed(JobID jobId, Throwable throwable) {}

    /** Called when the job is canceled by the user. */
    default void onCanceled(JobID jobId) {}
}

On the JM side, deserialize the JobStatusHook and invoke it on job status transitions:

When the job status transitions to CREATED, call JobStatusHook#onCreated;

When the job status transitions to FAILED, call JobStatusHook#onFailed;

When the job status transitions to CANCELED, call JobStatusHook#onCanceled;

When the job status transitions to FINISHED, call JobStatusHook#onFinished.

Creating the table can be integrated into the onCreated method, and dropping the table into the onFailed and onCanceled methods.
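A minimal sketch of that integration is shown below. It is self-contained and uses simplified stand-ins: the Catalog interface here, the table path argument, and the class name CtasJobStatusHook are illustrative, not the real Flink types.

```java
// Simplified stand-in for the catalog operations the hook needs.
interface SketchTableCatalog {
    void createTable(String tablePath);
    void dropTable(String tablePath);
}

// Sketch of a CTAS hook: create the table when the job is created,
// drop it when the job finally fails or is canceled.
class CtasJobStatusHook {
    private final SketchTableCatalog catalog;
    private final String tablePath;

    CtasJobStatusHook(SketchTableCatalog catalog, String tablePath) {
        this.catalog = catalog;
        this.tablePath = tablePath;
    }

    public void onCreated(String jobId) {
        catalog.createTable(tablePath);
    }

    public void onFinished(String jobId) {
        // Job succeeded: the table stays; nothing to do.
    }

    public void onFailed(String jobId, Throwable throwable) {
        catalog.dropTable(tablePath);
    }

    public void onCanceled(String jobId) {
        catalog.dropTable(tablePath);
    }
}
```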




If the job is configured with a RestartStrategy, be aware that these methods may be called multiple times.

Execution Flow

Detailed execution process

Steps:

1) Compile the SQL to generate the CatalogBaseTable (the table to be created) and the CreateTableASOperation.

2) Use the Catalog#inferTableOption API to fill in options on the CatalogBaseTable. The specific implementation is determined by the Catalog.

If the Catalog does not support ManagedTable and the user has not set the connector information, the execution will fail.

For example, when using JDBCCatalog, if the user does not fill in any table options, JDBCCatalog can set the connector to 'jdbc' and fill in the username, password and base-url.
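The JDBC example above can be sketched as follows. This is a hypothetical illustration of the inference step, not the real JDBCCatalog code; the class name JdbcOptionInference and the exact option keys are assumptions.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of Catalog option inference: if the user supplied no
// connector, a JDBC-style catalog fills in its own connection defaults.
class JdbcOptionInference {
    static Map<String, String> inferTableOptions(
            Map<String, String> userOptions, String baseUrl, String username, String password) {
        Map<String, String> options = new HashMap<>(userOptions);
        if (!options.containsKey("connector")) {
            // The user gave no connector: default to jdbc and the catalog's
            // own connection settings.
            options.put("connector", "jdbc");
            options.put("base-url", baseUrl);
            options.put("username", username);
            options.put("password", password);
        }
        return options; // user-specified options always take precedence
    }
}
```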

3) Use the CatalogBaseTable and Catalog objects to construct the JobStatusHook, so that the Catalog can be used to create/drop the table when the JobStatusHook executes.

4) The Planner registers the JobStatusHook with the StreamGraph; the JobStatusHook is then serialized and passed to the JM through the serialization of the JobGraph.

Because the JobGraph is serialized with ObjectOutputStream, and CatalogBaseTable and Catalog cannot be serialized directly that way, JobStatusHook instances need to implement the Externalizable interface.

Internally, CatalogPropertiesUtil is used to serialize/deserialize the CatalogBaseTable.

For the Catalog, we add serialize and deserialize APIs, and each Catalog implementation serializes the properties it needs.

We save the class name of the Catalog together with the serialized content, like this:

Catalog ClassName
Catalog serialized data

Since the Catalog class may not have a parameterless constructor, we cannot use Class.newInstance to instantiate it; we use the objenesis framework instead.

After using objenesis to obtain an empty Catalog instance, the real Catalog instance is recovered through the Catalog#deserialize API.
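The class-name-plus-payload scheme can be sketched as below. This is a self-contained illustration under simplified assumptions: SerializableCatalog, CatalogEnvelope and SketchCatalog are hypothetical stand-ins, and reflection is used where the FLIP proposes objenesis (objenesis can instantiate classes that lack a no-arg constructor, which plain reflection cannot).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Stand-in for the proposed Catalog serialize/deserialize APIs.
interface SerializableCatalog {
    byte[] serializeCatalog();
    void deserializeCatalog(byte[] payload);
}

class CatalogEnvelope {
    /** Writes: Catalog ClassName, then the Catalog's own serialized data. */
    static byte[] write(SerializableCatalog catalog) {
        try {
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(bos);
            out.writeUTF(catalog.getClass().getName()); // Catalog ClassName
            byte[] payload = catalog.serializeCatalog(); // Catalog serialized data
            out.writeInt(payload.length);
            out.write(payload);
            return bos.toByteArray();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Reads the class name, instantiates the class, then restores its state. */
    static SerializableCatalog read(byte[] bytes) {
        try {
            DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes));
            String className = in.readUTF();
            byte[] payload = new byte[in.readInt()];
            in.readFully(payload);
            // The FLIP proposes objenesis here; reflection is used in this
            // sketch for simplicity and requires a no-arg constructor.
            SerializableCatalog catalog = (SerializableCatalog)
                    Class.forName(className).getDeclaredConstructor().newInstance();
            catalog.deserializeCatalog(payload);
            return catalog;
        } catch (Exception e) {
            throw new IllegalStateException("failed to restore catalog", e);
        }
    }
}

// A toy catalog whose only serialized property is its base URL.
class SketchCatalog implements SerializableCatalog {
    String baseUrl = "";

    public byte[] serializeCatalog() {
        return baseUrl.getBytes(StandardCharsets.UTF_8);
    }

    public void deserializeCatalog(byte[] payload) {
        baseUrl = new String(payload, StandardCharsets.UTF_8);
    }
}
```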

This solves the serialization/deserialization problem of CatalogBaseTable and Catalog.

5) Submit the job to the cluster for execution. The runtime will then call the corresponding method of the JobStatusHook when the job status changes.

When the job starts and the status becomes CREATED, the runtime module will call the JobStatusHook#onCreated method, which then calls the Catalog#createTable method to create the table.

When the final status of the job is FAILED, the runtime module will call the JobStatusHook#onFailed method, which then calls the Catalog#dropTable method to drop the table.

When the final status of the job is CANCELED, the runtime module will call the JobStatusHook#onCanceled method, which then calls the Catalog#dropTable method to drop the table.

When the final status of the job is FINISHED, the runtime module will call the JobStatusHook#onFinished method, and no additional operations are needed.

Precautions

When the table needs to be dropped:

  1. The user manually cancels the job.
  2. The job reaches the final FAILED status, for example after exceeding the maximum number of task failovers.

Dropping the table and the TableSink are strongly bound:

The framework does not perform the drop table operation itself; it is implemented in the TableSink according to the needs of the specific sink, because different sinks require different operations.

For example, HiveTableSink needs to delete the temporary directory and drop the metadata in the Metastore, while FileSystemTableSink only needs to delete the temporary directory.


Atomicity & Data Visibility

...