Discussion thread: https://lists.apache.org/thread/b8f509878ptwl3kmmgg95tv8sb1j5987
Vote thread: https://lists.apache.org/thread/t1zff21z440pvv48jyhm8pgtqsyplchn

JIRA: FLINK-33465, FLINK-32417

Release: 1.19

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

In hindsight, it might have been better not to expose FutureCompletingBlockingQueue to Source developers. They are unlikely to use it for anything other than constructing it. FutureCompletingBlockingQueue is currently exposed in the SourceReaderBase constructor because both the SplitFetcherManager and SourceReaderBase need it. One way to hide the FutureCompletingBlockingQueue from the public API is to make SplitFetcherManager the only owner class of the queue, and expose some of its methods via SplitFetcherManager. This way, the SourceReaderBase can invoke the methods via SplitFetcherManager. This will make the code slightly cleaner.
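The ownership change described above can be sketched with simplified stand-in classes. This is only an illustration under stated assumptions: `FetcherManagerSketch`, `ReaderSketch`, and the use of `ArrayBlockingQueue` are hypothetical and are not the Flink types; the real code uses `SplitFetcherManager`, `SourceReaderBase`, and `FutureCompletingBlockingQueue`.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical stand-in for SplitFetcherManager: the only class that creates the queue.
class FetcherManagerSketch<E> {
    private final BlockingQueue<E> elementsQueue;

    FetcherManagerSketch(int queueCapacity) {
        // The queue is created internally instead of being passed in by the connector.
        this.elementsQueue = new ArrayBlockingQueue<>(queueCapacity);
    }

    // Stand-in for a fetcher thread handing over a fetched element.
    boolean enqueue(E element) {
        return elementsQueue.offer(element);
    }

    // Stand-in for an internal accessor used only by the reader base class.
    BlockingQueue<E> getQueue() {
        return elementsQueue;
    }
}

// Hypothetical stand-in for SourceReaderBase: it never constructs a queue itself.
class ReaderSketch<E> {
    private final FetcherManagerSketch<E> fetcherManager;

    ReaderSketch(FetcherManagerSketch<E> fetcherManager) {
        this.fetcherManager = fetcherManager;
    }

    // Returns the next fetched element, or null if the queue is empty.
    E pollNext() {
        return fetcherManager.getQueue().poll();
    }
}

class QueueOwnershipSketch {
    public static void main(String[] args) {
        FetcherManagerSketch<String> manager = new FetcherManagerSketch<>(2);
        ReaderSketch<String> reader = new ReaderSketch<>(manager);
        manager.enqueue("record-1");
        System.out.println(reader.pollNext()); // prints "record-1"
        System.out.println(reader.pollNext()); // prints "null"
    }
}
```

In this sketch the connector only ever constructs the manager and the reader, so the queue type never appears in the signatures a connector developer has to call.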

...

  • To expose the SplitFetcherManager / SingleThreadFetcherManager as PublicEvolving, allowing connector developers to easily create their own threading models in the SourceReaderBase.
  • To hide the element queue from connector developers by making SplitFetcherManager the only owner class of the queue, which simplifies the SourceReaderBase to consist of only SplitFetcherManager and RecordEmitter as major components.

Public Interfaces

SplitFetcherManager

  • Change SplitFetcherManager from Internal to PublicEvolving.
  • Deprecate the old constructors exposing the FutureCompletingBlockingQueue, and add new constructors as replacements that create the FutureCompletingBlockingQueue instance internally.
  • Add a new internal method SplitFetcherManager#getQueue() for SourceReaderBase usage.
Code Block
@PublicEvolving
public abstract class SplitFetcherManager<E, SplitT extends SourceSplit> {

    @Deprecated
    public SplitFetcherManager(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            Supplier<SplitReader<E, SplitT>> splitReaderFactory,
            Configuration configuration) {
        this(elementsQueue, splitReaderFactory, configuration, (ignore) -> {
        });
    }

    @Deprecated
    @VisibleForTesting
    public SplitFetcherManager(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            Supplier<SplitReader<E, SplitT>> splitReaderFactory,
            Configuration configuration,
            Consumer<Collection<String>> splitFinishedHook) {
        // ......
    }

    // todo: provide a new constructor without FutureCompletingBlockingQueue.
    public SplitFetcherManager(
            Supplier<SplitReader<E, SplitT>> splitReaderFactory,
            Configuration configuration) {
        this(splitReaderFactory, configuration, (ignore) -> {
        });
    }

    // todo: provide a new constructor without FutureCompletingBlockingQueue.
    public SplitFetcherManager(
            Supplier<SplitReader<E, SplitT>> splitReaderFactory,
            Configuration configuration,
            Consumer<Collection<String>> splitFinishedHook) {
        // The queue is now created internally by the SplitFetcherManager.
        this.elementsQueue = new FutureCompletingBlockingQueue<>(
                configuration.getInteger(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY));
        // ......
    }

    // Internal accessor for SourceReaderBase; not intended for connector developers.
    @Internal
    public FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> getQueue() {
        return elementsQueue;
    }
}

SingleThreadFetcherManager

  • Change SingleThreadFetcherManager from Internal to PublicEvolving.
  • Deprecate the old constructors exposing the FutureCompletingBlockingQueue, and add new constructors without FutureCompletingBlockingQueue as replacements; the FutureCompletingBlockingQueue instance is created internally by the parent class (SplitFetcherManager).
Code Block
@PublicEvolving
public class SingleThreadFetcherManager<E, SplitT extends SourceSplit>
        extends SplitFetcherManager<E, SplitT> {

    @Deprecated
    public SingleThreadFetcherManager(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            Supplier<SplitReader<E, SplitT>> splitReaderSupplier) {
        this(elementsQueue, splitReaderSupplier, new Configuration());
    }

    @Deprecated
    public SingleThreadFetcherManager(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
            Configuration configuration) {
        super(elementsQueue, splitReaderSupplier, configuration);
    }

    @Deprecated
    @VisibleForTesting
    public SingleThreadFetcherManager(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
            Configuration configuration,
            Consumer<Collection<String>> splitFinishedHook) {
        super(elementsQueue, splitReaderSupplier, configuration, splitFinishedHook);
    }

    // todo: provide a new constructor without FutureCompletingBlockingQueue.
    public SingleThreadFetcherManager(
            Supplier<SplitReader<E, SplitT>> splitReaderSupplier) {
        super(splitReaderSupplier, new Configuration());
    }

    // todo: provide a new constructor without FutureCompletingBlockingQueue.
    public SingleThreadFetcherManager(
            Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
            Configuration configuration) {
        super(splitReaderSupplier, configuration);
    }

    // todo: provide a new constructor without FutureCompletingBlockingQueue.
    public SingleThreadFetcherManager(
            Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
            Configuration configuration,
            Consumer<Collection<String>> splitFinishedHook) {
        super(splitReaderSupplier, configuration, splitFinishedHook);
    }
}


SplitFetcherTask

Change SplitFetcherTask from Internal to PublicEvolving.

Code Block
@PublicEvolving
public interface SplitFetcherTask {}



SplitFetcher

Change SplitFetcher from Internal to PublicEvolving, but do not expose the constructor of SplitFetcher, so that it can only be created by org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager#createSplitFetcher.

Code Block
@PublicEvolving
public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable {

    // Package-private constructor: not part of the public API.
    SplitFetcher(
            int id,
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            SplitReader<E, SplitT> splitReader,
            Consumer<Throwable> errorHandler,
            Runnable shutdownHook,
            Consumer<Collection<String>> splitFinishedHook,
            boolean allowUnalignedSourceSplits) {
        // ......
    }
}

SourceReaderBase

  • Deprecate the old constructors exposing the FutureCompletingBlockingQueue, and add new constructors as replacements; the FutureCompletingBlockingQueue instance is created internally in SplitFetcherManager.
  • Remove the direct operations on FutureCompletingBlockingQueue and use the functionality of SplitFetcherManager instead.
Code Block
@PublicEvolving
public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitStateT>
        implements SourceReader<T, SplitT> {

    @Deprecated
    public SourceReaderBase(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            SplitFetcherManager<E, SplitT> splitFetcherManager,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            Configuration config,
            SourceReaderContext context) {
        this(elementsQueue, splitFetcherManager, recordEmitter, null, config, context);
    }

    @Deprecated
    public SourceReaderBase(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            SplitFetcherManager<E, SplitT> splitFetcherManager,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            @Nullable RecordEvaluator<T> eofRecordEvaluator,
            Configuration config,
            SourceReaderContext context) {
        //this.elementsQueue = elementsQueue;
        this.splitFetcherManager = splitFetcherManager;
        this.recordEmitter = recordEmitter;
        this.splitStates = new HashMap<>();
        this.options = new SourceReaderOptions(config);
        this.config = config;
        this.context = context;
        this.noMoreSplitsAssignment = false;
        this.eofRecordEvaluator = eofRecordEvaluator;

        numRecordsInCounter = context.metricGroup().getIOMetricGroup().getNumRecordsInCounter();
    }

    // todo: provide a new constructor without FutureCompletingBlockingQueue
    public SourceReaderBase(
            SplitFetcherManager<E, SplitT> splitFetcherManager,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            Configuration config,
            SourceReaderContext context) {
        this(splitFetcherManager, recordEmitter, null, config, context);
    }

    // todo: provide a new constructor without FutureCompletingBlockingQueue
    public SourceReaderBase(
            SplitFetcherManager<E, SplitT> splitFetcherManager,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            @Nullable RecordEvaluator<T> eofRecordEvaluator,
            Configuration config,
            SourceReaderContext context) {
        this.splitFetcherManager = splitFetcherManager;
        this.recordEmitter = recordEmitter;
        this.splitStates = new HashMap<>();
        this.options = new SourceReaderOptions(config);
        this.config = config;
        this.context = context;
        this.noMoreSplitsAssignment = false;
        this.eofRecordEvaluator = eofRecordEvaluator;

        numRecordsInCounter = context.metricGroup().getIOMetricGroup().getNumRecordsInCounter();
    }
}

SingleThreadMultiplexSourceReaderBase

Deprecate the old constructors exposing the FutureCompletingBlockingQueue, and add new constructors as replacements; the FutureCompletingBlockingQueue instance is created internally in SplitFetcherManager.

Code Block
@PublicEvolving
public abstract class SingleThreadMultiplexSourceReaderBase<E, T, SplitT extends SourceSplit, SplitStateT>
        extends SourceReaderBase<E, T, SplitT, SplitStateT> {

    @Deprecated
    public SingleThreadMultiplexSourceReaderBase(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            SingleThreadFetcherManager<E, SplitT> splitFetcherManager,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            Configuration config,
            SourceReaderContext context) {
        super(elementsQueue, splitFetcherManager, recordEmitter, config, context);
    }

    @Deprecated
    public SingleThreadMultiplexSourceReaderBase(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            Configuration config,
            SourceReaderContext context) {
        super(
                elementsQueue,
                new SingleThreadFetcherManager<>(elementsQueue, splitReaderSupplier, config),
                recordEmitter,
                config,
                context);
    }

    // todo: Add new constructors without FutureCompletingBlockingQueue
    public SingleThreadMultiplexSourceReaderBase(
            Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            Configuration config,
            SourceReaderContext context) {
        super(
                new SingleThreadFetcherManager<>(splitReaderSupplier, config),
                recordEmitter,
                config,
                context);
    }

    // todo: provide a new constructor without FutureCompletingBlockingQueue
    public SingleThreadMultiplexSourceReaderBase(
            SingleThreadFetcherManager<E, SplitT> splitFetcherManager,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            @Nullable RecordEvaluator<T> eofRecordEvaluator,
            Configuration config,
            SourceReaderContext context) {
        super(
                splitFetcherManager,
                recordEmitter,
                eofRecordEvaluator,
                config,
                context);
    }
}

Proposed Changes

  • By exposing the SplitFetcherManager / SingleThreadFetcherManager, connector developers can easily create their own threading models in the SourceReaderBase by implementing the addSplits(), removeSplits() and maybeShutdownFinishedFetchers() functions.
  • Note that the SplitFetcher constructor is package private, so users can only create SplitFetchers via SplitFetcherManager.createSplitFetcher(). This ensures each SplitFetcher is always owned by the SplitFetcherManager.
  • This FLIP essentially embeds the element queue (a FutureCompletingBlockingQueue) instance into the SplitFetcherManager. This hides the element queue from connector developers and simplifies the SourceReaderBase to consist of only SplitFetcherManager and RecordEmitter as major components.
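As a rough illustration of the first two points, the sketch below models a custom threading model that runs one fetcher per split. It is a simplified model under stated assumptions, not Flink code: `FetcherManagerModel`, `FetcherPerSplitManager`, and `createFetcher()` are hypothetical stand-ins for `SplitFetcherManager`, a connector-specific subclass, and `createSplitFetcher()`, and fetchers are represented by plain integer ids rather than threads.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified stand-in for SplitFetcherManager; not the Flink class.
abstract class FetcherManagerModel {
    // Fetcher id -> ids of the splits currently assigned to that fetcher.
    protected final Map<Integer, List<String>> fetchers = new HashMap<>();
    private int nextFetcherId = 0;

    // Only the manager creates fetchers, mirroring the role of createSplitFetcher().
    protected int createFetcher() {
        int id = nextFetcherId++;
        fetchers.put(id, new ArrayList<>());
        return id;
    }

    abstract void addSplits(List<String> splitIds);

    abstract void removeSplits(List<String> splitIds);

    // Drops fetchers that no longer own any split; returns true if none remain.
    boolean maybeShutdownFinishedFetchers() {
        fetchers.values().removeIf(List::isEmpty);
        return fetchers.isEmpty();
    }
}

// A custom threading model: every split gets its own fetcher.
class FetcherPerSplitManager extends FetcherManagerModel {
    @Override
    void addSplits(List<String> splitIds) {
        for (String splitId : splitIds) {
            fetchers.get(createFetcher()).add(splitId);
        }
    }

    @Override
    void removeSplits(List<String> splitIds) {
        for (List<String> assigned : fetchers.values()) {
            assigned.removeAll(splitIds);
        }
    }
}

class ThreadingModelSketch {
    public static void main(String[] args) {
        FetcherPerSplitManager manager = new FetcherPerSplitManager();
        manager.addSplits(List.of("split-0", "split-1"));
        System.out.println(manager.fetchers.size()); // prints "2"
        manager.removeSplits(List.of("split-0", "split-1"));
        System.out.println(manager.maybeShutdownFinishedFetchers()); // prints "true"
    }
}
```

A different subclass could instead route all splits to a single fetcher, which is the policy the name SingleThreadFetcherManager suggests; the reader side would not need to change.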


Compatibility, Deprecation, and Migration Plan

Connectors that use the constructors of SingleThreadMultiplexSourceReaderBase, SourceReaderBase, and SingleThreadFetcherManager that take an elementsQueue parameter should be migrated to the new constructors without that parameter.

The affected constructors are marked as deprecated in 1.19 and will be removed in release 2.0.


Test Plan

Nothing else to do.


Rejected Alternatives

Change SplitFetcher to a public interface

If SplitFetcherManager becomes PublicEvolving, SplitFetcher also needs to be PublicEvolving, because it is returned by the protected method SplitFetcherManager.createSplitFetcher().

However, SplitFetcher requires FutureCompletingBlockingQueue as a constructor parameter, and SplitFetcher is a class rather than an interface. The alternative was therefore to change SplitFetcher to a public interface and move its implementation details to an implementing subclass.


Reject Reason

The constructor of the SplitFetcher is already package private, so it can only be accessed from classes in the package org.apache.flink.connector.base.source.reader.fetcher, and user classes should not be in this package. Therefore, even if we mark the SplitFetcher class as PublicEvolving, the constructor is not available to users. Only the public and protected methods are considered public API in this case. Private / package private methods and fields are still internal.
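The visibility argument can be checked with a small reflection sketch. The class names below (`VisibilitySketch`, `Fetcher`, `Manager`) are hypothetical stand-ins, not the Flink classes; the sketch only shows that a public type with a package-private constructor exposes no public constructor, while a factory method in the same package can still create instances.

```java
class VisibilitySketch {
    // Stand-in for SplitFetcher: a public type with a package-private constructor.
    public static class Fetcher {
        private final int id;

        // No access modifier: callable only from classes in the same package.
        Fetcher(int id) {
            this.id = id;
        }

        public int id() {
            return id;
        }
    }

    // Stand-in for SplitFetcherManager: the factory lives in the same package.
    public static class Manager {
        private int nextId = 0;

        protected Fetcher createFetcher() {
            return new Fetcher(nextId++);
        }
    }

    // Class#getConstructors() returns only the public constructors of a class.
    static boolean hasPublicConstructor(Class<?> type) {
        return type.getConstructors().length > 0;
    }

    public static void main(String[] args) {
        Fetcher fetcher = new Manager().createFetcher();
        System.out.println(fetcher.id());                        // prints "0"
        System.out.println(hasPublicConstructor(Fetcher.class)); // prints "false"
        System.out.println(hasPublicConstructor(Manager.class)); // prints "true"
    }
}
```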



SplitFetcherManager exposes poll / getAvailabilityFuture / notifyAvailable / noAvailableElement

  • Change SplitFetcherManager from Internal to PublicEvolving.
  • Deprecate the old constructors exposing the FutureCompletingBlockingQueue, and add new constructors as replacements that create the FutureCompletingBlockingQueue instance internally.
  • Add a few new methods to expose the functionality of the internal FutureCompletingBlockingQueue via the SplitFetcherManager.

Code Block
@PublicEvolving
public abstract class SplitFetcherManager<E, SplitT extends SourceSplit> {       
    @Deprecated
    public SplitFetcherManager(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            Supplier<SplitReader<E, SplitT>> splitReaderFactory,
            Configuration configuration) {
        this(elementsQueue, splitReaderFactory, configuration, (ignore) -> {
        });
    }      
 
    @Deprecated
    @VisibleForTesting
    public SplitFetcherManager(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            Supplier<SplitReader<E, SplitT>> splitReaderFactory,
            Configuration configuration,
            Consumer<Collection<String>> splitFinishedHook) {
 
    }
 
 
 
 // todo: provide a new constructor without FutureCompletingBlockingQueue.    
 public SplitFetcherManager(
            Supplier<SplitReader<E, SplitT>> splitReaderFactory,
            Configuration configuration) {
        this(splitReaderFactory, configuration, (ignore) -> {
        });
         
  }
 
 public SplitFetcherManager(
            Supplier<SplitReader<E, SplitT>> splitReaderFactory,
            Configuration configuration,
            Consumer<Collection<String>> splitFinishedHook) {
        this.elementsQueue = new FutureCompletingBlockingQueue<>(
                configuration.getInteger(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY));
        // ......
    }
 
    /**
     * Returns the RecordsWithSplitIds produced by the SplitReader.
     */
    public RecordsWithSplitIds<E> poll(){
        return elementsQueue.poll();
    }
 
    /**
     * Returns the availability future. If the queue is non-empty, then this future will already be
     * complete. Otherwise the obtained future is guaranteed to get completed the next time the
     * queue becomes non-empty, or a notification happens via {@link #notifyAvailable()}.
     *
     * <p>Note that a completed future is no guarantee that the next call to {@link
     * #poll()} will return a non-null element. If there are concurrent consumers, another consumer
     * may have taken the available element. Or there was no element in the first place, because the
     * future was completed through a call to {@link #notifyAvailable()}.
     *
     * <p>For that reason, it is important to call this method (to obtain a new future) every time
     * again after {@link #poll()} returned null and you want to wait for data.
     */
    public CompletableFuture<Void> getAvailabilityFuture(){
        return elementsQueue.getAvailabilityFuture();
    }
 
     /**
     * Makes sure the availability future is complete, if it is not complete already. All futures
     * returned by previous calls to {@link #getAvailabilityFuture()} are guaranteed to be
     * completed.
     *
     * <p>All future calls to the method will return a completed future, until the point that the
     * availability is reset via calls to {@link #poll()} that leave the queue empty.
     */
    public void notifyAvailable(){
        elementsQueue.notifyAvailable();
    }
 
    /** Checks whether there is no data available. */
    public boolean noAvailableElement(){
        return elementsQueue.isEmpty();
    }
}
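The availability contract described in the Javadoc above (a completed future does not guarantee that the next poll returns an element) can be illustrated with a deliberately simplified queue. This is a sketch under stated assumptions, not Flink's FutureCompletingBlockingQueue: `AvailabilityQueueSketch` is a hypothetical class that guards everything with one lock and omits capacity limits and wake-up handling.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

// Simplified, hypothetical queue with an availability future; not the Flink class.
class AvailabilityQueueSketch<T> {
    private final Queue<T> elements = new ArrayDeque<>();
    private CompletableFuture<Void> availability = new CompletableFuture<>();

    synchronized void put(T element) {
        elements.add(element);
        availability.complete(null);
    }

    // Completes the current future even though no element was added.
    synchronized void notifyAvailable() {
        availability.complete(null);
    }

    // Returns null when empty; a poll that leaves the queue empty resets availability.
    synchronized T poll() {
        T next = elements.poll();
        if (elements.isEmpty() && availability.isDone()) {
            availability = new CompletableFuture<>();
        }
        return next;
    }

    synchronized CompletableFuture<Void> getAvailabilityFuture() {
        return availability;
    }
}

class AvailabilitySketchDemo {
    public static void main(String[] args) {
        AvailabilityQueueSketch<String> queue = new AvailabilityQueueSketch<>();
        CompletableFuture<Void> first = queue.getAvailabilityFuture();
        queue.notifyAvailable();
        System.out.println(first.isDone());                         // prints "true"
        System.out.println(queue.poll());                           // prints "null"
        System.out.println(queue.getAvailabilityFuture().isDone()); // prints "false"
    }
}
```

The demo shows why a caller has to fetch a new future after every poll that returned null: the future obtained earlier stays completed and would never block again.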

Reject Reason

SplitFetcherManager would expose four more methods, poll / getAvailabilityFuture / notifyAvailable / noAvailableElement, which are tightly coupled with the implementation of the element queue. The naming of these methods looks odd: what does it mean to "poll from a SplitFetcherManager" or to "notify a SplitFetcherManager available"? To clarify these methods we would have to explain to developers that a queue is hidden inside SplitFetcherManager and that the poll method actually polls from that queue. I'm afraid these methods would implicitly expose the concept and the implementation of the queue to developers.

A cleaner solution would be a new interface that extracts the SplitFetcher creation and management logic from the current SplitFetcherManager, but having too many concepts might make the entire Source API even harder to understand. As a compromise, only the constructors of SplitFetcherManager are exposed as public APIs, and a new internal method SplitFetcherManager#getQueue() is added for SourceReaderBase. This is a bit hacky, but preferable to exposing methods like poll and notifyAvailable on SplitFetcherManager.