Motivation

The current CatalogTable and SeaTunnelRowType are disconnected. SeaTunnelRowType only describes field names and field types, so it cannot carry other table-level information. The data type objects handled at the connector level therefore need to move smoothly from SeaTunnelRowType to CatalogTable.

This solution primarily addresses the following issues:

  1. Moving the creation of CatalogTable into SeaTunnelSource itself. The Source decides which CatalogTables it handles based on its config and returns them to the framework layer.
  2. Allowing the same Source to work with different Catalogs. The Source can either read CatalogTable information from a Catalog or generate the CatalogTable from its own config.
  3. Binding Catalog to Source, with the Source creating its Catalog through a provided unified utility class.
  4. Removing the Catalog-related sub-configuration.

Process Comparison

The main change is to move the logic that creates the Catalog and generates the CatalogTable into SeaTunnelSource. The Source's config and its type then determine how the Catalog and CatalogTable are generated.

Before the STIP:

After the STIP:

Config Change

We have removed the dedicated `catalog` sub-configuration block from Source/Sink configs.

Before the STIP:

MySQL-CDC {
  result_table_name = "customers_mysql_cdc"
  server-id = 5652
  username = "st_user"
  password = "seatunnel"
  table-names = ["mysql_cdc.mysql_cdc_e2e_source_table"]
  base-url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc"
  // catalog sub-configuration
  catalog {
    factory = MySQL
  }
}

After the STIP:

MySQL-CDC {
  result_table_name = "customers_mysql_cdc"
  server-id = 5652
  username = "st_user"
  password = "seatunnel"
  table-names = ["mysql_cdc.mysql_cdc_e2e_source_table"]
  base-url = "jdbc:mysql://mysql_cdc_e2e:3306/mysql_cdc"
}


How to implement this?

Currently, some connectors create a Catalog through a dedicated catalog sub-configuration, yet that sub-configuration offers no real choice of catalog: the catalog factory is bound to the data source in a one-to-many relationship, so the CatalogFactory to use can always be derived from the existing Source/Sink configuration. The implementation therefore takes two steps:

  1. Remove the instantiation of CatalogFactory from the MultipleTableJobConfigParser, thereby eliminating the dependency on a unified catalog sub-configuration.
  2. Extract the logic for instantiating the corresponding Catalog and provide it for direct use by Source/Sink. This way, Source/Sink can obtain the corresponding Catalog without the need to modify existing configurations.
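To illustrate step 2, here is a minimal, self-contained sketch of a shared helper that derives the catalog factory from the Source's own options, so no `catalog { factory = ... }` block is needed. `CatalogUtil.resolveFactoryIdentifier`, the URL-prefix table, and the identifier strings (`MySQL`, `Postgres`) are hypothetical and not SeaTunnel's actual API. The assumption that CDC sources use `base-url` and JDBC sources use `url` is also part of the sketch. A real utility would look up a `CatalogFactory` through SPI rather than return a string.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical helper: derives a catalog factory identifier from the Source's own options,
// so the job config no longer needs a `catalog { factory = ... }` block.
final class CatalogUtil {
    // Assumed mapping from JDBC URL prefix to catalog factory identifier.
    private static final Map<String, String> JDBC_PREFIX_TO_FACTORY = new LinkedHashMap<>();

    static {
        JDBC_PREFIX_TO_FACTORY.put("jdbc:mysql:", "MySQL");
        JDBC_PREFIX_TO_FACTORY.put("jdbc:postgresql:", "Postgres");
    }

    private CatalogUtil() {}

    // Reads `base-url` (CDC style) or, failing that, `url`, and matches its prefix.
    static Optional<String> resolveFactoryIdentifier(Map<String, String> options) {
        String url = options.containsKey("base-url") ? options.get("base-url") : options.get("url");
        if (url == null) {
            return Optional.empty();
        }
        for (Map.Entry<String, String> entry : JDBC_PREFIX_TO_FACTORY.entrySet()) {
            if (url.startsWith(entry.getKey())) {
                return Optional.of(entry.getValue());
            }
        }
        return Optional.empty();
    }
}
```

With the MySQL-CDC config shown earlier, `base-url = "jdbc:mysql://..."` resolves to the `MySQL` factory, which is the same choice the removed `catalog { factory = MySQL }` block used to make explicitly.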

Public Interfaces

SeaTunnelSource

Add a `getProducedCatalogTables` method with a default implementation to SeaTunnelSource, and deprecate `getProducedType`.

import java.util.Collections;
import java.util.List;

import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;

// Simplified: the other type parameters and methods of SeaTunnelSource are omitted.
public interface SeaTunnelSource<T> {

    /**
     * Get the data type of the records produced by this source.
     *
     * @return Produced data type.
     */
    // TODO: Remove all calls to this method at the framework level, including Spark, Flink, and Zeta.
    @Deprecated
    SeaTunnelDataType<T> getProducedType();

    /**
     * Get the catalog tables output by this source.
     *
     * @return the CatalogTables produced by this source
     */
    default List<CatalogTable> getProducedCatalogTables() {
        // TODO: Sources not yet adapted to `getProducedCatalogTables` stay compatible by generating
        // a CatalogTable named after `result_table_name`. If the config omits that parameter, a
        // rule-based name is generated instead, e.g. JDBC -> "jdbc_result_table1".
        // TODO: Spark/Flink will be adapted to require exactly one CatalogTable at runtime and fail
        // otherwise. On every engine, declaring a Transform also limits the job to one CatalogTable.
        // Illustrative only: wraps the legacy produced type in a single CatalogTable.
        return Collections.singletonList(new CatalogTable(getProducedType()));
    }
}
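The second TODO in the default method implies a check on the framework side. Below is a minimal sketch of that check, assuming a stand-in `CatalogTable` that only holds a table name. `Engine` and `ProducedTablesCheck.validate` are hypothetical names, not part of the SeaTunnel API. Rejecting an empty table list is an extra assumption of the sketch.

```java
import java.util.List;

// Stand-in for SeaTunnel's CatalogTable: only the table name is modeled here.
final class CatalogTable {
    private final String tableName;

    CatalogTable(String tableName) {
        this.tableName = tableName;
    }

    String getTableName() {
        return tableName;
    }
}

// Hypothetical engine identifiers.
enum Engine { SPARK, FLINK, ZETA }

// Hypothetical framework-side guard reflecting the second TODO in getProducedCatalogTables().
final class ProducedTablesCheck {
    private ProducedTablesCheck() {}

    static void validate(List<CatalogTable> producedTables, Engine engine, boolean hasTransform) {
        if (producedTables.isEmpty()) {
            throw new IllegalStateException("source produced no CatalogTable");
        }
        // Spark and Flink, or any job that declares a Transform, allow only one table.
        boolean singleTableOnly = hasTransform || engine != Engine.ZETA;
        if (singleTableOnly && producedTables.size() > 1) {
            throw new IllegalStateException(
                    engine + " (transform declared: " + hasTransform + ") expects one CatalogTable, got "
                            + producedTables.size());
        }
    }
}
```

Under these assumptions, a multi-table source still runs on Zeta without a Transform, while the same source fails fast on Spark/Flink instead of producing ambiguous output.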

TableFactoryContext

The current `TableFactoryContext` is shared by Source and Sink and passes CatalogTable information to both. It needs to be split into `SourceTableFactoryContext` and `SinkTableFactoryContext`, and `SourceTableFactoryContext` should no longer carry `catalogTables`, because the Source now derives its CatalogTables itself. Note: the classloader field is not used in the current design and may be removed.

public class TableFactoryContext {
 
    private final List<CatalogTable> catalogTables;
    @Getter private final ReadonlyConfig options;
    private final ClassLoader classLoader;
 
    public TableFactoryContext(
            List<CatalogTable> catalogTables, ReadonlyConfig options, ClassLoader classLoader) {
        this.catalogTables = catalogTables;
        this.options = options;
        this.classLoader = classLoader;
    }
}
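A minimal sketch of the proposed split follows, assuming the source-side context keeps only the options and the sink-side context keeps the upstream CatalogTables plus the options. `ReadonlyConfig` and `CatalogTable` are simplified stand-ins so the example compiles on its own, and plain getters replace Lombok's `@Getter`. The classloader is left out on the assumption that it is removed as the note above suggests.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Stand-in for SeaTunnel's ReadonlyConfig: a read-only view over option values.
final class ReadonlyConfig {
    private final Map<String, Object> values;

    ReadonlyConfig(Map<String, Object> values) {
        this.values = Collections.unmodifiableMap(values);
    }

    Object get(String key) {
        return values.get(key);
    }
}

// Stand-in for SeaTunnel's CatalogTable: only the table name is modeled here.
final class CatalogTable {
    private final String tableName;

    CatalogTable(String tableName) {
        this.tableName = tableName;
    }

    String getTableName() {
        return tableName;
    }
}

// Source side: options only; the Source derives its CatalogTables from them.
final class SourceTableFactoryContext {
    private final ReadonlyConfig options;

    SourceTableFactoryContext(ReadonlyConfig options) {
        this.options = options;
    }

    ReadonlyConfig getOptions() {
        return options;
    }
}

// Sink side: still receives the upstream CatalogTables alongside the options.
final class SinkTableFactoryContext {
    private final List<CatalogTable> catalogTables;
    private final ReadonlyConfig options;

    SinkTableFactoryContext(List<CatalogTable> catalogTables, ReadonlyConfig options) {
        this.catalogTables = Collections.unmodifiableList(catalogTables);
        this.options = options;
    }

    List<CatalogTable> getCatalogTables() {
        return catalogTables;
    }

    ReadonlyConfig getOptions() {
        return options;
    }
}
```

Giving the source-side context no `catalogTables` field makes the new contract explicit: a Source factory cannot receive tables from the framework, so it has to produce them.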

Pseudo code after modification

1. Source/Transform/Sink Config = MultipleTableJobConfigParser.readConfig()
2. SeaTunnelSource = MultipleTableJobConfigParser.parseSource(SourceConfig) {
    TableSourceFactory = SPI.create()
    return TableSourceFactory.create(context) {
        // Version 1: the factory builds the CatalogTables and passes them to the Source
        // TODO: mark CatalogUtil.getCatalogTable as deprecated
        CatalogTables = CatalogUtil.getCatalogTable(context.SourceConfig)
        return new SeaTunnelSource(CatalogTables)
        // Version 2: the Source creates its Catalog and resolves the CatalogTables itself
        return new SeaTunnelSource(context) {
            Catalog = CatalogUtil.getCatalog(context.SourceConfig)
            CatalogTables = Catalog.getCatalogTables()
        }
    }
}
3. SourceCatalogTables = SeaTunnelSource.getProducedCatalogTables()
4. MultipleTableJobConfigParser.parseTransform(SourceCatalogTables)
 
...
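The version 2 flow can be sketched in Java under several assumptions. `Catalog`, `CatalogTable`, `TableSourceFactory`, and `SeaTunnelSource` are cut-down stand-ins, and `InMemoryCatalog` and `JdbcTableSourceFactory` are hypothetical classes. `CatalogUtil.getCatalog` is reduced to returning that in-memory catalog, and SPI discovery is replaced by a fixed factory. The point is the ordering: the parser passes only the config to the factory, the Source asks its own Catalog for the tables, and the parser reads them back through `getProducedCatalogTables()` before parsing transforms.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Stand-in for CatalogTable: only the table name is modeled.
final class CatalogTable {
    private final String tableName;
    CatalogTable(String tableName) { this.tableName = tableName; }
    String getTableName() { return tableName; }
}

// Cut-down Catalog abstraction: returns the tables selected by the config.
interface Catalog {
    List<CatalogTable> getCatalogTables(List<String> tableNames);
}

// Hypothetical catalog used only for this sketch.
final class InMemoryCatalog implements Catalog {
    @Override
    public List<CatalogTable> getCatalogTables(List<String> tableNames) {
        List<CatalogTable> tables = new ArrayList<>();
        for (String name : tableNames) {
            tables.add(new CatalogTable(name));
        }
        return tables;
    }
}

final class CatalogUtil {
    private CatalogUtil() {}

    // Assumption: a real implementation would choose a CatalogFactory via SPI from the config.
    static Catalog getCatalog(Map<String, Object> sourceConfig) {
        return new InMemoryCatalog();
    }
}

interface SeaTunnelSource {
    List<CatalogTable> getProducedCatalogTables();
}

interface TableSourceFactory {
    SeaTunnelSource create(Map<String, Object> sourceConfig);
}

// Version 2: the Source builds its Catalog and resolves its tables itself.
final class JdbcTableSourceFactory implements TableSourceFactory {
    @Override
    public SeaTunnelSource create(Map<String, Object> sourceConfig) {
        @SuppressWarnings("unchecked")
        List<String> tableNames = (List<String>) sourceConfig.get("table-names");
        if (tableNames == null) {
            throw new IllegalArgumentException("table-names is required");
        }
        Catalog catalog = CatalogUtil.getCatalog(sourceConfig);
        List<CatalogTable> tables = Collections.unmodifiableList(catalog.getCatalogTables(tableNames));
        return () -> tables;
    }
}

// Mirrors step 2 of the pseudo code; SPI discovery is replaced by a fixed factory.
final class MultipleTableJobConfigParser {
    private MultipleTableJobConfigParser() {}

    static SeaTunnelSource parseSource(Map<String, Object> sourceConfig) {
        TableSourceFactory factory = new JdbcTableSourceFactory();
        return factory.create(sourceConfig);
    }
}
```

Step 3 then corresponds to calling `parseSource(config).getProducedCatalogTables()` and handing the result to the transform parser.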

Compatibility

The Catalog part needs adjustments, including partial changes to connectors whose Source supports Catalog-related functionality. Compatibility at the user level will be maintained.
