Background

At present, the Flink translation layer does not implement the Enumerator as a singleton, as expected by the Source API design in which splits are distributed to Readers through a pull/push model. Instead, each Reader instantiates its own Enumerator, so the overall Source API implementation does not meet the expectations of unified batch and stream processing.

Motivation

Correctly implement the Source API pull/push model, enable normal RPC communication between Reader and Enumerator, and ensure that the CDC connectors run correctly on Flink.

Architecture design

Before introducing the overall architecture design, let's first review the runtime flow of the Source API:

1. The Source creates the Enumerator.
2. The Source creates the Readers.
3. The Enumerator discovers and generates source splits.
4. Readers pull source splits from the Enumerator, and the Enumerator pushes splits to Readers.
5. Readers read data from the data source.

Based on the above process, the Enumerator implements unified split distribution, the Reader implements parallel reading, and the Source benefits from this clear runtime mechanism to achieve unified batch and stream processing.
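
For reference, the wrappers below assume a SeaTunnel Source API of roughly the following shape. This is a simplified sketch: only the methods exercised by the translation layer are shown, and signatures are abbreviated.

// Simplified sketch of the SeaTunnel Source API shape assumed by the flow above.
public interface SeaTunnelSource<T, SplitT extends SourceSplit, StateT extends Serializable>
        extends Serializable {

    Boundedness getBoundedness();

    SourceReader<T, SplitT> createReader(SourceReader.Context context) throws Exception;

    SourceSplitEnumerator<SplitT, StateT> createEnumerator(
            SourceSplitEnumerator.Context<SplitT> context) throws Exception;

    SourceSplitEnumerator<SplitT, StateT> restoreEnumerator(
            SourceSplitEnumerator.Context<SplitT> context, StateT checkpointState)
            throws Exception;

    Serializer<SplitT> getSplitSerializer();

    Serializer<StateT> getEnumeratorStateSerializer();

    SeaTunnelDataType<T> getProducedType();
}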

Since our source design is modeled entirely on Flink's source design, the two APIs have a natural affinity for integration. Therefore, we only need to implement a set of Flink source connector wrappers in the translation layer to wrap our own Source API.

FlinkSource

FlinkSource.java
/**
 * The source implementation of {@link Source}, used to proxy all {@link SeaTunnelSource} in Flink.
 *
 * @param <SplitT> The generic type of source split
 * @param <EnumStateT> The generic type of enumerator state
 */
public class FlinkSource<SplitT extends SourceSplit, EnumStateT extends Serializable>
        implements Source<Row, SplitWrapper<SplitT>, EnumStateT>, ResultTypeQueryable<Row> {

    private final SeaTunnelSource<SeaTunnelRow, SplitT, EnumStateT> source;

    public FlinkSource(SeaTunnelSource<SeaTunnelRow, SplitT, EnumStateT> source) {
        this.source = source;
    }

    @Override
    public Boundedness getBoundedness() {
        org.apache.seatunnel.api.source.Boundedness boundedness = source.getBoundedness();
        return boundedness == org.apache.seatunnel.api.source.Boundedness.BOUNDED
                ? Boundedness.BOUNDED
                : Boundedness.CONTINUOUS_UNBOUNDED;
    }

    @Override
    public SourceReader<Row, SplitWrapper<SplitT>> createReader(SourceReaderContext readerContext)
            throws Exception {
        org.apache.seatunnel.api.source.SourceReader.Context context =
                new FlinkSourceReaderContext(readerContext, source);
        org.apache.seatunnel.api.source.SourceReader<SeaTunnelRow, SplitT> reader =
                source.createReader(context);
        return new FlinkSourceReader<>(reader, (SeaTunnelRowType) source.getProducedType());
    }

    @Override
    public SplitEnumerator<SplitWrapper<SplitT>, EnumStateT> createEnumerator(
            SplitEnumeratorContext<SplitWrapper<SplitT>> enumContext) throws Exception {
        SourceSplitEnumerator.Context<SplitT> context =
                new FlinkSourceSplitEnumeratorContext<>(enumContext);
        SourceSplitEnumerator<SplitT, EnumStateT> enumerator = source.createEnumerator(context);
        return new FlinkSourceEnumerator<>(enumerator, enumContext);
    }

    @Override
    public SplitEnumerator<SplitWrapper<SplitT>, EnumStateT> restoreEnumerator(
            SplitEnumeratorContext<SplitWrapper<SplitT>> enumContext, EnumStateT checkpoint)
            throws Exception {
        FlinkSourceSplitEnumeratorContext<SplitT> context =
                new FlinkSourceSplitEnumeratorContext<>(enumContext);
        SourceSplitEnumerator<SplitT, EnumStateT> enumerator =
                source.restoreEnumerator(context, checkpoint);
        return new FlinkSourceEnumerator<>(enumerator, enumContext);
    }

    @Override
    public SimpleVersionedSerializer<SplitWrapper<SplitT>> getSplitSerializer() {
        return new SplitWrapperSerializer<>(source.getSplitSerializer());
    }

    @Override
    public SimpleVersionedSerializer<EnumStateT> getEnumeratorCheckpointSerializer() {
        Serializer<EnumStateT> enumeratorStateSerializer = source.getEnumeratorStateSerializer();
        return new FlinkSimpleVersionedSerializer<>(enumeratorStateSerializer);
    }

    @Override
    public TypeInformation<Row> getProducedType() {
        return (TypeInformation<Row>) TypeConverterUtils.convert(source.getProducedType());
    }
}
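
For context, wiring the wrapper into a Flink job is then a standard fromSource call. The snippet below is an illustrative sketch: seaTunnelSource stands for an already-created SeaTunnelSource instance, and the actual wiring lives in the translation layer.

// Illustrative wiring of the wrapped source into a Flink job.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<Row> stream =
        env.fromSource(
                new FlinkSource<>(seaTunnelSource),
                WatermarkStrategy.noWatermarks(),
                "SeaTunnel-Source");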

FlinkSourceEnumerator

FlinkSourceEnumerator.java
/**
 * The implementation of {@link SplitEnumerator}, used to proxy all {@link SourceSplitEnumerator}
 * in Flink.
 *
 * @param <SplitT> The generic type of source split
 * @param <EnumStateT> The generic type of enumerator state
 */
public class FlinkSourceEnumerator<SplitT extends SourceSplit, EnumStateT>
        implements SplitEnumerator<SplitWrapper<SplitT>, EnumStateT> {

    private static final Logger LOGGER = LoggerFactory.getLogger(FlinkSourceEnumerator.class);

    private final SourceSplitEnumerator<SplitT, EnumStateT> sourceSplitEnumerator;

    private final SplitEnumeratorContext<SplitWrapper<SplitT>> enumeratorContext;

    private final int parallelism;

    private final Object lock = new Object();

    private volatile boolean isRun = false;

    private volatile int currentRegisterReaders = 0;

    public FlinkSourceEnumerator(
            SourceSplitEnumerator<SplitT, EnumStateT> enumerator,
            SplitEnumeratorContext<SplitWrapper<SplitT>> enumContext) {
        this.sourceSplitEnumerator = enumerator;
        this.enumeratorContext = enumContext;
        this.parallelism = enumeratorContext.currentParallelism();
    }

    @Override
    public void start() {
        sourceSplitEnumerator.open();
    }

    @Override
    public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
        sourceSplitEnumerator.handleSplitRequest(subtaskId);
    }

    @Override
    public void addSplitsBack(List<SplitWrapper<SplitT>> splits, int subtaskId) {
        sourceSplitEnumerator.addSplitsBack(
                splits.stream().map(SplitWrapper::getSourceSplit).collect(Collectors.toList()),
                subtaskId);
    }

    @Override
    public void addReader(int subtaskId) {
        sourceSplitEnumerator.registerReader(subtaskId);
        synchronized (lock) {
            currentRegisterReaders++;
            if (!isRun && currentRegisterReaders == parallelism) {
                try {
                    sourceSplitEnumerator.run();
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
                isRun = true;
            }
        }
    }

    @Override
    public EnumStateT snapshotState(long checkpointId) throws Exception {
        return sourceSplitEnumerator.snapshotState(checkpointId);
    }

    @Override
    public void close() throws IOException {
        sourceSplitEnumerator.close();
    }

    @Override
    public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
        if (sourceEvent instanceof NoMoreElementEvent) {
            LOGGER.info(
                    "Received NoMoreElementEvent from reader [{}], current registered readers [{}]",
                    subtaskId,
                    enumeratorContext.currentParallelism());
            enumeratorContext.sendEventToSourceReader(subtaskId, sourceEvent);
        }
        if (sourceEvent instanceof SourceEventWrapper) {
            sourceSplitEnumerator.handleSourceEvent(
                    subtaskId, (((SourceEventWrapper) sourceEvent).getSourceEvent()));
        }
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        sourceSplitEnumerator.notifyCheckpointComplete(checkpointId);
    }

    @Override
    public void notifyCheckpointAborted(long checkpointId) throws Exception {
        sourceSplitEnumerator.notifyCheckpointAborted(checkpointId);
    }
}
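
Note the gating in addReader: the wrapped enumerator's run() method, which performs split discovery and assignment, is deliberately deferred until all parallel readers have registered, so no splits are assigned before every reader exists; the lock together with the isRun flag ensures that run() executes exactly once.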

FlinkSourceReader

FlinkSourceReader.java
/**
 * The implementation of {@link SourceReader}, used to proxy all {@link
 * org.apache.seatunnel.api.source.SourceReader} in Flink.
 *
 * @param <SplitT> The generic type of source split
 */
public class FlinkSourceReader<SplitT extends SourceSplit>
        implements SourceReader<Row, SplitWrapper<SplitT>> {

    private final org.apache.seatunnel.api.source.SourceReader<SeaTunnelRow, SplitT> sourceReader;

    private final FlinkRowCollector flinkRowCollector;

    private InputStatus inputStatus = InputStatus.MORE_AVAILABLE;

    public FlinkSourceReader(
            org.apache.seatunnel.api.source.SourceReader<SeaTunnelRow, SplitT> sourceReader,
            SeaTunnelRowType seaTunnelRowType) {
        this.sourceReader = sourceReader;
        this.flinkRowCollector = new FlinkRowCollector(seaTunnelRowType);
    }

    @Override
    public void start() {
        try {
            sourceReader.open();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public InputStatus pollNext(ReaderOutput<Row> output) throws Exception {
        sourceReader.pollNext(flinkRowCollector.withReaderOutput(output));
        return inputStatus;
    }

    @Override
    public List<SplitWrapper<SplitT>> snapshotState(long checkpointId) {
        try {
            List<SplitT> splitTS = sourceReader.snapshotState(checkpointId);
            return splitTS.stream().map(SplitWrapper::new).collect(Collectors.toList());
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public CompletableFuture<Void> isAvailable() {
        return CompletableFuture.completedFuture(null);
    }

    @Override
    public void addSplits(List<SplitWrapper<SplitT>> splits) {
        sourceReader.addSplits(
                splits.stream().map(SplitWrapper::getSourceSplit).collect(Collectors.toList()));
    }

    @Override
    public void notifyNoMoreSplits() {
        sourceReader.handleNoMoreSplits();
    }

    @Override
    public void handleSourceEvents(SourceEvent sourceEvent) {
        if (sourceEvent instanceof NoMoreElementEvent) {
            inputStatus = InputStatus.END_OF_INPUT;
        }
        if (sourceEvent instanceof SourceEventWrapper) {
            sourceReader.handleSourceEvent((((SourceEventWrapper) sourceEvent).getSourceEvent()));
        }
    }

    @Override
    public void close() throws Exception {
        sourceReader.close();
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        sourceReader.notifyCheckpointComplete(checkpointId);
    }

    @Override
    public void notifyCheckpointAborted(long checkpointId) throws Exception {
        sourceReader.notifyCheckpointAborted(checkpointId);
    }
}
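
Note how pollNext and handleSourceEvents cooperate: pollNext keeps returning MORE_AVAILABLE until a NoMoreElementEvent arrives, at which point inputStatus flips to END_OF_INPUT and the Flink engine closes the reader. This hand-off is the crux of the signal-forwarding problem discussed below.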

For more code details, see: https://github.com/apache/seatunnel/pull/5100

Technical difficulties

Because the Zeta engine is tightly integrated with the SeaTunnel Source API, once a Reader sends the signalNoMoreElement signal, the engine can perceive that the current Reader task has no more data to read and will close it.

For the Flink engine, however, after the Reader emits the signalNoMoreElement signal, it can only notify the ReaderContext that we wrap ourselves; it cannot notify the outer Flink Reader. Transmitting the "this Reader has read all of its data" signal to the Flink engine therefore became a challenge. Thanks to Flink's custom-event RPC communication between Reader and Enumerator, the signal transmission becomes straightforward. The overall process is as follows:

1. The wrapped SeaTunnel Reader calls signalNoMoreElement on our FlinkSourceReaderContext.
2. The context forwards a NoMoreElementEvent to the Enumerator over Flink's source-event RPC channel.
3. FlinkSourceEnumerator.handleSourceEvent receives the event and echoes it back to the same Reader via enumeratorContext.sendEventToSourceReader.
4. FlinkSourceReader.handleSourceEvents receives the NoMoreElementEvent and sets inputStatus to END_OF_INPUT.
5. The next pollNext call returns END_OF_INPUT, and the Flink engine closes the Reader task.
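
As a concrete illustration, here is a minimal sketch of how the wrapped reader context can forward the signal. This is a sketch only: the actual FlinkSourceReaderContext in the PR may differ in detail, and the no-argument NoMoreElementEvent constructor is an assumption.

/** Sketch: forwards the SeaTunnel end-of-data signal over Flink's custom source-event RPC. */
public class FlinkSourceReaderContext
        implements org.apache.seatunnel.api.source.SourceReader.Context {

    // Flink's reader context, which owns the RPC channel to the enumerator.
    private final SourceReaderContext readerContext;

    private final SeaTunnelSource<?, ?, ?> source;

    public FlinkSourceReaderContext(
            SourceReaderContext readerContext, SeaTunnelSource<?, ?, ?> source) {
        this.readerContext = readerContext;
        this.source = source;
    }

    @Override
    public void signalNoMoreElement() {
        // Tell the enumerator that this subtask has read all of its data;
        // the enumerator echoes the event back (see handleSourceEvent above).
        readerContext.sendSourceEventToCoordinator(new NoMoreElementEvent());
    }

    // ... the remaining Context methods are omitted for brevity ...
}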

This model may seem perfect, but in practice the effect is not as good as expected. Because of the network overhead of RPC communication, the engine keeps calling the reader's pollNext method while the signal is in flight. In connectors that do not implement split deduplication, the reader may emit duplicate data before the end signal arrives.

To solve this problem, after sending the end signal the reader side actively sleeps for 2 seconds to avoid the risk of emitting duplicate data. This is not an optimal solution, however; the optimal solution is to rewrite all of the Source reader logic.
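
Under that interim plan, the workaround amounts to something like the following inside the signal path (a sketch under the same assumptions as above; the exact placement in the PR may differ):

@Override
public void signalNoMoreElement() {
    readerContext.sendSourceEventToCoordinator(new NoMoreElementEvent());
    try {
        // Interim workaround: pause this reader so that in-flight pollNext calls
        // stop emitting data before END_OF_INPUT round-trips back from the enumerator.
        Thread.sleep(2000);
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}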

Refactoring plan

Stage 1: Refactor the Flink translation layer to achieve functional alignment, temporarily using a thread sleep to avoid duplicate data distribution.

Stage 2: Refactor the source reader & source enumerator logic to ensure the correctness of the model.

Stage 3: Support the rate-limiting function on the Flink engine.

Stage 4: Support the metrics-reporting function on the Flink engine.

Stage 5: Support the multi-table read/sink function on the Flink engine.
