
...



Discussion thread: https://lists.apache.org/thread/z87m68ggzkx0s427tmrllswm4l1g7owc
Vote thread: https://lists.apache.org/thread/p51bjoyssm2ccx8sfyvtoll8oym15sy9
JIRA: FLINK-25509
Release:


...

```java
import org.apache.flink.connector.base.source.reader.RecordEvaluator;

public class PulsarSourceBuilder<OUT> {
    // ... existing fields and methods omitted

    // Optional end-of-stream evaluator; remains null unless set.
    private RecordEvaluator<OUT> eofRecordEvaluator;

    /**
     * Sets the optional {@link RecordEvaluator eofRecordEvaluator} for PulsarSource.
     *
     * <p>When the evaluator is specified, it is invoked for each deserialized record to determine
     * whether the corresponding split has reached the end of the stream. If a record is matched by
     * the evaluator, the source does not emit this record or any subsequent records from the same
     * split.
     *
     * <p>Note that the evaluator works jointly with the stopping criteria specified by {@link
     * #setBoundedStopCursor(StopCursor)} or {@link #setUnboundedStopCursor(StopCursor)}. The
     * source stops consuming from a split as soon as any of these conditions is met.
     *
     * @param eofRecordEvaluator a {@link RecordEvaluator recordEvaluator}
     * @return this PulsarSourceBuilder.
     */
    public PulsarSourceBuilder<OUT> setEofRecordEvaluator(RecordEvaluator<OUT> eofRecordEvaluator) {
        this.eofRecordEvaluator = eofRecordEvaluator;
        return this;
    }
}
```


5) For SQL users, a new connector option 'scan.record.evaluator.class' is added to provide the custom RecordEvaluator class.
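For the SQL path, the connector has to turn the string value of 'scan.record.evaluator.class' into an evaluator instance. The sketch below shows one plausible way to do that with reflection; it is an assumption, not the connector's actual code. It assumes the configured class has a public no-argument constructor, and `EvaluatorLoader` and `EndMarkerEvaluator` are hypothetical names. A local `RecordEvaluator` interface mirrors the proposed `isEndOfStream` method so the code compiles on its own. In DDL, a user would then write something like `'scan.record.evaluator.class' = 'com.example.EndMarkerEvaluator'` in the WITH clause (the class name is illustrative).

```java
import java.io.Serializable;

// Local stand-in mirroring the proposed RecordEvaluator interface, so this
// sketch compiles without Flink on the classpath.
interface RecordEvaluator<T> extends Serializable {
    boolean isEndOfStream(T record);
}

// Hypothetical user class that a SQL user would name in the option, e.g.
// 'scan.record.evaluator.class' = 'com.example.EndMarkerEvaluator'.
class EndMarkerEvaluator implements RecordEvaluator<String> {
    private static final long serialVersionUID = 1L;

    // Public no-argument constructor, required for reflective instantiation.
    public EndMarkerEvaluator() {}

    @Override
    public boolean isEndOfStream(String record) {
        return "END".equals(record);
    }
}

// Hypothetical helper showing how a connector could turn the option value
// into an evaluator instance via reflection.
final class EvaluatorLoader {
    private EvaluatorLoader() {}

    @SuppressWarnings("unchecked")
    static <T> RecordEvaluator<T> load(String className, ClassLoader classLoader) {
        final Class<?> clazz;
        try {
            clazz = Class.forName(className, true, classLoader);
        } catch (ClassNotFoundException e) {
            throw new IllegalArgumentException("Class not found: " + className, e);
        }
        // Reject classes that do not implement the evaluator interface.
        if (!RecordEvaluator.class.isAssignableFrom(clazz)) {
            throw new IllegalArgumentException(className + " does not implement RecordEvaluator");
        }
        try {
            return (RecordEvaluator<T>) clazz.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException("Cannot instantiate " + className, e);
        }
    }
}
```

Failing fast with an IllegalArgumentException keeps a misconfigured option from surfacing only later, when the first record is read.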

Proposed Changes

We expect users to specify the EOF-detection logic in a RecordEvaluator instance and pass this instance to KafkaSourceBuilder::setEofRecordEvaluator. KafkaSource then enforces the EOF-detection logic as follows:

...

3) When a record is matched by RecordEvaluator::isEndOfStream(...), SourceReaderBase stops emitting records from this split and informs SplitFetcherManager to stop reading this split.
4) For SQL users, a new connector option 'scan.record.evaluator.class' is added to provide the custom RecordEvaluator class.
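Step 3 can be modeled in a few lines. The following is a simplified, single-threaded sketch, not Flink's SourceReaderBase: splits are in-memory lists drained one after another, and the set of stopped split ids stands in for the request that SourceReaderBase sends to SplitFetcherManager. `EofAwareReader` and its methods are hypothetical, and the local `RecordEvaluator` interface only mirrors the proposed `isEndOfStream` method.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Local stand-in mirroring the proposed RecordEvaluator interface.
interface RecordEvaluator<T> extends Serializable {
    boolean isEndOfStream(T record);
}

// Simplified, single-threaded model of step 3; NOT Flink's SourceReaderBase.
// Splits are in-memory lists that are drained one after another.
class EofAwareReader<T> {
    private final RecordEvaluator<T> evaluator; // may be null (feature not used)
    private final Map<String, List<T>> assignedSplits = new LinkedHashMap<>();
    private final List<T> emitted = new ArrayList<>();
    // Stands in for telling SplitFetcherManager to stop reading these splits.
    private final Set<String> stoppedByEvaluator = new LinkedHashSet<>();

    EofAwareReader(RecordEvaluator<T> evaluator) {
        this.evaluator = evaluator;
    }

    void addSplit(String splitId, List<T> records) {
        assignedSplits.put(splitId, records);
    }

    // Emits records split by split until the evaluator matches a record.
    void pollAll() {
        for (Map.Entry<String, List<T>> split : assignedSplits.entrySet()) {
            for (T record : split.getValue()) {
                if (evaluator != null && evaluator.isEndOfStream(record)) {
                    // Neither the matching record nor later records of this split are emitted.
                    stoppedByEvaluator.add(split.getKey());
                    break;
                }
                emitted.add(record);
            }
        }
        // All assigned splits have now been drained.
        assignedSplits.clear();
    }

    List<T> emitted() {
        return emitted;
    }

    Set<String> stoppedByEvaluator() {
        return stoppedByEvaluator;
    }
}
```

In a real reader, records from different splits are fetched asynchronously and interleaved, which is why the stop decision is tracked per split: a match in one split does not stop the others.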

A similar workflow can be used to support this feature in PulsarSource.
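For PulsarSource, the user again supplies an evaluator, and the source combines it with the stop cursor, stopping a split when either one fires, as the builder's Javadoc states. The sketch below illustrates this under explicit assumptions. The "END" sentinel convention, the numeric `stopPosition` standing in for a bounded StopCursor, and the names `EndMarkerEvaluator` and `StopConditions` are all hypothetical. The local `RecordEvaluator` interface only mirrors the proposed `isEndOfStream` method.

```java
import java.io.Serializable;

// Local stand-in mirroring the proposed RecordEvaluator interface.
interface RecordEvaluator<T> extends Serializable {
    boolean isEndOfStream(T record);
}

// Hypothetical evaluator: a record whose value is "END" marks the end of its split.
class EndMarkerEvaluator implements RecordEvaluator<String> {
    private static final long serialVersionUID = 1L;

    @Override
    public boolean isEndOfStream(String record) {
        return "END".equals(record);
    }
}

final class StopConditions {
    private StopConditions() {}

    // Models the rule from the builder Javadoc: a split stops as soon as ANY
    // condition holds. stopPosition is a hypothetical numeric stand-in for a
    // bounded StopCursor; a null evaluator means the feature is not used.
    static <T> boolean shouldStop(
            T record, long position, long stopPosition, RecordEvaluator<T> evaluator) {
        boolean cursorReached = position >= stopPosition;
        boolean endOfStreamRecord = evaluator != null && evaluator.isEndOfStream(record);
        return cursorReached || endOfStreamRecord;
    }
}

// Hypothetical wiring with the builder method proposed above
// (service URL, topics and deserialization schema omitted):
//     pulsarSourceBuilder.setEofRecordEvaluator(new EndMarkerEvaluator());
```

Whether the record at the stop position itself is emitted depends on the real stop cursor's inclusive or exclusive semantics, which this sketch does not model.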

...