We will provide unit tests to validate the proposed changes.

Rejected Alternatives

1) Merge EofOnRecordEvaluator and stoppingOffsetsInitializer (currently provided via KafkaSourceBuilder's setBounded() or setUnbounded()) into one class.

For example, we can add a KafkaStopCursor class (similar to PulsarSource's StopCursor) which contains all possible stopping criteria (e.g. based on the offset, the de-serialized message, and the ConsumerRecord).
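To make this rejected alternative concrete, here is a minimal Java sketch of what a combined KafkaStopCursor might look like. The interface, its method names, the `offsetOrEndMarker` factory, and the simplified `RawRecord` type (a stand-in for Kafka's ConsumerRecord) are all hypothetical and are not part of Flink's or Kafka's API. The sketch only shows that every stopping criterion would live behind one interface, configured through a single builder call.

```java
// Minimal sketch of the rejected "one class for all stopping criteria" design.
// All names here are hypothetical; none of them belong to Flink's or Kafka's API.
final class StopCursorSketch {

    // Hypothetical, simplified stand-in for Kafka's ConsumerRecord.
    static final class RawRecord {
        final long offset;
        final byte[] value;

        RawRecord(long offset, byte[] value) {
            this.offset = offset;
            this.value = value;
        }
    }

    // Hypothetical KafkaStopCursor: every criterion lives in one interface,
    // so a user would configure all of them through a single builder method.
    interface KafkaStopCursor<T> {
        // Criterion on the offset alone (checkable before deserialization).
        default boolean stopAtOffset(long offset) { return false; }

        // Criterion on the raw, not yet deserialized record.
        default boolean stopAtRecord(RawRecord record) { return false; }

        // Criterion on the deserialized message.
        default boolean stopAtMessage(T message) { return false; }
    }

    // Example cursor: stop once stopOffset is reached or the message "END" is seen.
    static KafkaStopCursor<String> offsetOrEndMarker(long stopOffset) {
        return new KafkaStopCursor<String>() {
            @Override
            public boolean stopAtOffset(long offset) { return offset >= stopOffset; }

            @Override
            public boolean stopAtMessage(String message) { return "END".equals(message); }
        };
    }
}
```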

In comparison to the proposed approach, this alternative could provide a more concise and consolidated interface for users to specify the stopping criteria (i.e. via one KafkaSourceBuilder API).

This alternative has three disadvantages compared to the proposed approach:
a) It requires backward incompatible changes to KafkaSource and thus imposes inconvenience on users, because KafkaSourceBuilder::setBounded(...) would need to be replaced with the new API.

b) Users might want to stop the job based on the offset, the de-serialized message, or both. For performance reasons, offset-based criteria should ideally be evaluated before the message is de-serialized, whereas criteria based on the de-serialized message can only be evaluated after de-serialization. Since these criteria belong at different positions in the code path, it would be awkward to put this logic in the same class.
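The following simplified Java sketch illustrates point b): in a split reader's loop, an offset-based check can run before deserialization (skipping that cost for records past the stopping point), while a message-based check can only run afterwards. The method `readUntilStop`, its parameters, and the `RawRecord` type are hypothetical stand-ins, not Flink's actual reader code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.LongPredicate;
import java.util.function.Predicate;

// Simplified sketch of a split reader's hot path. All names are hypothetical.
final class EvaluationOrderSketch {

    // Hypothetical stand-in for a raw Kafka record: an offset plus serialized bytes.
    static final class RawRecord {
        final long offset;
        final byte[] value;

        RawRecord(long offset, byte[] value) {
            this.offset = offset;
            this.value = value;
        }
    }

    // Emits deserialized messages until one of the two stopping criteria fires.
    static <T> List<T> readUntilStop(
            List<RawRecord> records,
            Function<byte[], T> deserializer,
            LongPredicate stopAtOffset,   // evaluated BEFORE deserialization
            Predicate<T> stopAtMessage) { // evaluated AFTER deserialization
        List<T> emitted = new ArrayList<>();
        for (RawRecord record : records) {
            // Cheap check first: records at or past the stop offset are never deserialized.
            if (stopAtOffset.test(record.offset)) {
                break;
            }
            T message = deserializer.apply(record.value);
            // This check needs the deserialized value, so it can only run here.
            if (stopAtMessage.test(message)) {
                break;
            }
            emitted.add(message);
        }
        return emitted;
    }
}
```

Because the two predicates are invoked at different points of the loop, folding them into one class does not remove the need for two separate call sites in the reader.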

c) KafkaStopCursor cannot be shared with other sources' implementations, because its criteria depend on source-specific types: KafkaSource uses offsets and ConsumerRecord, whereas PulsarSource uses MessageId and Message. In comparison, the EofOnRecordEvaluator proposed in this FLIP (as well as the proposed implementation changes in SourceReaderBase) could be used by other sources (e.g. PulsarSource) to stop a split based on the de-serialized message.
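The following Java sketch illustrates point c) under stated assumptions: the evaluator is typed only on the de-serialized record, so the same instance can be reused by any source whose records de-serialize to that type. The single-method shape assumed for EofOnRecordEvaluator (`isEndOfStream`), the `emitUntilEof` loop standing in for SourceReaderBase, and the choice not to emit the triggering record are illustrative assumptions, not this FLIP's confirmed interface.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Sketch: a stopping criterion expressed only on the de-serialized type T, so it
// depends on neither Kafka offsets/ConsumerRecord nor Pulsar MessageId/Message.
final class SharedEvaluatorSketch {

    // Hypothetical shape of EofOnRecordEvaluator; the FLIP's actual signature may differ.
    @FunctionalInterface
    interface EofOnRecordEvaluator<T> {
        // Returns true if this record marks the end of its split.
        boolean isEndOfStream(T record);
    }

    // Simplified, hypothetical stand-in for a source-agnostic reader loop that only
    // sees already de-serialized records. The record that triggers EOF is not emitted.
    static <T> List<T> emitUntilEof(Iterator<T> deserialized, EofOnRecordEvaluator<T> evaluator) {
        List<T> emitted = new ArrayList<>();
        while (deserialized.hasNext()) {
            T record = deserialized.next();
            if (evaluator.isEndOfStream(record)) {
                break; // the split is finished; stop reading it
            }
            emitted.add(record);
        }
        return emitted;
    }
}
```

For example, the same `r -> r.equals("END")` evaluator could stop both a Kafka-backed and a Pulsar-backed split, since neither offsets nor MessageIds appear in its signature.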