Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

  1. Log retention might not be honored: Log retention is currently at the log segment level, and is driven off the last modification time of a log segment. This approach does not quite work when a replica reassignment happens because the newly created log segment will effectively have its modification time reset to now.
  2. Log rolling might break for a newly created replica as well because of the same reason as (1).

...

  1. Some use cases such as streaming processing

...

  1. needs a timestamp

...

  1. in messages.

This KIP is a distilled/improved version of an earlier discussion that we started.

...

  1. Add a new timestamp field to the message format.
  2. Add the following two configurations to the broker
    1. message.timestamp.type - This topic level configuration defines the type of timestamp in the messages of a topic. The valid values are CreateTime or LogAppendTime.
    2. max.message.time.difference.ms - This configuration only works when message.timestamp.type=CreateTime. The broker will only accept messages whose timestamp differs no more than max.message.time.difference.ms from the broker local time.
  3. Add a timestamp field to ProducerRecord and ConsumerRecord. A producer will be able to set a timestamp for a ProducerRecord. A consumer will see the message timestamp when it sees the messages.
  4. Add ProduceRequest/ProduceResponse V2 to use the new message format.
  5. Add FetchRequest/FetchResponse V2 to use the new message format.

For more detail information of the above changes, please refer to the next section.

Proposed Changes

  1. V2 which uses the new message format.
  2. Add FetchRequest/FetchResponse V2 which uses the new message format.

For more detail information of the above changes, please refer to the Proposed Changes section.

Proposed Changes

There are three options proposed before this proposal. The details of option 1, option 2 and Option 3 are in the Rejected Alternatives section.

The key decision we made in this KIP is whether use LogAppendTime(Broker Time) or CreateTime (Application Time)

The good things about LogAppendTime are:

  1. Broker is more robust.
  2. Monotonically increasing.
  3. Deterministic behavior for log rolling and retention.

The good things about CreateTime are:

  1. More intuitive to users.
  2. User may want to have log retention based on when the message is created instead of when the message enters the pipeline.
  3. Immutable after entering the pipeline.

Because both LogAppendTime and CreateTime have their own use cases, the proposed change provides users with the flexibility to choose which one they want to use.

For more detail discussion please refer to the mail thread as well as the Rejected Alternatives There are three options proposed before this proposal. The details of option 1, option 2 and Option 3 are in the Rejected Options section.

Add a Timestamp field to the message format with maximum allowed time difference configuration on broker.

...

  1. Allow user to stamp the message when produce
  2. When a leader broker receives a message
    1. If message.timestamp.type=LogAppendTime, the server will override the timestamp with its current local time and append the message to the log.
      1. If the message is a compressed message. the timestamp in the compressed message will be updated to current server time. Broker will check to make sure all the inner messages has timestamp -1. If the inner message's timestamp is not -1, the broker will overwrite it and do the recompression. We do this instead of writing current server time to each message is to avoid recompression penalty when people are using LogAppendTime.
      2. If the message is a non-compressed message, the timestamp in the message will be overwritten to current server time.
    2. If message.timestamp.type=CreateTime
      1. If the time difference is within a configurable threshold max.message.time.difference.ms, the server will accept it and append it to the log. For compressed message, server will update the timestamp in compressed message to the largest timestamp of the inner messages.
      2. If the time difference is beyond the configured threshold max.message.time.difference.ms, the server will reject the entire batch with TimestampExceededThresholdException.
  3. When a follower broker receives a message
    1. If the message is a compressed message, the timestamp in the compressed message will be used to build the time index. i.e. the timestamp in a compressed messages is always the largest timestamp of all its inner messages.
    2. If the message is a non-compressed message, the timestamp of this message will be used to build time index.
  4. When a consumer receives a message
    1. If a message is a compressed message
      1. If the inner message timestamp is not -1, that timestamp will be used.
      2. If the inner message timestamp is -1, the timestamp of the compressed message will be used as its timestamp.
    2. If a message is a non-compressed message, the timestamp of the message will be used.
  5. message.timestamp.type and max.message.time.difference.ms will be a per topic configuration.
  6. The indexed will be built so it has the following guarantees. (Please notice that the time index will be implemented in KIP-33 instead of this KIP. The behavior discussion is put here because it is closely related to the design of this KIP)
    1. If user search by a timestamp:
      1. all the messages after the searched timestamp will be consumed.
      2. user might see earlier messages.
    2. The log retention will take a look at the last time index entry in the time index file. Because the last entry will be the latest timestamp in the entire log segment. If that entry expires, the log segment will be deleted.
    3. The log rolling will depend on the largest timestamp of all messages ever seen. If no message is appended in log.roll.ms after largest appended timestamp, a new log segment will be rolled out.
  7. The downside of this proposal are:
    1. The timestamp might not be monotonically increasing.
    2. The log retention might become non-deterministic. i.e. When a message will be deleted now depends on the timestamp of the other messages in the same log segment. And those timestamps are provided by user within a range depending on what the time difference threshold configuration is.The semantic meaning of the timestamp is vague because some of the timestamp might have been overwritten and some might not.
  8. Although the proposal has some downsides, it gives user the flexibility to use the timestamp.
    1. If message.timestamp.type=CreateTime
      1. When time difference threshold is set to Long.MaxValue. The timestamp in the message is equivalent to CreateTime.
      2. When time difference threshold is between 0 and Long.MaxValue, it ensures the messages will always have a timestamp within a certain range.
    2. If message.timestamp.type=LogAppendTime, the timestamps will be log append time.

...