...
Although the two parts above could be separate KIPs, part 2 depends heavily on part 1 and may also affect the semantics of ListOffsetRequest v1. It might therefore be better to discuss them together, to make sure the wire protocol change can satisfy the various use cases on the consumer side.
Public Interfaces
Add ListOffsetRequest/ListOffsetResponse v1
// ListOffsetRequest v1
ListOffsetRequest => ReplicaId [TopicName [Partition Time MaxNumberOfOffsets]]
ReplicaId => int32
TopicName => string
Partition => int32
Time => int64
MaxNumberOfOffsets => int32
// ListOffsetResponse v1
ListOffsetResponse => [TopicName [PartitionOffsets]]
PartitionOffsets => Partition ErrorCode Timestamp [Offset]
Partition => int32
ErrorCode => int16
Timestamp => int64
Offset => int64
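To make the request layout concrete, here is a minimal sketch of how the ListOffsetRequest v1 body above could be serialized. It assumes the usual Kafka wire conventions (big-endian integers, strings as an int16 length followed by UTF-8 bytes, arrays as an int32 count followed by the elements) and omits the common request header. The class and method names (`ListOffsetRequestV1Sketch`, `PartitionQuery`, `encode`) are hypothetical and exist only for illustration; they are not part of the Kafka code base.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;

/** Hypothetical helper illustrating the proposed v1 request body layout. */
final class ListOffsetRequestV1Sketch {

    /** One entry of the inner array: Partition, Time, MaxNumberOfOffsets. */
    static final class PartitionQuery {
        final int partition;
        final long time;
        final int maxNumberOfOffsets;

        PartitionQuery(int partition, long time, int maxNumberOfOffsets) {
            this.partition = partition;
            this.time = time;
            this.maxNumberOfOffsets = maxNumberOfOffsets;
        }
    }

    /** Encodes ReplicaId [TopicName [Partition Time MaxNumberOfOffsets]]. */
    static ByteBuffer encode(int replicaId, Map<String, List<PartitionQuery>> topics) {
        // First pass: compute the exact size of the body.
        int size = 4 + 4; // ReplicaId + topic array count
        for (Map.Entry<String, List<PartitionQuery>> e : topics.entrySet()) {
            size += 2 + e.getKey().getBytes(StandardCharsets.UTF_8).length; // TopicName
            size += 4 + e.getValue().size() * (4 + 8 + 4);                  // partition array
        }

        // Second pass: write the fields in schema order (ByteBuffer is big-endian by default).
        ByteBuffer buf = ByteBuffer.allocate(size);
        buf.putInt(replicaId);
        buf.putInt(topics.size());
        for (Map.Entry<String, List<PartitionQuery>> e : topics.entrySet()) {
            byte[] name = e.getKey().getBytes(StandardCharsets.UTF_8);
            buf.putShort((short) name.length);
            buf.put(name);
            buf.putInt(e.getValue().size());
            for (PartitionQuery q : e.getValue()) {
                buf.putInt(q.partition);          // Partition => int32
                buf.putLong(q.time);              // Time => int64
                buf.putInt(q.maxNumberOfOffsets); // MaxNumberOfOffsets => int32
            }
        }
        buf.flip();
        return buf;
    }
}
```

For example, a request for one partition of a one-character topic name would occupy 4 + 4 + (2 + 1) + 4 + (4 + 8 + 4) = 31 bytes under these assumptions.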
Add a new method to o.a.k.c.consumer.Consumer to allow users to search for offsets by timestamp.
/**
 * Look up the offsets for the given partitions by timestamp. The returned offset for each partition is the
 * earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition.
 *
 * This is a blocking call. The consumer does not have to be assigned the partitions.
 *
 * @param timestampsToSearch the mapping from partition to the timestamp to look up.
 * @return For each partition, returns the timestamp and offset of the first message with timestamp greater
 *         than or equal to the target timestamp.
 */
Map<TopicPartition, TimestampOffset> offsetForTime(Map<TopicPartition, Long> timestampsToSearch);
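The lookup semantics stated in the Javadoc can be illustrated with a small in-memory sketch. It is not how a broker or the consumer would implement the search; it only shows which message is selected when timestamps are not strictly increasing with offset. The names `OffsetForTimeSketch`, `Record` and `lookup` are hypothetical, the nested `TimestampOffset` is a stand-in for the return type named in the proposal, and returning an empty `Optional` when no message qualifies is an assumption, since the text above does not specify that case.

```java
import java.util.List;
import java.util.Optional;

/** Hypothetical illustration of "earliest offset whose timestamp >= target". */
final class OffsetForTimeSketch {

    /** A log entry reduced to the two fields relevant to the search. */
    static final class Record {
        final long offset;
        final long timestamp;

        Record(long offset, long timestamp) {
            this.offset = offset;
            this.timestamp = timestamp;
        }
    }

    /** Stand-in for the TimestampOffset result type named in the proposal. */
    static final class TimestampOffset {
        final long timestamp;
        final long offset;

        TimestampOffset(long timestamp, long offset) {
            this.timestamp = timestamp;
            this.offset = offset;
        }
    }

    /**
     * Scans the log in offset order and returns the first record whose
     * timestamp is greater than or equal to the target timestamp.
     * The log is assumed to be sorted by ascending offset.
     */
    static Optional<TimestampOffset> lookup(List<Record> log, long targetTimestamp) {
        for (Record r : log) {
            if (r.timestamp >= targetTimestamp) {
                return Optional.of(new TimestampOffset(r.timestamp, r.offset));
            }
        }
        return Optional.empty(); // assumption: no qualifying message in this partition
    }
}
```

With a log of (offset, timestamp) pairs (0, 100), (1, 105), (2, 103), (3, 110), a search for timestamp 103 selects offset 1 rather than offset 2, because offset 1 is the earliest message whose timestamp (105) is at least 103.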
...