...

Proposed Changes

Three options were proposed before Option 4. The details of Option 1, Option 2 and Option 3 are in the Rejected Options section.

Option 4 - Add a Time field to the message format with maximum allowed time difference configuration on broker

After extended discussion of Option 1, Option 2 and Option 3, it turned out to be very difficult to meet all of the following requirements at the same time:

  1. Have only one timestamp concept in Kafka.
  2. Enforce the time based log retention / log rolling for different use cases. (Some use cases need to be based on LogAppendTime while others prefer CreateTime.)
  3. Protect the broker from misbehaving users (e.g. appending a wrong create time to messages).

Option 4 adds a configuration to the broker to allow users to decide which timestamp they want to use according to their use case.

  1. Allow the user to stamp the message when producing.
  2. When the broker receives a message, it looks at the difference between its local time and the timestamp in the message.
    1. If the time difference is within a configurable threshold max.message.time.difference.ms, the server will accept it and append it to the log.
    2. If the time difference is beyond the configured threshold max.message.time.difference.ms, the server will override the timestamp with its current local time and append the message to the log.
  3. max.message.time.difference.ms will be a per topic configuration.
  4. The index will be built so that it has the following guarantees.
    1. If the user searches by timestamp:
      1. all the messages after that timestamp will be consumed.
      2. the user might see earlier messages.
    2. The log retention will look at the last time index entry in the time index file, because the last entry will be the latest timestamp in the entire log segment. If that entry expires, the log segment will be deleted.
    3. The log rolling has to depend on the earliest timestamp. In this case we may need to keep an in-memory timestamp only for the current active log. On recovery, we will need to read the active log segment to get the timestamp of the earliest message.
  5. The downsides of this proposal are:
    1. The timestamp might not be monotonically increasing.
    2. The log retention might become non-deterministic, i.e. when a message will be deleted now depends on the timestamps of the other messages in the same log segment. And those timestamps are provided by the user within a range depending on what the time difference threshold configuration is.
    3. The semantic meaning of the timestamp is vague because some of the timestamps might have been overwritten and some might not.
  6. Although the proposal has some downsides, it gives the user the flexibility to use the timestamp.
    1. If the time difference threshold is set to Long.MaxValue, the timestamp in the message is equivalent to CreateTime.
    2. If the time difference threshold is set to 0, the timestamp in the message is equivalent to LogAppendTime.
    3. If the time difference threshold is between 0 and Long.MaxValue, it ensures the messages will always have a timestamp within a certain range.
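
The accept-or-override rule in step 2 could be sketched roughly as follows. This is only an illustration, not broker code: the class TimestampPolicy and its method resolve are hypothetical names, and the sketch assumes both timestamps are non-negative epoch milliseconds so that the subtraction cannot overflow.

```java
/** Hypothetical helper; the names here are illustrative and are not Kafka APIs. */
final class TimestampPolicy {
    // Models the per topic setting max.message.time.difference.ms.
    private final long maxDifferenceMs;

    TimestampPolicy(long maxDifferenceMs) {
        if (maxDifferenceMs < 0) {
            throw new IllegalArgumentException("threshold must be >= 0");
        }
        this.maxDifferenceMs = maxDifferenceMs;
    }

    /**
     * Returns the timestamp to store in the log: the producer's timestamp if it
     * is within the threshold of the broker's local time, otherwise the broker's
     * local time. Assumes both arguments are non-negative epoch milliseconds.
     */
    long resolve(long messageTimestampMs, long brokerNowMs) {
        long difference = Math.abs(brokerNowMs - messageTimestampMs);
        return difference <= maxDifferenceMs ? messageTimestampMs : brokerNowMs;
    }
}
```

With a threshold of 0 only a timestamp equal to the broker's clock survives, which matches the LogAppendTime behaviour described above; with Long.MAX_VALUE the producer's timestamp is always kept.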

The following changes are needed to implement the above proposal.

Wire protocol change - add a Time field to the message format

Code Block
languagejava
MessageAndOffset => MessageSize Offset Message
  MessageSize => int32
  Offset => int64
  
  Message => Crc MagicByte Attributes Time KeyLength Key ValueLength Value
    Crc => int32
    MagicByte => int8
    Attributes => int8
    Time => int64 <---------------------- NEW
    KeyLength => int32
    Key => bytes
    ValueLength => int32
    Value => bytes
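
As a rough illustration of the layout above, the following sketch writes the fields in the listed order into a ByteBuffer. Several details are assumptions rather than part of this proposal: the class name MessageV1Encoder is hypothetical, a null key or value is encoded with length -1, and the Crc is taken to be a CRC32 over every byte that follows the Crc field.

```java
import java.nio.ByteBuffer;
import java.util.zip.CRC32;

/** Hypothetical encoder for the Message layout with the new Time field. */
final class MessageV1Encoder {
    static ByteBuffer encode(byte attributes, long timeMs, byte[] key, byte[] value) {
        // Assumption: a missing key or value is written as length -1 with no bytes.
        int keyLength = key == null ? -1 : key.length;
        int valueLength = value == null ? -1 : value.length;
        int size = 4 + 1 + 1 + 8 + 4 + Math.max(keyLength, 0) + 4 + Math.max(valueLength, 0);

        ByteBuffer buffer = ByteBuffer.allocate(size);
        buffer.position(4);          // leave room for Crc (int32)
        buffer.put((byte) 1);        // MagicByte (int8): 1 for the new format
        buffer.put(attributes);      // Attributes (int8)
        buffer.putLong(timeMs);      // Time (int64), the new field
        buffer.putInt(keyLength);    // KeyLength (int32)
        if (key != null) buffer.put(key);
        buffer.putInt(valueLength);  // ValueLength (int32)
        if (value != null) buffer.put(value);

        // Assumption: the checksum covers everything after the Crc field.
        CRC32 crc = new CRC32();
        crc.update(buffer.array(), 4, size - 4);
        buffer.putInt(0, (int) crc.getValue());

        buffer.flip();               // make the buffer readable from the start
        return buffer;
    }
}
```

Compared with the previous format, the only size change is the extra 8 bytes for Time between Attributes and KeyLength.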

Add a time field to both ProducerRecord and ConsumerRecord

  • If the user specifies the timestamp for a ProducerRecord, the ProducerRecord will be sent with this timestamp.
  • If the user does not specify the timestamp for a ProducerRecord, the producer stamps the ProducerRecord with the current time.
  • ConsumerRecord will have the timestamp of the message as it was stored on the broker.

Broker configuration change - add a max.message.time.difference.ms configuration to the broker

The default value of this configuration will be Long.MaxValue, i.e. by default the server will not override any user timestamp.

Build Time Index for messages based on message timestamp

Please see the details in KIP-33.

Change time based log retention and log rolling to be based on the time index

Please see the details in KIP-33.


Option discussions with use cases

This section discusses how the four options work with a few use cases. Option 1, Option 2 and Option 3 are in the Rejected Options section.

Options comparison

The options compared are listed below. In the comparison, CT stands for CreateTime and LAT stands for LogAppendTime.

  • Option 1: Message contains CreateTime + LogAppendTime.
  • Option 2: Message contains LogAppendTime only.
  • Option 3: Message contains CreateTime only, brokers keep LogAppendTime in log index.
  • Option 4: Message contains a timestamp that could be overridden by broker depending on configured time difference threshold.

Use case: Mirror Maker

  • Option 1: Broker overrides the LAT and keeps the CT as is.
  • Option 2: Broker overrides the LAT.
  • Option 3: Broker keeps the CT as is and adds an index entry with LAT to the log index file.
  • Option 4: Mirror maker will keep the consumed messages' timestamps. Those timestamps may or may not be overwritten by the target cluster depending on the configuration.
  • Comparison: Option 1 provides the most information to the user. The only concern is whether we should expose LAT to the user. Option 2 loses the CreateTime information. Option 3 has the same amount of information as Option 1 from the broker point of view. From the user point of view, it does not expose the LAT. Option 4 could be equivalent to either having CreateTime only or having LogAppendTime only depending on configuration.

Use case: Log Retention

  • Option 1: Broker will use the LAT of the last message in a segment to enforce the policy.
  • Option 2: Same as Option 1.
  • Option 3: Broker will use the LAT of the last entry in the log index file to enforce the retention policy. Because the leader is the source of truth for LAT, followers need to get the LAT from the leader when they replicate the messages. That means we need to introduce a new wire protocol to fetch the time based log index file as well. When log recovery happens, the rebuilt time index would have different LAT from the actual arrival time of the messages in the log, and the LAT values in the index file will be very close, or even the same.
  • Option 4: The broker will look at the last time index entry of the segment to decide whether to delete the log segment or not.
  • Comparison: Option 1 and Option 2 can work with the existing replication design and solve the log retention issue we have now. Option 3 solves the issue but is a bit involved because it needs to replicate the time index entry as well. Option 4 could be equivalent to either having CreateTime only or having LogAppendTime only depending on configuration.

Use case: Log rolling

  • Option 1: Broker will use the LAT of the first message in a log segment to enforce the policy.
  • Option 2: Same as Option 1.
  • Option 3: Broker will use the LAT of the first entry in the log index file to enforce the policy. Similar to the log retention case, the followers need to replicate the time index as well. When log recovery happens, the log rolling might not be honored either.
  • Option 4: The broker will keep an in-memory earliest message timestamp for the active log segment. On broker startup, the broker will need to scan the messages in the active segment to find the earliest timestamp.
  • Comparison: Option 1 and Option 2 solve the log rolling issue. Option 3 solves the issue but is a bit involved because it needs to replicate the time index entry as well. On broker startup, Option 4 needs to scan the active log segment to find the earliest timestamp in the log segment.

Use case: Stream processing

  • Option 1: Applications don't need to include the CreateTime in the payload but simply use the CreateTime field.
  • Option 2: Applications have to put CreateTime into the payload.
  • Option 3: Applications don't need to include the CreateTime in the payload but simply use the CreateTime field.
  • Option 4: Could be equivalent to either having CreateTime only or having LogAppendTime only depending on configuration.
  • Comparison: The benefit of having a CreateTime with each message rather than putting it into the payload is that the application protocol can be simplified. It is convenient for the infrastructure to provide the timestamp so there is no need for each application to worry about the timestamp.

Use case: Latency measurement

  • Option 1: User can get End2End latency and lag in time.
  • Option 2: User can get the lag in time.
  • Option 3: User can get End2End latency.
  • Option 4: Depending on the max.message.time.difference.ms configuration, the user may or may not be able to find out the latency.
  • Comparison: Option 1 has the most information for the user.

Use case: Search message by timestamp

  • Options 1 to 4 and Comparison: Detailed discussion in KIP-33.

Mirror maker case in detail

...

  1. Set message.format.version=0 on brokers. (Broker will write MessageAndOffset V0 to disk)
  2. Create internal ApiVersion 0.9.0-1** which uses ProduceRequest V2 and FetchRequest V2.
  3. Configure the broker to use ApiVersion 0.9.0 (ProduceRequest V1 and FetchRequest V1).
  4. Do a rolling upgrade of the brokers to let the brokers pick up the new code supporting ApiVersion 0.9.0-1.
  5. Bump up ApiVersion of the brokers to 0.9.0-1 to let the brokers use FetchRequest V2 for replication.
  6. Upgraded brokers support both ProduceRequest V2 and FetchRequest V2, which use magic byte 1 for MessageAndOffset.
    1. When the broker sees a produce request V1 (MessageAndOffset = V0), it will decompress the message, assign offsets using absolute offsets and NOT re-compress the message.
    2. When the broker sees a produce request V2 (MessageAndOffset = V1), it will decompress the message, assign offsets using absolute offsets, ignore the time field and do re-compression, i.e. down-convert the message format to MessageAndOffset V0.
    3. When the broker sees a fetch request V1 (Supporting MessageAndOffset = V0), because the data format on disk is MessageAndOffset V0, it will use the zero-copy transfer to reply with fetch response V1 with MessageAndOffset V0.
    4. When the broker sees a fetch request V2 (Supporting MessageAndOffset = V0, V1), because the data format on disk is MessageAndOffset V0, it will use zero-copy transfer to reply with fetch response V2 with MessageAndOffset V0.
  7. Upgrade consumers to send FetchRequest V2.
  8. Upgrade producers to send ProduceRequest V2.

...

  1. After most of the consumers are upgraded, bump up message.format.version to 1 and do a rolling bounce of the brokers.
  2. Upgraded brokers do the following:
    1. When the broker sees a produce request V1 (MessageAndOffset = V0), it will decompress the message, assign offsets using relative offsets, fill in the time field with the current server time and re-compress the message, i.e. up-convert the message format to MessageAndOffset V1.
    2. When the broker sees a produce request V2 (MessageAndOffset = V1), it will decompress the message, assign offsets using relative offsets, check and maybe overwrite the time field, and NOT do re-compression.
    3. When the broker sees a fetch request V1 (Supporting MessageAndOffset = V0), because the data format on disk is MessageAndOffset V1, it will NOT use the zero-copy transfer. Instead the broker will read the messages from disk, down-convert them to V0 and reply using fetch response V1 with MessageAndOffset V0.
    4. When the broker sees a fetch request V2 (Supporting MessageAndOffset = V0, V1), because the data format on disk is MessageAndOffset V1, it will use zero-copy transfer to reply with fetch response V2 with MessageAndOffset V1.
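
The fetch-side rules in both upgrade phases reduce to one comparison: zero-copy is possible only when the on-disk format is one the requesting client can read. A minimal sketch of that decision follows; the class FetchFormatPlanner and its methods are hypothetical names chosen for illustration, not broker code.

```java
/** Hypothetical helper summarising the fetch handling rules listed above. */
final class FetchFormatPlanner {
    /** Highest MessageAndOffset version a fetch request can accept: V1 -> 0, V2 -> 1. */
    static int maxSupportedFormat(int fetchRequestVersion) {
        return fetchRequestVersion >= 2 ? 1 : 0;
    }

    /** Zero-copy works only if the on-disk bytes can be sent unchanged. */
    static boolean canUseZeroCopy(int onDiskFormat, int fetchRequestVersion) {
        return onDiskFormat <= maxSupportedFormat(fetchRequestVersion);
    }

    /** Message format actually placed in the fetch response (after any down-conversion). */
    static int responseFormat(int onDiskFormat, int fetchRequestVersion) {
        return Math.min(onDiskFormat, maxSupportedFormat(fetchRequestVersion));
    }
}
```

The only combination that loses zero-copy is on-disk V1 with a fetch request V1, which is why the format version is bumped only after most consumers are upgraded.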

...

Option 1 - Add both CreateTime and LogAppendTime to message format

Wire protocol change

Code Block
languagejava
MessageAndOffset => MessageSize Offset Message
  MessageSize => int32
  Offset => int64
  
  Message => Crc MagicByte Attributes CreateTime LogAppendTime KeyLength Key ValueLength Value
    Crc => int32
    MagicByte => int8
    Attributes => int8
    CreateTime => int64    <---------------------- NEW
    LogAppendTime => int64 <---------------------- NEW
    KeyLength => int32
    Key => bytes
    ValueLength => int32
    Value => bytes

...

In step 4, it is possible that T1 < T0 because of the time difference between the two brokers. If we naively take T1 as the timestamp of m1, then the timestamps will be out of order, i.e. a message with an earlier timestamp will appear later in the log. To avoid this problem, at step 4, broker 1 can take max(T1, T0) as the timestamp for m1, so the timestamp in the log will not go backward. Broker 1 will keep using T0 as the timestamp until its own clock advances past T0.

More generally, when a message is appended, the broker (whether leader or follower) should remember the timestamp of the last appended message. If a later appended message has a timestamp earlier than that of the last appended message, the timestamp of the last appended message will be used.
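
A minimal sketch of this "never go backward" rule is shown below. The class AppendTimeAssigner is a hypothetical name used only for illustration; it keeps the last assigned timestamp and returns the maximum of that value and the local clock.

```java
/** Hypothetical helper that keeps assigned append timestamps from moving backward. */
final class AppendTimeAssigner {
    // Timestamp of the last appended message; starts below any real clock value.
    private long lastAppendedMs = Long.MIN_VALUE;

    /** Returns max(local clock, last appended timestamp) and remembers the result. */
    synchronized long next(long localNowMs) {
        lastAppendedMs = Math.max(lastAppendedMs, localNowMs);
        return lastAppendedMs;
    }
}
```

If the previous message was stamped 100 and the local clock now reads 90, next(90) returns 100; once the clock reaches 120, next(120) returns 120.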

Change time based log rolling and retention to use LogAppendTime

Time based log rolling and retention currently use the file creation time and last modified time. This does not work for newly created replicas because the time attributes of the files are different from the true message append time. The following changes will address this issue.

  • The time based log rolling will be based on the timestamp of the first message in the log segment file.
  • The time based log retention will be based on the timestamp of the last message in the log segment file.
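
The two rules above could be expressed as simple predicates. This is a sketch under assumptions: the class SegmentTimePolicy, its method names, and the choice of inclusive versus exclusive comparison are illustrative and not taken from the broker code.

```java
/** Hypothetical predicates for time based rolling and retention of a log segment. */
final class SegmentTimePolicy {
    /** Roll when the first message in the segment is at least rollMs old. */
    static boolean shouldRoll(long firstMessageTimestampMs, long nowMs, long rollMs) {
        return nowMs - firstMessageTimestampMs >= rollMs;
    }

    /** Delete when even the last (newest) message in the segment is older than retentionMs. */
    static boolean isExpired(long lastMessageTimestampMs, long nowMs, long retentionMs) {
        return nowMs - lastMessageTimestampMs > retentionMs;
    }
}
```

Because both checks read timestamps stored with the messages rather than file attributes, a newly created replica reaches the same decision as the leader.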

ConsumerRecord / ProducerRecord format change

  • Add a CreateTime field to ProducerRecord. This field can be used by application to set the send time. It will also allow mirror maker to maintain the send time easily.
  • Add both CreateTime and LogAppendTime to ConsumerRecord.
    • The CreateTime is useful in use cases such as stream processing
    • The LogAppendTime is useful in use cases such as log rolling and log retention.

This proposed option was rejected because:

  1. It introduces 16 bytes overhead to the message.
  2. It exposes LogAppendTime to users.

Option 2 - Adding only LogAppendTime to the message

This proposal is pretty much the same as the selected proposal, except it does not include CreateTime in the message format.


Code Block
languagejava
MessageAndOffset => MessageSize Offset Message
  MessageSize => int32
  Offset => int64
  
  Message => Crc MagicByte Attributes LogAppendTime KeyLength Key ValueLength Value
    Crc => int32
    MagicByte => int8
    Attributes => int8
    LogAppendTime => int64 <---------------------- NEW
    KeyLength => int32
    Key => bytes
    ValueLength => int32
    Value => bytes

The downsides of this proposal are:

  1. If the CreateTime is not in the message itself, the application needs to include the timestamp in the payload. Instead of asking each application to do this, it is better to include the timestamp in the message.
  2. The broker is not able to report the latency metric. While we could let the application get the End2End latency, we might lose the latency for each hop in the pipeline.

Option 3 - Add CreateTime to the message and use LogAppendTime on brokers

Add CreateTime field to message

  • CreateTime

    • CreateTime will be set by the producer and will not be changed afterward.
    • CreateTime accuracy is millisecond.
    • For a compressed message, the CreateTime of the wrapper message will be the CreateTime of the last compressed message (this is to be consistent with base offset in KIP-31).

Use LogAppendTime for time based usages on the broker

  1. Build time index.
  2. Honor log rolling time
  3. Honor log retention time

Compared with Option 1, while the time based log index has to be based on LogAppendTime, there is some concern about exposing the LogAppendTime (which is a broker internal concept) to user. And there will also be some per message overhead for two timestamps.

So this approach includes only CreateTime in the message format.

Wire protocol change

Change the MessageAndOffset format to:

Code Block
languagejava
MessageAndOffset => MessageSize Offset Message
  MessageSize => int32
  Offset => int64
  
  Message => Crc MagicByte Attributes CreateTime KeyLength Key ValueLength Value
    Crc => int32
    MagicByte => int8
    Attributes => int8
    CreateTime => int64 <---------------------- NEW
    KeyLength => int32
    Key => bytes
    ValueLength => int32
    Value => bytes

Build time based log index using LogAppendTime

The broker will still build the time based index using LogAppendTime. LogAppendTime will be only in the time index file, not in the message format, i.e. it is not exposed to the user.

The broker will append a time index file entry for a message when:

  1. The message is the first message of a log segment.
  2. The message is the last message of a log segment.
  3. The message is the first message received in the minute.
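
The three conditions above might be checked as in the following sketch. It is illustrative only: TimeIndexEntryPolicy is a hypothetical name, the caller is assumed to know whether a message is the first or last of its segment, and LogAppendTime is assumed to be non-negative epoch milliseconds.

```java
/** Hypothetical check for when a time index entry should be appended. */
final class TimeIndexEntryPolicy {
    private static final long MINUTE_MS = 60_000L;

    // Latest minute (since epoch) in which a message has been seen so far.
    private long lastSeenMinute = -1L;

    /**
     * Returns true if the message is the first or last of its segment, or the
     * first message received in its minute. Assumes non-negative timestamps.
     */
    boolean shouldIndex(boolean firstInSegment, boolean lastInSegment, long logAppendTimeMs) {
        long minute = logAppendTimeMs / MINUTE_MS;
        boolean firstInMinute = minute > lastSeenMinute;
        lastSeenMinute = Math.max(lastSeenMinute, minute);
        return firstInSegment || lastInSegment || firstInMinute;
    }
}
```

With these rules a segment carries at most roughly one index entry per minute plus the two boundary entries.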

Let replicas also fetch the log index file

Because LogAppendTime is not included in the message format, with the current replication design followers will not be able to get the LogAppendTime from the leader. In order to make the log retention and log rolling policies work, the LogAppendTime needs to be propagated from the leader to the followers.

In this option, the LogAppendTime only exists in the time index file, therefore when followers fetch data from the leader, they have to replicate the time index file as well.

There are a few requirements here:

  1. Unlike the log index file, the time index file should not be rebuilt from the local log after a crash, but should always be fetched from the current leader, just the same as the actual data. Otherwise we may have different time indexes on different replicas.
  2. To ensure the log segments are identical on both leader and followers, we should always have a time index entry for the first message in a log segment.
  3. In order to make the time based log retention work, we need the timestamp entry for the last message in a log segment.
  4. When we truncate the messages in log segment files, we need to truncate entries in the time index files as well.

To replicate the log index entry as well, we can add the log index entry to FetchResponse, so the fetch response will become:

Code Block
titleFetchResponse format for replication
FetchResponse => [TopicName [Partition ErrorCode HighwaterMarkOffset MessageSetSize MessageSet [TimeIndexEntry]]]
  TopicName => string
  Partition => int32
  ErrorCode => int16
  HighwaterMarkOffset => int64
  MessageSetSize => int32
  TimeIndexEntry => LogAppendTime Offset <------------- new, the time index entry in the message set, one partition might contain multiple entries. The array will always be empty if the fetch request is not from followers.
    LogAppendTime => int64
    Offset => int32

ConsumerRecord / ProducerRecord format change

  • Add a CreateTime field to ProducerRecord. This field can be used by application to set the send time. It will also allow mirror maker to maintain the send time easily.
  • Add CreateTime to ConsumerRecord.
    • The CreateTime is useful in use cases such as stream processing

...