...

MessageAndOffset => MessageSize Offset Message
  MessageSize => int32
  Offset => int64
  
  Message => Crc MagicByte Attributes SendTime ReceiveTime KeyLength Key ValueLength Value
    Crc => int32
    MagicByte => int8
    Attributes => int8
    SendTime => int64    <---------------------- NEW
    ReceiveTime => int64    <---------------------- NEW
    KeyLength => int32
    Key => bytes
    ValueLength => int32
    Value => bytes
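The layout above can be made concrete with a minimal Java sketch that encodes one message in the proposed format and lets a broker overwrite ReceiveTime. Everything here is illustrative: TimestampedMessage and its methods are hypothetical names, the magic value 1 is an assumed placeholder, and the sketch assumes (as for the existing format) that the Crc covers every byte after the Crc field, so stamping ReceiveTime requires recomputing it.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

// Hypothetical encoder for the proposed layout:
// Crc MagicByte Attributes SendTime ReceiveTime KeyLength Key ValueLength Value
final class TimestampedMessage {
    static final int CRC_POS = 0;
    static final int SEND_TIME_POS = 6;     // after Crc (4) + MagicByte (1) + Attributes (1)
    static final int RECEIVE_TIME_POS = 14; // after SendTime (8)

    static ByteBuffer encode(byte magic, byte attributes, long sendTime, long receiveTime,
                             byte[] key, byte[] value) {
        int keyLen = key == null ? -1 : key.length;       // -1 encodes a null key
        int valueLen = value == null ? -1 : value.length; // -1 encodes a null value
        int size = 4 + 1 + 1 + 8 + 8 + 4 + Math.max(keyLen, 0) + 4 + Math.max(valueLen, 0);
        ByteBuffer buf = ByteBuffer.allocate(size);
        buf.position(4);                                  // reserve room for the Crc
        buf.put(magic).put(attributes);
        buf.putLong(sendTime).putLong(receiveTime);
        buf.putInt(keyLen);
        if (key != null) buf.put(key);
        buf.putInt(valueLen);
        if (value != null) buf.put(value);
        buf.flip();                                       // limit = size, position = 0
        writeCrc(buf);
        return buf;
    }

    // Assumption: the Crc covers every byte after the Crc field.
    private static int computeCrc(ByteBuffer buf) {
        CRC32 crc = new CRC32();
        crc.update(buf.array(), 4, buf.limit() - 4);
        return (int) crc.getValue();
    }

    private static void writeCrc(ByteBuffer buf) { buf.putInt(CRC_POS, computeCrc(buf)); }

    static boolean crcValid(ByteBuffer buf) { return buf.getInt(CRC_POS) == computeCrc(buf); }

    static long sendTime(ByteBuffer buf) { return buf.getLong(SEND_TIME_POS); }

    static long receiveTime(ByteBuffer buf) { return buf.getLong(RECEIVE_TIME_POS); }

    // Broker side: overwrite ReceiveTime in place and refresh the Crc; SendTime is untouched.
    static void stampReceiveTime(ByteBuffer buf, long receiveTime) {
        buf.putLong(RECEIVE_TIME_POS, receiveTime);
        writeCrc(buf);
    }

    public static void main(String[] args) {
        byte[] key = "k".getBytes(StandardCharsets.UTF_8);
        byte[] value = "v".getBytes(StandardCharsets.UTF_8);
        ByteBuffer msg = encode((byte) 1, (byte) 0, 1_000L, -1L, key, value); // producer: R unset
        stampReceiveTime(msg, 1_005L);                                        // broker assigns R
        System.out.println(msg.limit() + " bytes, S=" + sendTime(msg)
                + ", R=" + receiveTime(msg) + ", crcOk=" + crcValid(msg));
    }
}
```

With a 1-byte key and a 1-byte value the encoded message is 32 bytes: 4 (Crc) + 1 (MagicByte) + 1 (Attributes) + 8 (SendTime) + 8 (ReceiveTime) + 4 + 1 (key) + 4 + 1 (value).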

Proposed Changes

Wire protocol change

Add SendTime and ReceiveTime fields to the message

  • SendTime
    • SendTime will be set by the producer and will not be changed afterward.
    • SendTime accuracy is millisecond.
  • ReceiveTime
    • The ReceiveTime will be assigned by the broker upon receiving the message. If the message comes from mirror maker, the original SendTime will be maintained but the ReceiveTime will be changed by the target broker.
    • The ReceiveTime will be used to build the log index.
    • ReceiveTime accuracy is millisecond.
    • The ReceiveTime of the outer message of compressed messages will be the latest ReceiveTime of all its inner messages.
      • If the compressed message is not compacted, the relative offsets of inner messages will be contiguous and share the same ReceiveTime.
      • If the compressed message is compacted, the relative offsets of inner messages may not be contiguous. Its ReceiveTime will be the ReceiveTime of the last inner message.
    • The followers will not reassign ReceiveTime but simply update an in-memory lastAppendedReceiveTime and append the message to the log.
    • To handle leader migration where the new leader has a slower clock than the old leader, every leader should append max(lastAppendedReceiveTime, currentTimeMillis) as the ReceiveTime.
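The rules for compressed messages can be illustrated with a small Java model. CompressedWrapper, Inner, freshBatch, outerReceiveTime and contiguous are hypothetical names; the compacted example assumes the remaining inner messages may carry different ReceiveTimes, which is the case the rule about the last inner message covers.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of a compressed wrapper message and its inner messages.
final class CompressedWrapper {
    static final class Inner {
        final int relativeOffset;
        final long receiveTime;
        Inner(int relativeOffset, long receiveTime) {
            this.relativeOffset = relativeOffset;
            this.receiveTime = receiveTime;
        }
    }

    // A freshly appended batch: contiguous relative offsets, one shared ReceiveTime.
    static List<Inner> freshBatch(int count, long receiveTime) {
        List<Inner> inners = new ArrayList<>();
        for (int i = 0; i < count; i++) inners.add(new Inner(i, receiveTime));
        return inners;
    }

    // Outer ReceiveTime = latest inner ReceiveTime. Because ReceiveTime never decreases
    // within a log, this is also the ReceiveTime of the last inner message.
    static long outerReceiveTime(List<Inner> inners) {
        if (inners.isEmpty()) throw new IllegalArgumentException("wrapper has no inner messages");
        long latest = Long.MIN_VALUE;
        for (Inner m : inners) latest = Math.max(latest, m.receiveTime);
        return latest;
    }

    static boolean contiguous(List<Inner> inners) {
        for (int i = 0; i < inners.size(); i++) {
            if (inners.get(i).relativeOffset != i) return false;
        }
        return true;
    }

    public static void main(String[] args) {
        List<Inner> fresh = freshBatch(3, 500L);
        // Assumed compacted wrapper: gaps in relative offsets, differing ReceiveTimes.
        List<Inner> compacted = Arrays.asList(new Inner(0, 500L), new Inner(2, 500L), new Inner(5, 720L));
        System.out.println("fresh: R=" + outerReceiveTime(fresh) + ", contiguous=" + contiguous(fresh));
        System.out.println("compacted: R=" + outerReceiveTime(compacted) + ", contiguous=" + contiguous(compacted));
    }
}
```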

Change time based log rolling and retention to use timestamp

...

  • The time based log rolling will be based on the timestamp of the first message in the log segment file.
  • The time based log retention will be based on the timestamp of the last message in the log segment file.
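A minimal sketch of the two checks, assuming each segment tracks the ReceiveTime of its first and last message. SegmentTimePolicy, shouldRoll, shouldDelete, rollMs and retentionMs are hypothetical names standing in for the broker's actual roll and retention settings, and the strict comparison is an assumed choice.

```java
// Hypothetical time-based roll/retention checks driven by message timestamps
final class SegmentTimePolicy {
    static final class Segment {
        final long firstReceiveTime; // timestamp of the first message in the segment
        final long lastReceiveTime;  // timestamp of the last message in the segment
        Segment(long firstReceiveTime, long lastReceiveTime) {
            this.firstReceiveTime = firstReceiveTime;
            this.lastReceiveTime = lastReceiveTime;
        }
    }

    // Roll the active segment once its first message is older than rollMs.
    static boolean shouldRoll(Segment active, long nowMs, long rollMs) {
        return nowMs - active.firstReceiveTime > rollMs;
    }

    // Delete a segment only once its newest message is older than retentionMs.
    static boolean shouldDelete(Segment segment, long nowMs, long retentionMs) {
        return nowMs - segment.lastReceiveTime > retentionMs;
    }

    public static void main(String[] args) {
        Segment s = new Segment(1_000L, 5_000L);
        long now = 8_000L;
        System.out.println("roll=" + shouldRoll(s, now, 6_000L)
                + ", delete=" + shouldDelete(s, now, 3_000L));
    }
}
```

Basing retention on the last message means a segment is only deleted once every message in it has been retained for at least retentionMs.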

Expose the timestamp in ConsumerRecord to users

...

ConsumerRecord / ProducerRecord format change

  • Add a SendTime field to ProducerRecord. Applications can use this field to set the send time, and it also lets mirror maker preserve the send time easily.
  • Add both SendTime and ReceiveTime to ConsumerRecord.
    • The SendTime is useful in use cases such as stream processing.
    • The ReceiveTime is useful in use cases such as cross-colo failover.
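The following Java sketch shows the record shapes and how mirror maker could carry SendTime over. SketchProducerRecord, SketchConsumerRecord and MirrorMakerSketch are hypothetical stand-ins, not the real Kafka client classes, and the field layout is assumed.

```java
// Hypothetical record shapes; not the actual Kafka client classes.
final class SketchProducerRecord {
    final String topic;
    final byte[] key;
    final byte[] value;
    final long sendTime; // set by the application, or carried over by mirror maker
    SketchProducerRecord(String topic, byte[] key, byte[] value, long sendTime) {
        this.topic = topic;
        this.key = key;
        this.value = value;
        this.sendTime = sendTime;
    }
}

final class SketchConsumerRecord {
    final String topic;
    final int partition;
    final long offset;
    final byte[] key;
    final byte[] value;
    final long sendTime;    // set by the producer, never changed afterward
    final long receiveTime; // assigned by the broker this record was read from
    SketchConsumerRecord(String topic, int partition, long offset, byte[] key, byte[] value,
                         long sendTime, long receiveTime) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
        this.key = key;
        this.value = value;
        this.sendTime = sendTime;
        this.receiveTime = receiveTime;
    }
}

final class MirrorMakerSketch {
    // SendTime is carried over; ReceiveTime is dropped because the target broker assigns its own.
    static SketchProducerRecord toTarget(SketchConsumerRecord in, String targetTopic) {
        return new SketchProducerRecord(targetTopic, in.key, in.value, in.sendTime);
    }

    public static void main(String[] args) {
        SketchConsumerRecord consumed =
                new SketchConsumerRecord("events", 0, 42L, null, new byte[] {1}, 1_000L, 1_005L);
        SketchProducerRecord out = toTarget(consumed, "events");
        System.out.println("source R=" + consumed.receiveTime + ", forwarded S=" + out.sendTime);
    }
}
```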

New time-based log index

In order to enable timestamp-based search at finer granularity, we need to add the timestamp to the log indices as well. The broker will build the time index based on the ReceiveTime of messages.

Because all the index files are memory-mapped files, the main consideration here is to avoid significantly increasing memory consumption.

...

Users don't typically need to look up offsets with seconds granularity.
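The index format itself is elided above, so the following is only a sketch under stated assumptions: an in-memory sparse index of (ReceiveTime, offset) pairs that adds an entry only when ReceiveTime has advanced by at least a configurable granularity, which is one way to trade lookup precision for memory. TimeIndexSketch, granularityMs, maybeAppend and lookup are hypothetical names. Because ReceiveTime is monotonically increasing within a log, lookup can binary-search for the last entry strictly earlier than the target and let the caller scan forward from that offset.

```java
import java.util.Arrays;

// Hypothetical sparse time index over one log segment: (ReceiveTime, offset) pairs.
final class TimeIndexSketch {
    private final long granularityMs;
    private long[] timestamps = new long[8];
    private long[] offsets = new long[8];
    private int size = 0;
    private long lastSeen = Long.MIN_VALUE;

    TimeIndexSketch(long granularityMs) { this.granularityMs = granularityMs; }

    // Called for every appended message; adds an entry only when ReceiveTime has advanced
    // by at least granularityMs since the last entry, which keeps the index small.
    void maybeAppend(long receiveTime, long offset) {
        if (receiveTime < lastSeen) throw new IllegalArgumentException("ReceiveTime went backwards");
        lastSeen = receiveTime;
        if (size > 0 && receiveTime - timestamps[size - 1] < granularityMs) return;
        if (size == timestamps.length) {
            timestamps = Arrays.copyOf(timestamps, size * 2);
            offsets = Arrays.copyOf(offsets, size * 2);
        }
        timestamps[size] = receiveTime;
        offsets[size] = offset;
        size++;
    }

    // Binary search for the last entry with ReceiveTime < targetTime. Every message before that
    // entry is older than the target, so scanning forward from the returned offset finds the first
    // message with ReceiveTime >= targetTime. Returns -1 to mean "scan from the segment start".
    long lookup(long targetTime) {
        int lo = 0, hi = size - 1, found = -1;
        while (lo <= hi) {
            int mid = (lo + hi) >>> 1;
            if (timestamps[mid] < targetTime) {
                found = mid;
                lo = mid + 1;
            } else {
                hi = mid - 1;
            }
        }
        return found < 0 ? -1L : offsets[found];
    }

    int entries() { return size; }

    public static void main(String[] args) {
        TimeIndexSketch index = new TimeIndexSketch(1_000L);
        long[] receiveTimes = {0L, 500L, 1_000L, 1_500L, 2_600L, 2_700L};
        for (int offset = 0; offset < receiveTimes.length; offset++) {
            index.maybeAppend(receiveTimes[offset], offset);
        }
        System.out.println(index.entries() + " entries; lookup(1200) -> offset " + index.lookup(1_200L));
    }
}
```

For example, with a 1000 ms granularity, appending ReceiveTimes 0, 500, 1000, 1500, 2600 and 2700 at offsets 0 to 5 keeps only the entries for offsets 0, 2 and 4; a lookup for 1200 returns offset 2, and the caller scans forward from there.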

Discussion of some use cases

Mirror maker

The broker does not distinguish mirror maker from other producers. The following example shows what the timestamps look like when mirror maker is in the picture (S = SendTime, R = ReceiveTime).

  1. The application producer produces the message at T0. ( [S = T0, R = -1] )
  2. The broker in cluster1 receives the message at T1 = T0 + latency1 and appends it to the log. Latency1 includes linger.ms and other latencies. ( [S = T0, R = T1] )
  3. Mirror maker copies the message to the broker in cluster2. ( [S = T0, R = T1] )
  4. The broker in cluster2 receives the message at T2 = T1 + latency2 and appends it to the log. ( [S = T0, R = T2] )

The SendTime of a message in the source cluster and the target cluster will be the same. This ensures that the SendTime is preserved across clusters.

The ReceiveTime of a message in the source cluster and the target cluster will be different. This is because: 1) log retention and rolling need to be based on the server clock to provide a clear guarantee; 2) to support searching by timestamp, the ReceiveTime in a log file needs to be monotonically increasing, even when the messages in the target cluster came from different source clusters.
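The four mirror maker steps can be replayed as a small Java simulation with concrete numbers (T0 = 1000, latency1 = 5, latency2 = 20, all assumed). MirrorTimeline, Stamp, brokerAppend and run are hypothetical names, and the broker here simply uses its clock for R, leaving out the max() guard used for leader migration.

```java
// Hypothetical simulation of the mirror maker example: S is preserved, R is reassigned.
final class MirrorTimeline {
    static final class Stamp {
        final long s; // SendTime
        final long r; // ReceiveTime, -1 when not yet assigned
        Stamp(long s, long r) { this.s = s; this.r = r; }
        @Override public String toString() { return "[S = " + s + ", R = " + r + "]"; }
    }

    // A broker keeps S and sets R from its own clock (the max() guard is omitted for brevity).
    static Stamp brokerAppend(Stamp in, long brokerClockMs) {
        return new Stamp(in.s, brokerClockMs);
    }

    static Stamp[] run(long t0, long latency1, long latency2) {
        Stamp produced = new Stamp(t0, -1L);           // 1. producer sets S = T0
        long t1 = t0 + latency1;
        Stamp inCluster1 = brokerAppend(produced, t1); // 2. cluster1 broker sets R = T1
        Stamp mirrored = inCluster1;                   // 3. mirror maker reads S = T0, R = T1
        long t2 = t1 + latency2;
        Stamp inCluster2 = brokerAppend(mirrored, t2); // 4. cluster2 broker sets R = T2
        return new Stamp[] {produced, inCluster1, mirrored, inCluster2};
    }

    public static void main(String[] args) {
        Stamp[] steps = run(1_000L, 5L, 20L);
        for (int i = 0; i < steps.length; i++) System.out.println((i + 1) + ". " + steps[i]);
    }
}
```

The run keeps S = 1000 at every step while R goes from -1 to 1005 in cluster1 and to 1025 in cluster2.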

Leader migration

Suppose we have broker0 and broker1. Broker0 is the current leader of a partition and broker1 is a follower. Consider the following scenario:

  1. Message m0 is produced to broker0 at time T0 (R = T0, where T0 is the clock on broker0).
  2. Broker1, as a follower, replicates m0 and appends it to its own log without changing the ReceiveTime of m0.
  3. Broker0 goes down and broker1 becomes the new leader.
  4. Message m1 is produced to broker1 at time T1 (R = T1, where T1 is the clock on broker1).

...

More generally, when a message is appended, the broker (whether leader or follower) should remember the ReceiveTime of the last appended message. If a later message would get a ReceiveTime earlier than that of the last appended message, the ReceiveTime of the last appended message is used instead.
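The max rule can be sketched in Java by replaying the leader migration scenario with assumed clock readings: broker0 reads 1000 when m0 arrives, and broker1's clock lags, reading 990 when m1 arrives after the failover. ReceiveTimeTracker, appendAsLeader and appendAsFollower are hypothetical names.

```java
// Hypothetical per-partition tracker of the last appended ReceiveTime on one broker
final class ReceiveTimeTracker {
    private long lastAppendedReceiveTime = -1L; // -1: nothing appended yet

    // Leader: ReceiveTime = max(lastAppendedReceiveTime, currentTimeMillis)
    long appendAsLeader(long currentTimeMillis) {
        long receiveTime = Math.max(lastAppendedReceiveTime, currentTimeMillis);
        lastAppendedReceiveTime = receiveTime;
        return receiveTime;
    }

    // Follower: keep the leader's ReceiveTime and only remember it
    long appendAsFollower(long leaderReceiveTime) {
        lastAppendedReceiveTime = leaderReceiveTime;
        return leaderReceiveTime;
    }

    public static void main(String[] args) {
        ReceiveTimeTracker broker0 = new ReceiveTimeTracker();
        ReceiveTimeTracker broker1 = new ReceiveTimeTracker();
        long r0 = broker0.appendAsLeader(1_000L); // m0 on broker0, clock reads 1000
        broker1.appendAsFollower(r0);             // broker1 replicates m0 unchanged
        // broker0 fails; broker1 becomes leader, but its clock lags and reads 990
        long r1 = broker1.appendAsLeader(990L);
        System.out.println("m0 R=" + r0 + ", m1 R=" + r1);
    }
}
```

Without the max rule, m1 would get R = 990, earlier than m0's 1000, breaking the monotonically increasing ReceiveTime that time-based search relies on. With it, both messages get R = 1000, and later appends follow broker1's clock again once it passes 1000.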

 

Compatibility, Deprecation, and Migration Plan

...