...

These two steps look routine, but when a table has many fields, writing out the DDL statement by hand is tedious, and the same columns must be written out again in the subsequent INSERT statement. We can therefore support CTAS (CREATE TABLE AS SELECT), as MySQL, Oracle, Microsoft SQL Server, Hive, Spark, and others do, which is more user-friendly. In addition, the Hive dialect already has some support for CTAS. My suggestion is to support a variation of the optional feature T172, “AS subquery clause in table definition”, of the SQL standard.

Public API Changes

Based on the research summarized in the appendix, the current status of CREATE TABLE AS SELECT (CTAS) in the big data field is:

  • Flink: Flink dialect does not support CTAS. ==> LEVEL-1
  • Spark DataSource v1: is atomic (can roll back), but is not isolated. ==> LEVEL-2
  • Spark DataSource v2: Guaranteed atomicity and isolation. ==> LEVEL-3
  • Hive MR: Guaranteed atomicity and isolation. ==> LEVEL-3

Combining the current situation of Flink and the needs of our business, we choose a Level-2 implementation for Flink in batch execution mode. In streaming mode, however, we do not provide atomicity guarantees because the job is long-running. Moreover, there currently seems to be no strong need to guarantee atomicity in streaming mode.
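As a rough illustration of what Level-2 (atomic but not isolated) means for a batch job, the sketch below creates the table, runs the insert job, and drops the table again if the job fails. The class AtomicCtasSketch, its nested InMemoryCatalog, and the runCtas method are hypothetical names used only for this example; they are not Flink APIs, and the real proposal performs these steps through the Catalog.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical sketch of Level-2 CTAS semantics; not part of Flink. */
final class AtomicCtasSketch {

    /** Stand-in for a catalog that only tracks table names. */
    static final class InMemoryCatalog {
        private final Set<String> tables = new HashSet<>();

        void createTable(String name) {
            if (!tables.add(name)) {
                throw new IllegalStateException("Table already exists: " + name);
            }
        }

        void dropTable(String name) {
            tables.remove(name);
        }

        boolean tableExists(String name) {
            return tables.contains(name);
        }
    }

    /**
     * Creates the table, runs the insert job, and rolls the creation back on failure.
     * The table is visible to others while the job runs, so there is no isolation.
     *
     * @return true if the job succeeded and the table was kept
     */
    static boolean runCtas(InMemoryCatalog catalog, String table, Runnable insertJob) {
        catalog.createTable(table);
        try {
            insertJob.run();
            return true;
        } catch (RuntimeException e) {
            // Roll back: a failed job must not leave the table behind.
            catalog.dropTable(table);
            return false;
        }
    }

    public static void main(String[] args) {
        InMemoryCatalog catalog = new InMemoryCatalog();
        runCtas(catalog, "t_ok", () -> { });
        runCtas(catalog, "t_fail", () -> { throw new RuntimeException("job failed"); });
        System.out.println("t_ok exists: " + catalog.tableExists("t_ok"));
        System.out.println("t_fail exists: " + catalog.tableExists("t_fail"));
    }
}
```

A Level-3 implementation would additionally hide the table until the job has finished, which this sketch deliberately does not do.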

Syntax

I suggest introducing a CTAS clause with the following syntax:

CREATE TABLE [ IF NOT EXISTS ] table_name 
[ WITH ( table_properties ) ]
[ AS query_expression ]


Example:

CREATE TABLE ctas_hudi
 WITH ('connector.type' = 'hudi')
 AS SELECT id, name, age FROM hive_catalog.default.test WHERE mod(id, 10) = 0;


The resulting table is equivalent to:

CREATE TABLE ctas_hudi
 (
 	id BIGINT,
 	name STRING,
 	age INT
 )
 WITH ('connector.type' = 'hudi');

INSERT INTO ctas_hudi SELECT id, name, age FROM hive_catalog.default.test WHERE mod(id, 10) = 0;
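The equivalence above can be sketched as a small string-building helper. CtasExpansionSketch and its methods are hypothetical names for illustration only. The column names and types are assumed to have been derived already from the result schema of the query, which in Flink would be the planner's job.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.stream.Collectors;

/** Hypothetical helper, not part of Flink: expands a CTAS statement into its two-step equivalent. */
final class CtasExpansionSketch {

    /** Builds the CREATE TABLE statement from the columns derived from the query and the WITH options. */
    static String createTableDdl(
            String table, Map<String, String> columns, Map<String, String> options) {
        String cols = columns.entrySet().stream()
                .map(e -> "  " + e.getKey() + " " + e.getValue())
                .collect(Collectors.joining(",\n"));
        String opts = options.entrySet().stream()
                .map(e -> "'" + e.getKey() + "' = '" + e.getValue() + "'")
                .collect(Collectors.joining(", "));
        return "CREATE TABLE " + table + " (\n" + cols + "\n) WITH (" + opts + ")";
    }

    /** Builds the INSERT statement that writes the query result into the new table. */
    static String insertStatement(String table, String query) {
        return "INSERT INTO " + table + " " + query;
    }

    public static void main(String[] args) {
        // LinkedHashMap keeps the column order of the SELECT list.
        Map<String, String> columns = new LinkedHashMap<>();
        columns.put("id", "BIGINT");
        columns.put("name", "STRING");
        columns.put("age", "INT");

        Map<String, String> options = new LinkedHashMap<>();
        options.put("connector.type", "hudi");

        String query =
                "SELECT id, name, age FROM hive_catalog.default.test WHERE mod(id, 10) = 0";
        System.out.println(createTableDdl("ctas_hudi", columns, options) + ";");
        System.out.println(insertStatement("ctas_hudi", query) + ";");
    }
}
```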

Table

Provide a method that Table API users can call to execute CTAS.

@PublicEvolving
public interface Table extends Explainable<Table>, Executable {

    /**
     * Declare the pipeline defined by the given {@link Table} object
     * to create the table at the specified path.
     */
    CreateOrReplaceTable saveAs(String tablePath);
}

CreateOrReplaceTable

We propose a public interface, CreateOrReplaceTable, that is used by CTAS in the Table API.

/** A table that needs to be created for CTAS. */
@PublicEvolving
public interface CreateOrReplaceTable {

    /**
     * Add a table option,
     * such as option("connector", "filesystem").
     */
    CreateOrReplaceTable option(String key, String value);

    /**
     * Create the table under the specified path
     * and write the pipeline data to this table.
     */
    TablePipeline create();

    /**
     * Create the table under the specified path if it does not exist
     * and write the pipeline data to this table.
     */
    TablePipeline createIfNotExist();
}

The CreateOrReplaceTable interface is newly introduced because, if the create/createIfNotExist APIs were added to the Table interface directly, users would have to remember to call saveAs before calling them, which imposes an additional usage cost on the user.

The recommended usage is:

TablePipeline tablePipeline = table.saveAs("my_ctas_table")
        .option("connector", "filesystem")
        .option("format", "testcsv")
        .option("path", "/tmp/my_ctas_table/")
        .create();
tablePipeline.execute();

We save the properties set through the option API and set them on the CatalogBaseTable when the create/createIfNotExist API is executed, so that the TableSink can be generated.
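The deferred handling of options described above might look roughly like the following. PendingTableSketch is a hypothetical stand-in for the proposed builder, and its create() returns a plain map instead of a TablePipeline so that the example runs without Flink on the classpath.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical stand-in for the proposed builder; not the Flink implementation. */
final class PendingTableSketch {
    private final String tablePath;
    private final Map<String, String> options = new LinkedHashMap<>();

    PendingTableSketch(String tablePath) {
        this.tablePath = tablePath;
    }

    /** Records an option; nothing is created at this point. */
    PendingTableSketch option(String key, String value) {
        options.put(key, value);
        return this;
    }

    /**
     * Returns a snapshot of the options that would be set on the table definition.
     * In the proposal this is the point where the CatalogBaseTable receives them.
     */
    Map<String, String> create() {
        return Collections.unmodifiableMap(new LinkedHashMap<>(options));
    }

    String tablePath() {
        return tablePath;
    }

    public static void main(String[] args) {
        Map<String, String> applied = new PendingTableSketch("my_ctas_table")
                .option("connector", "filesystem")
                .option("format", "testcsv")
                .create();
        System.out.println(applied);
    }
}
```

Collecting the options first and applying them only in create() keeps the builder free of side effects until the user explicitly asks for the table to be created.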

Catalog

Provide methods to serialize/deserialize the Catalog and to infer the options of a CatalogBaseTable.

@PublicEvolving
public interface Catalog {

    /**
     * When the CTAS function is used, the Catalog combines {@link Catalog#supportsManagedTable}
     * to infer whether to add some options to the {@link CatalogBaseTable}.
     *
     * For example:
     * {@link JdbcCatalog} can add connector = 'jdbc', user, password, url and other options to {@link CatalogBaseTable},
     * {@link HiveCatalog} can add connector = 'hive' to {@link CatalogBaseTable}, to help generate the TableSink.
     * The tables in {@link GenericInMemoryCatalog} already exist externally,
     * so options must be filled in manually by the user; the Catalog cannot infer them automatically.
     */
    default CatalogBaseTable inferTableOptions(ObjectPath tablePath, CatalogBaseTable table) {
        throw new UnsupportedOperationException();
    }

    /**
     * Assist in the serialization of different catalogs.
     * Catalog decides for itself which information needs to be serialized.
     */
    default void serialize(OutputStream output) throws IOException {}

    /**
     * Assist in the deserialization of different catalogs.
     * Determine whether to create a new Catalog instance based on
     * the deserialized data and the implementation of the catalog.
     */
    default Catalog deserialize(InputStream input) throws ClassNotFoundException, IOException {
        return this;
    }
}

Catalog#inferTableOptions makes it convenient for users to customize their Catalog: when a Catalog supports the CTAS function, the options of the table can be inferred automatically, which avoids job failures caused by missing information.
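One way to picture option inference is as a merge of catalog-level defaults into the options the user supplied. The sketch below works on plain maps rather than CatalogBaseTable so that it runs standalone. InferOptionsSketch is a hypothetical name, and the rule that explicit user options take precedence over inferred ones is an assumption of this example, not something the proposal specifies.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical sketch of option inference; not part of Flink. */
final class InferOptionsSketch {

    /**
     * Returns the catalog defaults overlaid with the user's options.
     * Assumption of this sketch: explicit user options win over inferred ones.
     */
    static Map<String, String> inferTableOptions(
            Map<String, String> catalogDefaults, Map<String, String> userOptions) {
        Map<String, String> result = new LinkedHashMap<>(catalogDefaults);
        result.putAll(userOptions);
        return result;
    }

    public static void main(String[] args) {
        // Illustrative values only; a JDBC-like catalog already knows how to reach the database.
        Map<String, String> defaults = new LinkedHashMap<>();
        defaults.put("connector", "jdbc");
        defaults.put("url", "jdbc:example://host/db");

        Map<String, String> user = new LinkedHashMap<>();
        user.put("sink.parallelism", "4");

        System.out.println(inferTableOptions(defaults, user));
    }
}
```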

The serialize/deserialize API enables the serialization and deserialization of a Catalog. The implementer decides which information is saved during serialization so that it can be used during deserialization.
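A minimal sketch of such a round trip is shown below, assuming a catalog that only needs its name and a connection URL to be rebuilt. SerializableCatalogSketch, its fields, and the roundTrip helper are hypothetical and do not implement the Flink Catalog interface; only the method shapes mirror the proposed serialize/deserialize signatures.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;

/** Hypothetical catalog that persists only the fields it needs to be rebuilt. */
final class SerializableCatalogSketch {
    private final String name;
    private final String baseUrl;

    SerializableCatalogSketch(String name, String baseUrl) {
        this.name = name;
        this.baseUrl = baseUrl;
    }

    String name() {
        return name;
    }

    String baseUrl() {
        return baseUrl;
    }

    /** The catalog decides which information is written. */
    void serialize(OutputStream output) throws IOException {
        DataOutputStream out = new DataOutputStream(output);
        out.writeUTF(name);
        out.writeUTF(baseUrl);
        out.flush();
    }

    /** Builds a new instance from the data written by serialize, in the same field order. */
    SerializableCatalogSketch deserialize(InputStream input) throws IOException {
        DataInputStream in = new DataInputStream(input);
        String restoredName = in.readUTF();
        String restoredUrl = in.readUTF();
        return new SerializableCatalogSketch(restoredName, restoredUrl);
    }

    /** Convenience helper for this example: serialize to bytes and read them back. */
    static SerializableCatalogSketch roundTrip(SerializableCatalogSketch catalog) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            catalog.serialize(bytes);
            return catalog.deserialize(new ByteArrayInputStream(bytes.toByteArray()));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        SerializableCatalogSketch restored =
                roundTrip(new SerializableCatalogSketch("my_catalog", "jdbc:example://host/db"));
        System.out.println(restored.name() + " -> " + restored.baseUrl());
    }
}
```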

Proposed Changes

Implementation Plan

The overall execution process is shown in the following figure.

[Figure: overall execution process of a CTAS job]

Because the client process may exit early, for example in detached mode, we choose to create/drop the table on the JM side, and both create table and drop table are executed through the Catalog. Therefore, a new hook mechanism and a Catalog serialization/deserialization solution need to be introduced. The overall execution process of a CTAS job is as follows:

...