...

Discussion thread: https://lists.apache.org/thread/b8f509878ptwl3kmmgg95tv8sb1j5987
Vote thread

JIRA: FLINK-33465

Release: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation


Although SingleThreadFetcherManager is annotated as Internal, it effectively acts as a public API and is widely used in many connector projects:
flink-cdc-connector, flink-connector-mongodb, and so on.

...

As shown in FLINK-31324, FLINK-28853 changed the default constructor of SingleThreadFetcherManager. However, the change had a wide impact. In the end, the former constructor was added back and marked as Deprecated.

Therefore, why not make SingleThreadFetcherManager PublicEvolving?


More context from the original design:

1. The original design of SplitFetcherManager and its subclasses was to make them public to the Source developers. The goal is to let us take care of the threading model, while the Source developers can just focus on the SplitReader implementation. Therefore, I think making SplitFetcherManager / SingleThreadFetcherManager public aligns with the original design. That is also why these classes are exposed in the constructor of SourceReaderBase.

2. For FutureCompletingBlockingQueue, in hindsight, it might be better not to expose it to the Source developers. They are unlikely to use it anywhere other than just constructing it. The reason that FutureCompletingBlockingQueue is currently exposed in the SourceReaderBase constructor is that both the SplitFetcherManager and the SourceReaderBase need it. One way to hide the FutureCompletingBlockingQueue from the public API is to make SplitFetcherManager the only owner class of the queue, and expose some of its methods via SplitFetcherManager. This way, the SourceReaderBase can invoke the methods via SplitFetcherManager. I believe this also makes the code slightly cleaner.


Public Interfaces

Change SingleThreadFetcherManager, FutureCompletingBlockingQueue, and the parent class SplitFetcherManager from Internal to PublicEvolving.


Proposed Changes

...

1. Mark the constructors of SingleThreadMultiplexSourceReaderBase as @Deprecated.
Code Block
// SingleThreadMultiplexSourceReaderBase

@Deprecated
public SingleThreadMultiplexSourceReaderBase(
        FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
        SingleThreadFetcherManager<E, SplitT> splitFetcherManager,
        RecordEmitter<E, T, SplitStateT> recordEmitter,
        Configuration config,
        SourceReaderContext context) {
    super(elementsQueue, splitFetcherManager, recordEmitter, config, context);
}

@Deprecated
public SingleThreadMultiplexSourceReaderBase(
        FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
        Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
        RecordEmitter<E, T, SplitStateT> recordEmitter,
        Configuration config,
        SourceReaderContext context) {
    super(
            elementsQueue,
            new SingleThreadFetcherManager<>(elementsQueue, splitReaderSupplier, config),
            recordEmitter,
            config,
            context);
}

2. Mark the constructor of SourceReaderBase as @Deprecated and provide a new constructor without FutureCompletingBlockingQueue.

Code Block
// SourceReaderBase

@Deprecated
public SourceReaderBase(
        FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
        SplitFetcherManager<E, SplitT> splitFetcherManager,
        RecordEmitter<E, T, SplitStateT> recordEmitter,
        Configuration config,
        SourceReaderContext context) {
    // body omitted
}

// todo: Add new constructor without FutureCompletingBlockingQueue
public SourceReaderBase(
        SplitFetcherManager<E, SplitT> splitFetcherManager,
        RecordEmitter<E, T, SplitStateT> recordEmitter,
        Configuration config,
        SourceReaderContext context) {
    // body omitted
}

3. Mark the constructors of SplitFetcherManager and SingleThreadFetcherManager as @Deprecated and provide new constructors without FutureCompletingBlockingQueue. Mark SplitFetcherManager and SingleThreadFetcherManager as @PublicEvolving.

Code Block
// SingleThreadFetcherManager
@PublicEvolving
public class SingleThreadFetcherManager<E, SplitT extends SourceSplit>
        extends SplitFetcherManager<E, SplitT> {

    @Deprecated
    public SingleThreadFetcherManager(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
            Configuration configuration) {
        super(elementsQueue, splitReaderSupplier, configuration);
    }

    // todo: Add new constructor without FutureCompletingBlockingQueue
    public SingleThreadFetcherManager(
            Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
            Configuration configuration) {
        super(splitReaderSupplier, configuration);
    }
}


Code Block
// SplitFetcherManager
@PublicEvolving
public abstract class SplitFetcherManager<E, SplitT extends SourceSplit> {

    // Delegates to the existing four-argument constructor that takes the queue (not shown).
    @Deprecated
    public SplitFetcherManager(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            Supplier<SplitReader<E, SplitT>> splitReaderFactory,
            Configuration configuration) {
        this(elementsQueue, splitReaderFactory, configuration, (ignore) -> {});
    }

    // todo: provide a new constructor without FutureCompletingBlockingQueue.
    public SplitFetcherManager(
            Supplier<SplitReader<E, SplitT>> splitReaderFactory,
            Configuration configuration,
            Consumer<Collection<String>> splitFinishedHook) {
        // The manager creates and owns the queue instead of receiving it from the caller.
        this.elementsQueue =
                new FutureCompletingBlockingQueue<>(
                        configuration.getInteger(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY));
    }
}
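
To illustrate how a deprecated queue-accepting constructor and a new queue-free constructor could coexist, here is a minimal sketch using plain JDK types. It is not Flink code: ConstructorMigrationSketch, QUEUE_CAPACITY_KEY, and DEFAULT_QUEUE_CAPACITY are hypothetical names, ArrayBlockingQueue stands in for FutureCompletingBlockingQueue, and a Map stands in for Configuration.

```java
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical stand-in for a fetcher manager; not the real SplitFetcherManager.
class ConstructorMigrationSketch<E> {
    // Hypothetical config key and default value, for illustration only.
    static final String QUEUE_CAPACITY_KEY = "element.queue.capacity";
    static final int DEFAULT_QUEUE_CAPACITY = 2;

    private final BlockingQueue<E> elementsQueue;

    /** Old style: the caller builds the queue and hands it in. */
    @Deprecated
    ConstructorMigrationSketch(BlockingQueue<E> elementsQueue, Map<String, Integer> config) {
        this.elementsQueue = elementsQueue;
    }

    /** New style: the manager builds and owns the queue, sized from the configuration. */
    ConstructorMigrationSketch(Map<String, Integer> config) {
        this.elementsQueue =
                new ArrayBlockingQueue<>(
                        config.getOrDefault(QUEUE_CAPACITY_KEY, DEFAULT_QUEUE_CAPACITY));
    }

    /** Exposes the queue size limit without exposing the queue itself. */
    int remainingCapacity() {
        return elementsQueue.remainingCapacity();
    }
}
```

With this shape, existing callers keep compiling against the deprecated constructor, while new callers never see the queue type at all.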

4. SplitFetcherManager provides wrapper methods for FutureCompletingBlockingQueue to replace its usage in SourceReaderBase.

Code Block
@PublicEvolving
public abstract class SplitFetcherManager<E, SplitT extends SourceSplit> {

    // Wrapper method signatures only; bodies omitted.
    public T poll();
    public CompletableFuture<Void> getAvailabilityFuture();
    public void notifyAvailable();
    public boolean isEmpty();

}

However, this will be tricky because SplitFetcherManager is declared with <E, SplitT extends SourceSplit>, while FutureCompletingBlockingQueue is declared with <T>. This means that SplitFetcherManager would have to be changed to <T, E, SplitT extends SourceSplit>, which would affect the compatibility of the SplitFetcherManager class.
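
One possible way to keep the wrapper methods without a third type parameter, offered only as a sketch: if the manager's queue always holds RecordsWithSplitIds<E>, as in the constructor signatures above, the wrappers can return that concrete element type directly. The code below uses simplified, hypothetical stand-ins (Batch for RecordsWithSplitIds, SketchQueue for FutureCompletingBlockingQueue, SketchFetcherManager for SplitFetcherManager with SplitT omitted) and does not reproduce the real Flink classes or their threading behavior.

```java
import java.util.ArrayDeque;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for RecordsWithSplitIds<E>: a batch of records from one split.
final class Batch<E> {
    final String splitId;
    final List<E> records;

    Batch(String splitId, List<E> records) {
        this.splitId = splitId;
        this.records = records;
    }
}

// Hypothetical, heavily simplified stand-in for FutureCompletingBlockingQueue<T>.
final class SketchQueue<T> {
    private final Queue<T> elements = new ArrayDeque<>();
    private CompletableFuture<Void> availability = new CompletableFuture<>();

    synchronized void put(T element) {
        elements.add(element);
        availability.complete(null);
    }

    synchronized T poll() {
        T next = elements.poll();
        // Once drained, hand out a fresh, incomplete future for the next wait.
        if (elements.isEmpty() && availability.isDone()) {
            availability = new CompletableFuture<>();
        }
        return next;
    }

    synchronized CompletableFuture<Void> getAvailabilityFuture() {
        return availability;
    }

    synchronized void notifyAvailable() {
        availability.complete(null);
    }

    synchronized boolean isEmpty() {
        return elements.isEmpty();
    }
}

// Stand-in for SplitFetcherManager: sole owner of the queue, with one type parameter E.
class SketchFetcherManager<E> {
    private final SketchQueue<Batch<E>> elementsQueue = new SketchQueue<>();

    // Would be called by fetcher threads in a real implementation.
    void enqueue(Batch<E> batch) {
        elementsQueue.put(batch);
    }

    // Wrappers: the element type is fixed to Batch<E>, so no extra <T> is needed.
    Batch<E> poll() {
        return elementsQueue.poll();
    }

    CompletableFuture<Void> getAvailabilityFuture() {
        return elementsQueue.getAvailabilityFuture();
    }

    void notifyAvailable() {
        elementsQueue.notifyAvailable();
    }

    boolean isEmpty() {
        return elementsQueue.isEmpty();
    }
}
```

Whether fixing the element type this way is acceptable for the real classes depends on how the queue is used elsewhere, which this sketch does not cover.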



Compatibility, Deprecation, and Migration Plan

Any later change to SingleThreadFetcherManager, FutureCompletingBlockingQueue, and the parent class SplitFetcherManager must take compatibility into account.


Test Plan

Nothing else to do.