THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
- Change SplitFetcherManager from Internal to PublicEvolving.
- Deprecate the old constructor exposing the FutureCompletingBlockingQueue, and add new constructors as replacements which creates the FutureCompletingBlockingQueue instance internally.
- Add a few new methods to expose the functionality of the internal FutureCompletingBlockingQueue via the SplitFetcherManagerinternal method SplitFetcherManager#getQueue() for SourceReaderBase usage.
Code Block |
---|
@PublicEvolving public abstract class SplitFetcherManager<E, SplitT extends SourceSplit> { @Deprecated public SplitFetcherManager( FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue, Supplier<SplitReader<E, SplitT>> splitReaderFactory, Configuration configuration) { this(elementsQueue, splitReaderFactory, configuration, (ignore) -> { }); } @Deprecated @VisibleForTesting public SplitFetcherManager( FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue, Supplier<SplitReader<E, SplitT>> splitReaderFactory, Configuration configuration, Consumer<Collection<String>> splitFinishedHook) { } // todo: provide a new constructor without FutureCompletingBlockingQueuewithout FutureCompletingBlockingQueue. public SplitFetcherManager( Supplier<SplitReader<E, SplitT>> splitReaderFactory, Configuration configuration) { this(splitReaderFactory, configuration, (ignore) -> { }); } // todo: provide a new constructor without FutureCompletingBlockingQueue. public SplitFetcherManager( Supplier<SplitReader<E, SplitT>> splitReaderFactory, Configuration configuration, Consumer<Collection<String>> splitFinishedHook) { this.elementsQueue = new FutureCompletingBlockingQueue<>( configuration.getInteger(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY)); // ...... } /** * returns the RecordsWithSplitIds produced by SplitReader. **/ public RecordsWithSplitIds<E> poll(){ return elementsQueue.poll(); } /** * Returns the availability future. If the queue is non-empty, then this future will already be * complete. Otherwise the obtained future is guaranteed to get completed the next time the * queue becomes non-empty, or a notification happens via {@link #notifyAvailable()}. * * <p>It is important that a completed future is no guarantee that the next call to {@link * #poll()} will return a non-null element. If there are concurrent consumer, another consumer * may have taken the available element. Or there was no element in the first place, because the * future was completed through a call to {@link #notifyAvailable()}. * * <p>For that reason, it is important to call this method (to obtain a new future) every time * again after {@link #poll()} returned null and you want to wait for data. */ public CompletableFuture<Void> getAvailabilityFuture(){ return elementsQueue.getAvailabilityFuture(); } /** * Makes sure the availability future is complete, if it is not complete already. All futures * returned by previous calls to {@link #getAvailabilityFuture()} are guaranteed to be * completed. * * <p>All future calls to the method will return a completed future, until the point that the * availability is reset via calls to {@link #poll()} that leave the queue empty. */ public void notifyAvailable(){ elementsQueue.notifyAvailable(); } /** Checks whether is no data available. */ public boolean noAvailableElement(){configuration.getInteger(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY)); return elementsQueue.isEmpty// ...... } @Internal public SplitFetcherManager#getQueue(); } } |
SingleThreadFetcherManager
...
the constructor of SplitFetcher to the end users?
SourceReaderBase
Deprecate the old constructor exposing the FutureCompletingBlockingQueue, and add new constructors as replacements which creates the FutureCompletingBlockingQueue instance in SplitFetcherManager internally
...
.
Code Block |
---|
@PublicEvolving public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitStateT> implements SourceReader<T, SplitT> { @Deprecated public SourceReaderBase( FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue, SplitFetcherManager<E, SplitT> splitFetcherManager, RecordEmitter<E, T, SplitStateT> recordEmitter, Configuration config, SourceReaderContext context) { this(elementsQueue, splitFetcherManager, recordEmitter, null, config, context); } @Deprecated public SourceReaderBase( FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue, SplitFetcherManager<E, SplitT> splitFetcherManager, RecordEmitter<E, T, SplitStateT> recordEmitter, @Nullable RecordEvaluator<T> eofRecordEvaluator, Configuration config, SourceReaderContext context) { //this.elementsQueue = elementsQueue; this.splitFetcherManager = splitFetcherManager; this.recordEmitter = recordEmitter; this.splitStates = new HashMap<>(); this.options = new SourceReaderOptions(config); this.config = config; this.context = context; this.noMoreSplitsAssignment = false; this.eofRecordEvaluator = eofRecordEvaluator; numRecordsInCounter = context.metricGroup().getIOMetricGroup().getNumRecordsInCounter(); } // todo: provide a new constructor without FutureCompletingBlockingQueue public SourceReaderBase( SplitFetcherManager<E, SplitT> splitFetcherManager, RecordEmitter<E, T, SplitStateT> recordEmitter, Configuration config, SourceReaderContext context) { this(splitFetcherManager, recordEmitter, null, config, context); } // todo: provide a new constructor without FutureCompletingBlockingQueue public SourceReaderBase( SplitFetcherManager<E, SplitT> splitFetcherManager, RecordEmitter<E, T, SplitStateT> recordEmitter, @Nullable RecordEvaluator<T> eofRecordEvaluator, Configuration config, SourceReaderContext context) { this.splitFetcherManager = splitFetcherManager; this.recordEmitter = recordEmitter; this.splitStates = new HashMap<>(); this.options = new SourceReaderOptions(config); this.config = config; this.context = context; this.noMoreSplitsAssignment = false; this.eofRecordEvaluator = eofRecordEvaluator; numRecordsInCounter = context.metricGroup().getIOMetricGroup().getNumRecordsInCounter(); } } |
...