You are viewing an old version of this page. View the current version.

Compare with Current View Page History

« Previous Version 5 Next »

Status

Current state: Under Discussion

Discussion thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

This KIP is try to address the following issue in Kafka.

  1. Log retention might not be honored: Log retention is currently at the log segment level, and is driven off the last modification time of a log segment. This approach does not quite work when a replica reassignment happens because the newly created log segment will effectively have its modification time reset to now.
  2. Searching offset by time stamp has very coarse granularity (log segment level), it also does not work well when replica is reassigned.

This KIP is a distilled/improved version of an earlier discussion that we started.

This KIP is preferably to be implemented with KIP-31 if possible to avoid changing wire protocol twice.

Public Interfaces

We propose the following change to the message format

MessageAndOffset => MessageSize Offset Message
  MessageSize => int32
  Offset => int64
  
  Message => Crc MagicByte Attributes Timestamp KeyLength Key ValueLength Value
    Crc => int32
    MagicByte => int8
    Attributes => int8
    SendTime => int64    <---------------------- NEW
    ReceiveTime => int64 <---------------------- NEW
    KeyLength => int32
    Key => bytes
    ValueLength => int32
    Value => bytes

Proposed Changes

Wire protocol change

Add SendTime and ReceiveTime field to message

  • SendTime
    • SendTime will be set by the producer and will not be changed afterward.
    • SendTime accuracy is millisecond.
  • ReceiveTime
    • The ReceiveTime will be assigned by broker upon receiving the message. If the message is coming from mirror maker, the original SendTime will be maintained but the ReceiveTime will be changed by the target broker.
    • The ReceiveTime will be used to build the log index.
    • The ReceiveTime accuracy is millisecond
    • The ReceiveTime of the outer message of compressed messages will be the latest ReceiveTime of all its inner messages.
      • If the compressed message is not compacted, the relative offsets of inner messages will be contiguous and share the same ReceiveTime.
      • If the compressed message is compacted, the relative offsets of inner messages may not be contiguous. Its ReceiveTime will be the ReceiveTime of the last inner message.
    • The followers will not reassign ReceiveTime but simply update an in memory lastAppendedReceiveTime and append the message to the log.
    • To handle leader migration where new leader has slower clock than old leader, all the leader should append max(lastAppendedTimestamp, currentTimeMillis) as the timestamp.

Change time based log rolling and retention to use timestamp

Time based log rolling and retention currently use the file creation time and last modified time. This does not work for newly created replicas because the time attributes of files is different from the true message append time. The following changes will address this issue.

  • The time based log rolling will be based on the timestamp of the first message in the log segment file.
  • The time based log retention will be based on the timestamp of the last message in the log segment file.

ConsumerRecord / ProducerRecord format change

  • Add a SendTime field to ProducerRecord. This field can be used by application to set the send time. It will also allow mirror maker to maintain the send time easily.
  • Add both SendTime and ReceiveTime to ConsumerRecord.
    • The SendTime is useful in use cases such as stream processing
    • The ReceiveTime is useful in use cases such as cross colo failover.

New time-based log index

In order to enable timestamp based search at finer granularity, we need to add the timestamp to log indices as well. Broker will build time index based on ReceiveTime of messages.

Because all the index files are memory mapped files the main consideration here is to avoid significantly increasing the memory consumption.

The time index file needs to be built just like the log index file based on each log segment file.

Use a time index for each log segment to save the timestamp -> log offset at minute granularity

Create another index file for each log segment with name SegmentBaseOffset.time.index to have index at minute level. The time index entry format is:

 

Time Index Entry => Timestamp Offset
  Timestamp => int64
  Offset => int32

The time index granularity does not change the actual timestamp searching granularity. It only affects the time needed for searching. The way it works will be the same as offset search - find the closet timestamp and corresponding offset, then start the leaner scan over the log until find the target message. The reason we prefer minute level indexing is because timestamp based search is usually rare so it probably does not worth investing significant amount of memory in it.

The time index will be built based on the log index file. Every time when a new entry is inserted into log index file, we take a look at the timestamp of the message and if it falls into next minute, we insert an entry to the time index as well. The following table give the summary of memory consumption using different granularity. The number is calculated based on a broker with 3500 partitions.

second864003.4 GB
Minute144057 MB

Users don't typically need to look up offsets with seconds granularity.

Use case discussion

 

Mirror maker

 

