...

Code Block
language: java
package org.apache.flink.connector.base.source.reader;

import org.apache.flink.annotation.PublicEvolving;

import java.io.Serializable;

/**
 * An interface that evaluates whether a de-serialized record should trigger certain control-flow
 * operations (e.g. end of stream).
 */
@PublicEvolving
@FunctionalInterface
public interface RecordEvaluator<T> extends Serializable {
    /**
     * Determines whether a record should trigger the end of stream for its split. If this method
     * returns true, the given record is not emitted from the source.
     *
     * @param record a de-serialized record from the split.
     * @return a boolean indicating whether the split has reached end of stream.
     */
    boolean isEndOfStream(T record);
}
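
As an illustration of how a user might implement the interface above, the following is a minimal sketch rather than part of the proposal. It declares a local stand-in for RecordEvaluator so that it compiles without Flink on the classpath; the Event type, the "END" sentinel, and the sentinel/lastFlag helper names are hypothetical.

```java
import java.io.Serializable;

public class EndMarkerEvaluatorSketch {

    /** Local stand-in mirroring the proposed interface, so the sketch compiles without Flink. */
    @FunctionalInterface
    public interface RecordEvaluator<T> extends Serializable {
        boolean isEndOfStream(T record);
    }

    /** Hypothetical record type: a payload plus a flag the producer sets on its last message. */
    public static final class Event implements Serializable {
        final String payload;
        final boolean last;

        public Event(String payload, boolean last) {
            this.payload = payload;
            this.last = last;
        }
    }

    /** Evaluator that treats a sentinel string as the end of the split. */
    public static RecordEvaluator<String> sentinel(String marker) {
        return record -> marker.equals(record);
    }

    /** Evaluator that reads an end-of-stream flag carried by the record itself. */
    public static RecordEvaluator<Event> lastFlag() {
        return event -> event.last;
    }

    public static void main(String[] args) {
        System.out.println(sentinel("END").isEndOfStream("END"));                   // true
        System.out.println(sentinel("END").isEndOfStream("data"));                  // false
        System.out.println(lastFlag().isEndOfStream(new Event("payload", true)));   // true
    }
}
```

Because the interface extends Serializable, an evaluator written as a lambda should capture only serializable values (here, the marker string).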


2) Adds new constructors for SingleThreadMultiplexSourceReaderBase and SourceReaderBase.

Each new constructor accepts an additional parameter "@Nullable RecordEvaluator<T> eofRecordEvaluator". When eofRecordEvaluator is not null, it is invoked on each de-serialized record to determine whether the record's split has reached end of stream (EOF).


Code Block
language: java
@PublicEvolving
public abstract class SingleThreadMultiplexSourceReaderBase<
                E, T, SplitT extends SourceSplit, SplitStateT>
        extends SourceReaderBase<E, T, SplitT, SplitStateT> {
    public SingleThreadMultiplexSourceReaderBase(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            SingleThreadFetcherManager<E, SplitT> splitFetcherManager,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            @Nullable RecordEvaluator<T> eofRecordEvaluator,
            Configuration config,
            SourceReaderContext context) {
        super(...)
    }
}

@PublicEvolving
public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitStateT>
        implements SourceReader<T, SplitT> {
    public SourceReaderBase(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            SplitFetcherManager<E, SplitT> splitFetcherManager,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            @Nullable RecordEvaluator<T> eofRecordEvaluator,
            Configuration config,
            SourceReaderContext context) {
    }
}
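
The effect of the nullable eofRecordEvaluator parameter can be sketched outside Flink as follows. This is a simplified model, not the SourceReaderBase implementation: java.util.function.Predicate stands in for RecordEvaluator, splits are read one after another instead of being interleaved, and the emitAll and sampleSplits names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

public class NullableEvaluatorSketch {

    /**
     * Emits the records of every split in order. Predicate stands in for RecordEvaluator.
     * A null evaluator means that no record ever ends a split early.
     */
    public static <T> List<T> emitAll(
            Map<String, List<T>> splits, Predicate<T> eofRecordEvaluator) {
        List<T> emitted = new ArrayList<>();
        for (Map.Entry<String, List<T>> split : splits.entrySet()) {
            for (T record : split.getValue()) {
                if (eofRecordEvaluator != null && eofRecordEvaluator.test(record)) {
                    // The matching record is dropped and only this split is finished.
                    break;
                }
                emitted.add(record);
            }
        }
        return emitted;
    }

    /** Two hypothetical splits; only the first one contains an "END" marker. */
    public static Map<String, List<String>> sampleSplits() {
        Map<String, List<String>> splits = new LinkedHashMap<>();
        splits.put("split-0", Arrays.asList("a", "END", "b"));
        splits.put("split-1", Arrays.asList("c", "d"));
        return splits;
    }

    public static void main(String[] args) {
        System.out.println(emitAll(sampleSplits(), "END"::equals)); // [a, c, d]
        System.out.println(emitAll(sampleSplits(), null));          // [a, END, b, c, d]
    }
}
```

In this model the end-of-stream decision is per split: "split-1" is still read in full after "split-0" ends at its marker.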


3) Adds the following method to KafkaSourceBuilder 

Code Block
language: java
public class KafkaSourceBuilder<OUT> {
    ... // Skip the existing methods

    /**
     * Sets the optional {@link RecordEvaluator eofRecordEvaluator} for KafkaSource.
     *
     * <p>When the evaluator is specified, it is invoked for each de-serialized record to determine
     * whether the corresponding split has reached end of stream. If a record is matched by the
     * evaluator, the source emits neither that record nor any subsequent records in the same
     * split.
     *
     * <p>Note that the evaluator works jointly with the stopping offsets specified by {@link
     * #setBounded(OffsetsInitializer)} or {@link #setUnbounded(OffsetsInitializer)}. The source
     * stops consuming from a split as soon as any of these conditions is met.
     *
     * @param eofRecordEvaluator a {@link RecordEvaluator recordEvaluator}
     * @return this KafkaSourceBuilder.
     */
    public KafkaSourceBuilder<OUT> setEofRecordEvaluator(RecordEvaluator<OUT> eofRecordEvaluator) {
        this.eofRecordEvaluator = eofRecordEvaluator;
        return this;
    }
} 
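
The Javadoc above says that the evaluator works jointly with the stopping offsets. The following sketch models that "whichever comes first" rule for a single split. It does not use the Kafka connector: list indices stand in for offsets, the stopping offset is assumed to be exclusive, Predicate stands in for RecordEvaluator, and the consume helper is hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;

public class JointStopConditionSketch {

    /**
     * Reads one split until either the (assumed exclusive) stopping offset is reached
     * or the evaluator matches a record, whichever happens first.
     */
    public static List<String> consume(
            List<String> records, long stoppingOffset, Predicate<String> eofRecordEvaluator) {
        List<String> emitted = new ArrayList<>();
        for (int offset = 0; offset < records.size(); offset++) {
            if (offset >= stoppingOffset) {
                break; // condition 1: stopping offset reached
            }
            String record = records.get(offset);
            if (eofRecordEvaluator != null && eofRecordEvaluator.test(record)) {
                break; // condition 2: evaluator matched, the record is not emitted
            }
            emitted.add(record);
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<String> split = Arrays.asList("a", "b", "END", "c", "d");
        System.out.println(consume(split, 1, "END"::equals));              // [a]
        System.out.println(consume(split, 4, "END"::equals));              // [a, b]
        System.out.println(consume(split, Long.MAX_VALUE, "END"::equals)); // [a, b]
        System.out.println(consume(split, 4, null));                       // [a, b, END, c]
    }
}
```

The third call corresponds to a source with no effective stopping offset, where the evaluator is the only condition that can end the split.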


4) Adds the following method to PulsarSourceBuilder

...