...

Combining the current situation of Flink and the needs of our business, we choose a Level-2 atomicity implementation for Flink in batch execution mode. In streaming mode, however, we do not provide atomicity guarantees, because streaming jobs are long-running. Moreover, there is currently no strong need to guarantee atomicity in streaming mode.

Syntax

We propose the CREATE TABLE AS SELECT (CTAS) clause with the following syntax:


Code Block
languagesql
titlesyntax
CREATE TABLE [ IF NOT EXISTS ] table_name 
[ WITH ( table_properties ) ]
[ AS query_expression ]

The optional AS query_expression is the newly proposed change; it is the SELECT query expression.


Example:

Code Block
languagesql
titlesyntax
CREATE TABLE ctas_hudi
 WITH ('connector.type' = 'hudi')
 AS SELECT id, name, age FROM hive_catalog.default.test WHERE mod(id, 10) = 0;

...

The resulting table is equivalent to:

Code Block
languagesql
themeEmacs
titlesyntax
CREATE TABLE ctas_hudi
 (
 	id BIGINT,
 	name STRING,
 	age INT
 )
 WITH ('connector.type' = 'hudi');

INSERT INTO ctas_hudi SELECT id, name, age FROM hive_catalog.default.test WHERE mod(id, 10) = 0;

Public Interface

Table

Provide a method that is used to execute CTAS (CREATE TABLE AS SELECT) for Table API users.

@PublicEvolving
public interface Table extends Explainable<Table>, Executable {

    /**
     * Declares the pipeline defined by the given {@link Table} object
     * to create the table at the specified path.
     */
    CreateOrReplaceTable saveAs(String tablePath);
}

CreateOrReplaceTable

We propose a public interface CreateOrReplaceTable, used by CTAS (CREATE TABLE AS SELECT) for Table API users.

...

We save the properties set through the option API and set them on the CatalogBaseTable when executing the create/createIfNotExist API, so that the TableSink can be generated.
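For illustration, the CTAS example from the Syntax section could be expressed through the Table API roughly as follows. This is a minimal sketch: since the CreateOrReplaceTable definition is elided above, the option and create method names are assumptions, and tEnv stands for a TableEnvironment.

// A hedged sketch; CreateOrReplaceTable#option and #create are assumed
// method names, as the full interface definition is elided in this section.
Table query = tEnv.sqlQuery(
        "SELECT id, name, age FROM hive_catalog.`default`.test WHERE mod(id, 10) = 0");

query.saveAs("ctas_hudi")                  // returns CreateOrReplaceTable
        .option("connector.type", "hudi")  // saved, later set on the CatalogBaseTable
        .create();                         // runs CREATE TABLE ctas_hudi ... AS SELECT ...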

Catalog

...

Regarding the Catalog interface, two changes are needed to support the CREATE TABLE AS SELECT syntax:

  1. The interface should extend the java.io.Serializable interface, so that it can be serialized as a part of the JobGraph and passed to the JM side.
  2. Provide a new method inferTableOptions that is used to infer the options of a CatalogBaseTable; these options are needed to compile the SQL to a JobGraph successfully. This method throws UnsupportedOperationException by default.
/**
 * The CREATE TABLE AS SELECT (CTAS) syntax requires CREATE/DROP TABLE operations via the Catalog on the JM side,
 * so this interface should extend the Serializable interface; it can then be serialized as a part of the
 * {@link JobGraph} and passed to the JM from the client.
 */

@PublicEvolving
public interface Catalog extends Serializable {

    /**
     * This method is used to infer the default options for a {@link CatalogBaseTable} through the {@link Catalog}
     * options, so that the planner can compile the sql successfully when using the {@code Create Table As Select}
     * syntax.
     *
     * <p>Assume a user wants to select data from a Kafka table and insert the result into a MySQL table. If the
     * MySQL table does not exist in the physical MySQL storage, and the user does not want to create the table
     * manually on the MySQL side because of the complex type mapping, the user can first create a {@link JdbcCatalog}
     * that connects to the MySQL instance, and then use the
     * {@code CREATE TABLE `mysql`.`user_db`.`order_cnt` AS SELECT * FROM `KafkaTable`} syntax; it is convenient to
     * load data from Kafka to MySQL this way. Since the {@link JdbcCatalog} already provides the user, password, url
     * and other options, the user does not need to fill in these options in the query. If the user does not specify
     * the required options of the target table, the planner will call this method to fill the options into the
     * {@link CatalogBaseTable}; they are needed to compile the sql.
     *
     * <p>{@link JdbcCatalog} example:
     *
     * <pre>{@code
     * // If the user does not set any of the table's options,
     * // then the implementation of JdbcCatalog#inferTableOptions
     * // can look like this to avoid the execution failure.
     * public CatalogBaseTable inferTableOptions(ObjectPath tablePath, CatalogTable table) {
     *     Map<String, String> tableOptions = table.getOptions();
     *     tableOptions.put("connector", "jdbc");
     *     tableOptions.put("url", getBaseUrl());
     *     tableOptions.put("table-name", tablePath.getObjectName());
     *     tableOptions.put("username", getUsername());
     *     tableOptions.put("password", getPassword());
     *     return table.copy(tableOptions);
     * }
     * }</pre>
     */
    default CatalogBaseTable inferTableOptions(ObjectPath tablePath, CatalogTable table) {
        throw new UnsupportedOperationException();
    }

}

Catalog#inferTableOptions makes it convenient for users to customize the Catalog: when a catalog supports the CTAS feature, the table options can be inferred automatically, avoiding job failures caused by missing information.

CatalogDatabase

CatalogDatabase extends Serializable:

/** Interface of a database in a catalog, and can be serialized along with Catalog. */
@PublicEvolving
public interface CatalogDatabase extends Serializable {

... ...

}

In the InMemoryCatalog scenario, in order to avoid the situation where the database does not exist when table creation is executed on the JM side, we also serialize the CatalogDatabase when we serialize the InMemoryCatalog, so CatalogDatabase needs to extend Serializable.

Currently only InMemoryCatalog serialization requires serializing CatalogDatabase.

Implementation Plan

The overall execution process is shown in the following figure.

...

Since the client process may exit soon (for example, in detached mode) while job execution may take a long time, the table creation (before the job starts) and the table drop (when the job fails or is cancelled) should be executed on the JM side instead of the client. In addition, creating and dropping the table are executed through the Catalog, which is consistent with existing behavior. Therefore, a new Hook mechanism and a Catalog serialization/deserialization solution need to be introduced: the hook is what the JM needs to execute the corresponding action, and the hook depends on the Catalog to complete its function. In summary, the overall execution process of a CTAS job is as follows:

  1. The Flink Client compiles the SQL and generates an execution plan. In this process, the Hook that needs to be executed on the JM side is generated; the Catalog and the CatalogBaseTable are member variables of the Hook and must be serializable.
  2. Submit the job to the cluster; in detached mode, the client can exit after submission.
  3. When the job starts, deserialize the hooks, the Catalog and the CatalogBaseTable; then call the Catalog#createTable method through the hook to create the CatalogBaseTable.
  4. The tasks start to execute.
  5. If the final status of the job is failed or canceled, the created CatalogBaseTable is dropped through the hook by calling the Catalog#dropTable method.

...

The final tasks of the job are all generated by the Planner. We want to complete the create/drop table actions through a Hook on the JM side, so we need an API to register the Hook on the JM side.

Introducing the process of CTAS in the Planner:

...

Compile the SQL to generate the CatalogBaseTable (the sink table to be created) and the CreateTableASOperation.

...

Use the Catalog#inferTableOptions method to fill in the required options of the CatalogBaseTable. The specific implementation is determined by the Catalog.

For example, when using JdbcCatalog, if the user does not fill in any table options, JdbcCatalog will set connector to 'jdbc' and fill in username, password and base-url. When using HiveCatalog, if the user does not fill in any table options, HiveCatalog will set connector to 'hive'. User-implemented catalogs can also use this mechanism to fill in required options.

It should be noted that in the InMemoryCatalog case, the tables saved in it all exist in the external system, so the table options have to be filled in manually by the user; the Catalog cannot infer them automatically. If the Catalog does not support ManagedTable and the user does not set the connector information, the execution will fail.

...

Use the CatalogBaseTable and Catalog objects to construct the JobStatusHook. Since the JobStatusHook is finally executed on the JM side, and the CatalogBaseTable needs to be created/dropped through the Catalog in the hook, the Catalog and the CatalogBaseTable are member variables of the hook; they also need to be serialized so that they can be passed to the JM.

...

The purpose of doing so is:

  1. Solve the serialization/deserialization problem in the same way for catalogs created via DDL and catalogs registered via TableEnvironment#registerCatalog; both can then be serialized.
  2. Reduce the cost for user-defined catalogs, which otherwise would need to give much consideration to serialization (if the CREATE TABLE AS SELECT (CTAS) syntax is to be supported, the catalog must be serializable).

Key points for making a Catalog serializable:

Built-in Catalog:

  • InMemoryCatalog: this is a special case. Since the tables in InMemoryCatalog already exist in the external system, the metadata information in InMemoryCatalog is only used by the job itself and is only stored in memory. The database-related information in InMemoryCatalog needs to be serialized and then passed to the JM; otherwise the database may not exist when the JM creates the table. Other objects do not need to be serialized. The CatalogDatabase interface needs to extend Serializable.
  • JdbcCatalog: the main member variables required to construct the Catalog object are directly serializable, such as username, password, base url, etc. The JdbcDialectTypeMapper interface needs to extend Serializable.
  • HiveCatalog: all member variables can be serialized directly, except for the HiveConf object, which cannot be serialized directly. We can refer to JobConfWrapper to solve the serialization problem of HiveConf (see the sketch after this list).
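For illustration, a serializable wrapper in the spirit of JobConfWrapper could look like the following sketch; the class name and the details are assumptions, not part of this proposal. It relies on the fact that HiveConf (a Hadoop Configuration) implements Writable.

import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

import org.apache.hadoop.hive.conf.HiveConf;

/** A hedged sketch: make HiveConf transportable via Java serialization. */
public class HiveConfWrapper implements Serializable {

    private transient HiveConf hiveConf;

    public HiveConfWrapper(HiveConf hiveConf) {
        this.hiveConf = hiveConf;
    }

    public HiveConf conf() {
        return hiveConf;
    }

    private void writeObject(ObjectOutputStream out) throws IOException {
        out.defaultWriteObject();
        // Hadoop Configuration implements Writable, so it can write itself.
        hiveConf.write(out);
    }

    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        hiveConf = new HiveConf();
        hiveConf.readFields(in);
    }
}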

User-defined Catalog:

  • User-defined catalogs that are required to support the CREATE TABLE AS SELECT (CTAS) syntax should ensure that the catalog can be serialized.

The CREATE TABLE AS SELECT (CTAS) feature depends on the serializability of the catalog. To quickly determine whether a catalog supports CTAS, we try to serialize the catalog when compiling the SQL in the planner; if serialization fails, an exception is thrown to tell the user that the catalog does not support CTAS because it cannot be serialized.
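As a sketch, the planner-side check could be as simple as the following; InstantiationUtil and ValidationException are existing Flink classes, while the helper method itself and its error message are illustrative.

import java.io.IOException;

import org.apache.flink.table.api.ValidationException;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.util.InstantiationUtil;

// A hedged sketch: fail fast at SQL compile time if the catalog
// cannot be serialized, so the user gets a clear error message.
public static void checkCatalogSerializable(String catalogName, Catalog catalog) {
    try {
        InstantiationUtil.serializeObject(catalog);
    } catch (IOException e) {
        throw new ValidationException(
                "Catalog '" + catalogName + "' does not support CTAS because it cannot be serialized.", e);
    }
}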

...

Flink's current Hook design cannot meet the needs of CTAS. For example, the JobListener is executed on the client side, and the JobStatusListener runs on the JM side but cannot be serialized and its functionality does not meet the needs of CTAS. Thus we propose a new interface called JobStatusHook, which can be attached to the JobGraph and executed in the JobMaster. The interface will also be marked as Internal.
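A minimal sketch of what the JobStatusHook interface could look like is shown below; the exact callback set and signatures are assumptions for illustration.

import java.io.Serializable;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.JobID;

/** Hooks attached to the JobGraph and called by the JobMaster on job status changes (a sketch). */
@Internal
public interface JobStatusHook extends Serializable {

    /** Called when the job is created, before the tasks start to run. */
    void onCreated(JobID jobId);

    /** Called when the job finishes successfully. */
    void onFinished(JobID jobId);

    /** Called when the job fails. */
    void onFailed(JobID jobId, Throwable throwable);

    /** Called when the job is canceled. */
    void onCanceled(JobID jobId);
}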

The process of CTAS at runtime

  1. When the job starts, the JobGraph will be deserialized, and then the JobStatusHook can also be deserialized.
  2. When deserializing the JobStatusHook, the Catalog and the CatalogBaseTable will also be deserialized.
    • Deserialize the CatalogBaseTable using the CatalogPropertiesUtil#deserializeCatalogTable method.
    • After deserializing the Catalog, call Catalog#open to ensure that the Catalog can be used.
  3. When the job starts and the job status changes, the corresponding JobStatusHook method will be called by the JM:

For example, suppose our JobStatusHook implementation is called CTASJobStatusHook and uses a JdbcCatalog; how the JdbcCatalog is serialized by the Planner was covered in the previous section and will not be repeated. A sketch of such a hook follows.
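This is a hedged sketch building on the JobStatusHook interface sketched above; the constructor shape and the cleanup details are assumptions, while the Catalog calls (open/createTable/dropTable) are existing API.

import org.apache.flink.api.common.JobID;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.ObjectPath;

/** A sketch of the CTAS hook: create the table when the job starts, drop it on failure/cancellation. */
public class CTASJobStatusHook implements JobStatusHook {

    // Serialized together with the hook as part of the JobGraph.
    private final Catalog catalog;
    private final ObjectPath tablePath;
    private final CatalogBaseTable table;

    public CTASJobStatusHook(Catalog catalog, ObjectPath tablePath, CatalogBaseTable table) {
        this.catalog = catalog;
        this.tablePath = tablePath;
        this.table = table;
    }

    @Override
    public void onCreated(JobID jobId) {
        try {
            catalog.open(); // ensure the deserialized catalog is usable
            catalog.createTable(tablePath, table, true);
        } catch (Exception e) {
            throw new RuntimeException("Could not create table for CTAS job " + jobId, e);
        }
    }

    @Override
    public void onFinished(JobID jobId) {
        // Success: keep the created table.
    }

    @Override
    public void onFailed(JobID jobId, Throwable throwable) {
        dropTableQuietly();
    }

    @Override
    public void onCanceled(JobID jobId) {
        dropTableQuietly();
    }

    private void dropTableQuietly() {
        try {
            catalog.dropTable(tablePath, true); // ignore if it no longer exists
        } catch (Exception e) {
            // Best-effort cleanup; do not mask the job's final status.
        }
    }
}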

...

If the external storage system supports transactions or two-phase commit, data visibility is bound to the checkpoint cycle. Otherwise, data is visible immediately after writing, which is consistent with the current Flink behavior.

...

If we add serialize and deserialize APIs, the Catalog must implement serialization and deserialization itself. We save the class name of the Catalog together with the serialized content, like this:

...

We need to serialize the catalog name and the options which are used in the create catalog DDL; the JM side can then use these options to re-initialize the catalog through the Flink ServiceLoader mechanism (using FactoryUtil#createCatalog to get the catalog). For InMemoryCatalog, there is a special case: since the tables in InMemoryCatalog already exist in the external system, the metadata information in InMemoryCatalog is only used by the job itself and is only stored in memory. The database-related information in InMemoryCatalog needs to be serialized and then passed to the JM, otherwise the database may not exist when the JM creates the table. Other objects do not need to be serialized.
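For illustration, the JM-side re-initialization could look like the following sketch; FactoryUtil#createCatalog is the entry point mentioned above, while the surrounding helper method is an assumption.

import java.util.Map;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.factories.FactoryUtil;

// A hedged sketch: rebuild the catalog on the JM side from the serialized
// catalog name and the options of the original CREATE CATALOG DDL.
public static Catalog reinitializeCatalog(String catalogName, Map<String, String> ddlOptions) {
    Catalog catalog =
            FactoryUtil.createCatalog(
                    catalogName,
                    ddlOptions,
                    new Configuration(),
                    Thread.currentThread().getContextClassLoader());
    catalog.open(); // make the catalog usable before create/drop table calls
    return catalog;
}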

...

Appendix

Program research

I have investigated the implementations of other big data engines such as Hive and Spark:

...