...

  1. Add a new timestamp field to the message format.
  2. Add the following two configurations to the broker:
    1. message.timestamp.type - This topic-level configuration defines the type of timestamp in the messages of a topic. The valid values are CreateTime and LogAppendTime.
    2. max.message.time.difference.ms - This configuration only takes effect when message.timestamp.type=CreateTime. The broker will only accept messages whose timestamp differs by no more than max.message.time.difference.ms from the broker's local time.
  3. Add a timestamp field to ProducerRecord and ConsumerRecord. A producer will be able to set a timestamp for a ProducerRecord, and a consumer will see the timestamp of each message it receives.
  4. Add ProduceRequest/ProduceResponse V2, which uses the new message format.
  5. Add a timestamp to ProduceResponse V2 for each partition. The timestamp will be the LogAppendTime if the topic is configured to use LogAppendTime, or NoTimestamp if the topic uses CreateTime.
  6. Add FetchRequest/FetchResponse V2, which uses the new message format.
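To make the two topic-level settings concrete, here is a minimal Java sketch that reads them from a java.util.Properties object and applies the time-difference check. The class TopicTimestampConfigSketch and its methods are hypothetical, not Kafka classes. The defaults (CreateTime, and Long.MAX_VALUE meaning "no limit") are assumptions, and the difference is treated as an absolute value so that timestamps too far in the past or the future are both rejected.

```java
import java.util.Properties;

/** Hypothetical holder for the two per-topic timestamp settings; not a Kafka class. */
final class TopicTimestampConfigSketch {
    enum TimestampType { CREATE_TIME, LOG_APPEND_TIME }

    final TimestampType type;
    final long maxDifferenceMs;

    TopicTimestampConfigSketch(Properties topicProps) {
        // Assumed defaults: CreateTime (the server does not override user timestamps)
        // and Long.MAX_VALUE (no limit on the time difference).
        this.type = parseType(topicProps.getProperty("message.timestamp.type", "CreateTime"));
        this.maxDifferenceMs = Long.parseLong(
                topicProps.getProperty("max.message.time.difference.ms", Long.toString(Long.MAX_VALUE)));
        if (maxDifferenceMs < 0) {
            throw new IllegalArgumentException("max.message.time.difference.ms must be >= 0");
        }
    }

    static TimestampType parseType(String value) {
        switch (value) {
            case "CreateTime":
                return TimestampType.CREATE_TIME;
            case "LogAppendTime":
                return TimestampType.LOG_APPEND_TIME;
            default:
                throw new IllegalArgumentException("Invalid message.timestamp.type: " + value);
        }
    }

    /** The time-difference check only applies when the topic uses CreateTime. */
    boolean accepts(long messageTimestamp, long brokerNowMs) {
        if (type == TimestampType.LOG_APPEND_TIME) {
            return true; // the broker overwrites the timestamp anyway
        }
        return Math.abs(messageTimestamp - brokerNowMs) <= maxDifferenceMs;
    }
}
```

With max.message.time.difference.ms=1000, a message stamped 500 ms from the broker's clock is accepted and one stamped 2000 ms away is rejected; with the assumed Long.MAX_VALUE default every timestamp passes.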

...

  1. Allow the user to set a timestamp on a message when producing it.
  2. When a leader broker receives a message
    1. If message.timestamp.type=LogAppendTime, the server will override the timestamp with its current local time and append the message to the log.
      1. If the message is a compressed message, the timestamp of the compressed message will be updated to the current server time. The broker will check that all the inner messages have timestamp -1. If an inner message's timestamp is not -1, the broker will overwrite it with -1 and recompress the message set. We do this, instead of writing the current server time into each inner message, to avoid the recompression penalty when people are using LogAppendTime.
      2. If the message is a non-compressed message, the timestamp in the message will be overwritten with the current server time.
    2. If message.timestamp.type=CreateTime
      1. If the difference between the message timestamp and the broker's local time is within the configurable threshold max.message.time.difference.ms, the server will accept the message and append it to the log. For a compressed message, the server will update the timestamp of the compressed message to the largest timestamp of its inner messages.
      2. If the time difference exceeds the configured threshold max.message.time.difference.ms, the server will reject the entire batch with a TimestampExceededThresholdException.
  3. When a follower broker receives a message
    1. If the message is a compressed message, the timestamp of the compressed message will be used to build the time index; that is, the timestamp of a compressed message is always the largest timestamp of all its inner messages.
    2. If the message is a non-compressed message, the timestamp of this message will be used to build the time index.
  4. When a consumer receives a message
    1. If a message is a compressed message
      1. If the inner message timestamp is not -1, that timestamp will be used.
      2. If the inner message timestamp is -1, the timestamp of the compressed message will be used as its timestamp.
    2. If a message is a non-compressed message, the timestamp of the message will be used.
  5. message.timestamp.type and max.message.time.difference.ms will be per-topic configurations.
  6. In ProduceResponse V2, a timestamp will be returned for each partition.
    1. If the topic uses LogAppendTime, the timestamp returned will be the LogAppendTime for the message set.
    2. If the topic uses CreateTime, the timestamp returned will be NoTimestamp.
    3. When the producer invokes the callback for each message, it uses the timestamp returned in the produce response if that timestamp is not NoTimestamp. Otherwise, it uses the message timestamp tracked by the producer.
    4. In this way, the producer can tell whether the timestamp is LogAppendTime or CreateTime.
  7. The time index will be built so that it has the following guarantees. (Note that the time index will be implemented in KIP-33 rather than in this KIP. The behavior is discussed here because it is closely related to the design of this KIP.)
    1. If a user searches by a timestamp:
      1. All messages whose timestamps are later than the searched timestamp will be consumed.
      2. The user might also see earlier messages.
    2. Log retention will look at the last entry in the time index file. Because the last entry holds the latest timestamp in the entire log segment, the log segment will be deleted once that entry expires.
    3. Log rolling will depend on the largest timestamp of all messages ever seen. If no message is appended within log.roll.ms after the largest appended timestamp, a new log segment will be rolled.
  8. The downsides of this proposal are:
    1. The timestamp might not be monotonically increasing if message.timestamp.type=CreateTime.
    2. Log retention might become non-deterministic: when a message is deleted now depends on the timestamps of the other messages in the same log segment, and those timestamps are provided by users within a range determined by the time difference threshold configuration.
  9. Although the proposal has some downsides, it gives users flexibility in how they use the timestamp.
    1. If message.timestamp.type=CreateTime
      1. When the time difference threshold is set to Long.MaxValue, the timestamp in the message is equivalent to CreateTime.
      2. When the time difference threshold is between 0 and Long.MaxValue, it ensures that messages always have a timestamp within a certain range.
    2. If message.timestamp.type=LogAppendTime, the timestamps will be the log append time.
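The leader-broker and consumer rules in the list above can be summarized in a minimal Java sketch. It models a compressed message as a wrapper timestamp plus a list of inner timestamps and ignores the actual bytes and compression codec. All names are hypothetical, the local TimestampExceededThresholdException only stands in for the exception named above, and -1 is assumed to be the "no timestamp" sentinel for inner messages.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of leader-side timestamp handling and consumer-side resolution. */
final class LeaderTimestampSketch {
    // Assumption: -1 marks an inner message that carries no timestamp of its own.
    static final long NO_TIMESTAMP = -1L;

    enum TimestampType { CREATE_TIME, LOG_APPEND_TIME }

    /** Local stand-in for the exception named in the proposal. */
    static final class TimestampExceededThresholdException extends RuntimeException {
        TimestampExceededThresholdException(String message) { super(message); }
    }

    /** Simplified compressed message: a wrapper timestamp plus its inner messages' timestamps. */
    static final class CompressedMessage {
        long wrapperTimestamp;
        final List<Long> innerTimestamps;
        boolean needsRecompression;

        CompressedMessage(long wrapperTimestamp, List<Long> innerTimestamps) {
            this.wrapperTimestamp = wrapperTimestamp;
            this.innerTimestamps = new ArrayList<>(innerTimestamps);
        }
    }

    /** Leader handling of a non-compressed message; returns the timestamp to store. */
    static long onLeaderAppend(long timestamp, TimestampType type, long maxDiffMs, long nowMs) {
        if (type == TimestampType.LOG_APPEND_TIME) {
            return nowMs; // overwritten with the broker's current time
        }
        checkDifference(timestamp, maxDiffMs, nowMs);
        return timestamp;
    }

    /** Leader handling of a compressed message; updates it in place. */
    static void onLeaderAppend(CompressedMessage msg, TimestampType type, long maxDiffMs, long nowMs) {
        if (type == TimestampType.LOG_APPEND_TIME) {
            msg.wrapperTimestamp = nowMs; // only the wrapper gets the server time
            for (int i = 0; i < msg.innerTimestamps.size(); i++) {
                if (msg.innerTimestamps.get(i) != NO_TIMESTAMP) {
                    msg.innerTimestamps.set(i, NO_TIMESTAMP); // force inner timestamps back to -1
                    msg.needsRecompression = true;
                }
            }
            return;
        }
        long largest = NO_TIMESTAMP;
        for (long ts : msg.innerTimestamps) {
            checkDifference(ts, maxDiffMs, nowMs); // any out-of-range inner message rejects the batch
            largest = Math.max(largest, ts);
        }
        msg.wrapperTimestamp = largest; // wrapper carries the largest inner timestamp
    }

    static void checkDifference(long timestamp, long maxDiffMs, long nowMs) {
        if (Math.abs(timestamp - nowMs) > maxDiffMs) {
            throw new TimestampExceededThresholdException(
                    "timestamp " + timestamp + " is more than " + maxDiffMs + " ms from " + nowMs);
        }
    }

    /** Consumer side: an inner timestamp of -1 falls back to the wrapper's timestamp. */
    static long consumerTimestamp(long innerTimestamp, long wrapperTimestamp) {
        return innerTimestamp != NO_TIMESTAMP ? innerTimestamp : wrapperTimestamp;
    }
}
```

Keeping inner timestamps at -1 under LogAppendTime means that, in the common case, the broker only rewrites the wrapper, and a consumer still recovers the log append time for every inner message through the fallback.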

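The time index guarantees listed above (search by timestamp, retention, and rolling) can be illustrated with a small in-memory Java sketch. It is an assumption made for illustration, not the KIP-33 design: an entry (timestamp, offset) is recorded only when a message raises the largest timestamp seen so far. Every message before such an entry's offset has a smaller timestamp, so starting from the first entry at or after the target never skips a later message, while messages with earlier timestamps may still appear after it. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical in-memory time index for a single log segment. */
final class TimeIndexSketch {
    private final List<long[]> entries = new ArrayList<>(); // each entry: {timestamp, offset}
    private long largestTimestamp = Long.MIN_VALUE;

    /** Records an appended message; only a new largest timestamp creates an index entry. */
    void onAppend(long offset, long timestamp) {
        if (timestamp > largestTimestamp) {
            largestTimestamp = timestamp;
            entries.add(new long[] {timestamp, offset});
        }
    }

    /**
     * Returns the offset to start consuming from so that every message with a
     * timestamp at or after the target is seen, or -1 if there is no such message.
     */
    long lookup(long targetTimestamp) {
        for (long[] entry : entries) { // entries are sorted by timestamp
            if (entry[0] >= targetTimestamp) {
                return entry[1];
            }
        }
        return -1L;
    }

    /** Retention: the last entry holds the segment's largest timestamp. */
    boolean expired(long nowMs, long retentionMs) {
        return !entries.isEmpty() && nowMs - entries.get(entries.size() - 1)[0] > retentionMs;
    }

    /** Rolling: roll once log.roll.ms has passed since the largest timestamp seen. */
    boolean shouldRoll(long nowMs, long rollMs) {
        return !entries.isEmpty() && nowMs - largestTimestamp >= rollMs;
    }
}
```

For example, appending timestamps 100, 300, 200, 400 at offsets 0 to 3 and searching for 150 starts at offset 1, so the out-of-order message at offset 2 (timestamp 200) is still consumed. Because the entries are sorted, a binary search could replace the linear scan.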
The following changes will be made to implement the above proposal.

Wire protocol change - add a Time field to the message format

Code Block
languagejava
MessageAndOffset => Offset MessageSize Message
  Offset => int64  
  MessageSize => int32
  
  Message => Crc MagicByte Attributes Timestamp KeyLength Key ValueLength Value
    Crc => int32
    MagicByte => int8  <---------------------- Bump up magic byte to 1
    Attributes => int8
    Timestamp => int64 <---------------------- NEW
    KeyLength => int32
    Key => bytes
    ValueLength => int32
    Value => bytes
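The layout above can be exercised with a short Java sketch that serializes one MessageAndOffset entry in the new (magic byte 1) format using java.nio.ByteBuffer and java.util.zip.CRC32. The class MessageV1Codec is hypothetical and ignores compression. It assumes big-endian encoding (ByteBuffer's default), that Crc is a CRC32 over everything from MagicByte to the end of Value, and that a length of -1 encodes a null key or value.

```java
import java.nio.ByteBuffer;
import java.util.zip.CRC32;

/** Hypothetical encoder for one MessageAndOffset entry in the magic-byte-1 layout. */
final class MessageV1Codec {
    static final byte MAGIC_V1 = 1;
    // Offset(8) + MessageSize(4) + Crc(4) + MagicByte(1) + Attributes(1)
    static final int TIMESTAMP_POSITION = 18;

    static ByteBuffer encode(long offset, byte attributes, long timestamp, byte[] key, byte[] value) {
        // Assumption: a length of -1 encodes a null key or value.
        int keyLength = key == null ? -1 : key.length;
        int valueLength = value == null ? -1 : value.length;

        // The part covered by the CRC: MagicByte through Value.
        int bodySize = 1 + 1 + 8 + 4 + Math.max(keyLength, 0) + 4 + Math.max(valueLength, 0);
        ByteBuffer body = ByteBuffer.allocate(bodySize);
        body.put(MAGIC_V1);
        body.put(attributes);
        body.putLong(timestamp);          // the new Timestamp field
        body.putInt(keyLength);
        if (key != null) body.put(key);
        body.putInt(valueLength);
        if (value != null) body.put(value);

        CRC32 crc = new CRC32();
        crc.update(body.array(), 0, bodySize);

        int messageSize = 4 + bodySize;   // Crc + body
        ByteBuffer out = ByteBuffer.allocate(8 + 4 + messageSize);
        out.putLong(offset);
        out.putInt(messageSize);
        out.putInt((int) crc.getValue()); // CRC32 value is unsigned; keep its low 32 bits
        out.put(body.array());
        out.flip();
        return out;
    }

    /** Reads the Timestamp of an entry starting at the buffer's current position. */
    static long readTimestamp(ByteBuffer entry) {
        return entry.getLong(entry.position() + TIMESTAMP_POSITION);
    }
}
```

Because Timestamp sits at a fixed position right after Attributes, a reader can extract it without parsing the key or value.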

...

I.e. by default the server will not override any user timestamp.

Add a timestamp field to RecordMetadata

  • The timestamp in RecordMetadata will be the LogAppendTime if one is returned by the broker; otherwise, it will be the timestamp set by the user in the ProducerRecord.
  • When the producer invokes the callback for a message, the timestamp will be available through RecordMetadata.
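The rule for choosing the callback timestamp can be sketched in a few lines of Java. The sketch assumes NoTimestamp is represented as -1 and uses a hypothetical ResolvedTimestamp class in place of RecordMetadata; it is not the producer's actual code. It shows how the producer learns which kind of timestamp it is reporting: a real timestamp in the ProduceResponse means LogAppendTime, and NoTimestamp means the record's own CreateTime applies.

```java
/** Hypothetical sketch of how the producer could pick the timestamp exposed in the callback. */
final class CallbackTimestampSketch {
    // Assumption: NoTimestamp is encoded as -1.
    static final long NO_TIMESTAMP = -1L;

    enum TimestampType { CREATE_TIME, LOG_APPEND_TIME }

    /** Stand-in for the timestamp part of RecordMetadata. */
    static final class ResolvedTimestamp {
        final long timestamp;
        final TimestampType type;

        ResolvedTimestamp(long timestamp, TimestampType type) {
            this.timestamp = timestamp;
            this.type = type;
        }
    }

    /**
     * Uses the partition timestamp from ProduceResponse V2 if the broker returned one
     * (LogAppendTime); otherwise falls back to the timestamp the producer tracked for
     * the record (CreateTime).
     */
    static ResolvedTimestamp resolve(long responseTimestamp, long recordTimestamp) {
        if (responseTimestamp != NO_TIMESTAMP) {
            return new ResolvedTimestamp(responseTimestamp, TimestampType.LOG_APPEND_TIME);
        }
        return new ResolvedTimestamp(recordTimestamp, TimestampType.CREATE_TIME);
    }
}
```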

Add ProduceRequest/ProduceResponse V2 and FetchRequest/FetchResponse V2

The fields of ProduceRequest V2 and FetchResponse V2 will be the same as in V1; the only difference is the format of the messages they carry.

The fields of ProduceResponse V2 will be changed to the following:

Code Block
languagejava
titleProduceResponse V2
ProduceResponse => [ResponseStatus] ThrottleTime  
  ResponseStatus => TopicName [Partition ErrorCode Offset timestamp]
    TopicName => string
    Partition => int32
    ErrorCode => int16
    Offset => int64
    timestamp => int64 <------------------- NEW
  
  ThrottleTime => int32

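A minimal Java sketch of decoding this response body and reading the new per-partition timestamp follows. It assumes the protocol's usual encodings (arrays prefixed by an int32 count, strings by an int16 length followed by UTF-8 bytes), ThrottleTime as an int32 as in ProduceResponse V1, and NoTimestamp as -1. The class and field names are hypothetical, and request headers such as the correlation id are out of scope.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical decoder for the ProduceResponse V2 body. */
final class ProduceResponseV2Sketch {
    // Assumption: NoTimestamp is encoded as -1.
    static final long NO_TIMESTAMP = -1L;

    static final class PartitionStatus {
        final String topic;
        final int partition;
        final short errorCode;
        final long offset;
        final long timestamp; // LogAppendTime, or NO_TIMESTAMP for CreateTime topics

        PartitionStatus(String topic, int partition, short errorCode, long offset, long timestamp) {
            this.topic = topic;
            this.partition = partition;
            this.errorCode = errorCode;
            this.offset = offset;
            this.timestamp = timestamp;
        }

        boolean hasLogAppendTime() {
            return timestamp != NO_TIMESTAMP;
        }
    }

    static final class Response {
        final List<PartitionStatus> statuses;
        final int throttleTimeMs;

        Response(List<PartitionStatus> statuses, int throttleTimeMs) {
            this.statuses = statuses;
            this.throttleTimeMs = throttleTimeMs;
        }
    }

    static Response parse(ByteBuffer buf) {
        List<PartitionStatus> statuses = new ArrayList<>();
        int topicCount = buf.getInt();                 // [ResponseStatus]
        for (int i = 0; i < topicCount; i++) {
            String topic = readString(buf);            // TopicName
            int partitionCount = buf.getInt();         // [Partition ErrorCode Offset timestamp]
            for (int j = 0; j < partitionCount; j++) {
                int partition = buf.getInt();
                short errorCode = buf.getShort();
                long offset = buf.getLong();
                long timestamp = buf.getLong();        // new in V2
                statuses.add(new PartitionStatus(topic, partition, errorCode, offset, timestamp));
            }
        }
        int throttleTimeMs = buf.getInt();             // ThrottleTime
        return new Response(statuses, throttleTimeMs);
    }

    // Topic names are never null here, so a negative length is not handled.
    private static String readString(ByteBuffer buf) {
        short length = buf.getShort();
        byte[] bytes = new byte[length];
        buf.get(bytes);
        return new String(bytes, StandardCharsets.UTF_8);
    }
}
```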
 

The fields of FetchRequest V2 will be the same as in V1.

...