...

Currently these operations depend on the create time / modification time of the log segment file. This has a few issues.

  1. Searching for an offset by timestamp has very coarse granularity (log segment level), and it does not work well when a replica is reassigned.
  2. Time based log rolling and retention do not work well when a replica is reassigned.

In this KIP we propose introducing a time based log index using the timestamp of the messages introduced in KIP-32.

...

Because all the index files are memory mapped, the main consideration here is to avoid significantly increasing memory consumption. The time index needs to be built for each log segment file, just like the offset index.

Use a time index for each log segment to save the timestamp -> log offset at a configurable granularity

Create another index file for each log segment, named SegmentBaseOffset.time.index, to provide a time index at a configurable granularity. The granularity of the index is defined by the time.index.interval configuration.

The time index entry format is:

 

Time Index Entry => Timestamp Offset
  Timestamp => int64
  Offset => int32

The time index granularity does not change the actual timestamp search granularity; it only affects the time needed for a search. The search works the same way as an offset search: find the closest indexed timestamp and its corresponding offset, then linearly scan the log until the target message is found. Although the granularity is configurable, a minute-level granularity is recommended, because timestamp based search is usually rare and is probably not worth a significant investment of memory.
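The two-step search described above can be sketched as follows. This is an illustrative outline, not Kafka's actual implementation; the class and method names are hypothetical, and the linear scan over the log itself is elided.

```java
import java.util.List;

// Hypothetical sketch: binary-search the sparse time index for the closest
// entry at or before the target timestamp. The returned offset is where the
// linear scan of the log would begin.
class TimeIndexLookup {
    // One index entry: 8-byte timestamp, 4-byte offset (per the entry format above).
    record Entry(long timestamp, int offset) {}

    // Returns the offset to start scanning from, or -1 if the target predates
    // every indexed entry (in that case, scan from the segment's start).
    static int lookup(List<Entry> index, long targetTimestamp) {
        int lo = 0, hi = index.size() - 1, result = -1;
        while (lo <= hi) {
            int mid = (lo + hi) >>> 1;
            if (index.get(mid).timestamp() <= targetTimestamp) {
                result = index.get(mid).offset();
                lo = mid + 1;            // a later entry may still qualify
            } else {
                hi = mid - 1;
            }
        }
        return result;
    }
}
```

A coarser index granularity only lengthens the final linear scan; the binary search above still lands on the closest indexed entry at or before the target.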

The following table gives a summary of memory consumption at different granularities. The numbers are calculated based on a broker with 3500 partitions.

...

On broker startup, the broker needs to find the latest timestamp in the current active log segment, because that timestamp may be needed for the next time index append. The broker will therefore scan backwards from the current active log segment through earlier log segments until it finds the latest message timestamp.
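The backward scan on startup can be sketched as below. This is a simplified model under assumed names: segments are represented by their message timestamps, newest segment last, and the real broker would of course read timestamps from the log files.

```java
import java.util.List;

// Illustrative sketch of the startup scan: walk segments from newest to
// oldest until one contains a message, and take that segment's maximum
// timestamp as the broker's "latest timestamp" for future index appends.
class StartupScan {
    // segments: newest last; each long[] holds one segment's message timestamps
    static long latestTimestamp(List<long[]> segments) {
        for (int i = segments.size() - 1; i >= 0; i--) {
            long max = Long.MIN_VALUE;
            for (long ts : segments.get(i)) max = Math.max(max, ts);
            if (max != Long.MIN_VALUE) return max;  // first non-empty segment wins
        }
        return -1L;                                 // no messages at all
    }
}
```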

Enforce time based log retention

To enforce time based log retention, the broker will check the last entry of a segment's time index. That timestamp is the latest timestamp of the messages in the log segment, so if it has expired, the broker will delete the log segment. If the time index is empty, the broker will check the previous segment's time index.
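A minimal sketch of this retention decision, under assumed names (this is not Kafka's actual code). An empty time index is modeled as an empty OptionalLong, in which case no decision can be made from this segment alone and the caller would fall back to the previous segment's index.

```java
import java.util.OptionalLong;

// Hedged sketch: a segment is deletable once the latest timestamp recorded
// in its time index is older than the retention period.
class RetentionCheck {
    static boolean shouldDelete(OptionalLong lastIndexedTimestamp,
                                long retentionMs, long now) {
        // Empty index: cannot decide from this segment; check the previous one.
        if (lastIndexedTimestamp.isEmpty()) return false;
        return now - lastIndexedTimestamp.getAsLong() > retentionMs;
    }
}
```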

Enforce time based log rolling

Currently, time based log rolling is based on the creation time of the log segment. With this KIP, time based rolling would instead be based on the largest timestamp ever seen: a new log segment is rolled if the current time is greater than the largest timestamp ever seen plus log.roll.ms. When message.timestamp.type=CreateTime, users should set max.message.time.difference.ms appropriately together with log.roll.ms to avoid frequent log segment rolls.
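The proposed roll condition is a one-line predicate; the sketch below is illustrative only, with hypothetical names, and omits the size-based roll criteria that also apply.

```java
// Minimal sketch of the proposed time-based roll condition: roll a new
// segment once the current time exceeds the largest timestamp ever seen
// plus log.roll.ms.
class LogRollCheck {
    static boolean shouldRoll(long largestTimestampSeen, long logRollMs, long now) {
        return now > largestTimestampSeen + logRollMs;
    }
}
```

Note why max.message.time.difference.ms matters here: with CreateTime, a single message carrying a far-future timestamp would otherwise push largestTimestampSeen forward and distort when segments roll.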

Search message by timestamp

Searching by timestamp will have better accuracy. The guarantees provided are:

...

  1. Each broker keeps in memory a timestamp index map - Map[TopicPartitionSegment, Map[TimestampByMinute, Offset]]
    1. The timestamp is on a minute boundary
    2. The offset is the offset of the first message in the log segment that falls into a minute
  2. Create a timestamp index file for each log segment. The entry in the file is in following format:

    Time Index Entry => Timestamp Offset
      Timestamp => int64
      Offset => int32

    So the timestamp index file simply becomes a persistent copy of the timestamp index map. The broker will load the timestamp map from the file on startup.

  3. When a broker (regardless of whether it is a leader or a follower) receives a message, it does the following:
    1. Find which minute MIN the message with offset OFFSET falls into
    2. Check whether MIN is already in the in-memory timestamp map for the current log segment. If not, the broker adds [MIN->OFFSET] to both the in-memory timestamp index map and the timestamp index file.
  4. When a log segment is deleted, the broker:
    1. Remove the TopicPartitionSegment key from in memory map
    2. Remove the log segment timestamp index file
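The per-segment map maintenance described in the steps above can be sketched with a minute-bucketed TreeMap. The class and method names are illustrative, the segment key is simplified to a string, and persistence to the index file is omitted.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Sketch of the in-memory timestamp index map:
// Map[TopicPartitionSegment, Map[TimestampByMinute, Offset]]
class TimestampIndexMap {
    private final Map<String, NavigableMap<Long, Long>> index = new HashMap<>();

    // On message arrival: record the first offset seen in each minute bucket.
    void onMessage(String segmentKey, long timestampMs, long offset) {
        long minute = timestampMs / 60_000L;              // minute boundary
        index.computeIfAbsent(segmentKey, k -> new TreeMap<>())
             .putIfAbsent(minute, offset);                // keep only the first offset
    }

    // On segment deletion: drop the whole TopicPartitionSegment entry.
    void onSegmentDelete(String segmentKey) {
        index.remove(segmentKey);
    }

    // Lookup the first offset of a given minute bucket, or null if absent.
    Long lookup(String segmentKey, long minute) {
        NavigableMap<Long, Long> m = index.get(segmentKey);
        return m == null ? null : m.get(minute);
    }
}
```

putIfAbsent mirrors step 3.2: only the first message of a minute creates an entry, so later messages in the same minute leave the recorded offset unchanged.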
