Status
Current state: Under Discussion
Discussion thread: here
JIRA: KAFKA-12541
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Confirming topic/partition "liveness" (i.e. whether a topic is being actively written to) can currently be difficult in Kafka. Approaches include:
- consuming from the partition - this is heavyweight for an indicator.
- using metrics - this can be hard to access remotely.
- monitoring changes in offsets - this can be difficult with partitions that are not often written to.
In many cases the highest timestamp on a partition is a good indicator of "liveness", and the AdminClient already provides a suitable interface for collecting this information.
Note: this proposal returns the highest message timestamp, but this may not correspond to the time at which the latest message was produced. Replication tooling such as MM2 may preserve the source cluster timestamp in the message. However, it's understood that this will provide a good indication in most cases.
In Kafka 2.7 the following method was added to AdminClient that provides offset and timestamp information for records at various positions in a partition:
|
This returns offsets and timestamps of records in the partitions that match the OffsetSpec.
OffsetSpec can be:
- OffsetSpec.EarliestSpec - The first offset on the partition and a timestamp of -1
- OffsetSpec.LatestSpec - The offset of the next message that will be appended to the log and a timestamp of -1
- OffsetSpec.TimestampSpec - The earliest offset whose timestamp is greater than or equal to the given timestamp and the timestamp of that record.
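The three existing specs can be illustrated with a toy in-memory model. This is only a sketch of the semantics listed above, not Kafka code: the class `OffsetSpecModel`, its `Rec` type and the method names are hypothetical, and the handling of an empty log (offset 0) is an assumption.

```java
import java.util.Arrays;
import java.util.List;

// Toy in-memory model of a partition log; this is not Kafka code.
final class OffsetSpecModel {
    // A record's position in the log and its timestamp (epoch millis).
    static final class Rec {
        final long offset;
        final long timestamp;

        Rec(long offset, long timestamp) {
            this.offset = offset;
            this.timestamp = timestamp;
        }
    }

    // Earliest: the first offset on the partition, reported with timestamp -1.
    static Rec earliest(List<Rec> log) {
        long offset = log.isEmpty() ? 0L : log.get(0).offset; // empty-log value is an assumption
        return new Rec(offset, -1L);
    }

    // Latest: the offset of the next record to be appended, reported with timestamp -1.
    static Rec latest(List<Rec> log) {
        long offset = log.isEmpty() ? 0L : log.get(log.size() - 1).offset + 1;
        return new Rec(offset, -1L);
    }

    // Timestamp: the earliest offset whose timestamp is >= target, or null if there is none.
    static Rec forTimestamp(List<Rec> log, long target) {
        for (Rec r : log) {
            if (r.timestamp >= target) {
                return r;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        List<Rec> log = Arrays.asList(new Rec(0, 100L), new Rec(1, 300L), new Rec(2, 200L));
        System.out.println(earliest(log).offset);           // 0
        System.out.println(latest(log).offset);             // 3
        System.out.println(forTimestamp(log, 150L).offset); // 1
    }
}
```

Note that none of these three lookups answers the question "which record has the highest timestamp?" when timestamps are not monotonic, as in the three-record log used in `main`.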
This proposal adds an additional offset spec:
|
This returns the offset and timestamp corresponding to the record with the highest timestamp on the partition.
Public Interfaces
We will add the new offset spec in org.apache.kafka.clients.admin.OffsetSpec:
|
To do this we will need to bump the ListOffsets API version, so that requests sent to older brokers that do not understand this spec fail. This would be enforced in ListOffsetsRequest.Builder with something similar to the below (where 3 is the new version):
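As a rough illustration of the version gating described above, the sketch below shows one way a minimum request version could be enforced. It is not the actual `ListOffsetsRequest.Builder` code: the class `ListOffsetsVersionGate`, its methods and the use of `UnsupportedOperationException` are stand-ins chosen to keep the example self-contained, and the version number 3 is taken from the text above.

```java
// Hypothetical stand-in for the builder's version check; names are illustrative only.
final class ListOffsetsVersionGate {
    static final short OLDEST_VERSION = 0;
    // Assumed: the first ListOffsets version that understands the max-timestamp spec.
    static final short MAX_TIMESTAMP_MIN_VERSION = 3;

    // The lowest request version that may be used for this request.
    static short minVersion(boolean requireMaxTimestamp) {
        return requireMaxTimestamp ? MAX_TIMESTAMP_MIN_VERSION : OLDEST_VERSION;
    }

    // Picks the highest version both sides support and fails if it is below the minimum.
    static short negotiate(short brokerMaxVersion, short clientMaxVersion, boolean requireMaxTimestamp) {
        short min = minVersion(requireMaxTimestamp);
        short chosen = (short) Math.min(brokerMaxVersion, clientMaxVersion);
        if (chosen < min) {
            throw new UnsupportedOperationException(
                "Broker supports ListOffsets up to v" + chosen + " but v" + min + " is required");
        }
        return chosen;
    }
}
```

With this shape, a request that needs the new spec fails on the client side when talking to an older broker, rather than being sent with a timestamp value the broker would misinterpret.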
|
Proposed Changes
OffsetSpecs are mapped to Long values for timestamp fetches in org.apache.kafka.clients.admin.KafkaAdminClient:
|
This will be refactored to:
|
This corresponds to a new constant in org.apache.kafka.common.requests.ListOffsetRequest:
|
This follows the existing pattern of using negative timestamps to indicate the ends of the log.
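The mapping from spec to sentinel timestamp can be sketched as follows. The values -1 and -2 are the existing latest and earliest sentinels, and -3 is the value this proposal uses for the new spec; the class `OffsetSpecTimestamps`, the `Spec` enum and the constant names are hypothetical and only stand in for the real types.

```java
// Illustrative mapping from an offset spec to the timestamp value sent in the request.
final class OffsetSpecTimestamps {
    static final long LATEST_SENTINEL = -1L;
    static final long EARLIEST_SENTINEL = -2L;
    static final long MAX_TIMESTAMP_SENTINEL = -3L; // new value proposed here

    // Hypothetical stand-in for the OffsetSpec subclasses.
    enum Spec { EARLIEST, LATEST, MAX_TIMESTAMP, TIMESTAMP }

    // Sentinel specs ignore explicitTimestamp; TIMESTAMP passes it through unchanged.
    static long toRequestTimestamp(Spec spec, long explicitTimestamp) {
        switch (spec) {
            case EARLIEST:
                return EARLIEST_SENTINEL;
            case LATEST:
                return LATEST_SENTINEL;
            case MAX_TIMESTAMP:
                return MAX_TIMESTAMP_SENTINEL;
            default:
                return explicitTimestamp;
        }
    }
}
```

Because real record timestamps are non-negative, any negative value is free to act as a sentinel without colliding with a genuine timestamp lookup.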
This new timestamp will be handled as normal all the way through the following calls:
|
LogSegments track the highest timestamp and its associated offset, so we don't have to go to disk to fetch this. An extra path is added to this block (https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/log/Log.scala#L1737) to handle the MAX_TIMESTAMP behaviour:
|
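The idea of answering the lookup from per-segment metadata can be sketched in Java as below. This is not the broker code (which is Scala): the `Segment` class and its field names are hypothetical summaries of what each segment tracks, and resolving ties in favour of the earlier segment is an assumption.

```java
import java.util.Arrays;
import java.util.List;

// Illustrative lookup over per-segment summaries; no record data is read.
final class MaxTimestampLookup {
    // What each segment is assumed to track in memory.
    static final class Segment {
        final long maxTimestampSoFar;
        final long offsetOfMaxTimestampSoFar;

        Segment(long maxTimestampSoFar, long offsetOfMaxTimestampSoFar) {
            this.maxTimestampSoFar = maxTimestampSoFar;
            this.offsetOfMaxTimestampSoFar = offsetOfMaxTimestampSoFar;
        }
    }

    // Returns the segment holding the highest timestamp, or null if there are no segments.
    // On equal timestamps the earlier segment wins (an assumption of this sketch).
    static Segment segmentWithMaxTimestamp(List<Segment> segments) {
        Segment best = null;
        for (Segment s : segments) {
            if (best == null || s.maxTimestampSoFar > best.maxTimestampSoFar) {
                best = s;
            }
        }
        return best;
    }

    public static void main(String[] args) {
        List<Segment> segments = Arrays.asList(
            new Segment(100L, 4L), new Segment(900L, 12L), new Segment(500L, 25L));
        Segment best = segmentWithMaxTimestamp(segments);
        System.out.println(best.offsetOfMaxTimestampSoFar + " @ " + best.maxTimestampSoFar); // 12 @ 900
    }
}
```

The cost is linear in the number of segments rather than the number of records, which is what makes this cheaper than consuming from the partition.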
Note: an OffsetSpec of Latest actually returns the next offset for the partition (i.e. the one beyond the last committed one), whereas Max Timestamp always points at an existing record and so is expected not to move beyond the end of the log. This means that even when messages are committed in timestamp order, MAX_TIMESTAMP and LATEST will return different offsets, e.g. given a partition written in timestamp order whose last record is at offset 30:
OffsetSpec | Offset | Timestamp |
---|---|---|
MAX_TIMESTAMP | 30 | > 0 |
LATEST | 31 | -1 |
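The difference shown in the table can be reproduced with a small model. This sketch assumes a log whose records occupy offsets 0 to 30; the class `MaxTimestampVsLatest`, its `Rec` type, and the {-1, -1} result for an empty log are all hypothetical choices made for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Toy comparison of the max-timestamp and latest lookups; this is not Kafka code.
final class MaxTimestampVsLatest {
    static final class Rec {
        final long offset;
        final long timestamp;

        Rec(long offset, long timestamp) {
            this.offset = offset;
            this.timestamp = timestamp;
        }
    }

    // Returns {offset, timestamp} of the record with the highest timestamp.
    // An empty log yields {-1, -1}, which is an assumption of this sketch.
    static long[] maxTimestamp(List<Rec> log) {
        Rec best = null;
        for (Rec r : log) {
            if (best == null || r.timestamp > best.timestamp) {
                best = r;
            }
        }
        return best == null ? new long[] {-1L, -1L} : new long[] {best.offset, best.timestamp};
    }

    // Returns {next offset to be written, -1}.
    static long[] latest(List<Rec> log) {
        long next = log.isEmpty() ? 0L : log.get(log.size() - 1).offset + 1;
        return new long[] {next, -1L};
    }

    public static void main(String[] args) {
        List<Rec> log = new ArrayList<>();
        for (long offset = 0; offset <= 30; offset++) {
            log.add(new Rec(offset, 1_000L + offset)); // timestamps increase with offset
        }
        System.out.println(maxTimestamp(log)[0]); // 30
        System.out.println(latest(log)[0]);       // 31
    }
}
```

When timestamps are not in offset order (for example, records mirrored with their source timestamps), the max-timestamp offset can fall anywhere in the log, so it should not be treated as "latest minus one" in general.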
Compatibility, Deprecation, and Migration Plan
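The only behaviour changes created by this KIP are for requests to AdminClient.listOffsets that are looking for a timestamp of -3L. As this is not a valid timestamp there should be no migration, compatibility or deprecation concerns. As mentioned in Public Interfaces, the ListOffsets API version will be bumped so that requests using the new spec fail against earlier brokers that do not understand it.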
Rejected Alternatives
- Create a new API to retrieve the maximum timestamp record information, e.g. AdminClient.getMaxTimestampAndOffset(TopicPartition topicpartition). Rejected as AdminClient.listOffsets is already a good match for this functionality.