...

  1. When the broker receives a message, it appends the message to the log unless the message is rejected because its timestamp exceeds the threshold.
  2. The timestamp is either LogAppendTime or CreateTime, depending on the configuration.
  3. If the message timestamp falls into a new time.index.interval bucket that is greater than the previously appended index entry, the broker appends a new entry to the time index with the new time.index.interval bucket timestamp.
  4. When a log segment is closed, if the message with the largest timestamp is in the closed segment, the broker inserts a time index entry into the time index. That entry points to the message with the largest timestamp.
  5. When a new log segment is created, the broker inserts an entry into the time index of the new log segment when the first message whose timestamp falls into a new time.index.interval bucket is appended to the log segment.
  6. A log segment may have no time index entry if all of its messages have smaller timestamps than those in the previous log segments. In that case the time based index is empty.
  7. If all the messages in a log segment have a message.format.version before 0.10.0, the broker inserts a time index entry (last_modification_time_of_the_segment -> offset_of_the_last_message_in_the_segment) into the time index.
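
The bucket rule in step 3 can be illustrated with a small sketch. This is not the broker's implementation: the class name TimeIndexSketch, its methods, and the choice to round a timestamp down to the start of its bucket are assumptions made for illustration, and the sketch keeps a single in-memory index rather than one index per segment.

```python
from dataclasses import dataclass, field
from typing import List, Tuple


@dataclass
class TimeIndexSketch:
    """Hypothetical in-memory time index holding (bucket_timestamp_ms, offset) entries."""

    interval_ms: int  # plays the role of time.index.interval (assumed to be in ms)
    entries: List[Tuple[int, int]] = field(default_factory=list)

    def bucket_of(self, timestamp_ms: int) -> int:
        # Assumption: a bucket is identified by the timestamp rounded down
        # to a multiple of the interval.
        return timestamp_ms - (timestamp_ms % self.interval_ms)

    def maybe_append(self, timestamp_ms: int, offset: int) -> bool:
        """Append an entry only when the message opens a strictly newer bucket."""
        bucket = self.bucket_of(timestamp_ms)
        if self.entries and bucket <= self.entries[-1][0]:
            # Same bucket as the last entry, or an older one: no new entry.
            return False
        self.entries.append((bucket, offset))
        return True
```

With a 1000 ms interval, appending timestamps 1500, 1900, 2100 and 500 at offsets 0 to 3 leaves two entries in this sketch, one for bucket 1000 and one for bucket 2000. The late message with timestamp 500 adds nothing, which is the situation step 6 describes.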

Broker startup

On broker startup, the broker needs to find the latest timestamp of the current active log segment, because that timestamp may be needed for the next log index append. The broker therefore scans from the current active log segment back to earlier log segments until it finds the latest message timestamp.
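
A minimal sketch of that backward scan follows, assuming each segment is represented as a plain list of message timestamps, with None standing for a message that carries no timestamp. The function name find_latest_timestamp and this representation are hypothetical. The sketch stops at the newest segment that contains any timestamped message and returns the largest timestamp in it.

```python
from typing import List, Optional


def find_latest_timestamp(segments: List[List[Optional[int]]]) -> Optional[int]:
    """Scan segments from newest to oldest and return the latest timestamp found.

    `segments` is ordered oldest first, so the active segment is the last item.
    Returns None when no message in any segment has a timestamp.
    """
    for segment in reversed(segments):
        stamped = [ts for ts in segment if ts is not None]
        if stamped:
            # Stop at the first (newest) segment that has timestamped messages.
            return max(stamped)
    return None
```

For example, if the active segment holds only messages without timestamps and the segment before it holds timestamps 10 and 20, the sketch returns 20.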

...

The change is backward compatible after KIP-31 and KIP-32 are checked in.

The broker will do the following for retention during migration:

  1. The broker will rebuild a time based log index for each segment that does not have a time index.
    1. If the message.format.version of a topic is before 0.10.0, the time index will have only one entry (last_modification_time_of_the_segment -> offset_of_the_last_message_in_the_segment).
    2. If the message.format.version of a topic is 0.10.0, the broker will scan the messages in a log segment and rebuild the time index from their timestamps. If no message in the segment has a timestamp, the entry (last_modification_time_of_the_segment -> offset_of_the_last_message_in_the_segment) will be inserted into the time index.
  2. After the entire cluster is migrated to use the time based log index for log retention:
    1. For segments that contain only messages whose versions are before 0.10.0, the last modification time entry in the time index will be used for retention.
    2. For segments that contain at least one message whose version is 0.10.0 or later, the max timestamp of the messages will be used for log retention.
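
The retention rule in step 2 can be sketched as follows. The function names, the use of None for a message without a timestamp, and the strict comparison against a retention_ms value are assumptions for illustration only.

```python
from typing import List, Optional


def retention_timestamp(message_timestamps: List[Optional[int]],
                        last_modified_ms: int) -> int:
    """Pick the timestamp that retention is evaluated against for one segment.

    None models a message written in a format before 0.10.0 (no timestamp).
    """
    stamped = [ts for ts in message_timestamps if ts is not None]
    if stamped:
        # At least one timestamped message: use the max message timestamp.
        return max(stamped)
    # Only old-format messages: fall back to the segment's last modification time.
    return last_modified_ms


def is_expired(message_timestamps: List[Optional[int]], last_modified_ms: int,
               now_ms: int, retention_ms: int) -> bool:
    # Assumption: a segment expires once its retention timestamp is older
    # than retention_ms relative to the current time.
    ts = retention_timestamp(message_timestamps, last_modified_ms)
    return now_ms - ts > retention_ms
```

In this sketch a segment with a single timestamped message is judged by that message's timestamp, even when the file's modification time is much older or newer.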

The broker will do the following for log rolling during migration:

  1. The broker will initially use the segment's last modification time as the max message timestamp.
  2. If a new message's version is 0.10.0 or later and its timestamp is greater than the current max message timestamp, the broker updates the current max message timestamp.
  3. The broker always uses the difference between the current time and the max message timestamp to decide whether to roll a new log segment.
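
The three rolling steps above can be sketched as a small state holder. The class name RollTrackerSketch, its method names, and the roll_ms threshold parameter are hypothetical, and the strict comparison is an assumption.

```python
from typing import Optional


class RollTrackerSketch:
    """Hypothetical tracker for the max message timestamp of the active segment."""

    def __init__(self, segment_last_modified_ms: int) -> None:
        # Step 1: start from the segment's last modification time.
        self.max_timestamp_ms = segment_last_modified_ms

    def on_append(self, timestamp_ms: Optional[int]) -> None:
        # Step 2: only timestamped messages (None models the old format)
        # with a larger timestamp move the max forward.
        if timestamp_ms is not None and timestamp_ms > self.max_timestamp_ms:
            self.max_timestamp_ms = timestamp_ms

    def should_roll(self, now_ms: int, roll_ms: int) -> bool:
        # Step 3: compare (current time - max message timestamp) with the
        # rolling threshold.
        return now_ms - self.max_timestamp_ms > roll_ms
```

A tracker created with a last modification time of 1000 would roll at time 2500 with a 1000 ms threshold, but not after a message with timestamp 2000 has been appended.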

 

 

Rejected Alternatives

Add a timestamp field to log index entry

...