This KIP proposes adding a consumer method that determines the offsets for a list of topic partitions based on a timestamp, assigns those partitions to the consumer, and seeks to the resulting offsets.

Status

Current state: Under Discussion

Discussion thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Currently, if I want to consume a topic starting from the first offset at or after a specified timestamp, I need to build a set of TopicPartition objects from the information returned by KafkaConsumer#partitionsFor, construct a Map<TopicPartition, Long> that maps each partition to the timestamp, and then call KafkaConsumer#offsetsForTimes. After that, I need to assign the partitions with KafkaConsumer#assign and call KafkaConsumer#seek for each entry in the returned map.

Public Interfaces

  • Add method seekToOffsetsForTime(List<TopicPartition> partitions, Long timestampToStart)

Proposed Changes

The new method would first set the partition assignment to the given partitions, then use the offsetsForTimes method to build a Map<TopicPartition, OffsetAndTimestamp>, and finally, for each entry in that map, seek to the offset it specifies.
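The steps above can be sketched in Java as follows. This is a minimal sketch under stated assumptions, not the proposed implementation: to stay self-contained it does not depend on kafka-clients, so `Partition` and `TimeSeekableConsumer` are hypothetical stand-ins for `TopicPartition` and `KafkaConsumer`, and the stand-in `offsetsForTimes` returns a bare offset (or null) instead of an `OffsetAndTimestamp`. It is also written as a static helper that takes the consumer as an argument, rather than as a method on the consumer itself.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public final class SeekToOffsetsForTime {

    // Hypothetical stand-in for org.apache.kafka.common.TopicPartition.
    public record Partition(String topic, int partition) {}

    // Hypothetical minimal consumer interface. Its methods mirror
    // KafkaConsumer#assign, #offsetsForTimes and #seek; to stay self-contained,
    // offsetsForTimes returns the bare offset (or null) instead of an OffsetAndTimestamp.
    public interface TimeSeekableConsumer {
        void assign(Collection<Partition> partitions);
        Map<Partition, Long> offsetsForTimes(Map<Partition, Long> timestampsToSearch);
        void seek(Partition partition, long offset);
    }

    private SeekToOffsetsForTime() {}

    // Sketch of the proposed seekToOffsetsForTime as a static helper.
    public static void seekToOffsetsForTime(TimeSeekableConsumer consumer,
                                            List<Partition> partitions,
                                            long timestampToStart) {
        // 1. Assign the requested partitions (KafkaConsumer#assign replaces
        //    any previous assignment).
        consumer.assign(partitions);

        // 2. Ask, for every partition, for the earliest offset whose timestamp
        //    is greater than or equal to timestampToStart.
        Map<Partition, Long> timestampsToSearch = new HashMap<>();
        for (Partition p : partitions) {
            timestampsToSearch.put(p, timestampToStart);
        }
        Map<Partition, Long> offsets = consumer.offsetsForTimes(timestampsToSearch);

        // 3. Seek each partition to its offset. A null value means there is no
        //    record at or after the timestamp; skipping it (leaving that
        //    partition's position unchanged) is an assumption of this sketch.
        for (Map.Entry<Partition, Long> entry : offsets.entrySet()) {
            if (entry.getValue() != null) {
                consumer.seek(entry.getKey(), entry.getValue());
            }
        }
    }
}
```

Against a real `KafkaConsumer`, the same three steps correspond to `consumer.assign(partitions)`, `consumer.offsetsForTimes(timestampsToSearch)` and `consumer.seek(tp, offsetAndTimestamp.offset())`. The null check matters because `offsetsForTimes` returns null for a partition that has no record with a timestamp at or after the requested one, so seeking blindly on every entry would fail.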

Compatibility, Deprecation, and Migration Plan

  • This only adds a new method and changes no existing code, so it is backwards compatible.

Rejected Alternatives

The alternative is for clients to do this manually, which seems error-prone and unnecessarily cluttered.
