...

Hive SQL and Spark SQL are mainly used in offline (batch mode) scenarios; Flink SQL is suitable for both real-time (streaming mode) and offline (batch mode) scenarios. In a real-time scenario the job is expected to run continuously without stopping, with data written and visible in real time, so we do not think it is necessary to provide atomicity in that scenario.

...

/**
 * The CREATE TABLE AS SELECT (CTAS) syntax requires CREATE/DROP TABLE operations via the Catalog on the JM side,
 * so this interface should extend the Serializable interface, allowing it to be serialized as a part of the {@link JobGraph}.
 */

@PublicEvolving
public interface Catalog extends Serializable {

    /**
     * This method is used to infer the default options for a {@link CatalogBaseTable} from the {@link Catalog} options,
     * so that the planner can compile the SQL successfully when the {@code CREATE TABLE AS SELECT} syntax is used.
     *
     * <p>Assume a user wants to select data from a Kafka table and insert the result into a MySQL table. If the MySQL
     * table does not exist in the physical MySQL storage, the user may not want to create the table manually on the
     * MySQL side because of the complex type mapping. The user can first create a {@link JdbcCatalog} that connects to
     * the MySQL instance, then use the {@code CREATE TABLE `mysql`.`user_db`.`order_cnt` AS SELECT * FROM `KafkaTable`}
     * syntax to conveniently load data from Kafka into MySQL. Since the {@link JdbcCatalog} already provides the user,
     * password, url and other options, the user does not need to fill in these options in the query. If the user does
     * not specify the options required by the target table, the planner will call this method to fill in the options on
     * the {@link CatalogBaseTable} that are needed to compile the SQL.
     *
     * <p>{@link JdbcCatalog} example:
     * <pre>{@code
     * // If the user does not set any of the table's options,
     * // then the implementation of JdbcCatalog#inferTableOptions
     * // can look like this to avoid the execution failure.
     * public CatalogBaseTable inferTableOptions(ObjectPath tablePath, CatalogTable table) {
     *     Map<String, String> tableOptions = table.getOptions();
     *     tableOptions.put("connector", "jdbc");
     *     tableOptions.put("url", getBaseUrl());
     *     tableOptions.put("table-name", tablePath.getObjectName());
     *     tableOptions.put("username", getUsername());
     *     tableOptions.put("password", getPassword());
     *     return table.copy(tableOptions);
     * }
     * }</pre>
     */
    default CatalogBaseTable inferTableOptions(ObjectPath tablePath, CatalogTable table) {
        throw new UnsupportedOperationException();
    }

}

TableConfigOptions

Add configuration options to allow users to enable atomicity.
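A minimal sketch of what such a switch could look like. The option key `table.ctas.atomicity-enabled` and the helper class are assumptions for illustration (the final name is decided by the discussion), modeled with plain JDK types rather than Flink's ConfigOption mechanism to keep the sketch self-contained:

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical sketch: an atomicity switch for CTAS, off by default. */
public class CtasOptions {

    // Assumed option key; the final name is decided by the FLIP discussion.
    public static final String CTAS_ATOMICITY_ENABLED = "table.ctas.atomicity-enabled";

    /** Reads the flag from a flat key/value config, defaulting to false (non-atomic). */
    public static boolean isAtomicityEnabled(Map<String, String> config) {
        return Boolean.parseBoolean(
                config.getOrDefault(CTAS_ATOMICITY_ENABLED, "false"));
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        System.out.println(isAtomicityEnabled(config));   // default: false
        config.put(CTAS_ATOMICITY_ENABLED, "true");
        System.out.println(isAtomicityEnabled(config));   // true
    }
}
```

Defaulting to false keeps the existing non-atomic behavior unless the user opts in.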


Implementation Plan

We provide two semantics for Flink CTAS: non-atomic and atomic. The non-atomic implementation is the default behavior in both streaming and batch modes.

Non-atomic (default)

The overall execution process is shown in the following figure.


Image Added

The non-atomic implementation is basically the same as the existing insert-data process, except that the sink table is first created on the client side via the Catalog before the insert is performed, so it needs little further introduction.

Atomic

The atomicity implementation of Flink CTAS requires two parts:

  • Add an option to enable atomicity support.
  • Make the Catalog serializable, ensuring atomicity by creating/dropping the table on the JM side.

The following describes the process when the user enables the atomicity support option. The overall execution process is shown in the following figure.

Image Added

Because the client process may exit soon (for example, in detached mode) while the job execution may take much longer, the table creation (before the job starts) and the table drop (when the job fails or is cancelled) should be executed on the JM side instead of the client. In addition, the creation and drop of the table are executed through the Catalog, which is consistent with existing behavior. Therefore, a new hook mechanism should be introduced for the JM to execute the corresponding actions; the hook depends on the Catalog to complete its function. In summary, the overall execution process of a CTAS job is as follows:

  1. The Flink client compiles the SQL and generates an execution plan. In this process, the hook that needs to be executed on the JM side is generated; the Catalog and CatalogBaseTable are member variables of the hook and must be serializable.
  2. Submit the job to the cluster through the client.
  3. When the job starts, construct the hook object and deserialize the Catalog and CatalogBaseTable in the hook. Then call the Catalog#createTable method through the hook to create the CatalogBaseTable.
  4. The tasks start to execute.
  5. If the final status of the job is failed or canceled, the created CatalogBaseTable needs to be dropped by the hook calling the Catalog#dropTable method.
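The steps above can be sketched as follows. This is a toy model using plain JDK types: `CtasJobStatusHook`, `ToyCatalog` and the method names are assumptions for illustration, not the final API.

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

/** Toy serializable catalog; a real Catalog implementation would talk to external storage. */
class ToyCatalog implements Serializable {
    private final Map<String, String> tables = new HashMap<>();
    void createTable(String path, String schema) { tables.put(path, schema); }
    void dropTable(String path) { tables.remove(path); }
    boolean tableExists(String path) { return tables.containsKey(path); }
}

/**
 * Hypothetical CTAS hook: the JM would call these methods on job status
 * transitions. The catalog and table metadata travel with the hook inside
 * the JobGraph, which is why everything here is Serializable.
 */
class CtasJobStatusHook implements Serializable {
    private final ToyCatalog catalog;
    private final String tablePath;
    private final String tableSchema;

    CtasJobStatusHook(ToyCatalog catalog, String tablePath, String tableSchema) {
        this.catalog = catalog;
        this.tablePath = tablePath;
        this.tableSchema = tableSchema;
    }

    /** Create the sink table before tasks start. */
    void onCreated(String jobId) { catalog.createTable(tablePath, tableSchema); }

    /** Job finished successfully: the created table is kept. */
    void onFinished(String jobId) {}

    /** Drop the table again if the job failed ... */
    void onFailed(String jobId, Throwable cause) { catalog.dropTable(tablePath); }

    /** ... or was canceled. */
    void onCanceled(String jobId) { catalog.dropTable(tablePath); }
}
```

A failed run then leaves no orphan table: `onCreated` creates it, and `onFailed`/`onCanceled` drop it again.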

The next sections describe the details of the implementation. In streaming mode, data is usually written in real time and visible in real time, so streaming mode does not provide an atomicity guarantee and there is no need to use the JobStatusHook mechanism. Batch mode, however, requires the JobStatusHook mechanism to ensure atomicity. In the planner we will support both paths, streaming mode without the JobStatusHook and batch mode with the JobStatusHook, so that batch mode achieves atomicity.

Planner

Provide a method for the planner to register a JobStatusHook with the StreamGraph.

...

The Create Table As Select (CTAS) feature depends on the serializability of the catalog. To determine quickly whether the catalog supports CTAS, we try to serialize the catalog when compiling the SQL in the planner; if serialization fails, an exception is thrown to indicate to the user that the catalog does not support CTAS because it cannot be serialized.
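The check can be as simple as a trial round through Java serialization at compile time. The helper below is a sketch (class and method names are assumptions); it uses only JDK types:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;

/** Sketch: fail fast during SQL compilation if the catalog cannot be serialized. */
public final class CatalogSerializationCheck {

    /** Throws if {@code catalog} cannot be written with Java serialization. */
    public static void checkSerializable(Object catalog) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            out.writeObject(catalog);
        } catch (NotSerializableException e) {
            throw new UnsupportedOperationException(
                    "Catalog " + catalog.getClass().getName()
                            + " does not support CTAS because it cannot be serialized", e);
        } catch (IOException e) {
            throw new RuntimeException("Unexpected I/O error during serialization check", e);
        }
    }
}
```

Running the check at compile time surfaces the problem before the job is ever submitted, instead of failing later on the JM.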

Note: The metadata in InMemoryCatalog is only stored in Flink's memory and does not modify metadata in external systems, so InMemoryCatalog does not need to support atomicity. That is, InMemoryCatalog does not need to be adapted to support serialization.

Runtime

Provide a job-status-change hook mechanism on the JM side.

...

Flink's current hook designs cannot meet the needs of CTAS. For example, the JobListener is executed on the client side; the JobStatusListener runs on the JM side, but it cannot be serialized and its functionality does not meet the needs of CTAS. Thus we propose a new interface, called JobStatusHook, that can be attached to the JobGraph and executed in the JobMaster. The interface will also be marked as Internal.
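A sketch of what such an interface could look like. The method set and signatures are assumptions, and the real job identifier type is replaced with a plain `String` to keep the sketch self-contained:

```java
import java.io.Serializable;

/**
 * Sketch of the proposed JM-side hook. It extends Serializable so it can
 * travel to the JobMaster as part of the JobGraph; the JM invokes the
 * callbacks on job status transitions.
 */
public interface JobStatusHook extends Serializable {

    /** Called when the job is created on the JobMaster, before tasks start. */
    void onCreated(String jobId);

    /** Called when the job reaches the FINISHED state. */
    void onFinished(String jobId);

    /** Called when the job reaches the FAILED state. */
    void onFailed(String jobId, Throwable throwable);

    /** Called when the job reaches the CANCELED state. */
    void onCanceled(String jobId);
}
```

Extending Serializable is what distinguishes this from the existing JobStatusListener, which cannot be shipped inside the JobGraph.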

The process of CTAS in runtime

  1. When the job starts, the JobGraph is deserialized, and then the JobStatusHook can also be deserialized.
  2. When deserializing the JobStatusHook, the Catalog and CatalogBaseTable are also deserialized.
    • Deserialize the CatalogBaseTable using the CatalogPropertiesUtil#deserializeCatalogTable method.
    • After deserializing the Catalog, call Catalog#open to ensure that the Catalog can be used.
  3. When the job has started and the job status changes, the JobStatusHook methods are called by the JM:

...

The non-atomic implementation is consistent with the DataSource V1 logic. For details, see CreateTableAsSelectExec.

Atomic

The atomic implementation (for details, see AtomicCreateTableAsSelectExec) is supported by StagingTableCatalog and StagedTable.
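The staged-commit pattern behind StagingTableCatalog/StagedTable can be sketched as follows. This is a toy model of the idea, not Spark's actual API: writes target a staged table that only becomes visible on commit and leaves no trace if aborted.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Toy model of staged CTAS: the table is published atomically on commit,
 * and an aborted job leaves no table behind.
 */
class StagedTableDemo {

    /** Visible tables; staged tables are kept out of this map until committed. */
    static class ToyMetastore {
        final Map<String, String> tables = new HashMap<>();
    }

    /** Staged table: commit publishes it, abort discards it. */
    static class StagedTable {
        private final ToyMetastore metastore;
        private final String name;
        private final String schema;

        StagedTable(ToyMetastore metastore, String name, String schema) {
            this.metastore = metastore;
            this.name = name;
            this.schema = schema;
        }

        /** Make the table visible in the metastore in one step. */
        void commitStagedChanges() { metastore.tables.put(name, schema); }

        /** Nothing was published, so there is nothing to clean up. */
        void abortStagedChanges() {}
    }
}
```

The key property is that visibility flips in a single metastore operation, which is what gives the atomic CTAS variant its all-or-nothing behavior.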

...