...

  1. Searching for an offset by timestamp has very coarse granularity (log segment level), and it does not work well when a replica is reassigned.
  2. Time-based log rolling and retention do not work well when a replica is reassigned.

In this KIP we propose adding a time-based log index using the message timestamps introduced in KIP-32.

...

The broker will build the time index based on the timestamps of the messages. The index works for both LogAppendTime and CreateTime.

Because all the index files are memory-mapped files, the main consideration here is to avoid significantly increasing memory consumption.

Use a time index for each log segment to save the (timestamp -> log offset) at a configurable granularity

Create another index file for each log segment with the name SegmentBaseOffset.timeindex. The density of the index is defined by the index.interval.bytes configuration.

...

 

Time Index Entry => Timestamp Offset
  Timestamp => int64
  Offset => int32
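
As a rough illustration of the entry layout above, the following sketch serializes one entry as an 8-byte timestamp followed by a 4-byte offset, 12 bytes in total. The class name TimeIndexEntry and its methods are hypothetical and are not part of the proposal. How the int32 offset maps to an absolute log offset (for example, relative to the segment base offset) is not stated here and is left open.

```java
import java.nio.ByteBuffer;

// Hypothetical helper, for illustration only: one time index entry on disk.
public class TimeIndexEntry {
    // int64 timestamp + int32 offset, per the entry format above.
    public static final int ENTRY_SIZE = Long.BYTES + Integer.BYTES;

    public final long timestamp;
    public final int offset;

    public TimeIndexEntry(long timestamp, int offset) {
        this.timestamp = timestamp;
        this.offset = offset;
    }

    // Writes the entry at the buffer's current position.
    public void writeTo(ByteBuffer buffer) {
        buffer.putLong(timestamp);
        buffer.putInt(offset);
    }

    // Reads one entry from the buffer's current position.
    public static TimeIndexEntry readFrom(ByteBuffer buffer) {
        long timestamp = buffer.getLong();
        int offset = buffer.getInt();
        return new TimeIndexEntry(timestamp, offset);
    }
}
```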

The time index granularity does not change the actual timestamp searching granularity. It only affects the time needed for searching. The way it works will be the same as offset search - find the closest timestamp and corresponding offset, then start a linear scan over the log until the target message is found. Although the granularity is configurable, a minute-level granularity is recommended because timestamp-based search is usually rare, so it is probably not worth investing a significant amount of memory in it.

The following table summarizes the memory consumed by one day of time index entries at different granularities. The numbers are calculated for a broker with 3500 partitions.

Granularity    Index entries per partition per day    Memory consumption
Second         86400                                  3.4 GB
Minute         1440                                   57 MB
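
The figures in the table can be reproduced with a short calculation. The sketch below assumes one 12-byte entry (int64 timestamp plus int32 offset) per partition per interval, and it reports sizes in binary units. Both are assumptions that happen to match the table, not statements from the proposal. The class and method names are hypothetical.

```java
// Hypothetical calculator, for illustration only.
public class TimeIndexMemory {
    // Assumed entry size: 8-byte timestamp + 4-byte offset.
    public static final int ENTRY_BYTES = 12;

    // Bytes of time index entries accumulated in one day across all partitions.
    public static long bytesPerDay(long entriesPerPartitionPerDay, long partitions) {
        return entriesPerPartitionPerDay * ENTRY_BYTES * partitions;
    }

    public static void main(String[] args) {
        long partitions = 3500;
        long perSecond = bytesPerDay(86_400, partitions); // one entry per second
        long perMinute = bytesPerDay(1_440, partitions);  // one entry per minute
        System.out.printf("second: %d bytes (%.1f GiB)%n",
                perSecond, perSecond / (1024.0 * 1024 * 1024));
        System.out.printf("minute: %d bytes (%.1f MiB)%n",
                perMinute, perMinute / (1024.0 * 1024));
    }
}
```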

The fields of a time index entry have the following meaning:

  • Timestamp - the biggest timestamp seen so far in this segment. The timestamp does not carry over to the next segment.
  • Offset - the next offset when the time index entry is inserted.
  • A time index entry (timestamp, offset) means that any message whose timestamp is greater than timestamp comes after offset.

Build the time index

Based on the proposal in KIP-32, the broker will build the time index in the following way:

  1. When the broker receives a message, the message is appended to the log unless it is rejected because its timestamp exceeds the threshold.
  2. The timestamp will either be LogAppendTime or CreateTime depending on the configuration.
  3. When a new log segment is created, the broker will create a time index file for the log segment.
  4. The time index is not globally monotonically increasing. Instead, it is only guaranteed to be monotonically increasing within each time index file, i.e. the time index file for a later log segment may contain a smaller timestamp than some timestamp in the time index file of an earlier segment.
  5. The time index and the offset index share the same index.interval.bytes configuration. We will insert a time index entry in the following scenarios:
    1. A time index file is empty and a message is appended to the log segment.
    2. The timestamp of the appended message is greater than the timestamp of the last time index entry AND the broker has appended more than index.interval.bytes since the last time index entry insertion.
  6. When a log segment is closed, the broker will write a time index entry to the time index file. That time index entry has the largest timestamp in this segment and points to the base offset of the next segment.
  7. The default initial / max size of the time index files is the same as the offset index files. (A time index entry is 1.5x the size of an offset index entry, so users should set the configuration accordingly.)
  8. If all the messages in a log segment have message.format.version before 0.10.0, the broker will insert (last_modification_time_of_the_segment -> base_offset_of_next_segment) into the time index file when the segment rolls out.
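
The insertion rule in step 5 can be sketched as follows. This is a minimal in-memory model and not the broker implementation. The class TimeIndexBuilder, its method names, and the assumption that consecutive offsets differ by one are all hypothetical. An entry is added when the index is empty, or when the appended message is newer than the last entry and more than index.interval.bytes have been appended since that entry.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of the time index for one segment.
public class TimeIndexBuilder {
    public static final class Entry {
        public final long timestamp;
        public final long offset;
        Entry(long timestamp, long offset) {
            this.timestamp = timestamp;
            this.offset = offset;
        }
    }

    private final List<Entry> entries = new ArrayList<>();
    private final int indexIntervalBytes;
    private long bytesSinceLastEntry = 0;
    private long maxTimestamp = Long.MIN_VALUE; // biggest timestamp seen in this segment

    public TimeIndexBuilder(int indexIntervalBytes) {
        this.indexIntervalBytes = indexIntervalBytes;
    }

    // Called after a message has been appended to the log segment.
    public void onAppend(long timestamp, long offset, int messageSizeBytes) {
        maxTimestamp = Math.max(maxTimestamp, timestamp);
        bytesSinceLastEntry += messageSizeBytes;

        boolean empty = entries.isEmpty();
        boolean newerThanLastEntry =
                !empty && timestamp > entries.get(entries.size() - 1).timestamp;

        if (empty || (newerThanLastEntry && bytesSinceLastEntry > indexIntervalBytes)) {
            // Timestamp: biggest seen so far. Offset: the next offset
            // (assumes offsets increase by one per message).
            entries.add(new Entry(maxTimestamp, offset + 1));
            bytesSinceLastEntry = 0;
        }
    }

    public List<Entry> entries() {
        return entries;
    }
}
```

Because each entry stores the biggest timestamp seen so far and is only added when a newer message arrives, the timestamps in this model stay strictly increasing within the segment.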

Search messages by timestamp

The way it works will be the same as offset search - find the closest indexed timestamp and corresponding offset, then start a linear scan of the log until the target message is found.

The time index granularity does not change the actual timestamp searching granularity. It only affects the time needed for searching. 
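
A minimal sketch of this lookup follows, under several assumptions that are not part of the proposal. The segment is modelled as an array of message timestamps indexed by offset. The target message is taken to be the first message whose timestamp is at least the searched timestamp. The class and method names are hypothetical. The binary search picks the last entry whose timestamp is strictly smaller than the target, since an entry (timestamp, offset) only guarantees that messages with a greater timestamp come at or after offset.

```java
// Hypothetical lookup over an in-memory model of one segment.
public class TimeIndexSearch {
    // Index of the last entry with timestamp < target, or -1 if there is none.
    public static int lastEntryBefore(long[] indexTimestamps, long target) {
        int lo = 0, hi = indexTimestamps.length - 1, result = -1;
        while (lo <= hi) {
            int mid = (lo + hi) >>> 1;
            if (indexTimestamps[mid] < target) {
                result = mid;
                lo = mid + 1;
            } else {
                hi = mid - 1;
            }
        }
        return result;
    }

    // Returns the first offset whose message timestamp is >= target, or -1.
    // logTimestamps[i] is the timestamp of the message at offset i.
    public static long findOffset(long[] indexTimestamps, int[] indexOffsets,
                                  long[] logTimestamps, long target) {
        int slot = lastEntryBefore(indexTimestamps, target);
        int start = (slot < 0) ? 0 : indexOffsets[slot];
        // Linear scan from the indexed position to the end of the segment.
        for (int offset = start; offset < logTimestamps.length; offset++) {
            if (logTimestamps[offset] >= target) {
                return offset;
            }
        }
        return -1;
    }
}
```

A denser index shortens the linear scan but returns the same offset, which is why the index granularity affects only the search time.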

Broker startup

On broker startup, the latest timestamp is needed for the next time index append. The broker will find the largest timestamp of the active segment by looking at the last inserted time index entry and scanning from there to the log end. The broker only does this if message.format.version is greater than 0.10.0. Otherwise, the broker will skip reloading the largest timestamp.

Log Truncation

When the log is truncated, because the offsets in the time index are also monotonically increasing, we will also remove the time index entries whose offsets point to messages that have been truncated.
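
Because the offsets in the index only increase, truncation can drop entries from the tail until the remaining ones point below the truncation offset. The sketch below illustrates this on an in-memory list. The class TimeIndexTruncation and its method are hypothetical. Treating an entry whose offset equals the truncation offset as removable is an assumption, read from the rule that entries pointing to truncated messages are removed.

```java
import java.util.List;

// Hypothetical in-memory model, for illustration only.
public class TimeIndexTruncation {
    public static final class Entry {
        public final long timestamp;
        public final long offset;
        public Entry(long timestamp, long offset) {
            this.timestamp = timestamp;
            this.offset = offset;
        }
    }

    // Removes every entry whose offset is >= truncateToOffset, i.e. every
    // entry pointing at a message removed by truncating the log to that offset.
    // The list must be mutable and sorted by offset.
    public static void truncateTo(List<Entry> entries, long truncateToOffset) {
        int keep = entries.size();
        // Offsets are monotonically increasing, so only the tail can be affected.
        while (keep > 0 && entries.get(keep - 1).offset >= truncateToOffset) {
            keep--;
        }
        entries.subList(keep, entries.size()).clear();
    }
}
```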

Enforce time based log retention

...