...

Discussion thread: https://lists.apache.org/thread/b8f509878ptwl3kmmgg95tv8sb1j5987
Vote thread: https://lists.apache.org/thread/t1zff21z440pvv48jyhm8pgtqsyplchn

JIRA: FLINK-33465, FLINK-32417

Release: 1.19

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Code Block
// SingleThreadMultiplexSourceReaderBase constructor that creates the
// SingleThreadFetcherManager internally.
public SingleThreadMultiplexSourceReaderBase(
        Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
        RecordEmitter<E, T, SplitStateT> recordEmitter,
        Configuration config,
        SourceReaderContext context) {
    super(
            new SingleThreadFetcherManager<>(splitReaderSupplier, config),
            recordEmitter,
            config,
            context);
}

Code Block
@PublicEvolving
public abstract class SplitFetcherManager<E, SplitT extends SourceSplit> {

    @Deprecated
    public SplitFetcherManager(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            Supplier<SplitReader<E, SplitT>> splitReaderFactory,
            Configuration configuration) {
        this(elementsQueue, splitReaderFactory, configuration, (ignore) -> {
        });
    }

    @Deprecated
    @VisibleForTesting
    public SplitFetcherManager(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            Supplier<SplitReader<E, SplitT>> splitReaderFactory,
            Configuration configuration,
            Consumer<Collection<String>> splitFinishedHook) {
        // ......
    }

    // todo: provide a new constructor without FutureCompletingBlockingQueue.
    public SplitFetcherManager(
            Supplier<SplitReader<E, SplitT>> splitReaderFactory,
            Configuration configuration) {
        this(splitReaderFactory, configuration, (ignore) -> {
        });
    }

    // todo: provide a new constructor without FutureCompletingBlockingQueue.
    public SplitFetcherManager(
            Supplier<SplitReader<E, SplitT>> splitReaderFactory,
            Configuration configuration,
            Consumer<Collection<String>> splitFinishedHook) {
        this.elementsQueue = new FutureCompletingBlockingQueue<>(
                configuration.getInteger(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY));
        // ......
    }

    @Internal
    public FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> getQueue() {
        return elementsQueue;
    }
}
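
The queue-ownership change above can be illustrated with a minimal sketch. It is not the Flink implementation: the class names QueueOwningManagerSketch and SingleThreadSketch are hypothetical, java.util.concurrent.ArrayBlockingQueue stands in for FutureCompletingBlockingQueue, and a plain int capacity stands in for the Configuration lookup of ELEMENT_QUEUE_CAPACITY.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical stand-in for SplitFetcherManager; not the Flink class. */
abstract class QueueOwningManagerSketch<E> {
    private final BlockingQueue<E> elementsQueue;

    /** Old style: the caller creates the queue and passes it in. */
    @Deprecated
    protected QueueOwningManagerSketch(BlockingQueue<E> elementsQueue) {
        this.elementsQueue = elementsQueue;
    }

    /** New style: the manager creates the queue itself from a configured capacity. */
    protected QueueOwningManagerSketch(int queueCapacity) {
        this.elementsQueue = new ArrayBlockingQueue<>(queueCapacity);
    }

    /** Internal accessor for the reader, playing the role of getQueue(). */
    BlockingQueue<E> getQueue() {
        return elementsQueue;
    }
}

/** Hypothetical subclass that no longer needs to know how the queue is built. */
final class SingleThreadSketch<E> extends QueueOwningManagerSketch<E> {
    SingleThreadSketch(int queueCapacity) {
        super(queueCapacity);
    }
}

class QueueOwnershipDemo {
    public static void main(String[] args) {
        SingleThreadSketch<String> manager = new SingleThreadSketch<>(2);
        manager.getQueue().offer("record-batch");
        System.out.println(manager.getQueue().poll());
    }
}
```

With this shape, connector code only supplies the reader factory and configuration, and the queue type stays an internal detail that can change later.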

SingleThreadFetcherManager

  • Change SingleThreadFetcherManager from Internal to PublicEvolving.
  • Deprecate the old constructors that expose the FutureCompletingBlockingQueue, and add new constructors without FutureCompletingBlockingQueue as replacements. With the new constructors, the parent class (SplitFetcherManager) creates the FutureCompletingBlockingQueue instance internally.
Code Block
@PublicEvolving
public class SingleThreadFetcherManager<E, SplitT extends SourceSplit>
        extends SplitFetcherManager<E, SplitT> {

    @Deprecated
    public SingleThreadFetcherManager(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            Supplier<SplitReader<E, SplitT>> splitReaderSupplier) {
        this(elementsQueue, splitReaderSupplier, new Configuration());
    }

    @Deprecated
    public SingleThreadFetcherManager(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
            Configuration configuration) {
        super(elementsQueue, splitReaderSupplier, configuration);
    }

    @Deprecated
    @VisibleForTesting
    public SingleThreadFetcherManager(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
            Configuration configuration,
            Consumer<Collection<String>> splitFinishedHook) {
        super(elementsQueue, splitReaderSupplier, configuration, splitFinishedHook);
    }

    // todo: provide a new constructor without FutureCompletingBlockingQueue.
    public SingleThreadFetcherManager(
            Supplier<SplitReader<E, SplitT>> splitReaderSupplier) {
        this(splitReaderSupplier, new Configuration());
    }

    // todo: provide a new constructor without FutureCompletingBlockingQueue.
    public SingleThreadFetcherManager(
            Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
            Configuration configuration) {
        super(splitReaderSupplier, configuration);
    }

    // todo: provide a new constructor without FutureCompletingBlockingQueue.
    public SingleThreadFetcherManager(
            Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
            Configuration configuration,
            Consumer<Collection<String>> splitFinishedHook) {
        super(splitReaderSupplier, configuration, splitFinishedHook);
    }
}
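
The constructor delegation above (a default configuration, and a no-op split-finished hook when none is given) can be sketched as follows. This is only an illustration under stated assumptions: ParentManagerSketch and SingleThreadManagerSketch are hypothetical names, an int capacity replaces Configuration, and DEFAULT_QUEUE_CAPACITY is an assumed default rather than a value taken from this proposal.

```java
import java.util.Collection;
import java.util.function.Consumer;
import java.util.function.Supplier;

/** Hypothetical parent; stands in for SplitFetcherManager. */
abstract class ParentManagerSketch<R> {
    final Supplier<R> readerSupplier;
    final int queueCapacity;
    final Consumer<Collection<String>> splitFinishedHook;

    /** Without a hook, fall back to a no-op, as the (ignore) -> {} lambda does above. */
    ParentManagerSketch(Supplier<R> readerSupplier, int queueCapacity) {
        this(readerSupplier, queueCapacity, ignore -> {});
    }

    ParentManagerSketch(
            Supplier<R> readerSupplier,
            int queueCapacity,
            Consumer<Collection<String>> splitFinishedHook) {
        this.readerSupplier = readerSupplier;
        this.queueCapacity = queueCapacity;
        this.splitFinishedHook = splitFinishedHook;
    }
}

/** Hypothetical subclass; each constructor adds one default and delegates upward. */
final class SingleThreadManagerSketch<R> extends ParentManagerSketch<R> {
    static final int DEFAULT_QUEUE_CAPACITY = 2; // assumption for this sketch

    SingleThreadManagerSketch(Supplier<R> readerSupplier) {
        this(readerSupplier, DEFAULT_QUEUE_CAPACITY);
    }

    SingleThreadManagerSketch(Supplier<R> readerSupplier, int queueCapacity) {
        super(readerSupplier, queueCapacity);
    }

    SingleThreadManagerSketch(
            Supplier<R> readerSupplier,
            int queueCapacity,
            Consumer<Collection<String>> splitFinishedHook) {
        super(readerSupplier, queueCapacity, splitFinishedHook);
    }
}
```

None of the three constructors mentions a queue, which is the point of the replacement constructors.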

SplitFetcherTask

Change SplitFetcherTask from Internal to PublicEvolving.

Code Block
@PublicEvolving
public interface SplitFetcherTask {}

SplitFetcher

Change SplitFetcher from Internal to PublicEvolving, but still do not expose the constructor of SplitFetcher, so that it can only be created by org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager#createSplitFetcher.

Code Block
@PublicEvolving
public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable {

    // Package-private on purpose: instances are created via
    // SplitFetcherManager#createSplitFetcher.
    SplitFetcher(
            int id,
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            SplitReader<E, SplitT> splitReader,
            Consumer<Throwable> errorHandler,
            Runnable shutdownHook,
            Consumer<Collection<String>> splitFinishedHook,
            boolean allowUnalignedSourceSplits) {
        // ......
    }
}
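
The idea of a public type whose instances can only come from the manager can be sketched as below. The names FetcherOwnerSketch, Fetcher, and createFetcher are hypothetical. The proposal relies on a package-private constructor; to keep the sketch in a single file it uses a private constructor on a nested class instead, which gives the same effect of restricting creation to the owner.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for SplitFetcherManager. */
class FetcherOwnerSketch {
    private final List<Fetcher> fetchers = new ArrayList<>();
    private int nextId = 0;

    /** The only way to obtain a Fetcher, playing the role of createSplitFetcher. */
    Fetcher createFetcher() {
        Fetcher fetcher = new Fetcher(nextId++);
        fetchers.add(fetcher);
        return fetcher;
    }

    int fetcherCount() {
        return fetchers.size();
    }

    /** Public type that callers may use, but cannot instantiate themselves. */
    public static final class Fetcher implements Runnable {
        private final int id;
        private boolean ran;

        // Private: only the enclosing owner can call this constructor.
        private Fetcher(int id) {
            this.id = id;
        }

        public int id() {
            return id;
        }

        public boolean hasRun() {
            return ran;
        }

        @Override
        public void run() {
            ran = true;
        }
    }
}
```

Because creation goes through the owner, the owner can assign ids and track every fetcher it hands out.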

...

  • Change SplitFetcherManager from Internal to PublicEvolving.
  • Deprecate the old constructors that expose the FutureCompletingBlockingQueue, and add new constructors as replacements that create the FutureCompletingBlockingQueue instance internally.
  • Add a new internal method SplitFetcherManager#getQueue() for SourceReaderBase usage.
Code Block
@PublicEvolving
public abstract class SplitFetcherManager<E, SplitT extends SourceSplit> {       
    @Deprecated
    public SplitFetcherManager(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            Supplier<SplitReader<E, SplitT>> splitReaderFactory,
            Configuration configuration) {
        this(elementsQueue, splitReaderFactory, configuration, (ignore) -> {
        });
    }      
 
    @Deprecated
    @VisibleForTesting
    public SplitFetcherManager(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            Supplier<SplitReader<E, SplitT>> splitReaderFactory,
            Configuration configuration,
            Consumer<Collection<String>> splitFinishedHook) {
 
    }
 
 
 
 // todo: provide a new constructor without FutureCompletingBlockingQueue.    
 public SplitFetcherManager(
            Supplier<SplitReader<E, SplitT>> splitReaderFactory,
            Configuration configuration) {
        this(splitReaderFactory, configuration, (ignore) -> {
        });
         
  }
 
  // todo: provide a new constructor without FutureCompletingBlockingQueue.
  public SplitFetcherManager(
            Supplier<SplitReader<E, SplitT>> splitReaderFactory,
            Configuration configuration,
            Consumer<Collection<String>> splitFinishedHook) {
        this.elementsQueue = new FutureCompletingBlockingQueue<>(
                configuration.getInteger(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY));
        // ......
    }
 
     
    @Internal
    public FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> getQueue() {
        return elementsQueue;
    }
}

Code Block
@PublicEvolving
public abstract class SplitFetcherManager<E, SplitT extends SourceSplit> {       
    @Deprecated
    public SplitFetcherManager(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            Supplier<SplitReader<E, SplitT>> splitReaderFactory,
            Configuration configuration) {
        this(elementsQueue, splitReaderFactory, configuration, (ignore) -> {
        });
    }      
 
    @Deprecated
    @VisibleForTesting
    public SplitFetcherManager(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            Supplier<SplitReader<E, SplitT>> splitReaderFactory,
            Configuration configuration,
            Consumer<Collection<String>> splitFinishedHook) {
 
    }
 
 
 
 // todo: provide a new constructor without FutureCompletingBlockingQueue.    
 public SplitFetcherManager(
            Supplier<SplitReader<E, SplitT>> splitReaderFactory,
            Configuration configuration) {
        this(splitReaderFactory, configuration, (ignore) -> {
        });
         
  }
 
 public SplitFetcherManager(
            Supplier<SplitReader<E, SplitT>> splitReaderFactory,
            Configuration configuration,
            Consumer<Collection<String>> splitFinishedHook) {
        this.elementsQueue = new FutureCompletingBlockingQueue<>(
                configuration.getInteger(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY));
        // ......
    }
 
    /**
     * Returns the RecordsWithSplitIds produced by the SplitReader.
     */
    public RecordsWithSplitIds<E> poll() {
        return elementsQueue.poll();
    }
 
    /**
     * Returns the availability future. If the queue is non-empty, then this future will already be
     * complete. Otherwise the obtained future is guaranteed to get completed the next time the
     * queue becomes non-empty, or a notification happens via {@link #notifyAvailable()}.
     *
     * <p>It is important that a completed future is no guarantee that the next call to {@link
     * #poll()} will return a non-null element. If there are concurrent consumers, another consumer
     * may have taken the available element. Or there was no element in the first place, because the
     * future was completed through a call to {@link #notifyAvailable()}.
     *
     * <p>For that reason, it is important to call this method (to obtain a new future) every time
     * again after {@link #poll()} returned null and you want to wait for data.
     */
    public CompletableFuture<Void> getAvailabilityFuture(){
        return elementsQueue.getAvailabilityFuture();
    }
 
     /**
     * Makes sure the availability future is complete, if it is not complete already. All futures
     * returned by previous calls to {@link #getAvailabilityFuture()} are guaranteed to be
     * completed.
     *
     * <p>All future calls to the method will return a completed future, until the point that the
     * availability is reset via calls to {@link #poll()} that leave the queue empty.
     */
    public void notifyAvailable(){
        elementsQueue.notifyAvailable();
    }
 
    /** Checks whether no data is available. */
    public boolean noAvailableElement(){
        return elementsQueue.isEmpty();
    }

}
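
The availability contract described in the Javadoc above can be made concrete with a small sketch. It is a single-lock simplification under assumed names (AvailabilityQueueSketch and its put method are hypothetical) and is not Flink's FutureCompletingBlockingQueue. It shows why a completed future does not guarantee a non-null poll(), and why a fresh future must be obtained after poll() returns null.

```java
import java.util.ArrayDeque;
import java.util.concurrent.CompletableFuture;

/** Hypothetical simplification of a queue with an availability future. */
final class AvailabilityQueueSketch<T> {
    private final ArrayDeque<T> queue = new ArrayDeque<>();
    private CompletableFuture<Void> available = new CompletableFuture<>();

    /** Adds an element and completes the current availability future. */
    synchronized void put(T element) {
        queue.add(element);
        available.complete(null);
    }

    /** Returns the next element, or null if the queue is empty. */
    synchronized T poll() {
        T next = queue.poll();
        // Reset availability only when the queue is left empty and the current
        // future is already complete, so pending waiters are never orphaned.
        if (queue.isEmpty() && available.isDone()) {
            available = new CompletableFuture<>();
        }
        return next;
    }

    /** Already complete if data is present or a notification happened. */
    synchronized CompletableFuture<Void> getAvailabilityFuture() {
        return available;
    }

    /** Completes the availability future without adding data. */
    synchronized void notifyAvailable() {
        available.complete(null);
    }
}
```

A caller that sees the future complete after notifyAvailable() gets null from poll() and then has to call getAvailabilityFuture() again before waiting, which is exactly the queue-specific behaviour that the four methods would leak through SplitFetcherManager.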

Reject Reason

In this alternative, SplitFetcherManager exposes four more methods (poll, getAvailabilityFuture, notifyAvailable, and noAvailableElement) that are tightly coupled to the implementation of the elementsQueue. The names of these methods read oddly: what does it mean to "poll from a SplitFetcherManager" or to "notify a SplitFetcherManager available"? To clarify these methods, we would have to explain to developers that a queue is hidden inside SplitFetcherManager and that the poll method actually polls from that queue. I'm afraid these methods would implicitly expose the concept and the implementation of the queue to developers.

...