Status

Current state: Under Discussion

Discussion thread: here

JIRA: KAFKA-2511

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

This KIP tries to address the following issues in Kafka:

  1. Log retention might not be honored: Log retention is currently at the log segment level, and is driven off the last modification time of a log segment. This approach does not quite work when a replica reassignment happens because the newly created log segment will effectively have its modification time reset to now.
  2. Log rolling might break for a newly created replica as well, for the same reason as (1).

Besides that, the KIP will also facilitate use cases such as stream processing, where a per-message timestamp is needed.

This KIP is a distilled/improved version of an earlier discussion that we started.

This KIP should preferably be implemented together with KIP-31, if possible, to avoid changing the wire protocol twice.

Public Interfaces

We propose the following change to the message format:

MessageAndOffset => MessageSize Offset Message
  MessageSize => int32
  Offset => int64
  
  Message => Crc MagicByte Attributes CreateTime LogAppendTime KeyLength Key ValueLength Value
    Crc => int32
    MagicByte => int8
    Attributes => int8
    CreateTime => int64    <---------------------- NEW
    LogAppendTime => int64 <---------------------- NEW
    KeyLength => int32
    Key => bytes
    ValueLength => int32
    Value => bytes
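
To make the new layout concrete, the following is a minimal sketch of how the Message portion (excluding Offset and MessageSize) could be serialized under the proposed V1 format. The class name is hypothetical, the key and value are assumed non-null, and compression is ignored; this illustrates the field layout above, not the actual Kafka implementation.

import java.nio.ByteBuffer;
import java.util.zip.CRC32;

class MessageV1Writer {

    static ByteBuffer write(long createTime, long logAppendTime, byte[] key, byte[] value) {
        int size = 4 + 1 + 1 + 8 + 8          // Crc, MagicByte, Attributes, CreateTime, LogAppendTime
                 + 4 + key.length             // KeyLength, Key
                 + 4 + value.length;          // ValueLength, Value
        ByteBuffer buffer = ByteBuffer.allocate(size);

        buffer.position(4);                   // leave room for the Crc, filled in last
        buffer.put((byte) 1);                 // MagicByte = 1 for the new format
        buffer.put((byte) 0);                 // Attributes (no compression in this sketch)
        buffer.putLong(createTime);           // CreateTime, set by the producer
        buffer.putLong(logAppendTime);        // LogAppendTime, set by the broker
        buffer.putInt(key.length);
        buffer.put(key);
        buffer.putInt(value.length);
        buffer.put(value);

        CRC32 crc = new CRC32();              // Crc covers everything after the Crc field
        crc.update(buffer.array(), 4, size - 4);
        buffer.putInt(0, (int) crc.getValue());
        buffer.rewind();
        return buffer;
    }
}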

Proposed Changes

Wire protocol change

Add CreateTime and LogAppendTime fields to the message

A corner case for LogAppendTime (LAT) - leader migration:

Suppose we have broker0 and broker1. Broker0 is the current leader of a partition and broker1 is a follower. Consider the following scenario:

  1. message m0 is produced to broker0 at time T0 (LAT = T0, where T0 is the clock on broker0).
  2. broker1, as a follower, replicated m0 and appended it to its own log without changing the LogAppendTime of m0.
  3. broker0 went down and broker1 became the new leader.
  4. message m1 is produced to broker1 at time T1 (LAT = T1, where T1 is the clock on broker1).

In step 4, it is possible that T1 < T0 because of clock differences between the two brokers. If we naively take T1 as the timestamp of m1, the timestamps will be out of order, i.e. a message with an earlier timestamp will appear later in the log. To avoid this problem, at step 4 broker1 can take max(T1, T0) as the timestamp of m1, so the timestamps in the log never go backward. Broker1 will keep using T0 as the timestamp until its own clock advances past T0.

To be more general, when a message is appended, the broker (whether leader or follower) should remember the timestamp of the last appended message. If a later appended message has a timestamp earlier than that of the last appended message, the timestamp of the last appended message is used instead.
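
A minimal sketch of this rule, with hypothetical class and method names:

class LogAppendTimeAssigner {

    // Timestamp of the most recently appended message; -1 if nothing has been appended yet.
    private long lastAppendedTimestamp = -1L;

    // Returns the LogAppendTime to use for the next message: the broker clock,
    // unless the previously appended message carried a later timestamp.
    synchronized long nextLogAppendTime(long brokerClockMs) {
        lastAppendedTimestamp = Math.max(brokerClockMs, lastAppendedTimestamp);
        return lastAppendedTimestamp;
    }
}

In the leader-migration example above, broker1 would call this with T1 after having appended m0 (with timestamp T0) and get back max(T1, T0) = T0.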

Change time based log rolling and retention to use LogAppendTime

Time based log rolling and retention currently use the file creation time and last modified time. This does not work for newly created replicas, because the time attributes of the files differ from the true message append time. The following changes address this issue.
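
As an illustration of the intended behavior, the sketch below bases both decisions on the message timestamps recorded in a segment rather than on file attributes. The class, method names, and parameters are assumptions, not the actual broker code.

class TimestampBasedSegmentPolicy {

    private final long retentionMs;   // time-based retention window, e.g. log.retention.ms
    private final long rollMs;        // time-based roll interval, e.g. log.roll.ms

    TimestampBasedSegmentPolicy(long retentionMs, long rollMs) {
        this.retentionMs = retentionMs;
        this.rollMs = rollMs;
    }

    // A segment is deletable once its largest LogAppendTime falls outside the retention window.
    boolean shouldDelete(long largestLogAppendTimeInSegment, long nowMs) {
        return nowMs - largestLogAppendTimeInSegment > retentionMs;
    }

    // The active segment is rolled once its first message was appended more than rollMs ago.
    boolean shouldRoll(long firstLogAppendTimeInSegment, long nowMs) {
        return nowMs - firstLogAppendTimeInSegment > rollMs;
    }
}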

ConsumerRecord / ProducerRecord format change
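
This KIP also changes the client-facing record classes to carry the new timestamps. The shapes below are a rough, hypothetical illustration only; the field names and constructors are assumptions, not the proposed API.

class ProducerRecordSketch<K, V> {
    final String topic;
    final K key;
    final V value;
    final Long createTime;   // optionally set by the application; may default to the current time

    ProducerRecordSketch(String topic, K key, V value, Long createTime) {
        this.topic = topic;
        this.key = key;
        this.value = value;
        this.createTime = createTime;
    }
}

class ConsumerRecordSketch<K, V> {
    final String topic;
    final int partition;
    final long offset;
    final K key;
    final V value;
    final long createTime;       // carried in the message from the producer
    final long logAppendTime;    // assigned by the broker on append

    ConsumerRecordSketch(String topic, int partition, long offset, K key, V value,
                         long createTime, long logAppendTime) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
        this.key = key;
        this.value = value;
        this.createTime = createTime;
        this.logAppendTime = logAppendTime;
    }
}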

Use case discussion

Mirror maker

The broker does not distinguish a mirror maker from other producers. The following example shows what the timestamps look like when a mirror maker is in the picture. (CT - CreateTime, LAT - LogAppendTime)

  1. An application producer produces a message at T0. ( [CT = T0, LAT = -1] )
  2. The broker in cluster1 receives the message at T1 = T0 + latency1 and appends it to the log. Latency1 includes linger.ms and some other latency. ( [CT = T0, LAT = T1] )
  3. The mirror maker copies the message to a broker in cluster2. ( [CT = T0, LAT = T1] )
  4. The broker in cluster2 receives the message at T2 = T1 + latency2 and appends it to the log. ( [CT = T0, LAT = T2] )

The CreateTime of a message will be the same in the source cluster and the target cluster, i.e. the timestamp is preserved across clusters.

The LogAppendTime of a message will differ between the source cluster and the target cluster.

Broker side use cases

To discuss the usage of CreateTime and LogAppendTime, it is useful to summarize the latency patterns of messages flowing through the pipeline. The latency can be summarized into the following patterns:

  1. the messages flow through the pipeline with small latency.
  2. the messages flow through the pipeline with a similar, large latency.
  3. the messages flow through the pipeline with widely varying latency.

It is also useful to think about the impact of a completely wrong timestamp set by a client, i.e. the robustness of the system.

Log Retention

There are pros and cons to basing log retention on either CreateTime or LogAppendTime.

Comparison:

              pattern 1    pattern 2    pattern 3    Robustness
Preference    CT or LAT    CT           LAT          LAT

In reality, we usually do not see pipelines where all messages have the same large latency, so LogAppendTime looks preferable to CreateTime for log retention.

Time based log rolling

The main purpose of time based log rolling is to avoid the situation where a low-volume topic always has only one segment, which is also the active segment. By its nature, server-side log rolling makes more sense.


              pattern 1    pattern 2    pattern 3    Robustness
Preference    CT or LAT    LAT          LAT          LAT

Application use cases

Stream Processing

When it comes to the application domain, having a CreateTime with a message is useful.

The benefit of having a CreateTime with each message, rather than putting it in the payload, is that the application protocol can be simplified. It is convenient for the infrastructure to provide the timestamp, so each application does not need to handle timestamps itself.

Depending on the use case, a user may or may not need to decompress the message to get the CreateTime of each message. Also, with LogAppendTime, a user can easily get the latency of each message.
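
For example, a stream-processing job could bucket records into time windows using the CreateTime carried with each message, without parsing the payload. A minimal sketch (the window size and names are assumptions):

class CreateTimeWindowing {

    private static final long WINDOW_SIZE_MS = 60_000L;   // one-minute tumbling windows

    // Returns the start of the window that a message's CreateTime falls into.
    static long windowStart(long createTimeMs) {
        return createTimeMs - (createTimeMs % WINDOW_SIZE_MS);
    }
}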

Latency Measurement

Latency measurement needs both CreateTime and LogAppendTime. The LogAppendTime does not need to be in the message; the broker can now expose a latency metric for each topic. This can be used for monitoring as well as for some application use cases.
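
A minimal sketch of such a per-topic metric, assuming the broker computes LogAppendTime - CreateTime when it appends a message (class and method names are hypothetical):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class TopicLatencyMetrics {

    // Maximum observed produce latency (LogAppendTime - CreateTime) per topic.
    private final Map<String, Long> maxLatencyMsByTopic = new ConcurrentHashMap<>();

    // Called by the broker when a message is appended to the log.
    void record(String topic, long createTime, long logAppendTime) {
        long latencyMs = logAppendTime - createTime;
        maxLatencyMsByTopic.merge(topic, latencyMs, Long::max);
    }

    long maxLatencyMs(String topic) {
        return maxLatencyMsByTopic.getOrDefault(topic, 0L);
    }
}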


Compatibility, Deprecation, and Migration Plan

NOTE: This part is drafted based on the assumption that KIP-31 and KIP-32 will be implemented in one patch.

The proposed protocol is not backward compatible. The migration plan is as follows:

  1. Increment the magic byte in MessageAndOffset from 0 to 1.
  2. Upgrade brokers to support both V0 and V1 of MessageAndOffset (a sketch of the version check follows this list).
    1. When the broker sees a producer request using V0, it will still decompress the messages, assign offsets, and re-compress.
    2. When the broker sees a producer request using V1, it will decompress the messages for verification, assign the offset to the outer message, and NOT re-compress.
  3. Upgrade consumers to support both V0 and V1.
  4. Upgrade producers to send MessageAndOffset V1.
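
A hypothetical sketch of the magic-byte check, with offsets following the field layout in "Public Interfaces" (this is not the actual broker or consumer code):

import java.nio.ByteBuffer;

class MessageVersionReader {

    private static final byte MAGIC_V0 = 0;
    private static final byte MAGIC_V1 = 1;

    // Reads the CreateTime from a V1 message, or returns a default for V0, which carries no timestamps.
    static long createTimeOrDefault(ByteBuffer message, long defaultTimestamp) {
        byte magic = message.get(4);              // byte 4: MagicByte (after the 4-byte Crc)
        if (magic == MAGIC_V0) {
            return defaultTimestamp;
        } else if (magic == MAGIC_V1) {
            return message.getLong(6);            // bytes 6-13: CreateTime (after Crc, MagicByte, Attributes)
        }
        throw new IllegalArgumentException("Unsupported magic byte: " + magic);
    }
}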

For producers, there will be no impact.

For old consumers that cannot recognize V1, an exception will be thrown for the unsupported version when they see the new protocol. (The current code in ZookeeperConsumerConnector does not validate the magic byte; this is a bug and we will fix it in a separate ticket.)

Upgraded consumers can handle both V0 and V1.

Rejected Alternatives

Adding only a LogAppendTime to the Kafka message.