...
Discussion thread | https://lists.apache.org/thread/b8f509878ptwl3kmmgg95tv8sb1j5987 | ||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|
Vote thread | https://lists.apache.org/thread/t1zff21z440pvv48jyhm8pgtqsyplchn | ||||||||||
JIRA |
| ||||||||||
Release | 1.19 |
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
In hindsight, it might have been better not to expose FutureCompletingBlockingQueue to Source developers, who are unlikely to do anything with it other than construct it. The FutureCompletingBlockingQueue is currently exposed in the SourceReaderBase constructor because both the SplitFetcherManager and the SourceReaderBase need it. One way to hide it from the public API is to make SplitFetcherManager the only owner class of the queue and expose some of the queue's methods via SplitFetcherManager, so that SourceReaderBase can invoke them through SplitFetcherManager. This will also make the code slightly cleaner.
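The ownership idea above can be illustrated with a minimal sketch. It deliberately does not use the Flink classes: `FetcherManagerSketch` and `ReaderSketch` are hypothetical stand-ins for SplitFetcherManager and SourceReaderBase, and a plain `ArrayBlockingQueue` stands in for FutureCompletingBlockingQueue. It only shows the shape of the change, namely that the manager constructs the queue itself and the reader reaches it through the manager.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

// Hypothetical stand-in for SplitFetcherManager: the sole owner of the element queue.
class FetcherManagerSketch<E> {
    private final BlockingQueue<E> elementsQueue;

    FetcherManagerSketch(int queueCapacity) {
        // The queue is created here rather than being passed in by the connector developer.
        this.elementsQueue = new ArrayBlockingQueue<>(queueCapacity);
    }

    // In the real design the fetcher threads enqueue records; here we enqueue directly.
    // Returns false when the queue is full.
    boolean offer(E element) {
        return elementsQueue.offer(element);
    }

    // Package-private accessor, standing in for an internal getQueue() method.
    BlockingQueue<E> getQueue() {
        return elementsQueue;
    }
}

// Hypothetical stand-in for SourceReaderBase: it no longer receives the queue in its constructor.
class ReaderSketch<E> {
    private final FetcherManagerSketch<E> fetcherManager;

    ReaderSketch(FetcherManagerSketch<E> fetcherManager) {
        this.fetcherManager = fetcherManager;
    }

    // Returns the next element, or null if nothing is available yet.
    E pollNext() {
        return fetcherManager.getQueue().poll();
    }
}
```

With this shape, a connector developer only ever constructs the manager and the reader; the queue never appears in a public signature.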
...
- To expose the SplitFetcherManager / SingleThreadFetcherManager as Public, allowing connector developers to easily create their own threading models in the SourceReaderBase.
- To hide the element queue from the connector developers and simplify the SourceReaderBase to consist of only SplitFetcherManager and RecordEmitter as major components.
Public Interfaces
SplitFetcherManager
...
Code Block |
---|
@PublicEvolving
public abstract class SplitFetcherManager<E, SplitT extends SourceSplit> {

    @Deprecated
    public SplitFetcherManager(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            Supplier<SplitReader<E, SplitT>> splitReaderFactory,
            Configuration configuration) {
        this(elementsQueue, splitReaderFactory, configuration, (ignore) -> { });
    }

    @Deprecated
    @VisibleForTesting
    public SplitFetcherManager(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            Supplier<SplitReader<E, SplitT>> splitReaderFactory,
            Configuration configuration,
            Consumer<Collection<String>> splitFinishedHook) {
    }

    // todo: provide a new constructor without FutureCompletingBlockingQueue.
    public SplitFetcherManager(
            Supplier<SplitReader<E, SplitT>> splitReaderFactory,
            Configuration configuration) {
        this(splitReaderFactory, configuration, (ignore) -> { });
    }

    // todo: provide a new constructor without FutureCompletingBlockingQueue.
    public SplitFetcherManager(
            Supplier<SplitReader<E, SplitT>> splitReaderFactory,
            Configuration configuration,
            Consumer<Collection<String>> splitFinishedHook) {
        this.elementsQueue = new FutureCompletingBlockingQueue<>(
                configuration.getInteger(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY));
        // ......
    }

    @Internal
    public FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> getQueue();
} |
SingleThreadFetcherManager
- Change SingleThreadFetcherManager from Internal to PublicEvolving.
- Deprecate the old constructors that expose the FutureCompletingBlockingQueue, and add new constructors without it as replacements; the parent class (SplitFetcherManager) creates the FutureCompletingBlockingQueue instance internally.
Code Block |
---|
@PublicEvolving
public class SingleThreadFetcherManager<E, SplitT extends SourceSplit>
        extends SplitFetcherManager<E, SplitT> {

    @Deprecated
    public SingleThreadFetcherManager(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            Supplier<SplitReader<E, SplitT>> splitReaderSupplier) {
        this(elementsQueue, splitReaderSupplier, new Configuration());
    }

    @Deprecated
    public SingleThreadFetcherManager(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
            Configuration configuration) {
        super(elementsQueue, splitReaderSupplier, configuration);
    }

    @Deprecated
    @VisibleForTesting
    public SingleThreadFetcherManager(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
            Configuration configuration,
            Consumer<Collection<String>> splitFinishedHook) {
        super(elementsQueue, splitReaderSupplier, configuration, splitFinishedHook);
    }

    // todo: provide a new constructor without FutureCompletingBlockingQueue.
    public SingleThreadFetcherManager(
            Supplier<SplitReader<E, SplitT>> splitReaderSupplier) {
        super(splitReaderSupplier, new Configuration());
    }

    // todo: provide a new constructor without FutureCompletingBlockingQueue.
    public SingleThreadFetcherManager(
            Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
            Configuration configuration) {
        super(splitReaderSupplier, configuration);
    }

    // todo: provide a new constructor without FutureCompletingBlockingQueue.
    public SingleThreadFetcherManager(
            Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
            Configuration configuration,
            Consumer<Collection<String>> splitFinishedHook) {
        super(splitReaderSupplier, configuration, splitFinishedHook);
    }
} |
SplitFetcherTask
Change SplitFetcherTask from Internal to PublicEvolving.
Code Block |
---|
@PublicEvolving public interface SplitFetcherTask {} |
SplitFetcher
Change SplitFetcher from Internal to PublicEvolving, but do not expose the constructor of SplitFetcher, so that it can only be created by org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager#createSplitFetcher.
Code Block |
---|
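@PublicEvolving
public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable {

    SplitFetcher(
            int id,
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            SplitReader<E, SplitT> splitReader,
            Consumer<Throwable> errorHandler,
            Runnable shutdownHook,
            Consumer<Collection<String>> splitFinishedHook,
            boolean allowUnalignedSourceSplits);
} |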
SourceReaderBase
Deprecate the old constructors that expose the FutureCompletingBlockingQueue, and add new constructors as replacements; the FutureCompletingBlockingQueue instance is created internally in SplitFetcherManager.
Code Block |
---|
@PublicEvolving
public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitStateT>
        implements SourceReader<T, SplitT> {

    @Deprecated
    public SourceReaderBase(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            SplitFetcherManager<E, SplitT> splitFetcherManager,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            Configuration config,
            SourceReaderContext context) {
        this(elementsQueue, splitFetcherManager, recordEmitter, null, config, context);
    }

    @Deprecated
    public SourceReaderBase(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            SplitFetcherManager<E, SplitT> splitFetcherManager,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            @Nullable RecordEvaluator<T> eofRecordEvaluator,
            Configuration config,
            SourceReaderContext context) {
        //this.elementsQueue = elementsQueue;
        this.splitFetcherManager = splitFetcherManager;
        this.recordEmitter = recordEmitter;
        this.splitStates = new HashMap<>();
        this.options = new SourceReaderOptions(config);
        this.config = config;
        this.context = context;
        this.noMoreSplitsAssignment = false;
        this.eofRecordEvaluator = eofRecordEvaluator;
        numRecordsInCounter = context.metricGroup().getIOMetricGroup().getNumRecordsInCounter();
    }

    // todo: provide a new constructor without FutureCompletingBlockingQueue
    public SourceReaderBase(
            SplitFetcherManager<E, SplitT> splitFetcherManager,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            Configuration config,
            SourceReaderContext context) {
        this(splitFetcherManager, recordEmitter, null, config, context);
    }

    // todo: provide a new constructor without FutureCompletingBlockingQueue
    public SourceReaderBase(
            SplitFetcherManager<E, SplitT> splitFetcherManager,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            @Nullable RecordEvaluator<T> eofRecordEvaluator,
            Configuration config,
            SourceReaderContext context) {
        this.splitFetcherManager = splitFetcherManager;
        this.recordEmitter = recordEmitter;
        this.splitStates = new HashMap<>();
        this.options = new SourceReaderOptions(config);
        this.config = config;
        this.context = context;
        this.noMoreSplitsAssignment = false;
        this.eofRecordEvaluator = eofRecordEvaluator;
        numRecordsInCounter = context.metricGroup().getIOMetricGroup().getNumRecordsInCounter();
    }
} |
SingleThreadMultiplexSourceReaderBase
Deprecate the old constructors that expose the FutureCompletingBlockingQueue, and add new constructors as replacements; the FutureCompletingBlockingQueue instance is created internally in SplitFetcherManager.
Code Block |
---|
@PublicEvolving
public abstract class SingleThreadMultiplexSourceReaderBase<E, T, SplitT extends SourceSplit, SplitStateT>
        extends SourceReaderBase<E, T, SplitT, SplitStateT> {

    @Deprecated
    public SingleThreadMultiplexSourceReaderBase(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            SingleThreadFetcherManager<E, SplitT> splitFetcherManager,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            Configuration config,
            SourceReaderContext context) {
        super(elementsQueue, splitFetcherManager, recordEmitter, config, context);
    }

    @Deprecated
    public SingleThreadMultiplexSourceReaderBase(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            Configuration config,
            SourceReaderContext context) {
        super(
                elementsQueue,
                new SingleThreadFetcherManager<>(elementsQueue, splitReaderSupplier, config),
                recordEmitter,
                config,
                context);
    }

    // todo: Add new constructors without FutureCompletingBlockingQueue
    public SingleThreadMultiplexSourceReaderBase(
            Supplier<SplitReader<E, SplitT>> splitReaderSupplier,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            Configuration config,
            SourceReaderContext context) {
        super(
                new SingleThreadFetcherManager<>(splitReaderSupplier, config),
                recordEmitter,
                config,
                context);
    }

    // todo: provide a new constructor without FutureCompletingBlockingQueue
    public SingleThreadMultiplexSourceReaderBase(
            SingleThreadFetcherManager<E, SplitT> splitFetcherManager,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            @Nullable RecordEvaluator<T> eofRecordEvaluator,
            Configuration config,
            SourceReaderContext context) {
        super(
                splitFetcherManager,
                recordEmitter,
                eofRecordEvaluator,
                config,
                context);
    }
} |
Proposed Changes
- By exposing the SplitFetcherManager / SingleThreadFetcherManager, connector developers can easily create their own threading models in the SourceReaderBase by implementing the addSplits(), removeSplits() and maybeShutdownFinishedFetchers() methods.
- Note that the SplitFetcher constructor is package private, so users can only create SplitFetchers via SplitFetcherManager.createSplitFetcher(). This ensures each SplitFetcher is always owned by the SplitFetcherManager.
- This FLIP essentially embeds the element queue (a FutureCompletingBlockingQueue) instance into the SplitFetcherManager. This hides the element queue from the connector developers and simplifies the SourceReaderBase to consist of only SplitFetcherManager and RecordEmitter as major components.
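To make the "own threading model" point concrete, here is a rough sketch of the pattern, again without the Flink classes. `FetcherSketch`, `ManagerSketch` and `FetcherPerSplitManager` are hypothetical names invented for illustration, splits are plain strings, and no threads are actually started. It assumes only what the list above states: fetchers are obtained exclusively from the manager's factory method, and a subclass decides how splits map to fetchers by implementing the three methods. The return type of `maybeShutdownFinishedFetchers()` here is an assumption of the sketch, not the Flink signature.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for SplitFetcher. The constructor is package-private,
// so code outside this package must go through ManagerSketch.createFetcher().
final class FetcherSketch {
    final int id;
    final List<String> assignedSplits = new ArrayList<>();

    FetcherSketch(int id) {
        this.id = id;
    }
}

// Hypothetical stand-in for SplitFetcherManager.
abstract class ManagerSketch {
    protected final Map<Integer, FetcherSketch> fetchers = new HashMap<>();
    private int nextId = 0;

    // Mirrors the role of createSplitFetcher(): every fetcher is registered
    // with the manager at creation time, so the manager always owns it.
    protected FetcherSketch createFetcher() {
        FetcherSketch fetcher = new FetcherSketch(nextId++);
        fetchers.put(fetcher.id, fetcher);
        return fetcher;
    }

    abstract void addSplits(List<String> splits);

    abstract void removeSplits(List<String> splits);

    // Returns the number of fetchers still alive (an assumption of this sketch).
    abstract int maybeShutdownFinishedFetchers();
}

// One possible threading model: a dedicated fetcher per split.
class FetcherPerSplitManager extends ManagerSketch {
    @Override
    void addSplits(List<String> splits) {
        for (String split : splits) {
            createFetcher().assignedSplits.add(split);
        }
    }

    @Override
    void removeSplits(List<String> splits) {
        for (FetcherSketch fetcher : fetchers.values()) {
            fetcher.assignedSplits.removeAll(splits);
        }
    }

    @Override
    int maybeShutdownFinishedFetchers() {
        // A fetcher with no splits left is considered finished and is dropped.
        fetchers.values().removeIf(fetcher -> fetcher.assignedSplits.isEmpty());
        return fetchers.size();
    }
}
```

A single-fetcher model would differ only in `addSplits()`, which would reuse one fetcher instead of calling `createFetcher()` per split; the reader side stays unchanged either way.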
...
SplitFetcherManager exposes poll / getAvailabilityFuture / notifyAvailable / noAvailableElement
- Change SplitFetcherManager from Internal to PublicEvolving.
- Deprecate the old constructors that expose the FutureCompletingBlockingQueue, and add new constructors as replacements that create the FutureCompletingBlockingQueue instance internally.
- Add a new internal method SplitFetcherManager#getQueue() for SourceReaderBase usage.
|
|
        Configuration configuration) {
    this(splitReaderFactory, configuration, (ignore) -> { });
}

// todo: provide a new constructor without FutureCompletingBlockingQueue.
public SplitFetcherManager(
        Supplier<SplitReader<E, SplitT>> splitReaderFactory,
        Configuration configuration,
        Consumer<Collection<String>> splitFinishedHook) {
    this.elementsQueue = new FutureCompletingBlockingQueue<>(
            configuration.getInteger(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY));
    // ......
}

@Internal
public SplitFetcherManager#getQueue();
|
Reject Reason
SplitFetcherManager exposes four more methods, poll / getAvailabilityFuture / notifyAvailable / noAvailableElement, which are tightly coupled with the implementation of the elementQueue. The naming of these methods looks odd: what does it mean to "poll from a SplitFetcherManager" or to "notify a SplitFetcherManager available"? To clarify these methods we would have to explain to developers that "well, we hide a queue inside SplitFetcherManager and the poll method is actually polling from the queue". I'm afraid these methods would implicitly expose the concept and the implementation of the queue to developers.
...