THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block |
---|
@PublicEvolving public abstract class SingleThreadMultiplexSourceReaderBase<E, T, SplitT extends SourceSplit, SplitStateT> extends SourceReaderBase<E, T, SplitT, SplitStateT> { @Depricated public SingleThreadMultiplexSourceReaderBase( FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue, SingleThreadFetcherManager<E, SplitT> splitFetcherManager, RecordEmitter<E, T, SplitStateT> recordEmitter, Configuration config, SourceReaderContext context) { super(elementsQueue, splitFetcherManager, recordEmitter, config, context); } @Depricated public SingleThreadMultiplexSourceReaderBase( FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue, Supplier<SplitReader<E, SplitT>> splitReaderSupplier, RecordEmitter<E, T, SplitStateT> recordEmitter, Configuration config, SourceReaderContext context) { super( elementsQueue, new SingleThreadFetcherManager<>(elementsQueue, splitReaderSupplier, config), recordEmitter, config, context); } // todo: Add new constructors without FutureCompletingBlockingQueue public SingleThreadMultiplexSourceReaderBase( Supplier<SplitReader<E, SplitT>> splitReaderSupplier, RecordEmitter<E, T, SplitStateT> recordEmitter, Configuration config, SourceReaderContext context) { super( new SingleThreadFetcherManager<>(splitReaderSupplier, config),, recordEmitter, config, context); } public SingleThreadMultiplexSourceReaderBase( SingleThreadFetcherManager<E, SplitT> splitFetcherManager, RecordEmitter<E, T, SplitStateT> recordEmitter, @Nullable RecordEvaluator<T> eofRecordEvaluator, Configuration config, SourceReaderContext context) { super( splitFetcherManager, recordEmitter, eofRecordEvaluator, config, context); } } |
3. Mark SplitFetcherManager andSingleThreadFetcherManager as `@PublicEvolving`, mark constructor of SplitFetcherManager andSingleThreadFetcherManager and SingleThreadFetcherManager as @Depricated
and provide a new constructor without FutureCompletingBlockingQueue. Mark SplitFetcherManager andSingleThreadFetcherManager as `@PublicEvolving`.without FutureCompletingBlockingQueue.
Code Block | ||
---|---|---|
| ||
@PublicEvolving public abstract class SplitFetcherManager<E, SplitT extends SourceSplit> { @Deprecated public SplitFetcherManager( FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue, Supplier<SplitReader<E, SplitT>> splitReaderFactory, Configuration configuration) { this(elementsQueue, splitReaderFactory, configuration, (ignore) -> { }); } @Deprecated @VisibleForTesting public SplitFetcherManager( FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue, Supplier<SplitReader<E, SplitT>> splitReaderFactory, Configuration configuration, Consumer<Collection<String>> splitFinishedHook) { } // todo: provide a new constructor without FutureCompletingBlockingQueue. public SplitFetcherManager( Supplier<SplitReader<E, SplitT>> splitReaderFactory, Configuration configuration) { this(splitReaderFactory, configuration, (ignore) -> { }); } public SplitFetcherManager( Supplier<SplitReader<E, SplitT>> splitReaderFactory, Configuration configuration, Consumer<Collection<String>> splitFinishedHook) { this.elementsQueue = new FutureCompletingBlockingQueue<>( configuration.getInteger(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY)); // ...... } |
...