THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block | ||||
---|---|---|---|---|
| ||||
/** * Look up the offsets for the given partitions by timestamp. The returned offset for each partition is the * earliest offset whose timestamp is greater than or equals to the given timestamp in the corresponding partition. * * This is a blocking call. The consumer does not have to be assigned the partitions. * * @param timestampsToSearch the mapping from partition to the timestamp to look up. * @return For each partition, returns the timestamp and offset of the first message with timestamp greater * than or equal to the target timestamp. */ Map<TopicPartition, TimestampOffset>OffsetAndTimestamp> offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch); /** * Get the earliest available offsets for the given partitions. * * @param partitions the partitions to get the earliest offsets. * @return The earliest available offsets for the given partitions */ Map<TopicPartition, Long> earliestOffsets(Set<TopicPartition> partitions); /** * Get the latest offsets for the given partitions. * * @param partitions the partitions to get the latest offsets. * @return The latest available offsets for the given partitions. */ Map<TopicPartition, Long> latestOffsets(Set<TopicPartition> partitions); public class TimestampOffsetOffsetAndTimestamp { private final long timestamp; private final long offset; public TimestampOffsetOffsetAndTimestamp(long timestampoffset, long offsettimestamp) { this.timestamp = timestamp; this.offset = offset; } public long timestamp() { return timestamp; } public long offset() { return offset; } } |
...
Implementation wise, we will migrate to o.a.k.common.requests.ListOffsetRequest class on the broker side.
Because Kafka request schema uses an Array
to represent a Map
, it is possible that users form a ListOffsetRequest
that contains duplicate partitions with differen timestamps. In this case, the broker will return an InvalidRequestException(error code 42) for that partition in the ListOffsetResponse.
offsetsForTimes() method in Consumer
...