Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.


Discussion threadhttps://lists.apache.org/thread/b8f509878ptwl3kmmgg95tv8sb1j5987
Vote thread

J

IRA

Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyFLINK-33465

Release<Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).Motivation

...

  1. Mark constructor of SingleThreadMultiplexSourceReaderBase as @Depricated . Add new constructors without FutureCompletingBlockingQueue
Code Block
// SingleThreadMultiplexSourceReaderBase

@Depricated
public SingleThreadMultiplexSourceReaderBase(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            SingleThreadFetcherManager<E, SplitT> splitFetcherManager,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            Configuration config,
            SourceReaderContext context) {
        super(elementsQueue, splitFetcherManager, recordEmitter, config, context);
 }

@Depricated
public SingleThreadMultiplexSourceReaderBase(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            Configuration config,
            SourceReaderContext context) {
        super(
                elementsQueue,
                new SingleThreadFetcherManager<>(elementsQueue, splitReaderSupplier, config),
                recordEmitter,
                config,
                context);
}


// todo: Add new constructors without FutureCompletingBlockingQueue
 public SingleThreadMultiplexSourceReaderBase(
            Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            Configuration config,
            SourceReaderContext context) {
        super(
                new SingleThreadFetcherManager<>(splitReaderSupplier, config),,
                recordEmitter,
                config,
                context);
    }

    public SingleThreadMultiplexSourceReaderBase(
            SingleThreadFetcherManager<E, SplitT> splitFetcherManager,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            @Nullable RecordEvaluator<T> eofRecordEvaluator,
            Configuration config,
            SourceReaderContext context) {
        super(
                splitFetcherManager,
                recordEmitter,
                eofRecordEvaluator,
                config,
                context);
    }

2. Mark constructor of SourceReaderBase as @Depricated and provide a new constructor without 

FutureCompletingBlockingQueue

Code Block
@Depricated  
@PublicEvolving
public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitStateT>
        implements SourceReader<T, SplitT> {   


 	@Deprecated
    public SourceReaderBase(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            SplitFetcherManager<E, SplitT> splitFetcherManager,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            Configuration config,
            SourceReaderContext context) {
        this(elementsQueue, splitFetcherManager, recordEmitter, null, config, context);
    }


    @Deprecated
    public SourceReaderBase(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            SplitFetcherManager<E, SplitT> splitFetcherManager,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            @Nullable RecordEvaluator<T> eofRecordEvaluator,
            Configuration config,
            SourceReaderContext context) {
        //this.elementsQueue = elementsQueue;
        this.splitFetcherManager = splitFetcherManager;
        this.recordEmitter = recordEmitter;
        this.splitStates = new HashMap<>();
        this.options = new SourceReaderOptions(config);
        this.config = config;
        this.context = context;
        this.noMoreSplitsAssignment = false;
        this.eofRecordEvaluator = eofRecordEvaluator;

        numRecordsInCounter = context.metricGroup().getIOMetricGroup().getNumRecordsInCounter();
    }  

	// todo: provide Adda new constructor without FutureCompletingBlockingQueue
 without FutureCompletingBlockingQueue
	public SourceReaderBase(
            SplitFetcherManager<E, SplitT> splitFetcherManager,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            Configuration config,
            SourceReaderContext context) {
        this(splitFetcherManager, recordEmitter, null, config, context);
    }


	public SourceReaderBase(
            SplitFetcherManager<E, SplitT> splitFetcherManager,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            @Nullable RecordEvaluator<T> eofRecordEvaluator,
            Configuration config,
            SourceReaderContext context) {
        this.splitFetcherManager = splitFetcherManager;
        this.recordEmitter = recordEmitter;
        this.splitStates = new HashMap<>();
        this.options = new SourceReaderOptions(config);
        this.config = config;
        this.context = context;
        this.noMoreSplitsAssignment = false;
        this.eofRecordEvaluator = eofRecordEvaluator;

        numRecordsInCounter = context.metricGroup().getIOMetricGroup().getNumRecordsInCounter();
    }
}

3. Mark constructor of SplitFetcherManager andSingleThreadFetcherManager as  @Depricated and provide a new constructor without FutureCompletingBlockingQueue. Mark SplitFetcherManager andSingleThreadFetcherManager as `@PublicEvolving`.

