THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Proposed Changes
- Mark constructor of SourceReaderBase and SingleThreadMultiplexSourceReaderBase as
@Depricated
. Add new constructors without and provide a new constructor without FutureCompletingBlockingQueue
Code Block |
---|
// SingleThreadMultiplexSourceReaderBase @Depricated public SingleThreadMultiplexSourceReaderBase( @PublicEvolving public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitStateT> FutureCompletingBlockingQueue<RecordsWithSplitIds<E>>implements elementsQueueSourceReader<T, SplitT> { @Deprecated SingleThreadFetcherManager<E, SplitT>public splitFetcherManager,SourceReaderBase( RecordEmitter<E,FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> TelementsQueue, SplitStateT> SplitFetcherManager<E, SplitT> splitFetcherManager, RecordEmitter<E, T, SplitStateT> recordEmitter, Configuration config, SourceReaderContext context) { superthis(elementsQueue, splitFetcherManager, recordEmitter, null, config, context); } @Depricated @Deprecated public SingleThreadMultiplexSourceReaderBaseSourceReaderBase( FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue, Supplier<SplitReader<ESplitFetcherManager<E, SplitT>>SplitT> splitReaderSuppliersplitFetcherManager, RecordEmitter<E, T, SplitStateT> recordEmitter, Configuration@Nullable RecordEvaluator<T> configeofRecordEvaluator, SourceReaderContext context) {Configuration config, super( SourceReaderContext context) { //this.elementsQueue = elementsQueue,; this.splitFetcherManager = splitFetcherManager; new SingleThreadFetcherManager<>(elementsQueue, splitReaderSupplier, config), this.recordEmitter = recordEmitter; this.splitStates = new HashMap<>(); recordEmitter, this.options = new SourceReaderOptions(config); this.config = config,; this.context = context; this.noMoreSplitsAssignment = context)false; } // todo: Add new constructors without FutureCompletingBlockingQueue public SingleThreadMultiplexSourceReaderBase( this.eofRecordEvaluator = eofRecordEvaluator; numRecordsInCounter Supplier<SplitReader<E, SplitT>> splitReaderSupplier,= context.metricGroup().getIOMetricGroup().getNumRecordsInCounter(); } // todo: RecordEmitter<E,provide T,a SplitStateT>new recordEmitter, constructor without FutureCompletingBlockingQueue public SourceReaderBase( Configuration config, SplitFetcherManager<E, SplitT> splitFetcherManager, SourceReaderContext context) { RecordEmitter<E, super(T, SplitStateT> recordEmitter, new SingleThreadFetcherManager<>(splitReaderSupplier, config),Configuration config, SourceReaderContext recordEmitter,context) { this(splitFetcherManager, recordEmitter, null, config, context); } public SingleThreadMultiplexSourceReaderBaseSourceReaderBase( SingleThreadFetcherManager<ESplitFetcherManager<E, SplitT> splitFetcherManager, RecordEmitter<E, T, SplitStateT> recordEmitter, @Nullable RecordEvaluator<T> eofRecordEvaluator, Configuration config, SourceReaderContext context) { super(this.splitFetcherManager = splitFetcherManager; this.recordEmitter = recordEmitter; splitFetcherManager, this.splitStates = new HashMap<>(); this.options = recordEmitter,new SourceReaderOptions(config); this.config = config; eofRecordEvaluator, this.context = context; this.noMoreSplitsAssignment config,= false; this.eofRecordEvaluator = eofRecordEvaluator; numRecordsInCounter = context.metricGroup().getIOMetricGroup().getNumRecordsInCounter(); } |
2. Mark constructor of SourceReaderBase as @Depricated
and provide a new constructor without
...
}
} |
Code Block |
---|
@PublicEvolving public abstract class SourceReaderBase<ESingleThreadMultiplexSourceReaderBase<E, T, SplitT extends SourceSplit, SplitStateT> extends SourceReaderBase<E, implements SourceReader<TT, SplitT, SplitT>SplitStateT> { @Depricated @Deprecated public SourceReaderBaseSingleThreadMultiplexSourceReaderBase( FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue, SplitFetcherManager<ESingleThreadFetcherManager<E, SplitT> splitFetcherManager, RecordEmitter<E, T, SplitStateT> recordEmitter, Configuration config, SourceReaderContext context) { thissuper(elementsQueue, splitFetcherManager, recordEmitter, null, config, context); } @Deprecated @Depricated public SourceReaderBaseSingleThreadMultiplexSourceReaderBase( FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue, SplitFetcherManager<ESupplier<SplitReader<E, SplitT>SplitT>> splitFetcherManagersplitReaderSupplier, RecordEmitter<E, T, SplitStateT> recordEmitter, @NullableConfiguration RecordEvaluator<T> eofRecordEvaluatorconfig, SourceReaderContext Configurationcontext) config,{ super( SourceReaderContext context) { //this.elementsQueue = elementsQueue;, this.splitFetcherManager = splitFetcherManager; new this.recordEmitter = recordEmitter;SingleThreadFetcherManager<>(elementsQueue, splitReaderSupplier, config), this.splitStates = new HashMap<>(); recordEmitter, this.options = new SourceReaderOptions(config); this.config = config;, this.context = context); } // todo: Add new constructors without FutureCompletingBlockingQueue this.noMoreSplitsAssignment = false; public SingleThreadMultiplexSourceReaderBase( this.eofRecordEvaluator = eofRecordEvaluator; Supplier<SplitReader<E, SplitT>> splitReaderSupplier, numRecordsInCounter = context.metricGroup().getIOMetricGroup().getNumRecordsInCounter(); }RecordEmitter<E, T, SplitStateT> // todo: provide a new constructor without FutureCompletingBlockingQueue public SourceReaderBase(recordEmitter, Configuration config, SplitFetcherManager<E,SourceReaderContext SplitT>context) splitFetcherManager,{ super( RecordEmitter<E, T, SplitStateT> recordEmitter, Configurationnew SingleThreadFetcherManager<>(splitReaderSupplier, config),, SourceReaderContext context) { recordEmitter, this(splitFetcherManager, recordEmitter, null, config, context); } public SourceReaderBaseSingleThreadMultiplexSourceReaderBase( SplitFetcherManager<ESingleThreadFetcherManager<E, SplitT> splitFetcherManager, RecordEmitter<E, T, SplitStateT> recordEmitter, @Nullable RecordEvaluator<T> eofRecordEvaluator, Configuration config, SourceReaderContext context) { this.splitFetcherManager = splitFetcherManager; super( this.recordEmitter = recordEmitter; splitFetcherManager, this.splitStates = new HashMap<>(); this.options = new SourceReaderOptions(config); recordEmitter, this.config = config; eofRecordEvaluator, this.context = context; this.noMoreSplitsAssignment = false;config, this.eofRecordEvaluator = eofRecordEvaluator; context); numRecordsInCounter = context.metricGroup().getIOMetricGroup().getNumRecordsInCounter(); } } } |
3. Mark constructor of SplitFetcherManager andSingleThreadFetcherManager as @Depricated
and provide a new constructor without FutureCompletingBlockingQueue. Mark SplitFetcherManager andSingleThreadFetcherManager as `@PublicEvolving`.
...
5. Mark SplitFetcher and SplitFetcherTask as PublicEvolving.
Code Block |
---|
@PublicEvolving
public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable {
FetchTask(
SplitReader<E, SplitT> splitReader,
FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
Consumer<Collection<String>> splitFinishedCallback,
int fetcherIndex) {
this.splitReader = splitReader;
this.elementsQueue = elementsQueue;
this.splitFinishedCallback = splitFinishedCallback;
this.lastRecords = null;
this.fetcherIndex = fetcherIndex;
this.wakeup = false;
}
} |
Code Block |
---|
@PublicEvolving
public interface SplitFetcherTask {} |
Compatibility, Deprecation, and Migration Plan
...