The broker does not distinguish mirror maker from other producers. The following example explains what will the timestamp look like when there is mirror maker in the picture.(S - SendTime, R - ReceiveTime)

 

  1. Application producer produces message at T0. ( [S = T0, R = -1] ) 
  2. The broker in cluster1 received message at T1 = T0 + latency1 and appended the message to the log. Latency1 includes linger.ms and some other latency. ( [S = T0, R = T1] )
  3. Mirror maker copies the message to broker in cluster2. ( [S = T0, R = T1] )
  4. The broker in cluster2 receives message at T2 = T1 + latency2 and appended the message to the log. ( [S = T0, R = T2] )

 

The SendTime of a message in source cluster and target cluster will be same. i.e. the timestamp is passed across clusters.

 

The ReceiveTime of a message in source cluster and target cluster will be different. This is because: 1) log retention/rolling needs to based on server clock to provide clear guarantee. 2) To support searching by timestamp, the ReceiveTime in a log file needs to be monotonically increasing, even when the messages in the target cluster came from different source clusters.

Broker side use cases:

To discuss the usage of SendTime and ReceiveTime, it is useful to summarize the latency pattern of the messages flowing through the pipeline. The latency can be summarized to the following pattern:

 

  1. the messages flow through the pipeline with small latency.
  2. the messages flow through the pipeline with similar large latency.
  3. the messages flow through the pipeline with large latency difference.

Also it would be useful to think about the impact of a completely wrong timestamp set by client. i.e. the robustness of the system.

Log Retention

There are both pros and cons for log retention to be based on SendTime or Receive time.

Use SendTime:

  • Pattern 1:
    Because the latency is small, so SendTime and ReceiveTime will be close and their won't be much difference.
  • pattern 2:
    If the log retention is based on the message creation time, it will not be affected by the latency in the data pipeline because the send time will not change.
    If the log retention is based on the receive time, it will be affected by the latency in the pipeline. Because of the latency difference, some data can be deleted on one cluster, but not on another cluster in the pipeline.
  • Pattern 3: 
    When the messages with significantly different timestamp goes into a cluster at around same time, the retention policy is hard to follow if we use SendTime. For example, imagine two mirror makers copy data from two source clusters to the same target cluster. If MirrorMaker1 is copying Messages with SendTime around 1:00 PM today, and MirrorMaker2 is copying messages with SendTime around 1:00 PM yesterday. Those messages can go to the same log segment in the target cluster. It will be difficult for broker to apply retention policy to the log segment. The broker needs to maintain the knowledge about the latest SendTime of all messages in a log segment and persist the information somewhere.
  • Robustness: 
    If there is a message with SendTime set to the future, the log might be kept for very long. Broker needs to sanity check the timestamp when receive the message. It could by tricky to determine which timestamp is not valid

Comparison:

 pattern 1pattern 2pattern 3Robustness
PreferenceS = RS > RS < RS < R

In reality, we usually don't see all the pipeline has same large latency, so it looks ReceiveTime is preferable than SendTime for log retention.

Time based log rolling

The main purpose of time based log rolling is to avoid the situation where a low volume topic always has only one segment which is also the active segment. From its nature, server side log rolling makes more sense.

  • Pattern 1:
    Because the latency is small, so SendTime and ReceiveTime will be close and their won't be much difference.
  • Pattern 2:
    When the latency is large, it is possible that when a new message is produced to a broker, the SendTime has already reached the rolling criteria. This might cause a segment with only one message.
  • Pattern 3:
    Similar to pattern 2, a lagged message might result in a single message log segment.
  • Robustness:
    Similar to as pattern 2 and pattern 3. Also a SendTime in the future might break the log rolling as well.

 

 pattern 1pattern 2pattern 3Robustness
PreferenceS = RS < RS < RS < R

Build time based index

Building time based index is to allow fast search based on timestamps. Please refer to "Searching message by timestamp" for related discussion.

Because SendTime can be (and is likely) set by different producers, they are likely not in order. Building time based index based on SendTime will be difficult. So ReceiveTime is preferable for building time based log index.

Application use cases

Stream Processing

When it comes to the application domain, having a SendTime with a message is preferable.

The benefit of having a SendTime with each message rather than put it into payload is that application protocol can be simplified. Depending on the use case, user may or may not need to decompress the message to get the SendTime for each message. Also, with ReceiveTime user can easily get the latency of each message.

Search message by timestamp

