...

/**
* The CREATE TABLE AS SELECT (CTAS) syntax requires CREATE/DROP TABLE operations via the Catalog on the JM side,
* so this interface should extend the Serializable interface; it can then be serialized as part of the {@link JobGraph}.
*/

@PublicEvolving
public interface Catalog extends Serializable {

    /**
     * Infers the default options for a {@link CatalogBaseTable} from the {@link Catalog} options, so that the planner
     * can compile the SQL successfully when the {@code CREATE TABLE AS SELECT} syntax is used.
     *
     * <p>Assume a user wants to select data from a Kafka table and insert the result into a MySQL table. If the MySQL table
     * does not yet exist in the physical MySQL storage, the user may not want to create it manually on the MySQL side because
     * of the complex type mapping. The user can first create a {@link JdbcCatalog} that connects to the MySQL instance and then use
     * {@code CREATE TABLE `mysql`.`user_db`.`order_cnt` AS SELECT * FROM `KafkaTable`}, which makes it convenient to load data
     * from Kafka into MySQL. Because the {@link JdbcCatalog} already provides the user, password, url and other options, the user
     * does not need to repeat them in the query. If the user does not specify the options required by the target table, the planner
     * calls this method to fill in the options of the {@link CatalogBaseTable} that are needed to compile the SQL.
     *
     * <p>{@link JdbcCatalog} example:
     * <pre>{@code
     * // If the user does not set any table options, an implementation of
     * // JdbcCatalog#inferTableOptions along these lines avoids an execution failure.
     * public CatalogBaseTable inferTableOptions(ObjectPath tablePath, CatalogTable table) {
     *     // Copy first: the map returned by getOptions() may be unmodifiable.
     *     Map<String, String> tableOptions = new HashMap<>(table.getOptions());
     *     tableOptions.put("connector", "jdbc");
     *     tableOptions.put("url", getBaseUrl());
     *     tableOptions.put("table-name", tablePath.getObjectName());
     *     tableOptions.put("username", getUsername());
     *     tableOptions.put("password", getPassword());
     *     return table.copy(tableOptions);
     * }
     * }</pre>
     */
    default CatalogBaseTable inferTableOptions(ObjectPath tablePath, CatalogTable table) {
        throw new UnsupportedOperationException();
    }

}
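
The option-filling step described in the Javadoc can be illustrated without any Flink dependency. The following is only a minimal sketch under stated assumptions: the class InferOptionsSketch, its method signature, and the rule that user-specified options take precedence over catalog-derived defaults are all hypothetical and are not part of the proposed interface.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for the option-filling step; this is not Flink API. */
class InferOptionsSketch {

    /**
     * Returns a new option map for the target table. Assumed precedence:
     * options written by the user win, catalog-derived defaults fill the gaps.
     */
    static Map<String, String> inferTableOptions(
            Map<String, String> catalogDefaults,
            String tableName,
            Map<String, String> userOptions) {
        // Work on a copy so that neither input map is mutated.
        Map<String, String> merged = new HashMap<>(userOptions);
        merged.putIfAbsent("connector", "jdbc");
        merged.putIfAbsent("table-name", tableName);
        for (Map.Entry<String, String> entry : catalogDefaults.entrySet()) {
            merged.putIfAbsent(entry.getKey(), entry.getValue());
        }
        return merged;
    }

    public static void main(String[] args) {
        // Illustrative values only; in the proposal they would come from the catalog.
        Map<String, String> catalogDefaults = new HashMap<>();
        catalogDefaults.put("url", "jdbc:mysql://localhost:3306/user_db");
        catalogDefaults.put("username", "flink");

        // The user overrides a single option in the CTAS statement.
        Map<String, String> userOptions = new HashMap<>();
        userOptions.put("username", "etl");

        System.out.println(inferTableOptions(catalogDefaults, "order_cnt", userOptions));
    }
}
```

Returning a fresh map keeps the caller's table definition untouched, which matters if the original options map is immutable.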

CatalogDatabase

In the InMemoryCatalog scenario, the database might not exist when the table creation is executed on the JM side. To avoid this, the CatalogDatabase must be serialized together with the InMemoryCatalog, so CatalogDatabase needs to extend Serializable. Currently, only InMemoryCatalog serialization requires serializing CatalogDatabase.
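
To make the requirement concrete, here is a minimal sketch using plain Java serialization. The classes Database and InMemoryCatalogSketch are hypothetical stand-ins, not the real Flink classes; the sketch assumes that only database information travels to the JM while table metadata stays on the client.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

class SerializableDatabaseSketch {

    /** Hypothetical stand-in for a CatalogDatabase implementation. */
    static final class Database implements Serializable {
        private static final long serialVersionUID = 1L;
        final HashMap<String, String> properties;
        final String comment;

        Database(Map<String, String> properties, String comment) {
            this.properties = new HashMap<>(properties);
            this.comment = comment;
        }
    }

    /** Hypothetical stand-in for InMemoryCatalog. */
    static final class InMemoryCatalogSketch implements Serializable {
        private static final long serialVersionUID = 1L;
        // Databases are serialized so that the JM side can find them.
        private final HashMap<String, Database> databases = new HashMap<>();
        // Table metadata is assumed to be client-only, so it is skipped.
        private transient Set<String> tables = new HashSet<>();

        void createDatabase(String name, Database database) {
            databases.put(name, database);
        }

        boolean databaseExists(String name) {
            return databases.containsKey(name);
        }

        void createTable(String database, String table) {
            if (!databaseExists(database)) {
                throw new IllegalStateException("Database " + database + " does not exist");
            }
            tables.add(database + "." + table);
        }

        boolean tableExists(String database, String table) {
            return tables.contains(database + "." + table);
        }

        private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
            in.defaultReadObject();
            tables = new HashSet<>(); // transient state starts empty after deserialization
        }
    }

    /** Serializes and deserializes the catalog, as shipping it to the JM would. */
    static InMemoryCatalogSketch roundTrip(InMemoryCatalogSketch catalog) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(catalog);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (InMemoryCatalogSketch) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        InMemoryCatalogSketch client = new InMemoryCatalogSketch();
        client.createDatabase("user_db", new Database(new HashMap<>(), "demo"));
        InMemoryCatalogSketch onJm = roundTrip(client);
        onJm.createTable("user_db", "order_cnt"); // works because the database was shipped
        System.out.println(onJm.tableExists("user_db", "order_cnt"));
    }
}
```

If Database did not implement Serializable, writeObject would fail with a NotSerializableException, which is the situation the interface change is meant to avoid.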

...

Implementation Plan

The overall execution process is shown in the following figure.

...

Key Points for Catalog Support Serializability:

Built-in Catalog:

  • InMemoryCatalog: This is a special case. Because the tables in InMemoryCatalog already exist in the external system, the metadata in InMemoryCatalog is used only by the job itself and is stored only in memory. The database-related information in InMemoryCatalog needs to be serialized and passed to the JM; otherwise the database may not exist when the JM creates the table. Other objects do not need to be serialized. The CatalogDatabase interface needs to extend Serializable.
  • JdbcCatalog: The member variables required to construct the Catalog object, such as username, password and base url, are directly serializable. The JdbcDialectTypeMapper interface needs to extend Serializable.
  • HiveCatalog: All member variables can be serialized directly, except for the HiveConf object, which cannot be serialized directly. We can refer to JobConfWrapper to solve the serialization problem of HiveConf.
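
The wrapper idea mentioned for HiveConf can be sketched as follows. This is not the actual JobConfWrapper code: FakeConf is a hypothetical stand-in for a configuration object that is not Serializable but can write itself to a data stream, and ConfWrapper shows one way to carry it through Java serialization with custom writeObject/readObject hooks.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.Map;
import java.util.TreeMap;

class ConfWrapperSketch {

    /** Hypothetical stand-in for HiveConf: not Serializable, but stream-writable. */
    static final class FakeConf {
        private final Map<String, String> entries = new TreeMap<>();

        void set(String key, String value) {
            entries.put(key, value);
        }

        String get(String key) {
            return entries.get(key);
        }

        void write(DataOutput out) throws IOException {
            out.writeInt(entries.size());
            for (Map.Entry<String, String> entry : entries.entrySet()) {
                out.writeUTF(entry.getKey());
                out.writeUTF(entry.getValue());
            }
        }

        void readFields(DataInput in) throws IOException {
            entries.clear();
            int size = in.readInt();
            for (int i = 0; i < size; i++) {
                String key = in.readUTF();
                String value = in.readUTF();
                entries.put(key, value);
            }
        }
    }

    /** Serializable holder that writes the wrapped configuration by hand. */
    static final class ConfWrapper implements Serializable {
        private static final long serialVersionUID = 1L;
        // transient: default serialization must not touch the non-serializable field
        private transient FakeConf conf;

        ConfWrapper(FakeConf conf) {
            this.conf = conf;
        }

        FakeConf conf() {
            return conf;
        }

        private void writeObject(ObjectOutputStream out) throws IOException {
            out.defaultWriteObject();
            conf.write(out);
        }

        private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
            in.defaultReadObject();
            conf = new FakeConf();
            conf.readFields(in);
        }
    }

    /** Serializes and deserializes the wrapper, as embedding it in a job graph would. */
    static ConfWrapper roundTrip(ConfWrapper wrapper) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(wrapper);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (ConfWrapper) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        FakeConf conf = new FakeConf();
        conf.set("hive.metastore.uris", "thrift://localhost:9083"); // illustrative value
        ConfWrapper copy = roundTrip(new ConfWrapper(conf));
        System.out.println(copy.conf().get("hive.metastore.uris"));
    }
}
```

With this pattern the catalog would hold the wrapper instead of the raw configuration object, so the rest of its fields can keep using default serialization.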

...

  1. HiveCatalog:
    1. If hive-conf-dir is specified, it refers to a local path, so make sure that every node in the cluster has the Hive configuration file under the same path; otherwise the JM will not find the file and the job will fail. This problem also exists in the current application mode of Flink.
    2. If hive-conf-dir is not specified, HiveCatalog looks for hive-site.xml on the Java classpath. In that case we have to solve the hive-site.xml upload problem and make sure that, in all modes, the file can be found on both the Flink client and the JM classpath; otherwise the job will fail.
  2. InMemoryCatalog:
    1. We need to additionally serialize the database information that already exists in the InMemoryCatalog; otherwise the operation to create a table on the JM side may fail because the corresponding database cannot be found.

References

  1. Support SELECT clause in CREATE TABLE(CTAS)
  2. MySQL CTAS syntax
  3. Microsoft Azure Synapse CTAS
  4. LanguageManual DDL#Create/Drop/ReloadFunction
  5. Spark Create Table Syntax

...