Status

Current state: "Under Discussion"

Discussion thread: https://lists.apache.org/thread/mc0lv4gptm7som02hpob1hdp3hb1ps1v

Released: 1.16

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The current syntax and features of Flink SQL are already fairly complete in both streaming and batch mode.

However, there is still room to improve usability.

For example, if a user wants to insert data into a new table, two steps are required:

First, write the DDL statement for the table, say t1.

Second, insert the data into t1.

These two steps look reasonable, but when the table has many fields, writing the DDL statement by hand is tedious and error-prone,

and the same columns then have to be written out again in the INSERT statement.

Therefore, we propose supporting CTAS (CREATE TABLE AS SELECT), as MySQL, Oracle, Microsoft SQL Server, Hive, Spark, and others do.

This makes Flink SQL more user-friendly. In addition, the Hive dialect already has some support for CTAS.

The proposal is to support a variation of the optional feature T172, “AS subquery clause in table definition”, of the SQL standard.

Public API Changes

Table

Provide a method that Table API users can call to execute CTAS.

@PublicEvolving
public interface Table extends Explainable<Table>, Executable {

    /**
     * Declare the pipeline defined by the given {@link Table} object
     * to create the table at the specified path.
     */
    CreatingTable saveAs(String tablePath);
}

CreatingTable

Add CreatingTable so that the Table API can use CTAS.

/** A table that needs to be created for CTAS. */
@PublicEvolving
public interface CreatingTable {

    /**
     * Add a table option,
     * such as option("connector", "filesystem").
     */
    CreatingTable option(String key, String value);

    /**
     * Create the table under the specified path
     * and write the pipeline data to this table.
     */
    TablePipeline create();

    /**
     * Create the table under the specified path if it does not exist
     * and write the pipeline data to this table.
     */
    TablePipeline createIfNotExist();
}

CreatingTable is introduced as a separate interface because, if create/createIfNotExist were added directly to the Table interface,

users would have to remember to call saveAs before calling them, which makes the API harder to use correctly.

The recommended usage is:

TablePipeline tablePipeline = table.saveAs("my_ctas_table")
        .option("connector", "filesystem")
        .option("format", "testcsv")
        .option("path", "/tmp/my_ctas_table/")
        .create();
tablePipeline.execute();

The properties set through the option API are saved and then applied to the CatalogBaseTable when create/createIfNotExist is executed, so that the TableSink can be generated.
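The option-collecting behaviour described here can be pictured with a small stand-alone sketch. CreatingTableSketch and collectedOptions are hypothetical names used only for illustration; they are not part of the proposed API, and the real implementation would copy the options into a CatalogBaseTable rather than expose a map.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical stand-in for the proposed CreatingTable; not Flink code. */
final class CreatingTableSketch {
    private final String tablePath;
    // Insertion order is kept so options read back in the order they were set.
    private final Map<String, String> options = new LinkedHashMap<>();

    CreatingTableSketch(String tablePath) {
        this.tablePath = tablePath;
    }

    /** Records one table option and returns this builder for chaining. */
    CreatingTableSketch option(String key, String value) {
        options.put(key, value);
        return this;
    }

    /** Read-only snapshot of the options that create() would apply to the table. */
    Map<String, String> collectedOptions() {
        return Collections.unmodifiableMap(new LinkedHashMap<>(options));
    }

    String tablePath() {
        return tablePath;
    }
}
```

Nothing is created while options are being chained; the assumption in this sketch is that all side effects are deferred until create/createIfNotExist is called.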

Catalog

Provide methods to serialize/deserialize a Catalog and to infer the options of a CatalogBaseTable.

@PublicEvolving
public interface Catalog {

    /**
     * When CTAS is used, the Catalog combines this method with {@link Catalog#supportsManagedTable}
     * to decide whether to add options to the {@link CatalogBaseTable}.
     *
     * For example:
     * {@link JdbcCatalog} can add connector = 'jdbc', user, password, url and other options to the {@link CatalogBaseTable};
     * {@link HiveCatalog} can add connector = 'hive' to the {@link CatalogBaseTable}, which helps generate the TableSink.
     * The tables in {@link GenericInMemoryCatalog} already exist externally,
     * so their options must be filled in manually by the user; the Catalog cannot infer them automatically.
     */
    default void inferTableOptions(ObjectPath tablePath, CatalogBaseTable table) {}

    /**
     * Assist in the serialization of different catalogs.
     * Each Catalog decides for itself which information needs to be serialized.
     */
    default void serialize(OutputStream output) throws IOException {}

    /**
     * Assist in the deserialization of different catalogs.
     * Whether a new Catalog instance is created depends on
     * the deserialized data and the implementation of the catalog.
     */
    default Catalog deserialize(InputStream input) throws ClassNotFoundException, IOException {
        return this;
    }
}

Catalog#inferTableOptions lets users customize a Catalog so that, when it supports CTAS,

the table options can be inferred automatically, which avoids job failures caused by missing information.

The serialize/deserialize APIs implement serialization and deserialization of the Catalog;

the Catalog implementation decides which information is saved during serialization so that it is available during deserialization.
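As a rough illustration of the serialize/deserialize contract, the sketch below writes only the fields needed to rebuild a catalog and reads them back in the same order. SketchCatalog and its two fields are hypothetical; the class only mirrors the shape of the two proposed methods and does not implement Flink's Catalog interface.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UncheckedIOException;

/** Hypothetical catalog mirroring the two proposed methods; not a Flink class. */
final class SketchCatalog {
    final String name;
    final String defaultDatabase;

    SketchCatalog(String name, String defaultDatabase) {
        this.name = name;
        this.defaultDatabase = defaultDatabase;
    }

    /** Writes only the fields needed to rebuild the catalog later. */
    void serialize(OutputStream output) throws IOException {
        DataOutputStream out = new DataOutputStream(output);
        out.writeUTF(name);
        out.writeUTF(defaultDatabase);
        out.flush();
    }

    /** Reads the fields back in the order they were written and returns a fresh instance. */
    SketchCatalog deserialize(InputStream input) throws IOException {
        DataInputStream in = new DataInputStream(input);
        String readName = in.readUTF();
        String readDatabase = in.readUTF();
        return new SketchCatalog(readName, readDatabase);
    }

    /** Serializes and deserializes in memory to check that nothing is lost. */
    static SketchCatalog roundTrip(SketchCatalog catalog) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            catalog.serialize(bytes);
            return catalog.deserialize(new ByteArrayInputStream(bytes.toByteArray()));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Returning a new instance from deserialize, instead of mutating the receiver, matches the case where the receiver is only an empty placeholder object.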

Proposed Changes

Syntax

We propose introducing a CTAS clause with the following syntax:

syntax
CREATE TABLE [ IF NOT EXISTS ] table_name 
[ WITH ( table_properties ) ]
[ AS query_expression ]


Example:

syntax
CREATE TABLE ctas_hudi
 WITH ('connector.type' = 'hudi')
 AS SELECT id, name, age FROM hive_catalog.default.test WHERE mod(id, 10) = 0;


The resulting table is equivalent to:

syntax
CREATE TABLE ctas_hudi
 (
 	id BIGINT,
 	name STRING,
 	age INT
 )
 WITH ('connector.type' = 'hudi');

INSERT INTO ctas_hudi SELECT id, name, age FROM hive_catalog.default.test WHERE mod(id, 10) = 0;
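The equivalence above can also be expressed as a small string-building sketch: given the column names and types of the query result, one CTAS statement expands into a CREATE TABLE plus an INSERT INTO. CtasExpansionSketch is a hypothetical helper for illustration only; in Flink the column types would come from the planner's derived query schema, whereas this sketch assumes the caller supplies them.

```java
import java.util.Map;
import java.util.StringJoiner;

/** Hypothetical helper that expands CTAS into its two equivalent statements. */
final class CtasExpansionSketch {

    /** Builds the CREATE TABLE statement from column name/type pairs and table options. */
    static String createTable(String table, Map<String, String> columns, Map<String, String> options) {
        StringJoiner cols = new StringJoiner(", ", "(", ")");
        columns.forEach((name, type) -> cols.add(name + " " + type));
        StringJoiner opts = new StringJoiner(", ", "(", ")");
        options.forEach((key, value) -> opts.add("'" + key + "' = '" + value + "'"));
        return "CREATE TABLE " + table + " " + cols + " WITH " + opts;
    }

    /** Builds the INSERT statement that writes the query result into the new table. */
    static String insertInto(String table, String query) {
        return "INSERT INTO " + table + " " + query;
    }
}
```

Passing insertion-ordered maps (for example LinkedHashMap) keeps the generated column order the same as the SELECT list.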

Implementation Plan

Based on the research summarized in the appendix, the current state of CTAS support in big data engines is:

  • Flink: the Flink dialect does not support CTAS. ==> LEVEL-1
  • Spark DataSource v1: atomic (can roll back), but not isolated. ==> LEVEL-2
  • Spark DataSource v2: guarantees atomicity and isolation. ==> LEVEL-3
  • Hive MR: guarantees atomicity and isolation. ==> LEVEL-3

Considering the current state of Flink and our business needs, we choose a LEVEL-2 implementation for Flink.

The overall execution process is shown in the figure above.

Because the client process may exit early, for example in detached mode, the table is created and dropped on the JobManager (JM) side, and both operations are executed through the Catalog.

Therefore, a new hook mechanism and a serialization/deserialization solution for the Catalog need to be introduced.

The overall execution process is as follows:

  1. The Flink client compiles the SQL and generates an execution plan. In this process, the hook that needs to be executed on the JM side is generated, and the hook, Catalog and CatalogBaseTable are serialized.
  2. The job is submitted to the cluster. In detached mode, the client can exit at this point.
  3. When the job starts, the hook, Catalog and CatalogBaseTable are deserialized, and the hook calls Catalog#createTable to create the CatalogBaseTable.
  4. Task execution starts.
  5. If the final status of the job is FAILED or CANCELED, the hook calls Catalog#dropTable to drop the created CatalogBaseTable.

Planner

Provide a method for the planner to register a JobStatusHook with the StreamGraph.

public class StreamGraph implements Pipeline {

    ... ...

    /** Registers the JobStatusHook. */
    void addJobStatusHook(JobStatusHook hook) {
        ...
    }
}

All tasks of the job are ultimately generated by the planner. Because the create table and drop table operations are performed by a hook on the JM side, an API is needed to register that hook.

The planner handles CTAS as follows:

1) Compile the SQL to generate the CatalogBaseTable (the table to be created) and a CreateTableASOperation.


2) Use the Catalog#inferTableOptions API to fill in the options of the CatalogBaseTable. The specific implementation is determined by the Catalog.

For example, when using JdbcCatalog, if the user does not fill in any table options, JdbcCatalog can set connector to 'jdbc' and fill in username, password and base-url.

When using HiveCatalog, if the user does not fill in any table options, HiveCatalog can set connector to 'hive'.

User-implemented catalogs can also use this mechanism to fill in some options.

Note that the tables stored in the InMemoryCatalog all exist in an external system, so the table options have to be filled in manually by the user; the Catalog cannot infer them automatically.

If the Catalog does not support ManagedTable and the user does not set the connector information, the execution will fail.


3) Use the CatalogBaseTable and Catalog objects to construct the JobStatusHook.

When the hook code is finally executed on the JM side, the CatalogBaseTable has to be created or dropped through the Catalog.

The Catalog and CatalogBaseTable are therefore member variables of the hook, and they also need to be serialized so that they can be passed to the JM.


4) The planner registers the JobStatusHook with the StreamGraph; the JobStatusHook is then serialized and passed to the JM as part of the JobGraph serialization.

JobGraph is serialized with ObjectOutputStream, but CatalogBaseTable and Catalog cannot be serialized directly with ObjectOutputStream, so JobStatusHook implementations need to implement the Externalizable interface.

Internally, CatalogPropertiesUtil, a tool that Flink already provides, is used to serialize/deserialize the CatalogBaseTable.

For the Catalog, we have added the serialize and deserialize APIs, and each Catalog implementation decides which of its own properties need to be serialized.

We save the class name of the Catalog together with the serialized content, like this:

Catalog ClassName
Catalog serialized data

Because the Catalog class may not have a parameterless constructor, we cannot use Class#newInstance to instantiate it; the Objenesis framework can be used instead.

After Objenesis produces an empty Catalog instance, the real Catalog instance is obtained through the Catalog#deserialize API.

This solves the serialization/deserialization problem of CatalogBaseTable and Catalog.
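The class-name-plus-payload layout can be sketched with plain JDK serialization. PlainCatalog and EnvelopeHook below are hypothetical stand-ins, not Flink classes. One deliberate simplification: the sketch instantiates the catalog through a no-arg constructor via reflection, where the proposal would use Objenesis precisely because such a constructor may not exist.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInput;
import java.io.ObjectInputStream;
import java.io.ObjectOutput;
import java.io.ObjectOutputStream;
import java.nio.charset.StandardCharsets;

/** Hypothetical catalog that is deliberately not java.io.Serializable. */
class PlainCatalog {
    String defaultDatabase = "";

    byte[] toBytes() {
        return defaultDatabase.getBytes(StandardCharsets.UTF_8);
    }

    /** Builds a populated catalog from the payload; the receiver is only a placeholder. */
    PlainCatalog fromBytes(byte[] data) {
        PlainCatalog catalog = new PlainCatalog();
        catalog.defaultDatabase = new String(data, StandardCharsets.UTF_8);
        return catalog;
    }
}

/** Hook sketch that writes [class name][payload length][payload] for its catalog. */
class EnvelopeHook implements Externalizable {
    PlainCatalog catalog;

    public EnvelopeHook() {} // Externalizable requires a public no-arg constructor

    EnvelopeHook(PlainCatalog catalog) {
        this.catalog = catalog;
    }

    @Override
    public void writeExternal(ObjectOutput out) throws IOException {
        byte[] payload = catalog.toBytes();
        out.writeUTF(catalog.getClass().getName());
        out.writeInt(payload.length);
        out.write(payload);
    }

    @Override
    public void readExternal(ObjectInput in) throws IOException {
        String className = in.readUTF();
        byte[] payload = new byte[in.readInt()];
        in.readFully(payload);
        try {
            // Stand-in for Objenesis: this sketch assumes a no-arg constructor exists.
            PlainCatalog empty =
                    (PlainCatalog) Class.forName(className).getDeclaredConstructor().newInstance();
            catalog = empty.fromBytes(payload);
        } catch (ReflectiveOperationException e) {
            throw new IOException("Cannot instantiate " + className, e);
        }
    }

    /** Sends the hook through ObjectOutputStream/ObjectInputStream, as JobGraph serialization would. */
    static EnvelopeHook roundTrip(EnvelopeHook hook) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(hook);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (EnvelopeHook) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }
}
```

Writing the payload length explicitly lets the reader consume exactly the catalog's bytes, so further fields (for example a serialized table definition) could follow in the same stream.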

For example, JdbcCatalog#serialize can save catalogName, defaultDatabase, username, pwd and baseUrl, and JdbcCatalog#deserialize can re-initialize a JdbcCatalog object from these parameters.

HiveCatalog#serialize can save catalogName, defaultDatabase, hiveConf and hiveVersion, and HiveCatalog#deserialize can re-initialize a HiveCatalog object from these parameters.

InMemoryCatalog#serialize only needs to save catalogName and defaultDatabase, and InMemoryCatalog#deserialize can re-initialize an InMemoryCatalog object from these two parameters.

The tables in the InMemoryCatalog already exist in the external system. The metadata held in the InMemoryCatalog is only used by the job itself

and is held only in memory. Therefore, none of the metadata in the InMemoryCatalog needs to be serialized and passed to the JM; the JM only needs to initialize a new InMemoryCatalog.

Runtime

Provide a hook mechanism on the JM side for job status changes.

/**
 * Hooks provided by users on job status changes.
 */
@Internal
public interface JobStatusHook {

    /** Called when the job reaches CREATED status. It is only called once. */
    default void onCreated(JobID jobId) {}

    /** Called when the job has finished successfully. */
    default void onFinished(JobID jobId) {}

    /** Called when the job has finally failed. */
    default void onFailed(JobID jobId, Throwable throwable) {}

    /** Called when the job is canceled by users. */
    default void onCanceled(JobID jobId) {}
}

Flink's existing hook mechanisms cannot meet the needs of CTAS.

For example, JobListener runs on the client side;

JobStatusListener runs on the JM side, but it cannot be serialized.

We therefore propose a new interface, JobStatusHook, which can be attached to the JobGraph and executed in the JobMaster. The interface will also be marked as Internal.

When the job starts, the JobGraph is deserialized, and the JobStatusHook is deserialized along with it.

Because of the serialization scheme described above, deserializing the JobStatusHook also deserializes the Catalog and CatalogBaseTable.

The CatalogBaseTable is deserialized with the CatalogPropertiesUtil#deserializeCatalogTable method.

To deserialize a Catalog, first read the Catalog class name, then use Objenesis to get an empty instance of the Catalog,

and finally call the Catalog#deserialize method to get a valid Catalog instance.

Once the job has started, the JobStatusHook methods are called whenever the job status changes.

For example, suppose our JobStatusHook implementation is called CTASJobStatusHook and uses JdbcCatalog.

The JdbcCatalog serialized by the planner looks like this:

org.apache.flink.connector.jdbc.catalog.JdbcCatalog

'default-database' = '...',
'username' = '...',
'password' = '...',
'base-url' = '...'

We can get an empty instance of JdbcCatalog through Objenesis, and then read default-database, username, password and base-url in the JdbcCatalog#deserialize method,

so that a valid JdbcCatalog can be initialized.

Then, when the job status changes, the corresponding CTASJobStatusHook method is called:

When the job status is CREATED, the runtime module calls CTASJobStatusHook#onCreated, which calls JdbcCatalog#createTable to create the table.

When the final status of the job is FAILED, the runtime module calls CTASJobStatusHook#onFailed, which calls JdbcCatalog#dropTable to drop the table.

When the final status of the job is CANCELED, the runtime module calls CTASJobStatusHook#onCanceled, which calls JdbcCatalog#dropTable to drop the table.

When the final status of the job is FINISHED, the runtime module calls CTASJobStatusHook#onFinished, and no additional operation is needed.
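The four callbacks can be sketched with stand-in types. JobStatusHookSketch, TableRegistry and CtasHookSketch are hypothetical names: the interface mirrors the proposed JobStatusHook but takes a String instead of Flink's JobID, and TableRegistry is an in-memory stub standing in for Catalog#createTable and Catalog#dropTable.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical mirror of the proposed hook; a String replaces Flink's JobID. */
interface JobStatusHookSketch {
    default void onCreated(String jobId) {}
    default void onFinished(String jobId) {}
    default void onFailed(String jobId, Throwable cause) {}
    default void onCanceled(String jobId) {}
}

/** Hypothetical in-memory stub standing in for a catalog's create/drop calls. */
final class TableRegistry {
    private final Set<String> tables = new HashSet<>();

    void createTable(String path) {
        tables.add(path);
    }

    void dropTable(String path) {
        tables.remove(path);
    }

    boolean tableExists(String path) {
        return tables.contains(path);
    }
}

/** Creates the table when the job is created and drops it on failure or cancellation. */
final class CtasHookSketch implements JobStatusHookSketch {
    private final TableRegistry registry;
    private final String tablePath;

    CtasHookSketch(TableRegistry registry, String tablePath) {
        this.registry = registry;
        this.tablePath = tablePath;
    }

    @Override
    public void onCreated(String jobId) {
        registry.createTable(tablePath);
    }

    @Override
    public void onFailed(String jobId, Throwable cause) {
        registry.dropTable(tablePath);
    }

    @Override
    public void onCanceled(String jobId) {
        registry.dropTable(tablePath);
    }

    // onFinished keeps the default no-op: a successful job leaves the table in place.
}
```

Because every interface method has a default no-op body, a hook only overrides the transitions it cares about.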

Atomicity & Data Visibility

Atomicity

CTAS does not provide strict atomicity. The table is created first, and final atomicity is provided by the JobStatusHook (if the job does not finish successfully, the table is dropped).

Data Visibility

Data visibility is determined by the TableSink and the runtime mode:

Stream mode:

If the external storage system supports transactions or two-phase commit, data visibility is tied to the checkpoint cycle;

otherwise, data is visible immediately after it is written, which is consistent with current Flink behavior.

Batch mode:
  • FileSystem Sink: data should be written to a temporary directory first and becomes visible after the job finishes successfully (final visibility).
  • Two-phase commit Sink: data becomes visible after the job finishes successfully (final visibility).
  • Transactional Sink: either commit the transaction after the job finishes successfully (final visibility), or commit transactions periodically or after a fixed number of records (incremental visibility).
  • Other Sink: data is visible immediately after it is written (write-visible).
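The "temporary directory first, visible on success" pattern for a file-based sink can be sketched with java.nio.file. StagedFileCommitSketch and its method names are hypothetical and do not describe how Flink's FileSystem sink is implemented; the sketch also assumes the staging and final directories live on the same file system, since ATOMIC_MOVE is not guaranteed to work across file systems.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.List;

/** Hypothetical staging helper illustrating final visibility for file output. */
final class StagedFileCommitSketch {

    /** Writes lines into a file under stagingDir; nothing is visible in the final directory yet. */
    static Path stage(Path stagingDir, String fileName, List<String> lines) {
        try {
            Files.createDirectories(stagingDir);
            return Files.write(stagingDir.resolve(fileName), lines, StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Publishes a staged file by moving it into finalDir once the job has succeeded. */
    static Path commit(Path staged, Path finalDir) {
        try {
            Files.createDirectories(finalDir);
            return Files.move(
                    staged,
                    finalDir.resolve(staged.getFileName()),
                    StandardCopyOption.ATOMIC_MOVE);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Discards staged data after a failed or canceled job. */
    static void abort(Path staged) {
        try {
            Files.deleteIfExists(staged);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Readers of the final directory see either no file or the complete file, which is the final-visibility behaviour described for batch mode.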

Managed Table

For Managed Table, please refer to FLIP-188. Table options that do not contain the ‘connector’ key represent a managed table. CTAS also follows this principle.

For details, please refer to the Table Store docs: https://nightlies.apache.org/flink/flink-table-store-docs-master/docs/development/create-table/

CTAS supports both Managed Table and Non-Managed Table.

Users need to be clear about their business needs and set the table options accordingly.

The Catalog#inferTableOptions API can also decide whether to add the connector option based on whether the Catalog supports ManagedTable.

Compatibility, Deprecation, and Migration Plan

It is a new feature with no implication for backwards compatibility.

Test Plan

Changes will be verified by unit tests (UT).

Rejected Alternatives

Catalog serialize

A simple way to serialize the catalog is to save the catalog's options in the CatalogManager, so that the JM side only needs these options to re-initialize the catalog.

For example, when a catalog is created with DDL:

CREATE CATALOG my_catalog WITH(
'type' = 'jdbc',
'default-database' = '...',
'username' = '...',
'password' = '...',
'base-url' = '...'
);

We only need to save default-database, username, password and base-url, and then re-initialize the JdbcCatalog on the JM.

The advantage of this solution is that it is simple to implement and does not require complex serialization and deserialization tools.

The disadvantage of this solution is that it does not cover all scenarios.

It only applies to catalogs created with DDL, where the Catalog options are available from the WITH clause.

If the Catalog is registered through TableEnvironment#registerCatalog, the options cannot be obtained.

A large number of users register their Catalog through TableEnvironment#registerCatalog in production environments.

References

  1. Support SELECT clause in CREATE TABLE(CTAS)
  2. MySQL CTAS syntax
  3. Microsoft Azure Synapse CTAS
  4. LanguageManual DDL#Create/Drop/ReloadFunction
  5. Spark Create Table Syntax

Appendix

Program research

We investigated how other big data engines, such as Hive and Spark, implement CTAS:

Hive (MR): atomic

Hive MR runs in client mode: the client is responsible for parsing, compiling, optimizing, executing, and finally cleaning up.

Hive executes the CTAS command as follows:

  1. Execute the query first and write the query result to a temporary directory.
  2. If all MR tasks are executed successfully, create the table and load the data.
  3. If the execution fails, the table is not created.

Spark (DataSource v1): non-atomic

Spark has a role called the driver, which is responsible for compiling tasks, requesting resources, scheduling task execution, tracking task progress, and so on.

Spark executes CTAS as follows:

  1. Create a sink table based on the schema of the query result.
  2. Execute the Spark task and write the result to a temporary directory.
  3. If all Spark tasks are executed successfully, use the Hive API to load the data into the sink table created in the first step.
  4. If the execution fails, the driver drops the sink table created in the first step.

Spark (DataSource v2, not yet completed, Hive Catalog not supported yet): optionally atomic

Non-atomic

The non-atomic implementation is consistent with the DataSource v1 logic. For details, see CreateTableAsSelectExec.

Atomic

The atomic implementation (for details, see AtomicCreateTableAsSelectExec) is supported by StagingTableCatalog and StagedTable.

StagedTable supports commit and abort.

StagingTableCatalog is in memory; it executes CTAS as follows:

  1. Create a StagedTable based on the schema of the query result; it is not yet visible in the catalog.
  2. Execute the Spark task and write the result into the StagedTable.
  3. If all Spark tasks are executed successfully, call StagedTable#commitStagedChanges(); the table then becomes visible in the catalog.
  4. If the execution fails, call StagedTable#abortStagedChanges().

Research summary

Because we want to unify the semantics and implementation of streaming and batch, we decided to follow the implementation approach of Spark DataSource v1.

Reasons:

  • Streaming mode requires the table to be created first (metadata sharing), so that downstream jobs can consume it in real time.
  • In most cases, streaming jobs do not need to be cleaned up even if the job fails (with Redis, for example, data cannot be cleaned up unless all written keys are recorded).
  • Batch jobs try to ensure final atomicity (if the job succeeds, the data is visible; otherwise, the metadata is dropped and the temporary data is deleted).















