...

This KIP will likely be implemented together with KIP-31, if possible, to avoid changing the wire protocol twice.

This KIP is closely related to KIP-33, which covers building a time index, as well as the use cases on top of that index, using the timestamp added in this KIP.

Public Interfaces

There are a few options to achieve the goals. Each has its own pros and cons. Please see the details below.


This KIP has the following public interface changes:

  1. Add a new timestamp field to the message format.
  2. Add the following two configurations to the broker
    1. message.timestamp.type - This topic level configuration defines the type of timestamp in the messages of a topic. The valid values are CreateTime or LogAppendTime.
    2. max.message.time.difference.ms - This configuration only works when message.timestamp.type=CreateTime. The broker will only accept messages whose timestamp differs from the broker's local time by no more than max.message.time.difference.ms.
  3. Add a timestamp field to ProducerRecord and ConsumerRecord. A producer will be able to set a timestamp for a ProducerRecord. A consumer will see the timestamp of each message it receives.
  4. Add ProduceRequest/ProduceResponse V2 to use the new message format.
  5. Add FetchRequest/FetchResponse V2 to use the new message format.

For more detailed information on the above changes, please refer to the next section.
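As a rough illustration of the client-facing change, the sketch below uses a simplified stand-in class (not the real ProducerRecord) to show how a user-supplied timestamp would flow through: the user may stamp the record explicitly, or leave it unset and let the producer fill in the current wall-clock time.

```java
// Simplified stand-in for the proposed ProducerRecord change (illustrative only;
// the real Kafka class carries more fields such as partition and headers).
class ProducerRecordSketch {
    final String topic;
    final Long timestamp; // proposed new field; null means "let the producer stamp it"
    final String key;
    final String value;

    ProducerRecordSketch(String topic, Long timestamp, String key, String value) {
        this.topic = topic;
        this.timestamp = timestamp;
        this.key = key;
        this.value = value;
    }
}

public class TimestampApiSketch {
    public static void main(String[] args) {
        // The user can stamp the record explicitly...
        ProducerRecordSketch explicit =
            new ProducerRecordSketch("page-views", 1_450_000_000_000L, "k", "v");
        // ...or leave the timestamp null and let the producer use the current time.
        ProducerRecordSketch defaulted =
            new ProducerRecordSketch("page-views", null, "k", "v");
        long effective = defaulted.timestamp != null
            ? defaulted.timestamp
            : System.currentTimeMillis();
        System.out.println(explicit.timestamp + " " + (effective > 0));
    }
}
```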

Proposed Changes

There were three options proposed before this proposal. The details of option 1, option 2 and option 3 are in the Rejected Alternatives section.

Add a Timestamp field to the message format with maximum allowed time difference configuration on broker.

Option 4 adds a configuration to the broker to allow users to decide which timestamp they want to use according to their use case. The proposed change will implement the following behaviors:

  1. Allow the user to stamp the message when producing it.
  2. When a leader broker receives a message
    1. If message.timestamp.type=LogAppendTime, the server will override the timestamp with its current local time and append the message to the log.
      1. If the message is a compressed message, the timestamp in the compressed (wrapper) message will be updated to the current server time. The broker will check that all the inner messages have timestamp -1. If an inner message's timestamp is not -1, the broker will overwrite it and do the recompression. We do this instead of writing the current server time to each inner message to avoid the recompression penalty when people are using LogAppendTime.
      2. If the message is a non-compressed message, the timestamp in the message will be overwritten to current server time.
    2. If message.timestamp.type=CreateTime
      1. If the time difference is within the configurable threshold max.message.time.difference.ms, the server will accept the message and append it to the log. For a compressed message, the server will update the timestamp of the compressed message to the largest timestamp of its inner messages.
      2. If the time difference is beyond the configured threshold max.message.time.difference.ms, the server will reject the entire batch with TimestampExceededThresholdException.
  3. When a follower broker receives a message
    1. If the message is a compressed message, the timestamp of the compressed message will be used to build the time index, i.e. the timestamp of a compressed message is always the largest timestamp of all its inner messages.
    2. If the message is a non-compressed message, the timestamp of the message will be used to build the time index.
  4. When a consumer receives a message
    1. If a message is a compressed message
      1. If the inner message timestamp is not -1, that timestamp will be used.
      2. If the inner message timestamp is -1, the timestamp of the compressed message will be used as its timestamp.
    2. If a message is a non-compressed message, the timestamp of the message will be used.
  5. message.timestamp.type and max.message.time.difference.ms will be a per topic configuration.
  6. The index will be built so that it provides the following guarantees. (Note that the time index will be implemented in KIP-33 rather than this KIP; the behavior is discussed here because it is closely related to the design of this KIP.)
    1. If a user searches by a timestamp:
      1. all the messages after the searched timestamp will be consumed.
      2. the user might see some earlier messages as well.
    2. The log retention will look at the last entry in the time index file. Because the last entry has the latest timestamp in the entire log segment, the log segment will be deleted once that entry expires.
    3. The log rolling will depend on the largest timestamp of all messages ever seen: if no message is appended within log.roll.ms after the largest appended timestamp, a new log segment will be rolled out. We may need to keep this timestamp in memory for the current active log segment; on recovery, we will need to read the active log segment to restore it.
  7. The downsides of this proposal are:
    1. The timestamp might not be monotonically increasing.
    2. The log retention might become non-deterministic, i.e. when a message is deleted now depends on the timestamps of the other messages in the same log segment, and those timestamps are provided by users within a range determined by the time difference threshold configuration.
    3. The semantic meaning of the timestamp is vague, because some of the timestamps might have been overwritten and some might not.
  8. Although the proposal has some downsides, it gives the user flexibility in how to use the timestamp.
    1. If message.timestamp.type=CreateTime
      1. When the time difference threshold is set to Long.MaxValue, the timestamp in the message is equivalent to CreateTime.
      2. When the time difference threshold is between 0 and Long.MaxValue, it ensures that the messages will always have a timestamp within a certain range.
    2. If message.timestamp.type=LogAppendTime, the timestamps will be the log append time.
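The per-message decision described above can be sketched as follows. The class and method names are illustrative, not the actual broker implementation, and a plain IllegalArgumentException stands in for the proposed TimestampExceededThresholdException:

```java
// Sketch of the proposed broker-side timestamp handling. Names are illustrative;
// the real logic lives in the broker's log append path.
public class BrokerTimestampSketch {
    enum TimestampType { CREATE_TIME, LOG_APPEND_TIME } // message.timestamp.type

    /**
     * Returns the timestamp the broker will append for a message, or throws if the
     * message's CreateTime is further from the broker clock than
     * max.message.time.difference.ms allows.
     */
    static long resolveTimestamp(TimestampType type, long messageTimestamp,
                                 long brokerNowMs, long maxDifferenceMs) {
        if (type == TimestampType.LOG_APPEND_TIME) {
            // The broker overrides whatever the producer sent.
            return brokerNowMs;
        }
        if (Math.abs(messageTimestamp - brokerNowMs) > maxDifferenceMs) {
            // Stands in for the proposed TimestampExceededThresholdException.
            throw new IllegalArgumentException("timestamp exceeds allowed difference");
        }
        return messageTimestamp;
    }

    public static void main(String[] args) {
        long now = 1_000_000L;
        // LogAppendTime: broker time wins regardless of the producer's timestamp.
        System.out.println(resolveTimestamp(TimestampType.LOG_APPEND_TIME, 42L, now, 0L));
        // CreateTime within the threshold: the producer's timestamp is kept.
        System.out.println(resolveTimestamp(TimestampType.CREATE_TIME, now - 500L, now, 1_000L));
        // CreateTime beyond the threshold: the batch is rejected.
        try {
            resolveTimestamp(TimestampType.CREATE_TIME, now - 5_000L, now, 1_000L);
        } catch (IllegalArgumentException e) {
            System.out.println("rejected");
        }
    }
}
```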

The following changes will be made to implement the above proposal.

...

Code Block
MessageAndOffset => MessageSize Offset Message
  MessageSize => int32
  Offset => int64
  
  Message => Crc MagicByte Attributes Timestamp KeyLength Key ValueLength Value
    Crc => int32
    MagicByte => int8  <---------------------- Bump up magic byte to 1
    Attributes => int8
    Timestamp => int64 <---------------------- NEW
    KeyLength => int32
    Key => bytes
    ValueLength => int32
    Value => bytes
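As a sketch of the compressed-message timestamp rules described earlier (illustrative helper names, not broker code): in the CreateTime case the wrapper message carries the largest inner timestamp, and on the consumer side an inner timestamp of -1 falls back to the wrapper's timestamp.

```java
import java.util.List;

// Illustrative sketch of the wrapper/inner timestamp rules for compressed messages.
public class WrapperTimestampSketch {
    // CreateTime: the wrapper (shallow) message carries the largest inner timestamp,
    // so the broker and followers can build the time index without decompressing.
    static long wrapperTimestamp(List<Long> innerTimestamps) {
        long max = -1L;
        for (long ts : innerTimestamps) {
            max = Math.max(max, ts);
        }
        return max;
    }

    // Consumer side: an inner timestamp of -1 means "use the wrapper's timestamp"
    // (the LogAppendTime case, where the broker stamps only the wrapper message).
    static long consumerTimestamp(long innerTs, long wrapperTs) {
        return innerTs == -1L ? wrapperTs : innerTs;
    }

    public static void main(String[] args) {
        System.out.println(wrapperTimestamp(List.of(5L, 9L, 7L))); // largest inner: 9
        System.out.println(consumerTimestamp(-1L, 123L)); // falls back to wrapper: 123
        System.out.println(consumerTimestamp(8L, 123L));  // inner timestamp wins: 8
    }
}
```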

...

I.e. by default the server will not override any user timestamp.

Add ProduceRequest/ProduceResponse V2 and FetchRequest/FetchResponse V2

The fields of ProduceRequest and FetchResponse V2 will be the same as V1. The difference is the format of the messages.

The fields of the ProduceResponse and FetchRequest V2 will be the same as V1.

We bump up the ProduceRequest/FetchRequest (and response) versions to indicate to the broker that the client supports the new message format.
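The role of the version bump can be sketched as a simple mapping from request version to message format ("magic") version. The helper below is hypothetical, but mirrors the rule implied above and the down-conversion mentioned in the migration plan:

```java
// Hypothetical sketch of the version-based format selection; the real logic
// lives in the broker's request handling.
public class FormatConversionSketch {
    // Message format ("magic") version implied by the fetch request version.
    static int magicForFetchVersion(int fetchVersion) {
        // V2 clients understand the new timestamped format (magic 1);
        // older clients need the broker to down-convert to magic 0.
        return fetchVersion >= 2 ? 1 : 0;
    }

    public static void main(String[] args) {
        System.out.println(magicForFetchVersion(1)); // old client -> magic 0
        System.out.println(magicForFetchVersion(2)); // new client -> magic 1
    }
}
```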

Build Time Index for messages based on message timestamp (will be implemented in KIP-33)

...

After phase 1, it is possible for a user to canary a broker in phase 2 and roll back if something goes wrong. The procedure is:

  1. Set message.format.version=0.10.0 on one of the brokers (broker B).
  2. Broker B will start to act as described in phase 2.
    1. It will send FetchRequest V2 to other brokers for replication.
    2. It will only see ProduceRequest/FetchRequest V1 from other brokers and clients.
  3. If something goes wrong, we can do the following to roll back:
    1. Shut down broker B.
    2. Nuke the data of the topics it was serving as leader before the shutdown.
    3. Set message.format.version=0.9.0.
    4. Restart the broker and let it replicate from the leaders. At this point the data on disk will be in MessageAndOffset V0.

In step 2, it is recommended to put only a small number of leaders on the broker, because at that point the broker needs to down-convert the messages for all fetch requests.

...


Rejected Alternatives

After extended discussion over option 1, option 2 and option 3, it turned out to be very difficult to meet all of the following requirements at the same time:

  1. Have only one timestamp concept in Kafka.
  2. Enforce time-based log retention / log rolling for different use cases. (Some use cases need to be based on LogAppendTime while others prefer CreateTime.)
  3. Protect the broker from misbehaving users (e.g. appending wrong create time to messages)

The final proposal adds a configuration to the broker to allow users to decide which timestamp they want to use according to their use case.

Option 1 - Add both CreateTime and LogAppendTime to message format

...