
Discussion thread: https://lists.apache.org/thread/b8f509878ptwl3kmmgg95tv8sb1j5987
Vote thread
JIRA: FLINK-33465
Release: 1.19

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Although SingleThreadFetcherManager is annotated as Internal, it effectively acts as a public API to some degree and is widely used in many connector projects, such as flink-cdc-connector, flink-connector-mongodb, and so on.

...

Therefore, why not make SingleThreadFetcherManager PublicEvolving?


Let's go back to more context from the original design:

...

2. For FutureCompletingBlockingQueue, in hindsight, it might have been better not to expose it to Source developers. They are unlikely to use it for anything other than constructing it. The FutureCompletingBlockingQueue is currently exposed in the SourceReaderBase constructor because both the SplitFetcherManager and the SourceReaderBase need it. One way to hide the FutureCompletingBlockingQueue from the public API is to make SplitFetcherManager the sole owner of the queue and expose some of the queue's methods via SplitFetcherManager. This way, the SourceReaderBase can invoke those methods via SplitFetcherManager. I believe this also makes the code slightly cleaner.
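
The ownership idea in point 2 can be sketched roughly as follows. This is only an illustration under stated assumptions: FetcherManagerSketch and ReaderSketch are hypothetical stand-ins, not Flink classes, and a plain ArrayBlockingQueue stands in for FutureCompletingBlockingQueue.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class QueueOwnershipSketch {

    /** Hypothetical stand-in for SplitFetcherManager: the only class that holds the queue. */
    static final class FetcherManagerSketch<E> {
        private final BlockingQueue<E> elementsQueue;

        FetcherManagerSketch(int capacity) {
            // The queue is created here, so callers never construct or see it.
            this.elementsQueue = new ArrayBlockingQueue<>(capacity);
        }

        /** Called by fetchers; returns false when the queue is full. */
        boolean offer(E element) {
            return elementsQueue.offer(element);
        }

        /** Returns the next element, or null if none is queued. */
        E poll() {
            return elementsQueue.poll();
        }

        boolean noAvailableElement() {
            return elementsQueue.isEmpty();
        }
    }

    /** Hypothetical stand-in for SourceReaderBase: it depends on the manager only. */
    static final class ReaderSketch<E> {
        private final FetcherManagerSketch<E> manager;

        ReaderSketch(FetcherManagerSketch<E> manager) {
            this.manager = manager;
        }

        E pollNext() {
            return manager.poll();
        }
    }

    public static void main(String[] args) {
        FetcherManagerSketch<String> manager = new FetcherManagerSketch<>(2);
        ReaderSketch<String> reader = new ReaderSketch<>(manager);
        manager.offer("record-1");
        System.out.println(reader.pollNext());            // prints "record-1"
        System.out.println(manager.noAvailableElement()); // prints "true"
    }
}
```

The reader's constructor takes no queue argument, which is the property the FLIP is after: the queue type can change later without touching reader-facing signatures.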


In summary, this FLIP has two goals:

  • Annotate SingleThreadFetcherManager as PublicEvolving.
  • Shield FutureCompletingBlockingQueue from users and confine all operations on FutureCompletingBlockingQueue to the FetcherManager.


Public Interfaces

SplitFetcherManager

  • Change SplitFetcherManager from Internal to PublicEvolving.

Proposed Changes

...

  • Deprecate the old constructors exposing the FutureCompletingBlockingQueue, and add new constructors as replacements that create the FutureCompletingBlockingQueue instance internally.
  • Add a few new methods to expose the functionality of the internal FutureCompletingBlockingQueue via the SplitFetcherManager.
Code Block
@PublicEvolving
public abstract class SplitFetcherManager<E, SplitT extends SourceSplit> {

    @Deprecated
    public SplitFetcherManager(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            Supplier<SplitReader<E, SplitT>> splitReaderFactory,
            Configuration configuration) {
        this(elementsQueue, splitReaderFactory, configuration, (ignore) -> {
        });
    }

    @Deprecated
    @VisibleForTesting
    public SplitFetcherManager(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            Supplier<SplitReader<E, SplitT>> splitReaderFactory,
            Configuration configuration,
            Consumer<Collection<String>> splitFinishedHook) {

    }

    // todo: provide a new constructor without FutureCompletingBlockingQueue.
    public SplitFetcherManager(
            Supplier<SplitReader<E, SplitT>> splitReaderFactory,
            Configuration configuration) {
        this(splitReaderFactory, configuration, (ignore) -> {
        });
    }

    public SplitFetcherManager(
            Supplier<SplitReader<E, SplitT>> splitReaderFactory,
            Configuration configuration,
            Consumer<Collection<String>> splitFinishedHook) {
        this.elementsQueue =
                new FutureCompletingBlockingQueue<>(
                        configuration.getInteger(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY));
        // ......
    }

    /**
     * Returns the RecordsWithSplitIds produced by SplitReader.
     */
    public RecordsWithSplitIds<E> poll() {
        return elementsQueue.poll();
    }

    /**
     * Returns the availability future. If the queue is non-empty, then this future will already be
     * complete. Otherwise the obtained future is guaranteed to get completed the next time the
     * queue becomes non-empty, or a notification happens via {@link #notifyAvailable()}.
     *
     * <p>It is important that a completed future is no guarantee that the next call to {@link
     * #poll()} will return a non-null element. If there are concurrent consumers, another consumer
     * may have taken the available element. Or there was no element in the first place, because the
     * future was completed through a call to {@link #notifyAvailable()}.
     *
     * <p>For that reason, it is important to call this method (to obtain a new future) every time
     * again after {@link #poll()} returned null and you want to wait for data.
     */
    public CompletableFuture<Void> getAvailabilityFuture() {
        return elementsQueue.getAvailabilityFuture();
    }

    /**
     * Makes sure the availability future is complete, if it is not complete already. All futures
     * returned by previous calls to {@link #getAvailabilityFuture()} are guaranteed to be
     * completed.
     *
     * <p>All future calls to the method will return a completed future, until the point that the
     * availability is reset via calls to {@link #poll()} that leave the queue empty.
     */
    public void notifyAvailable() {
        elementsQueue.notifyAvailable();
    }

    /** Checks whether no data is available. */
    public boolean noAvailableElement() {
        return elementsQueue.isEmpty();
    }
}
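
The contract described in the getAvailabilityFuture() Javadoc (a completed future does not guarantee a non-null poll(), so a fresh future must be obtained after each null) can be illustrated with a small consumer loop. The sketch below is an assumption-laden toy: ManagerSketch is a hypothetical stand-in that only mimics the four wrapper methods, not the real SplitFetcherManager or FutureCompletingBlockingQueue.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class AvailabilityLoopSketch {

    /** Hypothetical stand-in mimicking the wrapper methods; not Flink code. */
    static final class ManagerSketch<E> {
        private final ArrayDeque<E> queue = new ArrayDeque<>();
        private CompletableFuture<Void> availability = new CompletableFuture<>();

        synchronized void put(E element) {
            queue.add(element);
            availability.complete(null);
        }

        synchronized E poll() {
            E next = queue.poll();
            // Reset availability once a poll leaves the queue empty.
            if (queue.isEmpty() && availability.isDone()) {
                availability = new CompletableFuture<>();
            }
            return next;
        }

        synchronized CompletableFuture<Void> getAvailabilityFuture() {
            return availability;
        }

        synchronized void notifyAvailable() {
            availability.complete(null);
        }

        synchronized boolean noAvailableElement() {
            return queue.isEmpty();
        }
    }

    /** Polls until {@code expected} elements were read, waiting on a fresh future after each null. */
    static List<String> drain(ManagerSketch<String> manager, int expected) {
        List<String> out = new ArrayList<>();
        while (out.size() < expected) {
            String next = manager.poll();
            if (next != null) {
                out.add(next);
            } else {
                // Always fetch a new future here; an older one may already be complete.
                manager.getAvailabilityFuture().join();
            }
        }
        return out;
    }

    public static void main(String[] args) throws InterruptedException {
        ManagerSketch<String> manager = new ManagerSketch<>();
        manager.notifyAvailable(); // the future completes although the queue is empty
        Thread producer = new Thread(() -> {
            manager.put("a");
            manager.put("b");
        });
        producer.start();
        System.out.println(drain(manager, 2)); // prints "[a, b]"
        producer.join();
    }
}
```

The initial notifyAvailable() call reproduces the case the Javadoc warns about: the first poll() may return null even though the future was complete, and the loop recovers by requesting a new future.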


SingleThreadFetcherManager

  • Change SingleThreadFetcherManager from Internal to PublicEvolving.
  • Deprecate the old constructors exposing the FutureCompletingBlockingQueue, and add new constructors without FutureCompletingBlockingQueue as replacements; the FutureCompletingBlockingQueue instance is then created internally by the parent class (SplitFetcherManager).
Code Block
@PublicEvolving
public class SingleThreadFetcherManager<E, SplitT extends SourceSplit>
        extends SplitFetcherManager<E, SplitT> {

    @Deprecated
    public SingleThreadFetcherManager(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            Supplier<SplitReader<E, SplitT>> splitReaderSupplier) {
        this(elementsQueue, splitReaderSupplier, new Configuration());
    }

    @Deprecated
    public SingleThreadFetcherManager(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
            Configuration configuration) {
        super(elementsQueue, splitReaderSupplier, configuration);
    }

    @Deprecated
    @VisibleForTesting
    public SingleThreadFetcherManager(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
            Configuration configuration,
            Consumer<Collection<String>> splitFinishedHook) {
        super(elementsQueue, splitReaderSupplier, configuration, splitFinishedHook);
    }

    // todo: provide a new constructor without FutureCompletingBlockingQueue.
    public SingleThreadFetcherManager(
            Supplier<SplitReader<E, SplitT>> splitReaderSupplier) {
        this(splitReaderSupplier, new Configuration());
    }

    public SingleThreadFetcherManager(
            Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
            Configuration configuration) {
        super(splitReaderSupplier, configuration);
    }

    public SingleThreadFetcherManager(
            Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
            Configuration configuration,
            Consumer<Collection<String>> splitFinishedHook) {
        super(splitReaderSupplier, configuration, splitFinishedHook);
    }
}


SplitFetcherTask

Change SplitFetcherTask from Internal to PublicEvolving.

Code Block
@PublicEvolving
public interface SplitFetcherTask {}



SplitFetcher

Change SplitFetcher from Internal to PublicEvolving, but still do not expose the constructor of SplitFetcher to end users.

Code Block
@PublicEvolving
class FetchTask<E, SplitT extends SourceSplit> implements SplitFetcherTask {
    private final SplitReader<E, SplitT> splitReader;
    private final FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue;
    private final Consumer<Collection<String>> splitFinishedCallback;
    private final int fetcherIndex;
    private volatile RecordsWithSplitIds<E> lastRecords;
    private volatile boolean wakeup;

    FetchTask(
            SplitReader<E, SplitT> splitReader,
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            Consumer<Collection<String>> splitFinishedCallback,
            int fetcherIndex) {
        this.splitReader = splitReader;
        this.elementsQueue = elementsQueue;
        this.splitFinishedCallback = splitFinishedCallback;
        this.lastRecords = null;
        this.fetcherIndex = fetcherIndex;
        this.wakeup = false;
    }
}


SourceReaderBase

Deprecate the old constructors exposing the FutureCompletingBlockingQueue, and add new constructors as replacements; the FutureCompletingBlockingQueue instance is then created internally in SplitFetcherManager.

Code Block
@PublicEvolving
public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitStateT>
        implements SourceReader<T, SplitT> {

    @Deprecated
    public SourceReaderBase(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            SplitFetcherManager<E, SplitT> splitFetcherManager,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            Configuration config,
            SourceReaderContext context) {
        this(elementsQueue, splitFetcherManager, recordEmitter, null, config, context);
    }

    @Deprecated
    public SourceReaderBase(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            SplitFetcherManager<E, SplitT> splitFetcherManager,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            @Nullable RecordEvaluator<T> eofRecordEvaluator,
            Configuration config,
            SourceReaderContext context) {
        //this.elementsQueue = elementsQueue;
        this.splitFetcherManager = splitFetcherManager;
        this.recordEmitter = recordEmitter;
        this.splitStates = new HashMap<>();
        this.options = new SourceReaderOptions(config);
        this.config = config;
        this.context = context;
        this.noMoreSplitsAssignment = false;
        this.eofRecordEvaluator = eofRecordEvaluator;

        numRecordsInCounter = context.metricGroup().getIOMetricGroup().getNumRecordsInCounter();
    }

    // todo: provide a new constructor without FutureCompletingBlockingQueue
    public SourceReaderBase(
            SplitFetcherManager<E, SplitT> splitFetcherManager,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            Configuration config,
            SourceReaderContext context) {
        this(splitFetcherManager, recordEmitter, null, config, context);
    }

    public SourceReaderBase(
            SplitFetcherManager<E, SplitT> splitFetcherManager,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            @Nullable RecordEvaluator<T> eofRecordEvaluator,
            Configuration config,
            SourceReaderContext context) {
        this.splitFetcherManager = splitFetcherManager;
        this.recordEmitter = recordEmitter;
        this.splitStates = new HashMap<>();
        this.options = new SourceReaderOptions(config);
        this.config = config;
        this.context = context;
        this.noMoreSplitsAssignment = false;
        this.eofRecordEvaluator = eofRecordEvaluator;

        numRecordsInCounter = context.metricGroup().getIOMetricGroup().getNumRecordsInCounter();
    }
}

SingleThreadMultiplexSourceReaderBase

Deprecate the old constructors exposing the FutureCompletingBlockingQueue, and add new constructors as replacements; the FutureCompletingBlockingQueue instance is then created internally in SplitFetcherManager.

Code Block
@PublicEvolving
public abstract class SingleThreadMultiplexSourceReaderBase<E, T, SplitT extends SourceSplit, SplitStateT>
        extends SourceReaderBase<E, T, SplitT, SplitStateT> {

    @Deprecated
    public SingleThreadMultiplexSourceReaderBase(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            SingleThreadFetcherManager<E, SplitT> splitFetcherManager,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            Configuration config,
            SourceReaderContext context) {
        super(elementsQueue, splitFetcherManager, recordEmitter, config, context);
    }

    @Deprecated
    public SingleThreadMultiplexSourceReaderBase(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            Configuration config,
            SourceReaderContext context) {
        super(
                elementsQueue,
                new SingleThreadFetcherManager<>(elementsQueue, splitReaderSupplier, config),
                recordEmitter,
                config,
                context);
    }

    // todo: Add new constructors without FutureCompletingBlockingQueue
    public SingleThreadMultiplexSourceReaderBase(
            Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            Configuration config,
            SourceReaderContext context) {
        super(
                new SingleThreadFetcherManager<>(splitReaderSupplier, config),
                recordEmitter,
                config,
                context);
    }

    public SingleThreadMultiplexSourceReaderBase(
            SingleThreadFetcherManager<E, SplitT> splitFetcherManager,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            @Nullable RecordEvaluator<T> eofRecordEvaluator,
            Configuration config,
            SourceReaderContext context) {
        super(
                splitFetcherManager,
                recordEmitter,
                eofRecordEvaluator,
                config,
                context);
    }
}


Proposed Changes

  • With SplitFetcherManager / SingleThreadFetcherManager exposed, connector developers can easily create their own threading models in the SourceReaderBase by implementing addSplits() and removeSplits().
  • Note that the SplitFetcher constructor is package private, so users can only create SplitFetchers via SplitFetcherManager.createSplitFetcher(). This ensures each SplitFetcher is always owned by the SplitFetcherManager.
  • This FLIP essentially embeds the element queue (a FutureCompletingBlockingQueue) instance into the SplitFetcherManager. This hides the element queue from connector developers and simplifies the SourceReaderBase to consist of only SplitFetcherManager and RecordEmitter as major components.
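
The first two points above can be sketched as follows: a manager subclass defines its own threading model in addSplits() / removeSplits(), while fetchers can only be obtained through the manager's factory method. All class names here (FetcherSketch, ManagerSketch, FetcherPerSplitManager) are hypothetical stand-ins rather than Flink classes, and no real threads are started.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class FetcherOwnershipSketch {

    /** Hypothetical stand-in for SplitFetcher; not Flink code. */
    static final class FetcherSketch {
        final int id;
        final List<String> splits = new ArrayList<>();

        // Package-private: in a real layout, connector code lives in another
        // package and therefore cannot call this constructor directly.
        FetcherSketch(int id) {
            this.id = id;
        }
    }

    /** Hypothetical stand-in for SplitFetcherManager. */
    abstract static class ManagerSketch {
        private final Map<Integer, FetcherSketch> fetchers = new LinkedHashMap<>();

        /** The only supported way to obtain a fetcher; registers it with this manager. */
        protected FetcherSketch createFetcher() {
            FetcherSketch fetcher = new FetcherSketch(fetchers.size());
            fetchers.put(fetcher.id, fetcher);
            return fetcher;
        }

        int numFetchers() {
            return fetchers.size();
        }

        abstract void addSplits(List<String> splitIds);

        abstract void removeSplits(List<String> splitIds);
    }

    /** One possible threading model: a dedicated fetcher per split. */
    static final class FetcherPerSplitManager extends ManagerSketch {
        private final Map<String, FetcherSketch> fetcherBySplit = new LinkedHashMap<>();

        @Override
        void addSplits(List<String> splitIds) {
            for (String splitId : splitIds) {
                FetcherSketch fetcher = createFetcher();
                fetcher.splits.add(splitId);
                fetcherBySplit.put(splitId, fetcher);
            }
        }

        @Override
        void removeSplits(List<String> splitIds) {
            for (String splitId : splitIds) {
                FetcherSketch fetcher = fetcherBySplit.remove(splitId);
                if (fetcher != null) {
                    fetcher.splits.remove(splitId);
                }
            }
        }

        int numAssignedSplits() {
            return fetcherBySplit.size();
        }
    }

    public static void main(String[] args) {
        FetcherPerSplitManager manager = new FetcherPerSplitManager();
        manager.addSplits(List.of("split-0", "split-1"));
        manager.removeSplits(List.of("split-0"));
        System.out.println(manager.numFetchers());       // prints "2"
        System.out.println(manager.numAssignedSplits()); // prints "1"
    }
}
```

Because every fetcher is created inside createFetcher(), the manager always has a complete registry of the fetchers it owns, regardless of which threading model the subclass chooses.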


Compatibility, Deprecation, and Migration Plan

Connectors that use the constructors of SingleThreadMultiplexSourceReaderBase, SourceReaderBase, and SingleThreadFetcherManager that take the elementsQueue parameter should be migrated to the new constructors without the elementsQueue parameter.

Mark the impacted constructors as deprecated in 1.19, and remove them in release 2.0.
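
From a connector's point of view, the migration amounts to dropping the queue argument at the call site. The sketch below shows the before/after shape with a hypothetical ManagerSketch class (not a Flink class); the fixed default capacity is an assumption made for the example, whereas the FLIP reads it from SourceReaderOptions.ELEMENT_QUEUE_CAPACITY.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class ConstructorMigrationSketch {

    /** Hypothetical stand-in for a fetcher manager; not Flink code. */
    static class ManagerSketch<E> {
        // Assumed default for this sketch only.
        static final int DEFAULT_QUEUE_CAPACITY = 2;

        private final BlockingQueue<E> elementsQueue;

        /** Old style: the caller builds the queue and passes it in. */
        @Deprecated
        ManagerSketch(BlockingQueue<E> elementsQueue) {
            this.elementsQueue = elementsQueue;
        }

        /** New style: the queue is created internally. */
        ManagerSketch() {
            this.elementsQueue = new ArrayBlockingQueue<>(DEFAULT_QUEUE_CAPACITY);
        }

        int remainingCapacity() {
            return elementsQueue.remainingCapacity();
        }
    }

    @SuppressWarnings("deprecation")
    public static void main(String[] args) {
        // Before: the connector constructs the queue only to hand it over.
        ManagerSketch<String> before = new ManagerSketch<>(new ArrayBlockingQueue<String>(2));
        // After: the queue no longer appears in connector code.
        ManagerSketch<String> after = new ManagerSketch<>();
        System.out.println(before.remainingCapacity() == after.remainingCapacity()); // prints "true"
    }
}
```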


Test Plan

Nothing else to do.

...

If SplitFetcherManager becomes PublicEvolving, that also means SplitFetcher needs to be PublicEvolving, because it is returned by the protected method SplitFetcherManager.createSplitFetcher().

However, SplitFetcher requires FutureCompletingBlockingQueue as a constructor parameter, and SplitFetcher is a class rather than an interface. Therefore, I want to change SplitFetcher to a public interface and move its implementation details to an implementing subclass.


Reject Reason

The constructor of SplitFetcher is already package private, so it can only be accessed from classes in the package org.apache.flink.connector.base.source.reader.fetcher, and user classes should not be in this package. Therefore, even if we mark the SplitFetcher class as PublicEvolving, the constructor is not available to users. Only the public and protected methods are considered public API in this case. Private / package private methods and fields are still internal.