...

Add CreatingTable so that CTAS can be used from the Table API.

/** A table to be created for CTAS. */
@PublicEvolving
public interface CreatingTable {

    /**
     * Adds a table option,
     * e.g. option("connector", "filesystem").
     */
    CreatingTable option(String key, String value);

    /**
     * Creates the table under the specified path
     * and writes the pipeline data to this table.
     */
    TablePipeline create();

    /**
     * Creates the table under the specified path if it does not exist
     * and writes the pipeline data to this table.
     */
    TablePipeline createIfNotExist();
}
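To make the intended fluent usage concrete, here is a minimal, self-contained sketch of how a builder-style implementation of this interface could behave. `SimpleCreatingTable` and the stripped-down `TablePipeline` are hypothetical stand-ins for illustration, not classes from this proposal:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for the result type; holds just enough state to
// show what create() vs createIfNotExist() would capture.
class TablePipeline {
    final Map<String, String> options;
    final boolean ignoreIfExists;

    TablePipeline(Map<String, String> options, boolean ignoreIfExists) {
        this.options = options;
        this.ignoreIfExists = ignoreIfExists;
    }
}

// Hypothetical builder mirroring the CreatingTable interface above.
class SimpleCreatingTable {
    private final Map<String, String> options = new LinkedHashMap<>();

    // Fluent API: each option() call returns the same builder.
    public SimpleCreatingTable option(String key, String value) {
        options.put(key, value);
        return this;
    }

    // Terminal operation: fail if the table already exists.
    public TablePipeline create() {
        return new TablePipeline(options, false);
    }

    // Terminal operation: skip creation if the table already exists.
    public TablePipeline createIfNotExist() {
        return new TablePipeline(options, true);
    }
}
```

A caller would chain options and finish with one of the two terminal methods, e.g. `new SimpleCreatingTable().option("connector", "filesystem").create()`.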

Catalog

Provide methods that are used to create the stage table, serialize/deserialize the Catalog, and infer the options of the CatalogBaseTable.

@PublicEvolving
public interface Catalog {

    /**
     * When the CTAS function is used, the Catalog combines {@link Catalog#supportsManagedTable}
     * to infer whether to add some options to the {@link CatalogBaseTable}.
     *
     * <p>For example, {@link JdbcCatalog} can add user, password, url and other options to
     * the {@link CatalogBaseTable}.
     */
    default void inferTableOptions(CatalogBaseTable table) {}

    /**
     * Assists in the serialization of different catalogs.
     * Each catalog decides for itself which information needs to be serialized.
     */
    default void serialize(OutputStream output) throws IOException {}

    /**
     * Assists in the deserialization of different catalogs.
     * Whether to create a new Catalog instance is determined by
     * the deserialized data and the implementation of the catalog.
     */
    default Catalog deserialize(InputStream input) throws ClassNotFoundException, IOException {
        return this;
    }
}
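As a sketch of the serialize/deserialize contract, the hypothetical catalog below persists only its connection options via `java.util.Properties`; a real catalog would decide for itself which state to write, as noted above. `InMemoryCatalog` is an illustrative stand-in, not part of this proposal:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.Properties;

// Hypothetical catalog that round-trips its connection options through a stream,
// mirroring the serialize/deserialize pair on the Catalog interface.
class InMemoryCatalog {
    final Properties options = new Properties();

    // The catalog chooses what to serialize; here, just its options.
    void serialize(OutputStream output) throws IOException {
        options.store(output, null);
    }

    // Builds a fresh instance from the serialized data, analogous to the
    // "decide whether to create a new Catalog instance" contract above.
    InMemoryCatalog deserialize(InputStream input) throws IOException {
        InMemoryCatalog restored = new InMemoryCatalog();
        restored.options.load(input);
        return restored;
    }
}
```

Using byte-array streams keeps the round trip in memory; in the real flow, the serialized bytes would travel with the JobGraph to the runtime before being deserialized.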

...

First of all, to make it clear: a table created by the CTAS command must go through the catalog.

Implementation Plan

We will introduce a new concept: the stage table.

Stage table: a table created through the Catalog#createStageTable API, stored in the memory of the Catalog, and visible during the SQL compilation stage.

The catalog keeps a collection that records its stage tables. A stage table is created in the catalog's backend storage when the job status is CREATED, and dropped from the backend storage when the job status is FAILED or CANCELED.


Through the research summary and analysis, the current status of CTAS in the field of big data is:

  • Flink: Flink dialect does not support CTAS. ==> LEVEL-1
  • Spark DataSource v1: is atomic (can roll back), but is not isolated. ==> LEVEL-2
  • Spark DataSource v2: Guaranteed atomicity and isolation. ==> LEVEL-3
  • Hive MR: Guaranteed atomicity and isolation. ==> LEVEL-3

Combining the current situation of Flink and the needs of our business, we choose a Level-2 implementation for Flink.

Execution Flow

[Figure: overall CTAS execution flow]

The overall execution process is shown in the figure above.

...

  1. Create the sink table (stage table) through the Catalog's new API createStageTable.
  2. Construct a CTASJobStatusHook with the Catalog as a constructor parameter; CTASJobStatusHook is an implementation of the JobStatusHook interface.
  3. Register the CTASJobStatusHook with the StreamGraph; it is then passed to the JobGraph and serialized (this requires implementing serialization/deserialization of the Catalog and the JobStatusHook).
  4. When the job starts and its status is CREATED, the runtime module calls the JobStatusHook#onCreated method, and we call the Catalog#createTable method in CTASJobStatusHook#onCreated.
  5. When the final status of the job is FAILED, the runtime module calls the JobStatusHook#onFailed method, and we call the Catalog#dropTable method in CTASJobStatusHook#onFailed.
  6. When the final status of the job is CANCELED, the runtime module calls the JobStatusHook#onCanceled method, and we call the Catalog#dropTable method in CTASJobStatusHook#onCanceled.
  7. When the final status of the job is FINISHED, the runtime module calls the JobStatusHook#onFinished method, and we do not need to do any additional operations.
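The steps above can be sketched with a minimal, self-contained model. `FakeCatalog` and `CtasJobStatusHook` are illustrative stand-ins (the real hook would receive the deserialized Catalog and the table identifier); the point is only the lifecycle: create the stage table on CREATED, keep it on FINISHED, drop it on FAILED or CANCELED:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative stand-in for a catalog's backend storage: table path -> DDL.
class FakeCatalog {
    final Map<String, String> tables = new HashMap<>();

    void createTable(String path, String ddl) { tables.put(path, ddl); }
    void dropTable(String path) { tables.remove(path); }
}

// Sketch of the hook described in steps 4-7: the sink table is created when
// the job reaches CREATED, and rolled back if the job fails or is canceled.
class CtasJobStatusHook {
    private final FakeCatalog catalog;
    private final String tablePath;
    private final String ddl;

    CtasJobStatusHook(FakeCatalog catalog, String tablePath, String ddl) {
        this.catalog = catalog;
        this.tablePath = tablePath;
        this.ddl = ddl;
    }

    void onCreated(String jobId) { catalog.createTable(tablePath, ddl); }

    void onFinished(String jobId) { /* success: keep the table */ }

    void onFailed(String jobId, Throwable cause) { catalog.dropTable(tablePath); }

    void onCanceled(String jobId) { catalog.dropTable(tablePath); }
}
```

This is what makes the approach Level-2 (atomic but not isolated): the table becomes visible in the catalog as soon as the job is CREATED, but a failed or canceled job rolls it back.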

Hook design

Definition of JobStatusHook (Runtime)

/**
 * Hooks provided by users, invoked on job status changes.
 */
public interface JobStatusHook {

    /** Called when the job reaches the CREATED status. It is called only once. */
    default void onCreated(JobID jobId) {}

    /** Called when the job finishes successfully. */
    default void onFinished(JobID jobId) {}

    /** Called when the job finally fails. */
    default void onFailed(JobID jobId, Throwable throwable) {}

    /** Called when the job is canceled by users. */
    default void onCanceled(JobID jobId) {}
}

Register JobStatusHook (Planner)

public class StreamGraph implements Pipeline {

    ...

    /** Registers the JobStatusHook. */
    void addJobStatusHook(JobStatusHook hook) {
        ...
    }
}

...