...

Although the two parts above could be separate KIPs, part 2 depends heavily on part 1 and may also affect the semantics of ListOffsetRequest v1. It is therefore better to discuss them together to make sure the wire protocol change can satisfy the various use cases on the consumer side.

Public Interfaces

Add ListOffsetRequest/ListOffsetResponse v1

// ListOffsetRequest v1 
ListOffsetRequest => ReplicaId [TopicName [Partition Time MaxNumberOfOffsets]] 
  ReplicaId => int32 
  TopicName => string 
  Partition => int32 
  Time => int64
  MaxNumberOfOffsets => int32 

// ListOffsetResponse v1 
ListOffsetResponse => [TopicName [PartitionOffsets]] 
  PartitionOffsets => Partition ErrorCode Timestamp [Offset]
  Partition => int32 
  ErrorCode => int16 
  Timestamp => int64 
  Offset => int64
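
To make the request layout above concrete, here is a minimal sketch of how a ListOffsetRequest v1 body for one topic and one partition could be serialized. It assumes the standard Kafka primitive encodings (a string is an int16 length followed by UTF-8 bytes, an array is an int32 element count followed by its elements). The class and method names are hypothetical, and the common request header (API key, version, correlation id, client id) is deliberately left out.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of the Kafka code base.
class ListOffsetRequestV1Sketch {

    /** Encodes a request body that asks about a single partition of a single topic. */
    static ByteBuffer encode(int replicaId, String topic, int partition,
                             long time, int maxNumberOfOffsets) {
        byte[] name = topic.getBytes(StandardCharsets.UTF_8);
        // ReplicaId + topic count + (length-prefixed name) + partition count
        // + Partition + Time + MaxNumberOfOffsets
        ByteBuffer buf = ByteBuffer.allocate(4 + 4 + 2 + name.length + 4 + 4 + 8 + 4);
        buf.putInt(replicaId);            // ReplicaId => int32 (-1 for ordinary clients)
        buf.putInt(1);                    // number of topics in the outer array
        buf.putShort((short) name.length);
        buf.put(name);                    // TopicName => string
        buf.putInt(1);                    // number of partitions in the inner array
        buf.putInt(partition);            // Partition => int32
        buf.putLong(time);                // Time => int64 (target timestamp)
        buf.putInt(maxNumberOfOffsets);   // MaxNumberOfOffsets => int32
        buf.flip();                       // switch the buffer from writing to reading
        return buf;
    }

    public static void main(String[] args) {
        ByteBuffer body = encode(-1, "my-topic", 0, 1470000000000L, 1);
        System.out.println("encoded body bytes: " + body.remaining());
    }
}
```

The response would be decoded by walking the same primitives in the order given by the ListOffsetResponse v1 schema.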

Add a new method to o.a.k.c.consumer.Consumer to allow users to search for offsets by timestamp.

offsetForTime() method
/**
 * Look up the offsets for the given partitions by timestamp. The returned offset for each partition is the
 * earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition.
 * 
 * This is a blocking call. The consumer does not have to be assigned the partitions.
 *
 * @param timestampsToSearch the mapping from partition to the timestamp to look up.
 * @return For each partition, the timestamp and offset of the first message with timestamp greater
 *         than or equal to the target timestamp.
 */
Map<TopicPartition, TimestampOffset> offsetForTime(Map<TopicPartition, Long> timestampsToSearch);
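
The lookup rule in the Javadoc can be illustrated with a small in-memory model of one partition. This is only a sketch of the intended semantics, not the broker or consumer implementation: the TimestampOffset class below is a hypothetical stand-in for the return type, the log is a plain list in offset order, and returning null when no message qualifies is an assumption that the text above does not specify.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical model of the search semantics for a single partition.
class OffsetForTimeSketch {

    /** Stand-in for the TimestampOffset return type; its fields are assumed. */
    static final class TimestampOffset {
        final long timestamp;
        final long offset;

        TimestampOffset(long timestamp, long offset) {
            this.timestamp = timestamp;
            this.offset = offset;
        }
    }

    /**
     * Returns the entry with the earliest offset whose timestamp is greater than
     * or equal to the target, or null if there is none (assumed behavior).
     * The scan is linear because timestamps need not increase with offsets.
     */
    static TimestampOffset search(List<TimestampOffset> logInOffsetOrder, long targetTimestamp) {
        for (TimestampOffset message : logInOffsetOrder) {
            if (message.timestamp >= targetTimestamp) {
                return message;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // Offsets 0..3; the timestamp at offset 2 is older than the one at offset 1.
        List<TimestampOffset> log = Arrays.asList(
                new TimestampOffset(100L, 0L),
                new TimestampOffset(250L, 1L),
                new TimestampOffset(200L, 2L),
                new TimestampOffset(400L, 3L));
        TimestampOffset hit = search(log, 200L);
        System.out.println("offset=" + hit.offset + " timestamp=" + hit.timestamp);
    }
}
```

With a target of 200, the sketch returns offset 1 (timestamp 250) rather than offset 2 (timestamp 200), because offset 1 is the earliest offset whose timestamp meets the target.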

...