Discussion thread | https://lists.apache.org/thread/b8f509878ptwl3kmmgg95tv8sb1j5987
---|---
Vote thread | https://lists.apache.org/thread/t1zff21z440pvv48jyhm8pgtqsyplchn
JIRA |
Release | 1.19
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Why is it necessary to make SingleThreadFetcherManager PublicEvolving?
- Though SingleThreadFetcherManager is annotated as Internal, it effectively acts as a public API and is widely used in many connector projects: flink-connector-kafka, flink-connector-mongodb, flink-cdc-connectors, and so on. In flink-connector-kafka, KafkaSourceFetcherManager even extends SingleThreadFetcherManager.
- Moreover, even the constructor of SingleThreadMultiplexSourceReaderBase (which is PublicEvolving) takes parameters of type SingleThreadFetcherManager and FutureCompletingBlockingQueue. That means SingleThreadFetcherManager and FutureCompletingBlockingQueue have already been exposed to users for a long time and are widely used.

```java
public SingleThreadMultiplexSourceReaderBase(
        FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
        SingleThreadFetcherManager<E, SplitT> splitFetcherManager,
        RecordEmitter<E, T, SplitStateT> recordEmitter,
        Configuration config,
        SourceReaderContext context) {
    super(elementsQueue, splitFetcherManager, recordEmitter, config, context);
}
```

- As shown in FLINK-31324, FLINK-28853 once changed the default constructor of SingleThreadFetcherManager. However, the change affected many connectors, so the former constructor was eventually added back and marked as @Deprecated.

Therefore, why not make SingleThreadFetcherManager PublicEvolving?

More context from the original design:

1. The original design of SplitFetcherManager and its subclasses was to make them public to Source developers. The goal is to let us take care of the threading model, while Source developers can just focus on the SplitReader implementation. Therefore, I think making SplitFetcherManager / SingleThreadFetcherManager public aligns with the original design. That is also why these classes are exposed in the constructor of SourceReaderBase.
2. For FutureCompletingBlockingQueue, in hindsight, it might be better not to expose it to Source developers. They are unlikely to use it anywhere other than just constructing it. FutureCompletingBlockingQueue is currently exposed in the SourceReaderBase constructor because both SplitFetcherManager and SourceReaderBase need it. One way to hide FutureCompletingBlockingQueue from the public API is to make SplitFetcherManager the only owner class of the queue and expose some of the queue's methods via SplitFetcherManager. This way, SourceReaderBase can invoke those methods via SplitFetcherManager. I believe this also makes the code slightly cleaner.
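A minimal, self-contained sketch of this ownership model follows. It assumes simplified, hypothetical classes (`QueueOwningManager`, `ManagerOnlyReader`) instead of the real Flink types, and a plain `ArrayBlockingQueue` in place of `FutureCompletingBlockingQueue`. The point is only that the manager creates and owns the queue, and the reader reaches the queue exclusively through the manager.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical stand-in for SplitFetcherManager: the sole owner of the element queue.
class QueueOwningManager {
    private final BlockingQueue<String> elementsQueue;

    QueueOwningManager(int capacity) {
        // The manager creates the queue itself; callers never see it.
        this.elementsQueue = new ArrayBlockingQueue<>(capacity);
    }

    // Called by (hypothetical) fetcher threads; returns false if the queue is full.
    boolean enqueue(String batch) {
        return elementsQueue.offer(batch);
    }

    // A queue operation exposed through the manager for the reader's use.
    String poll() {
        return elementsQueue.poll();
    }
}

// Hypothetical stand-in for SourceReaderBase: it depends only on the manager.
class ManagerOnlyReader {
    private final QueueOwningManager manager;

    ManagerOnlyReader(QueueOwningManager manager) {
        this.manager = manager;
    }

    // Returns the next batch, or null when nothing has been fetched yet.
    String pollNext() {
        return manager.poll();
    }
}
```

With this shape, a reader constructor needs only the manager, so there is no way to accidentally hand the reader and the manager two different queues.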
In summary, this FLIP has two goals:

- To expose SplitFetcherManager / SingleThreadFetcherManager as public API (PublicEvolving), allowing connector developers to easily create their own threading models in SourceReaderBase.
- To hide the element queue from connector developers and make SplitFetcherManager the only owner class of the queue.
Public Interfaces
SplitFetcherManager
- Change SplitFetcherManager from Internal to PublicEvolving.
- Deprecate the old constructors exposing FutureCompletingBlockingQueue, and add new constructors as replacements that create the FutureCompletingBlockingQueue instance internally.
- Add a new internal method SplitFetcherManager#getQueue() for SourceReaderBase usage.
```java
@PublicEvolving
public abstract class SplitFetcherManager<E, SplitT extends SourceSplit> {

    @Deprecated
    public SplitFetcherManager(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            Supplier<SplitReader<E, SplitT>> splitReaderFactory,
            Configuration configuration) {
        this(elementsQueue, splitReaderFactory, configuration, (ignore) -> {});
    }

    @Deprecated
    @VisibleForTesting
    public SplitFetcherManager(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            Supplier<SplitReader<E, SplitT>> splitReaderFactory,
            Configuration configuration,
            Consumer<Collection<String>> splitFinishedHook) {
        // ......
    }

    // New constructor without FutureCompletingBlockingQueue.
    public SplitFetcherManager(
            Supplier<SplitReader<E, SplitT>> splitReaderFactory,
            Configuration configuration) {
        this(splitReaderFactory, configuration, (ignore) -> {});
    }

    // New constructor without FutureCompletingBlockingQueue: the queue is created internally.
    public SplitFetcherManager(
            Supplier<SplitReader<E, SplitT>> splitReaderFactory,
            Configuration configuration,
            Consumer<Collection<String>> splitFinishedHook) {
        this.elementsQueue =
                new FutureCompletingBlockingQueue<>(
                        configuration.getInteger(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY));
        // ......
    }

    // Internal accessor used by SourceReaderBase; not part of the public API.
    @Internal
    public FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> getQueue() {
        return elementsQueue;
    }
}
```
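The constructor migration above can be modeled outside Flink in a few lines. This is only a sketch: `ToySplitFetcherManager`, the `element.queue.capacity` key and the default capacity of 2 are hypothetical stand-ins for SplitFetcherManager and SourceReaderOptions.ELEMENT_QUEUE_CAPACITY, and a `Map` stands in for `Configuration`.

```java
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical model of the deprecated vs. new constructors; not the Flink API.
class ToySplitFetcherManager {
    // Hypothetical key and default, standing in for SourceReaderOptions.ELEMENT_QUEUE_CAPACITY.
    static final String QUEUE_CAPACITY_KEY = "element.queue.capacity";
    static final int DEFAULT_QUEUE_CAPACITY = 2;

    private final BlockingQueue<String> elementsQueue;

    /** Old style: the caller creates the queue and must also share it with the reader. */
    @Deprecated
    ToySplitFetcherManager(BlockingQueue<String> elementsQueue, Map<String, Integer> config) {
        this.elementsQueue = elementsQueue;
    }

    /** New style: the queue is created internally, sized from the configuration. */
    ToySplitFetcherManager(Map<String, Integer> config) {
        this.elementsQueue =
                new ArrayBlockingQueue<>(
                        config.getOrDefault(QUEUE_CAPACITY_KEY, DEFAULT_QUEUE_CAPACITY));
    }

    // Equals the queue's capacity as long as nothing has been enqueued yet.
    int remainingQueueCapacity() {
        return elementsQueue.remainingCapacity();
    }
}
```

Existing callers of the deprecated constructor keep compiling, while new callers no longer need to construct a queue at all.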
SingleThreadFetcherManager
- Change SingleThreadFetcherManager from Internal to PublicEvolving.
- Deprecate the old constructors exposing FutureCompletingBlockingQueue, and add new constructors without FutureCompletingBlockingQueue as replacements; the FutureCompletingBlockingQueue instance is then created internally by the parent class (SplitFetcherManager).
```java
@PublicEvolving
public class SingleThreadFetcherManager<E, SplitT extends SourceSplit>
        extends SplitFetcherManager<E, SplitT> {

    @Deprecated
    public SingleThreadFetcherManager(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            Supplier<SplitReader<E, SplitT>> splitReaderSupplier) {
        this(elementsQueue, splitReaderSupplier, new Configuration());
    }

    @Deprecated
    public SingleThreadFetcherManager(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
            Configuration configuration) {
        super(elementsQueue, splitReaderSupplier, configuration);
    }

    @Deprecated
    @VisibleForTesting
    public SingleThreadFetcherManager(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
            Configuration configuration,
            Consumer<Collection<String>> splitFinishedHook) {
        super(elementsQueue, splitReaderSupplier, configuration, splitFinishedHook);
    }

    // New constructors without FutureCompletingBlockingQueue.
    public SingleThreadFetcherManager(Supplier<SplitReader<E, SplitT>> splitReaderSupplier) {
        this(splitReaderSupplier, new Configuration());
    }

    public SingleThreadFetcherManager(
            Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
            Configuration configuration) {
        super(splitReaderSupplier, configuration);
    }

    public SingleThreadFetcherManager(
            Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
            Configuration configuration,
            Consumer<Collection<String>> splitFinishedHook) {
        super(splitReaderSupplier, configuration, splitFinishedHook);
    }
}
```
SplitFetcherTask
Change SplitFetcherTask from Internal to PublicEvolving.
```java
@PublicEvolving
public interface SplitFetcherTask {
    // ... (existing methods unchanged)
}
```
SplitFetcher
Change SplitFetcher from Internal to PublicEvolving, but still do not expose the constructor of SplitFetcher, so that it can only be created by org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager#createSplitFetcher.
```java
@PublicEvolving
public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable {

    // Package-private: code outside org.apache.flink.connector.base.source.reader.fetcher
    // cannot call it; instances are created via SplitFetcherManager#createSplitFetcher().
    SplitFetcher(
            int id,
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            SplitReader<E, SplitT> splitReader,
            Consumer<Throwable> errorHandler,
            Runnable shutdownHook,
            Consumer<Collection<String>> splitFinishedHook,
            boolean allowUnalignedSourceSplits) {
        // ...
    }
}
```
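How "created only via the manager" works can be sketched in plain Java. In Flink the restriction comes from the package-private SplitFetcher constructor; this single-file sketch instead uses a private constructor in a nested class, which has the same effect for code outside the enclosing class. `FetcherOwningManager` and `Fetcher` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical model: every fetcher is created, and therefore tracked, by its manager.
class FetcherOwningManager {
    private final List<Fetcher> fetchers = new ArrayList<>();
    private int nextFetcherId = 0;

    // The only way to obtain a Fetcher, analogous to SplitFetcherManager#createSplitFetcher().
    Fetcher createSplitFetcher() {
        Fetcher fetcher = new Fetcher(nextFetcherId++);
        fetchers.add(fetcher);
        return fetcher;
    }

    // Read-only view, so callers cannot register fetchers behind the manager's back.
    List<Fetcher> fetchers() {
        return Collections.unmodifiableList(fetchers);
    }

    // Hypothetical stand-in for SplitFetcher.
    static final class Fetcher {
        private final int id;

        // Private: only the enclosing FetcherOwningManager can call this constructor.
        private Fetcher(int id) {
            this.id = id;
        }

        int id() {
            return id;
        }
    }
}
```

Because every instance passes through `createSplitFetcher()`, the manager always knows the complete set of fetchers it has to run and shut down.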
SourceReaderBase
Deprecate the old constructors exposing FutureCompletingBlockingQueue, and add new constructors as replacements; the FutureCompletingBlockingQueue instance is then created internally by SplitFetcherManager.

```java
@PublicEvolving
public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitStateT>
        implements SourceReader<T, SplitT> {

    @Deprecated
    public SourceReaderBase(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            SplitFetcherManager<E, SplitT> splitFetcherManager,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            Configuration config,
            SourceReaderContext context) {
        this(elementsQueue, splitFetcherManager, recordEmitter, null, config, context);
    }

    @Deprecated
    public SourceReaderBase(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            SplitFetcherManager<E, SplitT> splitFetcherManager,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            @Nullable RecordEvaluator<T> eofRecordEvaluator,
            Configuration config,
            SourceReaderContext context) {
        // this.elementsQueue = elementsQueue;  (the passed-in queue is no longer stored)
        this.splitFetcherManager = splitFetcherManager;
        this.recordEmitter = recordEmitter;
        this.splitStates = new HashMap<>();
        this.options = new SourceReaderOptions(config);
        this.config = config;
        this.context = context;
        this.noMoreSplitsAssignment = false;
        this.eofRecordEvaluator = eofRecordEvaluator;
        numRecordsInCounter = context.metricGroup().getIOMetricGroup().getNumRecordsInCounter();
    }

    // New constructor without FutureCompletingBlockingQueue.
    public SourceReaderBase(
            SplitFetcherManager<E, SplitT> splitFetcherManager,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            Configuration config,
            SourceReaderContext context) {
        this(splitFetcherManager, recordEmitter, null, config, context);
    }

    // New constructor without FutureCompletingBlockingQueue.
    public SourceReaderBase(
            SplitFetcherManager<E, SplitT> splitFetcherManager,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            @Nullable RecordEvaluator<T> eofRecordEvaluator,
            Configuration config,
            SourceReaderContext context) {
        this.splitFetcherManager = splitFetcherManager;
        this.recordEmitter = recordEmitter;
        this.splitStates = new HashMap<>();
        this.options = new SourceReaderOptions(config);
        this.config = config;
        this.context = context;
        this.noMoreSplitsAssignment = false;
        this.eofRecordEvaluator = eofRecordEvaluator;
        numRecordsInCounter = context.metricGroup().getIOMetricGroup().getNumRecordsInCounter();
    }
}
```

3. Mark SplitFetcherManager and SingleThreadFetcherManager as `@PublicEvolving`, mark the constructors of SplitFetcherManager and SingleThreadFetcherManager that take FutureCompletingBlockingQueue as `@Deprecated`, and provide new constructors without FutureCompletingBlockingQueue.
SingleThreadMultiplexSourceReaderBase
Deprecate the old constructors exposing FutureCompletingBlockingQueue, and add new constructors as replacements; the FutureCompletingBlockingQueue instance is then created internally by SplitFetcherManager.

```java
@PublicEvolving
public abstract class SingleThreadMultiplexSourceReaderBase<
                E, T, SplitT extends SourceSplit, SplitStateT>
        extends SourceReaderBase<E, T, SplitT, SplitStateT> {

    @Deprecated
    public SingleThreadMultiplexSourceReaderBase(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            SingleThreadFetcherManager<E, SplitT> splitFetcherManager,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            Configuration config,
            SourceReaderContext context) {
        super(elementsQueue, splitFetcherManager, recordEmitter, config, context);
    }

    @Deprecated
    public SingleThreadMultiplexSourceReaderBase(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            Configuration config,
            SourceReaderContext context) {
        super(
                elementsQueue,
                new SingleThreadFetcherManager<>(elementsQueue, splitReaderSupplier, config),
                recordEmitter,
                config,
                context);
    }

    // New constructors without FutureCompletingBlockingQueue.
    public SingleThreadMultiplexSourceReaderBase(
            Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            Configuration config,
            SourceReaderContext context) {
        super(
                new SingleThreadFetcherManager<>(splitReaderSupplier, config),
                recordEmitter,
                config,
                context);
    }

    public SingleThreadMultiplexSourceReaderBase(
            SingleThreadFetcherManager<E, SplitT> splitFetcherManager,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            @Nullable RecordEvaluator<T> eofRecordEvaluator,
            Configuration config,
            SourceReaderContext context) {
        super(splitFetcherManager, recordEmitter, eofRecordEvaluator, config, context);
    }
}
```

4. SplitFetcherManager provides wrapper methods for FutureCompletingBlockingQueue to replace its usage in SourceReaderBase.

```java
@PublicEvolving
public abstract class SplitFetcherManager<E, SplitT extends SourceSplit> {

    /** Returns the RecordsWithSplitIds produced by the SplitReader. */
    public RecordsWithSplitIds<E> poll() {
        return elementsQueue.poll();
    }

    /**
     * Returns the availability future. If the queue is non-empty, then this future will already be
     * complete. Otherwise the obtained future is guaranteed to get completed the next time the
     * queue becomes non-empty, or a notification happens via {@link #notifyAvailable()}.
     *
     * <p>It is important that a completed future is no guarantee that the next call to {@link
     * #poll()} will return a non-null element. If there are concurrent consumers, another consumer
     * may have taken the available element. Or there was no element in the first place, because
     * the future was completed through a call to {@link #notifyAvailable()}.
     *
     * <p>For that reason, it is important to call this method (to obtain a new future) every time
     * again after {@link #poll()} returned null and you want to wait for data.
     */
    public CompletableFuture<Void> getAvailabilityFuture() {
        return elementsQueue.getAvailabilityFuture();
    }

    /**
     * Makes sure the availability future is complete, if it is not complete already. All futures
     * returned by previous calls to {@link #getAvailabilityFuture()} are guaranteed to be
     * completed.
     *
     * <p>All future calls to the method will return a completed future until the point that the
     * availability is reset via calls to {@link #poll()} that leave the queue empty.
     */
    public void notifyAvailable() {
        elementsQueue.notifyAvailable();
    }

    /** Checks whether there is no data available. */
    public boolean noAvailableElement() {
        return elementsQueue.isEmpty();
    }
}
```
5. Mark SplitFetcher and SplitFetcherTask as PublicEvolving.

```java
@PublicEvolving
public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable {
    // ...
}

@PublicEvolving
public interface SplitFetcherTask {}
```

```java
// FetchTask, the internal SplitFetcherTask implementation, still receives the queue
// through its package-private constructor.
FetchTask(
        SplitReader<E, SplitT> splitReader,
        FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
        Consumer<Collection<String>> splitFinishedCallback,
        int fetcherIndex) {
    this.splitReader = splitReader;
    this.elementsQueue = elementsQueue;
    this.splitFinishedCallback = splitFinishedCallback;
    this.lastRecords = null;
    this.fetcherIndex = fetcherIndex;
    this.wakeup = false;
}
```
Proposed Changes
- By exposing SplitFetcherManager / SingleThreadFetcherManager, connector developers can easily create their own threading models in SourceReaderBase by implementing the addSplits(), removeSplits() and maybeShutdownFinishedFetchers() functions.
- Note that the SplitFetcher constructor is package-private, so users can only create SplitFetchers via SplitFetcherManager.createSplitFetcher(). This ensures each SplitFetcher is always owned by the SplitFetcherManager.
- This FLIP essentially embeds the element queue (a FutureCompletingBlockingQueue instance) into SplitFetcherManager. This hides the element queue from connector developers and simplifies SourceReaderBase so that it consists of only SplitFetcherManager and RecordEmitter as major components.
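A rough sketch of what creating a custom threading model means, using hypothetical simplified classes: each manager subclass decides in `addSplits()` how newly assigned splits are distributed over fetchers. The list above also mentions removeSplits() and maybeShutdownFinishedFetchers(); this sketch omits them, and the classes are not Flink's.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical base class: each inner list stands for one fetcher thread and its splits.
abstract class ThreadingModelManager {
    protected final List<List<String>> fetchers = new ArrayList<>();

    // Subclasses decide how newly assigned splits are distributed over fetchers.
    abstract void addSplits(List<String> splits);

    int numFetchers() {
        return fetchers.size();
    }
}

// One fetcher thread reads all splits (the SingleThreadFetcherManager idea).
class SingleFetcherManager extends ThreadingModelManager {
    @Override
    void addSplits(List<String> splits) {
        if (fetchers.isEmpty()) {
            fetchers.add(new ArrayList<>());
        }
        fetchers.get(0).addAll(splits);
    }
}

// A custom model: a dedicated fetcher thread per split.
class FetcherPerSplitManager extends ThreadingModelManager {
    @Override
    void addSplits(List<String> splits) {
        for (String split : splits) {
            List<String> fetcher = new ArrayList<>();
            fetcher.add(split);
            fetchers.add(fetcher);
        }
    }
}
```

The reader side does not change between the two models; only the manager subclass passed to it does.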
Compatibility, Deprecation, and Migration Plan
Connectors that use the constructors of SingleThreadMultiplexSourceReaderBase, SourceReaderBase, and SingleThreadFetcherManager that take the elementsQueue parameter should migrate to the new constructors without the elementsQueue parameter.
Mark the impacted constructors as deprecated in 1.19, and remove them in the 2.0 release.
Test Plan
Nothing else to do.
Rejected Alternatives
Change SplitFetcher to a public interface
If SplitFetcherManager becomes PublicEvolving, SplitFetcher also needs to be PublicEvolving, because it is returned by the protected method SplitFetcherManager.createSplitFetcher().
However, SplitFetcher requires FutureCompletingBlockingQueue as a constructor parameter, and SplitFetcher is a class rather than an interface. Therefore, I wanted to change SplitFetcher to a public interface and move its implementation details to an implementing subclass.
Reject Reason
The constructor of SplitFetcher is already package-private, so it can only be accessed from classes in the package org.apache.flink.connector.base.source.reader.fetcher, and user classes should not be in this package. Therefore, even if we mark the SplitFetcher class as PublicEvolving, the constructor is not available to users. Only the public and protected methods are considered public API in this case; private / package-private methods and fields are still internal.
SplitFetcherManager expose poll / getAvailabilityFuture / notifyAvailable / noAvailableElement
- Change SplitFetcherManager from Internal to PublicEvolving.
- Deprecate the old constructor exposing the FutureCompletingBlockingQueue, and add new constructors as replacements which creates the FutureCompletingBlockingQueue instance internally.
- Add a new internal method SplitFetcherManager#getQueue() for SourceReaderBase usage.
Reject Reason
SplitFetcherManager would expose 4 more methods, poll / getAvailabilityFuture / notifyAvailable / noAvailableElement, which are tightly coupled with the implementation of the element queue. The naming of these methods looks odd: what does it mean to "poll from a SplitFetcherManager" or "notify a SplitFetcherManager available"? To clarify these methods, we would have to explain to developers that "we hide a queue inside SplitFetcherManager, and the poll method actually polls from that queue". I'm afraid these methods would implicitly expose the concept and the implementation of the queue to developers.
A cleaner solution would be a new interface that extracts the SplitFetcher creation and management logic from the current SplitFetcherManager, but having too many concepts might make the entire Source API even harder to understand. As a compromise, we only expose the constructors of SplitFetcherManager as public APIs and add a new internal method SplitFetcherManager#getQueue() for SourceReaderBase (admittedly a bit hacky, but more worthwhile than exposing methods like poll and notifyAvailable on SplitFetcherManager).
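The chosen compromise can be sketched as follows, assuming hypothetical simplified classes. `CompromiseManager#getQueue()` plays the role of the internal SplitFetcherManager#getQueue(). The deprecated reader constructor accepts a queue but ignores it; this is an assumption based on the commented-out `this.elementsQueue = elementsQueue;` line in the SourceReaderBase code, not a confirmed implementation detail.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical model: the manager owns the queue and exposes it only via an internal accessor.
class CompromiseManager {
    private final BlockingQueue<String> elementsQueue = new ArrayBlockingQueue<>(2);

    // Plays the role of SplitFetcherManager#getQueue(), which the FLIP marks @Internal.
    BlockingQueue<String> getQueue() {
        return elementsQueue;
    }
}

// Hypothetical stand-in for SourceReaderBase.
class CompromiseReader {
    private final BlockingQueue<String> elementsQueue;

    /** Legacy constructor: the queue argument is accepted for compatibility but ignored (assumed). */
    @Deprecated
    CompromiseReader(BlockingQueue<String> ignoredQueue, CompromiseManager manager) {
        this(manager);
    }

    /** New constructor: the reader always takes the queue from its manager. */
    CompromiseReader(CompromiseManager manager) {
        this.elementsQueue = manager.getQueue();
    }

    String pollNext() {
        return elementsQueue.poll();
    }
}
```

Only the reader base class calls `getQueue()`; connector developers see a manager with constructors and threading hooks, and no queue-shaped methods.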