Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

nothing else to do.


Rejected Alternatives

change SplitFetcher to a public Interface

If SplitFetcherManager becomes PublicEvolving,  that also means SplitFetcher needs to be PublicEvolving, because it is returned by the protected method SplitFetcherManager.createSplitFetcher()

...

The constructor of the SplitFetcher is already package private. So it can only be accessed from the classes in the package org.apache.flink.connector.base.source.reader.fetcher.  And apparently,  user classes should not be in this package.  Therefore,  even if we mark the
SplitFetcher class as PublicEvolving, the constructor is not available to the users. Only the public and protected methods are considered public API
in this case. Private / package private methods and fields are still internal.



SplitFetcherManager expose poll / getAvailabilityFuture / notifyAvailable / noAvailableElement

  • Change SplitFetcherManager from Internal to PublicEvolving.
  • Deprecate the old constructor exposing the FutureCompletingBlockingQueue, and add new constructors as replacements which creates the FutureCompletingBlockingQueue instance internally.
  • Add a new internal method SplitFetcherManager#getQueue() for SourceReaderBase usage.

@PublicEvolving
public abstract class SplitFetcherManager<E, SplitT extends SourceSplit> {       
    @Deprecated
    public SplitFetcherManager(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            Supplier<SplitReader<E, SplitT>> splitReaderFactory,
            Configuration configuration) {
        this(elementsQueue, splitReaderFactory, configuration, (ignore) -> {
        });
    }      
 
    @Deprecated
    @VisibleForTesting
    public SplitFetcherManager(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            Supplier<SplitReader<E, SplitT>> splitReaderFactory,
            Configuration configuration,
            Consumer<Collection<String>> splitFinishedHook) {
 
    }
 
 
 
 // todo: provide a new constructor without FutureCompletingBlockingQueue.    
 public SplitFetcherManager(
            Supplier<SplitReader<E, SplitT>> splitReaderFactory,
            Configuration configuration) {
        this(splitReaderFactory, configuration, (ignore) -> {
        });
         
  }
 
  // todo: provide a new constructor without FutureCompletingBlockingQueue.
  public SplitFetcherManager(
            Supplier<SplitReader<E, SplitT>> splitReaderFactory,
            Configuration configuration,
            Consumer<Collection<String>> splitFinishedHook) {
        this.elementsQueue = new FutureCompletingBlockingQueue<>(
                configuration.getInteger(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY));
        // ......
    }
 
     
    @Internal
    public SplitFetcherManager#getQueue();
 }

Reject Reason

SplitFetcherManager exposes 4 more methods, poll / getAvailabilityFuture / notifyAvailable / noAvailableElement, which are tightly coupled with the implementation of the elementQueue. The naming of these methods look weird, like what does it mean to "poll from a SplitFetcherManager" / "notify a SplitFetcherManager available"? To clarify these methods we have to explain to developers that "well we hide a queue inside SplitFetcherMamager and the poll method is actually polling from the queue". I'm afraid these methods will implicitly expose the concept and the implementation of the queue to developers. 


A cleaner solution would be having a new interface that extracts SplitFetcher creating and managing logic from the current SplitFetcherManager, but having too many concepts might make the entire Source API even harder to understand. To make a compromise,  only expose constructors of SplitFetcherManager as public APIs, and adding a new internal method SplitFetcherManager#getQueue() for SourceReaderBase (well it's a bit hacky , more worthy than exposing methods like poll and notifyAvailable on SplitFetcherManager ).