...

  • Add a CreateTime field to ProducerRecord. Applications can use this field to set the send time. It also allows mirror maker to preserve the original send time easily.
  • Add both CreateTime and LogAppendTime to ConsumerRecord.
    • The CreateTime is useful in use cases such as stream processing.
    • The LogAppendTime is useful in use cases such as log rolling and log retention.
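
To make the two fields concrete, here is a minimal sketch of how they might travel from producer to consumer. The classes `SketchProducerRecord`, `SketchConsumerRecord` and `TimestampFieldsSketch` are hypothetical stand-ins invented for illustration, not the actual Kafka classes or their final API.

```java
// Hypothetical stand-ins for ProducerRecord / ConsumerRecord (assumption: names and
// constructors are illustrative only and do not match the real Kafka client API).
final class SketchProducerRecord {
    final String topic;
    final byte[] value;
    final long createTime; // set by the application, or by mirror maker to keep the original send time

    SketchProducerRecord(String topic, byte[] value, long createTime) {
        this.topic = topic;
        this.value = value;
        this.createTime = createTime;
    }
}

final class SketchConsumerRecord {
    final String topic;
    final long offset;
    final byte[] value;
    final long createTime;    // carried over unchanged from the producer
    final long logAppendTime; // assigned by the broker when the message is appended

    SketchConsumerRecord(String topic, long offset, byte[] value, long createTime, long logAppendTime) {
        this.topic = topic;
        this.offset = offset;
        this.value = value;
        this.createTime = createTime;
        this.logAppendTime = logAppendTime;
    }
}

final class TimestampFieldsSketch {
    // Simulates a broker append: CreateTime is preserved, LogAppendTime comes from the broker clock.
    static SketchConsumerRecord append(SketchProducerRecord record, long offset, long brokerClockMs) {
        return new SketchConsumerRecord(record.topic, offset, record.value, record.createTime, brokerClockMs);
    }

    public static void main(String[] args) {
        SketchProducerRecord sent = new SketchProducerRecord("events", new byte[] {42}, System.currentTimeMillis());
        SketchConsumerRecord seen = append(sent, 0L, System.currentTimeMillis());
        System.out.println("CreateTime=" + seen.createTime + " LogAppendTime=" + seen.logAppendTime);
    }
}
```

A stream processing application would read `createTime`, while log rolling and retention logic would look at `logAppendTime`.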

Use case discussion

Option discussions with use cases

This section

Mirror maker

The broker does not distinguish mirror maker from other producers. The following example explains what the timestamps will look like when mirror maker is in the picture. (CT - CreateTime, LAT - LogAppendTime)

...

To discuss the usage of CreateTime and LogAppendTime, it is useful to summarize the latency patterns of the messages flowing through the pipeline. The latency falls into one of the following patterns:

  1. The messages flow through the pipeline with small latency.
  2. The messages flow through the pipeline with similarly large latency.
  3. The messages flow through the pipeline with widely varying latency.

It is also useful to think about the impact of a completely wrong timestamp set by a client, i.e. the robustness of the system.

...

The latency measurement needs both CreateTime and LogAppendTime. The LogAppendTime does not need to be in the message. The broker can now maintain a latency metric for each topic. This could be used for monitoring purposes as well as in some application use cases.
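
As a rough illustration of such a per-topic latency metric, the sketch below computes latency as LogAppendTime minus CreateTime at append time, so the LogAppendTime only has to exist on the broker. The class `TopicLatencySketch` and its methods are hypothetical, and tracking only an average and a maximum is an assumption made for brevity.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical per-topic latency tracker (assumption: not an existing Kafka metric class).
final class TopicLatencySketch {
    // topic -> {message count, total latency in ms, max latency in ms}
    private final Map<String, long[]> stats = new HashMap<>();

    // Called when the broker appends a message; logAppendTimeMs comes from the broker clock.
    void record(String topic, long createTimeMs, long logAppendTimeMs) {
        // The result can be negative if the client clock is ahead of the broker clock.
        long latencyMs = logAppendTimeMs - createTimeMs;
        long[] s = stats.computeIfAbsent(topic, t -> new long[] {0L, 0L, Long.MIN_VALUE});
        s[0] += 1;
        s[1] += latencyMs;
        s[2] = Math.max(s[2], latencyMs);
    }

    // Average latency for the topic, or NaN if nothing has been recorded.
    double averageMs(String topic) {
        long[] s = stats.get(topic);
        return s == null ? Double.NaN : (double) s[1] / s[0];
    }

    // Maximum latency for the topic, or Long.MIN_VALUE if nothing has been recorded.
    long maxMs(String topic) {
        long[] s = stats.get(topic);
        return s == null ? Long.MIN_VALUE : s[2];
    }
}
```

Because the metric depends on the client-supplied CreateTime, a completely wrong client timestamp distorts it directly, which is one reason to consider robustness separately.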
 

Compatibility, Deprecation, and Migration Plan

...

Upgraded consumers can handle both V0 and V1.

Rejected Alternatives

Option 1 - Adding only LogAppendTime to the message

This proposal is pretty much the same as the selected proposal, except it does not include CreateTime in the message format.

MessageAndOffset => MessageSize Offset Message
  MessageSize => int32
  Offset => int64
  
  Message => Crc MagicByte Attributes LogAppendTime KeyLength Key ValueLength Value
    Crc => int32
    MagicByte => int8
    Attributes => int8
    LogAppendTime => int64 <---------------------- NEW
    KeyLength => int32
    Key => bytes
    ValueLength => int32
    Value => bytes
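
For illustration, the Message layout above could be serialized roughly as follows. This is a sketch only: the class `Option1MessageSketch` is hypothetical, and two details are assumptions rather than something this section states, namely that a null key or value is written with length -1 and that the Crc covers every byte after the Crc field.

```java
import java.nio.ByteBuffer;
import java.util.zip.CRC32;

// Hypothetical encoder for the Option 1 layout:
// Crc(int32) MagicByte(int8) Attributes(int8) LogAppendTime(int64) KeyLength Key ValueLength Value
final class Option1MessageSketch {
    static ByteBuffer encode(byte magic, byte attributes, long logAppendTime, byte[] key, byte[] value) {
        int keyLength = key == null ? -1 : key.length;       // assumption: -1 marks a null key
        int valueLength = value == null ? -1 : value.length; // assumption: -1 marks a null value
        int size = 4 + 1 + 1 + 8 + 4 + Math.max(keyLength, 0) + 4 + Math.max(valueLength, 0);

        ByteBuffer buf = ByteBuffer.allocate(size);
        buf.position(4); // leave room for the Crc, which is filled in last
        buf.put(magic).put(attributes).putLong(logAppendTime);
        buf.putInt(keyLength);
        if (key != null) buf.put(key);
        buf.putInt(valueLength);
        if (value != null) buf.put(value);

        // Assumption: the checksum covers everything after the Crc field itself.
        CRC32 crc = new CRC32();
        crc.update(buf.array(), 4, size - 4);
        buf.putInt(0, (int) crc.getValue());

        buf.flip(); // make the buffer ready for reading from the start
        return buf;
    }

    // Reads the new field without consuming the buffer: it sits after Crc(4) + MagicByte(1) + Attributes(1).
    static long readLogAppendTime(ByteBuffer message) {
        return message.getLong(message.position() + 6);
    }
}
```

The new field adds 8 bytes per message compared with a layout that has no timestamp.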

The downsides of this proposal are:

  1. If the CreateTime is not in the message itself, applications need to include the timestamp in the payload. Instead of asking each application to do this, it is better to include the timestamp in the message.
  2. The broker is not able to report the latency metric. While we could let the application measure the end-to-end latency, we might lose the latency for each hop in the pipeline.

Option 2 - Adding only CreateTime to the message

While the time-based log index has to be based on LogAppendTime, there is some concern about exposing the LogAppendTime (which is a broker-internal concept) to users. So this approach includes the following changes:

Wire protocol change

Change the MessageAndOffset format to:

MessageAndOffset => MessageSize Offset Message
  MessageSize => int32
  Offset => int64
  
  Message => Crc MagicByte Attributes CreateTime KeyLength Key ValueLength Value
    Crc => int32
    MagicByte => int8
    Attributes => int8
    CreateTime => int64 <---------------------- NEW
    KeyLength => int32
    Key => bytes
    ValueLength => int32
    Value => bytes

Build time based log index using LogAppendTime

The broker will still build the time-based index using LogAppendTime. The LogAppendTime will be stored only in the time index file, not in the message format, i.e. it is not exposed to users.
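
A minimal in-memory sketch of this idea is shown below: the broker stamps each append with its own clock and keeps a timestamp-to-offset mapping outside the message. The class `TimeIndexSketch` is hypothetical and a `TreeMap` merely stands in for the time index file, whose on-disk format is not described in this section.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory stand-in for a time index keyed by LogAppendTime.
final class TimeIndexSketch {
    // LogAppendTime (ms) -> earliest offset appended at that time
    private final TreeMap<Long, Long> index = new TreeMap<>();

    // Called on append; the timestamp comes from the broker clock, not from the message.
    void onAppend(long offset, long brokerClockMs) {
        index.putIfAbsent(brokerClockMs, offset); // keep only the first offset per timestamp
    }

    // Returns the first indexed offset whose LogAppendTime is >= targetMs, or -1 if there is none.
    long offsetForTime(long targetMs) {
        Map.Entry<Long, Long> entry = index.ceilingEntry(targetMs);
        return entry == null ? -1L : entry.getValue();
    }
}
```

Since the timestamps live only in this structure, followers cannot reconstruct them from fetched messages, which is why the index itself would have to be replicated.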

Change time based log retention and log rolling to use LogAppendTime in index file

The time-based log retention and log rolling still need to use LogAppendTime. Because the leader is the source of truth for LogAppendTime, when followers fetch data from the leader, they have to replicate the time index file as well.

Therefore, this approach does not solve the time-based log retention and log rolling issues that are the motivation of this KIP. We would need to introduce a separate wire protocol to propagate the log segment create time and last modified time among brokers. While this is doable, we feel the additional complication for replica fetching outweighs the concern about exposing the LogAppendTime to users, considering that the LogAppendTime is still useful to clients in a few use cases.

Also, during a log recovery, the LogAppendTime values in the time index will be almost identical to one another, so they will differ from the actual time at which the messages arrived at the broker.