Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Proposed Changes

  1. Mark the existing constructors of SourceReaderBase and SingleThreadMultiplexSourceReaderBase as @Deprecated and provide new constructors without FutureCompletingBlockingQueue.
Code Block
// SingleThreadMultiplexSourceReaderBase

@Depricated
public SingleThreadMultiplexSourceReaderBase(
    @PublicEvolving
public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitStateT>
        FutureCompletingBlockingQueue<RecordsWithSplitIds<E>>implements elementsQueueSourceReader<T,
 SplitT> {  
 
 
    @Deprecated
  SingleThreadFetcherManager<E,  SplitT>public splitFetcherManager,SourceReaderBase(
            RecordEmitter<E,FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> TelementsQueue,
 SplitStateT>            SplitFetcherManager<E, SplitT> splitFetcherManager,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            Configuration config,
            SourceReaderContext context) {
        superthis(elementsQueue, splitFetcherManager, recordEmitter, null, config, context);
    }
 
 
@Depricated
    @Deprecated
    public SingleThreadMultiplexSourceReaderBaseSourceReaderBase(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            Supplier<SplitReader<ESplitFetcherManager<E, SplitT>>SplitT> splitReaderSuppliersplitFetcherManager,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            Configuration@Nullable RecordEvaluator<T> configeofRecordEvaluator,
            SourceReaderContext context) {Configuration config,
        super(
    SourceReaderContext context) {
        //this.elementsQueue = elementsQueue,;
        this.splitFetcherManager = splitFetcherManager;
       new SingleThreadFetcherManager<>(elementsQueue, splitReaderSupplier, config),
 this.recordEmitter = recordEmitter;
        this.splitStates = new HashMap<>();
      recordEmitter,
  this.options = new SourceReaderOptions(config);
        this.config  = config,;
        this.context = context;
        this.noMoreSplitsAssignment = context)false;
}


// todo: Add new constructors without FutureCompletingBlockingQueue
 public SingleThreadMultiplexSourceReaderBase(
    this.eofRecordEvaluator = eofRecordEvaluator;
 
        numRecordsInCounter Supplier<SplitReader<E, SplitT>> splitReaderSupplier,= context.metricGroup().getIOMetricGroup().getNumRecordsInCounter();
    } 
 
    // todo: RecordEmitter<E,provide T,a SplitStateT>new recordEmitter,
constructor without FutureCompletingBlockingQueue
    public SourceReaderBase(
     Configuration config,
      SplitFetcherManager<E, SplitT> splitFetcherManager,
    SourceReaderContext context) {
      RecordEmitter<E,  super(T, SplitStateT> recordEmitter,
                new SingleThreadFetcherManager<>(splitReaderSupplier, config),Configuration config,
            SourceReaderContext    recordEmitter,context) {
        this(splitFetcherManager, recordEmitter, null,      config,
                context);
    }
 
 
    public SingleThreadMultiplexSourceReaderBaseSourceReaderBase(
            SingleThreadFetcherManager<ESplitFetcherManager<E, SplitT> splitFetcherManager,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            @Nullable RecordEvaluator<T> eofRecordEvaluator,
            Configuration config,
            SourceReaderContext context) {
        super(this.splitFetcherManager = splitFetcherManager;
        this.recordEmitter = recordEmitter;
      splitFetcherManager,
  this.splitStates = new HashMap<>();
        this.options =  recordEmitter,new SourceReaderOptions(config);
        this.config = config;
      eofRecordEvaluator,
  this.context = context;
        this.noMoreSplitsAssignment    config,= false;
        this.eofRecordEvaluator = eofRecordEvaluator;
 
        numRecordsInCounter = context.metricGroup().getIOMetricGroup().getNumRecordsInCounter();
    }

2. Mark constructor of SourceReaderBase as @Deprecated and provide a new constructor without 

...

}
}


Code Block
@PublicEvolving
public abstract class SourceReaderBase<ESingleThreadMultiplexSourceReaderBase<E, T, SplitT extends SourceSplit, SplitStateT>
 extends SourceReaderBase<E,      implements SourceReader<TT, SplitT, SplitT>SplitStateT> {   

	@Depricated
 	@Deprecated
    public SourceReaderBaseSingleThreadMultiplexSourceReaderBase(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            SplitFetcherManager<ESingleThreadFetcherManager<E, SplitT> splitFetcherManager,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            Configuration config,
            SourceReaderContext context) {
        thissuper(elementsQueue, splitFetcherManager, recordEmitter, null, config, context);
    	}


    @Deprecated
    	@Depricated
	public SourceReaderBaseSingleThreadMultiplexSourceReaderBase(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            SplitFetcherManager<ESupplier<SplitReader<E, SplitT>SplitT>> splitFetcherManagersplitReaderSupplier,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            @NullableConfiguration RecordEvaluator<T> eofRecordEvaluatorconfig,
            SourceReaderContext Configurationcontext) config,{
        super(
     SourceReaderContext context) {
        //this.elementsQueue = elementsQueue;,
        this.splitFetcherManager = splitFetcherManager;
      new  this.recordEmitter = recordEmitter;SingleThreadFetcherManager<>(elementsQueue, splitReaderSupplier, config),
        this.splitStates = new HashMap<>();
     recordEmitter,
   this.options = new SourceReaderOptions(config);
        this.config = config;,
        this.context =       context);
	}


	// todo: Add new constructors without FutureCompletingBlockingQueue
  this.noMoreSplitsAssignment = false;
	public SingleThreadMultiplexSourceReaderBase(
          this.eofRecordEvaluator = eofRecordEvaluator;

   Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
      numRecordsInCounter = context.metricGroup().getIOMetricGroup().getNumRecordsInCounter();
    }RecordEmitter<E, T, SplitStateT> 

	// todo: provide a new constructor without FutureCompletingBlockingQueue
	public SourceReaderBase(recordEmitter,
            Configuration config,
            SplitFetcherManager<E,SourceReaderContext SplitT>context) splitFetcherManager,{
        	super(
    RecordEmitter<E, T, SplitStateT> recordEmitter,
            Configurationnew SingleThreadFetcherManager<>(splitReaderSupplier, config),,
            SourceReaderContext context) {
   recordEmitter,
         this(splitFetcherManager, recordEmitter, null,       config,
                context);
    }


	    public SourceReaderBaseSingleThreadMultiplexSourceReaderBase(
            SplitFetcherManager<ESingleThreadFetcherManager<E, SplitT> splitFetcherManager,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            @Nullable RecordEvaluator<T> eofRecordEvaluator,
            Configuration config,
            SourceReaderContext context) {
        this.splitFetcherManager = splitFetcherManager;
super(
          this.recordEmitter = recordEmitter;
    splitFetcherManager,
    this.splitStates = new HashMap<>();
        this.options = new SourceReaderOptions(config);
 recordEmitter,
          this.config = config;
    eofRecordEvaluator,
    this.context = context;
        this.noMoreSplitsAssignment = false;config,
        this.eofRecordEvaluator = eofRecordEvaluator;

      context);
  numRecordsInCounter = context.metricGroup().getIOMetricGroup().getNumRecordsInCounter();
    }
 }
}



3. Mark constructor of SplitFetcherManager and SingleThreadFetcherManager as @Deprecated and provide a new constructor without FutureCompletingBlockingQueue. Mark SplitFetcherManager and SingleThreadFetcherManager as `@PublicEvolving`.

...

5. Mark SplitFetcher and SplitFetcherTask as `@PublicEvolving`.

Code Block
/**
 * Abridged excerpt showing {@code SplitFetcher} carrying the {@code @PublicEvolving} annotation.
 *
 * <p>NOTE(review): the only member shown is named {@code FetchTask}, not {@code SplitFetcher},
 * and the fields it assigns are not declared in this excerpt. The snippet appears to combine
 * two classes -- confirm which class this constructor is meant to belong to.
 */
@PublicEvolving
public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable {
    /**
     * Stores each argument in the field of the same name and resets the remaining state.
     *
     * @param splitReader assigned to {@code this.splitReader}
     * @param elementsQueue assigned to {@code this.elementsQueue}
     * @param splitFinishedCallback assigned to {@code this.splitFinishedCallback}
     * @param fetcherIndex assigned to {@code this.fetcherIndex}
     */
    FetchTask(
            SplitReader<E, SplitT> splitReader,
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            Consumer<Collection<String>> splitFinishedCallback,
            int fetcherIndex) {
        this.splitReader = splitReader;
        this.elementsQueue = elementsQueue;
        this.splitFinishedCallback = splitFinishedCallback;
        // Initial state: lastRecords starts as null (and wakeup, below, as false).
        this.lastRecords = null;
        this.fetcherIndex = fetcherIndex;
        this.wakeup = false;
    }
}


Code Block
// Excerpt: shows only the @PublicEvolving annotation on the interface; the body is left empty here.
@PublicEvolving
public interface SplitFetcherTask {}




Compatibility, Deprecation, and Migration Plan

...