THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block |
---|
@PublicEvolving public abstract class SplitFetcherManager<E, SplitT extends SourceSplit> { @Deprecated public SplitFetcherManager( FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue, Supplier<SplitReader<E, SplitT>> splitReaderFactory, Configuration configuration) { this(elementsQueue, splitReaderFactory, configuration, (ignore) -> { }); } @Deprecated @VisibleForTesting public SplitFetcherManager( FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue, Supplier<SplitReader<E, SplitT>> splitReaderFactory, Configuration configuration, Consumer<Collection<String>> splitFinishedHook) { } // todo: provide a new constructor without FutureCompletingBlockingQueue. public SplitFetcherManager( Supplier<SplitReader<E, SplitT>> splitReaderFactory, Configuration configuration) { this(splitReaderFactory, configuration, (ignore) -> { }); } public // todo: provide a new constructor without FutureCompletingBlockingQueue. public SplitFetcherManager( Supplier<SplitReader<E, SplitT>> splitReaderFactory, Configuration configuration, Consumer<Collection<String>> splitFinishedHook) { this.elementsQueue = new FutureCompletingBlockingQueue<>( configuration.getInteger(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY)); // ...... } /** * returns the RecordsWithSplitIds produced by SplitReader. **/ public RecordsWithSplitIds<E> poll(){ return elementsQueue.poll(); } /** * Returns the availability future. If the queue is non-empty, then this future will already be * complete. Otherwise the obtained future is guaranteed to get completed the next time the * queue becomes non-empty, or a notification happens via {@link #notifyAvailable()}. * * <p>It is important that a completed future is no guarantee that the next call to {@link * #poll()} will return a non-null element. If there are concurrent consumer, another consumer * may have taken the available element. 
Or there was no element in the first place, because the * future was completed through a call to {@link #notifyAvailable()}. * * <p>For that reason, it is important to call this method (to obtain a new future) every time * again after {@link #poll()} returned null and you want to wait for data. */ public CompletableFuture<Void> getAvailabilityFuture(){ return elementsQueue.getAvailabilityFuture(); } /** * Makes sure the availability future is complete, if it is not complete already. All futures * returned by previous calls to {@link #getAvailabilityFuture()} are guaranteed to be * completed. * * <p>All future calls to the method will return a completed future, until the point that the * availability is reset via calls to {@link #poll()} that leave the queue empty. */ public void notifyAvailable(){ elementsQueue.notifyAvailable(); } /** Checks whether is no data available. */ public boolean noAvailableElement(){ return elementsQueue.isEmpty(); } } |
...
Code Block |
---|
@PublicEvolving public class SingleThreadFetcherManager<E, SplitT extends SourceSplit> extends SplitFetcherManager<E, SplitT> { @Deprecated public SingleThreadFetcherManager( FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue, Supplier<SplitReader<E, SplitT>> splitReaderSupplier) { this(elementsQueue, splitReaderSupplier, new Configuration()); } @Deprecated public SingleThreadFetcherManager( FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue, Supplier<SplitReader<E, SplitT>> splitReaderSupplier, Configuration configuration) { super(elementsQueue, splitReaderSupplier, configuration); } @Deprecated @VisibleForTesting public SingleThreadFetcherManager( FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue, Supplier<SplitReader<E, SplitT>> splitReaderSupplier, Configuration configuration, Consumer<Collection<String>> splitFinishedHook) { super(elementsQueue, splitReaderSupplier, configuration, splitFinishedHook); } // todo: provide a new constructor without FutureCompletingBlockingQueue. public SingleThreadFetcherManager( Supplier<SplitReader<E, SplitT>> splitReaderSupplier) { this(splitReaderSupplier, new Configuration()); } // todo: provide a new constructor without FutureCompletingBlockingQueue. public SingleThreadFetcherManager( Supplier<SplitReader<E, SplitT>> splitReaderSupplier, Configuration configuration) { super(splitReaderSupplier, configuration); } // todo: provide a new constructor without FutureCompletingBlockingQueue. public SingleThreadFetcherManager( Supplier<SplitReader<E, SplitT>> splitReaderSupplier, Configuration configuration, Consumer<Collection<String>> splitFinishedHook) { super(splitReaderSupplier, configuration, splitFinishedHook); } } |
...
Change SplitFetcher from Internal to PublicEvolving, but still do not expose the constructor of SplitFetcher, so that it can only be created by org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager#createSplitFetcher
Code Block |
---|
@Internal@PublicEvolving public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable { SplitFetcher( int id, FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue, SplitReader<E, SplitT> splitReader, Consumer<Throwable> errorHandler, Runnable shutdownHook, Consumer<Collection<String>> splitFinishedHook, boolean allowUnalignedSourceSplits); } |
...