...

FlinkSourceEnumerator.java
/**
 * Implementation of Flink's {@link SplitEnumerator} that proxies a SeaTunnel
 * {@link SourceSplitEnumerator} in Flink.
 *
 * @param <SplitT> The generic type of source split
 * @param <EnumStateT> The generic type of enumerator state
 */
public class FlinkSourceEnumerator<SplitT extends SourceSplit, EnumStateT>
        implements SplitEnumerator<SplitWrapper<SplitT>, EnumStateT> {

    private static final Logger LOGGER = LoggerFactory.getLogger(FlinkSourceEnumerator.class);

    private final SourceSplitEnumerator<SplitT, EnumStateT> sourceSplitEnumerator;

    private final SplitEnumeratorContext<SplitWrapper<SplitT>> enumeratorContext;

    private final int parallelism;

    private final Object lock = new Object();

    private volatile boolean isRun = false;

    private volatile int currentRegisterReaders = 0;

    public FlinkSourceEnumerator(
            SourceSplitEnumerator<SplitT, EnumStateT> enumerator,
            SplitEnumeratorContext<SplitWrapper<SplitT>> enumContext) {
        this.sourceSplitEnumerator = enumerator;
        this.enumeratorContext = enumContext;
        this.parallelism = enumeratorContext.currentParallelism();
    }

    @Override
    public void start() {
        sourceSplitEnumerator.open();
    }

    @Override
    public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
        sourceSplitEnumerator.handleSplitRequest(subtaskId);
    }

    @Override
    public void addSplitsBack(List<SplitWrapper<SplitT>> splits, int subtaskId) {
        sourceSplitEnumerator.addSplitsBack(
                splits.stream().map(SplitWrapper::getSourceSplit).collect(Collectors.toList()),
                subtaskId);
    }

    @Override
    public void addReader(int subtaskId) {
        sourceSplitEnumerator.registerReader(subtaskId);
        synchronized (lock) {
            currentRegisterReaders++;
            // Run the SeaTunnel enumerator exactly once, after every parallel reader has registered.
            if (!isRun && currentRegisterReaders == parallelism) {
                try {
                    sourceSplitEnumerator.run();
                } catch (Exception e) {
                    throw new RuntimeException("Failed to run SourceSplitEnumerator", e);
                }
                isRun = true;
            }
        }
    }

    @Override
    public EnumStateT snapshotState(long checkpointId) throws Exception {
        return sourceSplitEnumerator.snapshotState(checkpointId);
    }

    @Override
    public void close() throws IOException {
        sourceSplitEnumerator.close();
    }

    @Override
    public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
        if (sourceEvent instanceof NoMoreElementEvent) {
            // Echo the event back so the Flink-side reader learns that its SeaTunnel reader is done.
            LOGGER.info(
                    "Received NoMoreElementEvent from reader [{}], current registered readers [{}]",
                    subtaskId,
                    currentRegisterReaders);
            enumeratorContext.sendEventToSourceReader(subtaskId, sourceEvent);
        } else if (sourceEvent instanceof SourceEventWrapper) {
            // Unwrap custom SeaTunnel events and forward them to the SeaTunnel enumerator.
            sourceSplitEnumerator.handleSourceEvent(
                    subtaskId, ((SourceEventWrapper) sourceEvent).getSourceEvent());
        }
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        sourceSplitEnumerator.notifyCheckpointComplete(checkpointId);
    }

    @Override
    public void notifyCheckpointAborted(long checkpointId) throws Exception {
        sourceSplitEnumerator.notifyCheckpointAborted(checkpointId);
    }
}
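The addReader method above defers sourceSplitEnumerator.run() until every parallel reader has registered, and guards it with a lock and a flag so that run() executes at most once. The following is a minimal sketch of that gating pattern with the Flink and SeaTunnel types stripped out; RunOnceWhenAllRegistered and GateDemo are hypothetical names used only for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper: runs an action once, after `parallelism` readers have registered. */
final class RunOnceWhenAllRegistered {
    private final int parallelism;
    private final Runnable action;
    private final Object lock = new Object();
    private int registered = 0;      // guarded by lock
    private boolean started = false; // guarded by lock

    RunOnceWhenAllRegistered(int parallelism, Runnable action) {
        this.parallelism = parallelism;
        this.action = action;
    }

    /** Mirrors FlinkSourceEnumerator#addReader: count the reader, start when the last one arrives. */
    void addReader(int subtaskId) {
        synchronized (lock) {
            registered++;
            if (!started && registered == parallelism) {
                action.run();
                started = true;
            }
        }
    }

    boolean isStarted() {
        synchronized (lock) {
            return started;
        }
    }
}

final class GateDemo {
    public static void main(String[] args) {
        AtomicInteger runs = new AtomicInteger();
        RunOnceWhenAllRegistered gate = new RunOnceWhenAllRegistered(3, runs::incrementAndGet);
        gate.addReader(0);
        gate.addReader(1);
        System.out.println(gate.isStarted()); // false: only 2 of 3 readers registered
        gate.addReader(2);
        gate.addReader(1); // a reader registering again does not trigger a second run
        System.out.println(runs.get()); // 1
    }
}
```

Setting the flag only after the action returns matches the original: if run() throws, the enumerator is not marked as running.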

...

For more code details, see https://github.com/apache/seatunnel/pull/5100.

Technical difficulties

Because the Zeta engine is tightly integrated with the SeaTunnel Source API, once a Reader calls signalNoMoreElement, the engine detects that the current Reader task has no more data to read and closes it.

On the Flink engine, however, calling signalNoMoreElement only notifies the ReaderContext that we implemented ourselves; it cannot notify the outer Flink Reader. The challenge is therefore how to tell the Flink engine that the current Reader has read all of its data. Because the Reader and the Enumerator can exchange custom events over RPC, passing this signal turns out to be simple. The overall process is as follows:

[Figure: overall signal-transmission process between the Reader and the Enumerator]
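Because FlinkSourceEnumerator.handleSourceEvent sends a NoMoreElementEvent back to the subtask that emitted it, a reader learns that it is finished through the enumerator/reader event channel. The sketch below simulates that round trip in plain Java: the inner reader's signalNoMoreElement() call is modeled as enqueuing an event for the enumerator, the enumerator echoes it back, and the Flink-side reader reports end of input only after the echo arrives. All class names (EventChannel, SimulatedFlinkReader, SimulatedEnumerator, ReaderStatus) and the in-memory queue standing in for RPC are assumptions for illustration; in Flink itself such events travel through SourceReaderContext.sendSourceEventToCoordinator and SplitEnumeratorContext.sendEventToSourceReader.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Queue;

/** Mirrors the two Flink InputStatus values this sketch needs. */
enum ReaderStatus { MORE_AVAILABLE, END_OF_INPUT }

/** Hypothetical in-memory stand-in for the reader-to-enumerator event RPC. */
final class EventChannel {
    final Queue<Integer> toEnumerator = new ArrayDeque<>(); // ids of subtasks that sent NoMoreElement
}

/** Hypothetical Flink-side reader that wraps a SeaTunnel reader. */
final class SimulatedFlinkReader {
    private final int subtaskId;
    private final Iterator<String> records;
    private final EventChannel channel;
    private boolean signalled = false;  // inner reader has called signalNoMoreElement()
    private boolean endOfInput = false; // echoed event has come back from the enumerator

    SimulatedFlinkReader(int subtaskId, List<String> records, EventChannel channel) {
        this.subtaskId = subtaskId;
        this.records = records.iterator();
        this.channel = channel;
    }

    ReaderStatus pollNext(List<String> output) {
        if (endOfInput) {
            return ReaderStatus.END_OF_INPUT; // only now can Flink finish the task
        }
        if (records.hasNext()) {
            output.add(records.next());
        } else if (!signalled) {
            // signalNoMoreElement(): this reaches the enumerator, not the Flink runtime
            signalled = true;
            channel.toEnumerator.add(subtaskId);
        }
        return ReaderStatus.MORE_AVAILABLE;
    }

    /** Invoked when the enumerator sends the NoMoreElement event back. */
    void handleSourceEvent() {
        endOfInput = true;
    }
}

/** Mirrors FlinkSourceEnumerator#handleSourceEvent: echo NoMoreElement back to its sender. */
final class SimulatedEnumerator {
    private final Map<Integer, SimulatedFlinkReader> readers = new HashMap<>();
    private final EventChannel channel;

    SimulatedEnumerator(EventChannel channel) {
        this.channel = channel;
    }

    void addReader(int subtaskId, SimulatedFlinkReader reader) {
        readers.put(subtaskId, reader);
    }

    /** Delivers every pending event, i.e. completes one simulated RPC round trip. */
    void deliverPending() {
        Integer subtaskId;
        while ((subtaskId = channel.toEnumerator.poll()) != null) {
            readers.get(subtaskId).handleSourceEvent();
        }
    }
}

final class RoundTripDemo {
    public static void main(String[] args) {
        EventChannel channel = new EventChannel();
        SimulatedEnumerator enumerator = new SimulatedEnumerator(channel);
        SimulatedFlinkReader reader = new SimulatedFlinkReader(0, List.of("a", "b"), channel);
        enumerator.addReader(0, reader);

        List<String> output = new ArrayList<>();
        while (reader.pollNext(output) == ReaderStatus.MORE_AVAILABLE) {
            enumerator.deliverPending();
        }
        System.out.println(output); // [a, b]
    }
}
```

Note that the reader keeps returning MORE_AVAILABLE between sending the signal and receiving the echo; that gap is where the engine continues to call pollNext.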

This model looks sound, but in practice it did not work as well as expected. Because the RPC round trip takes time, the engine keeps calling the reader's pollNext method while the signal is in transit. In connectors that do not deduplicate splits, the reader therefore emits the same data repeatedly until it receives the end signal.

As a workaround, the reader sleeps for 2 seconds after sending the end signal, which reduces the risk of emitting duplicate data. This is not an optimal solution; the proper fix is to rewrite the reader logic of all Sources.
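The effect of the sleep, and why it is only a stop-gap, can be seen in a simulated-time model. This is a hypothetical sketch, not code from the PR: it assumes a reader without split deduplication that re-emits its whole split on every pollNext call, a fixed simulated cost per call, and a fixed round-trip latency for the NoMoreElement event. Only the 2-second sleep comes from this document; the other numbers are illustrative.

```java
/**
 * Hypothetical simulated-time model of the sleep workaround. Nothing here is measured from
 * Flink or SeaTunnel; only the 2-second default comes from the design document.
 */
final class SleepWorkaroundModel {
    static final long DOC_GRACE_MILLIS = 2_000; // sleep used by the Stage 1 workaround

    /**
     * Counts how often a reader without split deduplication emits its split.
     *
     * @param latencyMillis  simulated time for the NoMoreElement round trip
     * @param pollCostMillis simulated time spent in each pollNext call (must be > 0)
     * @param graceMillis    how long the reader sleeps right after signalling
     */
    static int emittedCopies(long latencyMillis, long pollCostMillis, long graceMillis) {
        if (pollCostMillis <= 0) {
            throw new IllegalArgumentException("pollCostMillis must be positive");
        }
        long now = 0;
        long echoArrivesAt = -1; // -1: end signal not sent yet
        int copies = 0;
        while (true) {
            // Flink-side wrapper: stop polling once the echoed signal has arrived.
            if (echoArrivesAt >= 0 && now >= echoArrivesAt) {
                return copies;
            }
            // Inner reader re-emits the whole split on every call.
            copies++;
            now += pollCostMillis;
            if (echoArrivesAt < 0) {
                echoArrivesAt = now + latencyMillis; // signalNoMoreElement() sends the event
                now += graceMillis;                  // workaround: sleep before returning
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(emittedCopies(100, 10, 0));                  // 11: ten duplicates
        System.out.println(emittedCopies(100, 10, DOC_GRACE_MILLIS));   // 1: echo arrived during sleep
        System.out.println(emittedCopies(3_000, 10, DOC_GRACE_MILLIS)); // 101: round trip outlasts sleep
    }
}
```

The last case illustrates why the sleep only narrows the race window: whenever the round trip takes longer than the sleep, duplicates reappear, so the lasting fix has to come from the reader logic itself.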

Refactoring plan

Stage 1: Refactor the Flink translation layer to achieve functional alignment, temporarily using a thread sleep to avoid emitting duplicate data.

...