Discussion thread https://lists.apache.org/thread/b8f509878ptwl3kmmgg95tv8sb1j5987
Vote thread

JIRA


Release 1.19

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation


Although SingleThreadFetcherManager is annotated as Internal, in practice it acts as a public API and is widely used in many connector projects, such as flink-cdc-connector, flink-connector-mongodb, and so on.


Moreover, even the constructor of SingleThreadMultiplexSourceReaderBase (which is PublicEvolving) takes SingleThreadFetcherManager and FutureCompletingBlockingQueue as parameters. This means that SingleThreadFetcherManager and FutureCompletingBlockingQueue have already been exposed to users for a long time and are widely used.

public SingleThreadMultiplexSourceReaderBase(
    FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
    SingleThreadFetcherManager<E, SplitT> splitFetcherManager,
    RecordEmitter<E, T, SplitStateT> recordEmitter,
    Configuration config,
    SourceReaderContext context) {
    super(elementsQueue, splitFetcherManager, recordEmitter, config, context);
}


As shown in FLINK-31324, FLINK-28853 changed the default constructor of SingleThreadFetcherManager. However, this change had a wide impact. In the end, the former constructor was added back and marked as Deprecated.

Therefore, why not make SingleThreadFetcherManager PublicEvolving?


More context from the original design:

1. The original design of SplitFetcherManager and its subclasses was to make them public to the Source developers. The goal is to let us take care of the threading model, while the Source developers can just focus on the SplitReader implementation. Therefore, I think making SplitFetcherManager / SingleThreadFetcherManager public aligns with the original design. That is also why these classes are exposed in the constructor of SourceReaderBase.

2. For FutureCompletingBlockingQueue, in hindsight, it might be better not to expose it to the Source developers. They are unlikely to use it anywhere other than just constructing it. The reason that FutureCompletingBlockingQueue is currently exposed in the SourceReaderBase constructor is that both the SplitFetcherManager and SourceReaderBase need it. One way to hide the FutureCompletingBlockingQueue from the public API is to make SplitFetcherManager the only owner class of the queue, and expose some of its methods via SplitFetcherManager. This way, the SourceReaderBase can invoke the methods via SplitFetcherManager. I believe this also makes the code slightly cleaner.


Public Interfaces

Change SingleThreadFetcherManager, SplitFetcherManager, SplitFetcher and SplitFetcherTask from Internal to PublicEvolving.


Proposed Changes

  1. Mark the constructors of SourceReaderBase and SingleThreadMultiplexSourceReaderBase as @Deprecated and provide new constructors without FutureCompletingBlockingQueue.
@PublicEvolving
public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitStateT>
        implements SourceReader<T, SplitT> {  
 
 
    @Deprecated
    public SourceReaderBase(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            SplitFetcherManager<E, SplitT> splitFetcherManager,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            Configuration config,
            SourceReaderContext context) {
        this(elementsQueue, splitFetcherManager, recordEmitter, null, config, context);
    }
 
 
    @Deprecated
    public SourceReaderBase(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            SplitFetcherManager<E, SplitT> splitFetcherManager,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            @Nullable RecordEvaluator<T> eofRecordEvaluator,
            Configuration config,
            SourceReaderContext context) {
        //this.elementsQueue = elementsQueue;
        this.splitFetcherManager = splitFetcherManager;
        this.recordEmitter = recordEmitter;
        this.splitStates = new HashMap<>();
        this.options = new SourceReaderOptions(config);
        this.config = config;
        this.context = context;
        this.noMoreSplitsAssignment = false;
        this.eofRecordEvaluator = eofRecordEvaluator;
 
        numRecordsInCounter = context.metricGroup().getIOMetricGroup().getNumRecordsInCounter();
    } 
 
    // todo: provide a new constructor without FutureCompletingBlockingQueue
    public SourceReaderBase(
            SplitFetcherManager<E, SplitT> splitFetcherManager,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            Configuration config,
            SourceReaderContext context) {
        this(splitFetcherManager, recordEmitter, null, config, context);
    }
 
 
    public SourceReaderBase(
            SplitFetcherManager<E, SplitT> splitFetcherManager,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            @Nullable RecordEvaluator<T> eofRecordEvaluator,
            Configuration config,
            SourceReaderContext context) {
        this.splitFetcherManager = splitFetcherManager;
        this.recordEmitter = recordEmitter;
        this.splitStates = new HashMap<>();
        this.options = new SourceReaderOptions(config);
        this.config = config;
        this.context = context;
        this.noMoreSplitsAssignment = false;
        this.eofRecordEvaluator = eofRecordEvaluator;
 
        numRecordsInCounter = context.metricGroup().getIOMetricGroup().getNumRecordsInCounter();
    }
}


@PublicEvolving
public abstract class SingleThreadMultiplexSourceReaderBase<E, T, SplitT extends SourceSplit, SplitStateT> extends SourceReaderBase<E, T, SplitT, SplitStateT> { 

    @Deprecated
    public SingleThreadMultiplexSourceReaderBase(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            SingleThreadFetcherManager<E, SplitT> splitFetcherManager,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            Configuration config,
            SourceReaderContext context) {
        super(elementsQueue, splitFetcherManager, recordEmitter, config, context);
    }

    @Deprecated
    public SingleThreadMultiplexSourceReaderBase(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            Configuration config,
            SourceReaderContext context) {
        super(
                elementsQueue,
                new SingleThreadFetcherManager<>(elementsQueue, splitReaderSupplier, config),
                recordEmitter,
                config,
                context);
    }

    // todo: Add new constructors without FutureCompletingBlockingQueue
    public SingleThreadMultiplexSourceReaderBase(
            Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            Configuration config,
            SourceReaderContext context) {
        super(
                new SingleThreadFetcherManager<>(splitReaderSupplier, config),
                recordEmitter,
                config,
                context);
    }

    public SingleThreadMultiplexSourceReaderBase(
            SingleThreadFetcherManager<E, SplitT> splitFetcherManager,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            @Nullable RecordEvaluator<T> eofRecordEvaluator,
            Configuration config,
            SourceReaderContext context) {
        super(
                splitFetcherManager,
                recordEmitter,
                eofRecordEvaluator,
                config,
                context);
    }
}


3. Mark SplitFetcherManager and SingleThreadFetcherManager as `@PublicEvolving`, mark the existing constructors of SplitFetcherManager and SingleThreadFetcherManager as @Deprecated, and provide new constructors without FutureCompletingBlockingQueue.

SplitFetcherManager
@PublicEvolving
public abstract class SplitFetcherManager<E, SplitT extends SourceSplit> {        
    @Deprecated
    public SplitFetcherManager(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            Supplier<SplitReader<E, SplitT>> splitReaderFactory,
            Configuration configuration) {
        this(elementsQueue, splitReaderFactory, configuration, (ignore) -> {
        });
    }

    @Deprecated
    @VisibleForTesting
    public SplitFetcherManager(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            Supplier<SplitReader<E, SplitT>> splitReaderFactory,
            Configuration configuration,
            Consumer<Collection<String>> splitFinishedHook) {

    }

    // todo: provide a new constructor without FutureCompletingBlockingQueue.
    public SplitFetcherManager(
            Supplier<SplitReader<E, SplitT>> splitReaderFactory,
            Configuration configuration) {
        this(splitReaderFactory, configuration, (ignore) -> {
        });
    }

    public SplitFetcherManager(
            Supplier<SplitReader<E, SplitT>> splitReaderFactory,
            Configuration configuration,
            Consumer<Collection<String>> splitFinishedHook) {
        this.elementsQueue = new FutureCompletingBlockingQueue<>(
                configuration.getInteger(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY));
        // ......
    }
}

SingleThreadFetcherManager
@PublicEvolving
public class SingleThreadFetcherManager<E, SplitT extends SourceSplit>
        extends SplitFetcherManager<E, SplitT> {  
    @Deprecated
    public SingleThreadFetcherManager(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            Supplier<SplitReader<E, SplitT>> splitReaderSupplier) {
        this(elementsQueue, splitReaderSupplier, new Configuration());
    }

    @Deprecated
    public SingleThreadFetcherManager(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
            Configuration configuration) {
        super(elementsQueue, splitReaderSupplier, configuration);
    }

    @Deprecated
    @VisibleForTesting
    public SingleThreadFetcherManager(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
            Configuration configuration,
            Consumer<Collection<String>> splitFinishedHook) {
        super(elementsQueue, splitReaderSupplier, configuration, splitFinishedHook);
    }

    public SingleThreadFetcherManager(
            Supplier<SplitReader<E, SplitT>> splitReaderSupplier) {
        this(splitReaderSupplier, new Configuration());
    }


    public SingleThreadFetcherManager(
            Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
            Configuration configuration) {
        super(splitReaderSupplier, configuration);
    }

    public SingleThreadFetcherManager(
            Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
            Configuration configuration,
            Consumer<Collection<String>> splitFinishedHook) {
        super(splitReaderSupplier, configuration, splitFinishedHook);
    }
}



4. SplitFetcherManager provides wrapper methods for FutureCompletingBlockingQueue to replace its usage in SourceReaderBase.

@PublicEvolving
public abstract class SplitFetcherManager<E, SplitT extends SourceSplit> {
	
    /**
     * Returns the RecordsWithSplitIds produced by the SplitReader.
     */
    public RecordsWithSplitIds<E> poll() {
        return elementsQueue.poll();
    }

    /**
     * Returns the availability future. If the queue is non-empty, then this future will already be
     * complete. Otherwise the obtained future is guaranteed to get completed the next time the
     * queue becomes non-empty, or a notification happens via {@link #notifyAvailable()}.
     *
     * <p>It is important that a completed future is no guarantee that the next call to {@link
     * #poll()} will return a non-null element. If there are concurrent consumers, another consumer
     * may have taken the available element. Or there was no element in the first place, because the
     * future was completed through a call to {@link #notifyAvailable()}.
     *
     * <p>For that reason, it is important to call this method (to obtain a new future) every time
     * again after {@link #poll()} returned null and you want to wait for data.
     */
    public CompletableFuture<Void> getAvailabilityFuture() {
        return elementsQueue.getAvailabilityFuture();
    }

    /**
     * Makes sure the availability future is complete, if it is not complete already. All futures
     * returned by previous calls to {@link #getAvailabilityFuture()} are guaranteed to be
     * completed.
     *
     * <p>All future calls to the method will return a completed future, until the point that the
     * availability is reset via calls to {@link #poll()} that leave the queue empty.
     */
    public void notifyAvailable() {
        elementsQueue.notifyAvailable();
    }

    /** Checks whether no data is available. */
    public boolean noAvailableElement() {
        return elementsQueue.isEmpty();
    }
}
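
The contract spelled out in the Javadoc of the wrapper methods (obtain a fresh availability future after every poll() that returns null) can be illustrated with a small, self-contained sketch. MiniQueue and MiniFetcherManager are hypothetical, simplified stand-ins written for this illustration; they are not the Flink classes and only mimic the documented behaviour under the assumption of a single consumer.

```java
import java.util.ArrayDeque;
import java.util.concurrent.CompletableFuture;

/** Hypothetical, simplified stand-in for FutureCompletingBlockingQueue (not the Flink class). */
final class MiniQueue<T> {
    private final ArrayDeque<T> elements = new ArrayDeque<>();
    private CompletableFuture<Void> availability = new CompletableFuture<>();

    synchronized void put(T element) {
        elements.add(element);
        availability.complete(null); // a non-empty queue always reports availability
    }

    synchronized T poll() {
        T element = elements.poll(); // null when the queue is empty
        if (elements.isEmpty() && availability.isDone()) {
            // A poll that leaves the queue empty resets the availability future.
            availability = new CompletableFuture<>();
        }
        return element;
    }

    synchronized CompletableFuture<Void> getAvailabilityFuture() {
        return availability;
    }

    synchronized void notifyAvailable() {
        availability.complete(null); // completes the future even if no element exists
    }

    synchronized boolean isEmpty() {
        return elements.isEmpty();
    }
}

/** Hypothetical manager: sole owner of the queue, exposing only the wrapper methods. */
final class MiniFetcherManager<T> {
    private final MiniQueue<T> elementsQueue = new MiniQueue<>();

    /** Stands in for a fetcher thread handing over a fetched batch. */
    void produce(T element) {
        elementsQueue.put(element);
    }

    T poll() {
        return elementsQueue.poll();
    }

    CompletableFuture<Void> getAvailabilityFuture() {
        return elementsQueue.getAvailabilityFuture();
    }

    void notifyAvailable() {
        elementsQueue.notifyAvailable();
    }

    boolean noAvailableElement() {
        return elementsQueue.isEmpty();
    }
}
```

In this sketch a reader never touches the queue directly. A completed future followed by a null poll() (for example after notifyAvailable()) is a legal sequence, which is why the caller has to request a new future before waiting again.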

5. Mark SplitFetcher and SplitFetcherTask as PublicEvolving.

@PublicEvolving
public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable {
    FetchTask(
            SplitReader<E, SplitT> splitReader,
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            Consumer<Collection<String>> splitFinishedCallback,
            int fetcherIndex) {
        this.splitReader = splitReader;
        this.elementsQueue = elementsQueue;
        this.splitFinishedCallback = splitFinishedCallback;
        this.lastRecords = null;
        this.fetcherIndex = fetcherIndex;
        this.wakeup = false;
    }
}
@PublicEvolving
public interface SplitFetcherTask {}




Compatibility, Deprecation, and Migration Plan

Connectors that use the constructors of SingleThreadMultiplexSourceReaderBase, SourceReaderBase, and SingleThreadFetcherManager that take the elementsQueue parameter should migrate to the new constructors that do not have this parameter.
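
As a rough illustration of what this migration looks like from the connector side, the sketch below contrasts the two constructor shapes. SketchFetcherManager and MigrationSketch are hypothetical stand-ins written for this example; a plain java.util.concurrent.BlockingQueue replaces FutureCompletingBlockingQueue, a Supplier<E> plays the role of the split reader, and the default capacity is an assumed value. Only the shape of the constructors mirrors the proposal.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Supplier;

/** Hypothetical stand-in for a fetcher manager (not the Flink class). */
class SketchFetcherManager<E> {
    static final int DEFAULT_QUEUE_CAPACITY = 2; // assumed value for this sketch

    private final BlockingQueue<E> elementsQueue;
    private final Supplier<E> splitReaderSupplier;

    /** Old shape: the caller creates the queue and passes it in. */
    @Deprecated
    SketchFetcherManager(BlockingQueue<E> elementsQueue, Supplier<E> splitReaderSupplier) {
        this.elementsQueue = elementsQueue;
        this.splitReaderSupplier = splitReaderSupplier;
    }

    /** New shape: the manager creates and owns the queue itself. */
    SketchFetcherManager(Supplier<E> splitReaderSupplier) {
        this(new ArrayBlockingQueue<>(DEFAULT_QUEUE_CAPACITY), splitReaderSupplier);
    }

    /** Fetches one element from the reader; returns false if the queue is full. */
    boolean fetchOnce() {
        return elementsQueue.offer(splitReaderSupplier.get());
    }

    /** Returns the next element, or null if none is queued. */
    E poll() {
        return elementsQueue.poll();
    }
}

final class MigrationSketch {
    /** Before: connector code builds a queue only to hand it over. */
    @SuppressWarnings("deprecation")
    static SketchFetcherManager<String> before(Supplier<String> reader) {
        BlockingQueue<String> elementsQueue = new ArrayBlockingQueue<>(2);
        return new SketchFetcherManager<>(elementsQueue, reader);
    }

    /** After: no queue appears in connector code. */
    static SketchFetcherManager<String> after(Supplier<String> reader) {
        return new SketchFetcherManager<>(reader);
    }
}
```

Both factory methods yield a manager that behaves the same way from the outside; the difference is only that the second one no longer requires the caller to know that a queue exists.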

Test Plan

Nothing else needs to be done.


Rejected Alternatives

If SplitFetcherManager becomes PublicEvolving, SplitFetcher also needs to be PublicEvolving, because it is returned by the protected method SplitFetcherManager.createSplitFetcher().

However, SplitFetcher requires FutureCompletingBlockingQueue as a constructor parameter, and SplitFetcher is a class rather than an interface. The alternative considered was therefore to change SplitFetcher into a public interface and move its implementation details into an implementing subclass.


Rejection Reason

The constructor of the SplitFetcher is already package private. So it can only be accessed from the classes in the package org.apache.flink.connector.base.source.reader.fetcher. And apparently, user classes should not be in this package. Therefore, even if we mark the SplitFetcher class as PublicEvolving, the constructor is not available to the users. Only the public and protected methods are considered public API in this case. Private / package private methods and fields are still internal.
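
The visibility argument can be checked with a small reflection sketch. VisibilitySketch and SketchFetcher are hypothetical names used only here; the nested class stands in for a public class whose single constructor has no access modifier, and createFetcher plays the role of a same-package factory such as SplitFetcherManager.createSplitFetcher().

```java
import java.lang.reflect.Modifier;

final class VisibilitySketch {
    /** Hypothetical stand-in: a public class whose only constructor is package-private. */
    public static class SketchFetcher {
        private final int id;

        SketchFetcher(int id) { // no access modifier: package-private
            this.id = id;
        }

        public int fetcherId() {
            return id;
        }
    }

    /** Same-package factory; code outside the package would have to go through it. */
    static SketchFetcher createFetcher(int id) {
        return new SketchFetcher(id);
    }

    /** True when the class itself is declared public. */
    static boolean classIsPublic() {
        return Modifier.isPublic(SketchFetcher.class.getModifiers());
    }

    /** Class.getConstructors() returns public constructors only. */
    static int publicConstructorCount() {
        return SketchFetcher.class.getConstructors().length;
    }

    /** Class.getDeclaredConstructors() returns constructors of every visibility. */
    static int declaredConstructorCount() {
        return SketchFetcher.class.getDeclaredConstructors().length;
    }
}
```

The class is public, yet it exposes no public constructor, so its public surface consists of its public and protected methods only, which is the point made in the rejection reason.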


