
Status

...


Discussion thread

...

https://lists.apache.org/thread/z87m68ggzkx0s427tmrllswm4l1g7owc
Vote thread https://lists.apache.org/thread/p51bjoyssm2ccx8sfyvtoll8oym15sy9
JIRA

FLINK-25509

Release


JIRA: To be added

Released: Not released yet.

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).


Motivation

There are use cases where users want to stop a Flink job gracefully based on the content of de-serialized records observed in the KafkaSource. For example, users might use a Flink job to process stock transaction data from an unbounded Kafka topic in real time. Suppose the stock market closes at 4 pm; users would want the Flink job to stop once it has processed all the transaction data of that day.

...

This FLIP aims to address this use case so that users who currently depend on KafkaDeserializationSchema::isEndOfStream() can migrate their Flink jobs from FlinkKafkaConsumer to KafkaSource.

To minimize the feature gap between similar sources (e.g. Kafka and Pulsar), this FLIP also proposes to update PulsarSourceBuilder to support dynamic EOF detection.

Public Interfaces

1) Adds the RecordEvaluator interface to the package org.apache.flink.connector.base.source.reader.

Code Block
languagejava
package org.apache.flink.connector.base.source.reader;

import org.apache.flink.annotation.PublicEvolving;

import java.io.Serializable;

/**
 * An interface that evaluates whether a de-serialized record should trigger certain control-flow
 * operations (e.g. end of stream).
 */
@PublicEvolving
@FunctionalInterface
public interface RecordEvaluator<T> extends Serializable {
    /**
     * Determines whether a record should trigger the end of stream for its split. The given record
     * wouldn't be emitted from the source if the returned result is true.
     *
     * @param record a de-serialized record from the split.
     * @return a boolean indicating whether the split has reached end of stream.
     */
    boolean isEndOfStream(T record);
}
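To make the interface concrete, here is a minimal sketch of an evaluator for the stock-market example from the Motivation section. It assumes a hypothetical StockTransaction type carrying an epoch-millisecond trade time, and it redeclares RecordEvaluator locally so the sketch compiles without Flink on the classpath; MarketCloseEvaluator is likewise a hypothetical name, not part of this FLIP.

```java
import java.io.Serializable;
import java.time.Instant;
import java.time.LocalTime;
import java.time.ZoneId;

// Local stand-in for the proposed
// org.apache.flink.connector.base.source.reader.RecordEvaluator, declared here only so
// that this sketch compiles without Flink on the classpath.
@FunctionalInterface
interface RecordEvaluator<T> extends Serializable {
    boolean isEndOfStream(T record);
}

// Hypothetical record type produced by the user's deserialization schema.
final class StockTransaction {
    final String symbol;
    final long tradeTimeMillis; // trade time as epoch milliseconds

    StockTransaction(String symbol, long tradeTimeMillis) {
        this.symbol = symbol;
        this.tradeTimeMillis = tradeTimeMillis;
    }
}

// Hypothetical evaluator: ends the split at the first trade at or after market close.
final class MarketCloseEvaluator implements RecordEvaluator<StockTransaction> {
    private final LocalTime closeTime; // LocalTime and ZoneId are both Serializable
    private final ZoneId marketZone;

    MarketCloseEvaluator(LocalTime closeTime, ZoneId marketZone) {
        this.closeTime = closeTime;
        this.marketZone = marketZone;
    }

    @Override
    public boolean isEndOfStream(StockTransaction record) {
        LocalTime tradeTime =
                Instant.ofEpochMilli(record.tradeTimeMillis).atZone(marketZone).toLocalTime();
        // true => this record and all later records of the same split are not emitted
        return !tradeTime.isBefore(closeTime);
    }
}
```

With the KafkaSourceBuilder method proposed in this FLIP, such an evaluator would be passed as, e.g., `setEofRecordEvaluator(new MarketCloseEvaluator(LocalTime.of(16, 0), ZoneId.of("America/New_York")))`. Because the evaluator only sees one record at a time, any record that appears later in the same partition (by offset) is dropped once the first post-close trade is matched, even if its own trade time is earlier.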


2) Adds new constructors for SingleThreadMultiplexSourceReaderBase and SourceReaderBase.

The respective new constructors should accept a parameter "@Nullable RecordEvaluator<T> eofRecordEvaluator". When this eofRecordEvaluator is not null, it will be used to determine EOF.


Code Block
languagejava
@PublicEvolving
public abstract class SingleThreadMultiplexSourceReaderBase<
                E, T, SplitT extends SourceSplit, SplitStateT>
        extends SourceReaderBase<E, T, SplitT, SplitStateT> {
    public SingleThreadMultiplexSourceReaderBase(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            SingleThreadFetcherManager<E, SplitT> splitFetcherManager,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            @Nullable RecordEvaluator<T> eofRecordEvaluator,
            Configuration config,
            SourceReaderContext context) {
        super(elementsQueue, splitFetcherManager, recordEmitter, eofRecordEvaluator, config, context);
    }
}

@PublicEvolving
public abstract class SourceReaderBase<E, T, SplitT extends SourceSplit, SplitStateT>
        implements SourceReader<T, SplitT> {
    public SourceReaderBase(
            FutureCompletingBlockingQueue<RecordsWithSplitIds<E>> elementsQueue,
            SplitFetcherManager<E, SplitT> splitFetcherManager,
            RecordEmitter<E, T, SplitStateT> recordEmitter,
            @Nullable RecordEvaluator<T> eofRecordEvaluator,
            Configuration config,
            SourceReaderContext context) {
        // eofRecordEvaluator may be null; if it is not null, it is used to determine EOF per split.
    }
}


3) Adds the following method to KafkaSourceBuilder:

Code Block
languagejava
public class KafkaSourceBuilder<OUT> {
    ... // Skip the existing methods

    /**
     * Sets the optional {@link RecordEvaluator eofRecordEvaluator} for KafkaSource.
     *
     * <p>When the evaluator is specified, it is invoked for each de-serialized record to determine
     * whether the corresponding split has reached end of stream. If a record is matched by the
     * evaluator, the source would not emit this record as well as the following records in the same
     * split.
     *
     * <p>Note that the evaluator works jointly with the stopping offsets specified by the {@link
     * #setBounded(OffsetsInitializer)} or the {@link #setUnbounded(OffsetsInitializer)}. The source
     * stops consuming from a split when any of these conditions is met.
     *
     * @param eofRecordEvaluator a {@link RecordEvaluator recordEvaluator}
     * @return this KafkaSourceBuilder.
     */
    public KafkaSourceBuilder<OUT> setEofRecordEvaluator(
            RecordEvaluator<OUT> eofRecordEvaluator) {
        this.eofRecordEvaluator = eofRecordEvaluator;
        return this;
    }
}
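The Javadoc above says the evaluator works jointly with the stopping offsets and that a split stops when either condition is met. The following is a minimal, Flink-free sketch of that per-split logic, not the actual KafkaSource implementation: RawMessage and BoundedSplitScanner are hypothetical names, the stopping offset is treated as exclusive (an assumption for this sketch), and RecordEvaluator is redeclared locally.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Local stand-in for the proposed RecordEvaluator interface.
@FunctionalInterface
interface RecordEvaluator<T> extends Serializable {
    boolean isEndOfStream(T record);
}

// Hypothetical raw message: an offset plus undecoded bytes, as fetched from the broker.
final class RawMessage {
    final long offset;
    final byte[] value;

    RawMessage(long offset, byte[] value) {
        this.offset = offset;
        this.value = value;
    }
}

// Hypothetical per-split scanner that combines both stopping criteria.
final class BoundedSplitScanner<T> {
    private final long stoppingOffset; // exclusive
    private final Function<byte[], T> deserializer;
    private final RecordEvaluator<T> eofRecordEvaluator; // null => no record-based check

    BoundedSplitScanner(
            long stoppingOffset,
            Function<byte[], T> deserializer,
            RecordEvaluator<T> eofRecordEvaluator) {
        this.stoppingOffset = stoppingOffset;
        this.deserializer = deserializer;
        this.eofRecordEvaluator = eofRecordEvaluator;
    }

    // Returns the records that would be emitted; stops at whichever criterion is met first.
    List<T> scan(List<RawMessage> messages) {
        List<T> emitted = new ArrayList<>();
        for (RawMessage message : messages) {
            // Offset criterion: checked on the raw message, before paying for de-serialization.
            if (message.offset >= stoppingOffset) {
                break;
            }
            T record = deserializer.apply(message.value);
            // Record criterion: needs the de-serialized value; the matching record is dropped.
            if (eofRecordEvaluator != null && eofRecordEvaluator.isEndOfStream(record)) {
                break;
            }
            emitted.add(record);
        }
        return emitted;
    }
}
```

Checking the offset before de-serializing mirrors the ordering discussed under Rejected Alternatives: offset-based criteria are cheap on raw messages, while record-based criteria can only run after de-serialization.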


4) Adds the following method to PulsarSourceBuilder:

Code Block
languagejava
public class PulsarSourceBuilder<OUT> {
    ... // Skip the existing methods

    /**
     * Sets the optional {@link RecordEvaluator eofRecordEvaluator} for PulsarSource.
     *
     * <p>When the evaluator is specified, it is invoked for each de-serialized record to determine
     * whether the corresponding split has reached end of stream. If a record is matched by the
     * evaluator, the source would not emit this record as well as the following records in the same
     * split.
     *
     * <p>Note that the evaluator works jointly with the stopping criteria specified by the {@link
     * #setBoundedStopCursor(StopCursor)} or the {@link #setUnboundedStopCursor(StopCursor)}.
     * The source stops consuming from a split when any of these conditions is met.
     *
     * @param eofRecordEvaluator a {@link RecordEvaluator recordEvaluator}
     * @return this PulsarSourceBuilder.
     */
    public PulsarSourceBuilder<OUT> setEofRecordEvaluator(
            RecordEvaluator<OUT> eofRecordEvaluator) {
        this.eofRecordEvaluator = eofRecordEvaluator;
        return this;
    }
}


5) For SQL users, a new connector option 'scan.record.evaluator.class' is added to provide the custom RecordEvaluator class.
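The FLIP only names the option; how the connector turns the configured class name into an instance is not specified here. The following is a minimal sketch, assuming the class is loaded reflectively through a given class loader and has a no-argument constructor. In a real job the user class would live in its own package and would therefore need to be public with a public constructor; in this single-file sketch, package-private access suffices. RecordEvaluatorLoader and StopOnEmptyValue are hypothetical names.

```java
import java.io.Serializable;

// Local stand-in for the proposed RecordEvaluator interface.
@FunctionalInterface
interface RecordEvaluator<T> extends Serializable {
    boolean isEndOfStream(T record);
}

// Hypothetical helper that turns the option value into an evaluator instance.
final class RecordEvaluatorLoader {
    private RecordEvaluatorLoader() {}

    @SuppressWarnings("unchecked")
    static <T> RecordEvaluator<T> load(String className, ClassLoader classLoader) {
        try {
            Class<?> clazz = Class.forName(className, true, classLoader);
            if (!RecordEvaluator.class.isAssignableFrom(clazz)) {
                throw new IllegalArgumentException(
                        className + " does not implement RecordEvaluator");
            }
            // Assumes the user class exposes a no-argument constructor.
            return (RecordEvaluator<T>) clazz.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            // Covers a missing class, a missing constructor, and constructor failures.
            throw new IllegalArgumentException("Cannot instantiate " + className, e);
        }
    }
}

// Example user class whose name could be given in 'scan.record.evaluator.class'.
final class StopOnEmptyValue implements RecordEvaluator<String> {
    @Override
    public boolean isEndOfStream(String record) {
        return record.isEmpty();
    }
}
```

In a table definition, this would correspond to an entry such as `'scan.record.evaluator.class' = 'com.example.StopOnEmptyValue'` in the WITH clause, where `com.example` is a placeholder package.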

Proposed Changes

We expect users to specify the EOF-detection logic in a RecordEvaluator instance and pass this instance to KafkaSourceBuilder::setEofRecordEvaluator. KafkaSource then enforces the EOF-detection logic as follows:

1) The RecordEvaluator is passed from KafkaSource to KafkaSourceReader and SourceReaderBase.

2) SourceReaderBase creates a wrapper SourceOutput instance to intercept the records emitted by RecordEmitter. RecordEvaluator::isEndOfStream(...) is invoked on every intercepted record.

3) When a record is matched by RecordEvaluator::isEndOfStream(...), SourceReaderBase stops emitting records from this split and informs SplitFetcherManager to stop reading this split.
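Steps 2) and 3) can be sketched without Flink as follows. This is a simplified model under stated assumptions, not the SourceReaderBase code: RecordSink stands in for SourceOutput, the stop signal is a plain callback rather than a SplitFetcherManager call, and EofInterceptingOutput is a hypothetical name. One wrapper is assumed per split, so an EOF in one split does not affect the others.

```java
import java.io.Serializable;
import java.util.function.Consumer;

// Local stand-in for the proposed RecordEvaluator interface.
@FunctionalInterface
interface RecordEvaluator<T> extends Serializable {
    boolean isEndOfStream(T record);
}

// Minimal stand-in for Flink's SourceOutput; only record emission is modelled.
@FunctionalInterface
interface RecordSink<T> {
    void collect(T record);
}

// Hypothetical per-split wrapper: the RecordEmitter writes into this object instead of
// the real output, so every record passes through the evaluator first.
final class EofInterceptingOutput<T> implements RecordSink<T> {
    private final String splitId;
    private final RecordSink<T> downstream;
    private final RecordEvaluator<T> eofRecordEvaluator; // null => check disabled
    private final Consumer<String> stopFetchingSplit; // e.g. notify the fetcher manager
    private boolean reachedEndOfStream;

    EofInterceptingOutput(
            String splitId,
            RecordSink<T> downstream,
            RecordEvaluator<T> eofRecordEvaluator,
            Consumer<String> stopFetchingSplit) {
        this.splitId = splitId;
        this.downstream = downstream;
        this.eofRecordEvaluator = eofRecordEvaluator;
        this.stopFetchingSplit = stopFetchingSplit;
    }

    @Override
    public void collect(T record) {
        if (reachedEndOfStream) {
            return; // records after the EOF record are never emitted
        }
        if (eofRecordEvaluator != null && eofRecordEvaluator.isEndOfStream(record)) {
            reachedEndOfStream = true;
            stopFetchingSplit.accept(splitId); // signalled once, on the first match
            return; // the matching record itself is not emitted either
        }
        downstream.collect(record);
    }

    boolean isEndOfStreamReached() {
        return reachedEndOfStream;
    }
}
```

Intercepting at the output, rather than inside each RecordEmitter, is what lets the same mechanism be reused by any source built on SourceReaderBase.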

A similar workflow can be used to support this feature in PulsarSource.

Note that the RecordEvaluator as well as the SourceReaderBase changes proposed above could also be used by other sources to detect end-of-stream based on de-serialized records.

Compatibility, Deprecation, and Migration Plan

The APIs added in this FLIP are backward compatible with the existing KafkaSource.

The KafkaSource (added by FLIP-27) is not backward compatible with FlinkKafkaConsumer. This FLIP improves the migration path from FlinkKafkaConsumer to KafkaSource.

Users who currently use FlinkKafkaConsumer together with KafkaDeserializationSchema::isEndOfStream(...) can migrate to KafkaSource by moving the isEndOfStream(...) logic into the RecordEvaluator added in this FLIP.
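For a job whose KafkaDeserializationSchema::isEndOfStream(...) logic depends only on the de-serialized element, migration can amount to exposing that same logic through the new interface. The following is a minimal sketch with hypothetical names (LegacyEndOfStreamRule, MigratedEvaluators, and an END_OF_DAY marker value), again redeclaring RecordEvaluator locally so that it compiles without Flink.

```java
import java.io.Serializable;

// Local stand-in for the proposed RecordEvaluator interface.
@FunctionalInterface
interface RecordEvaluator<T> extends Serializable {
    boolean isEndOfStream(T record);
}

// Hypothetical rule that previously lived in a KafkaDeserializationSchema's
// isEndOfStream(T nextElement) implementation of a FlinkKafkaConsumer job.
final class LegacyEndOfStreamRule {
    private LegacyEndOfStreamRule() {}

    static boolean isEndOfStream(String nextElement) {
        return "END_OF_DAY".equals(nextElement);
    }
}

final class MigratedEvaluators {
    private MigratedEvaluators() {}

    // The unchanged logic, now exposed as a RecordEvaluator for
    // KafkaSourceBuilder::setEofRecordEvaluator.
    static RecordEvaluator<String> endOfDay() {
        return LegacyEndOfStreamRule::isEndOfStream;
    }
}
```

Because RecordEvaluator extends Serializable, the method reference is itself serializable and can be shipped to the source readers. Note that, per this FLIP, the evaluator ends the split in which the matching record is observed.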

Test Plan

We will provide unit tests to validate the proposed changes.

Rejected Alternatives

...

Merge

...

RecordEvaluator and stoppingOffsetsInitializer (currently provided via KafkaSourceBuilder's setBounded() or setUnbounded()) into one class.

For example, we can add a KafkaStopCursor class (similar to PulsarSource's StopCursor) which contains all possible stopping criteria (e.g. based on the offset, the de-serialized message, and the ConsumerRecord).

In comparison to the proposed approach, this alternative could provide a more concise and consolidated interface for users to specify the stopping criteria (i.e. via one KafkaSourceBuilder API).

This alternative has the following disadvantages compared to the proposed approach:

1) It introduces backward incompatible changes to KafkaSource and thus imposes inconvenience on users, because KafkaSourceBuilder::setBounded(...) would need to be replaced with the new API.

2) KafkaStopCursor cannot be shared with other source types because different sources have different raw message formats. For example, KafkaSource uses offset and ConsumerRecord, whereas PulsarSource uses MessageId and Message. In comparison, the RecordEvaluator proposed in this FLIP (as well as the proposed implementation changes in SourceReaderBase) could be used by other sources (e.g. PulsarSource) to detect EOF based on de-serialized records.

3) The implementation of this alternative approach will likely be harder to maintain. Users might want to stop the job based on the offset, the de-serialized record, or both. For performance reasons, offset-based criteria should ideally be evaluated before records are de-serialized, whereas criteria based on the de-serialized record can only be evaluated after the record is de-serialized. These criteria should therefore be evaluated at different positions in the code path, which is awkward to achieve while keeping all of this logic in the same class.

Let users specify eofRecordEvaluator via StreamExecutionEnvironment::fromSource(...).withEofRecordEvaluator(...)

The advantage of this approach is that the feature can be used by all connectors without changing the implementation of individual connectors (e.g. KafkaSourceBuilder, PulsarSourceBuilder). Thus it improves the connector developers' experience.

The disadvantage of this approach is that it requires users to pass some source configuration via StreamExecutionEnvironment::fromSource(...) and other source configuration via e.g. KafkaSourceBuilder(...). This might create inconsistency and confusion for connector users.

Given that connector users far outnumber connector developers, it is probably better to optimize for the user experience in this case.