Status

Current state: Under Discussion

...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

There are use cases where users want to stop a Flink job gracefully based on the content of de-serialized records observed by the KafkaSource. For example, users might use a Flink job to process stock transaction data from an unbounded Kafka topic in real time. Suppose the stock market closes at 4 pm; users would then want the Flink job to stop once it has processed all of that day's transaction data.

...

This FLIP aims to address this use case so that users who currently depend on KafkaDeserializationSchema::isEndOfStream() can migrate their Flink jobs from FlinkKafkaConsumer to KafkaSource.

To minimize the feature gap between similar sources (e.g. Kafka and Pulsar), this FLIP also proposes updating PulsarSourceBuilder to support dynamic EOF detection.

Public Interfaces

1) Adds the RecordEvaluator interface to the package org.apache.flink.connector.base.source.reader.

...

```java
public class KafkaSourceBuilder<OUT> {
    // ... existing fields and methods omitted

    /**
     * Sets the optional {@link RecordEvaluator eofRecordEvaluator} for KafkaSource.
     *
     * <p>When the evaluator is specified, it is invoked for each de-serialized record to determine
     * whether the corresponding split has reached end of stream. If a record is matched by the
     * evaluator, the source does not emit this record or any subsequent records in the same
     * split.
     *
     * <p>Note that the evaluator works jointly with the stopping offsets specified by {@link
     * #setBounded(OffsetsInitializer)} or {@link #setUnbounded(OffsetsInitializer)}. The source
     * stops consuming from a split as soon as any of these conditions is met.
     *
     * @param eofRecordEvaluator a {@link RecordEvaluator recordEvaluator}
     * @return this KafkaSourceBuilder.
     */
    public KafkaSourceBuilder<OUT> setEofRecordEvaluator(RecordEvaluator<OUT> eofRecordEvaluator) {
        this.eofRecordEvaluator = eofRecordEvaluator;
        return this;
    }
}
```
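The "any of these conditions" rule in the Javadoc above can be illustrated with a small simulation of a single split. This is a minimal sketch under stated assumptions, not KafkaSource's reader code: a list index plays the role of a Kafka offset, the stopping offset is treated as exclusive, `java.util.function.Predicate` stands in for `RecordEvaluator`, and `SplitConsumptionSketch` / `emittedRecords` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Hypothetical simulation of how one split is consumed when a stopping offset and an
// EOF evaluator are both configured. Not the actual KafkaSource reader code.
final class SplitConsumptionSketch {

    /**
     * @param split          records of one split; list index i plays the role of offset i
     * @param stoppingOffset assumed exclusive: records at offsets >= stoppingOffset are not consumed
     * @param eofEvaluator   stands in for RecordEvaluator#isEndOfStream
     * @return the records the source would emit from this split
     */
    static <T> List<T> emittedRecords(List<T> split, long stoppingOffset, Predicate<T> eofEvaluator) {
        List<T> emitted = new ArrayList<>();
        for (int offset = 0; offset < split.size(); offset++) {
            if (offset >= stoppingOffset) {
                break; // stopping-offset condition met
            }
            T record = split.get(offset);
            if (eofEvaluator.test(record)) {
                break; // evaluator condition met: this record and all later ones are dropped
            }
            emitted.add(record);
        }
        return emitted;
    }
}

class SplitConsumptionDemo {
    public static void main(String[] args) {
        List<String> split = List.of("a", "b", "EOF", "c", "d");
        // Evaluator matches at offset 2, before stopping offset 4 is reached.
        System.out.println(SplitConsumptionSketch.emittedRecords(split, 4, "EOF"::equals)); // [a, b]
        // Stopping offset 1 is reached before the evaluator ever matches.
        System.out.println(SplitConsumptionSketch.emittedRecords(split, 1, "EOF"::equals)); // [a]
    }
}
```

Whichever condition is hit first ends the split; in both cases nothing past that point is emitted.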


3) Adds the following method to PulsarSourceBuilder 

```java
public class PulsarSourceBuilder<OUT> {
    // ... existing fields and methods omitted

    /**
     * Sets the optional {@link RecordEvaluator eofRecordEvaluator} for PulsarSource.
     *
     * <p>When the evaluator is specified, it is invoked for each de-serialized record to determine
     * whether the corresponding split has reached end of stream. If a record is matched by the
     * evaluator, the source does not emit this record or any subsequent records in the same
     * split.
     *
     * <p>Note that the evaluator works jointly with the stopping criteria specified by {@link
     * #setBoundedStopCursor(StopCursor)} or {@link #setUnboundedStopCursor(StopCursor)}.
     * The source stops consuming from a split as soon as any of these conditions is met.
     *
     * @param eofRecordEvaluator a {@link RecordEvaluator recordEvaluator}
     * @return this PulsarSourceBuilder.
     */
    public PulsarSourceBuilder<OUT> setEofRecordEvaluator(RecordEvaluator<OUT> eofRecordEvaluator) {
        this.eofRecordEvaluator = eofRecordEvaluator;
        return this;
    }
}
```

Proposed Changes

We expect users to specify the EOF-detection logic in a RecordEvaluator instance and pass this instance to KafkaSourceBuilder::setEofRecordEvaluator. KafkaSource then enforces the EOF-detection logic as follows:

...

3) When a record is matched by RecordEvaluator::isEndOfStream(...), SourceReaderBase stops emitting records from this split and asks SplitFetcherManager to stop reading this split.
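Step 3 can be pictured as a simplified loop over records fetched from several splits. The sketch is hypothetical and much simpler than the real SourceReaderBase: `Predicate` stands in for `RecordEvaluator`, the `SplitStopper` callback stands in for asking SplitFetcherManager to stop reading a split, and `SplitRecord` and `EofAwareReaderSketch` are illustrative names only.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical helper: a fetched record tagged with the id of the split it came from.
final class SplitRecord<T> {
    final String splitId;
    final T value;

    SplitRecord(String splitId, T value) {
        this.splitId = splitId;
        this.value = value;
    }
}

// Hypothetical stand-in for asking SplitFetcherManager to stop reading a split.
interface SplitStopper {
    void stopReading(String splitId);
}

// Simplified, hypothetical stand-in for the EOF handling proposed for SourceReaderBase.
final class EofAwareReaderSketch<T> {
    private final Predicate<T> eofEvaluator; // stands in for RecordEvaluator<T>
    private final SplitStopper stopper;
    private final Set<String> finishedSplits = new HashSet<>();

    EofAwareReaderSketch(Predicate<T> eofEvaluator, SplitStopper stopper) {
        this.eofEvaluator = eofEvaluator;
        this.stopper = stopper;
    }

    /** Returns the values emitted downstream, in arrival order. */
    List<T> process(List<SplitRecord<T>> fetched) {
        List<T> emitted = new ArrayList<>();
        for (SplitRecord<T> record : fetched) {
            if (finishedSplits.contains(record.splitId)) {
                continue; // split already reached EOF: drop records fetched after the match
            }
            if (eofEvaluator.test(record.value)) {
                finishedSplits.add(record.splitId);
                stopper.stopReading(record.splitId); // ask the fetcher side to stop this split
                continue; // the matched record itself is not emitted
            }
            emitted.add(record.value);
        }
        return emitted;
    }
}

class EofAwareReaderDemo {
    public static void main(String[] args) {
        List<String> stopped = new ArrayList<>();
        EofAwareReaderSketch<String> reader = new EofAwareReaderSketch<>("EOF"::equals, stopped::add);
        List<SplitRecord<String>> fetched = List.of(
                new SplitRecord<>("p0", "a"),
                new SplitRecord<>("p1", "x"),
                new SplitRecord<>("p0", "EOF"),
                new SplitRecord<>("p1", "y"),
                new SplitRecord<>("p0", "b"));
        System.out.println(reader.process(fetched)); // [a, x, y]
        System.out.println(stopped);                 // [p0]
    }
}
```

Only the split that produced the matching record (`p0`) is stopped; records already fetched from it afterwards are dropped, while `p1` keeps emitting.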

A similar workflow can be used to support this feature in PulsarSource.

Note that the RecordEvaluator as well as the SourceReaderBase changes proposed above could also be used by other sources to detect end-of-stream based on de-serialized records.

Compatibility, Deprecation, and Migration Plan

The APIs added in this FLIP are backward compatible with the existing KafkaSource.

...

Users who currently use FlinkKafkaConsumer together with KafkaDeserializationSchema::isEndOfStream(...) can migrate to KafkaSource by moving the isEndOfStream(...) logic into the RecordEvaluator added in this FLIP.
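As a sketch of that migration for the stock-trading example from the Motivation section, the end-of-stream check can be moved unchanged from the legacy schema into an evaluator class. Everything here is hypothetical: `Trade`, `LegacyTradeSchema` and `MarketCloseEvaluator` are illustrative names, `RecordEvaluator` is a local stand-in that only assumes the `isEndOfStream` method named in this FLIP, and the legacy class mimics only the `isEndOfStream` part of a KafkaDeserializationSchema. It also assumes trades within a partition arrive in time order.

```java
import java.io.Serializable;
import java.time.LocalTime;

// Hypothetical record type for the stock-trading example.
final class Trade {
    final String symbol;
    final LocalTime time;

    Trade(String symbol, LocalTime time) {
        this.symbol = symbol;
        this.time = time;
    }
}

// Local stand-in for the FLIP's RecordEvaluator; only the isEndOfStream name comes from the FLIP.
@FunctionalInterface
interface RecordEvaluator<T> extends Serializable {
    boolean isEndOfStream(T record);
}

// Before (hypothetical): the EOF check as it might sit in a legacy KafkaDeserializationSchema<Trade>.
// deserialize(...) and getProducedType() are omitted; this does not implement the real interface.
final class LegacyTradeSchema {
    public boolean isEndOfStream(Trade nextElement) {
        return !nextElement.time.isBefore(LocalTime.of(16, 0));
    }
}

// After (hypothetical): the same check moved into a RecordEvaluator.
final class MarketCloseEvaluator implements RecordEvaluator<Trade> {
    private static final long serialVersionUID = 1L;
    private static final LocalTime MARKET_CLOSE = LocalTime.of(16, 0);

    @Override
    public boolean isEndOfStream(Trade record) {
        // The first trade at or after 16:00 marks the end of the current split.
        return !record.time.isBefore(MARKET_CLOSE);
    }
}

class MigrationDemo {
    public static void main(String[] args) {
        LegacyTradeSchema legacy = new LegacyTradeSchema();
        RecordEvaluator<Trade> evaluator = new MarketCloseEvaluator();
        Trade[] samples = {
            new Trade("ACME", LocalTime.of(15, 59)),
            new Trade("ACME", LocalTime.of(16, 0)),
        };
        for (Trade t : samples) {
            // Both checks should agree after the migration.
            System.out.println(t.time + " legacy=" + legacy.isEndOfStream(t)
                    + " evaluator=" + evaluator.isEndOfStream(t));
        }
    }
}
```

With the builder method proposed above, the evaluator would be passed to KafkaSourceBuilder::setEofRecordEvaluator. Since the evaluator decides end of stream per split, each partition stops independently when it sees its first trade at or after 16:00.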

Test Plan

We will provide unit tests to validate the proposed changes.

Rejected Alternatives

1) Merge RecordEvaluator and stoppingOffsetsInitializer (currently provided via KafkaSourceBuilder's setBounded() or setUnbounded()) into one class.

...