This proposes adding a method that would determine the offsets for a list of topic partitions based on a timestamp and update the consumer assignment to seek to those offsets.
Status
Current state: Under Discussion
Discussion thread: here
JIRA: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Currently, to subscribe to a topic at the offset closest to a specified timestamp, I need to construct a set of TopicPartition objects from the information returned by KafkaConsumer#partitionsFor, build a Map<TopicPartition, Long> that maps each partition to the timestamp, and then call KafkaConsumer#offsetsForTimes. Once that is done, I need to call KafkaConsumer#seek for each entry in the returned map.
Public Interfaces
- Add method seekToOffsetsForTime(List<TopicPartition> partitions, Long timestampToStart)
Proposed Changes
The new method would first assign the given partitions to the consumer, then use the offsetsForTimes method to obtain a Map<TopicPartition, OffsetAndTimestamp>, and finally seek to the offset specified in each entry of that map.
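The three steps above can be sketched roughly as follows. This is only an illustration under stated assumptions, not the proposed implementation. `TimeSeekableConsumer` is a hypothetical stand-in that mirrors the shape of KafkaConsumer#assign, KafkaConsumer#offsetsForTimes and KafkaConsumer#seek so the example compiles without the Kafka client on the classpath. It also returns a plain Long offset where the real offsetsForTimes returns an OffsetAndTimestamp. `FakeConsumer`, and the use of plain strings as partition identifiers, are likewise made up for the example.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

final class SeekToOffsetsForTimeSketch {

    /** Hypothetical stand-in for the three consumer calls the proposal relies on. */
    interface TimeSeekableConsumer<P> {
        void assign(Collection<P> partitions);
        // Assumption: a null value means no record at or after the timestamp.
        Map<P, Long> offsetsForTimes(Map<P, Long> timestampsToSearch);
        void seek(P partition, long offset);
    }

    /** Sketch of the proposed method: assign, look up offsets by time, then seek. */
    static <P> void seekToOffsetsForTime(TimeSeekableConsumer<P> consumer,
                                         List<P> partitions,
                                         long timestampToStart) {
        consumer.assign(partitions);

        // Every partition is searched with the same starting timestamp.
        Map<P, Long> query = new HashMap<>();
        for (P partition : partitions) {
            query.put(partition, timestampToStart);
        }

        Map<P, Long> offsets = consumer.offsetsForTimes(query);
        for (Map.Entry<P, Long> entry : offsets.entrySet()) {
            // Partitions without a matching record are left at their current position.
            if (entry.getValue() != null) {
                consumer.seek(entry.getKey(), entry.getValue());
            }
        }
    }

    /** In-memory fake used only to exercise the sketch; not part of Kafka. */
    static final class FakeConsumer implements TimeSeekableConsumer<String> {
        // Partition name -> record timestamps, where the list index is the offset.
        final Map<String, List<Long>> log = new HashMap<>();
        final Set<String> assignment = new HashSet<>();
        final Map<String, Long> positions = new HashMap<>();

        @Override
        public void assign(Collection<String> partitions) {
            assignment.clear();
            assignment.addAll(partitions);
        }

        @Override
        public Map<String, Long> offsetsForTimes(Map<String, Long> timestampsToSearch) {
            Map<String, Long> result = new HashMap<>();
            for (Map.Entry<String, Long> e : timestampsToSearch.entrySet()) {
                List<Long> timestamps = log.getOrDefault(e.getKey(), Collections.emptyList());
                Long found = null;
                // Earliest offset whose timestamp is at or after the requested one.
                for (int i = 0; i < timestamps.size(); i++) {
                    if (timestamps.get(i) >= e.getValue()) {
                        found = (long) i;
                        break;
                    }
                }
                result.put(e.getKey(), found);
            }
            return result;
        }

        @Override
        public void seek(String partition, long offset) {
            if (!assignment.contains(partition)) {
                throw new IllegalStateException("Partition not assigned: " + partition);
            }
            positions.put(partition, offset);
        }
    }

    public static void main(String[] args) {
        FakeConsumer consumer = new FakeConsumer();
        consumer.log.put("events-0", Arrays.asList(100L, 200L, 300L));
        consumer.log.put("events-1", Arrays.asList(50L, 60L));
        seekToOffsetsForTime(consumer, Arrays.asList("events-0", "events-1"), 150L);
        System.out.println(consumer.positions);
    }
}
```

In this run only events-0 is repositioned (to offset 1, the first record with a timestamp of at least 150), while events-1 has no record at or after the timestamp and is left alone. How the real method should treat partitions with no matching offset is a design choice the proposal does not settle; skipping them is just one option.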
Compatibility, Deprecation, and Migration Plan
- This will change no existing code and will be backwards compatible.
Rejected Alternatives
The alternative is for clients to do this manually, which seems error-prone and unnecessarily cluttered.