...

  • Flink: Flink dialect does not support CTAS. ==> LEVEL-1
  • Flink: Hive dialect already supports CTAS but does not guarantee atomicity (cannot roll back). ==> LEVEL-2
  • Spark DataSource v1: is atomic (can roll back), but is not isolated. ==> LEVEL-3
  • Spark DataSource v2: Guaranteed atomicity and isolation. ==> LEVEL-4
  • Hive MR: Guaranteed atomicity and isolation. ==> LEVEL-4

Hive SQL and Spark SQL are mainly used in offline (batch mode) scenarios; Flink SQL is suitable for both real-time (streaming mode) and offline (batch mode) scenarios. In a real-time scenario, we assume that the job is always running and never stops, and that data is written and visible in real time, so we do not think it is necessary to provide atomicity.

To ensure that Flink SQL is semantically consistent in streaming mode and batch mode, and considering the current state of Flink and the needs of our business, we choose LEVEL-2 atomicity as the default behavior for both Flink streaming and batch mode. If the user requires LEVEL-3 atomicity, this can be achieved by enabling an atomicity option; batch mode in particular usually requires LEVEL-3 atomicity. In a nutshell, Flink provides two levels of atomicity guarantee, with LEVEL-2 as the default behavior.
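For illustration only, a minimal Table API sketch of running a CTAS statement in batch mode. The configuration key table.ctas.atomicity-enabled and the catalog/table names are placeholders chosen for this example, not options defined by this proposal:

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class CtasExample {
    public static void main(String[] args) {
        // Batch mode; the same statement is expected to work in streaming mode as well.
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());

        // Hypothetical option name: switches from the default LEVEL-2 to LEVEL-3 atomicity.
        tEnv.getConfig().getConfiguration().setString("table.ctas.atomicity-enabled", "true");

        // The target table is derived from the schema and data of the SELECT query.
        tEnv.executeSql(
                "CREATE TABLE `my_catalog`.`my_db`.`order_cnt` AS SELECT * FROM `KafkaTable`");
    }
}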

Syntax

We propose the CREATE TABLE AS SELECT (CTAS) clause as follows:

...

Regarding the Catalog interface, to support the CREATE TABLE AS SELECT syntax with an atomicity guarantee (if atomicity is not needed, the second change is not required), two changes are needed here:

  1. Provide a new method inferTableOptions that is used to infer the options of a CatalogBaseTable; these options will be used to compile the SQL into a JobGraph successfully. This method throws UnsupportedOperationException by default.
  2. This interface should extend the Java Serializable interface so that it can be serialized as a part of the JobGraph and passed to the JM side; this requires that the essential options used to construct a catalog object can be serialized.
/**
 * The CREATE TABLE AS SELECT (CTAS) syntax requires CREATE/DROP TABLE operations via the Catalog on the JM side,
 * so this interface should extend the Serializable interface so that it can be serialized as a part of the {@link JobGraph}.
 */
@PublicEvolving
public interface Catalog extends Serializable {

    /**
     * This method is used to infer the default options for a {@link CatalogBaseTable} from the {@link Catalog} options,
     * so that the planner can compile the SQL successfully when the {@code CREATE TABLE AS SELECT} syntax is used.
     *
     * <p>Assume a user wants to select data from a Kafka table and insert the result into a MySQL table. If the MySQL
     * table does not exist in the physical MySQL storage, and the user does not want to create it manually on the MySQL
     * side because of complex type mapping, the user can first create a {@link JdbcCatalog} that connects to the MySQL
     * instance and then use {@code CREATE TABLE `mysql`.`user_db`.`order_cnt` AS SELECT * FROM `KafkaTable`} to
     * conveniently load data from Kafka into MySQL. Since the {@link JdbcCatalog} already provides the user, password,
     * url, and other options, the user does not need to fill in these options in the query. If the user does not specify
     * the options required by the target table, the planner calls this method to fill in the options of the
     * {@link CatalogBaseTable} that are needed to compile the SQL.
     *
     * <p>{@link JdbcCatalog} example:
     *
     * <pre>{@code
     * // If the user does not set any of the table's options,
     * // the implementation of JdbcCatalog#inferTableOptions
     * // can look like this to avoid the execution failure.
     * public CatalogBaseTable inferTableOptions(ObjectPath tablePath, CatalogTable table) {
     *     Map<String, String> tableOptions = table.getOptions();
     *     tableOptions.put("connector", "jdbc");
     *     tableOptions.put("url", getBaseUrl());
     *     tableOptions.put("table-name", tablePath.getObjectName());
     *     tableOptions.put("username", getUsername());
     *     tableOptions.put("password", getPassword());
     *     return table.copy(tableOptions);
     * }
     * }</pre>
     */
    default CatalogBaseTable inferTableOptions(ObjectPath tablePath, CatalogTable table) {
        throw new UnsupportedOperationException();
    }
}
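For context, a rough sketch of how a planner-side caller might use inferTableOptions when the target table of a CTAS statement has no options. The helper class and method below are purely illustrative and not part of the proposed API:

import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectPath;

/** Illustrative planner-side helper, not part of the proposed API. */
final class CtasOptionEnricher {

    /** Ask the catalog to infer options when the user supplied none for the CTAS target table. */
    static CatalogBaseTable resolveSinkTable(Catalog catalog, ObjectPath tablePath, CatalogTable table) {
        if (table.getOptions().isEmpty()) {
            try {
                // Enrich the table with catalog-derived options such as connector, url, username, etc.
                return catalog.inferTableOptions(tablePath, table);
            } catch (UnsupportedOperationException e) {
                // The catalog cannot infer options; fall back to the original table and let the
                // planner report a missing-connector error if compilation fails.
                return table;
            }
        }
        return table;
    }
}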

...

The atomicity implementation of Flink CTAS requires two parts:

  • Add an option to enable atomicity support.
  • The Catalog can be serialized, ensuring atomicity by performing the create/drop table operations on the JM side (see the sketch below).
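A minimal sketch of what the JM-side part could look like, using the existing Catalog#createTable and Catalog#dropTable methods. The hook class name, its lifecycle methods, and how it is attached to the JobGraph are assumptions made for illustration only:

import java.io.Serializable;

import org.apache.flink.table.catalog.Catalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.ObjectPath;

/**
 * Hypothetical JM-side hook: the serialized Catalog travels with the JobGraph, the target table
 * is created when the job starts and dropped again if the job fails, which yields the
 * "can roll back" part of LEVEL-3 atomicity.
 */
final class CtasJobStatusHook implements Serializable {

    private final Catalog catalog;          // must be Serializable (see the interface change above)
    private final ObjectPath tablePath;
    private final CatalogBaseTable table;

    CtasJobStatusHook(Catalog catalog, ObjectPath tablePath, CatalogBaseTable table) {
        this.catalog = catalog;
        this.tablePath = tablePath;
        this.table = table;
    }

    /** Called on the JM when the job is created: make the target table visible. */
    void onCreated() throws Exception {
        catalog.createTable(tablePath, table, /* ignoreIfExists */ false);
    }

    /** Called on the JM when the job fails or is canceled: roll the target table back. */
    void onFailed() throws Exception {
        catalog.dropTable(tablePath, /* ignoreIfNotExists */ true);
    }
}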

...