Code Block
titleSingleThreadFetcherManagerSplitFetcherManager
@PublicEvolving
public abstract class SingleThreadFetcherManager<ESplitFetcherManager<E, SplitT extends SourceSplit>
 {        
	@Deprecated
 extends SplitFetcherManager<E, SplitT> {

	@Depricated   
	public SingleThreadFetcherManagerSplitFetcherManager(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            Supplier<SplitReader<E, SplitT>> splitReaderSuppliersplitReaderFactory,
            Configuration configuration) {
        	superthis(elementsQueue, splitReaderSuppliersplitReaderFactory, configuration, (ignore) -> {
        });
  	 }

	//todo: Add  }       

    @Deprecated
    @VisibleForTesting
    public SplitFetcherManager(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            Supplier<SplitReader<E, SplitT>> splitReaderFactory,
            Configuration configuration,
            Consumer<Collection<String>> splitFinishedHook) {

	}



 // todo: provide a new constructor without FutureCompletingBlockingQueue
	without FutureCompletingBlockingQueue.     
 public SingleThreadFetcherManagerSplitFetcherManager(
            Supplier<SplitReader<E, SplitT>> splitReaderSuppliersplitReaderFactory,
            Configuration configuration) {
        	superthis(splitReaderSuppliersplitReaderFactory, configuration, (ignore);
	}
 -> {
        });
        
  }

 public SplitFetcherManager(
            Supplier<SplitReader<E, SplitT>> splitReaderFactory,
            Configuration configuration,
            Consumer<Collection<String>> splitFinishedHook) {
        this.elementsQueue = new FutureCompletingBlockingQueue<>(
                configuration.getInteger(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY));
		// ......
}


Code Block
titleSplitFetcherManagerSingleThreadFetcherManager
@PublicEvolving
public abstract class SplitFetcherManager<ESingleThreadFetcherManager<E, SplitT extends SourceSplit>
        extends SplitFetcherManager<E, SplitT> {  
   	
	@Depricated    @Deprecated
    public SingleThreadFetcherManager(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            Supplier<SplitReader<E, SplitT>> splitReaderSupplier) {
        this(elementsQueue, splitReaderSupplier, new Configuration());
    }

  	@Deprecated
    public SplitFetcherManagerSingleThreadFetcherManager(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            Supplier<SplitReader<E, SplitT>> splitReaderFactorysplitReaderSupplier,
            Configuration configuration) {
        thissuper(elementsQueue, splitReaderFactorysplitReaderSupplier, configuration, (ignore) -> {});
    }
	
  	
	// todo: provide@Deprecated
 a new constructor without FutureCompletingBlockingQueue.@VisibleForTesting
    public SplitFetcherManagerSingleThreadFetcherManager(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            Supplier<SplitReader<E, SplitT>> splitReaderFactorysplitReaderSupplier,
            Configuration configuration,
            Consumer<Collection<String>> splitFinishedHook) {
        this.elementsQueue = new FutureCompletingBlockingQueue<>super(elementsQueue, splitReaderSupplier, configuration, splitFinishedHook);
    }

 	public SingleThreadFetcherManager(
            Supplier<SplitReader<E, SplitT>> splitReaderSupplier) {
        this(splitReaderSupplier, new Configuration());
    }


    public SingleThreadFetcherManager(
            Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
            Configuration  	config.getInteger(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY)),;configuration) {
        super(splitReaderSupplier, configuration);
    }

    public SingleThreadFetcherManager(
            Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
            Configuration configuration,
            Consumer<Collection<String>> splitFinishedHook) {
        super(splitReaderSupplier, configuration, splitFinishedHook);
    }
}



4. SplitFetcherManager provides  wrapper methods for FutureCompletingBlockingQueue  to replace its usage in SourceReaderBase.

Code Block
@PublicEvolving
public abstract class SplitFetcherManager<E, SplitT extends SourceSplit> {
	
	/**
	 * returns the RecordsWithSplitIds produced by public T poll();
 public CompletableFuture<Void> getAvailabilityFuture();
 SplitReader.
	**/
	public RecordsWithSplitIds<E> poll(){
		return elementsQueue.poll();
	}

 	/**
     * Returns the availability future. If the queue is non-empty, then this future will already be
     * complete. Otherwise the obtained future is guaranteed to get completed the next time the
     * queue becomes non-empty, or a notification happens via {@link #notifyAvailable()}.
     *
     * <p>It is important that a completed future is no guarantee that the next call to {@link
     * #poll()} will return a non-null element. If there are concurrent consumer, another consumer
     * may have taken the available element. Or there was no element in the first place, because the
     * future was completed through a call to {@link #notifyAvailable()}.
     *
     * <p>For that reason, it is important to call this method (to obtain a new future) every time
     * again after {@link #poll()} returned null and you want to wait for data.
     */
	public CompletableFuture<Void> getAvailabilityFuture(){
 		return elementsQueue.getAvailabilityFuture();
	}

     /**
     * Makes sure the availability future is complete, if it is not complete already. All futures
     * returned by previous calls to {@link #getAvailabilityFuture()} are guaranteed to be
     * completed.
     *
     * <p>All future calls to the method will return a completed future, until the point that the
     * availability is reset via calls to {@link #poll()} that leave the queue empty.
     */
	public void notifyAvailable(){
		elementsQueue.notifyAvailable();
	}

   /** Checks whether is no data available. */
 	public boolean noAvailableElement(){
		return elementsQueue.isEmpty();

}

...

 	}
}




Compatibility, Deprecation, and Migration Plan

Any change to  SingleThreadFetcherManager , FutureCompletingBlockingQueue   and parent class SplitFetcherManager  later will be careful for compatilityThe Connectors that utilize constructors (including param elementsQueue) of SingleThreadMultiplexSourceReaderBase, SourceReaderBase, and SingleThreadFetcherManager should be migrated to the new constructor that does not have elementsQueue parameter.

Test Plan

nothing else to do.

...