
Status

Current state: Under Discussion

Discussion thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

This KIP tries to address the following issues in Kafka:

  1. Log retention might not be honored: Log retention is currently at the log segment level, and is driven off the last modification time of a log segment. This approach does not quite work when a replica reassignment happens because the newly created log segment will effectively have its modification time reset to now.
  2. Log rolling might break for a newly created replica as well, for the same reason as (1).

This KIP is a distilled/improved version of an earlier discussion that we started.

This KIP should preferably be implemented together with KIP-31, if possible, to avoid changing the wire protocol twice.

Public Interfaces

We propose the following change to the message format:

MessageAndOffset => MessageSize Offset Message
  MessageSize => int32
  Offset => int64
  
  Message => Crc MagicByte Attributes CreateTime LogAppendTime KeyLength Key ValueLength Value
    Crc => int32
    MagicByte => int8
    Attributes => int8
    CreateTime => int64    <---------------------- NEW
    LogAppendTime => int64 <---------------------- NEW
    KeyLength => int32
    Key => bytes
    ValueLength => int32
    Value => bytes
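The field layout above can be sketched as a byte encoder, which makes the 16 bytes the two timestamps add to every message concrete. This is a minimal illustration, not Kafka's serializer. Three details are assumptions carried over from the existing V0 format and not stated in this proposal: the CRC covers every byte after the Crc field, a null key or value is encoded with length -1, and MessageSize counts only the Message bytes. The function names are hypothetical.

```python
import struct
import zlib
from typing import Optional

MAGIC_V1 = 1  # magic byte for the proposed format (see the migration plan)


def encode_bytes(field: Optional[bytes]) -> bytes:
    """KeyLength/Key or ValueLength/Value; length -1 encodes null (assumption)."""
    if field is None:
        return struct.pack(">i", -1)
    return struct.pack(">i", len(field)) + field


def encode_message(create_time: int, log_append_time: int,
                   key: Optional[bytes], value: Optional[bytes],
                   attributes: int = 0) -> bytes:
    """Message => Crc MagicByte Attributes CreateTime LogAppendTime KeyLength Key ValueLength Value"""
    body = struct.pack(">bbqq", MAGIC_V1, attributes, create_time, log_append_time)
    body += encode_bytes(key) + encode_bytes(value)
    crc = zlib.crc32(body) & 0xFFFFFFFF  # assumption: CRC covers everything after Crc
    return struct.pack(">I", crc) + body


def encode_message_and_offset(offset: int, message: bytes) -> bytes:
    """MessageAndOffset => MessageSize Offset Message, in the order listed above."""
    return struct.pack(">iq", len(message), offset) + message
```

With a null key and value, the fixed part of a message grows from 14 bytes in V0 to 30 bytes here.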

Proposed Changes

Wire protocol change

Add CreateTime and LogAppendTime fields to the message

  • CreateTime
    • CreateTime will be set by the producer and will not be changed afterward.
    • CreateTime accuracy is millisecond.
    • For a compressed message, the CreateTime of the wrapper message will be the CreateTime of the first inner message.
  • LogAppendTime
    • The LogAppendTime will be assigned by the broker upon receiving the message. If the message comes from mirror maker, the original CreateTime will be preserved but the LogAppendTime will be reassigned by the target broker.
    • The LogAppendTime will be used to build the log index.
    • The LogAppendTime accuracy is millisecond.
    • The LogAppendTime of the outer message of a compressed message set will be the latest LogAppendTime of all its inner messages.
      • If the compressed message is not compacted, the inner messages will have contiguous relative offsets and share the same LogAppendTime.
      • If the compressed message is compacted, the relative offsets of the inner messages may not be contiguous. The wrapper's LogAppendTime will be the LogAppendTime of the last inner message.
    • Followers will not reassign LogAppendTime. They simply update an in-memory lastAppendedLogAppendTime and append the message to the log.
    • To handle a leader migration where the new leader has a slower clock than the old leader, every leader should use max(lastAppendedLogAppendTime, currentTimeMillis) as the LogAppendTime.
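The wrapper rules for compressed messages can be written as a small function. This sketch assumes the inner messages are already in log order. `InnerMessage` and `wrapper_timestamps` are hypothetical names used only for illustration.

```python
from dataclasses import dataclass
from typing import List, Tuple


@dataclass
class InnerMessage:
    relative_offset: int
    create_time: int
    log_append_time: int


def wrapper_timestamps(inner: List[InnerMessage]) -> Tuple[int, int]:
    """Return (CreateTime, LogAppendTime) for the wrapper of a compressed message set."""
    if not inner:
        raise ValueError("a compressed wrapper must contain at least one inner message")
    # Wrapper CreateTime: CreateTime of the first inner message.
    create_time = inner[0].create_time
    # Wrapper LogAppendTime: the latest inner LogAppendTime. Because LogAppendTime
    # never goes backwards, this equals the last inner message's value, which
    # still holds after compaction leaves gaps in the relative offsets.
    log_append_time = max(m.log_append_time for m in inner)
    return create_time, log_append_time
```

Using `max` rather than `inner[-1]` gives the same answer under the monotonic rule, and it stays correct even if that rule were violated.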

Change time based log rolling and retention to use LogAppendTime

Time-based log rolling and retention currently use the file creation time and last modified time. This does not work for newly created replicas because the time attributes of the files differ from the true message append times. The following changes will address this issue.

  • The time based log rolling will be based on the timestamp of the first message in the log segment file.
  • The time based log retention will be based on the timestamp of the last message in the log segment file.
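The two rules above reduce to simple comparisons once each segment tracks the LogAppendTime of its first and last messages. This is a sketch under assumptions. `roll_ms` and `retention_ms` stand in for the broker's roll and retention settings. The `>=` versus `>` boundaries and the "never roll an empty segment" guard are illustrative choices, not part of the proposal.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class SegmentTimes:
    # LogAppendTime of the first and last message; None while the segment is empty.
    first_log_append_time: Optional[int] = None
    last_log_append_time: Optional[int] = None


def should_roll(active: SegmentTimes, now_ms: int, roll_ms: int) -> bool:
    """Roll once the first message in the active segment is at least roll_ms old."""
    if active.first_log_append_time is None:
        return False  # never roll an empty segment
    return now_ms - active.first_log_append_time >= roll_ms


def is_expired(segment: SegmentTimes, now_ms: int, retention_ms: int) -> bool:
    """Delete once the newest message in the segment is older than retention_ms."""
    if segment.last_log_append_time is None:
        return False
    return now_ms - segment.last_log_append_time > retention_ms
```

Because these timestamps come from the messages themselves, a replica that copies the segment later reaches the same decisions as the original broker.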

ConsumerRecord / ProducerRecord format change

  • Add a CreateTime field to ProducerRecord. The application can use this field to set the send time. It also allows mirror maker to preserve the original send time easily.
  • Add both CreateTime and LogAppendTime to ConsumerRecord.
    • The CreateTime is useful in use cases such as stream processing.
    • The LogAppendTime is useful in use cases such as cross-colo failover.
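The record changes can be pictured with plain data classes. These are Python stand-ins, not the Java client classes. Two points are assumptions for illustration only: the field order, and the rule that an unset CreateTime falls back to the producer's send time (`stamp_for_send` is a hypothetical helper).

```python
import time
from dataclasses import dataclass
from typing import Optional


@dataclass
class ProducerRecord:
    topic: str
    key: Optional[bytes]
    value: bytes
    # Optional: set by the application (or mirror maker); otherwise the send time is used.
    create_time: Optional[int] = None


@dataclass
class ConsumerRecord:
    topic: str
    offset: int
    key: Optional[bytes]
    value: bytes
    create_time: int      # set once by the original producer
    log_append_time: int  # assigned by the broker this record was read from


def stamp_for_send(record: ProducerRecord) -> int:
    """CreateTime the producer would put on the wire for this record."""
    if record.create_time is not None:
        return record.create_time
    return int(time.time() * 1000)
```

Mirror maker would build its ProducerRecord with `create_time` copied from the ConsumerRecord, so the original send time survives the copy.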

Use case discussion

 Mirror maker

The broker does not distinguish mirror maker from other producers. The following example shows what the timestamps look like when mirror maker is in the picture (CT = CreateTime, LAT = LogAppendTime):

  1. An application producer produces a message at T0. ([CT = T0, LAT = -1])
  2. The broker in cluster1 receives the message at T1 = T0 + latency1 and appends it to the log. latency1 includes linger.ms and other delays. ([CT = T0, LAT = T1])
  3. Mirror maker copies the message to a broker in cluster2. ([CT = T0, LAT = T1])
  4. The broker in cluster2 receives the message at T2 = T1 + latency2 and appends it to the log. ([CT = T0, LAT = T2])

The CreateTime of a message will be the same in the source and target clusters, i.e. the timestamp is passed across clusters.

The LogAppendTime of a message will differ between the source and target clusters.
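The four steps can be replayed with a toy broker that stamps LogAppendTime from its own clock. The latency values are made up, and `Msg` and `Broker` are hypothetical names. The point is only that CT passes through unchanged while LAT is overwritten at each cluster.

```python
from dataclasses import dataclass, replace
from typing import List


@dataclass(frozen=True)
class Msg:
    create_time: int
    log_append_time: int = -1  # -1 until a broker appends the message


class Broker:
    """Toy broker that stamps LogAppendTime from its own clock on every append."""

    def __init__(self, clock_ms: int) -> None:
        self.clock_ms = clock_ms
        self.log: List[Msg] = []

    def append(self, msg: Msg) -> Msg:
        # Mirror maker is treated like any producer: CreateTime is kept as sent,
        # LogAppendTime is overwritten with this broker's append time.
        stored = replace(msg, log_append_time=self.clock_ms)
        self.log.append(stored)
        return stored


T0, LATENCY1, LATENCY2 = 1_000, 50, 200
produced = Msg(create_time=T0)                                   # step 1
in_cluster1 = Broker(clock_ms=T0 + LATENCY1).append(produced)    # step 2
# step 3: mirror maker forwards the message as consumed ([CT = T0, LAT = T1])
in_cluster2 = Broker(clock_ms=T0 + LATENCY1 + LATENCY2).append(in_cluster1)  # step 4
```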

Broker side use cases

To discuss the usage of CreateTime and LogAppendTime, it is useful to summarize the latency patterns of messages flowing through the pipeline. The latency falls into one of the following patterns:

  1. The messages flow through the pipeline with small latency.
  2. The messages flow through the pipeline with similar large latency.
  3. The messages flow through the pipeline with large differences in latency.

It is also useful to consider the impact of a completely wrong timestamp set by a client, i.e. the robustness of the system.

Log Retention

Basing log retention on CreateTime or on LogAppendTime (receive time) each has pros and cons.

  • Pattern 1:
    Because the latency is small, CreateTime and LogAppendTime will be close and there won't be much difference.
  • Pattern 2:
    If log retention is based on the message creation time, it will not be affected by the latency in the data pipeline because the send time will not change.
    If log retention is based on the receive time, it will be affected by the latency in the pipeline. Because of the latency difference, some data can be deleted on one cluster but not on another cluster in the pipeline.
  • Pattern 3: 
    When messages with significantly different timestamps go into a cluster at around the same time, the retention policy is hard to follow if we use CreateTime. For example, imagine two mirror makers copying data from two source clusters to the same target cluster. If MirrorMaker1 is copying messages with CreateTime around 1:00 PM today and MirrorMaker2 is copying messages with CreateTime around 1:00 PM yesterday, those messages can go to the same log segment in the target cluster. It will be difficult for the broker to apply a retention policy to that log segment. The broker would need to track the latest CreateTime of all messages in a log segment and persist that information somewhere.
  • Robustness: 
    If a message has a CreateTime set in the future, the log might be kept for a very long time. The broker would need to sanity-check the timestamp when it receives the message, and it could be tricky to determine which timestamps are invalid.
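The pattern 3 and robustness points can be made concrete by computing when a segment becomes deletable under each choice. This sketch assumes a segment is deletable only once its newest timestamp has expired. The timestamps are arbitrary illustrative values, and the function names are hypothetical.

```python
from typing import Iterable

HOUR_MS = 60 * 60 * 1000
DAY_MS = 24 * HOUR_MS


def delete_after_by_create_time(create_times: Iterable[int], retention_ms: int) -> int:
    # With CreateTime, the newest timestamp can be anywhere in the segment, so the
    # broker must track (and persist) max(CreateTime) for every segment.
    return max(create_times) + retention_ms


def delete_after_by_log_append_time(last_log_append_time: int, retention_ms: int) -> int:
    # LogAppendTime never decreases within a log, so the last message holds the maximum.
    return last_log_append_time + retention_ms
```

One future-dated CreateTime in the test below pins the whole segment for over a year, which is the robustness concern described above.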

Comparison (CT > LAT means CreateTime is preferred; CT < LAT means LogAppendTime is preferred):

              Pattern 1   Pattern 2   Pattern 3   Robustness
  Preference  CT = LAT    CT > LAT    CT < LAT    CT < LAT

In practice, we rarely see every stage of a pipeline with the same large latency, so LogAppendTime appears preferable to CreateTime for log retention.

Time based log rolling

The main purpose of time-based log rolling is to avoid the situation where a low-volume topic always has only one segment, which is also the active segment. By its nature, server-side log rolling makes more sense.

  • Pattern 1:
    Because the latency is small, CreateTime and LogAppendTime will be close and there won't be much difference.
  • Pattern 2:
    When the latency is large, a message's CreateTime may already satisfy the rolling criterion by the time the message reaches the broker. This might produce a segment with only one message.
  • Pattern 3:
    Similar to pattern 2, a lagging message might result in a single-message log segment.
  • Robustness:
    Similar to patterns 2 and 3. A CreateTime in the future might also break log rolling.
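A small simulation shows the single-message segments that pattern 2 produces. It makes these assumptions: rolling is checked when each message arrives, the broker's clock is the arriving message's LogAppendTime, and the segment's age is measured from the timestamp of its first message. The traffic (one message per minute, each arriving two hours late) is invented for illustration.

```python
from typing import List, Tuple

HOUR_MS = 60 * 60 * 1000
MINUTE_MS = 60 * 1000


def segment_sizes(messages: List[Tuple[int, int]], roll_ms: int,
                  by_create_time: bool) -> List[int]:
    """messages: (create_time, log_append_time) pairs in append order."""
    sizes: List[int] = []
    first_ts = None  # timestamp of the first message in the active segment
    count = 0
    for create_time, log_append_time in messages:
        # Roll before appending if the active segment is at least roll_ms old.
        if first_ts is not None and log_append_time - first_ts >= roll_ms:
            sizes.append(count)
            first_ts, count = None, 0
        if first_ts is None:
            first_ts = create_time if by_create_time else log_append_time
        count += 1
    if count:
        sizes.append(count)
    return sizes


# Pattern 2: every message is produced two hours before it is appended.
msgs = [(i * MINUTE_MS - 2 * HOUR_MS, i * MINUTE_MS) for i in range(120)]
```

With a one-hour roll interval, LogAppendTime yields two 60-message segments, while CreateTime rolls on every message.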

 

Comparison:

              Pattern 1   Pattern 2   Pattern 3   Robustness
  Preference  CT = LAT    CT < LAT    CT < LAT    CT < LAT

Build time based index

A time-based index allows fast searches by timestamp. Please refer to "Searching message by timestamp" for related discussion.

Because CreateTime can be (and likely is) set by different producers, CreateTimes are likely not in order, which makes building a time-based index on CreateTime difficult. LogAppendTime, which never goes backwards within a log, is therefore preferable for building the time-based log index.
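Sorted LogAppendTimes are what make a binary-searchable index possible. The sketch below keeps (LogAppendTime, offset) pairs in append order and looks up the first entry at or after a target time. This is not the on-disk index format: `TimeIndex` is a hypothetical in-memory structure, and rejecting out-of-order timestamps is an illustrative choice.

```python
import bisect
from typing import List, Optional


class TimeIndex:
    """Hypothetical time index: (LogAppendTime, offset) entries kept in append order."""

    def __init__(self) -> None:
        self._times: List[int] = []
        self._offsets: List[int] = []

    def append(self, log_append_time: int, offset: int) -> None:
        # Binary search in lookup() relies on LogAppendTime never decreasing.
        if self._times and log_append_time < self._times[-1]:
            raise ValueError("timestamp went backwards; the index would be unsorted")
        self._times.append(log_append_time)
        self._offsets.append(offset)

    def lookup(self, target_ms: int) -> Optional[int]:
        """Offset of the first entry with LogAppendTime >= target_ms, or None."""
        i = bisect.bisect_left(self._times, target_ms)
        return self._offsets[i] if i < len(self._offsets) else None
```

With CreateTime, the same `append` would reject interleaved producers, or the list would need re-sorting on every insert.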

Application use cases

Stream Processing

When it comes to the application domain, having a CreateTime with a message is useful.

The benefit of having a CreateTime with each message, rather than putting it into the payload, is that the application protocol can be simplified. It is convenient for the infrastructure to provide the timestamp so that each application does not need to handle it.

Depending on the use case, users may or may not need to decompress the message to get the CreateTime of each message. Also, with LogAppendTime, users can easily compute the latency of each message.
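Two typical application uses are shown below: grouping records into event-time windows by CreateTime, and computing per-message latency as LogAppendTime minus CreateTime. The function names and the (create_time, log_append_time, value) tuple shape are hypothetical, and tumbling windows are just one common windowing choice.

```python
from collections import defaultdict
from typing import Dict, Iterable, List, Tuple

Record = Tuple[int, int, str]  # (create_time, log_append_time, value)


def tumbling_windows(records: Iterable[Record], window_ms: int) -> Dict[int, List[str]]:
    """Group values into tumbling windows keyed by window start, using CreateTime."""
    windows: Dict[int, List[str]] = defaultdict(list)
    for create_time, _log_append_time, value in records:
        window_start = create_time - create_time % window_ms
        windows[window_start].append(value)
    return dict(windows)


def append_latencies(records: Iterable[Record]) -> List[int]:
    """Per-message latency from producer send to broker append."""
    return [log_append_time - create_time for create_time, log_append_time, _ in records]
```

In the test, record "c" is appended after "b" but still lands in the first window, because grouping uses CreateTime rather than arrival order.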

 

Latency Measurement

Latency measurement needs both CreateTime and LogAppendTime, but for this purpose the LogAppendTime does not need to be in the message, since the broker knows it at append time. The broker can now have a latency metric for each topic.
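A per-topic broker metric could accumulate the append time minus the CreateTime as each message is appended. `TopicLatencyMetric` is a hypothetical name, not an existing Kafka metric. Clamping negative values (caused by client clock skew) to zero is an assumed design choice.

```python
from collections import defaultdict
from dataclasses import dataclass
from typing import DefaultDict


@dataclass
class LatencyStats:
    count: int = 0
    total_ms: int = 0
    max_ms: int = 0

    @property
    def avg_ms(self) -> float:
        return self.total_ms / self.count if self.count else 0.0


class TopicLatencyMetric:
    """Hypothetical broker-side metric: append time minus CreateTime, per topic."""

    def __init__(self) -> None:
        self.stats: DefaultDict[str, LatencyStats] = defaultdict(LatencyStats)

    def record(self, topic: str, create_time: int, log_append_time: int) -> None:
        # log_append_time is known to the broker at append time; it need not be read
        # back from the message. Negative values (client clock ahead) are clamped to 0.
        latency = max(0, log_append_time - create_time)
        s = self.stats[topic]
        s.count += 1
        s.total_ms += latency
        s.max_ms = max(s.max_ms, latency)
```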

A corner case: Leader migration

Suppose we have broker0 and broker1. Broker0 is the current leader of a partition and broker1 is a follower. Consider the following scenario:

  1. Message m0 is produced to broker0 at time T0 (LAT = T0, where T0 is broker0's clock).
  2. broker1, as a follower, replicates m0 and appends it to its own log without changing the LogAppendTime of m0.
  3. broker0 goes down and broker1 becomes the new leader.
  4. Message m1 is produced to broker1 at time T1 (LAT = T1, where T1 is broker1's clock).

In step 4, it is possible that T1 < T0 because of clock skew between the two brokers. If we naively take T1 as the timestamp of m1, the timestamps will be out of order, i.e. a message with an earlier timestamp will appear later in the log. To avoid this problem, at step 4 broker1 can take max(T1, T0) as the timestamp for m1, so the timestamps in the log never go backward. broker1 will use T0 as the timestamp until its own clock advances past T0.

More generally, whenever a message is appended, the broker (whether leader or follower) should remember the timestamp of the last appended message. If a later message would get a timestamp earlier than that, the timestamp of the last appended message is used instead.
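The rule above fits in a small per-partition helper, sketched here under the assumption that one instance exists per partition replica. `LogAppendTimeAssigner` is a hypothetical name, and the injectable clock exists only so that the leader-migration scenario can be replayed.

```python
import time
from typing import Callable, Optional


class LogAppendTimeAssigner:
    """Hypothetical per-partition helper that keeps LogAppendTime from going backwards."""

    def __init__(self, clock_ms: Optional[Callable[[], int]] = None) -> None:
        self._clock_ms = clock_ms or (lambda: int(time.time() * 1000))
        self.last_appended = -1  # timestamp of the last appended message (in memory)

    def assign(self) -> int:
        """Leader path: use max(last appended timestamp, local clock)."""
        ts = max(self.last_appended, self._clock_ms())
        self.last_appended = ts
        return ts

    def observe(self, replicated_ts: int) -> None:
        """Follower path: keep the leader's timestamp and just remember it."""
        self.last_appended = max(self.last_appended, replicated_ts)
```

In the test, broker1's clock is one second behind broker0. After replicating m0 (T0 = 10000), broker1 keeps stamping T0 until its own clock passes it.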

Compatibility, Deprecation, and Migration Plan

NOTE: This part is drafted based on the assumption that KIP-31 and KIP-32 will be implemented in one patch.

The proposed protocol is not backward compatible. The migration plan is as follows:

  1. Increment the magic byte in MessageAndOffset from 0 to 1.
  2. Upgrade brokers to support both V0 and V1 of MessageAndOffset.
    1. When a broker sees a produce request using V0, it will still decompress the message, assign offsets, and recompress.
    2. When a broker sees a produce request using V1, it will decompress the message for verification, assign the offset to the outer message, and NOT recompress.
  3. Upgrade consumers to support both V0 and V1.
  4. Upgrade producers to send MessageAndOffset V1.
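The broker-side difference in step 2 can be sketched as an append plan per magic byte. This relies on an assumption about KIP-31, which this page only references: in V1, inner messages keep relative offsets 0..n-1 and only the wrapper carries the absolute offset of the last inner message. `plan_compressed_append` and `AppendPlan` are hypothetical names.

```python
from typing import List, NamedTuple


class AppendPlan(NamedTuple):
    outer_offset: int          # offset written on the wrapper message
    inner_offsets: List[int]   # offsets stored inside the compressed set
    recompress: bool


def plan_compressed_append(magic: int, inner_count: int, next_offset: int) -> AppendPlan:
    if inner_count <= 0:
        raise ValueError("a compressed set must contain at least one message")
    last = next_offset + inner_count - 1
    if magic == 0:
        # V0: inner messages carry absolute offsets, so they must be rewritten
        # and the set recompressed.
        return AppendPlan(last, list(range(next_offset, last + 1)), True)
    if magic == 1:
        # V1 (assumed KIP-31 relative offsets): inner offsets stay as produced,
        # only the wrapper offset is assigned, no recompression.
        return AppendPlan(last, list(range(inner_count)), False)
    raise ValueError(f"unsupported magic byte {magic}")
```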

For producers, there will be no impact.

Old consumers that cannot recognize V1 will throw an unsupported-version exception when they see the new format. (The current code in ZookeeperConsumerConnector does not validate the magic byte. This is a bug and we will fix it in a separate ticket.)

Upgraded consumers can handle both V0 and V1.
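A consumer handles both versions by branching on the magic byte and rejecting anything it does not know. That rejection is the check described above as missing in ZookeeperConsumerConnector. This sketch parses only the header fields in the order given in the message format section. `ValueError` stands in for whatever unsupported-version exception a real client would raise, and `parse_header` is a hypothetical name.

```python
import struct
from typing import NamedTuple, Optional


class MessageHeader(NamedTuple):
    magic: int
    attributes: int
    create_time: Optional[int]      # None for V0, which has no timestamps
    log_append_time: Optional[int]


SUPPORTED_MAGIC = (0, 1)


def parse_header(message: bytes) -> MessageHeader:
    """Read Crc/MagicByte/Attributes (+ timestamps for V1), rejecting unknown versions."""
    _crc, magic, attributes = struct.unpack_from(">Ibb", message, 0)
    if magic not in SUPPORTED_MAGIC:
        raise ValueError(f"unsupported message format version (magic byte {magic})")
    if magic == 0:
        return MessageHeader(magic, attributes, None, None)
    create_time, log_append_time = struct.unpack_from(">qq", message, 6)
    return MessageHeader(magic, attributes, create_time, log_append_time)
```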

Rejected Alternatives

Add a timestamp field to log index entry

The most straightforward approach to a time index is to associate a timestamp with each entry in the log index files.

Log Index Entry => Offset Position Timestamp
  Offset => int32
  Position => int32
  Timestamp => int64

Because each index entry would become 16 bytes instead of 8 bytes, the index file size would double. As an example, one of our brokers has ~3500 partitions, and its index files take about 16GB of memory. With this new format, the memory consumption would be 32GB.
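The doubling follows directly from the entry layouts above. The struct format strings below mirror the listed field types (int32, int32, int64). `scaled_index_memory_gb` is a hypothetical helper for the arithmetic.

```python
import struct

CURRENT_ENTRY = ">ii"    # Offset (int32), Position (int32)
PROPOSED_ENTRY = ">iiq"  # Offset (int32), Position (int32), Timestamp (int64)

current_size = struct.calcsize(CURRENT_ENTRY)    # 8 bytes
proposed_size = struct.calcsize(PROPOSED_ENTRY)  # 16 bytes


def scaled_index_memory_gb(current_gb: float) -> float:
    """Index memory if every entry grows from current_size to proposed_size bytes."""
    return current_gb * proposed_size / current_size
```

Applied to the 16GB example, this gives 32GB.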
