
Status

...



Discussion thread

...


Vote thread
JIRA

...

FLINK-26942

...

Release1.

...

17


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

To keep the semantics of Flink SQL consistent between streaming mode and batch mode, and taking into account both the current state of Flink and the needs of our business, we choose LEVEL-1 atomicity as the default behavior for both streaming and batch mode. Users who require LEVEL-2 atomicity can enable it through the table.ctas-rtas.atomicity-enabled option; batch jobs usually require LEVEL-2 atomicity. In short, Flink provides two levels of atomicity guarantee, with LEVEL-1 as the default.

...

We save the properties set through the option API and put them into the CatalogBaseTable when the create API is executed, so that the DynamicTableSink can be generated from them.
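A minimal sketch of the "save options first, apply them on create" flow described above, in plain Java. `CreateTableBuilder` and `TableDefinition` are hypothetical stand-ins for the option/create API and the CatalogBaseTable; they are not Flink classes, and the rule that user options override base options with the same key is an assumption of this sketch.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical stand-in for the table definition handed to the catalog (not a Flink class). */
final class TableDefinition {
    final String name;
    final Map<String, String> options;

    TableDefinition(String name, Map<String, String> options) {
        this.name = name;
        this.options = Collections.unmodifiableMap(new LinkedHashMap<>(options));
    }
}

/** Hypothetical builder: option(...) only records properties; create() copies them into the definition. */
final class CreateTableBuilder {
    private final String tableName;
    private final Map<String, String> baseOptions;
    private final Map<String, String> userOptions = new LinkedHashMap<>();

    CreateTableBuilder(String tableName, Map<String, String> baseOptions) {
        this.tableName = tableName;
        this.baseOptions = new LinkedHashMap<>(baseOptions);
    }

    /** Saves a property; nothing is applied until create() is called. */
    CreateTableBuilder option(String key, String value) {
        userOptions.put(key, value);
        return this;
    }

    /** Merges the saved properties into the definition; a user option wins over a base option with the same key. */
    TableDefinition create() {
        Map<String, String> merged = new LinkedHashMap<>(baseOptions);
        merged.putAll(userOptions);
        return new TableDefinition(tableName, merged);
    }
}
```

For example, `new CreateTableBuilder("sink", Map.of("connector", "filesystem")).option("path", "/tmp/out").create()` yields a definition whose options contain both `connector` and `path`; the sink factory would then only ever see the merged map.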

Catalog

Because the CREATE TABLE AS SELECT syntax does not guarantee atomicity by default, atomic semantics require the Catalog to be serializable, so that it can be serialized as part of the JobGraph and passed to the JM side. Hence, we propose a new interface, AtomicCatalog, which extends Java's Serializable interface. If you need atomic CREATE TABLE AS SELECT, your catalog should implement this interface. The interface currently does not declare any methods, but in the future we may add new methods to it to support isolation for CTAS and RTAS.

...

import java.io.Serializable;

import org.apache.flink.annotation.PublicEvolving;

/**
 * The CREATE TABLE AS SELECT (CTAS) syntax requires CREATE/DROP TABLE operations to be performed via the Catalog on the JM side,
 * so a specific Catalog implementation should implement this interface, which allows the Catalog to be serialized as part of the {@link JobGraph}.
 */
@PublicEvolving
public interface AtomicCatalog extends Serializable {}
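A minimal sketch of what implementing the marker interface could look like for a JDBC-style catalog, assuming the catalog keeps only plain settings (base URL, username) as serialized state and re-creates any live resource in `open()` after deserialization. `SketchJdbcCatalog`, `Connection`, and `JavaSerde` are hypothetical names; the interface is re-declared locally only so the sketch compiles on its own.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Local copy of the proposed marker interface so that this sketch is self-contained. */
interface AtomicCatalog extends Serializable {}

/** Hypothetical live handle that is NOT serializable, e.g. an open database connection. */
final class Connection {
    final String url;

    Connection(String url) {
        this.url = url;
    }
}

/** Hypothetical catalog: serializes only its settings; the connection is transient and rebuilt by open(). */
final class SketchJdbcCatalog implements AtomicCatalog {
    private static final long serialVersionUID = 1L;

    private final String baseUrl;
    private final String username;
    private transient Connection connection; // skipped by Java serialization, null after deserialization

    SketchJdbcCatalog(String baseUrl, String username) {
        this.baseUrl = baseUrl;
        this.username = username;
    }

    void open() {
        connection = new Connection(baseUrl);
    }

    boolean isOpen() {
        return connection != null;
    }

    String baseUrl() {
        return baseUrl;
    }
}

/** Helper that serializes an object to bytes and reads it back, as shipping it inside a JobGraph would. */
final class JavaSerde {
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Java serialization round trip failed", e);
        }
    }
}
```

The design choice here is that everything needed to rebuild the catalog is serializable, while anything holding OS or network resources is `transient`; the deserialized copy therefore starts closed and must be opened before use.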

TableConfigOptions

Add the table.ctas.atomicity-enabled option to allow users to enable atomicity when using the CREATE TABLE AS SELECT syntax.

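@PublicEvolving
public class TableConfigOptions {
    @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING)
    public static final ConfigOption<Boolean> TABLE_CTAS_ATOMICITY_ENABLED =
            key("table.ctas.atomicity-enabled")
                    .booleanType()
                    .defaultValue(false)
                    .withDescription(
                            "Specifies whether the create table as select operation is executed atomically. "
                                    + "By default, the operation is non-atomic: the target table is created on the client side "
                                    + "and is not dropped even if the job fails or is cancelled. "
                                    + "If this option is set to true, the target table is created on the JM side "
                                    + "and is dropped when the job fails or is cancelled.");
}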

TableConfigOptions

Add the table.ctas-rtas.atomicity-enabled option to allow users to enable atomicity when using the CREATE TABLE AS SELECT or REPLACE TABLE AS SELECT syntax.

@PublicEvolving
public class TableConfigOptions {
    @Documentation.TableOption(execMode = Documentation.ExecMode.BATCH_STREAMING)
    public static final ConfigOption<Boolean> TABLE_CTAS_OR_RTAS_ATOMICITY_ENABLED =
            key("table.ctas-rtas.atomicity-enabled")
                    .booleanType()
                    .defaultValue(false)
                    .withDescription(
                            "Specifies whether the create table as select or replace table as select operation is executed atomically. "
                                    + "By default, the operation is non-atomic: the target table is created on the client side "
                                    + "and is not dropped even if the job fails or is cancelled. "
                                    + "If this option is set to true, the target table is created on the JM side "
                                    + "and is dropped when the job fails or is cancelled.");
}
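To make the two behaviors in the option description concrete, here is a minimal sketch that resolves the option from a plain string map. `CtasPlan` is a hypothetical name, and reading the value from a `Map<String, String>` with `Boolean.parseBoolean` is a simplification; Flink itself resolves the value through the `ConfigOption` declared above.

```java
import java.util.Map;

/** Hypothetical summary of how a CTAS/RTAS job treats its target table, derived from the option description. */
final class CtasPlan {
    static final String ATOMICITY_KEY = "table.ctas-rtas.atomicity-enabled";

    final boolean createOnJobManager;    // true: the table is created on the JM side
    final boolean dropOnFailureOrCancel; // true: the table is dropped if the job fails or is cancelled

    private CtasPlan(boolean createOnJobManager, boolean dropOnFailureOrCancel) {
        this.createOnJobManager = createOnJobManager;
        this.dropOnFailureOrCancel = dropOnFailureOrCancel;
    }

    /** A missing key falls back to the documented default, false (non-atomic). */
    static CtasPlan fromConfig(Map<String, String> conf) {
        boolean atomic = Boolean.parseBoolean(conf.getOrDefault(ATOMICITY_KEY, "false"));
        // false: created on the client, kept even if the job fails or is cancelled
        // true:  created on the JM, dropped when the job fails or is cancelled
        return new CtasPlan(atomic, atomic);
    }
}
```

Both flags move together by design: moving table creation to the JM is what makes it possible to undo it when the job does not succeed.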

Catalog

We will update Catalog's javadocs to add the following description:

If a Catalog needs to support the atomicity feature of CTAS, it must implement Serializable and ensure that Catalog instances can 

...

be serialized and deserialized using Java serialization.
When atomicity support for CTAS is enabled, the planner will check whether the Catalog instance can be serialized using Java serialization.

...

Implementation Plan

We provide two semantics for Flink CTAS: non-atomic and atomic. Non-atomic is the default behavior in both streaming and batch mode. 

...

  • The atomicity option is enabled.
  • The Catalog can be serialized (the Catalog provider implements Java's Serializable interface, and the Catalog can be serialized and deserialized), so that atomicity is ensured by creating/dropping the table on the JM side.

...

For CatalogBaseTable, we use CatalogPropertiesUtil, a utility that Flink already provides, to serialize and deserialize it.
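The idea behind property-based serialization is that a table definition flattened into a `Map<String, String>` is trivially serializable, even when the original objects are not. The sketch below illustrates this with hypothetical `SimpleColumn`/`SimpleTable` types; the `schema.N.name` / `schema.N.data-type` key layout is loosely modeled on Flink's property style and is an assumption, not a guarantee of CatalogPropertiesUtil's exact format.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical, simplified column: a name and a type string. */
final class SimpleColumn {
    final String name;
    final String type;

    SimpleColumn(String name, String type) {
        this.name = name;
        this.type = type;
    }
}

/** Hypothetical, simplified stand-in for CatalogBaseTable: columns plus connector options. */
final class SimpleTable {
    final List<SimpleColumn> columns;
    final Map<String, String> options;

    SimpleTable(List<SimpleColumn> columns, Map<String, String> options) {
        this.columns = columns;
        this.options = options;
    }
}

/** Flattens a table into string properties and back; the key layout is illustrative only. */
final class SimpleTableProperties {
    private static final String PREFIX = "schema.";

    static Map<String, String> serialize(SimpleTable table) {
        Map<String, String> props = new LinkedHashMap<>(table.options);
        for (int i = 0; i < table.columns.size(); i++) {
            props.put(PREFIX + i + ".name", table.columns.get(i).name);
            props.put(PREFIX + i + ".data-type", table.columns.get(i).type);
        }
        return props;
    }

    static SimpleTable deserialize(Map<String, String> props) {
        List<SimpleColumn> columns = new ArrayList<>();
        // Read columns in index order until the next index is missing.
        for (int i = 0; props.containsKey(PREFIX + i + ".name"); i++) {
            columns.add(new SimpleColumn(props.get(PREFIX + i + ".name"), props.get(PREFIX + i + ".data-type")));
        }
        // Every key outside the schema prefix is treated as a table option.
        Map<String, String> options = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : props.entrySet()) {
            if (!e.getKey().startsWith(PREFIX)) {
                options.put(e.getKey(), e.getValue());
            }
        }
        return new SimpleTable(columns, options);
    }
}
```

This simplified layout would misclassify a user option whose key itself starts with `schema.`; a real format needs a reserved namespace for such keys.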

For Catalog, Catalog providers need to implement Java's Serializable interface so that the Catalog can be serialized and deserialized directly. The planner will attempt to pre-serialize the Catalog; if serialization fails, an exception is thrown telling the user that the Catalog cannot be serialized and therefore does not support atomic semantics.
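A minimal sketch of the planner-side pre-serialization check, assuming plain Java serialization into a byte array. `CatalogSerializationCheck` and `CatalogNotSerializableException` are hypothetical names, not the planner's actual classes or error type.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical pre-check: fail early, on the client, if the catalog cannot be shipped to the JM. */
final class CatalogSerializationCheck {

    /** Hypothetical error type telling the user that atomic CTAS is not supported by this catalog. */
    static final class CatalogNotSerializableException extends RuntimeException {
        private static final long serialVersionUID = 1L;

        CatalogNotSerializableException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    /** Returns the serialized bytes, or throws if the catalog (or any non-transient field) is not serializable. */
    static byte[] preSerialize(String catalogName, Object catalog) {
        String message = "Catalog '" + catalogName + "' cannot be serialized and does not support atomic CTAS.";
        if (!(catalog instanceof Serializable)) {
            throw new CatalogNotSerializableException(message, null);
        }
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(catalog); // NotSerializableException here means a field is not serializable
        } catch (IOException e) {
            throw new CatalogNotSerializableException(message, e);
        }
        return bytes.toByteArray();
    }
}
```

Checking `instanceof Serializable` alone is not enough, which is why the sketch actually writes the object: a class can implement `Serializable` while holding a non-serializable, non-transient field.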

The purpose of doing so is:

...

  • InMemoryCatalog: CatalogDatabase, CatalogBaseTable, and similar objects cannot be serialized directly by the Java serialization mechanism, so InMemoryCatalog cannot be serialized, which means it cannot support atomic semantics.
  • JdbcCatalog: The member variables required to construct the Catalog object, such as username, password, and base URL, are directly serializable. The JdbcDialectTypeMapper interface needs to extend Serializable, after which this Catalog can implement the Serializable interface.
  • HiveCatalog: All member variables can be serialized directly except for the HiveConf object. We can follow the approach of JobConfWrapper to solve the serialization problem of HiveConf, so this Catalog can also implement the Serializable interface.
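The wrapper approach mentioned for HiveConf can be sketched as follows: the wrapper holds the non-serializable configuration in a `transient` field and uses custom `writeObject`/`readObject` methods to write its contents and rebuild it. `LegacyConf` is a hypothetical stand-in for HiveConf, and copying entries into a `HashMap` is a simplification chosen for this sketch; it is not how Flink's JobConfWrapper is written.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical configuration class that does NOT implement Serializable (stands in for HiveConf). */
final class LegacyConf {
    private final Map<String, String> entries = new HashMap<>();

    void set(String key, String value) {
        entries.put(key, value);
    }

    String get(String key) {
        return entries.get(key);
    }

    HashMap<String, String> snapshot() {
        return new HashMap<>(entries);
    }
}

/** Serializable wrapper: writes the conf's entries and rebuilds a fresh conf when read back. */
final class LegacyConfWrapper implements Serializable {
    private static final long serialVersionUID = 1L;

    private transient LegacyConf conf; // skipped by default serialization; handled below

    LegacyConfWrapper(LegacyConf conf) {
        this.conf = conf;
    }

    LegacyConf get() {
        return conf;
    }

    private void writeObject(ObjectOutputStream out) throws IOException {
        out.defaultWriteObject();
        out.writeObject(conf.snapshot()); // HashMap<String, String> is serializable
    }

    @SuppressWarnings("unchecked")
    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        Map<String, String> entries = (Map<String, String>) in.readObject();
        LegacyConf rebuilt = new LegacyConf();
        entries.forEach(rebuilt::set);
        conf = rebuilt;
    }
}

/** Small helper for a Java serialization round trip. */
final class SerdeUtil {
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Java serialization round trip failed", e);
        }
    }
}
```

A catalog would then hold a `LegacyConfWrapper` field instead of the raw configuration, which keeps the catalog itself serializable.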

User-defined Catalog:

  • A user-defined catalog that needs atomic semantics for CREATE TABLE AS SELECT (CTAS) should implement the Serializable interface.

...

  1. When the job starts, the JobGraph is deserialized, and the JobStatusHook is deserialized along with it.
  2. When the JobStatusHook is deserialized, the Catalog and CatalogBaseTable are also deserialized.
    • The CatalogBaseTable is deserialized using the CatalogPropertiesUtil#deserializeCatalogTable method.
    • After the Catalog is deserialized, Catalog#open is called in the JobStatusHook#onCreated method to ensure that the Catalog can be used.
  3. After the job starts, whenever the job status changes, the JM calls the corresponding JobStatusHook method:

...
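A minimal sketch of the hook lifecycle described in the steps above, assuming the target table is created in `onCreated` (after `Catalog#open`) and dropped in `onFailed`/`onCanceled`, as the option description states. `StatusHook` only loosely mirrors the callbacks of Flink's JobStatusHook (job IDs are plain strings here), and `MiniCatalog`, `CtasHook`, and `RecordingCatalog` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified stand-in for the job status callbacks; job IDs are plain strings in this sketch. */
interface StatusHook {
    void onCreated(String jobId);

    void onFinished(String jobId);

    void onFailed(String jobId, Throwable cause);

    void onCanceled(String jobId);
}

/** Hypothetical minimal catalog API used by the sketch. */
interface MiniCatalog {
    void open();

    void createTable(String path);

    void dropTable(String path);
}

/** Creates the target table when the job is created; drops it if the job fails or is cancelled. */
final class CtasHook implements StatusHook {
    private final MiniCatalog catalog;
    private final String tablePath;

    CtasHook(MiniCatalog catalog, String tablePath) {
        this.catalog = catalog;
        this.tablePath = tablePath;
    }

    @Override
    public void onCreated(String jobId) {
        catalog.open(); // the catalog was just deserialized, so open it before use
        catalog.createTable(tablePath);
    }

    @Override
    public void onFinished(String jobId) {
        // Success: keep the table.
    }

    @Override
    public void onFailed(String jobId, Throwable cause) {
        catalog.dropTable(tablePath);
    }

    @Override
    public void onCanceled(String jobId) {
        catalog.dropTable(tablePath);
    }
}

/** Test double that records every catalog call in order. */
final class RecordingCatalog implements MiniCatalog {
    final List<String> calls = new ArrayList<>();

    @Override
    public void open() {
        calls.add("open");
    }

    @Override
    public void createTable(String path) {
        calls.add("create " + path);
    }

    @Override
    public void dropTable(String path) {
        calls.add("drop " + path);
    }
}
```

With this shape, a failed or cancelled job leaves no target table behind, while a finished job keeps it; the client side never touches the catalog when atomicity is enabled.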