...
- Add a CreateTime field to ProducerRecord. This field can be used by application to set the send time. It will also allow mirror maker to maintain the send time easily.
- Add both CreateTime and LogAppendTime to ConsumerRecord.
- The CreateTime is useful in use cases such as stream processing
- The LogAppendTime is useful in use cases such as log rolling and log retention.
Use case discussion
Option discussions with use cases
This section
Mirror maker
The broker does not distinguish mirror maker from other producers. The following example explains what will the timestamp look like when there is mirror maker in the picture.(CT - CreateTime, LAT - LogAppendTime)
...
To discuss the usage of CreateTime and LogAppendTime, it is useful to summarize the latency pattern of the messages flowing through the pipeline. The latency can be summarized to the following pattern:
- the messages flow through the pipeline with small latency.
- the messages flow through the pipeline with similar large latency.
- the messages flow through the pipeline with large latency difference.
Also it would be useful to think about the impact of a completely wrong timestamp set by client. i.e. the robustness of the system.
...
The latency measurement needs both CreateTime and LogAppendTime. The LogAppendTime does not need to be in the message. Broker now can have a latency metric in for each topic. This could be used for monitoring purpose as well as some application use cases.
Compatibility, Deprecation, and Migration Plan
...
For upgraded consumers, they can handle both V0 and V1.
Rejected Alternatives
Option 1 - Adding only LogAppendTime to the message
This proposal is pretty much the same as the selected proposal, except it does not include CreateTime in the message format.
Code Block | ||
---|---|---|
| ||
MessageAndOffset => MessageSize Offset Message
MessageSize => int32
Offset => int64
Message => Crc MagicByte Attributes Timestamp KeyLength Key ValueLength Value
Crc => int32
MagicByte => int8
Attributes => int8
LogAppendTime => int64 <---------------------- NEW
KeyLength => int32
Key => bytes
ValueLength => int32
Value => bytes
|
The downside of this proposal are:
- If the CreateTime is not in the message itself. Application needs to include the timestamp in payload. Instead of asking each application to do this, it is better to include the timestamp in the message.
- The broker is not able to report the latency metric. While we could let the application to get the End2End latency, we might lose the latency for each hop in the pipeline.
Option 2 - Adding only CreateTime to the message
While the time based log index has to be based on LogAppendTime, there is some concern about exposing the LogAppendTime (which is a broker internal concept) to user. So this approach include the following changes:
Wire protocol change
Change the MessageAndOffset format to:
Code Block | ||
---|---|---|
| ||
MessageAndOffset => MessageSize Offset Message
MessageSize => int32
Offset => int64
Message => Crc MagicByte Attributes Timestamp KeyLength Key ValueLength Value
Crc => int32
MagicByte => int8
Attributes => int8
CreateTime => int64 <---------------------- NEW
KeyLength => int32
Key => bytes
ValueLength => int32
Value => bytes
|
Build time based log index using LogAppendTime
The broker will still build time based index using LogAppendTime, LogAppendTime will be only to Kafka message.in the time index file, but not in message format. i.e. not exposed to user.
Change time based log retention and log rolling to use LogAppendTime in index file
The time based log retention and log rolling still needs to use LogAppendTime. Because the leader is source of truth for LogAppendTime, when followers fetch data from the leader, they have to replicate the time index file as well.
Therefore, this approach does not solve the time based log retention and log rolling issues which are the motivation of this KIP. We need to introduce separate wire protocol to propagate the log segment create time and last modified time among brokers. While it is doable, we feel the additional complication for replica fetching over weigh the concern of exposing the LogAppendTime to user, considering the LogAppendTime is still useful to clients in a few use cases.
Also, during a log recovery, the LogAppendTime in the time index will be almost the same. The LogAppendTime will be different from the actual time when the message arrives the brokers.