...

Provide a method that infers the options of a CatalogBaseTable. These options are needed to compile the SQL into a JobGraph successfully.

/**
 * The CREATE TABLE AS SELECT (CTAS) feature requires CREATE/DROP TABLE operations via Catalog on the JM side,
 * so Catalog extends Serializable so that the Catalog can be passed from the client to the JM.
 */
@PublicEvolving
public interface Catalog extends Serializable {

    /**
     * Infers the default options of a {@link CatalogBaseTable} from the options of the {@link Catalog}, so that
     * the planner can compile the SQL successfully when the {@code CREATE TABLE AS SELECT} syntax is used.
     *
     * <p>Assume a user wants to select data from a Kafka table and insert the result into a MySQL table that
     * does not yet exist in the physical MySQL storage, and the user does not want to create the table manually
     * on the MySQL side because of the complex type mapping. The user can first create a {@link JdbcCatalog}
     * that connects to the MySQL instance, and then use
     * {@code CREATE TABLE `mysql`.`user_db`.`order_cnt` AS SELECT * FROM `KafkaTable`} to load data from Kafka
     * into MySQL. Because the {@link JdbcCatalog} already provides the user, password, URL and other options,
     * the user does not need to repeat them in the query. If the user does not specify the options required by
     * the target table, the planner calls this method to fill in the options of the {@link CatalogBaseTable}
     * that it needs to compile the SQL.
     *
     * <p>{@link JdbcCatalog} example:
     * <pre>{@code
     * // If the user does not set any table options,
     * // JdbcCatalog#inferTableOptions can be implemented like this
     * // to avoid the execution failure.
     * public CatalogBaseTable inferTableOptions(ObjectPath tablePath, CatalogTable table) {
     *     // Copy the options: the map returned by getOptions() may be unmodifiable.
     *     Map<String, String> tableOptions = new HashMap<>(table.getOptions());
     *     tableOptions.put("connector", "jdbc");
     *     // The JDBC connector expects the full database URL (base URL + database name).
     *     tableOptions.put("url", getBaseUrl() + tablePath.getDatabaseName());
     *     tableOptions.put("table-name", tablePath.getObjectName());
     *     tableOptions.put("username", getUsername());
     *     tableOptions.put("password", getPassword());
     *     return table.copy(tableOptions);
     * }
     * }</pre>
     */
    default CatalogBaseTable inferTableOptions(ObjectPath tablePath, CatalogTable table) {
        throw new UnsupportedOperationException();
    }

}

Catalog#inferTableOptions makes it easy for users to customize a Catalog: when the Catalog supports CTAS, the options of the target table can be inferred automatically, which avoids job failures caused by missing options.
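The inference step can be illustrated outside of Flink with a small, self-contained sketch. It assumes that options the user wrote explicitly in the CTAS statement should take precedence over catalog-derived defaults (the Javadoc example above simply overwrites them), and it uses the hypothetical names JdbcCatalogSettings and TableOptionInference in place of JdbcCatalog; a real implementation would return table.copy(options) instead of a plain map.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for the connection settings a JdbcCatalog already holds. */
final class JdbcCatalogSettings {
    final String baseUrl; // normalized to end with "/", e.g. "jdbc:mysql://localhost:3306/"
    final String username;
    final String password;

    JdbcCatalogSettings(String baseUrl, String username, String password) {
        this.baseUrl = baseUrl.endsWith("/") ? baseUrl : baseUrl + "/";
        this.username = username;
        this.password = password;
    }
}

/** Hypothetical helper: completes user-specified table options with catalog defaults. */
final class TableOptionInference {
    static Map<String, String> inferOptions(
            JdbcCatalogSettings catalog,
            String databaseName,
            String tableName,
            Map<String, String> userOptions) {
        // Copy first: the caller's map may be unmodifiable and must not be mutated.
        Map<String, String> options = new HashMap<>(userOptions);
        // putIfAbsent keeps any option the user set explicitly (assumed precedence rule).
        options.putIfAbsent("connector", "jdbc");
        options.putIfAbsent("url", catalog.baseUrl + databaseName);
        options.putIfAbsent("table-name", tableName);
        options.putIfAbsent("username", catalog.username);
        options.putIfAbsent("password", catalog.password);
        return options;
    }
}
```

With this precedence rule, a user who only sets table-name in the WITH clause still gets the connector, URL and credentials filled in from the catalog.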

Implementation Plan

CatalogDatabase

CatalogDatabase extends Serializable so that it can be serialized together with the Catalog.

/** Interface of a database in a catalog. It can be serialized together with the Catalog. */
@PublicEvolving
public interface CatalogDatabase extends Serializable {

... ...

}


The overall execution process is shown in the following figure.

...

Key points for making Catalog serializable:

Built-in catalog:

  • InMemoryCatalog: this is a special case. Because the tables in InMemoryCatalog already exist in the external system, the metadata in InMemoryCatalog is used only by the job itself and is stored only in memory. The database information in InMemoryCatalog must be serialized and passed to the JM; otherwise the database may not exist when the JM creates the table. Other objects do not need to be serialized. The CatalogDatabase interface therefore needs to extend Serializable.
  • JdbcCatalog: the main member variables, such as username, password and base URL, are directly serializable. The JdbcDialectTypeMapper interface needs to extend Serializable.
  • HiveCatalog: all member variables can be serialized directly except the HiveConf object. JobConfWrapper can serve as a reference for solving the serialization problem of HiveConf.
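The wrapper idea for HiveConf can be sketched with plain Java serialization: the non-serializable object is held in a transient field, and custom writeObject/readObject methods write and rebuild its contents. LegacyConf below is a hypothetical stand-in for HiveConf, and copying its entries into a HashMap is an assumption made to keep the sketch self-contained; since HiveConf is a Hadoop Configuration, which implements Writable, a real wrapper would more likely delegate to those methods.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.HashMap;

/** Hypothetical non-serializable configuration class standing in for HiveConf. */
final class LegacyConf {
    private final HashMap<String, String> entries = new HashMap<>();

    void set(String key, String value) { entries.put(key, value); }

    String get(String key) { return entries.get(key); }

    HashMap<String, String> snapshot() { return new HashMap<>(entries); }
}

/** Serializable wrapper in the spirit of JobConfWrapper: the wrapped conf is transient. */
final class ConfWrapper implements Serializable {
    private static final long serialVersionUID = 1L;

    private transient LegacyConf conf;

    ConfWrapper(LegacyConf conf) { this.conf = conf; }

    LegacyConf get() { return conf; }

    private void writeObject(ObjectOutputStream out) throws IOException {
        out.defaultWriteObject();
        // Write the entries as a HashMap, which Java serialization can handle.
        out.writeObject(conf.snapshot());
    }

    @SuppressWarnings("unchecked")
    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
        in.defaultReadObject();
        HashMap<String, String> entries = (HashMap<String, String>) in.readObject();
        // Rebuild the non-serializable object from its entries.
        conf = new LegacyConf();
        entries.forEach(conf::set);
    }

    /** Serializes and deserializes the wrapper, as happens when it is shipped to the JM. */
    static ConfWrapper roundTrip(ConfWrapper wrapper) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(wrapper);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (ConfWrapper) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Round trip failed", e);
        }
    }
}
```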

User-defined catalog:

  • User-defined catalogs that need to support the CREATE TABLE AS SELECT (CTAS) feature must be serializable.

The CREATE TABLE AS SELECT (CTAS) feature depends on the serializability of the catalog. To detect early whether a catalog supports CTAS, the planner tries to serialize the catalog; if serialization fails, an exception is thrown telling the user that the catalog does not support CTAS because it cannot be serialized.
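A minimal sketch of such a fail-fast check, assuming plain Java serialization is the mechanism being tested: the catalog is written to an in-memory ObjectOutputStream, and a NotSerializableException (a subclass of IOException) is turned into a user-facing error. The class name CatalogSerializabilityCheck and the choice of UnsupportedOperationException are hypothetical, and the parameter is typed as Object only to keep the sketch independent of Flink's Catalog interface.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;

/** Hypothetical planner-side check run before a CTAS job is submitted. */
final class CatalogSerializabilityCheck {
    static void ensureSerializable(String catalogName, Object catalog) {
        try (ObjectOutputStream out = new ObjectOutputStream(new ByteArrayOutputStream())) {
            // Serializing into memory exercises the same code path as shipping the catalog to the JM.
            out.writeObject(catalog);
        } catch (IOException e) {
            throw new UnsupportedOperationException(
                    String.format(
                            "Catalog '%s' does not support CREATE TABLE AS SELECT "
                                    + "because it cannot be serialized.",
                            catalogName),
                    e);
        }
    }
}
```

Note that this only proves the catalog can be written; whether it can be read back and opened on the JM still depends on the catalog's own deserialization logic.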

...

  1. When the job starts, the JobGraph is deserialized, and the JobStatusHook is deserialized along with it.
  2. When the JobStatusHook is deserialized, the Catalog and CatalogBaseTable are deserialized as well.
    • The CatalogBaseTable is deserialized using the CatalogPropertiesUtil#deserializeCatalogTable method.
    • For the Catalog, first read the catalog name and properties, then use FactoryUtil#createCatalog to obtain the catalog instance, and call Catalog#open to ensure that the Catalog can be used.
  3. After the job starts, the corresponding JobStatusHook method is called whenever the job status changes:

For example, suppose our JobStatusHook implementation is called CTASJobStatusHook and the job uses JdbcCatalog. How the planner serializes JdbcCatalog was covered in the previous section and is not repeated here.

We deserialize the catalog name and properties, and then use the FactoryUtil#createCatalog method to get the JdbcCatalog instance. When the job status changes, the corresponding CTASJobStatusHook method is called:

  • When the job status is CREATED, the runtime module calls the CTASJobStatusHook#onCreated method, which calls JdbcCatalog#createTable to create the table.
  • When the final job status is FAILED, the runtime module calls the CTASJobStatusHook#onFailed method, which calls JdbcCatalog#dropTable to drop the table.
  • When the final job status is CANCELED, the runtime module calls the CTASJobStatusHook#onCanceled method, which calls JdbcCatalog#dropTable to drop the table.
  • When the final job status is FINISHED, the runtime module calls the CTASJobStatusHook#onFinished method, and no additional operations are needed.
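The hook behaviour described above can be sketched in a self-contained way. SimpleCatalog, CatalogFactoryFn and StatusHook are hypothetical minimal interfaces that stand in for Flink's Catalog, FactoryUtil#createCatalog and JobStatusHook (job IDs are plain strings here), so this shows the control flow rather than the actual Flink API. Only the catalog name, properties and table path are serialized; the catalog itself is transient and is rebuilt and opened lazily on the JM side.

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

/** Hypothetical minimal catalog API used by the hook (stands in for Flink's Catalog). */
interface SimpleCatalog {
    void open();
    void createTable(String tablePath);
    void dropTable(String tablePath);
}

/** Hypothetical factory rebuilding a catalog from name and properties (like FactoryUtil#createCatalog). */
interface CatalogFactoryFn extends Serializable {
    SimpleCatalog create(String name, Map<String, String> properties);
}

/** Hypothetical hook interface mirroring the four job-status callbacks. */
interface StatusHook extends Serializable {
    void onCreated(String jobId);
    void onFinished(String jobId);
    void onFailed(String jobId, Throwable cause);
    void onCanceled(String jobId);
}

final class CtasJobStatusHook implements StatusHook {
    private static final long serialVersionUID = 1L;

    // Only these fields travel with the JobGraph.
    private final String catalogName;
    private final HashMap<String, String> catalogProperties;
    private final String tablePath;
    private final CatalogFactoryFn factory;

    // Rebuilt on the JM after deserialization; never serialized.
    private transient SimpleCatalog catalog;

    CtasJobStatusHook(
            String catalogName, Map<String, String> properties, String tablePath, CatalogFactoryFn factory) {
        this.catalogName = catalogName;
        this.catalogProperties = new HashMap<>(properties);
        this.tablePath = tablePath;
        this.factory = factory;
    }

    private SimpleCatalog catalog() {
        if (catalog == null) {
            catalog = factory.create(catalogName, catalogProperties);
            catalog.open(); // make the catalog usable before issuing any DDL
        }
        return catalog;
    }

    @Override
    public void onCreated(String jobId) { catalog().createTable(tablePath); }

    @Override
    public void onFinished(String jobId) { /* the table is kept; nothing to do */ }

    @Override
    public void onFailed(String jobId, Throwable cause) { catalog().dropTable(tablePath); }

    @Override
    public void onCanceled(String jobId) { catalog().dropTable(tablePath); }
}
```

Creating the catalog lazily means a job whose hook is never triggered does not need to connect to the external system at all; the trade-off is that connection errors surface only when the first status change arrives.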

...

The advantages of this solution are a simple design, easy compatibility and lower implementation complexity for users; it also does not require complex serialization and deserialization tools.

The disadvantage of this solution is that it does not cover the usage scenario of TableEnvironment#registerCatalog.

To address this disadvantage, we can introduce a CatalogDescriptor (similar to TableDescriptor) in the Table API for registering catalogs in the future, and Flink can then obtain the properties of a Catalog through its CatalogDescriptor. The interface pseudo-code in TableEnvironment is as follows:

...

In addition, this solution cannot serialize the database-related data in InMemoryCatalog, so calling InMemoryCatalog#createTable on the JM side may fail with an exception saying that the database does not exist.


Note: This solution only works if the Catalog is created with DDL, because the Catalog properties are only available from the WITH clause. For a Catalog registered with the TableEnvironment#registerCatalog method, these properties are not available. Therefore, CTAS currently does not support jobs that use TableEnvironment#registerCatalog.

...