...

Users don't typically need to look up offsets with seconds granularity.

Use case discussion

 

Mirror maker

 

The broker does not distinguish mirror maker from other producers. The following example shows what the timestamps look like when a mirror maker is in the picture. (S - SendTime, R - ReceiveTime)

 

  1. The application producer produces a message at T0. ( [S = T0, R = -1] )
  2. The broker in cluster1 receives the message at T1 = T0 + latency1 and appends it to the log. Latency1 includes linger.ms and some other latency. ( [S = T0, R = T1] )
  3. Mirror maker copies the message to the broker in cluster2. ( [S = T0, R = T1] )
  4. The broker in cluster2 receives the message at T2 = T1 + latency2 and appends it to the log. ( [S = T0, R = T2] )

 

The SendTime of a message will be the same in the source cluster and the target cluster, i.e. the timestamp is passed across clusters.

 

The ReceiveTime of a message will be different in the source cluster and the target cluster. This is because: 1) log retention/rolling needs to be based on the server clock to provide a clear guarantee. 2) To support searching by timestamp, the ReceiveTime in a log file needs to be monotonically increasing, even when the messages in the target cluster came from different source clusters.
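The four steps above can be sketched as a small simulation. This is only an illustration under assumed names: `Message`, `Broker`, and `mirror` are hypothetical helpers, not Kafka classes, and the latency values are made up.

```python
from dataclasses import dataclass, replace

NO_TIMESTAMP = -1  # "not yet received", as in step 1


@dataclass(frozen=True)
class Message:
    payload: str
    send_time: int                    # S: set once by the original producer
    receive_time: int = NO_TIMESTAMP  # R: set by each broker on append


class Broker:
    """Toy broker that stamps ReceiveTime from its own clock on append."""

    def __init__(self, name):
        self.name = name
        self.log = []

    def append(self, msg, now):
        # R is always overwritten with the local clock; S is left untouched.
        stored = replace(msg, receive_time=now)
        self.log.append(stored)
        return stored


def mirror(source, target, now):
    """Hypothetical mirror maker: re-produces every source message to target."""
    for msg in source.log:
        target.append(msg, now)


T0, latency1, latency2 = 1000, 5, 20  # made-up values
cluster1, cluster2 = Broker("cluster1"), Broker("cluster2")

produced = Message("hello", send_time=T0)                 # [S = T0, R = -1]
cluster1.append(produced, now=T0 + latency1)              # [S = T0, R = T1]
mirror(cluster1, cluster2, now=T0 + latency1 + latency2)  # [S = T0, R = T2]

for broker in (cluster1, cluster2):
    m = broker.log[0]
    print(f"{broker.name}: S = {m.send_time}, R = {m.receive_time}")
```

With these values, both clusters report S = 1000, while R is 1005 in cluster1 and 1025 in cluster2.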

Broker side use cases:

To discuss the usage of SendTime and ReceiveTime, it is useful to summarize the latency patterns of messages flowing through the pipeline. The latency can be summarized by the following patterns:

 

  1. The messages flow through the pipeline with small latency.
  2. The messages flow through the pipeline with similarly large latency.
  3. The messages flow through the pipeline with a large latency difference.

It is also useful to think about the impact of a completely wrong timestamp set by the client, i.e. the robustness of the system.

Log Retention

There are both pros and cons to basing log retention on SendTime or on ReceiveTime.

Use SendTime:

  • Pattern 1:

...


  • Because the latency is small, SendTime and ReceiveTime will be close and there won't be much difference.
  • Pattern 2:
    Good: If log retention is based on SendTime, it is tied to the message creation time and, in the ideal case, is not affected by latency in the data pipeline, because the SendTime does not change. The assumption here is that messages with similar SendTime reach a cluster at around the same time.
    If log retention is based on ReceiveTime, it is affected by the latency in the pipeline. Because of the latency difference, some data can be deleted on one cluster but not on another cluster in the pipeline.
  • Pattern 3: 
    Bad: When messages with significantly different timestamps arrive at a cluster at around the same time, the retention policy is hard to follow if we use SendTime. For example, imagine two mirror makers copying data from two source clusters to the same target cluster. If MirrorMaker1 is copying messages with SendTime around 1:00 PM today and MirrorMaker2 is copying messages with SendTime around 1:00 PM yesterday, those messages can go to the same log segment in the target cluster. It will be difficult for the broker to apply the retention policy to that log segment.
    1) The broker needs to track the latest SendTime of all messages in a log segment and persist that information somewhere.
    2)
  • Robustness: 
    If there is a message with SendTime set to the future, the log might be kept for a very long time. The broker needs to sanity check the timestamp when it receives the message. It could be tricky to determine which timestamps are not valid.

Use ReceiveTime:

  • Good: The log retention policy is easy to enforce and it does not suffer from wrong client timestamps.
  • Bad: Log retention will be independent in each Kafka cluster in the pipeline. Because of the latency difference, some data can be deleted on one cluster but not on another cluster in the pipeline.
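The trade-off between the two options can be illustrated with a small sketch. The functions below are hypothetical helpers rather than broker code; they assume a segment is deleted only when its newest timestamp is older than the retention period, and the timestamps are made up.

```python
from dataclasses import dataclass

HOUR_MS = 60 * 60 * 1000
DAY_MS = 24 * HOUR_MS
RETENTION_MS = DAY_MS  # assumed retention period of one day


@dataclass(frozen=True)
class Message:
    send_time: int     # set by the producer, carried across clusters
    receive_time: int  # set by the local broker on append


def expired_by_send_time(segment, now, retention_ms):
    # SendTime is not ordered within the log, so the broker has to track
    # the latest SendTime seen in the segment.
    return now - max(m.send_time for m in segment) > retention_ms


def expired_by_receive_time(segment, now, retention_ms):
    # ReceiveTime is monotonically increasing, so the last message suffices.
    return now - segment[-1].receive_time > retention_ms


received_at = 10 * DAY_MS
# Pattern 3: two mirror makers append to the same segment at about the same
# time. One copies day-old data, the other copies fresh data.
segment = [
    Message(send_time=received_at - DAY_MS, receive_time=received_at),
    Message(send_time=received_at - 1000, receive_time=received_at + 1),
]

check_at = received_at + 12 * HOUR_MS
oldest_age = check_at - min(m.send_time for m in segment)
print(oldest_age > RETENTION_MS)                              # True
print(expired_by_send_time(segment, check_at, RETENTION_MS))  # False

# Robustness: one message carries a SendTime a year in the future.
with_future = segment + [
    Message(send_time=received_at + 365 * DAY_MS, receive_time=received_at + 2)
]
later = received_at + 2 * DAY_MS
print(expired_by_send_time(with_future, later, RETENTION_MS))     # False
print(expired_by_receive_time(with_future, later, RETENTION_MS))  # True
```

In the first case the oldest message is already past retention by its SendTime, yet the segment cannot be deleted because it also holds fresh data. In the second case a single future SendTime keeps the segment alive under the SendTime rule, while the ReceiveTime rule is unaffected.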

Comparison:

The key issue here is the latency of messages flowing through the pipeline. The latency can be summarized by the following patterns:

...

Comparison:

            pattern 1   pattern 2   pattern 3   Robustness
Preference  S = R       S > R       S < R       S < R

In reality, we usually don't see the whole pipeline having the same large latency, so ReceiveTime looks preferable to SendTime for log retention.

Time based log rolling

The main purpose of time based log rolling is to avoid the situation where a low volume topic always has only one segment, which is also the active segment. By its nature, server side log rolling makes more sense.

  • Pattern 1:
    Because the latency is small, SendTime and ReceiveTime will be close and there won't be much difference.
  • Pattern 2:
    When the latency is large, it is possible that when a new message is produced to a broker, its SendTime already meets the rolling criteria. This might cause a segment with only one message.
  • Pattern 3:
    Similar to pattern 2, a lagged message might result in a single-message log segment.
  • Robustness:
    Similar to pattern 2 and pattern 3. Also, a SendTime in the future might break log rolling as well.
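The single-message segment problem in pattern 2 can be reproduced with a toy model. The rolling rule below is an assumption made for illustration (roll when the broker clock is more than `roll_ms` past the timestamp of the first message in the active segment); `segment_sizes` is a hypothetical helper, not the broker's implementation.

```python
MINUTE_MS = 60 * 1000
HOUR_MS = 60 * MINUTE_MS


def segment_sizes(messages, roll_ms, use_send_time):
    """Return the number of messages that end up in each segment.

    messages: (send_time, receive_time) pairs in arrival order.
    The receive_time of the arriving message doubles as the broker clock.
    """
    sizes = []
    base_ts = None  # timestamp of the first message in the active segment
    for send_time, receive_time in messages:
        ts = send_time if use_send_time else receive_time
        if base_ts is None or receive_time - base_ts > roll_ms:
            sizes.append(0)  # roll: start a new segment
            base_ts = ts
        sizes[-1] += 1
    return sizes


# Pattern 2: every message arrives two hours after it was sent,
# one message every ten minutes, with an assumed roll interval of one hour.
latency = 2 * HOUR_MS
messages = [(i * 10 * MINUTE_MS, i * 10 * MINUTE_MS + latency) for i in range(6)]

print(segment_sizes(messages, HOUR_MS, use_send_time=True))   # [1, 1, 1, 1, 1, 1]
print(segment_sizes(messages, HOUR_MS, use_send_time=False))  # [6]
```

Under this rule, every message is already "too old" by SendTime when it arrives, so each one forces a roll. With ReceiveTime, all six messages fit into one segment.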

 

...

            pattern 1   pattern 2   pattern 3   pattern 4   Robustness
Preference   S = R   S > < R   S = < R   S < R

Build time based index

The purpose of building a time based index is to allow fast search based on timestamps. Please refer to "Searching message by timestamp" for related discussion.

Because SendTime can be (and is likely to be) set by different producers, SendTimes in a log are likely to be out of order. Building a time based index on SendTime would be difficult, so ReceiveTime is preferable for building a time based log index.
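A minimal sketch of why ordering matters for the index: a sparse index over a sorted sequence supports binary search, whereas an unordered sequence does not. The index layout, the function names, and the sample timestamps are all assumptions for illustration, not the broker's index format.

```python
import bisect


def build_time_index(receive_times, interval=2):
    """Sparse index: one (receive_time, offset) entry every `interval` messages."""
    return [(ts, offset) for offset, ts in enumerate(receive_times)
            if offset % interval == 0]


def lookup_start_offset(index, target_ts):
    """Offset of the last index entry strictly before target_ts (or the first entry)."""
    keys = [ts for ts, _ in index]
    pos = bisect.bisect_left(keys, target_ts) - 1
    return index[max(pos, 0)][1]


def find_offset(receive_times, index, target_ts):
    """First offset whose ReceiveTime is >= target_ts, or None if there is none."""
    start = lookup_start_offset(index, target_ts)
    for offset in range(start, len(receive_times)):
        if receive_times[offset] >= target_ts:
            return offset
    return None


# Made-up timestamps for six messages in one partition, in log order.
receive_times = [100, 101, 105, 110, 120, 121]  # stamped by the broker
send_times = [90, 40, 95, 60, 118, 30]          # stamped by different producers

print(receive_times == sorted(receive_times))  # True: binary search is valid
print(send_times == sorted(send_times))        # False: binary search is not valid

index = build_time_index(receive_times)
print(index)                                   # [(100, 0), (105, 2), (120, 4)]
print(find_offset(receive_times, index, 111))  # 4
```

The lookup jumps to the index entry just before the target and scans forward from there, which only works because ReceiveTime never decreases along the log.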

Application use cases

Stream Processing

When it comes to the application domain, having a SendTime with each message is preferable.

The benefit of having a SendTime with each message, rather than putting it into the payload, is that the application protocol can be simplified. Depending on the use case, the user may or may not need to decompress the message to get the SendTime of each message. Also, together with ReceiveTime, the user can easily get the latency of each message.

Search message by timestamp

There could be a few reasons people want to search messages by timestamp. One important use case is for disaster recovery.

Imagine people have cluster 1 and cluster 2 at different locations. The two clusters have the same data. When cluster 1 goes down, the consumers of cluster 1 will try switching to consume from cluster 2. The problem is which offset those consumers should resume consuming from. The goals here are:

  1. Do not lose messages.
  2. Reconsume as few already-consumed messages as possible.

 

The challenge is that there is little cross reference between cluster 1 and cluster 2. The offset checkpoint for cluster 1 does not apply to cluster 2. It is hard to get an accurate resuming point, because the message order and content of each partition could be significantly different between the two clusters. An approximate approach in this case is to look at the timestamp T of the last message consumed from cluster 1, and then start consuming from the message produced to cluster 2 at (T - X), where X is some safety buffer based on the latency difference between cluster 1 and cluster 2, say 15 min.

There are two questions here: 

Which timestamp should we use for T?

  • SendTime
    As mentioned earlier, it is hard to build a time-based index using SendTime. If we use SendTime for T, it is difficult to find the offset corresponding to T while also meeting the two goals.
  • ReceiveTime
    Because ReceiveTime is monotonically increasing on the broker, the broker can easily build an index for it and find the message corresponding to T.
  • Timestamp when cluster 1 is down
    If the consumer lag is small, we can simply use the time when cluster 1 went down as T.

No matter which T we use, in order to find the message corresponding to T, we need the broker side time-based log index. 
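The (T - X) lookup described above can be sketched as follows, assuming T is a ReceiveTime and that cluster 2 can map a timestamp to an offset. `resume_offset` is a hypothetical helper; the 15 minute buffer is the example value from the text, and the timestamps are made up.

```python
import bisect

MINUTE_MS = 60 * 1000
SAFETY_BUFFER_MS = 15 * MINUTE_MS  # X: the 15 min example value from the text


def resume_offset(cluster2_receive_times, last_consumed_ts,
                  safety_buffer_ms=SAFETY_BUFFER_MS):
    """First offset in cluster 2 whose ReceiveTime is >= T - X.

    cluster2_receive_times must be sorted, which holds for ReceiveTime
    because the broker assigns it monotonically.
    """
    target = last_consumed_ts - safety_buffer_ms
    return bisect.bisect_left(cluster2_receive_times, target)


# Made-up partition in cluster 2: one message every 10 minutes.
cluster2_receive_times = [m * MINUTE_MS for m in (0, 10, 20, 30, 40, 50)]

# The last message consumed from cluster 1 had timestamp T = minute 42.
T = 42 * MINUTE_MS
print(resume_offset(cluster2_receive_times, T))  # 3, i.e. the message at minute 30
```

A larger X lowers the risk of losing messages (goal 1) at the cost of reconsuming more of them (goal 2).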

How to determine X?

Imagine we have two messages:

m0 with SendTime ST0 reaches cluster 1 at ReceiveTime RT01 and reaches cluster 2 at RT02.

m1 with SendTime ST1 reaches cluster 1 at ReceiveTime RT11 and reaches cluster 2 at RT12.

As we can see, it boils down to whether pattern 2 or pattern 4 is more likely. In reality, we rarely see... Assume the clock skew between cluster 1 and cluster 2 is very small, so we can ignore it.

Leader migration

Suppose we have broker0 and broker1. Broker0 is the current leader of a partition and broker1 is a follower. Consider the following scenario:

...