...
Discussion thread | https://lists.apache.org/thread/b8f509878ptwl3kmmgg95tv8sb1j5987 | ||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|
Vote thread | https://lists.apache.org/thread/t1zff21z440pvv48jyhm8pgtqsyplchn | ||||||||||
JIRA |
| ||||||||||
Release | 1.19 |
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
Code Block |
---|
@PublicEvolving public abstract class SplitFetcherManager<E, SplitT extends SourceSplit> { @Deprecated public SplitFetcherManagerSingleThreadMultiplexSourceReaderBase( FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueueSupplier<SplitReader<E, SplitT>> splitReaderSupplier, Supplier<SplitReader<ERecordEmitter<E, T, SplitT>>SplitStateT> splitReaderFactoryrecordEmitter, Configuration configuration) { config, this(elementsQueue, splitReaderFactory, configuration,SourceReaderContext (ignorecontext) -> { });super( } @Deprecated @VisibleForTesting public SplitFetcherManager( new SingleThreadFetcherManager<>(splitReaderSupplier, config), FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueuerecordEmitter, Supplier<SplitReader<E, SplitT>> splitReaderFactoryconfig, Configuration configuration, context); } @Deprecated Consumer<Collection<String>> splitFinishedHook) { } // todo: provide a new constructor without FutureCompletingBlockingQueue. public SplitFetcherManager(public SplitFetcherManager( FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue, Supplier<SplitReader<E, SplitT>> splitReaderFactory, Configuration configuration) { this(elementsQueue, splitReaderFactory, configuration, (ignore) -> { }); } @Deprecated } // todo: provide a new constructor without FutureCompletingBlockingQueue. public SplitFetcherManager( @VisibleForTesting public SplitFetcherManager( FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue, Supplier<SplitReader<E, SplitT>> splitReaderFactory, Configuration configuration, Consumer<Collection<String>> splitFinishedHook) { } // todo: provide a new constructor without thisFutureCompletingBlockingQueue.elementsQueue = new FutureCompletingBlockingQueue<>( public SplitFetcherManager( Supplier<SplitReader<E, SplitT>> splitReaderFactory, Configuration configuration.getInteger(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY)); // ...... } @Internal public FutureCompletingBlockingQueue getQueue(); } |
SingleThreadFetcherManager
- Change SingleThreadFetcherManager from Internal to PublicEvolving.
- Deprecate the old constructor exposing the FutureCompletingBlockingQueue, and add new constructors without FutureCompletingBlockingQueue as replacements which creates the FutureCompletingBlockingQueue instance by its parent class(SplitFetcherManager) internally.
Code Block |
---|
@PublicEvolving public class SingleThreadFetcherManager<E, SplitT extends SourceSplit> ) { this(splitReaderFactory, configuration, (ignore) -> { }); extends SplitFetcherManager<E, SplitT> { } // todo: provide a @Deprecated new constructor without FutureCompletingBlockingQueue. public public SingleThreadFetcherManagerSplitFetcherManager( Supplier<SplitReader<E, FutureCompletingBlockingQueue<RecordsWithSplitIds<E>>SplitT>> elementsQueuesplitReaderFactory, Configuration Supplier<SplitReader<E, SplitT>> splitReaderSupplierconfiguration, Consumer<Collection<String>> splitFinishedHook) { this(.elementsQueue, splitReaderSupplier,= new ConfigurationFutureCompletingBlockingQueue<>()); } @Deprecated public SingleThreadFetcherManager( configuration.getInteger(SourceReaderOptions.ELEMENT_QUEUE_CAPACITY)); // ...... } @Internal public FutureCompletingBlockingQueue getQueue(); } |
SingleThreadFetcherManager
- Change SingleThreadFetcherManager from Internal to PublicEvolving.
- Deprecate the old constructor exposing the FutureCompletingBlockingQueue, and add new constructors without FutureCompletingBlockingQueue as replacements which creates the FutureCompletingBlockingQueue instance by its parent class(SplitFetcherManager) internally.
Code Block |
---|
@PublicEvolving public class SingleThreadFetcherManager<E, SplitT extends SourceSplit> FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue, Supplier<SplitReader<E, SplitT>> splitReaderSupplier, Configuration configuration) extends SplitFetcherManager<E, SplitT> { @Deprecated public super(elementsQueue, splitReaderSupplier, configuration);SingleThreadFetcherManager( FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue, Supplier<SplitReader<E, SplitT>> splitReaderSupplier) { } @Deprecated this(elementsQueue, splitReaderSupplier, new Configuration()); @VisibleForTesting} @Deprecated public SingleThreadFetcherManager( FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue, Supplier<SplitReader<E, SplitT>> splitReaderSupplier, Configuration configuration,) { Consumer<Collection<String>> splitFinishedHook) { supersuper(elementsQueue, splitReaderSupplier, configuration, splitFinishedHook); } // todo: provide@Deprecated a new constructor without FutureCompletingBlockingQueue.@VisibleForTesting public SingleThreadFetcherManager( Supplier<SplitReader<EFutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue, SplitT>> splitReaderSupplier) { this(splitReaderSupplierSupplier<SplitReader<E, new Configuration());SplitT>> splitReaderSupplier, } // todo: provide a new constructor without FutureCompletingBlockingQueue. public SingleThreadFetcherManager( Supplier<SplitReader<E, SplitT>> splitReaderSupplier,Configuration configuration, ConfigurationConsumer<Collection<String>> configurationsplitFinishedHook) { super(elementsQueue, splitReaderSupplier, configuration, splitFinishedHook); } // todo: provide a new constructor without FutureCompletingBlockingQueuewithout FutureCompletingBlockingQueue. public SingleThreadFetcherManager( Supplier<SplitReader<E, SplitT>> splitReaderSupplier,) { super(splitReaderSupplier, new Configuration()); } // todo: provide a new constructor Configuration configurationwithout FutureCompletingBlockingQueue. public SingleThreadFetcherManager( Supplier<SplitReader<E, SplitT>> splitReaderSupplier, Consumer<Collection<String>>Configuration splitFinishedHookconfiguration) { super(splitReaderSupplier, configuration, splitFinishedHook); } } |
SplitFetcherTask
Change SplitFetcherTask from Internal to PublicEvolving.
Code Block |
---|
@PublicEvolving
public interface SplitFetcherTask {} |
SplitFetcher
Change SplitFetcher from Internal to PublicEvolving, but still not to expose the construct of SplitFetcher, so it can only created by org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager#createSplitFetcher
Code Block |
---|
@PublicEvolving public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable { SplitFetcher( // todo: provide a new constructor without FutureCompletingBlockingQueue. public SingleThreadFetcherManager( Supplier<SplitReader<E, SplitT>> splitReaderSupplier, Configuration configuration, int id, Consumer<Collection<String>> splitFinishedHook) { FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,super(splitReaderSupplier, configuration, splitFinishedHook); } } |
SplitFetcherTask
Change SplitFetcherTask from Internal to PublicEvolving.
Code Block |
---|
@PublicEvolving
public interface SplitFetcherTask {} |
SplitFetcher
Change SplitFetcher from Internal to PublicEvolving, but still not to expose the construct of SplitFetcher, so it can only created by org.apache.flink.connector.base.source.reader.fetcher.SplitFetcherManager#createSplitFetcher
Code Block |
---|
@PublicEvolving public class SplitFetcher<E, SplitT extends SourceSplit> implements Runnable { SplitFetcher( int id SplitReader<E, SplitT> splitReader, Consumer<Throwable> errorHandler, Runnable shutdownHook, Consumer<Collection<String>>FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> splitFinishedHookelementsQueue, boolean SplitReader<E, SplitT> splitReader, Consumer<Throwable> errorHandler, Runnable shutdownHook, Consumer<Collection<String>> splitFinishedHook, boolean allowUnalignedSourceSplits); } |
...
- Change SplitFetcherManager from Internal to PublicEvolving.
- Deprecate the old constructor exposing the FutureCompletingBlockingQueue, and add new constructors as replacements which creates the FutureCompletingBlockingQueue instance internally.
- Add a new internal method SplitFetcherManager#getQueue() for SourceReaderBase usage.internal method SplitFetcherManager#getQueue() for SourceReaderBase usage.
|
|
Reject Reason
SplitFetcherManager exposes 4 more methods, poll / getAvailabilityFuture / notifyAvailable / noAvailableElement, which are tightly coupled with the implementation of the elementQueue. The naming of these methods look weird, like what does it mean to "poll from a SplitFetcherManager" / "notify a SplitFetcherManager available"? To clarify these methods we have to explain to developers that "well we hide a queue inside SplitFetcherMamager and the poll method is actually polling from the queue". I'm afraid these methods will implicitly expose the concept and the implementation of the queue to developers.
...