...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Although the SingleThreadFetcherManager is annotated as Internal, in practice it acts as a public API and is widely used in many connector projects:
flink-cdc-connector, flink-connector-mongodb, and so on.

...

Moreover, even the constructor of SingleThreadMultiplexSourceReaderBase (which is PublicEvolving) takes a SingleThreadFetcherManager and a FutureCompletingBlockingQueue as parameters. That means that SingleThreadFetcherManager and FutureCompletingBlockingQueue have already been exposed to users for a long time and are widely used.

...

  • Change SplitFetcherManager from Internal to PublicEvolving.
  • Deprecate the old constructors exposing the FutureCompletingBlockingQueue, and add new constructors as replacements that create the FutureCompletingBlockingQueue instance internally.
  • Add a few new methods to expose the functionality of the internal FutureCompletingBlockingQueue via the SplitFetcherManager.
Code Block
@PublicEvolving
public abstract class SplitFetcherManager<E, SplitT extends SourceSplit> {
    @Deprecated
    public SplitFetcherManager(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            Supplier<SplitReader<E, SplitT>> splitReaderFactory,
            Configuration configuration) {
        this(elementsQueue, splitReaderFactory, configuration, (ignore) -> {
        });
    }       

    @Deprecated
    @VisibleForTesting
    public SplitFetcherManager(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            Supplier<SplitReader<E, SplitT>> splitReaderFactory,
            Configuration configuration,
            Consumer<Collection<String>> splitFinishedHook) {

    }

    // todo: provide a new constructor without FutureCompletingBlockingQueue.
    public SplitFetcherManager(
            Supplier<SplitReader<E, SplitT>> splitReaderFactory,
            Configuration configuration) {
        this(splitReaderFactory, configuration, (ignore) -> {
        });
    }

    public SplitFetcherManager(
            Supplier<SplitReader<E, SplitT>> splitReaderFactory,
            Configuration configuration,
            Consumer<Collection<String>> splitFinishedHook) {
        this.elementsQueue = new FutureCompletingBlockingQueue<>(
                configuration.getInteger(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY));
        // ......
    }

    /**
     * Returns the RecordsWithSplitIds produced by the SplitReader, or null if none is available.
     */
    public RecordsWithSplitIds<E> poll() {
        return elementsQueue.poll();
    }

    /**
     * Returns the availability future. If the queue is non-empty, then this future will already be
     * complete. Otherwise the obtained future is guaranteed to get completed the next time the
     * queue becomes non-empty, or a notification happens via {@link #notifyAvailable()}.
     *
     * <p>It is important that a completed future is no guarantee that the next call to {@link
     * #poll()} will return a non-null element. If there are concurrent consumers, another consumer
     * may have taken the available element. Or there was no element in the first place, because the
     * future was completed through a call to {@link #notifyAvailable()}.
     *
     * <p>For that reason, it is important to call this method again (to obtain a new future) every
     * time {@link #poll()} has returned null and you want to wait for data.
     */
    public CompletableFuture<Void> getAvailabilityFuture() {
        return elementsQueue.getAvailabilityFuture();
    }

    /**
     * Makes sure the availability future is complete, if it is not complete already. All futures
     * returned by previous calls to {@link #getAvailabilityFuture()} are guaranteed to be
     * completed.
     *
     * <p>All future calls to the method will return a completed future, until the point that the
     * availability is reset via calls to {@link #poll()} that leave the queue empty.
     */
    public void notifyAvailable() {
        elementsQueue.notifyAvailable();
    }

    /** Checks whether no data is available. */
    public boolean noAvailableElement() {
        return elementsQueue.isEmpty();
    }
}
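
The availability contract described in the Javadoc above can be illustrated with a small, self-contained sketch. It does not use Flink classes: DemoElementQueue and AvailabilityLoopDemo are hypothetical stand-ins written only for this illustration, and the real FutureCompletingBlockingQueue may behave differently in details such as capacity limits and wake-up handling. The point shown is only the consumer-side rule: after poll() returns null, obtain a new future instead of reusing an old one.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

/** Hypothetical stand-in for the element queue; NOT Flink's FutureCompletingBlockingQueue. */
final class DemoElementQueue<T> {
    private final Queue<T> elements = new ArrayDeque<>();
    private CompletableFuture<Void> availability = new CompletableFuture<>();

    /** Adds an element and completes the current availability future. */
    synchronized void put(T element) {
        elements.add(element);
        availability.complete(null);
    }

    /** Returns the next element or null; resets availability once the queue is left empty. */
    synchronized T poll() {
        T next = elements.poll();
        if (elements.isEmpty() && availability.isDone()) {
            availability = new CompletableFuture<>();
        }
        return next;
    }

    synchronized CompletableFuture<Void> getAvailabilityFuture() {
        return availability;
    }

    /** Completes the current future even if no element was added. */
    synchronized void notifyAvailable() {
        availability.complete(null);
    }
}

final class AvailabilityLoopDemo {
    /** Drains everything currently queued and returns the future to wait on next. */
    static <T> CompletableFuture<Void> drain(DemoElementQueue<T> queue, List<T> out) {
        T next;
        while ((next = queue.poll()) != null) {
            out.add(next);
        }
        // poll() returned null: ask for a NEW future rather than reusing an earlier one.
        return queue.getAvailabilityFuture();
    }

    public static void main(String[] args) {
        DemoElementQueue<String> queue = new DemoElementQueue<>();
        queue.put("a");
        queue.put("b");
        List<String> out = new ArrayList<>();
        CompletableFuture<Void> next = drain(queue, out);
        System.out.println(out + ", next future done: " + next.isDone());
    }
}
```

In this sketch a call to notifyAvailable() on an empty queue completes the future without adding data, so a consumer that wakes up must still be prepared for poll() to return null.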

...

Code Block
@PublicEvolving
public abstract class SingleThreadMultiplexSourceReaderBase<E, T, SplitT extends SourceSplit, SplitStateT> extends SourceReaderBase<E, T, SplitT, SplitStateT> { 

    @Deprecated
    public SingleThreadMultiplexSourceReaderBase(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            SingleThreadFetcherManager<E, SplitT> splitFetcherManager,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            Configuration config,
            SourceReaderContext context) {
        super(elementsQueue, splitFetcherManager, recordEmitter, config, context);
 	}

    @Deprecated
    public SingleThreadMultiplexSourceReaderBase(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            Configuration config,
            SourceReaderContext context) {
        super(
                elementsQueue,
                new SingleThreadFetcherManager<>(elementsQueue, splitReaderSupplier, config),
                recordEmitter,
                config,
                context);
	}


    // todo: Add new constructors without FutureCompletingBlockingQueue
    public SingleThreadMultiplexSourceReaderBase(
            Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            Configuration config,
            SourceReaderContext context) {
        super(
                new SingleThreadFetcherManager<>(splitReaderSupplier, config),
                recordEmitter,
                config,
                context);
    }

    public SingleThreadMultiplexSourceReaderBase(
            SingleThreadFetcherManager<E, SplitT> splitFetcherManager,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            @Nullable RecordEvaluator<T> eofRecordEvaluator,
            Configuration config,
            SourceReaderContext context) {
        super(
                splitFetcherManager,
                recordEmitter,
                eofRecordEvaluator,
                config,
                context);
    }
}


Proposed Changes

  • By exposing the SplitFetcherManager / SingleThreadFetcherManager, connector developers can easily create their own threading models in the SourceReaderBase by implementing addSplits() and removeSplits().
  • Note that the SplitFetcher constructor is package private, so users can only create SplitFetchers via SplitFetcherManager.createSplitFetcher(). This ensures each SplitFetcher is always owned by the SplitFetcherManager.
  • This FLIP essentially embeds the element queue (a FutureCompletingBlockingQueue instance) into the SplitFetcherManager. This hides the element queue from connector developers and simplifies the SourceReaderBase so that its only major components are the SplitFetcherManager and the RecordEmitter.
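
The points above can be sketched in plain Java, without Flink on the classpath. Everything in this sketch is a hypothetical stand-in: DemoFetcherManager and PerSplitFetcherManager are not Flink classes, splits and records are plain strings, and removeSplits() is left out for brevity. It only illustrates the shape of the design, namely that the manager creates and owns the queue, a subclass chooses the threading model in addSplits(), and the reader side sees nothing but poll().

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for a fetcher manager that owns its element queue. */
abstract class DemoFetcherManager {
    // Created internally; never passed in or handed out to the reader.
    private final BlockingQueue<String> elementsQueue = new LinkedBlockingQueue<>();

    /** Subclasses decide which thread fetches which split. */
    abstract void addSplits(List<String> splits);

    /** Called by fetcher threads to hand over a fetched record. */
    protected void enqueue(String record) {
        elementsQueue.add(record);
    }

    /** The only view of the queue that the reader gets; null if nothing is queued. */
    String poll() {
        return elementsQueue.poll();
    }
}

/** One possible threading model: one fetch task per split on a small thread pool. */
final class PerSplitFetcherManager extends DemoFetcherManager {
    private final ExecutorService pool = Executors.newFixedThreadPool(2);

    @Override
    void addSplits(List<String> splits) {
        for (String split : splits) {
            // A real fetcher would loop over a SplitReader; here one record per split is enough.
            pool.execute(() -> enqueue("record-from-" + split));
        }
    }

    /** Stops accepting work and waits for running fetch tasks; returns true if all finished. */
    boolean shutdownAndAwait() {
        pool.shutdown();
        try {
            return pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

A different subclass could, for example, route all splits to a single thread instead. The reader code calling poll() would not need to change, which is the simplification the last bullet describes.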


Compatibility, Deprecation, and Migration Plan

...