...

  1. Allow the user to stamp the message when producing it.
  2. When the broker receives a message, it looks at the difference between its local time and the timestamp in the message.
    1. If message.timestamp.type=LogAppendTime, the server will override the timestamp with its current local time and append the message to the log.
      1. If the message is a compressed message, the timestamp in the compressed message will be updated to the current server time.
      2. If the message is a non-compressed message, the timestamp in the message will be overwritten with the current server time.
    2. If message.timestamp.type=CreateTime
      1. If the time difference is within a configurable threshold max.message.time.difference.ms, the server will accept it and append it to the log. For a compressed message, the server will update the timestamp in the compressed message to -1.
      2. If the time difference is beyond the configured threshold max.message.time.difference.ms, the server will reject the entire batch with TimestampExceededThresholdException.
  3. message.timestamp.type and max.message.time.difference.ms will be per-topic configurations.
  4. The index will be built so that it has the following guarantees.
    1. If the user searches by timestamp:
      1. all the messages after that timestamp will be consumed.
      2. the user might see earlier messages.
    2. Log retention will look at the last entry in the time index file, because the last entry will be the latest timestamp in the entire log segment. If that entry expires, the log segment will be deleted.
    3. Log rolling has to depend on the earliest timestamp. In this case we may need to keep an in-memory timestamp only for the current active log. On recovery, we will need to read the active log segment to get the timestamp of the earliest message.
  5. The downsides of this proposal are:
    1. The timestamp might not be monotonically increasing.
    2. Log retention might become non-deterministic, i.e. when a message will be deleted now depends on the timestamps of the other messages in the same log segment, and those timestamps are provided by the user within a range that depends on the time difference threshold configuration.
    3. The semantic meaning of the timestamp is vague because some of the timestamps might have been overwritten and some might not.
  6. Although the proposal has some downsides, it gives the user the flexibility to use the timestamp.
    1. If the time difference threshold is set to Long.MaxValue, the timestamp in the message is equivalent to CreateTime.
    2. If the time difference threshold is set to 0, the timestamp in the message is equivalent to LogAppendTime.
    3. If the time difference threshold is between 0 and Long.MaxValue, it ensures the messages will always have a timestamp within a certain range.
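
The broker-side handling in step 2 can be sketched as follows. This is a minimal Java illustration, not Kafka's broker code: the class TimestampPolicySketch, the resolve method, and the local exception class are hypothetical stand-ins. Timestamps are assumed to be non-negative epoch milliseconds, and the sketch follows the variant in which an out-of-range CreateTime batch is rejected. The handling of the wrapper timestamp of a compressed message is left out.

```java
import java.util.Arrays;

// Hypothetical sketch; names are illustrative and not taken from Kafka's code base.
final class TimestampPolicySketch {

    enum TimestampType { CREATE_TIME, LOG_APPEND_TIME }

    // Local stand-in for the TimestampExceededThresholdException named in the proposal.
    static final class TimestampExceededThresholdException extends RuntimeException {
        TimestampExceededThresholdException(String message) {
            super(message);
        }
    }

    /**
     * Returns the timestamps that would be appended for one batch.
     * Assumes all timestamps are non-negative epoch milliseconds.
     */
    static long[] resolve(TimestampType type, long maxDiffMs, long brokerNowMs, long[] batch) {
        if (type == TimestampType.LOG_APPEND_TIME) {
            // LogAppendTime: every timestamp is overwritten with the broker's local time.
            long[] stamped = new long[batch.length];
            Arrays.fill(stamped, brokerNowMs);
            return stamped;
        }
        // CreateTime: validate the whole batch first, so one bad message rejects all of it.
        for (long ts : batch) {
            if (Math.abs(brokerNowMs - ts) > maxDiffMs) {
                throw new TimestampExceededThresholdException(
                        "timestamp " + ts + " differs from broker time " + brokerNowMs
                                + " by more than " + maxDiffMs + " ms");
            }
        }
        // Within the threshold: user-provided timestamps are kept unchanged.
        return batch.clone();
    }
}
```

With maxDiffMs set to Long.MAX_VALUE, the CreateTime branch of this sketch never rejects a batch, since no difference between two non-negative long values can exceed it.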

...

MessageAndOffset => MessageSize Offset Message
  MessageSize => int32
  Offset => int64
  
  Message => Crc MagicByte Attributes Timestamp KeyLength Key ValueLength Value
    Crc => int32
    MagicByte => int8
    Attributes => int8
    Timestamp => int64 <---------------------- NEW
    KeyLength => int32
    Key => bytes
    ValueLength => int32
    Value => bytes
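
To make the field order concrete, the Message layout above can be written out with a ByteBuffer. This is a rough sketch under assumptions: the class MessageLayoutSketch and its methods are hypothetical, null keys and values are not handled, and the CRC is assumed to cover every byte after the Crc field.

```java
import java.nio.ByteBuffer;
import java.util.zip.CRC32;

// Hypothetical sketch of the Message layout; not Kafka's serialization code.
final class MessageLayoutSketch {

    // Timestamp follows Crc (4 bytes), MagicByte (1 byte) and Attributes (1 byte).
    static final int TIMESTAMP_OFFSET = 4 + 1 + 1;

    static ByteBuffer encode(byte magic, byte attributes, long timestamp, byte[] key, byte[] value) {
        int size = 4 + 1 + 1 + 8 + 4 + key.length + 4 + value.length;
        ByteBuffer buf = ByteBuffer.allocate(size);
        buf.position(4);                       // leave room for the Crc field
        buf.put(magic);
        buf.put(attributes);
        buf.putLong(timestamp);                // the new int64 field
        buf.putInt(key.length);
        buf.put(key);
        buf.putInt(value.length);
        buf.put(value);

        // Assumption: the checksum covers everything after the Crc field itself.
        CRC32 crc = new CRC32();
        crc.update(buf.array(), 4, size - 4);
        buf.putInt(0, (int) crc.getValue());   // absolute write, position unchanged

        buf.flip();                            // ready for reading from offset 0
        return buf;
    }

    // Reads the timestamp without disturbing the buffer position.
    static long readTimestamp(ByteBuffer message) {
        return message.getLong(TIMESTAMP_OFFSET);
    }
}
```

Because the timestamp sits at a fixed offset ahead of the variable-length key and value, a reader in this sketch can fetch it with a single absolute read.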

...

  • If the user specifies the timestamp for a ProducerRecord, the ProducerRecord will be sent with this timestamp.
  • If the user does not specify the timestamp for a ProducerRecord, the producer stamps the ProducerRecord with the current time.
  • ConsumerRecord will have the timestamp of the message that was stored on the broker.
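
The producer-side rule in the first two bullets amounts to a simple fallback. The sketch below illustrates it with hypothetical types: StampedRecord is a stand-in and is not the Kafka ProducerRecord class, and a null userTimestamp is assumed to mean that the user did not specify one.

```java
// Hypothetical sketch; StampedRecord is not the Kafka ProducerRecord class.
final class ProducerStampSketch {

    static final class StampedRecord {
        final String topic;
        final String value;
        final long timestamp;

        StampedRecord(String topic, String value, long timestamp) {
            this.topic = topic;
            this.value = value;
            this.timestamp = timestamp;
        }
    }

    // userTimestamp == null means the user did not specify a timestamp.
    static StampedRecord stamp(String topic, String value, Long userTimestamp, long nowMs) {
        // Keep the user's timestamp if present, otherwise fall back to the current time.
        long ts = (userTimestamp != null) ? userTimestamp : nowMs;
        return new StampedRecord(topic, value, ts);
    }
}
```

A caller would typically pass System.currentTimeMillis() as nowMs; taking it as a parameter keeps the sketch deterministic.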

Broker configuration change

...

Add the following two configurations to the broker:

  • message.timestamp.type
  • max.message.time.difference.ms

...

The following default configuration values will be used:

message.timestamp.type=CreateTime

max.message.time.difference.ms=Long.MaxValue

i.e., by default the server will not override any user timestamp.

...