...

@PublicEvolving
public interface Table extends Explainable<Table>, Executable {

    /**
     * Declare the pipeline defined by the given {@link Table} object
     * to create the table at the specified path.
     */
    CreateOrReplaceTable saveAs(String tablePath);
}

CreateOrReplaceTable

Proposing a public interface CreateOrReplaceTable, used by CTAS for the Table API.

/** A table that needs to be created for CTAS. */
@PublicEvolving
public interface CreateOrReplaceTable {

    /**
     * Adds a table option,
     * e.g. option("connector", "filesystem").
     */
    CreateOrReplaceTable option(String key, String value);

    /**
     * Creates the table under the specified path
     * and writes the pipeline data to this table.
     */
    TablePipeline create();

    /**
     * Creates the table under the specified path if it does not exist
     * and writes the pipeline data to this table.
     */
    TablePipeline createIfNotExist();
}

The CreateOrReplaceTable interface is newly introduced because, if we added the create/createIfNotExist API directly to the Table interface, the user would still have to call the saveAs API before calling them, which would add extra usage cost for the user.
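
As a sketch of how the proposed interfaces would compose (the table path and options below are made-up examples, not part of the proposal):

// Hypothetical usage of the proposed CTAS Table API; identifiers and options are examples only.
Table query = tableEnv.sqlQuery("SELECT id, name FROM source_table");

// saveAs declares the target table, option(...) adds table options, and
// create()/createIfNotExist() builds the pipeline that creates the table
// and writes the query result into it.
TablePipeline pipeline = query
        .saveAs("my_catalog.my_db.my_ctas_table")
        .option("connector", "filesystem")
        .option("path", "/tmp/my_ctas_table")
        .option("format", "csv")
        .create();

pipeline.execute();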

...

Catalog#inferTableOptions makes it convenient for users to customize a Catalog: when the catalog supports CTAS, the options of the table can be inferred automatically to avoid job failures caused by missing information.
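
For illustration, a custom Catalog could fill in defaults for options the user omitted; the signature below is an assumption about the proposed method and may differ from the final API:

// Inside a hypothetical custom Catalog implementation; the inferTableOptions
// signature and the default options are assumptions for illustration only.
@Override
public CatalogBaseTable inferTableOptions(ObjectPath tablePath, CatalogTable table) {
    Map<String, String> options = new HashMap<>(table.getOptions());
    // Fall back to this catalog's defaults when the user did not specify them.
    options.putIfAbsent("connector", "filesystem");
    options.putIfAbsent("path",
            "/warehouse/" + tablePath.getDatabaseName() + "/" + tablePath.getObjectName());
    return table.copy(options);
}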

The serialize/deserialize API enables serialization and deserialization of the Catalog; the user decides what information needs to be saved during serialization so that it can be used during deserialization.
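
A minimal sketch of such a pair for a catalog that is fully described by its options map (the method names and the use of Java serialization are assumptions, not the proposed API):

// Hypothetical serialize/deserialize for a configuration-backed catalog; only the
// options needed to re-create the catalog on the JM side are kept.
public byte[] serialize() throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    try (ObjectOutputStream oos = new ObjectOutputStream(out)) {
        oos.writeObject(new HashMap<>(catalogOptions)); // e.g. endpoint, warehouse path
    }
    return out.toByteArray();
}

@SuppressWarnings("unchecked")
public static MyCatalog deserialize(String name, byte[] bytes)
        throws IOException, ClassNotFoundException {
    try (ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes))) {
        Map<String, String> options = (Map<String, String>) ois.readObject();
        return new MyCatalog(name, options); // re-create the catalog during deserialization
    }
}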

...

The overall execution process is shown in the figure above.

Because the client process may exit soon (for example, in detached mode), we choose to create/drop the table on the JM side, and create table and drop table are executed through the Catalog. Therefore, a new Hook mechanism and a Catalog serialization/deserialization solution need to be introduced.

The overall execution process of a CTAS job is as follows:

  1. The Flink client compiles the SQL and generates an execution plan. During this process, the Hook that needs to be executed on the JM side is generated, and the Hook, Catalog, and CatalogBaseTable are serialized.
  2. The job is submitted to the cluster; if it is in detached mode, the client can exit.
  3. When the job starts, the Hook, Catalog, and CatalogBaseTable are deserialized; the Catalog#createTable method is called through the Hook to create the CatalogBaseTable.
  4. Task execution starts.
  5. If the final status of the job is FAILED or CANCELED, the created CatalogBaseTable needs to be dropped by the Hook calling the Catalog#dropTable method (a sketch of such a hook follows this list).
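
A rough sketch of what such a hook could look like on the JM side (the class and method names are illustrative, not the final design):

// Illustrative CTAS hook; the Catalog, ObjectPath, and CatalogBaseTable fields are
// obtained from the deserialization step described above.
public class CtasJobStatusHook {

    private final Catalog catalog;
    private final ObjectPath tablePath;
    private final CatalogBaseTable table;

    public CtasJobStatusHook(Catalog catalog, ObjectPath tablePath, CatalogBaseTable table) {
        this.catalog = catalog;
        this.tablePath = tablePath;
        this.table = table;
    }

    /** Invoked when the job starts: create the target table before tasks run. */
    public void onCreated() throws Exception {
        catalog.createTable(tablePath, table, false); // ignoreIfExists = false
    }

    /** Invoked when the job ends as FAILED or CANCELED: drop the table again. */
    public void onFailedOrCanceled() throws Exception {
        catalog.dropTable(tablePath, true); // ignoreIfNotExists = true
    }
}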

...