Issues Address

https://github.com/apache/seatunnel/issues/5395

Motivation

The current multi-table writing solution has two problems:

1. It relies on shuffle, and the current shuffle implementation significantly degrades performance.

2. It opens too many links (connections) to the target system.

The purpose of this project is to solve these two problems.

Implementation plan

Implement a MultiTableSink. This sink implements the current SeaTunnelSink interface and manages multiple internal SeaTunnelSinks at the same time.

In the same way, implement MultiTableSinkWriter, MultiTableSinkCommitter, and MultiTableSinkAggCommitter.

When a row is written to the MultiTableSink, the sink uses the tableId on the row to find the SeaTunnelSink that should receive it. The MultiTableSink maintains this mapping.
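The tableId lookup described above can be sketched as follows. This is only an illustration under stated assumptions: `Row`, `TableWriter`, `CollectingWriter`, and `RoutingWriter` are hypothetical stand-ins invented for this example, not the real SeaTunnel classes, and the real MultiTableSink may keep its mapping differently.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: every type here is an illustrative stand-in,
// not a real SeaTunnel class.
final class TableRoutingSketch {

    /** A row that carries the id of the table it belongs to. */
    static final class Row {
        final String tableId;
        final Object[] fields;

        Row(String tableId, Object... fields) {
            this.tableId = tableId;
            this.fields = fields;
        }
    }

    /** Stand-in for a per-table writer. */
    interface TableWriter {
        void write(Row row);
    }

    /** Collects rows in memory so the routing can be observed. */
    static final class CollectingWriter implements TableWriter {
        final List<Row> rows = new ArrayList<>();

        @Override
        public void write(Row row) {
            rows.add(row);
        }
    }

    /** Holds the tableId-to-writer mapping and forwards each row to its writer. */
    static final class RoutingWriter implements TableWriter {
        private final Map<String, TableWriter> writersByTable;

        RoutingWriter(Map<String, TableWriter> writersByTable) {
            this.writersByTable = new HashMap<>(writersByTable);
        }

        @Override
        public void write(Row row) {
            TableWriter target = writersByTable.get(row.tableId);
            if (target == null) {
                throw new IllegalArgumentException("No sink registered for table " + row.tableId);
            }
            target.write(row);
        }
    }

    public static void main(String[] args) {
        CollectingWriter orders = new CollectingWriter();
        CollectingWriter users = new CollectingWriter();
        Map<String, TableWriter> writers = new HashMap<>();
        writers.put("db.orders", orders);
        writers.put("db.users", users);

        RoutingWriter router = new RoutingWriter(writers);
        router.write(new Row("db.orders", 1, "book"));
        router.write(new Row("db.users", 7, "alice"));
        router.write(new Row("db.orders", 2, "pen"));

        System.out.println(orders.rows.size() + " order rows, " + users.rows.size() + " user rows");
    }
}
```

Failing fast on an unknown tableId is a choice made for this sketch; the proposal does not say how unmapped rows are handled.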

Create multiple threads. Each thread polls row data and then slices the row data into different SeaTunnelSinks.

Custom slicers are supported.

Create N SeaTunnelSinks per thread.

The number of threads in the thread pool is the same as the number of connections in the connection pool.
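One possible reading of the threading model above is sketched below: a pluggable slicer picks a worker for each row, and each worker thread polls its own queue and writes to its own sinks. All names (`Slicer`, `HASH_SLICER`, `Dispatcher`, `Row`) are hypothetical, the per-worker list stands in for that worker's N SeaTunnelSinks, and the actual implementation may distribute rows differently.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical sketch: none of these types are real SeaTunnel classes.
final class SlicingSketch {

    static final class Row {
        final String tableId;
        final int key;

        Row(String tableId, int key) {
            this.tableId = tableId;
            this.key = key;
        }
    }

    /** Custom slicers implement this to choose a worker index for a row. */
    interface Slicer {
        int slice(Row row, int workerCount);
    }

    /** Assumed default: hash the table id and key so equal keys reach the same worker. */
    static final Slicer HASH_SLICER =
            (row, workerCount) -> Math.floorMod(Objects.hash(row.tableId, row.key), workerCount);

    static final class Dispatcher {
        private static final Row END = new Row("", -1); // sentinel that stops a worker
        private final List<BlockingQueue<Row>> queues = new ArrayList<>();
        private final List<List<Row>> written = new ArrayList<>();
        private final List<Future<?>> workers = new ArrayList<>();
        private final ExecutorService pool;
        private final Slicer slicer;

        Dispatcher(int workerCount, Slicer slicer) {
            this.slicer = slicer;
            this.pool = Executors.newFixedThreadPool(workerCount);
            for (int i = 0; i < workerCount; i++) {
                BlockingQueue<Row> queue = new LinkedBlockingQueue<>();
                List<Row> sinkOutput = new ArrayList<>(); // stand-in for this worker's sinks
                queues.add(queue);
                written.add(sinkOutput);
                workers.add(pool.submit(() -> drain(queue, sinkOutput)));
            }
        }

        /** Worker loop: poll rows from the queue until the sentinel arrives. */
        private static void drain(BlockingQueue<Row> queue, List<Row> sinkOutput) {
            try {
                for (Row row = queue.take(); row != END; row = queue.take()) {
                    sinkOutput.add(row);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }

        void write(Row row) throws InterruptedException {
            queues.get(slicer.slice(row, queues.size())).put(row);
        }

        /** Stops all workers, waits for them, and returns what each one wrote. */
        List<List<Row>> close() throws Exception {
            for (BlockingQueue<Row> queue : queues) {
                queue.put(END);
            }
            for (Future<?> worker : workers) {
                worker.get();
            }
            pool.shutdown();
            return written;
        }
    }

    public static void main(String[] args) throws Exception {
        Dispatcher dispatcher = new Dispatcher(3, HASH_SLICER);
        for (int i = 0; i < 10; i++) {
            dispatcher.write(new Row("db.orders", i));
        }
        List<List<Row>> perWorker = dispatcher.close();
        for (int w = 0; w < perWorker.size(); w++) {
            System.out.println("worker " + w + " wrote " + perWorker.get(w).size() + " rows");
        }
    }
}
```

In this sketch a deterministic slicer sends rows with the same table id and key to the same worker, which keeps their relative order. Sizing the pool to match the connection pool, as the plan states, would let each worker hold one connection.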


When the engine calls the interfaces implemented by the Sink, MultiTableSink forwards each call to the internal SeaTunnelSinks and summarizes their results.
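As a rough illustration of forwarding and summarization, the sketch below forwards a commit-preparation call to every per-table writer and collects the results by table id. The `TableWriter` interface, its `prepareCommit` signature, and the `String` commit info are assumptions made for this example only; they are not the real SeaTunnel signatures.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch: TableWriter and its method are illustrative only.
final class ForwardingSketch {

    /** Stand-in for a per-table writer that may produce commit info. */
    interface TableWriter {
        Optional<String> prepareCommit();
    }

    /** Forwards prepareCommit to every writer and summarizes the results by table id. */
    static Map<String, String> prepareCommitAll(Map<String, TableWriter> writersByTable) {
        Map<String, String> summary = new LinkedHashMap<>();
        writersByTable.forEach((tableId, writer) ->
                writer.prepareCommit().ifPresent(info -> summary.put(tableId, info)));
        return summary;
    }

    public static void main(String[] args) {
        Map<String, TableWriter> writers = new LinkedHashMap<>();
        writers.put("db.orders", () -> Optional.of("orders-txn-1"));
        writers.put("db.users", Optional::empty); // nothing to commit for this table

        System.out.println(prepareCommitAll(writers));
    }
}
```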

Resources that multiple SeaTunnelSinks can share, such as a Connection, are managed through SharedResourceManager.

Public Interfaces


SupportMultiTableSink

This interface marks a SeaTunnelSink as supporting multiple tables. When a sink implements it, the corresponding SeaTunnelSink is wrapped during the logical plan generation stage: for each degree of parallelism, one MultiTableSink wraps multiple SeaTunnelSinks and replaces them in the plan.

/**
 * Sink connectors that support multi-table writing should implement this interface.
 */
public interface SupportMultiTableSink<T extends SharedResource> extends SupportResourceShare<T> {
}

MultiTableResourceManager

This interface lets a single MultiTableSink/MultiTableSinkWriter/MultiTableSinkCommitter/MultiTableSinkAggCommitter share resources among multiple SeaTunnelSinks or SinkWriters. Users can implement a custom resource sharer by implementing this interface.

/**
 * The multi table resource manager
 */
public interface MultiTableResourceManager<T extends SharedResource> {
 
    default Optional<T> getSharedResource() {
        return Optional.empty();
    }
}

SharedResource

A marker interface that identifies resources that can be shared.

public interface SharedResource {
}


SupportMultiTableSinkWriter

This interface marks a SinkWriter as supporting multiple tables. It currently serves only as a marker.

/**
 * Sink connector writers that support multi-table writing should implement this interface.
 */
public interface SupportMultiTableSinkWriter<T extends SharedResource> extends SupportResourceShare<T> {
 
}

SupportResourceShare

This interface initializes the MultiTableResourceManager. MultiTableSink calls the initMultiTableResourceManager method to initialize resources once, and then distributes the resulting resources to the different SeaTunnelSinks.

public interface SupportResourceShare<T extends SharedResource> {
 
    default Optional<MultiTableResourceManager<T>> initMultiTableResourceManager() {
        return Optional.empty();
    }
 
    default void setMultiTableResourceManager(MultiTableResourceManager<T> multiTableResourceManager) {}
}
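The sketch below shows how these interfaces might fit together. The three interfaces are copied from this proposal so that the example compiles on its own; `ConnectionPool`, `PoolManager`, `PooledWriter`, and `shareResources` are hypothetical names. Calling initMultiTableResourceManager on the first writer only is an assumption about how the one-time initialization could work, not a statement of the actual behavior.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

final class ResourceShareSketch {

    // The three interfaces below are copied from this proposal.
    interface SharedResource {}

    interface MultiTableResourceManager<T extends SharedResource> {
        default Optional<T> getSharedResource() {
            return Optional.empty();
        }
    }

    interface SupportResourceShare<T extends SharedResource> {
        default Optional<MultiTableResourceManager<T>> initMultiTableResourceManager() {
            return Optional.empty();
        }

        default void setMultiTableResourceManager(MultiTableResourceManager<T> multiTableResourceManager) {}
    }

    // Everything from here on is hypothetical and exists only for illustration.

    /** A shared resource, for example a pool of connections. */
    static final class ConnectionPool implements SharedResource {
        final int size;

        ConnectionPool(int size) {
            this.size = size;
        }
    }

    /** A manager that hands out one ConnectionPool. */
    static final class PoolManager implements MultiTableResourceManager<ConnectionPool> {
        private final ConnectionPool pool;

        PoolManager(ConnectionPool pool) {
            this.pool = pool;
        }

        @Override
        public Optional<ConnectionPool> getSharedResource() {
            return Optional.of(pool);
        }
    }

    /** A writer that can create the shared pool and can also receive it. */
    static final class PooledWriter implements SupportResourceShare<ConnectionPool> {
        ConnectionPool pool;

        @Override
        public Optional<MultiTableResourceManager<ConnectionPool>> initMultiTableResourceManager() {
            return Optional.of(new PoolManager(new ConnectionPool(4)));
        }

        @Override
        public void setMultiTableResourceManager(MultiTableResourceManager<ConnectionPool> manager) {
            this.pool = manager.getSharedResource().orElse(null);
        }
    }

    /** Assumed wrapper logic: initialize once via the first writer, then distribute to all. */
    static <T extends SharedResource> void shareResources(List<? extends SupportResourceShare<T>> writers) {
        if (writers.isEmpty()) {
            return;
        }
        writers.get(0).initMultiTableResourceManager()
                .ifPresent(manager -> writers.forEach(w -> w.setMultiTableResourceManager(manager)));
    }

    public static void main(String[] args) {
        List<PooledWriter> writers = new ArrayList<>();
        writers.add(new PooledWriter());
        writers.add(new PooledWriter());
        shareResources(writers);
        System.out.println("same pool shared: " + (writers.get(0).pool == writers.get(1).pool));
    }
}
```

Because both methods have empty defaults, a connector with nothing to share can implement the marker interfaces without overriding anything.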

Proposed Changes

In MultiTableJobConfigParser, add the configuration option `seatunnel.optimizer.multitablesink` to enable or disable MultiTableSink generation.
Add SeaTunnelOptimizer for logical plan optimization. It currently supports converting multiple SeaTunnelSinks into a single MultiTableSink.

Compatibility

This is a new feature and is compatible with existing features.
