...

  1. Allow the user to stamp the message with a timestamp when producing it.
  2. When a leader broker receives a message
    1. If message.timestamp.type=LogAppendTime, the server will override the timestamp with its current local time and append the message to the log.
      1. If the message is a compressed message, the timestamp in the compressed wrapper message will be updated to the current server time. The broker will set the timestamp type bit in the wrapper message to 1 and ignore the inner messages' timestamps. We do this, instead of writing the current server time to each inner message, to avoid the recompression penalty when people are using LogAppendTime.
      2. If the message is a non-compressed message, the timestamp in the message will be overwritten to current server time.
    2. If message.timestamp.type=CreateTime
      1. If the time difference between the message timestamp and the broker's current time is within a configurable threshold max.message.time.difference.ms, the server will accept the message and append it to the log. For a compressed message, the server will update the timestamp in the compressed wrapper message to the largest timestamp of the inner messages.
      2. If the time difference is beyond the configured threshold max.message.time.difference.ms, the server will reject the entire batch with TimestampExceededThresholdException.
  3. When a follower broker receives a message
    1. If the message is a compressed message, the timestamp in the compressed wrapper message will be used to build the time index, i.e. the timestamp in a compressed message is always the largest timestamp of all its inner messages.
    2. If the message is a non-compressed message, the timestamp of this message will be used to build the time index.
  4. When a consumer receives a message
    1. If a message is a compressed message
      1. If the wrapper message's timestamp type attribute bit is 0 (CreateTime), the inner messages' timestamps will be used.
      2. If the wrapper message's timestamp type attribute bit is 1 (LogAppendTime), the timestamp of the compressed wrapper message will be used as the timestamp of the inner messages.
    2. If a message is a non-compressed message, the timestamp of the message will be used.
  5. message.timestamp.type and max.message.time.difference.ms will be a per topic configuration.
  6. In ProduceResponse V2, a timestamp will be returned for each partition.
    1. If the topic uses LogAppendTime, the timestamp returned will be the LogAppendTime for the message set.
    2. If the topic uses CreateTime, the timestamp returned will be NoTimestamp.
    3. When the producer invokes the callback for each message, it uses the timestamp returned in the produce response if it is not NoTimestamp. Otherwise, it uses the message timestamp, which is tracked by the producer.
    4. The producer will be able to tell whether the timestamp is LogAppendTime or CreateTime in this case.
  7. The time index will be built so that it has the following guarantees. (Please note that the time index will be implemented in KIP-33 instead of this KIP. The behavior is discussed here because it is closely related to the design of this KIP.)
    1. If a user searches by a timestamp:
      1. All the messages after the searched timestamp will be consumed.
      2. The user might see earlier messages.
    2. Log retention will look at the last entry in the time index file, because that entry holds the latest timestamp in the entire log segment. If that entry expires, the log segment will be deleted.
    3. Log rolling will depend on the largest timestamp of all messages seen so far. If no message is appended within log.roll.ms after the largest appended timestamp, a new log segment will be rolled out.
  8. The downsides of this proposal are:
    1. The timestamp might not be monotonically increasing if message.timestamp.type=CreateTime.
    2. Log retention might become non-deterministic, i.e. when a message will be deleted now depends on the timestamps of the other messages in the same log segment, and those timestamps are provided by the user within a range that depends on the configured time difference threshold.
  9. Although the proposal has some downsides, it gives users the flexibility to use the timestamp.
    1. If message.timestamp.type=CreateTime
      1. When the time difference threshold is set to Long.MaxValue, the timestamp in the message is equivalent to CreateTime.
      2. When time difference threshold is between 0 and Long.MaxValue, it ensures the messages will always have a timestamp within a certain range.
    2. If message.timestamp.type=LogAppendTime, the timestamps will be log append time.
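
The leader-side rules in item 2 can be sketched roughly as follows. This is a minimal illustration, not the broker implementation: the class LeaderTimestampSketch, its methods, and the nested exception class are hypothetical stand-ins (only the exception name comes from the proposal), and the time difference is assumed to be the absolute difference between the message timestamp and the broker's current time.

```java
import java.util.Arrays;

// Hypothetical helper for illustration only; not part of the Kafka code base.
final class LeaderTimestampSketch {

    enum TimestampType { CREATE_TIME, LOG_APPEND_TIME }

    // Stand-in for the exception named in the proposal.
    static final class TimestampExceededThresholdException extends RuntimeException {
        TimestampExceededThresholdException(String message) {
            super(message);
        }
    }

    // Returns the timestamp the leader would append for one message,
    // or throws if a CreateTime timestamp is outside the allowed range.
    static long resolve(TimestampType type, long messageTimestampMs,
                        long brokerNowMs, long maxDifferenceMs) {
        if (type == TimestampType.LOG_APPEND_TIME) {
            // LogAppendTime: the broker's clock always wins.
            return brokerNowMs;
        }
        // CreateTime: accept only timestamps close enough to the broker's clock.
        // Assumption: the difference is measured in both directions (past and future).
        long difference = Math.abs(brokerNowMs - messageTimestampMs);
        if (difference > maxDifferenceMs) {
            throw new TimestampExceededThresholdException(
                "difference " + difference + " ms exceeds " + maxDifferenceMs + " ms");
        }
        return messageTimestampMs;
    }

    // CreateTime, compressed case: the wrapper carries the largest inner timestamp.
    // Throws NoSuchElementException if the array is empty.
    static long wrapperTimestamp(long[] innerTimestampsMs) {
        return Arrays.stream(innerTimestampsMs).max().getAsLong();
    }
}
```

With a threshold of Long.MAX_VALUE the range check never fails for non-negative timestamps, which matches the case in item 9 where the stored timestamp is simply the CreateTime supplied by the user.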

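The retention and rolling behavior in item 7 can be illustrated with two small checks. This is only a sketch under stated assumptions: TimeBasedSegmentSketch and its method names are hypothetical, "expires" is assumed to mean that the last time index entry is older than a retention period, and both checks are assumed to compare against the broker's wall-clock time.

```java
// Hypothetical helper for illustration only; the real logic belongs to KIP-33.
final class TimeBasedSegmentSketch {

    // A segment is deleted once its last time index entry, which holds the
    // largest timestamp in the segment, is older than the retention period.
    static boolean isExpired(long lastIndexTimestampMs, long nowMs, long retentionMs) {
        return nowMs - lastIndexTimestampMs > retentionMs;
    }

    // A new segment is rolled when nothing has been appended for rollMs
    // (log.roll.ms) after the largest timestamp appended so far.
    static boolean shouldRoll(long largestTimestampMs, long nowMs, long rollMs) {
        return nowMs - largestTimestampMs > rollMs;
    }
}
```

Because both checks depend on the largest timestamp in the segment, a single message with a late CreateTime keeps the whole segment alive longer, which is the non-determinism listed among the downsides.
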
...

MessageAndOffset => Offset MessageSize Message
  Offset => int64  
  MessageSize => int32
  
  Message => Crc MagicByte Attributes Timestamp KeyLength Key ValueLength Value
    Crc => int32
    MagicByte => int8  <---------------------- Bump up magic byte to 1
    Attributes => int8 <---------------------- The 4th LSB will be used to indicate timestamp type.
    Timestamp => int64 <---------------------- NEW
    KeyLength => int32
    Key => bytes
    ValueLength => int32
    Value => bytes
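
As an illustration of how the timestamp type bit in Attributes might be read and written, here is a minimal sketch. The class and method names are hypothetical, and it assumes that "the 4th LSB" corresponds to the mask 0x08, with 0 meaning CreateTime and 1 meaning LogAppendTime. The last method shows how a consumer could pick the timestamp for an inner message of a compressed message set based on the wrapper's bit.

```java
// Hypothetical helper for illustration only; not part of the Kafka code base.
final class TimestampAttributeSketch {

    // Assumption: the 4th least-significant bit of Attributes is bit 3 (mask 0x08).
    static final byte TIMESTAMP_TYPE_MASK = 0x08;

    // True if the timestamp type bit is 1 (LogAppendTime), false if 0 (CreateTime).
    static boolean isLogAppendTime(byte attributes) {
        return (attributes & TIMESTAMP_TYPE_MASK) != 0;
    }

    // Returns the attributes with the timestamp type bit set to 1,
    // leaving all other bits untouched.
    static byte withLogAppendTime(byte attributes) {
        return (byte) (attributes | TIMESTAMP_TYPE_MASK);
    }

    // Consumer side: choose the timestamp for one inner message of a
    // compressed message set, based on the wrapper's timestamp type bit.
    static long effectiveTimestamp(byte wrapperAttributes,
                                   long wrapperTimestampMs,
                                   long innerTimestampMs) {
        return isLogAppendTime(wrapperAttributes) ? wrapperTimestampMs : innerTimestampMs;
    }
}
```

Setting the bit on the wrapper alone is what lets a broker switch a compressed message set to LogAppendTime without touching, and therefore without recompressing, the inner messages.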

...