Status

Current state: Under Discussion

Discussion thread: here

JIRA: KAFKA-2511

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

This KIP will likely be implemented with KIP-31, if possible, to avoid changing wire protocol twice.

This KIP is closely related to KIP-33, which covers building a time index, and the use cases on top of that index, using the timestamp added in this KIP.

...

  1. Add a new timestamp field to the message format.
  2. Use the fourth least significant bit of the message attributes byte to indicate the timestamp type (0 for CreateTime, 1 for LogAppendTime).
  3. Add the following two configurations to the broker
    1. message.timestamp.type - This topic level configuration defines the type of timestamp in the messages of a topic. The valid values are CreateTime or LogAppendTime.
    2. max.message.time.difference.ms - This configuration only works when message.timestamp.type=CreateTime. The broker will only accept messages whose timestamp differs no more than max.message.time.difference.ms from the broker local time.
  4. Add a timestamp field to ProducerRecord and ConsumerRecord. A producer will be able to set a timestamp for a ProducerRecord. A consumer will see the message timestamp when it sees the messages.
  5. Add ProduceRequest/ProduceResponse V2 which uses the new message format.
  6. Add a timestamp in ProduceResponse V2 for each partition. The timestamp will either be LogAppendTime if the topic is configured to use it or it will be NoTimestamp if create time is used.
  7. Add FetchRequest/FetchResponse V2 which uses the new message format.
  8. Add a timestamp variable to RecordMetadata. The timestamp is the timestamp of messages appended to partition log.

For more detailed information about the above changes, please refer to the Proposed Changes section.

...

  1. Allow the user to set a timestamp on a message when producing it.
  2. When a leader broker receives a message
    1. If message.timestamp.type=LogAppendTime, the server will override the timestamp with its current local time and append the message to the log.
      1. If the message is a compressed message, the timestamp in the compressed wrapper message will be updated to the current server time. The broker will set the timestamp type bit in the wrapper message to 1 and will ignore the inner messages' timestamps. We do this, instead of writing the current server time to each inner message, to avoid the recompression penalty when people are using LogAppendTime.
      2. If the message is a non-compressed message, the timestamp in the message will be overwritten to current server time.
    2. If message.timestamp.type=CreateTime
      1. If the time difference is within the configurable threshold max.message.time.difference.ms, the server will accept the message and append it to the log. For a compressed message, the server will update the timestamp in the wrapper message to the largest timestamp of the inner messages.
      2. If the time difference is beyond the configured threshold max.message.time.difference.ms, the server will reject the entire batch with TimestampExceededThresholdException.
  3. When a follower broker receives a message
    1. If the message is a compressed message, the timestamp in the compressed wrapper message will be used to build the time index, i.e. the timestamp in a compressed message is always the largest timestamp of all its inner messages.
    2. If the message is a non-compressed message, the timestamp of this message will be used to build time index.
  4. When a consumer receives a message
    1. If a message is a compressed message
      1. If the wrapper message timestamp attribute bit is 0 (CreateTime), the inner messages' timestamps will be used.
      2. If the wrapper message timestamp attribute bit is 1 (LogAppendTime), the timestamp of the compressed wrapper message will be used as the timestamp of the inner messages.
    2. If a message is a non-compressed message, the timestamp of the message will be used.
  5. message.timestamp.type and max.message.time.difference.ms will be a per topic configuration.
  6. In ProduceResponse V2, a timestamp will be returned for each partition.
    1. If the topic uses LogAppendTime, the timestamp returned would be the LogAppendTime for the message set.
    2. If the topic uses CreateTime, the timestamp returned would be NoTimestamp.
    3. When producer invokes the callback for each message, it uses the timestamp returned in the produce response if it is not NoTimestamp. Otherwise, it uses the message timestamp which is tracked by the producer.
    4. The producer will be able to tell whether the timestamp is LogAppendTime or CreateTime in this case.
  7. The time index will be built so that it has the following guarantees. (Please note that the time index will be implemented in KIP-33 rather than in this KIP. The behavior is discussed here because it is closely related to the design of this KIP.)
    1. If a user searches by a timestamp:
      1. All the messages after the searched timestamp will be consumed.
      2. The user might see earlier messages.
    2. Log retention will look at the last entry in the time index file, because the last entry holds the latest timestamp in the entire log segment. If that entry expires, the log segment will be deleted.
    3. Log rolling will depend on the largest timestamp of all messages seen so far. If no message is appended within log.roll.ms after the largest appended timestamp, a new log segment will be rolled.
  8. The downsides of this proposal are:
    1. The timestamp might not be monotonically increasing if message.timestamp.type=CreateTime.
    2. Log retention might become non-deterministic, i.e. when a message will be deleted now depends on the timestamps of the other messages in the same log segment, and those timestamps are provided by the user within a range determined by the time difference threshold configuration.
  9. Although the proposal has some downsides, it gives the user the flexibility to use the timestamp.
    1. If message.timestamp.type=CreateTime
      1. When the time difference threshold is set to Long.MaxValue, the timestamp in the message is equivalent to CreateTime.
      2. When the time difference threshold is between 0 and Long.MaxValue, it ensures the messages will always have a timestamp within a certain range.
    2. If message.timestamp.type=LogAppendTime, the timestamps will be log append time.
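
The leader-side handling in step 2 can be sketched roughly as follows for a non-compressed message. This is only an illustration under stated assumptions: the class and method names are hypothetical, the exception is a stand-in for the TimestampExceededThresholdException named above, and "time difference" is taken to mean the absolute difference from the broker's local time.

```java
// Hypothetical sketch of the leader-side timestamp handling; not Kafka's actual code.
final class TimestampValidator {
    enum TimestampType { CREATE_TIME, LOG_APPEND_TIME }

    /** Stand-in for the TimestampExceededThresholdException named in the proposal. */
    static final class TimestampExceededThresholdException extends RuntimeException {
        TimestampExceededThresholdException(String message) {
            super(message);
        }
    }

    /**
     * Returns the timestamp to store for a non-compressed message.
     * Assumes non-negative timestamps so the subtraction cannot overflow.
     */
    static long resolve(TimestampType type, long messageTimestampMs,
                        long brokerNowMs, long maxDifferenceMs) {
        if (type == TimestampType.LOG_APPEND_TIME) {
            // LogAppendTime: the broker overrides whatever the producer set.
            return brokerNowMs;
        }
        // CreateTime: accept only if within the configured threshold.
        long difference = Math.abs(brokerNowMs - messageTimestampMs);
        if (difference > maxDifferenceMs) {
            throw new TimestampExceededThresholdException(
                "Timestamp differs from broker time by " + difference + " ms");
        }
        return messageTimestampMs;
    }
}
```

With the threshold set to Long.MAX_VALUE the CreateTime branch never rejects, which matches the "equivalent to CreateTime" case in item 9.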

...

Code Block
languagejava
MessageAndOffset => Offset MessageSize Message
  Offset => int64  
  MessageSize => int32
  
  Message => Crc MagicByte Attributes Timestamp KeyLength Key ValueLength Value
    Crc => int32
    MagicByte => int8  <---------------------- Bump up magic byte to 1
    Attributes => int8 <---------------------- The 4th LSB will be used to indicate timestamp type.
    Timestamp => int64 <---------------------- NEW
    KeyLength => int32
    Key => bytes
    ValueLength => int32
    Value => bytes
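
As an illustration of the attributes change, the sketch below reads and sets the timestamp type bit. The mask 0x08 follows from "4th LSB"; the class and method names are hypothetical, and the consumer-side rule assumes that a wrapper bit of 1 (LogAppendTime) means the wrapper timestamp applies to every inner message.

```java
// Hypothetical helper for the timestamp type bit in the Attributes byte.
final class MessageAttributes {
    // The 4th least significant bit: 0 = CreateTime, 1 = LogAppendTime.
    static final int TIMESTAMP_TYPE_MASK = 0x08;

    static boolean isLogAppendTime(byte attributes) {
        return (attributes & TIMESTAMP_TYPE_MASK) != 0;
    }

    /** Returns the attributes with the timestamp type bit set, other bits untouched. */
    static byte withLogAppendTime(byte attributes) {
        return (byte) (attributes | TIMESTAMP_TYPE_MASK);
    }

    /** Timestamp a consumer would report for an inner message of a compressed wrapper. */
    static long effectiveTimestamp(byte wrapperAttributes, long wrapperTimestampMs,
                                   long innerTimestampMs) {
        return isLogAppendTime(wrapperAttributes) ? wrapperTimestampMs : innerTimestampMs;
    }
}
```

Because only the wrapper's bit and timestamp change under LogAppendTime, the inner messages do not have to be rewritten or recompressed.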

...

Code Block
languagejava
titleProduceResponse V2
ProduceResponse => [ResponseStatus] ThrottleTime  
  ResponseStatus => TopicName [Partition ErrorCode Offset timestamp]
    TopicName => string
    Partition => int32
    ErrorCode => int16
    Offset => int64
    timestamp => int64 <------------------- NEW
  
  ThrottleTime => int32
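
The way a producer could pick the timestamp to hand to a callback from the per-partition timestamp in ProduceResponse V2 is sketched below. The names are hypothetical, and the encoding of NoTimestamp as -1 is an assumption made for the example.

```java
// Hypothetical sketch of choosing the timestamp reported to a producer callback.
final class CallbackTimestamp {
    // Assumption: NoTimestamp is encoded as -1.
    static final long NO_TIMESTAMP = -1L;

    /** A response timestamp other than NoTimestamp means the topic uses LogAppendTime. */
    static boolean isLogAppendTime(long responseTimestampMs) {
        return responseTimestampMs != NO_TIMESTAMP;
    }

    /** Uses the broker's timestamp when present, otherwise the one the producer tracked. */
    static long forCallback(long responseTimestampMs, long producerTrackedTimestampMs) {
        return isLogAppendTime(responseTimestampMs)
            ? responseTimestampMs
            : producerTrackedTimestampMs;
    }
}
```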

 


The fields of FetchRequest V2 will be the same as V1.

...

  • Add a CreateTime field to ProducerRecord. Applications can use this field to set the send time. It also allows mirror maker to preserve the send time easily.
  • Add CreateTime to ConsumerRecord.
    • The CreateTime is useful in use cases such as stream processing.

...


Option discussions with use cases

...

This section discusses how the four options work with a few use cases. Option 1 and Option 2 are in the rejected option section.

Options comparison

The options compared are:

  • Option 1: Message contains CreateTime + LogAppendTime.
  • Option 2: Message contains LogAppendTime only.
  • Option 3: Message contains CreateTime only; brokers keep LogAppendTime in the log index.
  • Option 4: Message contains a timestamp that could be overridden by the broker depending on the configured time difference threshold.

In the entries below, CT stands for CreateTime and LAT for LogAppendTime.

Use case: Mirror Maker

  • Option 1: Broker overrides the LAT and keeps the CT as is.
  • Option 2: Broker overrides the LAT.
  • Option 3: Broker keeps the CT as is and adds an index entry with LAT to the log index file.
  • Option 4: Mirror maker will keep the consumed messages' timestamps. Those timestamps may or may not be overwritten by the target cluster depending on the configuration.
  • Comparison:
    • Option 1 provides the most information to the user. The only concern is whether we should expose LAT to the user.
    • Option 2 loses the CreateTime information.
    • Option 3 has the same amount of information as option 1 from the broker's point of view. From the user's point of view, it does not expose the LAT.
    • Option 4 could be equivalent to either having CreateTime only or having LogAppendTime only, depending on configuration.

Use case: Log Retention

  • Option 1: Broker will use the LAT of the last message in a segment to enforce the policy.
  • Option 2: Same as option 1.
  • Option 3: Broker will use the LAT of the last entry in the log index file to enforce the retention policy. Because the leader is the source of truth for LAT, followers need to get the LAT from the leader when they replicate the messages. That means we need to introduce a new wire protocol to fetch the time-based log index file as well. When log recovery happens, the rebuilt time index would have different LATs from the actual arrival times of the messages in the log, and the LATs in the index file will be very close, or even the same.
  • Option 4: The broker will look at the last time index entry of the segment to decide whether to delete the log segment or not.
  • Comparison:
    • Option 1 and option 2 can work with the existing replication design and solve the log retention issue we have now.
    • Option 3 solves the issue but is a bit involved because it needs to replicate the time index entry as well.
    • Option 4 could be equivalent to either having CreateTime only or having LogAppendTime only, depending on configuration.

Use case: Log rolling

  • Option 1: Broker will use the LAT of the first message in a log segment to enforce the policy.
  • Option 2: Same as option 1.
  • Option 3: Broker will use the LAT of the first entry in the log index file to enforce the policy. Similar to the log retention case, the followers need to replicate the time index as well. When log recovery happens, log rolling might not be honored either.
  • Option 4: The broker will keep an in-memory earliest message timestamp for the active log segment. On broker startup, the broker will need to scan the messages in the active segment to find the earliest timestamp.
  • Comparison:
    • Option 1 and option 2 solve the log rolling issue.
    • Option 3 solves the issue but is a bit involved because it needs to replicate the time index entry as well.
    • On broker startup, option 4 needs to scan the active log segment to find the earliest timestamp in the log segment.

Use case: Stream processing

  • Option 1: Applications don't need to include the CreateTime in the payload but can simply use the CreateTime field.
  • Option 2: Applications have to put CreateTime into the payload.
  • Option 3: Applications don't need to include the CreateTime in the payload but can simply use the CreateTime field.
  • Option 4: Could be equivalent to either having CreateTime only or having LogAppendTime only, depending on configuration.
  • Comparison: The benefit of having a CreateTime with each message, rather than putting it into the payload, is that the application protocol can be simplified. It is convenient for the infrastructure to provide the timestamp so that each application does not need to worry about it.

Use case: Latency measurement

  • Option 1: User can get End2End latency and lag in time.
  • Option 2: User can get the lag in time.
  • Option 3: User can get End2End latency.
  • Option 4: Depending on the max.message.time.difference.ms configuration, the user may or may not be able to find out the latency.
  • Comparison: Option 1 has the most information for the user.

Use case: Search message by timestamp

  • Options 1 to 4 and comparison: Detailed discussion in KIP-33.

Mirror maker case in detail

...

  • Pattern 1:
    Because the latency is small, CreateTime and LogAppendTime will be close and there won't be much difference.
  • Pattern 2:
    If the log retention is based on the message creation time, it will not be affected by the latency in the data pipeline because the send time will not change.
    If the log retention is based on the LogAppendTime, it will be affected by the latency in the pipeline. Because of the latency difference, some data can be deleted on one cluster but not on another cluster in the pipeline.
  • Pattern 3:
    When messages with significantly different timestamps go into a cluster at around the same time, the retention policy is hard to follow if we use CreateTime. For example, imagine two mirror makers copying data from two source clusters to the same target cluster. If MirrorMaker1 is copying messages with CreateTime around 1:00 PM today and MirrorMaker2 is copying messages with CreateTime around 1:00 PM yesterday, those messages can go to the same log segment in the target cluster. It will be difficult for the broker to apply the retention policy to that log segment. The broker needs to maintain knowledge of the latest CreateTime of all messages in a log segment and persist the information somewhere.
  • Robustness:
    If there is a message with CreateTime set to the future, the log might be kept for a very long time. The broker needs to sanity check the timestamp when it receives the message. It could be tricky to determine which timestamps are not valid.

Comparison:

Preference by pattern:

  • Pattern 1: CT or LAT
  • Pattern 2: CT
  • Pattern 3: LAT
  • Robustness: LAT

In reality, we usually don't see every pipeline having the same large latency, so LogAppendTime looks preferable to CreateTime for log retention.
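
The pattern 3 difficulty can be made concrete with a small sketch. It models, under assumptions, a retention check that expires a segment only when its largest message timestamp is older than the retention period; the class and method names are hypothetical and this is not the broker's actual retention code.

```java
import java.util.List;

// Hypothetical model of time-based retention driven by the largest timestamp in a segment.
final class SegmentRetention {
    /** A segment expires only once its newest timestamp is older than retentionMs. */
    static boolean isExpired(List<Long> messageTimestampsMs, long nowMs, long retentionMs) {
        if (messageTimestampsMs.isEmpty()) {
            return false;
        }
        long largest = Long.MIN_VALUE;
        for (long timestampMs : messageTimestampsMs) {
            largest = Math.max(largest, timestampMs);
        }
        return nowMs - largest > retentionMs;
    }
}
```

If one segment holds a message with CreateTime 1:00 PM yesterday and another with CreateTime 1:00 PM today, then with a 12 hour retention the older message is still on disk six hours later even though it is 30 hours old, because the newer message keeps the whole segment alive.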

...

  • Pattern 1:
    Because the latency is small, CreateTime and LogAppendTime will be close and there won't be much difference.
  • Pattern 2:
    When the latency is large, it is possible that when a new message is produced to a broker, its CreateTime has already reached the rolling criteria. This might cause a segment with only one message.
  • Pattern 3:
    Similar to pattern 2, a lagged message might result in a single-message log segment.
  • Robustness:
    Similar to pattern 2 and pattern 3. Also, a CreateTime in the future might break log rolling as well.

...


 


Preference by pattern:

  • Pattern 1: CT or LAT
  • Pattern 2: LAT
  • Pattern 3: LAT
  • Robustness: LAT
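
The single-message segment problem from patterns 2 and 3 can be illustrated with a simplified model. It assumes that a segment is rolled once the timestamp of its first message is older than the roll interval; the names are hypothetical and the real rolling logic may differ.

```java
// Hypothetical model of time-based log rolling keyed on a segment's first timestamp.
final class SegmentRolling {
    /** Rolls when the first message in the active segment is older than rollMs. */
    static boolean shouldRoll(long firstTimestampMs, long nowMs, long rollMs) {
        return nowMs - firstTimestampMs > rollMs;
    }
}
```

With a one hour roll interval, a lagged message whose CreateTime is two hours old satisfies the rolling criteria the moment it becomes the first message of a segment, so the segment is rolled with only that message in it. Stamped with LogAppendTime, the same message would start a segment whose first timestamp is the current time.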