Status

Current state: Under Discussion

...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

This KIP tries to address the following issues in Kafka.

...

Preferably, this KIP should be implemented together with KIP-31 to avoid changing the wire protocol twice.

Public Interfaces

We propose the following change to the message format:

MessageAndOffset => MessageSize Offset Message
  MessageSize => int32
  Offset => int64
  
  Message => Crc MagicByte Attributes SendTime ReceiveTime KeyLength Key ValueLength Value
    Crc => int32
    MagicByte => int8
    Attributes => int8
    SendTime => int64    <---------------------- NEW
    ReceiveTime => int64 <---------------------- NEW
    KeyLength => int32
    Key => bytes
    ValueLength => int32
    Value => bytes
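
To make the field order concrete, here is a minimal sketch of an encoder for the proposed layout. It is not Kafka's actual message class: the class name, the offset constants, the use of -1 as the length of a null key or value, and the choice to compute the CRC over all bytes after the Crc field are assumptions made for illustration.

```java
import java.nio.ByteBuffer;
import java.util.zip.CRC32;

// Illustrative encoder for the proposed layout (hypothetical class, not Kafka code).
final class TimestampedMessageSketch {
    // Crc(4) + MagicByte(1) + Attributes(1) + SendTime(8) + ReceiveTime(8)
    // + KeyLength(4) + ValueLength(4)
    static final int FIXED_OVERHEAD = 4 + 1 + 1 + 8 + 8 + 4 + 4;
    static final int SEND_TIME_OFFSET = 6;     // right after Crc, MagicByte, Attributes
    static final int RECEIVE_TIME_OFFSET = 14; // right after SendTime

    static ByteBuffer encode(byte magic, byte attributes, long sendTime, long receiveTime,
                             byte[] key, byte[] value) {
        int keyLen = key == null ? 0 : key.length;
        int valueLen = value == null ? 0 : value.length;
        ByteBuffer buf = ByteBuffer.allocate(FIXED_OVERHEAD + keyLen + valueLen);
        buf.putInt(0);                          // placeholder for Crc, filled in below
        buf.put(magic);
        buf.put(attributes);
        buf.putLong(sendTime);                  // NEW field
        buf.putLong(receiveTime);               // NEW field
        buf.putInt(key == null ? -1 : keyLen);  // assumption: -1 marks a null key
        if (key != null) buf.put(key);
        buf.putInt(value == null ? -1 : valueLen); // assumption: -1 marks a null value
        if (value != null) buf.put(value);
        // Assumption: the CRC covers every byte after the Crc field itself.
        CRC32 crc = new CRC32();
        crc.update(buf.array(), 4, buf.capacity() - 4);
        buf.putInt(0, (int) crc.getValue());
        buf.flip();
        return buf;
    }
}
```

Under the CRC assumption above, any component that rewrites ReceiveTime after the producer computed the checksum would also have to recompute Crc.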

Proposed Changes

Wire protocol change

Add SendTime and ReceiveTime fields to the message

  • SendTime
    • SendTime will be set by the producer and will not be changed afterward.
    • SendTime has millisecond precision.
  • ReceiveTime
    • ReceiveTime will be assigned by the broker upon receiving the message. If the message comes from mirror maker, the original SendTime is preserved, but the target broker assigns a new ReceiveTime.
    • ReceiveTime will be used to build the log index.
    • ReceiveTime has millisecond precision.
    • For compressed messages, the ReceiveTime of the outer message will be the latest ReceiveTime of all its inner messages.
      • If the compressed message is not compacted, the relative offsets of inner messages will be contiguous and share the same ReceiveTime.
      • If the compressed message is compacted, the relative offsets of inner messages may not be contiguous. Its ReceiveTime will be the ReceiveTime of the last inner message.
    • Followers will not reassign ReceiveTime; they simply update an in-memory lastAppendedReceiveTime and append the message to the log.
    • To handle leader migration where the new leader has a slower clock than the old leader, every leader should append max(lastAppendedTimestamp, currentTimeMillis) as the timestamp.
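
The assignment rules above can be sketched as follows. The class and method names are hypothetical, and the clock is injected only so the behaviour is easy to exercise. On the follower path the sketch uses max as a defensive choice; with a leader that follows the rule above, this equals the incoming value.

```java
import java.util.function.LongSupplier;

// Hypothetical helper illustrating the ReceiveTime rules; not part of Kafka.
final class ReceiveTimeTracker {
    private final LongSupplier clock;           // e.g. System::currentTimeMillis
    private long lastAppendedReceiveTime = -1L;

    ReceiveTimeTracker(LongSupplier clock) {
        this.clock = clock;
    }

    // Leader path: never go backwards, even if this broker's clock is behind
    // the clock of the previous leader.
    synchronized long assignOnLeader() {
        lastAppendedReceiveTime = Math.max(lastAppendedReceiveTime, clock.getAsLong());
        return lastAppendedReceiveTime;
    }

    // Follower path: keep the ReceiveTime chosen by the leader and only remember it.
    synchronized long recordOnFollower(long receiveTimeFromLeader) {
        lastAppendedReceiveTime = Math.max(lastAppendedReceiveTime, receiveTimeFromLeader);
        return receiveTimeFromLeader;
    }

    // Outer compressed message: the latest ReceiveTime among its inner messages.
    static long outerReceiveTime(long[] innerReceiveTimes) {
        long latest = Long.MIN_VALUE;
        for (long t : innerReceiveTimes) {
            latest = Math.max(latest, t);
        }
        return latest;
    }
}
```

For example, a follower that last recorded 500 and then becomes leader while its own clock reads 100 would assign 500 to the next message rather than 100.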

Change time based log rolling and retention to use timestamp

Time based log rolling and retention currently use the file creation time and last modified time. This does not work for newly created replicas because the time attributes of the files are different from the true message append time. The following changes will address this issue.

  • The time based log rolling will be based on the timestamp of the first message in the log segment file.
  • The time based log retention will be based on the timestamp of the last message in the log segment file.
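
As a rough illustration of the two rules, the checks could look like the sketch below. The class name and the rollMs and retentionMs parameters are hypothetical, and the exact boundary comparisons (>= versus >) are assumptions rather than something this proposal specifies.

```java
// Hypothetical policy checks driven by message timestamps instead of file times.
final class TimeBasedSegmentPolicy {
    // Roll when the first message in the active segment is at least rollMs old.
    static boolean shouldRoll(long firstMessageTimestampMs, long nowMs, long rollMs) {
        return nowMs - firstMessageTimestampMs >= rollMs;
    }

    // Delete when even the last message in the segment is older than retentionMs.
    static boolean shouldDelete(long lastMessageTimestampMs, long nowMs, long retentionMs) {
        return nowMs - lastMessageTimestampMs > retentionMs;
    }
}
```

Because both checks read timestamps stored in the messages, a newly created replica that copies an old segment would reach the same decision as the original replica, regardless of when the file was created locally.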

ConsumerRecord / ProducerRecord format change

  • Add a SendTime field to ProducerRecord. Applications can use this field to set the send time. It also allows mirror maker to preserve the send time easily.
  • Add both SendTime and ReceiveTime to ConsumerRecord.
    • The SendTime is useful in use cases such as stream processing.
    • The ReceiveTime is useful in use cases such as cross-colo failover.

New time-based log index

To enable timestamp based search at finer granularity, we need to add the timestamp to the log indices as well. The broker will build the time index based on the ReceiveTime of messages.

...

Like the log index file, the time index file needs to be built from each log segment file.

Use a time index for each log segment to save the timestamp -> log offset at minute granularity

Create another index file for each log segment, named SegmentBaseOffset.time.index, that indexes the segment at minute granularity. The time index entry format is:

...

Users don't typically need to look up offsets at second granularity.
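
To illustrate how a minute-granularity lookup might behave, here is a minimal in-memory model. It does not reproduce the on-disk entry format; the class, its methods, and the choice to keep the first offset seen in each minute are assumptions for illustration only.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory model of a per-segment time index; not the file format.
final class MinuteTimeIndexSketch {
    private static final long MINUTE_MS = 60_000L;
    // Minute bucket (ReceiveTime / 60000) -> first offset appended in that minute.
    private final TreeMap<Long, Long> minuteToOffset = new TreeMap<>();

    // Called on every append; only the first message of each minute adds an entry.
    void maybeAppend(long receiveTimeMs, long offset) {
        minuteToOffset.putIfAbsent(Math.floorDiv(receiveTimeMs, MINUTE_MS), offset);
    }

    // Returns an offset from which a forward scan will find the first message whose
    // ReceiveTime is at or after targetTimeMs, or -1 if the target precedes the index.
    long lookup(long targetTimeMs) {
        Map.Entry<Long, Long> entry =
                minuteToOffset.floorEntry(Math.floorDiv(targetTimeMs, MINUTE_MS));
        return entry == null ? -1L : entry.getValue();
    }
}
```

The lookup depends on ReceiveTime being non-decreasing within a segment: the returned offset is a conservative starting point, and the caller still scans forward through at most about a minute of messages to reach the exact position.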

Discussion of some use cases

Mirror maker

The broker does not distinguish mirror maker from other producers. The following example shows what the timestamps look like when mirror maker is in the picture (S - SendTime, R - ReceiveTime).

...

The ReceiveTime of a message in the source cluster and in the target cluster will be different. This is because: 1) log retention/rolling needs to be based on the server clock to provide a clear guarantee; 2) to support searching by timestamp, the ReceiveTime in a log file needs to be monotonically increasing, even when the messages in the target cluster come from different source clusters.

Leader migration

Suppose we have broker0 and broker1. Broker0 is the current leader of a partition and broker1 is a follower. Consider the following scenario:

...

More generally, when a message is appended, the broker (whether leader or follower) should remember the timestamp of the last appended message. If a later message has a timestamp earlier than that of the last appended message, the timestamp of the last appended message will be used instead.

Compatibility, Deprecation, and Migration Plan

NOTE: This part is drafted based on the assumption that KIP-31 and KIP-32 will be implemented in one patch.

...

Upgraded consumers can handle both V0 and V1.

Rejected Alternatives

Add a timestamp field to log index entry

The most straightforward way to build a time index is to have the log index files store a timestamp with each entry.

...