
Discussion thread: https://lists.apache.org/thread/b8f509878ptwl3kmmgg95tv8sb1j5987
Vote thread

JIRA

Release: <Flink Version>

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation


Although SingleThreadFetcherManager is annotated as Internal, it effectively acts as a public API and is widely used in many connector projects, such as flink-cdc-connector, flink-connector-mongodb, and so on.


Moreover, even the constructor of SingleThreadMultiplexSourceReaderBase (which is PublicEvolving) takes SingleThreadFetcherManager and FutureCompletingBlockingQueue as parameters. This means that SingleThreadFetcherManager and FutureCompletingBlockingQueue have been exposed to users for a long time and are widely used.

public SingleThreadMultiplexSourceReaderBase(
    FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
    SingleThreadFetcherManager<E, SplitT> splitFetcherManager,
    RecordEmitter<E, T, SplitStateT> recordEmitter,
    Configuration config,
    SourceReaderContext context) {
    super(elementsQueue, splitFetcherManager, recordEmitter, config, context);
}


As shown in FLINK-31324, FLINK-28853 changed the default constructor of SingleThreadFetcherManager. However, the change had a wide impact. In the end, the former constructor was added back and marked as Deprecated.

Therefore, why not make SingleThreadFetcherManager PublicEvolving?


More context from the original design:

1. The original design of SplitFetcherManager and its subclasses was to make them public to the Source developers. The goal is to let us take care of the threading model, while the Source developers can just focus on the SplitReader implementation. Therefore, I think making SplitFetcherManager / SingleThreadFetcherManager public aligns with the original design. That is also why these classes are exposed in the constructor of SourceReaderBase.

2. For FutureCompletingBlockingQueue, in hindsight, it might have been better not to expose it to the Source developers. They are unlikely to use it for anything other than constructing it. FutureCompletingBlockingQueue is currently exposed in the SourceReaderBase constructor because both SplitFetcherManager and SourceReaderBase need it. One way to hide FutureCompletingBlockingQueue from the public API is to make SplitFetcherManager the only owner of the queue and expose some of the queue's methods via SplitFetcherManager. This way, SourceReaderBase can invoke those methods via SplitFetcherManager. I believe this also makes the code slightly cleaner.
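To illustrate the ownership idea in point 2, here is a minimal sketch. It uses simplified, hypothetical stand-ins (FetcherManagerSketch, ReaderSketch, and a plain ArrayBlockingQueue instead of FutureCompletingBlockingQueue), so it only shows the shape of the delegation and is not the actual Flink code.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical, simplified stand-ins; none of these are the real Flink classes. */
public class QueueOwnershipSketch {

    /** Stand-in for SplitFetcherManager: the only class that holds the queue. */
    static class FetcherManagerSketch<E> {
        private final BlockingQueue<E> elementsQueue;

        FetcherManagerSketch(int queueCapacity) {
            // The queue is created here, so callers never construct or see it.
            this.elementsQueue = new ArrayBlockingQueue<>(queueCapacity);
        }

        /** In a real design the fetcher threads would fill the queue; exposed here for the demo. */
        boolean offer(E element) {
            return elementsQueue.offer(element);
        }

        /** Wrapper method that the reader calls instead of touching the queue. */
        E poll() {
            return elementsQueue.poll();
        }

        /** Wrapper method that the reader calls instead of touching the queue. */
        boolean isEmpty() {
            return elementsQueue.isEmpty();
        }
    }

    /** Stand-in for SourceReaderBase: depends only on the manager. */
    static class ReaderSketch<E> {
        private final FetcherManagerSketch<E> fetcherManager;

        ReaderSketch(FetcherManagerSketch<E> fetcherManager) {
            this.fetcherManager = fetcherManager;
        }

        /** Returns the next buffered element, or null if nothing is buffered. */
        E pollNext() {
            return fetcherManager.isEmpty() ? null : fetcherManager.poll();
        }
    }

    public static void main(String[] args) {
        FetcherManagerSketch<String> manager = new FetcherManagerSketch<>(2);
        ReaderSketch<String> reader = new ReaderSketch<>(manager);
        manager.offer("record-1");
        System.out.println(reader.pollNext()); // prints "record-1"
        System.out.println(reader.pollNext()); // prints "null"
    }
}
```

With this shape, the reader's constructor no longer needs a queue parameter at all, which is what would allow the queue to disappear from the public constructor signatures.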


Public Interfaces

Change SingleThreadFetcherManager, FutureCompletingBlockingQueue, and the parent class SplitFetcherManager from Internal to PublicEvolving.


Proposed Changes

  1. Mark the constructors of SingleThreadMultiplexSourceReaderBase as @Deprecated.
// SingleThreadMultiplexSourceReaderBase

@Deprecated
public SingleThreadMultiplexSourceReaderBase(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            SingleThreadFetcherManager<E, SplitT> splitFetcherManager,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            Configuration config,
            SourceReaderContext context) {
        super(elementsQueue, splitFetcherManager, recordEmitter, config, context);
 }

@Deprecated
public SingleThreadMultiplexSourceReaderBase(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            Configuration config,
            SourceReaderContext context) {
        super(
                elementsQueue,
                new SingleThreadFetcherManager<>(elementsQueue, splitReaderSupplier, config),
                recordEmitter,
                config,
                context);
}

2. Mark the constructor of SourceReaderBase as @Deprecated and provide a new constructor without FutureCompletingBlockingQueue.

@Deprecated
public SourceReaderBase(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            SplitFetcherManager<E, SplitT> splitFetcherManager,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            Configuration config,
            SourceReaderContext context) {
       
}

//todo: Add new constructor without FutureCompletingBlockingQueue
public SourceReaderBase(
            SplitFetcherManager<E, SplitT> splitFetcherManager,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            Configuration config,
            SourceReaderContext context) {
       
}

3. Mark the constructors of SplitFetcherManager and SingleThreadFetcherManager as @Deprecated and provide new constructors without FutureCompletingBlockingQueue. Mark SplitFetcherManager and SingleThreadFetcherManager as @PublicEvolving.

SingleThreadFetcherManager
@PublicEvolving
public class SingleThreadFetcherManager<E, SplitT extends SourceSplit>
        extends SplitFetcherManager<E, SplitT> {

    @Deprecated
    public SingleThreadFetcherManager(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
            Configuration configuration) {
        super(elementsQueue, splitReaderSupplier, configuration);
    }

	//todo: Add new constructor without FutureCompletingBlockingQueue
	public SingleThreadFetcherManager(
            Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
            Configuration configuration) {
        	super(splitReaderSupplier, configuration);
	}

}
SplitFetcherManager
@PublicEvolving
public abstract class SplitFetcherManager<E, SplitT extends SourceSplit> {
 	
    @Deprecated
    public SplitFetcherManager(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            Supplier<SplitReader<E, SplitT>> splitReaderFactory,
            Configuration configuration) {
        this(elementsQueue, splitReaderFactory, configuration, (ignore) -> {});
    }
	
  	
    // todo: provide a new constructor without FutureCompletingBlockingQueue.
    public SplitFetcherManager(
            Supplier<SplitReader<E, SplitT>> splitReaderFactory,
            Configuration configuration,
            Consumer<Collection<String>> splitFinishedHook) {
        // The queue is created internally instead of being passed in by the caller.
        this.elementsQueue = new FutureCompletingBlockingQueue<>(
                configuration.getInteger(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY));
    }
}
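
The deprecate-and-add pattern from steps 1 to 3 can be sketched in isolation as follows. FetcherManagerCompatSketch is a hypothetical stand-in, and a plain int capacity replaces the Configuration lookup. The sketch only demonstrates that the old entry point can stay available while the new one builds the queue itself.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Hypothetical stand-in for a fetcher manager; not the real Flink class. */
public class FetcherManagerCompatSketch<E> {

    private final BlockingQueue<E> elementsQueue;

    /** Old entry point: the caller builds the queue and passes it in. */
    @Deprecated
    public FetcherManagerCompatSketch(BlockingQueue<E> elementsQueue) {
        this.elementsQueue = elementsQueue;
    }

    /** New entry point: the manager builds the queue from a configured capacity. */
    public FetcherManagerCompatSketch(int queueCapacity) {
        this.elementsQueue = new ArrayBlockingQueue<>(queueCapacity);
    }

    /** Free slots left in the queue; used here only to observe the capacity. */
    public int remainingCapacity() {
        return elementsQueue.remainingCapacity();
    }

    public static void main(String[] args) {
        // Existing callers keep working through the deprecated constructor.
        FetcherManagerCompatSketch<String> legacy =
                new FetcherManagerCompatSketch<>(new ArrayBlockingQueue<String>(3));
        // Migrated callers no longer construct a queue themselves.
        FetcherManagerCompatSketch<String> migrated = new FetcherManagerCompatSketch<>(5);
        System.out.println(legacy.remainingCapacity());   // prints "3"
        System.out.println(migrated.remainingCapacity()); // prints "5"
    }
}
```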

4. SplitFetcherManager provides  wrapper methods for FutureCompletingBlockingQueue  to replace its usage in SourceReaderBase.

@PublicEvolving
public abstract class SplitFetcherManager<E, SplitT extends SourceSplit> {
	
 public T poll();
 public CompletableFuture<Void> getAvailabilityFuture();
 public void notifyAvailable();
 public boolean isEmpty();

}

However, this will be tricky because SplitFetcherManager is parameterized with <E, SplitT extends SourceSplit>, while FutureCompletingBlockingQueue is parameterized with <T>. This means that SplitFetcherManager would have to be changed to <T, E, SplitT extends SourceSplit>, which would break the compatibility of the SplitFetcherManager class.
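
One possible way to sidestep the extra type parameter, sketched here as an assumption rather than a decided design: in the constructors shown on this page the queue is always a FutureCompletingBlockingQueue<RecordsWithSplitIds<E>>, so the wrapper methods could be typed in terms of E directly. The sketch below uses hypothetical stand-ins (RecordBatch for RecordsWithSplitIds, an ArrayDeque for the queue) and omits SplitT for brevity.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.List;
import java.util.Queue;

/** Hypothetical, simplified stand-ins; none of these are the real Flink classes. */
public class WrapperTypeSketch {

    /** Stand-in for RecordsWithSplitIds<E>: a batch of records of type E. */
    interface RecordBatch<E> {
        List<E> records();
    }

    /** Stand-in for SplitFetcherManager, keeping only the record type parameter E. */
    static class FetcherManagerSketch<E> {
        // The queue's element type is derived from E, so no separate T is declared.
        private final Queue<RecordBatch<E>> elementsQueue = new ArrayDeque<>();

        void put(RecordBatch<E> batch) {
            elementsQueue.add(batch);
        }

        /** Wrapper whose return type is expressed through E instead of a new T. */
        RecordBatch<E> poll() {
            return elementsQueue.poll();
        }

        boolean isEmpty() {
            return elementsQueue.isEmpty();
        }
    }

    public static void main(String[] args) {
        FetcherManagerSketch<String> manager = new FetcherManagerSketch<>();
        manager.put(() -> Arrays.asList("a", "b"));
        RecordBatch<String> batch = manager.poll();
        System.out.println(batch.records());   // prints "[a, b]"
        System.out.println(manager.isEmpty()); // prints "true"
    }
}
```

Whether this works for the real classes depends on no existing subclass relying on a queue with a different element type, which would need to be checked against the connectors mentioned in the Motivation.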



Compatibility, Deprecation, and Migration Plan

Any later change to SingleThreadFetcherManager, FutureCompletingBlockingQueue, and the parent class SplitFetcherManager must take compatibility into account.


Test Plan

Nothing else to do.
