Status

Current state: Under Discussion

...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

This KIP tries to address the following issues in Kafka.

...

This KIP should preferably be implemented together with KIP-31, if possible, to avoid changing the wire protocol twice.

Public Interfaces

There are a few options to achieve the goals. Each has its own pros and cons. Please see the details below.

Proposed Changes

Three options were proposed before this proposal (Option 4). The details of Option 1, Option 2 and Option 3 are in the Rejected Alternatives section.

...

Add a

...

Timestamp field to the message format with maximum allowed time difference configuration on broker.

After extended discussion of Option 1, Option 2 and Option 3, it turned out to be very difficult to meet all of the following requirements at the same time:

...

The following changes are needed to implement the above proposal.

Wire protocol change - add a Timestamp field to the message format

```
MessageAndOffset => MessageSize Offset Message
  MessageSize => int32
  Offset => int64

  Message => Crc MagicByte Attributes Timestamp KeyLength Key ValueLength Value
    Crc => int32
    MagicByte => int8
    Attributes => int8
    Timestamp => int64 <---------------------- NEW
    KeyLength => int32
    Key => bytes
    ValueLength => int32
    Value => bytes
```
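To make the new layout concrete, here is a minimal Java sketch that encodes one message in the format above with java.nio.ByteBuffer and reads the Timestamp back. It assumes, as in the existing message format, that the CRC is computed over every byte after the Crc field. The class and method names are hypothetical, and absent keys or values are not modeled: empty arrays stand in for them.

```java
import java.nio.ByteBuffer;
import java.util.zip.CRC32;

// Hypothetical helper illustrating the proposed message layout:
// Crc(int32) MagicByte(int8) Attributes(int8) Timestamp(int64)
// KeyLength(int32) Key ValueLength(int32) Value
final class MessageLayoutSketch {
    // Fixed-size fields: Crc + MagicByte + Attributes + Timestamp + KeyLength + ValueLength.
    static final int FIXED_SIZE = 4 + 1 + 1 + 8 + 4 + 4;
    // The Timestamp field follows Crc, MagicByte and Attributes.
    static final int TIMESTAMP_POSITION = 4 + 1 + 1;

    static ByteBuffer encode(byte magic, byte attributes, long timestamp, byte[] key, byte[] value) {
        int size = FIXED_SIZE + key.length + value.length;
        ByteBuffer buf = ByteBuffer.allocate(size);
        buf.position(4);            // skip the Crc slot; it is filled in last
        buf.put(magic);
        buf.put(attributes);
        buf.putLong(timestamp);     // the new Timestamp field
        buf.putInt(key.length);
        buf.put(key);
        buf.putInt(value.length);
        buf.put(value);
        // Assumption: the CRC covers every byte after the Crc field itself.
        buf.putInt(0, (int) crcOfBody(buf.array(), size));
        buf.flip();                 // position 0, limit = size, ready for reading
        return buf;
    }

    static long timestamp(ByteBuffer message) {
        return message.getLong(message.position() + TIMESTAMP_POSITION);
    }

    static boolean crcMatches(ByteBuffer message) {
        byte[] bytes = new byte[message.remaining()];
        message.duplicate().get(bytes);   // copy without moving the caller's position
        return (int) crcOfBody(bytes, bytes.length) == message.getInt(message.position());
    }

    private static long crcOfBody(byte[] bytes, int length) {
        CRC32 crc = new CRC32();
        crc.update(bytes, 4, length - 4);
        return crc.getValue();
    }
}
```

In this layout the fixed part of a message is 22 bytes; the Timestamp accounts for 8 of them.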

Add a timestamp field to both ProducerRecord and ConsumerRecord

  • If the user specifies a timestamp for a ProducerRecord, the ProducerRecord will be sent with this timestamp.
  • If the user does not specify a timestamp for a ProducerRecord, the producer stamps the ProducerRecord with the current time.
  • ConsumerRecord will have the timestamp of the message as stored on the broker.
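The producer-side rule above amounts to "use the user's timestamp if there is one, otherwise the current time". A minimal sketch, assuming a hypothetical record type whose timestamp is a nullable Long (null meaning "not specified"); the clock is injected so the rule can be tested, and in a real producer it would simply be System::currentTimeMillis.

```java
import java.util.function.LongSupplier;

// Hypothetical stand-in for a ProducerRecord carrying the new timestamp field.
final class TimestampedRecord {
    final String topic;
    final Long timestamp;   // null = the user did not specify a timestamp
    final byte[] value;

    TimestampedRecord(String topic, Long timestamp, byte[] value) {
        this.topic = topic;
        this.timestamp = timestamp;
        this.value = value;
    }
}

final class ProducerTimestamps {
    // Returns the timestamp the producer would send for this record.
    static long timestampToSend(TimestampedRecord record, LongSupplier clock) {
        return record.timestamp != null ? record.timestamp : clock.getAsLong();
    }
}
```

For example, ProducerTimestamps.timestampToSend(record, System::currentTimeMillis) keeps a user-supplied timestamp and otherwise stamps the record at send time.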

Broker configuration change

Add the following two configurations to the broker:

...

That is, by default the broker will not override any user-supplied timestamp.
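The two broker configurations are elided above, so the following is only a hedged sketch of the Option 4 idea: the broker keeps the message timestamp unless it differs from the broker's clock by more than a configured threshold, in which case the broker's own time is stored. The threshold parameter is modeled on max.message.time.difference.ms, which is named in the options comparison later on this page; treating Long.MAX_VALUE as the default is an assumption chosen to match "by default the broker will not override any user-supplied timestamp". The class name is hypothetical.

```java
// Hypothetical broker-side check for Option 4.
final class BrokerTimestampPolicy {
    // Returns the timestamp the broker would store for an incoming message.
    static long timestampToStore(long messageTimestamp, long brokerNowMs, long maxTimeDifferenceMs) {
        long difference;
        try {
            difference = Math.abs(Math.subtractExact(brokerNowMs, messageTimestamp));
        } catch (ArithmeticException overflow) {
            difference = Long.MAX_VALUE; // absurd timestamps count as "too far off"
        }
        if (difference < 0) {
            difference = Long.MAX_VALUE; // Math.abs(Long.MIN_VALUE) stays negative
        }
        // Within the threshold: keep the producer's timestamp (CreateTime-like).
        // Beyond it: override with the broker's clock (LogAppendTime-like).
        return difference > maxTimeDifferenceMs ? brokerNowMs : messageTimestamp;
    }
}
```

With a threshold of 0 every stored timestamp is effectively the broker's time, while Long.MAX_VALUE never overrides; this is how one setting can make Option 4 behave like either LogAppendTime only or CreateTime only.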

Build Time Index for messages based on message timestamp (will be implemented in KIP-33)

This section will be implemented in KIP-33. It is listed here because it is closely related to the design of this KIP. Please see the details in KIP-33.

Change time based log retention and log rolling to be based on the time index (will be implemented in KIP-33)

This section will be implemented in KIP-33. It is listed here because it is closely related to the design of this KIP. Please see the details in KIP-33.

Compatibility, Deprecation, and Migration Plan

NOTE: This part is drafted based on the assumption that KIP-31 and KIP-32 will be implemented in one patch.

...

In step 2, it is recommended to place only a small number of leaders on the broker, because at that point the broker needs to down-convert messages for all fetch requests.

Rejected Alternatives

Option 1 - Add both CreateTime and LogAppendTime to message format

Wire protocol change

```
MessageAndOffset => MessageSize Offset Message
  MessageSize => int32
  Offset => int64

  Message => Crc MagicByte Attributes CreateTime LogAppendTime KeyLength Key ValueLength Value
    Crc => int32
    MagicByte => int8
    Attributes => int8
    CreateTime => int64    <---------------------- NEW
    LogAppendTime => int64 <---------------------- NEW
    KeyLength => int32
    Key => bytes
    ValueLength => int32
    Value => bytes
```

Add CreateTime and LogAppendTime fields to the message

  • CreateTime

    • CreateTime will be set by the producer and will not be changed afterward.
    • CreateTime accuracy is millisecond.
    • For a compressed message, the CreateTime of the wrapper message will be the CreateTime of the first compressed message.
  • LogAppendTime
    • The LogAppendTime will be assigned by the broker upon receiving the message. If the message comes from mirror maker, the original CreateTime will be maintained but the LogAppendTime will be changed by the target broker.
    • The LogAppendTime will be used to build the log index.
    • The LogAppendTime accuracy is millisecond.
    • The LogAppendTime of the outer message of compressed messages will be the latest LogAppendTime of all its inner messages.
      • If the compressed message is not compacted, the relative offsets of inner messages will be contiguous and share the same LogAppendTime.
      • If the compressed message is compacted, the relative offsets of inner messages may not be contiguous. Its LogAppendTime will be the LogAppendTime of the last inner message.
    • The followers will not reassign LogAppendTime but simply update an in-memory lastAppendedLogAppendTime and append the message to the log.
    • To handle leader migration where the new leader has a slower clock than the old leader, every leader should use max(lastAppendedTimestamp, currentTimeMillis) as the timestamp.
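Option 1 uses different rules for the two timestamps of a compressed wrapper message: its CreateTime comes from the first inner message, while its LogAppendTime is the latest of the inner LogAppendTimes. A small Java sketch of just those two rules, assuming the inner timestamps are available as arrays in offset order; the class name is hypothetical.

```java
import java.util.Arrays;

// Hypothetical helper for Option 1's wrapper-message timestamps.
final class WrapperTimestampsSketch {
    // CreateTime of the wrapper = CreateTime of the first inner message.
    static long wrapperCreateTime(long[] innerCreateTimes) {
        requireNonEmpty(innerCreateTimes);
        return innerCreateTimes[0];
    }

    // LogAppendTime of the wrapper = latest LogAppendTime of its inner messages.
    static long wrapperLogAppendTime(long[] innerLogAppendTimes) {
        requireNonEmpty(innerLogAppendTimes);
        return Arrays.stream(innerLogAppendTimes).max().getAsLong();
    }

    private static void requireNonEmpty(long[] times) {
        if (times.length == 0) {
            throw new IllegalArgumentException("a wrapper message must contain at least one inner message");
        }
    }
}
```

Because the max(lastAppendedTimestamp, currentTimeMillis) rule above keeps appended LogAppendTimes from decreasing, the latest inner LogAppendTime is also that of the last inner message, which is consistent with the compacted case.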

...

More generally, when a message is appended, the broker (whether leader or follower) should remember the timestamp of the last appended message. If a later appended message has a timestamp earlier than that of the last appended message, the timestamp of the last appended message will be used instead.
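The rule above can be sketched as a small piece of per-partition state. This is a hedged illustration rather than broker code: the class name is hypothetical, and the same method serves a leader (passing its current clock reading) and a follower (passing the timestamp replicated from the leader).

```java
// Hypothetical per-partition state implementing
// storedTimestamp = max(lastAppendedTimestamp, candidateTimestamp).
final class AppendTimestampTracker {
    private long lastAppendedTimestamp = Long.MIN_VALUE; // nothing appended yet

    // A leader passes System.currentTimeMillis(); a follower passes the
    // timestamp it received from the leader. Returns the timestamp to store.
    long append(long candidateTimestamp) {
        lastAppendedTimestamp = Math.max(lastAppendedTimestamp, candidateTimestamp);
        return lastAppendedTimestamp;
    }

    long lastAppendedTimestamp() {
        return lastAppendedTimestamp;
    }
}
```

If leadership moves to a broker whose clock is behind, its first appends reuse the previous maximum instead of going back in time.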

Change time based log rolling and retention to use LogAppendTime

Time based log rolling and retention currently use the file creation time and last modified time. This does not work for newly created replicas, because the time attributes of the files differ from the true message append times. The following changes will address this issue.

  • The time based log rolling will be based on the timestamp of the first message in the log segment file.
  • The time based log retention will be based on the timestamp of the last message in the log segment file.
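Expressed as code, the two rules compare the current time against different ends of a segment: rolling looks at the first message, retention at the last. A minimal sketch, assuming a hypothetical SegmentTimestamps holder and rollMs/retentionMs parameters; they play the role of Kafka's log.roll.ms and log.retention.ms settings, but the names here are only illustrative.

```java
// Hypothetical summary of a log segment: only the timestamps the rules need.
final class SegmentTimestamps {
    final long firstMessageTimestamp;
    final long lastMessageTimestamp;

    SegmentTimestamps(long firstMessageTimestamp, long lastMessageTimestamp) {
        this.firstMessageTimestamp = firstMessageTimestamp;
        this.lastMessageTimestamp = lastMessageTimestamp;
    }
}

final class TimeBasedLogPolicy {
    // Roll the active segment once its first message is older than rollMs.
    static boolean shouldRoll(SegmentTimestamps active, long nowMs, long rollMs) {
        return nowMs - active.firstMessageTimestamp > rollMs;
    }

    // Delete a segment only when even its last (newest) message is older than retentionMs.
    static boolean shouldDelete(SegmentTimestamps segment, long nowMs, long retentionMs) {
        return nowMs - segment.lastMessageTimestamp > retentionMs;
    }
}
```

Using the last message for retention means a segment is never deleted while it still holds a message younger than the retention period.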

ConsumerRecord / ProducerRecord format change

  • Add a CreateTime field to ProducerRecord. This field can be used by the application to set the send time. It will also make it easy for mirror maker to preserve the send time.
  • Add both CreateTime and LogAppendTime to ConsumerRecord.
    • The CreateTime is useful in use cases such as stream processing
    • The LogAppendTime is useful in use cases such as log rolling and log retention.

...

  1. It introduces 16 bytes overhead to the message.
  2. It exposes LogAppendTime to users.

Option 2 - Adding only LogAppendTime to the message

This proposal is essentially the same as Option 1, except that it does not include CreateTime in the message format.

...

  1. If the CreateTime is not in the message itself, applications need to include the timestamp in the payload. Instead of asking each application to do this, it is better to include the timestamp in the message.
  2. The broker is not able to report the latency metric. While we could let applications get the End2End latency, we would lose the latency of each hop in the pipeline.

Option 3 - Add CreateTime to the message and use LogAppendTime on brokers

Add a CreateTime field to the message

  • CreateTime

    • CreateTime will be set by the producer and will not be changed afterward.
    • CreateTime accuracy is millisecond.
    • For a compressed message, the CreateTime of the wrapper message will be the CreateTime of the last compressed message (this is to be consistent with the base offset in KIP-31).

...

So this approach includes only CreateTime in the message format.

Wire protocol change

Change the MessageAndOffset format to:

```
MessageAndOffset => MessageSize Offset Message
  MessageSize => int32
  Offset => int64

  Message => Crc MagicByte Attributes CreateTime KeyLength Key ValueLength Value
    Crc => int32
    MagicByte => int8
    Attributes => int8
    CreateTime => int64 <---------------------- NEW
    KeyLength => int32
    Key => bytes
    ValueLength => int32
    Value => bytes
```

Build time based log index using LogAppendTime

The broker will still build the time based index using LogAppendTime. LogAppendTime will only be in the time index file, not in the message format, i.e. it is not exposed to users.

...

  1. The message is the first message of a log segment.
  2. The message is the last message of a log segment.
  3. The message is the first message received in the minute.
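The three conditions above can be sketched as a small index builder. This is an illustration under stated assumptions, not the proposed implementation: "minute" is taken to mean a wall-clock minute of the LogAppendTime (LogAppendTime / 60000), offsets are stored as int relative offsets to match the Offset => int32 field of TimeIndexEntry in the FetchResponse format below, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical builder for Option 3's time index of one log segment.
final class TimeIndexSketch {
    static final long MINUTE_MS = 60_000L;

    static final class Entry {
        final long logAppendTime;
        final int relativeOffset;

        Entry(long logAppendTime, int relativeOffset) {
            this.logAppendTime = logAppendTime;
            this.relativeOffset = relativeOffset;
        }
    }

    private final List<Entry> entries = new ArrayList<>();
    private long lastIndexedMinute;
    private long lastLogAppendTime;
    private int lastRelativeOffset = -1; // -1: no message appended yet

    // Called for every message appended to the segment.
    void onAppend(long logAppendTime, int relativeOffset) {
        long minute = Math.floorDiv(logAppendTime, MINUTE_MS);
        // Condition 1: first message of the segment.
        // Condition 3: first message received in a new minute.
        if (entries.isEmpty() || minute != lastIndexedMinute) {
            entries.add(new Entry(logAppendTime, relativeOffset));
            lastIndexedMinute = minute;
        }
        lastLogAppendTime = logAppendTime;
        lastRelativeOffset = relativeOffset;
    }

    // Called when the segment is rolled. Condition 2: index the last message too.
    List<Entry> close() {
        if (lastRelativeOffset >= 0
                && entries.get(entries.size() - 1).relativeOffset != lastRelativeOffset) {
            entries.add(new Entry(lastLogAppendTime, lastRelativeOffset));
        }
        return new ArrayList<>(entries);
    }
}
```

The index therefore grows by at most about one entry per minute of traffic, plus the segment's two boundary messages.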

Let replicas also fetch the log index file

Because LogAppendTime is not included in the message format, followers will not be able to get the LogAppendTime from the leader with the current replication design. In order to make the log retention and log rolling policies work, the LogAppendTime needs to be propagated from the leader to the followers.

...

FetchResponse format for replication:

```
FetchResponse => [TopicName [Partition ErrorCode HighwaterMarkOffset MessageSetSize MessageSet [TimeIndexEntry]]]
  TopicName => string
  Partition => int32
  ErrorCode => int16
  HighwaterMarkOffset => int64
  MessageSetSize => int32
  TimeIndexEntry => LogAppendTime Offset <------------- NEW: the time index entries for the message set; one partition may contain multiple entries. The array is always empty if the fetch request is not from a follower.
    LogAppendTime => int64
    Offset => int32
```

ConsumerRecord / ProducerRecord format change

  • Add a CreateTime field to ProducerRecord. This field can be used by the application to set the send time. It will also make it easy for mirror maker to preserve the send time.
  • Add CreateTime to ConsumerRecord.
    • The CreateTime is useful in use cases such as stream processing

 

Option discussions with use cases

For documentation purposes, here are some discussions we had on this KIP.

This section discusses how the four options work with a few use cases. Option 1, Option 2 and Option 3 are in the Rejected Alternatives section.

Options comparison

The four options are compared below for each use case. CT stands for CreateTime and LAT for LogAppendTime.

  • Option 1: the message contains CreateTime + LogAppendTime.
  • Option 2: the message contains LogAppendTime only.
  • Option 3: the message contains CreateTime only; brokers keep LogAppendTime in the log index.
  • Option 4: the message contains a timestamp that could be overridden by the broker, depending on the configured time difference threshold.

Use case: Mirror Maker

  • Option 1: The broker overrides the LAT and keeps the CT as is.
  • Option 2: The broker overrides the LAT.
  • Option 3: The broker keeps the CT as is and adds an index entry with the LAT to the log index file.
  • Option 4: Mirror maker will keep the consumed messages' timestamps. Those timestamps may or may not be overwritten by the target cluster, depending on the configuration.
  • Comparison: Option 1 provides the most information to users; the only concern is whether we should expose the LAT to users. Option 2 loses the CreateTime information. Option 3 has the same amount of information as Option 1 from the broker's point of view, but from the user's point of view it does not expose the LAT. Option 4 could be equivalent to either having CreateTime only or having LogAppendTime only, depending on configuration.

Use case: Log retention

  • Option 1: The broker will use the LAT of the last message in a segment to enforce the policy.
  • Option 2: Same as Option 1.
  • Option 3: The broker will use the LAT of the last entry in the log index file to enforce the retention policy. Because the leader is the source of truth for the LAT, followers need to get the LAT from the leader when they replicate the messages. That means we need to introduce a new wire protocol to fetch the time based log index file as well. When log recovery happens, the rebuilt time index would have LATs different from the actual arrival times of the messages in the log, and the LATs in the index file will be very close to each other, or even the same.
  • Option 4: The broker will look at the last time index entry of the segment to decide whether to delete the log segment.
  • Comparison: Option 1 and Option 2 can work with the existing replication design and solve the log retention issue we have now. Option 3 solves the issue but is more involved because it needs to replicate the time index entries as well. Option 4 could be equivalent to either having CreateTime only or having LogAppendTime only, depending on configuration.

Use case: Log rolling

  • Option 1: The broker will use the LAT of the first message in a log segment to enforce the policy.
  • Option 2: Same as Option 1.
  • Option 3: The broker will use the LAT of the first entry in the log index file to enforce the rolling policy. Similar to the log retention case, the followers need to replicate the time index as well. When log recovery happens, the log rolling policy might not be honored either.
  • Option 4: The broker will keep an in-memory earliest message timestamp for the active log segment. On broker startup, the broker will need to scan the messages in the active segment to find the earliest timestamp.
  • Comparison: Option 1 and Option 2 solve the log rolling issue. Option 3 solves the issue but is more involved because it needs to replicate the time index entries as well. On broker startup, Option 4 needs to scan the active log segment to find its earliest timestamp.

Use case: Stream processing

  • Option 1: Applications don't need to include the CreateTime in the payload but can simply use the CreateTime field.
  • Option 2: Applications have to put the CreateTime into the payload.
  • Option 3: Same as Option 1.
  • Option 4: Could be equivalent to either having CreateTime only or having LogAppendTime only, depending on configuration.
  • Comparison: The benefit of having a CreateTime with each message, rather than putting it in the payload, is that the application protocol can be simplified. It is convenient for the infrastructure to provide the timestamp, so there is no need for each application to worry about it.

Use case: Latency measurement

  • Option 1: Users can get the End2End latency and the lag in time.
  • Option 2: Users can get the lag in time.
  • Option 3: Users can get the End2End latency.
  • Option 4: Depending on the max.message.time.difference.ms configuration, users may or may not be able to find out the latency.
  • Comparison: Option 1 provides the most information to users.

Use case: Search message by timestamp

  • All options: see the detailed discussion in KIP-33.
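For Option 4's log rolling, the broker tracks the earliest timestamp in the active segment rather than the first one, because with user-supplied timestamps the first message is not necessarily the earliest. A hedged sketch of that bookkeeping, including the rebuild by scanning the active segment on startup; the class and method names are hypothetical, and the scan is modeled as a plain array of the segment's message timestamps.

```java
// Hypothetical tracker for the earliest message timestamp in the active segment.
final class ActiveSegmentEarliestTimestamp {
    private static final long EMPTY = Long.MAX_VALUE; // no message in the segment yet
    private long earliest = EMPTY;

    // Called for every message appended to the active segment.
    void onAppend(long messageTimestamp) {
        earliest = Math.min(earliest, messageTimestamp);
    }

    // On startup, rebuild the in-memory value by scanning the active segment.
    static ActiveSegmentEarliestTimestamp recover(long[] timestampsInActiveSegment) {
        ActiveSegmentEarliestTimestamp tracker = new ActiveSegmentEarliestTimestamp();
        for (long timestamp : timestampsInActiveSegment) {
            tracker.onAppend(timestamp);
        }
        return tracker;
    }

    // Roll once the earliest message in the active segment is older than rollMs.
    boolean shouldRoll(long nowMs, long rollMs) {
        return earliest != EMPTY && nowMs - earliest > rollMs;
    }

    // A freshly rolled (empty) segment starts over.
    void onRoll() {
        earliest = EMPTY;
    }

    long earliest() {
        return earliest;
    }
}
```

The startup scan is the cost noted in the comparison: it is proportional to the size of the active segment.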

Mirror maker case in detail

The broker behaves the same way for all the options: it always overrides the LogAppendTime (if present) when a message arrives at the broker and keeps the CreateTime (if present) untouched.

...

The LogAppendTime of a message in source cluster and target cluster will be different.

Discussion: should we use CreateTime OR LogAppendTime for log retention and time based log rolling?

To discuss the usage of CreateTime and LogAppendTime, it is useful to summarize the latency pattern of the messages flowing through the pipeline. The latency can be summarized as the following patterns:

...

It would also be useful to think about the impact of a completely wrong timestamp set by a client, i.e. the robustness of the system.

Log Retention

There are pros and cons to basing log retention on either CreateTime or LogAppendTime.

...

In practice, we usually don't see every stage of the pipeline with the same large latency, so LogAppendTime looks preferable to CreateTime for log retention.

Time based log rolling

The main purpose of time based log rolling is to avoid the situation where a low volume topic always has only one segment, which is also the active segment. By its nature, server side log rolling makes more sense.

...