...

First of all, to make it clear: the CREATE TABLE part of the CTAS command must go through a catalog.

Conceptually, there are two types of catalogs in Flink: in-memory catalogs and external catalogs.

In-memory catalog:

  1. Metadata is a copy of the metadata of the external system, and the user must ensure that the entity exists in the external system and that the metadata is consistent with it; otherwise, an exception is thrown at runtime. CTAS needs to create the table first, so it is hard to guarantee that the entity exists in the external system and that the metadata is consistent.
  2. The user needs to configure the parameters of the external system through the WITH syntax, because Flink cannot obtain them from the in-memory catalog.

For example, for a Kafka table we need the user to tell us the address of the Kafka server, the name of the topic, and the data serialization format; otherwise the Flink job will fail.
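To illustrate the point above, here is a minimal sketch of the kind of option check an in-memory catalog would need before a CTAS job can run. The helper name and the required-key list are hypothetical stand-ins, not Flink API; the real connector option names are defined by the Flink Kafka connector.

```java
import java.util.List;
import java.util.Map;

public class OptionsCheck {
    // Hypothetical required keys for a Kafka table; the actual option
    // names are defined by the Flink Kafka connector, not here.
    private static final List<String> REQUIRED =
            List.of("connector", "properties.bootstrap.servers", "topic", "format");

    /** Throws if any required option is missing, mirroring the "job will fail" case. */
    static void checkTableOptions(Map<String, String> options) {
        for (String key : REQUIRED) {
            if (!options.containsKey(key)) {
                throw new IllegalArgumentException("Missing required table option: " + key);
            }
        }
    }

    public static void main(String[] args) {
        Map<String, String> options = Map.of(
                "connector", "kafka",
                "properties.bootstrap.servers", "localhost:9092",
                "topic", "orders",
                "format", "json");
        checkTableOptions(options); // all required keys present, so no exception
        System.out.println("options ok");
    }
}
```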

External catalog:

  1. Metadata directly refers to the external system, so there is no consistency problem. Create table also directly calls the external system, so it is naturally guaranteed that the entity exists there.
  2. The WITH syntax parameters are optional; Flink can obtain the necessary parameters through the external catalog.

For example, for a Hive table we can obtain the table information required by the Flink engine through the HiveCatalog.

Both in-memory catalogs and external catalogs will support CTAS. If the CTAS command is executed against an in-memory catalog and the target store does not exist in the external system, the Flink job will fail, which is consistent with the current Flink behavior.

Syntax

I suggest introducing a CTAS clause with the following syntax:

...

otherwise, data is visible in real time, which is consistent with the current Flink behavior.

Batch mode:

Data should be written to a temporary directory first and become visible only after the job finally succeeds.
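The batch-mode behavior can be sketched with plain java.nio.file calls; this is a simplified stand-in for what a filesystem sink actually does, not Flink code. Results are staged in a temporary directory, and only an atomic rename at the end makes them visible under the target path.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class StagedBatchWrite {
    /** Writes data to a staging dir, then atomically publishes it on success. */
    static Path writeThenPublish(Path targetDir, String data) throws IOException {
        // Stage results where they are invisible to readers of targetDir.
        Path staging = Files.createTempDirectory("ctas-staging-");
        Files.writeString(staging.resolve("part-0"), data);
        // Publish atomically: nothing appears under targetDir until the job
        // has fully succeeded. (Requires staging and target on the same filesystem.)
        Files.move(staging, targetDir, StandardCopyOption.ATOMIC_MOVE);
        return targetDir.resolve("part-0");
    }

    public static void main(String[] args) throws IOException {
        Path target = Files.createTempDirectory("ctas-demo-").resolve("output");
        Path published = writeThenPublish(target, "hello");
        System.out.println(Files.readString(published)); // prints "hello"
    }
}
```

If the job fails before the move, the staging directory is simply abandoned and the target path never becomes visible, which is exactly the visibility contract described above.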

Public API Changes

Table Environment

...

Provide a method that is used to execute CTAS for the Table API.

...

The executeSql method will be reused.

...

@PublicEvolving
public interface TableEnvironment {
    /**
     * Registers the given {@link Table}'s result as a catalog table with {@link TableDescriptor}'s
     * options.
     *
     * <p>CTAS for Table API.
     *
     * <p>Examples:
     *
     * <pre>{@code
     * tEnv.createTable("MyTable", TableDescriptor.forConnector("hive")
     *         .build(), query);
     * }</pre>
     *
     * @param path The path under which the table will be registered. See also the {@link
     *     TableEnvironment} class description for the format of the path.
     * @param descriptor Template for creating a {@link CatalogTable} instance.
     * @param query The {@link Table} object describing the pipeline for further transformations.
     */
    void createTable(String path, TableDescriptor descriptor, Table query);
}


Catalog



For an in-memory catalog, we should check the table's options to prevent users from omitting required configuration parameters.

Provide a method that is used to execute create table for CTAS, which also checks the table's options.

/**
 * Creates a new table for CTAS.
 *
 * <p>The framework will make sure to call this method with a fully validated {@link
 * CatalogTable}. Those instances are easy to serialize for a durable catalog implementation.
 *
 * @param tablePath path of the table to be created
 * @param table the table definition
 * @param dropIfExists flag to specify behavior when a table already exists at the
 *     given path: if set to false, it throws a TableAlreadyExistException; if set to true,
 *     the table is dropped first, then created.
 * @param checkTableOptions flag to specify whether to check {@code table}'s options
 * @throws TableAlreadyExistException if the table already exists and dropIfExists is false
 * @throws DatabaseNotExistException if the database in tablePath doesn't exist
 * @throws CatalogException in case of any runtime exception
 */
void createTable(
        ObjectPath tablePath,
        CatalogTable table,
        boolean dropIfExists,
        boolean checkTableOptions)
        throws TableAlreadyExistException, DatabaseNotExistException, CatalogException;
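The dropIfExists contract above can be sketched with a minimal in-memory catalog. The class and its string-based "path" and "definition" are hypothetical stand-ins for ObjectPath and CatalogTable, used only to show the drop-then-create semantics.

```java
import java.util.HashMap;
import java.util.Map;

public class CtasCatalogSketch {
    private final Map<String, String> tables = new HashMap<>(); // path -> table definition

    /** Mirrors the proposed Catalog#createTable semantics for CTAS. */
    void createTable(String tablePath, String table, boolean dropIfExists) {
        if (tables.containsKey(tablePath)) {
            if (!dropIfExists) {
                // dropIfExists == false: fail, like TableAlreadyExistException.
                throw new IllegalStateException("Table already exists: " + tablePath);
            }
            // dropIfExists == true: drop the existing table first, then create.
            tables.remove(tablePath);
        }
        tables.put(tablePath, table);
    }

    String get(String path) {
        return tables.get(path);
    }

    public static void main(String[] args) {
        CtasCatalogSketch catalog = new CtasCatalogSketch();
        catalog.createTable("db.t", "v1", false);
        catalog.createTable("db.t", "v2", true); // replaces v1
        System.out.println(catalog.get("db.t")); // prints "v2"
    }
}
```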


Compatibility, Deprecation, and Migration Plan

...