Discussion thread | https://lists.apache.org/thread/b8f509878ptwl3kmmgg95tv8sb1j5987 |
---|---|
Vote thread | |
JIRA | |
Release | <Flink Version> |
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Although SingleThreadFetcherManager is annotated as Internal, in practice it acts as a de facto public API and is widely used in connector projects such as flink-cdc-connector, flink-connector-mongodb and so on.
Moreover, even the constructor of SingleThreadMultiplexSourceReaderBase (which is PublicEvolving) takes SingleThreadFetcherManager and FutureCompletingBlockingQueue as parameters. That means SingleThreadFetcherManager and FutureCompletingBlockingQueue have already been exposed to users for a long time and are widely used.
    public SingleThreadMultiplexSourceReaderBase(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            SingleThreadFetcherManager<E, SplitT> splitFetcherManager,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            Configuration config,
            SourceReaderContext context) {
        super(elementsQueue, splitFetcherManager, recordEmitter, config, context);
    }
As shown in FLINK-31324, FLINK-28853 changed the default constructor of SingleThreadFetcherManager. However, the change had a wide impact, and in the end the former constructor was added back and marked as Deprecated.
Therefore, why not make SingleThreadFetcherManager PublicEvolving?
More context from the original design:
1. The original design of SplitFetcherManager and its subclasses was to make them public to Source developers. The goal is to let the framework take care of the threading model, while Source developers can focus on the SplitReader implementation. Therefore, I think making SplitFetcherManager / SingleThreadFetcherManager public aligns with the original design. That is also why these classes are exposed in the constructor of SourceReaderBase.
2. For FutureCompletingBlockingQueue, in hindsight, it might have been better not to expose it to Source developers. They are unlikely to use it for anything other than constructing it. FutureCompletingBlockingQueue is currently exposed in the SourceReaderBase constructor because both SplitFetcherManager and SourceReaderBase need it. One way to hide FutureCompletingBlockingQueue from the public API is to make SplitFetcherManager the sole owner of the queue and expose some of the queue's methods via SplitFetcherManager. This way, SourceReaderBase can invoke those methods via SplitFetcherManager. I believe this also makes the code slightly cleaner.
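The ownership idea in point 2 can be sketched with simplified stand-ins. The classes below (`SimpleQueue`, `FetcherManagerSketch`, `ReaderSketch`) are hypothetical and are not Flink classes; they only illustrate, under that assumption, a reader that reaches the queue through the manager instead of holding it directly.

```java
import java.util.ArrayDeque;
import java.util.Queue;

// Hypothetical stand-in for FutureCompletingBlockingQueue; not the Flink class.
final class SimpleQueue<T> {
    private final Queue<T> items = new ArrayDeque<>();

    synchronized void put(T item) {
        items.add(item);
    }

    // Returns null when the queue is empty.
    synchronized T poll() {
        return items.poll();
    }

    synchronized boolean isEmpty() {
        return items.isEmpty();
    }
}

// Hypothetical manager that is the only owner of the queue.
class FetcherManagerSketch<E> {
    private final SimpleQueue<E> elementsQueue = new SimpleQueue<>();

    // In a real implementation, fetcher threads would call this.
    void enqueue(E element) {
        elementsQueue.put(element);
    }

    // Wrapper methods: callers never see the queue itself.
    E poll() {
        return elementsQueue.poll();
    }

    boolean isEmpty() {
        return elementsQueue.isEmpty();
    }
}

// Hypothetical reader that depends only on the manager.
class ReaderSketch<E> {
    private final FetcherManagerSketch<E> manager;

    ReaderSketch(FetcherManagerSketch<E> manager) {
        this.manager = manager;
    }

    E pollNext() {
        return manager.poll();
    }

    boolean hasNoBufferedElements() {
        return manager.isEmpty();
    }
}
```

With this shape, the reader's constructor no longer needs a queue argument, which is the property the proposal is after.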
Public Interfaces
Change SingleThreadFetcherManager, FutureCompletingBlockingQueue and the parent class SplitFetcherManager from Internal to PublicEvolving.
Proposed Changes
1. Mark the constructors of SingleThreadMultiplexSourceReaderBase as @Deprecated.
    // SingleThreadMultiplexSourceReaderBase
    @Deprecated
    public SingleThreadMultiplexSourceReaderBase(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            SingleThreadFetcherManager<E, SplitT> splitFetcherManager,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            Configuration config,
            SourceReaderContext context) {
        super(elementsQueue, splitFetcherManager, recordEmitter, config, context);
    }

    @Deprecated
    public SingleThreadMultiplexSourceReaderBase(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            Configuration config,
            SourceReaderContext context) {
        super(
                elementsQueue,
                new SingleThreadFetcherManager<>(elementsQueue, splitReaderSupplier, config),
                recordEmitter,
                config,
                context);
    }
2. Mark the constructor of SourceReaderBase as @Deprecated and provide a new constructor without FutureCompletingBlockingQueue.
    @Deprecated
    public SourceReaderBase(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            SplitFetcherManager<E, SplitT> splitFetcherManager,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            Configuration config,
            SourceReaderContext context) {
    }

    // TODO: add a new constructor without FutureCompletingBlockingQueue.
    public SourceReaderBase(
            SplitFetcherManager<E, SplitT> splitFetcherManager,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            Configuration config,
            SourceReaderContext context) {
    }
3. Mark the constructors of SplitFetcherManager and SingleThreadFetcherManager as @Deprecated and provide new constructors without FutureCompletingBlockingQueue. Mark SplitFetcherManager and SingleThreadFetcherManager as `@PublicEvolving`.
    @PublicEvolving
    public class SingleThreadFetcherManager<E, SplitT extends SourceSplit>
            extends SplitFetcherManager<E, SplitT> {

        @Deprecated
        public SingleThreadFetcherManager(
                FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
                Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
                Configuration configuration) {
            super(elementsQueue, splitReaderSupplier, configuration);
        }

        // TODO: add a new constructor without FutureCompletingBlockingQueue.
        public SingleThreadFetcherManager(
                Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
                Configuration configuration) {
            super(splitReaderSupplier, configuration);
        }
    }
    @PublicEvolving
    public abstract class SplitFetcherManager<E, SplitT extends SourceSplit> {

        @Deprecated
        public SplitFetcherManager(
                FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
                Supplier<SplitReader<E, SplitT>> splitReaderFactory,
                Configuration configuration) {
            this(elementsQueue, splitReaderFactory, configuration, (ignore) -> {});
        }

        // TODO: provide a new constructor without FutureCompletingBlockingQueue.
        public SplitFetcherManager(
                FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
                Supplier<SplitReader<E, SplitT>> splitReaderFactory,
                Configuration configuration,
                Consumer<Collection<String>> splitFinishedHook) {
            // The manager creates the queue itself, sized from the reader configuration.
            this.elementsQueue =
                    new FutureCompletingBlockingQueue<>(
                            configuration.getInteger(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY));
        }
    }
4. SplitFetcherManager provides wrapper methods for FutureCompletingBlockingQueue to replace its usage in SourceReaderBase.
    @PublicEvolving
    public abstract class SplitFetcherManager<E, SplitT extends SourceSplit> {
        public T poll();
        public CompletableFuture<Void> getAvailabilityFuture();
        public void notifyAvailable();
        public boolean isEmpty();
    }
However, this is tricky because SplitFetcherManager is declared with the type parameters <E, SplitT extends SourceSplit>, while FutureCompletingBlockingQueue is declared with <T>. This means that SplitFetcherManager would have to be changed to <T, E, SplitT extends SourceSplit>, which would affect the compatibility of the SplitFetcherManager class.
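The type-parameter concern can be illustrated with a small sketch. All names here (`SplitSketch`, `TwoParamManager`, `ThreeParamManager`, and the two subclasses) are hypothetical stand-ins rather than Flink classes; the sketch only shows why an existing subclass written against two type parameters would need a source change if a third were added.

```java
import java.util.Collections;
import java.util.List;

// Hypothetical stand-in for SourceSplit; not the Flink interface.
interface SplitSketch {}

final class MySplit implements SplitSketch {}

// Today's shape: two type parameters.
abstract class TwoParamManager<E, SplitT extends SplitSketch> {
    abstract E fetchOne();
}

// Shape that would be needed if poll() returned the queue's own element type T.
abstract class ThreeParamManager<T, E, SplitT extends SplitSketch> {
    abstract T poll();
}

// An existing connector subclass compiles against the two-parameter form.
class ExistingConnectorManager extends TwoParamManager<String, MySplit> {
    @Override
    String fetchOne() {
        return "record";
    }
}

// Against the three-parameter form, the same subclass would have to be
// rewritten to supply an extra type argument.
class MigratedConnectorManager extends ThreeParamManager<List<String>, String, MySplit> {
    @Override
    List<String> poll() {
        return Collections.singletonList("record");
    }
}
```

Every `extends TwoParamManager<String, MySplit>` declaration in downstream code would need the same edit, which is the compatibility cost described above.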
Compatibility, Deprecation, and Migration Plan
Any later change to SingleThreadFetcherManager, FutureCompletingBlockingQueue and the parent class SplitFetcherManager will be made with compatibility in mind.
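The deprecate-then-add approach used in the Proposed Changes can be sketched in isolation. `CompatManagerSketch` is a hypothetical class, and `java.util.concurrent.ArrayBlockingQueue` stands in for FutureCompletingBlockingQueue purely for illustration; the point is that callers of the old constructor keep compiling while new callers no longer build the queue themselves.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical stand-in for a fetcher manager; not the Flink class.
class CompatManagerSketch<E> {
    private final BlockingQueue<E> elementsQueue;

    /** Old form: the caller builds the queue and passes it in. */
    @Deprecated
    CompatManagerSketch(BlockingQueue<E> elementsQueue) {
        this.elementsQueue = elementsQueue;
    }

    /** New form: the manager builds the queue itself from a capacity setting. */
    CompatManagerSketch(int capacity) {
        this.elementsQueue = new ArrayBlockingQueue<>(capacity);
    }

    // Returns false when the queue is full.
    boolean offer(E element) {
        return elementsQueue.offer(element);
    }

    int size() {
        return elementsQueue.size();
    }
}
```

Existing code that calls the deprecated constructor only sees a deprecation warning, which leaves connector maintainers time to migrate before the old form is removed.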
Test Plan
Nothing else to do.