...

  1. Log retention will be based on the time index of a log segment instead of the last modification time of the log segment file.
  2. Time-based log rolling will change as follows: a log segment will roll when log.roll.ms has elapsed since the largest timestamp of the messages in the log segment.
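
The two changes above can be sketched roughly as follows. This is only an illustration: the function names are hypothetical, and using the timestamp of the last time index entry for the retention check is an assumption about how "based on the time index" is meant.

```python
def should_roll(now_ms, largest_timestamp_ms, roll_ms):
    """Roll once log.roll.ms has elapsed since the largest message timestamp."""
    if largest_timestamp_ms < 0:
        # Assumption: a segment without any timestamped message is not
        # rolled by this rule.
        return False
    return now_ms - largest_timestamp_ms >= roll_ms


def is_expired(now_ms, last_time_index_timestamp_ms, retention_ms):
    """Retention check driven by the time index instead of the file mtime."""
    # Assumption: the timestamp of the last time index entry stands in
    # for the age of the segment.
    return now_ms - last_time_index_timestamp_ms > retention_ms
```

With these checks, touching a segment file (which changes its modification time) no longer affects when the segment rolls or expires.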

This KIP will introduce another index file for each log segment, so the number of open file handles will increase by about 1/3.

Proposed Changes

Add a new time-based log index

The broker will build the time index based on the timestamps of the messages. The time index works for both LogAppendTime and CreateTime.

...

Time index format

Create another index file for each log segment, named SegmentBaseOffset.timeindex. The density of the index is defined by the index.interval.bytes configuration.
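
A small sketch of what the file name and an entry could look like. The 20-digit zero-padded base offset, the 8-byte offset index entry, and the 8-byte timestamp plus 4-byte relative offset layout are assumptions for this example (they are consistent with the 1.5x entry size mentioned in this proposal); the helper names are hypothetical.

```python
import struct

# Assumption: an offset index entry is 4-byte relative offset + 4-byte position.
OFFSET_INDEX_ENTRY_SIZE = 8
# Assumption: a time index entry is 8-byte timestamp + 4-byte relative offset,
# big-endian, with no padding.
TIME_INDEX_ENTRY = struct.Struct(">qi")


def time_index_file_name(base_offset):
    # Assumption: same zero-padded naming as the other segment files.
    return f"{base_offset:020d}.timeindex"


def encode_entry(timestamp_ms, offset, base_offset):
    # Offsets are stored relative to the segment base offset.
    return TIME_INDEX_ENTRY.pack(timestamp_ms, offset - base_offset)
```

Under these assumptions a time index entry takes 12 bytes, which is 1.5x the 8 bytes of an offset index entry.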

...

Based on the proposal in KIP-32, the broker will build the time index in the following way:

  1. When a new log segment is created, the broker will create a time index file for the log segment.
  2. The default initial / max size of the time index files is the same as that of the offset index files. (A time index entry is 1.5x the size of an offset index entry, so users should set the configuration accordingly.)
  3. Each log segment maintains the largest timestamp so far in that segment. The initial value of the largest timestamp is -1 for a newly created segment.
  4. When the broker receives a message, if the message is not rejected because its timestamp exceeds the threshold, the message will be appended to the log. (The timestamp will be either LogAppendTime or CreateTime, depending on the configuration.)
  5. When the broker appends the message to the log segment, if an offset index entry is inserted, it will also insert a time index entry if the max timestamp so far is greater than the timestamp in the last time index entry, or if the time index is empty. Because the time index and the offset index share the same index.interval.bytes configuration, the two entries are inserted at the same time.
    • For message format v0, the timestamp is always -1, so no time index entry will be inserted when a message is appended.
  6. When a log segment is rolled out, a time index entry will be inserted into the time index file to ensure that the last time index entry has the largest timestamp in the segment and points to the base offset of the next segment.
    1. If the largest timestamp in the segment is non-negative (at least one message has a timestamp), the entry will be (largest_timestamp_in_the_segment -> base_offset_of_the_next_segment).
    2. If the largest timestamp in the segment is -1 (no message in the segment has a timestamp, i.e. all the messages in the log segment have message.format.version before 0.10.0), the broker will insert (last_modification_time_of_the_segment -> base_offset_of_next_segment) into the time index.

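The time index is not globally monotonically increasing. Instead, it is only monotonically increasing within each time index file, i.e. it is possible that the time index file for a later log segment contains a smaller timestamp than some timestamp in the time index file of an earlier segment.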
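
The index-building steps can be sketched as below. This is a simplified in-memory model, not broker code: the Segment class and its method names are hypothetical, and pointing each entry at the offset of the message just appended is an assumption, since the entry semantics are not spelled out in this section.

```python
class Segment:
    def __init__(self, base_offset):
        self.base_offset = base_offset
        self.largest_timestamp = -1   # initial value for a new segment
        self.time_index = []          # list of (timestamp, offset) entries

    def append(self, offset, timestamp, offset_index_entry_inserted):
        self.largest_timestamp = max(self.largest_timestamp, timestamp)
        if not offset_index_entry_inserted:
            return  # time index entries are only added alongside offset index entries
        if self.largest_timestamp < 0:
            return  # message format v0: timestamp is always -1, nothing to index
        if not self.time_index or self.largest_timestamp > self.time_index[-1][0]:
            # Assumption: the entry points at the message just appended.
            self.time_index.append((self.largest_timestamp, offset))

    def roll(self, next_base_offset, last_modified_ms):
        # The final entry carries the largest timestamp of the segment, or the
        # file's last modification time when no message has a timestamp.
        if self.largest_timestamp >= 0:
            timestamp = self.largest_timestamp
        else:
            timestamp = last_modified_ms
        self.time_index.append((timestamp, next_base_offset))
```

Because each segment starts again from -1, timestamps only increase within one segment's index, which is why a later segment can hold smaller timestamps than an earlier one.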

...

Search messages by timestamp

It works the same way as offset search: find the closest indexed timestamp and its corresponding offset, then linearly scan the log from there until the target message is found.

The time index granularity does not change the actual timestamp searching granularity. It only affects the time needed for searching.
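
A minimal sketch of such a search over a single segment, assuming the time index is a list of (timestamp, offset) pairs sorted by timestamp and the log is a list of (offset, timestamp) pairs in offset order. The function name and data layout are hypothetical.

```python
import bisect


def find_offset_by_timestamp(time_index, log, target_ts):
    """Return the offset of the first message with timestamp >= target_ts."""
    timestamps = [ts for ts, _ in time_index]
    # Entries before position i have an indexed timestamp below the target,
    # so no message before the offset of entry i - 1 can match.
    i = bisect.bisect_left(timestamps, target_ts)
    start_offset = time_index[i - 1][1] if i > 0 else 0
    # Linear scan from the indexed position until the target is found.
    for offset, ts in log:
        if offset < start_offset:
            continue
        if ts >= target_ts:
            return offset
    return None
```

A sparser index only makes the linear scan longer; the offset returned is the same.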

Broker startup

On broker startup, the latest timestamp is needed for the next time index append. The broker will find the largest timestamp of the active segment by looking at the last inserted time index entry and scanning from there to the log end. The broker will only do this if message.format.version is 0.10.0 or later. Otherwise the broker will skip reloading the largest timestamp.
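
The startup scan could look roughly like this, using the same in-memory model as an assumption: the time index is a list of (timestamp, offset) pairs and the log is a list of (offset, timestamp) pairs. The has_timestamps flag is a hypothetical stand-in for the message.format.version check.

```python
def recover_largest_timestamp(time_index, log, has_timestamps):
    """Reload the largest timestamp of the active segment on startup."""
    if not has_timestamps:
        return -1  # older message format: skip reloading
    if time_index:
        # Start from the last inserted time index entry.
        largest, start_offset = time_index[-1]
    else:
        largest, start_offset = -1, 0
    # Scan from that entry's offset to the log end.
    for offset, ts in log:
        if offset >= start_offset:
            largest = max(largest, ts)
    return largest
```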

...

When the log is truncated, because the offset in the time index is also monotonically increasing, we will also truncate the time index entries whose corresponding messages have been truncated. The active segment will reload the largest timestamp just like it did during startup.
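
Since the offsets in the time index are increasing, truncation reduces to cutting off a suffix of the index. A sketch, assuming truncation removes every message with offset greater than or equal to the target offset (the function name is hypothetical):

```python
import bisect


def truncate_time_index(time_index, truncate_to_offset):
    """Keep only entries whose offset is below truncate_to_offset."""
    offsets = [offset for _, offset in time_index]
    # Offsets are sorted, so a binary search finds the cut point.
    keep = bisect.bisect_left(offsets, truncate_to_offset)
    return time_index[:keep]
```

After the cut, the largest timestamp of the active segment has to be recomputed by scanning from the last remaining entry, as on startup.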

Enforce time based log retention

...

  • Messages whose timestamps are after the searched timestamp will be consumed.
  • Some messages with earlier timestamps might also be consumed.

The time index granularity does not change the actual timestamp searching granularity. It only affects the time needed for searching.

Use case discussion

Use case 1: Search by timestamp

Goal: Not lose messages

Solution with LogAppendTime index:

If a user wants to search for a message with CreateTime CT, they can use CT to search in the LogAppendTime index, because LogAppendTime > CT for the same message (assuming no clock skew). If the clock is skewed, they can search with CT - X, where X is the max skew.

If a user wants to search for a message with LogAppendTime LAT, they can just search with LAT and get millisecond accuracy.

Solution with CreateTime index:

Users can just search with CT and get a minute-level granularity offset.

Comparison:

If the latency in the pipeline is greater than one minute, users might consume fewer messages by using the CreateTime index. Otherwise, the LogAppendTime index is probably preferred.

Consider the following case:

  1. A message m1 with CreateTime CT arrives at the broker at LAT1.
  2. Some time later, at LAT2, another message m2 with CreateTime CT arrives at the broker.

If a user wants to search with CT after having consumed m2, they will have to reconsume from m1. Depending on how big LAT2 - LAT1 is, the number of messages to be reconsumed can be very large.

Use case 2: Search by timestamp (bootstrap)

Goal:

  1. Not lose messages
  2. Consume fewer duplicate messages

Solution with LogAppendTime index:

In the bootstrap case, all the LATs would be close. For example, suppose a user wants to process the data of the last 3 days and does the following:

  1. Dump a big database into Kafka
  2. Reprocess the messages of the last 3 days.

In this case, the LogAppendTime index does not help much. That means the user needs to filter out the data older than 3 days before dumping it into Kafka.

Solution with CreateTime index:

In the bootstrap case, the CreateTime will not change. If the user follows the same procedure described for the LogAppendTime index, searching by timestamp will work.

Comparison:

The LogAppendTime index needs further attention from the user.

Use case 3: Failover from cluster 1 to cluster 2

Goal:

  1. Not lose messages
  2. Consume fewer duplicate messages

Solution with LogAppendTime index:

Similar to search by timestamp. The user can choose to use the CT or LAT of cluster 1 to search on cluster 2. In this case, searching with CT - MaxLatencyOfCluster will provide a strong guarantee of not losing messages, but might produce some duplicates depending on the difference in latency between cluster 1 and cluster 2.

Solution with CreateTime index:

The user can use CT to search and get minute-level granularity. Duplicates are still not avoidable.

There can be some tricky cases here. Consider the following case [1]:

  • m1 with CT1 and m2 with CT2 are both produced to cluster 1 and cluster 2.
  • m1 is created earlier than m2, i.e. CT1 < CT2
  • m1 arrives at cluster 1 at LAT11 and arrives at cluster 2 at LAT12, assuming LAT11 < LAT12
  • m2 arrives at cluster 1 at LAT21 and arrives at cluster 2 at LAT22, assuming LAT12 > LAT22

In this case, m1 is created before m2. Due to the latency difference, m1 arrives at cluster 1 before m2 does, while m2 arrives at cluster 2 before m1 does.

If a consumer consumed m2 in cluster 2 and fails over to cluster 1, simply searching by CT2 will miss m1, because m1 has a larger offset than m2 in cluster 2 but a smaller offset than m2 in cluster 1. So the same trick of CT - MaxLatencyOfCluster is still needed.

Comparison:

In the cross-cluster failover case, both solutions can provide a strong guarantee of not losing messages, but both depend on knowledge of MaxLatencyOfCluster.

Use case 4: Get lag for consumers by time

Goal: Know how long a consumer is lagging by time.

Solution with LogAppendTime index:

With LogAppendTime in the message, the consumer can easily find out the lag by time and estimate how long it might need to reach the log end.

Solution with CreateTime index:

Not supported.

Use case 5: Broker side latency metric

Goal: Let the broker report the latency of each topic, i.e. LAT - CT

Solution with LogAppendTime index:

The latency can be reported as LAT - CT.

Solution with CreateTime index:

The latency can be reported as System.currentTimeMillis - CT

Comparison:

The two solutions are the same. This latency information can be used for MaxLatencyOfCluster in use case 3.
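
The failover case (use case 3) and the latency metric (use case 5) can be illustrated with a toy example. All names and numbers below are made up for illustration; the log of cluster 1 is modeled as a list of (message, CreateTime) pairs in offset order.

```python
def first_offset_at_or_after(log, timestamp_ms):
    """Offset of the first message whose CreateTime is >= timestamp_ms."""
    for offset, (_name, create_time_ms) in enumerate(log):
        if create_time_ms >= timestamp_ms:
            return offset
    return None


def broker_latency_ms(log_append_time_ms, create_time_ms):
    """Latency as reported in use case 5: LAT - CT."""
    return log_append_time_ms - create_time_ms


# In cluster 1, m1 is stored before m2. The consumer has only seen m2
# in cluster 2 before failing over.
cluster_1 = [("m1", 1000), ("m2", 1500)]
CT2 = 1500
MAX_LATENCY_OF_CLUSTER_MS = 600  # hypothetical value

naive_start = first_offset_at_or_after(cluster_1, CT2)
safe_start = first_offset_at_or_after(cluster_1, CT2 - MAX_LATENCY_OF_CLUSTER_MS)
```

Here the naive search starts at m2 and skips m1, while searching with CT2 - MaxLatencyOfCluster starts at m1, at the cost of possible duplicates.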

...