There could be a few reasons people want to search messages by timestamp. One important use case is for disaster recovery.

Imagine people have cluster 1 and cluster 2 at different locations. The two clusters has same data. When cluster 1 goes down, the consumers of cluster 1 will try switching to consume from cluster 2. The problem is which offset should those consumers resume consuming from. The goals here are:

  1. Not lose messages.
  2. Reconsumer as less as possible messages that has already been consumed.

 

The challenge is that there is little cross reference between cluster 1 and cluster 2. The offset checkpoint for cluster 1 does not apply for cluster 2. This is a hard to get accurate resuming point, because the message order and content for each partition could be significantly different between the two clusters. An approximate approach to take in this case is to take a look at the timestamp T of the last consume message from cluster 1 (), and then start to consume from the message produced to cluster 2 at (T - X) where x is some safety buffer based on latency difference between cluster 1 and cluster 2, say 15 min.

There are two questions here: 

Which timestamp should we use for T?

  • SendTime
    As mentioned earlier, because it is hard to build a time-based index using SendTime, if we use SendTime for T, it is difficult to find the offset corresponding to T and also meet the two goals.
  • ReceiveTime
    Because receive time is monotonically increasing on the broker. The broker can easily build index for it and find the message corresponding to T.
  • Timestamp when cluster 1 is down
    If the consumer lags are small, we can simply use the time when cluster 1 is down as T.

No matter which T we use, in order to find the message corresponding to T, we need the broker side time-based log index. 

How to determin X?

Imagine we have two messages:

m0 with SendTime ST0 reaches cluster 1 at ReceiveTime RT01 and reached cluster 2 at RT02.

m1 with SendTime ST1 reaches cluster 1 at ReceiveTime RT11 and reached cluster 2 at RT12.

Assuming the clock skew between cluster 1 and cluster 2 is very small so we can ignore it.

Leader migration

Suppose we have broker0 and broker1. Broker0 is the current leader of a partition and broker1 is a follower. Consider the following scenario:

  1. message m0 is produced to broker 0 at time T0 (R = T0, T0 is the clock on broker 0).
  2. broker1 as a follower replicated m0 and appended to its own log without changing the ReceiveTime of m0.
  3. broker0 went down and broker 1 become the new leader.
  4. message m1 is produced to broker 1 at time T1 (R = T1, T1 is the clock on broker 1).

In step 4, it is possible that T1 < T0 because of the time difference on two brokers. If we naively take T1 as the timestamp of m1, then the timestamp will be out of order. i.e. a message with earlier timestamp will show later in the log. To avoid this problem, at step 4, broker 1 can take max(T1, T0) as the timestamp for m1. So the timestamp in the log will not go backward. Broker 1 will be using T0 as timestamp until its own clock advances after T0.

To be more general, when a message is appended, broker (whether leader or follower) should remember the timestamp of the last appended message, if a later appended message has a timestamp earlier than the timestamp of last appended message, the timestamp of last appended message will be used.

 

Compatibility, Deprecation, and Migration Plan

NOTE: This part is drafted based on the assumption that KIP-31 and KIP-32 will be implemented in one patch.

The proposed protocol is not backward compatible. The migration plan are as below:

  1. Increment magic byte in MessageAndOffset from 0 to 1.
  2. Upgrade broker to support both V0 and V1 of MessageAndOffset.
    1. When broker see a producer request using V0, it will still decompress the message, assign offsets and re-compress.
    2. When broker see a producer request using V1, it will decompress the message for verification, assign the offset to outer message and NOT do recompression.
  3. Upgrade consumer to support both V0 and V1.
  4. Upgrade producer to send MessageAndOffset V1.

For producer, there will be no impact.

For old consumers that cannot recognize V1, if they see the new protocol an exception will be thrown for unsupported version. (The current code in ZookeeperConsumerConnector does not validate the magic byte. This is a bug and we will fix it in a separate ticket)

For upgraded consumers, they can handle both V0 and V1.

Rejected Alternatives

Add a timestamp field to log index entry

The most straight forward approach to have a time index is to let the log index files have a timestamp associate with each entry.

Log Index Entry => Offset Position Timestamp
  Offset => int32
  Position => int32
  Timestamp => int64

Because the index entry size become 16 bytes instead of 8 bytes. The index file size also needs to be doubled. As an example, one of the broker we have has ~3500 partitions. The index file took about 16GB memory. With this new format, the memory consumption would be 32GB.

  • No labels