
Status

Current state: Under Discussion

Discussion thread: To be added

JIRA: To be added

Released: Not released yet.

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

There are use cases where users want to stop a Flink job gracefully based on the content of the de-serialized records observed in the KafkaSource. For example, users might want to use a Flink job to process stock transaction data from an unbounded Kafka topic in real time. Since the stock market closes at 4 pm, users would want the Flink job to stop once it has processed all the transaction data of that day.

One intuitive way to support this use case is to allow users to specify a lambda function that evaluates whether a de-serialized record indicates EOF for a given split. FlinkKafkaConsumer supports this solution by honoring the EOF-detection logic specified in KafkaDeserializationSchema::isEndOfStream(...). However, FlinkKafkaConsumer has been deprecated and is supposed to be replaced by KafkaSource going forward, and KafkaSource currently cannot address this use case.

This FLIP aims to address this use case so that users who currently depend on KafkaDeserializationSchema::isEndOfStream() can migrate their Flink jobs from FlinkKafkaConsumer to KafkaSource.

Public Interfaces

1) Adds the EofOnRecordEvaluator interface to the package org.apache.flink.connector.base.source.reader.

package org.apache.flink.connector.base.source.reader;

import org.apache.flink.annotation.PublicEvolving;

import java.io.Serializable;

/** An interface that determines the end of stream for a split based on the de-serialized record. */
@PublicEvolving
@FunctionalInterface
public interface EofOnRecordEvaluator<T> extends Serializable {
    /**
     * Determines whether to stop consuming from the current split based on the content of the
     * de-serialized record. The given record would not be emitted from the source if the returned
     * result is true.
     *
     * @param record a de-serialized record from the split.
     * @return a boolean indicating whether the split has reached EOF.
     */
    boolean isEndOfStream(T record);
}


2) Adds the following method to KafkaSourceBuilder 

public class KafkaSourceBuilder<OUT> {
    ... // Skip the existing methods

    /**
     * Sets the optional {@link EofOnRecordEvaluator} for the KafkaSource.
     *
     * <p>When the evaluator is specified, it is invoked for each de-serialized record to determine
     * whether to stop consuming from the current split based on the content of the record. If a
     * record is matched by the evaluator, the source would not emit this record or any of the
     * following records in the same split.
     *
     * <p>Note that the evaluator works jointly with the stopping offsets specified by {@link
     * #setBounded(OffsetsInitializer)} or {@link #setUnbounded(OffsetsInitializer)}. The source
     * stops consuming from a split when any of these conditions is met.
     *
     * @param eofOnRecordEvaluator an {@link EofOnRecordEvaluator}
     * @return this KafkaSourceBuilder.
     */
    public KafkaSourceBuilder<OUT> setEofOnRecordEvaluator(
            EofOnRecordEvaluator<OUT> eofOnRecordEvaluator) {
        this.eofOnRecordEvaluator = eofOnRecordEvaluator;
        return this;
    }
}
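
The snippet below is a minimal usage sketch of the proposed builder method. The topic name, group id, and the "END_OF_DAY" marker value are hypothetical illustrations of the stock-transaction example from the Motivation section; everything other than setEofOnRecordEvaluator uses the existing KafkaSourceBuilder API.

KafkaSource<String> source =
        KafkaSource.<String>builder()
                .setBootstrapServers("localhost:9092")
                .setTopics("stock-transactions")
                .setGroupId("eod-job")
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setValueOnlyDeserializer(new SimpleStringSchema())
                // Stop consuming from a split once the (hypothetical) end-of-day
                // marker record is observed. Neither the marker record nor any
                // subsequent record of the split would be emitted.
                .setEofOnRecordEvaluator(record -> "END_OF_DAY".equals(record))
                .build();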

Proposed Changes

We expect users to specify the EOF-detection logic in an EofOnRecordEvaluator instance and pass this instance to KafkaSourceBuilder::setEofOnRecordEvaluator. KafkaSource would then enforce the EOF-detection logic in the following way:

1) The EofOnRecordEvaluator would be passed from KafkaSource to KafkaSourceReader and SourceReaderBase.

2) SourceReaderBase would create a wrapper SourceOutput instance to intercept the records emitted by the RecordEmitter. The EofOnRecordEvaluator is invoked on every intercepted record.

3) When a record is matched by EofOnRecordEvaluator, SourceReaderBase stops emitting records from this split and informs SplitFetcherManager to stop fetching from this split.


Note that the EofOnRecordEvaluator as well as the SourceReaderBase changes proposed above could also be used by other sources to detect EOF based on de-serialized records.
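
The following is a simplified, self-contained sketch of the interception described in steps 2) and 3). The Output interface below is a stand-in for Flink's SourceOutput, and the notification of SplitFetcherManager is only indicated by a comment; the actual SourceReaderBase changes would wrap the real SourceOutput handed to the RecordEmitter.

// Stand-in for the record-emitting part of SourceOutput.
interface Output<T> {
    void collect(T record);
}

// Wrapper that SourceReaderBase could use to evaluate every emitted record.
class EofAwareOutput<T> implements Output<T> {
    private final Output<T> delegate;
    private final EofOnRecordEvaluator<T> evaluator;
    private boolean eofReached;

    EofAwareOutput(Output<T> delegate, EofOnRecordEvaluator<T> evaluator) {
        this.delegate = delegate;
        this.evaluator = evaluator;
    }

    @Override
    public void collect(T record) {
        if (eofReached) {
            // Drop any record that still arrives after EOF has been detected.
            return;
        }
        if (evaluator.isEndOfStream(record)) {
            // Do not emit the matching record either. At this point
            // SourceReaderBase would also inform the SplitFetcherManager to
            // stop fetching from this split.
            eofReached = true;
            return;
        }
        delegate.collect(record);
    }

    boolean isEofReached() {
        return eofReached;
    }
}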

Compatibility, Deprecation, and Migration Plan

The APIs added in this FLIP are backward compatible with the existing KafkaSource.

The KafkaSource (added by FLIP-27) is not backward compatible with FlinkKafkaConsumer. This FLIP intends to provide a migration path for users to migrate from FlinkKafkaConsumer to KafkaSource.

Users who currently use FlinkKafkaConsumer together with KafkaDeserializationSchema::isEndOfStream(...) can migrate to KafkaSource by moving the isEndOfStream(...) logic into the EofOnRecordEvaluator added in this FLIP, as sketched below.
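
As a migration sketch, the isEndOfStream(...) override below (with a hypothetical StockTransaction type and isEndOfDayMarker() method) becomes an EofOnRecordEvaluator passed to the builder; the deserialization logic itself is unchanged and therefore omitted.

// Before: EOF detection overridden in the KafkaDeserializationSchema used by
// FlinkKafkaConsumer.
//
//     @Override
//     public boolean isEndOfStream(StockTransaction nextElement) {
//         return nextElement.isEndOfDayMarker();
//     }

// After: the same predicate expressed as an EofOnRecordEvaluator, to be passed
// to KafkaSourceBuilder::setEofOnRecordEvaluator.
EofOnRecordEvaluator<StockTransaction> evaluator =
        record -> record.isEndOfDayMarker();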

Test Plan

We will provide unit tests to validate the proposed changes.

Rejected Alternatives

1) Merge EofOnRecordEvaluator and stoppingOffsetsInitializer (currently provided via KafkaSourceBuilder's setBounded() or setUnbounded()) into one class.

For example, we can add a KafkaStopCursor class (similar to PulsarSource's StopCursor) which contains all possible stopping criteria (e.g. based on the offset, the de-serialized message, and the ConsumerRecord).

In comparison to the proposed approach, this alternative could provide a more concise and consolidated interface for users to specify the stopping criteria (i.e. via one KafkaSourceBuilder API).

This alternative has the following disadvantages compared to the proposed approach:

  • It requires backward incompatible changes to KafkaSource and thus imposes inconvenience on users, because we would need to replace KafkaSourceBuilder::setBounded(...) with the new API.

  • Users might want to stop the job based on the offset, the de-serialized message, or both. The offset-based criteria should ideally be evaluated before the message is de-serialized for performance reasons, whereas criteria based on the de-serialized message can only be evaluated after de-serialization. These criteria therefore need to be evaluated at different positions in the code path, and it would be awkward to put them in the same class.

  • KafkaStopCursor cannot be shared with other source types because different sources have different raw message formats. For example, KafkaSource uses offsets and ConsumerRecord, whereas PulsarSource uses MessageId and Message. In comparison, the EofOnRecordEvaluator proposed in this FLIP (as well as the proposed implementation changes in SourceReaderBase) could be used by other sources (e.g. PulsarSource) to detect EOF based on de-serialized records.
