...
```java
/**
 * The implementation of {@link SplitEnumerator}, used to proxy all {@link SourceSplitEnumerator}
 * in Flink.
 *
 * @param <SplitT> The generic type of source split
 * @param <EnumStateT> The generic type of enumerator state
 */
public class FlinkSourceEnumerator<SplitT extends SourceSplit, EnumStateT>
        implements SplitEnumerator<SplitWrapper<SplitT>, EnumStateT> {

    private static final Logger LOGGER = LoggerFactory.getLogger(FlinkSourceEnumerator.class);

    private final SourceSplitEnumerator<SplitT, EnumStateT> sourceSplitEnumerator;

    private final SplitEnumeratorContext<SplitWrapper<SplitT>> enumeratorContext;

    private final int parallelism;

    private final Object lock = new Object();

    private volatile boolean isRun = false;

    private volatile int currentRegisterReaders = 0;

    public FlinkSourceEnumerator(
            SourceSplitEnumerator<SplitT, EnumStateT> enumerator,
            SplitEnumeratorContext<SplitWrapper<SplitT>> enumContext) {
        this.sourceSplitEnumerator = enumerator;
        this.enumeratorContext = enumContext;
        this.parallelism = enumeratorContext.currentParallelism();
    }

    @Override
    public void start() {
        sourceSplitEnumerator.open();
    }

    @Override
    public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
        sourceSplitEnumerator.handleSplitRequest(subtaskId);
    }

    @Override
    public void addSplitsBack(List<SplitWrapper<SplitT>> splits, int subtaskId) {
        sourceSplitEnumerator.addSplitsBack(
                splits.stream().map(SplitWrapper::getSourceSplit).collect(Collectors.toList()),
                subtaskId);
    }

    @Override
    public void addReader(int subtaskId) {
        sourceSplitEnumerator.registerReader(subtaskId);
        synchronized (lock) {
            currentRegisterReaders++;
            if (!isRun && currentRegisterReaders == parallelism) {
                try {
                    sourceSplitEnumerator.run();
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
                isRun = true;
            }
        }
    }

    @Override
    public EnumStateT snapshotState(long checkpointId) throws Exception {
        return sourceSplitEnumerator.snapshotState(checkpointId);
    }

    @Override
    public void close() throws IOException {
        sourceSplitEnumerator.close();
    }

    @Override
    public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
        if (sourceEvent instanceof NoMoreElementEvent) {
            LOGGER.info(
                    "Received NoMoreElementEvent from reader [{}], current registered readers [{}]",
                    subtaskId,
                    enumeratorContext.currentParallelism());
            enumeratorContext.sendEventToSourceReader(subtaskId, sourceEvent);
        }
        if (sourceEvent instanceof SourceEventWrapper) {
            sourceSplitEnumerator.handleSourceEvent(
                    subtaskId, ((SourceEventWrapper) sourceEvent).getSourceEvent());
        }
    }

    @Override
    public void notifyCheckpointComplete(long checkpointId) throws Exception {
        sourceSplitEnumerator.notifyCheckpointComplete(checkpointId);
    }

    @Override
    public void notifyCheckpointAborted(long checkpointId) throws Exception {
        sourceSplitEnumerator.notifyCheckpointAborted(checkpointId);
    }
}
```
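The `handleSourceEvent` forwarding above can be sketched in isolation with plain Java. The `ReaderContext`, `Enumerator`, and event classes below are illustrative stand-ins, not the real SeaTunnel or Flink types: the inner reader raises a completion event through its context, and the enumerator, on seeing `NoMoreElementEvent`, echoes it back toward the reader side.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for the event types; names are illustrative only.
interface SourceEvent {}

// Marker event meaning "this reader has no more elements to emit".
final class NoMoreElementEvent implements SourceEvent {}

// Simplified reader context: collects events sent toward the enumerator.
final class ReaderContext {
    final List<SourceEvent> sentEvents = new ArrayList<>();

    void sendEventToEnumerator(SourceEvent event) {
        sentEvents.add(event);
    }
}

// Simplified enumerator mirroring FlinkSourceEnumerator.handleSourceEvent:
// on NoMoreElementEvent it records the subtask as finished, standing in for
// enumeratorContext.sendEventToSourceReader(subtaskId, sourceEvent).
final class Enumerator {
    final List<Integer> finishedReaders = new ArrayList<>();

    void handleSourceEvent(int subtaskId, SourceEvent event) {
        if (event instanceof NoMoreElementEvent) {
            finishedReaders.add(subtaskId);
        }
    }
}

public class SignalFlowSketch {
    public static void main(String[] args) {
        ReaderContext ctx = new ReaderContext();
        ctx.sendEventToEnumerator(new NoMoreElementEvent()); // reader 0 is done

        Enumerator enumerator = new Enumerator();
        for (SourceEvent e : ctx.sentEvents) {
            enumerator.handleSourceEvent(0, e);
        }
        System.out.println(enumerator.finishedReaders); // reader 0 marked finished
    }
}
```

The echo back to the reader side is what lets the outer Flink reader observe that its subtask has finished.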
...
Because the Zeta engine is tightly integrated with the SeaTunnel Source API, once a Reader sends a NoMoreElement signal, the engine can detect that the current Reader Task has no more data to read and will close it.
For the Flink engine, however, when the Reader emits the NoMoreElement signal, it can only notify the ReaderContext that we have encapsulated ourselves; it cannot notify the outer Flink Reader. Propagating the "this Reader has read all of its data" signal up to the Flink engine therefore became a challenge. Thanks to the custom-event RPC communication design between the Reader and the Enumerator, transmitting the signal turns out to be simple. The overall process is as follows:
Stage 1: Refactor the Flink translation layer to achieve functional alignment, temporarily using a thread sleep to avoid distributing duplicate data.
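The Stage 1 stop-gap can be sketched as follows. This is purely illustrative and the names are hypothetical, not the actual SeaTunnel code: once every pending split has been handed out, the polling method briefly sleeps instead of re-running the assignment logic, so the same splits are not distributed twice.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the temporary thread-sleep workaround: after all
// splits are assigned, park briefly rather than signalling completion,
// which avoids re-distributing splits that were already handed out.
public class SleepWorkaroundSketch {
    private final List<String> pendingSplits =
            new ArrayList<>(Arrays.asList("split-0", "split-1"));

    /** Returns the next split to assign, or null after sleeping when none remain. */
    public String pollNextSplit() throws InterruptedException {
        if (pendingSplits.isEmpty()) {
            // Temporary measure: sleep instead of a real completion signal.
            Thread.sleep(100L);
            return null;
        }
        return pendingSplits.remove(0);
    }
}
```

The sleep merely masks the lack of a completion signal; the NoMoreElementEvent mechanism described above is what eventually replaces it with a proper shutdown path.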
...