Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
@PublicEvolving
public abstract class SplitFetcherManager<E, SplitT extends SourceSplit> {        
	@Deprecated
    public SplitFetcherManager(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            Supplier<SplitReader<E, SplitT>> splitReaderFactory,
            Configuration configuration) {
        this(elementsQueue, splitReaderFactory, configuration, (ignore) -> {
        });
    }       

    @Deprecated
    @VisibleForTesting
    public SplitFetcherManager(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            Supplier<SplitReader<E, SplitT>> splitReaderFactory,
            Configuration configuration,
            Consumer<Collection<String>> splitFinishedHook) {

	}



 // todo: provide a new constructor without FutureCompletingBlockingQueue.     
 public SplitFetcherManager(
            Supplier<SplitReader<E, SplitT>> splitReaderFactory,
            Configuration configuration) {
        this(splitReaderFactory, configuration, (ignore) -> {
        });
        
  }

 public // todo: provide a new constructor without FutureCompletingBlockingQueue.
  public SplitFetcherManager(
            Supplier<SplitReader<E, SplitT>> splitReaderFactory,
            Configuration configuration,
            Consumer<Collection<String>> splitFinishedHook) {
        this.elementsQueue = new FutureCompletingBlockingQueue<>(
                configuration.getInteger(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY));
		// ......
	}

	/**
	 * returns the RecordsWithSplitIds produced by SplitReader.
	**/
	public RecordsWithSplitIds<E> poll(){
		return elementsQueue.poll();
	}

 	/**
     * Returns the availability future. If the queue is non-empty, then this future will already be
     * complete. Otherwise the obtained future is guaranteed to get completed the next time the
     * queue becomes non-empty, or a notification happens via {@link #notifyAvailable()}.
     *
     * <p>It is important that a completed future is no guarantee that the next call to {@link
     * #poll()} will return a non-null element. If there are concurrent consumer, another consumer
     * may have taken the available element. Or there was no element in the first place, because the
     * future was completed through a call to {@link #notifyAvailable()}.
     *
     * <p>For that reason, it is important to call this method (to obtain a new future) every time
     * again after {@link #poll()} returned null and you want to wait for data.
     */
	public CompletableFuture<Void> getAvailabilityFuture(){
 		return elementsQueue.getAvailabilityFuture();
	}

     /**
     * Makes sure the availability future is complete, if it is not complete already. All futures
     * returned by previous calls to {@link #getAvailabilityFuture()} are guaranteed to be
     * completed.
     *
     * <p>All future calls to the method will return a completed future, until the point that the
     * availability is reset via calls to {@link #poll()} that leave the queue empty.
     */
	public void notifyAvailable(){
		elementsQueue.notifyAvailable();
	}

   /** Checks whether is no data available. */
 	public boolean noAvailableElement(){
		return elementsQueue.isEmpty();
 	}
}

...

Code Block
@PublicEvolving
public class SingleThreadFetcherManager<E, SplitT extends SourceSplit>
        extends SplitFetcherManager<E, SplitT> {  

   	@Deprecated
    public SingleThreadFetcherManager(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            Supplier<SplitReader<E, SplitT>> splitReaderSupplier) {
        this(elementsQueue, splitReaderSupplier, new Configuration());
    }

  	@Deprecated
    public SingleThreadFetcherManager(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
            Configuration configuration) {
        super(elementsQueue, splitReaderSupplier, configuration);
    }

    @Deprecated
    @VisibleForTesting
    public SingleThreadFetcherManager(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
            Configuration configuration,
            Consumer<Collection<String>> splitFinishedHook) {
        super(elementsQueue, splitReaderSupplier, configuration, splitFinishedHook);
    }

  // todo: provide a new constructor without FutureCompletingBlockingQueue.     
  public SingleThreadFetcherManager(
            Supplier<SplitReader<E, SplitT>> splitReaderSupplier) {
        this(splitReaderSupplier, new Configuration());
    }  

 // todo: provide a new constructor without FutureCompletingBlockingQueue. 
      public SingleThreadFetcherManager(
            Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
            Configuration configuration) {
        super(splitReaderSupplier, configuration);
    }

 // todo: provide a new constructor without FutureCompletingBlockingQueue. 
     public SingleThreadFetcherManager(
            Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
            Configuration configuration,
            Consumer<Collection<String>> splitFinishedHook) {
        super(splitReaderSupplier, configuration, splitFinishedHook);
    }
}

...

Change SplitFetcher from Internal to PublicEvolving, but still not to expose the construct of SplitFetcher, so it can only created by org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager#createSplitFetcher

Code Block
@Internal@PublicEvolving
 public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable {    
	SplitFetcher(
            int id,
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            SplitReader<E, SplitT> splitReader,
            Consumer<Throwable> errorHandler,
            Runnable shutdownHook,
            Consumer<Collection<String>> splitFinishedHook,
            boolean allowUnalignedSourceSplits);
}

...