THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block |
---|
/**
 * Constructor excerpt of the source reader base class.
 *
 * <p>The constructors that accept a {@code FutureCompletingBlockingQueue} are deprecated; the
 * overloads without that parameter are their replacements. All constructors end up in the
 * five-argument constructor, which is the single place where the reader's fields are initialized.
 */
@PublicEvolving
public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitStateT>
        implements SourceReader<T, SplitT> {

    /**
     * Creates a reader without a {@code RecordEvaluator} ({@code null} is passed on).
     *
     * @param elementsQueue forwarded to the six-argument deprecated constructor, which does not
     *     store it
     * @param splitFetcherManager the fetcher manager used by this reader
     * @param recordEmitter the emitter used by this reader
     * @param config the configuration; also used to build the {@code SourceReaderOptions}
     * @param context the reader context
     * @deprecated Use {@link #SourceReaderBase(SplitFetcherManager, RecordEmitter, Configuration,
     *     SourceReaderContext)} instead.
     */
    @Deprecated
    public SourceReaderBase(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            SplitFetcherManager<E, SplitT> splitFetcherManager,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            Configuration config,
            SourceReaderContext context) {
        this(elementsQueue, splitFetcherManager, recordEmitter, null, config, context);
    }

    /**
     * Creates a reader with an optional {@code RecordEvaluator}.
     *
     * @param elementsQueue accepted only for backward compatibility; this constructor does not
     *     store or otherwise use it
     * @param splitFetcherManager the fetcher manager used by this reader
     * @param recordEmitter the emitter used by this reader
     * @param eofRecordEvaluator optional evaluator; may be {@code null}
     * @param config the configuration; also used to build the {@code SourceReaderOptions}
     * @param context the reader context
     * @deprecated Use {@link #SourceReaderBase(SplitFetcherManager, RecordEmitter,
     *     RecordEvaluator, Configuration, SourceReaderContext)} instead.
     */
    @Deprecated
    public SourceReaderBase(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            SplitFetcherManager<E, SplitT> splitFetcherManager,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            @Nullable RecordEvaluator<T> eofRecordEvaluator,
            Configuration config,
            SourceReaderContext context) {
        // The elements queue is intentionally not kept by the reader, so the remaining
        // initialization is exactly that of the queue-less constructor.
        this(splitFetcherManager, recordEmitter, eofRecordEvaluator, config, context);
    }

    /**
     * Creates a reader without a {@code FutureCompletingBlockingQueue} and without a
     * {@code RecordEvaluator} ({@code null} is passed on).
     *
     * @param splitFetcherManager the fetcher manager used by this reader
     * @param recordEmitter the emitter used by this reader
     * @param config the configuration; also used to build the {@code SourceReaderOptions}
     * @param context the reader context
     */
    public SourceReaderBase(
            SplitFetcherManager<E, SplitT> splitFetcherManager,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            Configuration config,
            SourceReaderContext context) {
        this(splitFetcherManager, recordEmitter, null, config, context);
    }

    /**
     * Creates a reader without a {@code FutureCompletingBlockingQueue}. Every other constructor
     * of this class delegates here.
     *
     * @param splitFetcherManager the fetcher manager used by this reader
     * @param recordEmitter the emitter used by this reader
     * @param eofRecordEvaluator optional evaluator; may be {@code null}
     * @param config the configuration; also used to build the {@code SourceReaderOptions}
     * @param context the reader context; its metric group supplies the records-in counter
     */
    public SourceReaderBase(
            SplitFetcherManager<E, SplitT> splitFetcherManager,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            @Nullable RecordEvaluator<T> eofRecordEvaluator,
            Configuration config,
            SourceReaderContext context) {
        this.splitFetcherManager = splitFetcherManager;
        this.recordEmitter = recordEmitter;
        this.splitStates = new HashMap<>();
        this.options = new SourceReaderOptions(config);
        this.config = config;
        this.context = context;
        this.noMoreSplitsAssignment = false;
        this.eofRecordEvaluator = eofRecordEvaluator;
        this.numRecordsInCounter =
                context.metricGroup().getIOMetricGroup().getNumRecordsInCounter();
    }
} |
...
Code Block |
---|
@PublicEvolving
public abstract class SingleThreadMultiplexSourceReaderBase<E, T, SplitT extends SourceSplit, SplitStateT> extends SourceReaderBase<E, T, SplitT, SplitStateT> {
@Depricated
public SingleThreadMultiplexSourceReaderBase(
FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
SingleThreadFetcherManager<E, SplitT> splitFetcherManager,
RecordEmitter<E, T, SplitStateT> recordEmitter,
Configuration config,
SourceReaderContext context) {
super(elementsQueue, splitFetcherManager, recordEmitter, config, context);
}
@Depricated
public SingleThreadMultiplexSourceReaderBase(
FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
RecordEmitter<E, T, SplitStateT> recordEmitter,
Configuration config,
SourceReaderContext context) {
super(
elementsQueue,
new SingleThreadFetcherManager<>(elementsQueue, splitReaderSupplier, config),
recordEmitter,
config,
context);
}
// todo: Add new constructors without FutureCompletingBlockingQueue
public SingleThreadMultiplexSourceReaderBase(
Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
RecordEmitter<E, T, SplitStateT> recordEmitter,
Configuration config,
SourceReaderContext context) {
super(
new SingleThreadFetcherManager<>(splitReaderSupplier, config),,
recordEmitter,
config,
context);
}
// todo: provide a new constructor without FutureCompletingBlockingQueue
public SingleThreadMultiplexSourceReaderBase(
SingleThreadFetcherManager<E, SplitT> splitFetcherManager,
RecordEmitter<E, T, SplitStateT> recordEmitter,
@Nullable RecordEvaluator<T> eofRecordEvaluator,
Configuration config,
SourceReaderContext context) {
super(
splitFetcherManager,
recordEmitter,
eofRecordEvaluator,
config,
context);
}
} |
...