...
Current state: Under Discussion
Discussion thread: here
JIRA: KAFKA-12541
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
As of Kafka 2.7, AdminClient provides the listOffsets method, which takes a map from TopicPartition to OffsetSpec and returns offset and timestamp information for records at various positions in a partition.
For each requested partition, it returns the offset and timestamp that match the given OffsetSpec.
OffsetSpec can be:
OffsetSpec.EarliestSpec - The first offset on the partition and a timestamp of -1
OffsetSpec.LatestSpec - The offset of the next message that will be appended to the log and a timestamp of -1
OffsetSpec.TimestampSpec - The earliest offset whose timestamp is greater than or equal to the given timestamp and the timestamp of that record.
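To make the three existing specs concrete, here is a minimal in-memory model. It is not Kafka code. It assumes a partition is a list of records with consecutive offsets and client-supplied timestamps. The hypothetical helpers `earliest`, `latest` and `forTimestamp` return an offset/timestamp pair following the semantics listed above. Returning `null` when no record matches a timestamp query is a simplification made only for this sketch.

```java
import java.util.List;

// Illustrative in-memory model of the existing OffsetSpec semantics; not Kafka code.
class OffsetSpecModel {

    // One record: its offset and its timestamp.
    static final class Rec {
        final long offset;
        final long timestamp;

        Rec(long offset, long timestamp) {
            this.offset = offset;
            this.timestamp = timestamp;
        }
    }

    // The offset/timestamp pair returned for one partition.
    static final class OffsetAndTimestamp {
        final long offset;
        final long timestamp;

        OffsetAndTimestamp(long offset, long timestamp) {
            this.offset = offset;
            this.timestamp = timestamp;
        }

        @Override
        public String toString() {
            return "offset=" + offset + ", timestamp=" + timestamp;
        }
    }

    // EarliestSpec: the first offset in the partition and a timestamp of -1.
    static OffsetAndTimestamp earliest(long logStartOffset) {
        return new OffsetAndTimestamp(logStartOffset, -1L);
    }

    // LatestSpec: the offset the next appended record will receive and a timestamp of -1.
    // Assumes consecutive offsets starting at logStartOffset.
    static OffsetAndTimestamp latest(long logStartOffset, List<Rec> log) {
        return new OffsetAndTimestamp(logStartOffset + log.size(), -1L);
    }

    // TimestampSpec: the earliest offset whose timestamp is >= target, with that record's timestamp.
    // Returns null when no record qualifies (a simplification made for this sketch).
    static OffsetAndTimestamp forTimestamp(List<Rec> log, long target) {
        for (Rec r : log) { // list is in offset order, so the first hit is the earliest offset
            if (r.timestamp >= target) {
                return new OffsetAndTimestamp(r.offset, r.timestamp);
            }
        }
        return null;
    }

    public static void main(String[] args) {
        // Timestamps are not strictly increasing: offset 2 is older than offset 1.
        List<Rec> log = List.of(new Rec(0, 100), new Rec(1, 250), new Rec(2, 200), new Rec(3, 400));
        System.out.println(earliest(0));            // offset=0, timestamp=-1
        System.out.println(latest(0, log));         // offset=4, timestamp=-1
        System.out.println(forTimestamp(log, 210)); // offset=1, timestamp=250
    }
}
```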
This proposal adds a new offset spec, OffsetSpec.MaxTimestampSpec, which returns the offset and timestamp of the record with the highest timestamp in the partition.
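Producers can set record timestamps, so the record with the highest timestamp is not necessarily the last one in the log. The following is a minimal sketch of the new spec's semantics over an in-memory list of records. It uses a hypothetical `maxTimestamp` helper and is not the broker implementation. It assumes a non-empty partition. On ties it keeps the earliest offset, which is this sketch's assumption rather than something the KIP specifies.

```java
import java.util.List;

// Hypothetical model of the max-timestamp semantics; not Kafka code.
class MaxTimestampModel {

    // One record: its offset and its (producer-settable) timestamp.
    static final class Rec {
        final long offset;
        final long timestamp;

        Rec(long offset, long timestamp) {
            this.offset = offset;
            this.timestamp = timestamp;
        }
    }

    // Returns {offset, timestamp} of the record with the highest timestamp.
    // Assumes a non-empty log; the strict '>' keeps the earliest offset on ties.
    static long[] maxTimestamp(List<Rec> log) {
        Rec best = log.get(0);
        for (Rec r : log) {
            if (r.timestamp > best.timestamp) {
                best = r;
            }
        }
        return new long[] {best.offset, best.timestamp};
    }

    public static void main(String[] args) {
        // Timestamps arrive out of order: offset 1 carries the highest one, not the last offset.
        List<Rec> log = List.of(new Rec(0, 100), new Rec(1, 500), new Rec(2, 300));
        long[] result = maxTimestamp(log);
        System.out.println("offset=" + result[0] + ", timestamp=" + result[1]); // offset=1, timestamp=500
    }
}
```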
Public Interfaces
The only change to the public interfaces is the addition of the new offset spec to org.apache.kafka.clients.admin.OffsetSpec.
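The following sketch shows one way the addition could look. It assumes the new spec follows the existing pattern of a nested subclass plus a static factory method, like earliest(), latest() and forTimestamp(long). The class is a simplified, standalone version so that it compiles on its own (access modifiers are dropped). The names `MaxTimestampSpec` and `maxTimestamp()` are the ones assumed here for the new spec.

```java
// Simplified standalone version of org.apache.kafka.clients.admin.OffsetSpec,
// showing where the new spec would slot in. Not the actual Kafka source.
class OffsetSpec {

    static class EarliestSpec extends OffsetSpec { }

    static class LatestSpec extends OffsetSpec { }

    // New in this proposal (name assumed): the record with the highest timestamp.
    static class MaxTimestampSpec extends OffsetSpec { }

    static class TimestampSpec extends OffsetSpec {
        private final long timestamp;

        TimestampSpec(long timestamp) {
            this.timestamp = timestamp;
        }

        long timestamp() {
            return timestamp;
        }
    }

    static OffsetSpec earliest() {
        return new EarliestSpec();
    }

    static OffsetSpec latest() {
        return new LatestSpec();
    }

    static OffsetSpec forTimestamp(long timestamp) {
        return new TimestampSpec(timestamp);
    }

    // New factory method, mirroring the existing ones.
    static OffsetSpec maxTimestamp() {
        return new MaxTimestampSpec();
    }

    public static void main(String[] args) {
        OffsetSpec spec = OffsetSpec.maxTimestamp();
        System.out.println(spec instanceof MaxTimestampSpec); // true
    }
}
```

Callers would then request the new spec exactly as they request the existing ones, by passing it as the value for a partition in the map given to listOffsets.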
Proposed Changes
In org.apache.kafka.clients.admin.KafkaAdminClient, each OffsetSpec is mapped to a Long value that is sent as the timestamp in the ListOffsets request. This mapping will be refactored so that the new spec is mapped to its own value.
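Here is a minimal sketch of the refactored mapping. It assumes the mapping becomes a chain of instanceof checks that returns the query timestamp for a TimestampSpec and a negative sentinel otherwise. The sentinels -2 (earliest) and -1 (latest) are Kafka's existing values. Using -3 for MAX_TIMESTAMP matches later Kafka releases but is an assumption here. The helper name `toListOffsetsTimestamp` and the simplified spec classes are hypothetical stand-ins.

```java
// Sketch of mapping an OffsetSpec to the timestamp sent in a ListOffsets request.
class SpecMapping {

    // Sentinels: -2 and -1 match Kafka's existing values; -3 for MAX_TIMESTAMP is assumed.
    static final long EARLIEST_TIMESTAMP = -2L;
    static final long LATEST_TIMESTAMP = -1L;
    static final long MAX_TIMESTAMP = -3L;

    // Hypothetical, simplified stand-ins for the OffsetSpec subclasses.
    static class Spec { }

    static final class EarliestSpec extends Spec { }

    static final class LatestSpec extends Spec { }

    static final class MaxTimestampSpec extends Spec { }

    static final class TimestampSpec extends Spec {
        final long timestamp;

        TimestampSpec(long timestamp) {
            this.timestamp = timestamp;
        }
    }

    // Real query timestamps pass through unchanged; every other spec becomes a negative sentinel.
    // Anything unrecognised falls back to LATEST_TIMESTAMP (an assumption of this sketch).
    static long toListOffsetsTimestamp(Spec spec) {
        if (spec instanceof TimestampSpec) {
            return ((TimestampSpec) spec).timestamp;
        } else if (spec instanceof EarliestSpec) {
            return EARLIEST_TIMESTAMP;
        } else if (spec instanceof MaxTimestampSpec) {
            return MAX_TIMESTAMP;
        }
        return LATEST_TIMESTAMP;
    }

    public static void main(String[] args) {
        System.out.println(toListOffsetsTimestamp(new MaxTimestampSpec()));                 // -3
        System.out.println(toListOffsetsTimestamp(new TimestampSpec(1_600_000_000_000L))); // 1600000000000
    }
}
```

The design relies on real query timestamps being non-negative, which leaves negative values free to act as sentinels in the same field.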
The new spec maps to a new constant, MAX_TIMESTAMP, in org.apache.kafka.common.requests.ListOffsetRequest. This follows the existing pattern of using negative timestamps to indicate the ends of the log (EARLIEST_TIMESTAMP is -2 and LATEST_TIMESTAMP is -1).
On the broker, this new timestamp is passed unchanged through the existing calls that handle a ListOffsets request, down to the log.
Each LogSegment already tracks its highest timestamp and the offset of the record carrying it, so this lookup does not need to read from disk. An extra branch is added to the block at https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L1737 to handle the MAX_TIMESTAMP behaviour.
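Each segment keeps its own highest timestamp and the offset of the record that carries it, so the MAX_TIMESTAMP path only needs to compare those per-segment values. The following minimal Java sketch illustrates the idea; the real change lives in Log.scala. The `Segment` class and its field names are hypothetical. The sketch assumes the log has at least one segment, and it keeps the earlier segment on ties, which is also an assumption.

```java
import java.util.List;

// Hypothetical model: each segment caches its highest timestamp and the offset of that record,
// so finding the partition-wide maximum never reads record data.
class MaxTimestampLookup {

    static final class Segment {
        final long maxTimestampSoFar;
        final long offsetOfMaxTimestampSoFar;

        Segment(long maxTimestampSoFar, long offsetOfMaxTimestampSoFar) {
            this.maxTimestampSoFar = maxTimestampSoFar;
            this.offsetOfMaxTimestampSoFar = offsetOfMaxTimestampSoFar;
        }
    }

    // Returns {offset, timestamp} for MAX_TIMESTAMP. Assumes a non-empty segment list;
    // the strict '>' keeps the earliest segment when two share the same maximum.
    static long[] fetchMaxTimestamp(List<Segment> segments) {
        Segment best = segments.get(0);
        for (Segment s : segments) {
            if (s.maxTimestampSoFar > best.maxTimestampSoFar) {
                best = s;
            }
        }
        return new long[] {best.offsetOfMaxTimestampSoFar, best.maxTimestampSoFar};
    }

    public static void main(String[] args) {
        List<Segment> segments = List.of(
            new Segment(1_000, 42),  // oldest segment
            new Segment(1_500, 117), // holds the partition-wide maximum
            new Segment(1_200, 250)  // active segment
        );
        long[] r = fetchMaxTimestamp(segments);
        System.out.println("offset=" + r[0] + ", timestamp=" + r[1]); // offset=117, timestamp=1500
    }
}
```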
Note: an OffsetSpec of Latest returns the next offset for the partition (i.e. the offset that the next appended record will receive), whereas MAX_TIMESTAMP always points at a record that already exists in the log. This means that even when records are written in timestamp order, MAX_TIMESTAMP and LATEST return different offsets. For example, given a partition holding 30 records at offsets 0 to 29, written in timestamp order:
OffsetSpec | Offset | Timestamp |
---|---|---|
MAX_TIMESTAMP | 29 | > 0 (timestamp of offset 29) |
LATEST | 30 | -1 |
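The off-by-one difference can be checked with a small standalone calculation. It assumes a partition whose 30 records sit at offsets 0 to 29 with strictly increasing timestamps; the timestamp values are made up for illustration. LATEST is modelled as one past the last record's offset, and MAX_TIMESTAMP as the offset of the highest-timestamp record.

```java
// Worked example: 30 records written in timestamp order (hypothetical timestamps).
class LatestVsMaxTimestamp {

    // LATEST: the offset the next appended record would receive, i.e. one past the last record.
    // Assumes offsets start at 0 and are consecutive, so record i sits at offset i.
    static long latestOffset(long[] timestamps) {
        return timestamps.length;
    }

    // MAX_TIMESTAMP: offset of the record with the highest timestamp (assumes a non-empty log).
    static long maxTimestampOffset(long[] timestamps) {
        int best = 0;
        for (int i = 1; i < timestamps.length; i++) {
            if (timestamps[i] > timestamps[best]) {
                best = i;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        long[] timestamps = new long[30];
        for (int i = 0; i < timestamps.length; i++) {
            timestamps[i] = 1_000L + i; // written in timestamp order
        }
        System.out.println("MAX_TIMESTAMP offset = " + maxTimestampOffset(timestamps)); // 29
        System.out.println("LATEST offset = " + latestOffset(timestamps));              // 30
    }
}
```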
...