...
```java
package org.apache.flink.connector.base.source.reader;

import org.apache.flink.annotation.PublicEvolving;

import java.io.Serializable;

/**
 * An interface that evaluates whether a de-serialized record should trigger certain control-flow
 * operations (e.g. end of stream).
 */
@PublicEvolving
@FunctionalInterface
public interface RecordEvaluator<T> extends Serializable {
    /**
     * Determines whether a record should trigger the end of stream for its split. If this method
     * returns true, the given record is not emitted by the source.
     *
     * @param record a de-serialized record from the split.
     * @return a boolean indicating whether the split has reached end of stream.
     */
    boolean isEndOfStream(T record);
}
```
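Because RecordEvaluator is a functional interface that extends Serializable, an evaluator can be written as a lambda and still be serialized together with the source that holds it. The sketch below illustrates this with a local stand-in that mirrors the proposed interface, so it compiles without Flink on the classpath. The class name RecordEvaluatorExample, the helpers endMarkerEvaluator and roundTrip, and the "END" marker convention are all hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class RecordEvaluatorExample {

    // Hypothetical local stand-in mirroring the proposed interface (not Flink's class).
    @FunctionalInterface
    public interface RecordEvaluator<T> extends Serializable {
        boolean isEndOfStream(T record);
    }

    // Hypothetical helper: treats any record equal to `marker` as end of stream.
    // The lambda is serializable because its target type extends Serializable.
    public static RecordEvaluator<String> endMarkerEvaluator(String marker) {
        return record -> marker.equals(record);
    }

    // Serializes and deserializes an object with plain Java serialization.
    @SuppressWarnings("unchecked")
    public static <T> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Serialization round trip failed", e);
        }
    }

    public static void main(String[] args) {
        RecordEvaluator<String> copy = roundTrip(endMarkerEvaluator("END"));
        System.out.println(copy.isEndOfStream("END"));   // true
        System.out.println(copy.isEndOfStream("hello")); // false
    }
}
```

Capturing only serializable state (here a String) keeps the evaluator safe to ship; a lambda that captured a non-serializable object would fail the round trip.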
2) Adds new constructors for SingleThreadMultiplexSourceReaderBase and SourceReaderBase.
The respective new constructors should accept a parameter "@Nullable RecordEvaluator<T> eofRecordEvaluator". When eofRecordEvaluator is not null, it is used to determine whether a split has reached end of stream (EOF).
```java
// Constructor signatures only; existing members are omitted.
@PublicEvolving
public abstract class SingleThreadMultiplexSourceReaderBase<
                E, T, SplitT extends SourceSplit, SplitStateT>
        extends SourceReaderBase<E, T, SplitT, SplitStateT> {

    public SingleThreadMultiplexSourceReaderBase(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            SingleThreadFetcherManager<E, SplitT> splitFetcherManager,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            @Nullable RecordEvaluator<T> eofRecordEvaluator,
            Configuration config,
            SourceReaderContext context) {
        super(...);
    }
}

@PublicEvolving
public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitStateT>
        implements SourceReader<T, SplitT> {

    public SourceReaderBase(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            SplitFetcherManager<E, SplitT> splitFetcherManager,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            @Nullable RecordEvaluator<T> eofRecordEvaluator,
            Configuration config,
            SourceReaderContext context) {
    }
}
```
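A minimal sketch of how a reader could apply the nullable evaluator on a per-split basis. This is not the SourceReaderBase implementation: EofAwareReaderSketch, SplitRecord, and the local RecordEvaluator stand-in are hypothetical, and the sketch assumes each record arrives tagged with its split id. Once the evaluator matches a record, that record and all later records of the same split are dropped while other splits keep flowing. With a null evaluator, every record is emitted.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class EofAwareReaderSketch<T> {

    // Hypothetical local stand-in mirroring the proposed interface (not Flink's class).
    @FunctionalInterface
    public interface RecordEvaluator<R> extends Serializable {
        boolean isEndOfStream(R record);
    }

    // Hypothetical wrapper: a de-serialized value plus the id of the split it came from.
    public static final class SplitRecord<V> {
        final String splitId;
        final V value;

        public SplitRecord(String splitId, V value) {
            this.splitId = splitId;
            this.value = value;
        }
    }

    private final RecordEvaluator<T> eofRecordEvaluator; // may be null
    private final Set<String> finishedSplits = new HashSet<>();
    private final List<T> emitted = new ArrayList<>();

    public EofAwareReaderSketch(RecordEvaluator<T> eofRecordEvaluator) {
        this.eofRecordEvaluator = eofRecordEvaluator;
    }

    // Emits the record unless its split has already reached end of stream.
    public void handle(SplitRecord<T> record) {
        if (finishedSplits.contains(record.splitId)) {
            return; // split already finished: drop subsequent records
        }
        if (eofRecordEvaluator != null && eofRecordEvaluator.isEndOfStream(record.value)) {
            finishedSplits.add(record.splitId); // the matching record itself is not emitted
            return;
        }
        emitted.add(record.value);
    }

    public List<T> emitted() {
        return emitted;
    }

    public Set<String> finishedSplits() {
        return finishedSplits;
    }

    public static void main(String[] args) {
        EofAwareReaderSketch<String> reader = new EofAwareReaderSketch<>(v -> v.equals("END"));
        reader.handle(new SplitRecord<>("p0", "a"));
        reader.handle(new SplitRecord<>("p1", "x"));
        reader.handle(new SplitRecord<>("p0", "END"));
        reader.handle(new SplitRecord<>("p0", "b")); // dropped: p0 already finished
        reader.handle(new SplitRecord<>("p1", "y"));
        System.out.println(reader.emitted());        // [a, x, y]
        System.out.println(reader.finishedSplits()); // [p0]
    }
}
```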
3) Adds the following method to KafkaSourceBuilder
```java
public class KafkaSourceBuilder<OUT> {
    ... // Skip the existing methods

    /**
     * Sets the optional {@link RecordEvaluator eofRecordEvaluator} for KafkaSource.
     *
     * <p>When the evaluator is specified, it is invoked for each de-serialized record to determine
     * whether the corresponding split has reached end of stream. If a record is matched by the
     * evaluator, the source emits neither this record nor any subsequent records in the same
     * split.
     *
     * <p>Note that the evaluator works jointly with the stopping offsets specified by {@link
     * #setBounded(OffsetsInitializer)} or {@link #setUnbounded(OffsetsInitializer)}. The source
     * stops consuming from a split when any of these conditions is met.
     *
     * @param eofRecordEvaluator a {@link RecordEvaluator recordEvaluator}
     * @return this KafkaSourceBuilder.
     */
    public KafkaSourceBuilder<OUT> setEofRecordEvaluator(RecordEvaluator<OUT> eofRecordEvaluator) {
        this.eofRecordEvaluator = eofRecordEvaluator;
        return this;
    }
}
```
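The rule that a split stops when any condition is met can be illustrated by simulating a single split. This is a sketch under stated assumptions, not KafkaSource internals. It assumes consecutive offsets starting at startOffset and treats the stopping offset as exclusive. BoundedSplitSketch, consume, and the local RecordEvaluator stand-in are hypothetical.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class BoundedSplitSketch {

    // Hypothetical local stand-in mirroring the proposed interface (not Flink's class).
    @FunctionalInterface
    public interface RecordEvaluator<T> extends Serializable {
        boolean isEndOfStream(T record);
    }

    // Simulates consuming one split whose i-th record has offset startOffset + i.
    // Consumption stops at whichever condition is met first: the (exclusive) stopping
    // offset, or the first record matched by the evaluator, which is itself not emitted.
    // A null evaluator means only the stopping offset applies.
    public static <T> List<T> consume(
            List<T> records, long startOffset, long stoppingOffset, RecordEvaluator<T> evaluator) {
        List<T> emitted = new ArrayList<>();
        for (int i = 0; i < records.size(); i++) {
            long offset = startOffset + i;
            T record = records.get(i);
            if (offset >= stoppingOffset) {
                break; // stopping offset reached
            }
            if (evaluator != null && evaluator.isEndOfStream(record)) {
                break; // evaluator signalled end of stream
            }
            emitted.add(record);
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String> split = Arrays.asList("a", "b", "END", "c", "d");
        RecordEvaluator<String> eof = record -> record.equals("END");
        System.out.println(consume(split, 0, 10, eof)); // [a, b] (evaluator fires first)
        System.out.println(consume(split, 0, 1, eof));  // [a] (stopping offset reached first)
    }
}
```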
4) Adds the following method to PulsarSourceBuilder
...