...

  1. When the broker receives a message, it appends the message to the log unless the message is rejected because its timestamp exceeds the threshold.
  2. The timestamp will be either LogAppendTime or CreateTime, depending on the configuration.
  3. The broker checks whether the message timestamp falls into a new time.index.interval bucket that is later than the bucket of the previously appended index entry. If it does, the broker appends a new entry to the time index with the new time.index.interval timestamp.
  4. When a log segment is closed, the broker always inserts a time index entry into the time index of the closed log segment. If the largest timestamp ever seen is in the closed segment, the entry points to the last message with the largest timestamp in that segment.
  5. When a new log segment is created, the broker inserts a time index entry into the time index of the new log segment when the first message whose timestamp falls into a new time.index.interval bucket is appended to that segment. If all the messages in the new segment have smaller timestamps than those in the previous log segments, the segment may have no time index entry pointing to its first message. In that case the time index would be empty.
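
Steps 3 and 5 above can be sketched in Java as follows. This is a minimal, hypothetical model rather than Kafka's implementation: the class `TimeIndexAppender`, its methods, and the `intervalMs` field (standing in for time.index.interval) are illustrative names, each time index is kept as an in-memory list, and step 4 (the extra entry written when a segment is closed) is omitted. The sketch also assumes that a bucket's timestamp is the message timestamp rounded down to a multiple of the interval.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of steps 3 and 5; names are illustrative, not Kafka's API.
final class TimeIndexAppender {
    // One time index entry: bucket timestamp -> offset of the message that opened the bucket.
    static final class Entry {
        final long timestamp;
        final long offset;
        Entry(long timestamp, long offset) {
            this.timestamp = timestamp;
            this.offset = offset;
        }
    }

    private final long intervalMs;                   // stands in for time.index.interval
    private long lastIndexedBucket = Long.MIN_VALUE; // carried over across segments
    private List<Entry> activeSegmentIndex = new ArrayList<>();

    TimeIndexAppender(long intervalMs) {
        if (intervalMs <= 0) {
            throw new IllegalArgumentException("intervalMs must be positive");
        }
        this.intervalMs = intervalMs;
    }

    // Called for each message after it has been appended to the log (step 1).
    void onAppend(long timestamp, long offset) {
        // Round down to the start of the time.index.interval bucket.
        long bucket = Math.floorDiv(timestamp, intervalMs) * intervalMs;
        // Only a bucket later than the last indexed one creates an entry,
        // so smaller (out-of-order) timestamps never add entries.
        if (bucket > lastIndexedBucket) {
            activeSegmentIndex.add(new Entry(bucket, offset));
            lastIndexedBucket = bucket;
        }
    }

    // Step 5: the new segment starts with an empty index, but lastIndexedBucket is kept,
    // so the index stays empty until a message reaches a later bucket.
    List<Entry> rollSegment() {
        List<Entry> closedIndex = activeSegmentIndex;
        activeSegmentIndex = new ArrayList<>();
        return closedIndex;
    }

    List<Entry> activeIndex() {
        return activeSegmentIndex;
    }
}
```

For example, with a 1000 ms interval, messages with timestamps 1500, 1900, 2100 and 1200 produce entries only for buckets 1000 and 2000. After rollSegment(), a message with timestamp 1800 adds nothing, which is the empty-index case described in step 5.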

Broker startup

On broker startup, the broker needs to find the earliest and latest timestamps of the current active log segment. The latest timestamp may be needed for the next time index append, and the earliest timestamp is needed to enforce time based log rolling. The broker therefore scans the current active log segment to find the earliest and latest message timestamps.
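
A minimal sketch of the startup scan, assuming that "earliest" and "latest" mean the smallest and largest timestamps in the segment. The active segment is modeled as a plain array of message timestamps in log order; `ActiveSegmentStartupScan` and `TimestampRange` are hypothetical names. Every message is visited because timestamps, especially CreateTime values, are not guaranteed to increase with offset.

```java
import java.util.Optional;

// Hypothetical sketch: the active segment is modeled as message timestamps in log order.
final class ActiveSegmentStartupScan {
    static final class TimestampRange {
        final long earliest;
        final long latest;
        TimestampRange(long earliest, long latest) {
            this.earliest = earliest;
            this.latest = latest;
        }
    }

    // Single pass over all messages, since timestamps may be out of order.
    static Optional<TimestampRange> scan(long[] timestamps) {
        if (timestamps.length == 0) {
            return Optional.empty(); // empty active segment: nothing to recover
        }
        long earliest = Long.MAX_VALUE;
        long latest = Long.MIN_VALUE;
        for (long ts : timestamps) {
            earliest = Math.min(earliest, ts);
            latest = Math.max(latest, ts);
        }
        return Optional.of(new TimestampRange(earliest, latest));
    }
}
```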

Enforce time based log retention

...

Enforce time based log rolling

Currently, time based log rolling is based on the creation time of the log segment. With this KIP, time based rolling is changed to be based on the largest timestamp ever seen: a new log segment is rolled out if the current time is greater than the largest timestamp ever seen + log.roll.ms. When message.timestamp.type=CreateTime, users should set max.message.time.difference.ms appropriately together with log.roll.ms to avoid frequent log segment rolls.
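
The rolling rule reduces to a one-line predicate. The sketch below also includes an assumed form of the CreateTime check from step 1 (reject a message whose timestamp differs from broker time by more than max.message.time.difference.ms); both helpers, the class name `TimeBasedRolling`, and the parameter names are hypothetical, not Kafka APIs.

```java
// Hypothetical helpers; parameter names mirror log.roll.ms and
// max.message.time.difference.ms but are not Kafka's API.
final class TimeBasedRolling {
    // Roll when current time > largest timestamp ever seen + log.roll.ms.
    // Written as a subtraction so a very large logRollMs cannot overflow the sum
    // (assumes nowMs and logRollMs are non-negative).
    static boolean shouldRoll(long nowMs, long largestTimestampSeenMs, long logRollMs) {
        return nowMs - logRollMs > largestTimestampSeenMs;
    }

    // Assumed CreateTime acceptance check: the timestamp may differ from
    // broker time by at most maxDifferenceMs in either direction.
    static boolean isTimestampAcceptable(long nowMs, long createTimeMs, long maxDifferenceMs) {
        return Math.abs(nowMs - createTimeMs) <= maxDifferenceMs;
    }
}
```

One reading of the advice above: if accepted CreateTime values consistently trail broker time by more than log.roll.ms, the largest timestamp ever seen trails by the same amount, so shouldRoll stays true right after each roll.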

Search message by timestamp

...

For documentation purposes, the following are a few discussions related to this KIP.
 

Comparison between building time index based on LogAppendTime and CreateTime

...

  1. Each broker keeps an in-memory timestamp index map: Map[TopicPartitionSegment, Map[TimestampByMinute, Offset]]
    1. The timestamp is on a minute boundary.
    2. The offset is the offset of the first message in the log segment that falls into that minute.
  2. Create a timestamp index file for each log segment. Each entry in the file has the following format:

    Time Index Entry => Timestamp Offset
      Timestamp => int64
      Offset => int32

    The timestamp index file is thus simply a persistent copy of the timestamp index map. The broker loads the timestamp index map from the file on startup.

  3. When a broker (whether leader or follower) receives a message, it does the following:
    1. Find the minute MIN that the message with offset OFFSET falls into.
    2. Check whether MIN is already in the in-memory timestamp map for the current log segment. If it is not, the broker adds [MIN->OFFSET] to both the in-memory timestamp index map and the timestamp index file.
  4. When a log segment is deleted, the broker:
    1. Remove the TopicPartitionSegment key from in memory map
    2. Remove the log segment timestamp index file
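
The in-memory map, the file format, and the delete path above can be sketched together in Java. Assumptions: segment keys are plain strings standing in for TopicPartitionSegment, each index file is modeled as an in-memory byte stream, entries are written big-endian with `ByteBuffer`, and the caller passes an offset that already fits in the int32 Offset field (for example, relative to the segment's base offset, which is an assumption rather than something stated here). `MinuteTimestampIndex` and its methods are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of the per-minute timestamp index; not Kafka's API.
final class MinuteTimestampIndex {
    static final long MINUTE_MS = 60_000L;
    // One file entry: Timestamp (int64) followed by Offset (int32) = 12 bytes.
    static final int ENTRY_SIZE = Long.BYTES + Integer.BYTES;

    // Map[TopicPartitionSegment, Map[TimestampByMinute, Offset]]; keys are plain strings here.
    private final Map<String, TreeMap<Long, Integer>> index = new HashMap<>();
    // Stand-in for the per-segment timestamp index files.
    private final Map<String, ByteArrayOutputStream> files = new HashMap<>();

    // Step 3: record the first offset seen in each minute of a segment.
    void onMessage(String segment, long timestampMs, int offset) {
        long minute = Math.floorDiv(timestampMs, MINUTE_MS) * MINUTE_MS;
        TreeMap<Long, Integer> minutes = index.computeIfAbsent(segment, s -> new TreeMap<>());
        if (!minutes.containsKey(minute)) {
            minutes.put(minute, offset);
            byte[] entry = ByteBuffer.allocate(ENTRY_SIZE).putLong(minute).putInt(offset).array();
            files.computeIfAbsent(segment, s -> new ByteArrayOutputStream())
                 .write(entry, 0, entry.length);
        }
    }

    // Startup: rebuild one segment's map from its file contents.
    static TreeMap<Long, Integer> load(byte[] fileBytes) {
        TreeMap<Long, Integer> minutes = new TreeMap<>();
        ByteBuffer buf = ByteBuffer.wrap(fileBytes);
        while (buf.remaining() >= ENTRY_SIZE) {
            long minute = buf.getLong();
            int offset = buf.getInt();
            minutes.put(minute, offset);
        }
        return minutes;
    }

    // Step 4: drop both the in-memory entry and the "file" for a deleted segment.
    void deleteSegment(String segment) {
        index.remove(segment);
        files.remove(segment);
    }

    byte[] fileBytes(String segment) {
        ByteArrayOutputStream file = files.get(segment);
        return file == null ? new byte[0] : file.toByteArray();
    }

    TreeMap<Long, Integer> minutesFor(String segment) {
        return index.get(segment); // null if the segment is unknown
    }
}
```

Because only the first message in each minute is recorded, a segment's file grows by at most one 12-byte entry per minute of data, and load() recovers exactly the map that was written.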
