Status

...



...

...

JIRA: FLINK-25509

...

Release


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Code Block
languagejava
public class PulsarSourceBuilder<OUT> {
    ... // Skip the existing methods

    /**
     * Sets the optional {@link RecordEvaluator eofRecordEvaluator} for PulsarSource.
     *
     * <p>When the evaluator is specified, it is invoked for each de-serialized record to determine
     * whether the corresponding split has reached end of stream. If a record is matched by the
     * evaluator, the source emits neither this record nor the following records in the same
     * split.
     *
     * <p>Note that the evaluator works jointly with the stopping criteria specified by the {@link
     * #setBoundedStopCursor(StopCursor)} or the {@link #setUnboundedStopCursor(StopCursor)}.
     * The source stops consuming from a split when any of these conditions is met.
     *
     * @param eofRecordEvaluator a {@link RecordEvaluator recordEvaluator}
     * @return this PulsarSourceBuilder.
     */
    public PulsarSourceBuilder<OUT> setEofRecordEvaluator(RecordEvaluator<OUT> eofRecordEvaluator) {
        this.eofRecordEvaluator = eofRecordEvaluator;
        return this;
    } 
} 
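The emit-until-EOF behavior described in the Javadoc above can be sketched without any Flink dependency. This is only an illustration: the `RecordEvaluator` interface below is a local stand-in whose single method `isEndOfStream` is assumed here, and `EofEvaluatorSketch.emitUntilEof` is a hypothetical helper, not part of any connector.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Local stand-in for the RecordEvaluator proposed in this FLIP (assumed shape).
interface RecordEvaluator<T> extends Serializable {
    boolean isEndOfStream(T record);
}

final class EofEvaluatorSketch {

    // Hypothetical helper: returns the records a reader would emit from one split.
    static <T> List<T> emitUntilEof(List<T> splitRecords, RecordEvaluator<T> evaluator) {
        List<T> emitted = new ArrayList<>();
        for (T record : splitRecords) {
            // The matched record is not emitted, and neither is anything after it.
            if (evaluator != null && evaluator.isEndOfStream(record)) {
                break;
            }
            emitted.add(record);
        }
        return emitted;
    }

    public static void main(String[] args) {
        RecordEvaluator<String> eof = record -> "END".equals(record);
        List<String> split = Arrays.asList("a", "b", "END", "c");
        System.out.println(emitUntilEof(split, eof)); // prints [a, b]
    }
}
```

In a real job the evaluator would instead be passed to the builder via `setEofRecordEvaluator(...)`, and the check would run inside the source reader for each split independently.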


5) For SQL users, a new connector option 'scan.record.evaluator.class' is added to provide the custom RecordEvaluator class.
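One way a connector could turn the class name given in 'scan.record.evaluator.class' into an evaluator instance is plain reflection. The sketch below is an assumption about how this might work, not the proposed implementation: `EvaluatorLoader` and `EndMarkerEvaluator` are hypothetical names, the `RecordEvaluator` interface is a local stand-in, and a `String` record type is used in place of the table runtime's row type to keep the example self-contained.

```java
import java.io.Serializable;

// Local stand-in for the RecordEvaluator proposed in this FLIP (assumed shape).
interface RecordEvaluator<T> extends Serializable {
    boolean isEndOfStream(T record);
}

// Hypothetical user class that a SQL user would name in the connector option.
// It needs a no-argument constructor so that it can be created reflectively.
class EndMarkerEvaluator implements RecordEvaluator<String> {
    @Override
    public boolean isEndOfStream(String record) {
        return "END".equals(record);
    }
}

final class EvaluatorLoader {

    // Hypothetical helper: instantiates the evaluator class named by the option value.
    @SuppressWarnings("unchecked")
    static <T> RecordEvaluator<T> load(String className, ClassLoader classLoader) {
        try {
            Class<?> clazz = Class.forName(className, true, classLoader);
            if (!RecordEvaluator.class.isAssignableFrom(clazz)) {
                throw new IllegalArgumentException(
                        className + " does not implement RecordEvaluator");
            }
            return (RecordEvaluator<T>) clazz.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalArgumentException(
                    "Could not instantiate evaluator class " + className, e);
        }
    }

    public static void main(String[] args) {
        String optionValue = EndMarkerEvaluator.class.getName();
        RecordEvaluator<String> evaluator =
                load(optionValue, EvaluatorLoader.class.getClassLoader());
        System.out.println(evaluator.isEndOfStream("END")); // prints true
    }
}
```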

Proposed Changes

We expect users to specify the EOF-detection logic in a RecordEvaluator instance and pass this instance to KafkaSourceBuilder::setEofRecordEvaluator. KafkaSource would then enforce the EOF-detection logic in the following way:

...

We will provide unit tests to validate the proposed changes.

Rejected Alternatives

...

Merge RecordEvaluator and stoppingOffsetsInitializer (currently provided via KafkaSourceBuilder's setBounded() or setUnbounded()) into one class.

For example, we can add a KafkaStopCursor class (similar to PulsarSource's StopCursor) which contains all possible stopping criteria (e.g. based on the offset, the de-serialized message, and the ConsumerRecord).

...

This alternative has the following disadvantages compared to the proposed approach:

1) It introduces backward-incompatible changes to KafkaSource, because we would need to replace KafkaSourceBuilder::setBounded(...) with the new API.

2) KafkaStopCursor cannot be shared with other source types because different sources have different raw message formats. For example, KafkaSource uses offset and ConsumerRecord, whereas PulsarSource uses MessageId and Message. In comparison, the RecordEvaluator proposed in this FLIP (as well as the proposed implementation changes in the SourceReaderBase) could be used by other sources (e.g. PulsarSource) to detect EOF based on de-serialized records.

3) The implementation of this alternative approach will likely be harder to maintain. Note that users might want to stop the job based on the offset, the de-serialized message, or both. For performance reasons, the offset-based criteria should ideally be evaluated before records are de-serialized, whereas the criteria based on the de-serialized record can only be evaluated after the record is de-serialized. Thus these criteria should ideally be evaluated at different positions in the code path. It could be inconvenient to achieve this while keeping both kinds of logic in the same class.
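The point about evaluation order can be made concrete with a small sketch that keeps the two checks at separate places in the read loop. Everything here is illustrative: `TwoStageStopSketch`, `RawMessage`, and the nested `RecordEvaluator` are hypothetical stand-ins, and the stopping offset is assumed to be exclusive.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

final class TwoStageStopSketch {

    // Hypothetical raw message: an offset plus the still-serialized payload.
    static final class RawMessage {
        final long offset;
        final byte[] payload;

        RawMessage(long offset, String text) {
            this.offset = offset;
            this.payload = text.getBytes(StandardCharsets.UTF_8);
        }
    }

    // Local stand-in for the RecordEvaluator proposed in this FLIP (assumed shape).
    interface RecordEvaluator<T> {
        boolean isEndOfStream(T record);
    }

    static List<String> read(
            List<RawMessage> split, long stoppingOffset, RecordEvaluator<String> evaluator) {
        List<String> emitted = new ArrayList<>();
        for (RawMessage raw : split) {
            // Stage 1: offset-based check, done before paying for de-serialization.
            if (raw.offset >= stoppingOffset) {
                break;
            }
            String record = new String(raw.payload, StandardCharsets.UTF_8);
            // Stage 2: record-based check, only possible after de-serialization.
            if (evaluator.isEndOfStream(record)) {
                break;
            }
            emitted.add(record);
        }
        return emitted;
    }

    public static void main(String[] args) {
        List<RawMessage> split = Arrays.asList(
                new RawMessage(0, "a"), new RawMessage(1, "b"),
                new RawMessage(2, "END"), new RawMessage(3, "c"));
        RecordEvaluator<String> eof = record -> "END".equals(record);
        System.out.println(read(split, 10, eof)); // prints [a, b]
        System.out.println(read(split, 1, eof));  // prints [a]
    }
}
```

The split stops as soon as either condition is met, which matches the "any of these conditions" wording used for the builder methods, while each check stays in the part of the code path where it is cheapest to evaluate.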

...

Let user specify eofRecordEvaluator via StreamExecutionEnvironment::fromSource(...).withEofRecordEvaluator(...)

The advantage of this approach is that the feature can be used by all connectors without changing the implementation of individual connectors (e.g. KafkaSourceBuilder, PulsarSourceBuilder). Thus it improves the connector developers' experience.

...