Status
Current state: Under Discussion
Discussion thread: here
JIRA: KAFKA-2511
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
This KIP tries to address the following issues in Kafka:
- Log retention might not be honored: log retention is currently at the log segment level, and is driven off the last modification time of a log segment. This approach does not quite work when a replica reassignment happens, because the newly created log segment effectively has its modification time reset to now.
- Log rolling might break for a newly created replica as well, for the same reason as above.
Besides that, the KIP will also facilitate use cases such as stream processing where a timestamp is needed.
This KIP is a distilled/improved version of an earlier discussion that we started.
This KIP should preferably be implemented together with KIP-31 if possible, to avoid changing the wire protocol twice.
Public Interfaces
We propose the following change to the message format
MessageAndOffset => MessageSize Offset Message
  MessageSize => int32
  Offset => int64
  Message => Crc MagicByte Attributes CreateTime LogAppendTime KeyLength Key ValueLength Value
    Crc => int32
    MagicByte => int8
    Attributes => int8
    CreateTime => int64    <---------------------- NEW
    LogAppendTime => int64 <---------------------- NEW
    KeyLength => int32
    Key => bytes
    ValueLength => int32
    Value => bytes
Proposed Changes
Wire protocol change
Add CreateTime and LogAppendTime fields to the message
- CreateTime
  - CreateTime will be set by the producer and will not be changed afterward.
  - CreateTime accuracy is milliseconds.
  - For a compressed message, the CreateTime of the wrapper message will be the CreateTime of the first compressed message.
- LogAppendTime
  - The LogAppendTime will be assigned by the broker upon receiving the message. If the message is coming from mirror maker, the original CreateTime will be maintained but the LogAppendTime will be changed by the target broker.
  - The LogAppendTime will be used to build the log index.
  - The LogAppendTime accuracy is milliseconds.
  - The LogAppendTime of the outer message of compressed messages will be the latest LogAppendTime of all its inner messages.
    - If the compressed message is not compacted, the relative offsets of inner messages will be contiguous and the inner messages share the same LogAppendTime.
    - If the compressed message is compacted, the relative offsets of inner messages may not be contiguous. The outer message's LogAppendTime will be the LogAppendTime of the last inner message.
  - The followers will not reassign the LogAppendTime but will simply update an in-memory lastAppendedLogAppendTime and append the message to the log.
  - To handle leader migration where the new leader has a slower clock than the old leader, the leader should always append max(lastAppendedTimestamp, currentTimeMillis) as the timestamp.
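The wrapper-timestamp rule above can be sketched as follows. This is a minimal illustration, not the actual broker code; the class and method names are hypothetical:

```java
import java.util.Arrays;
import java.util.List;

public class WrapperTimestamp {
    // The LogAppendTime of the outer (wrapper) message is the latest
    // LogAppendTime among all of its inner messages.
    public static long wrapperLogAppendTime(List<Long> innerLogAppendTimes) {
        long max = -1L;
        for (long lat : innerLogAppendTimes)
            max = Math.max(max, lat);
        return max;
    }

    public static void main(String[] args) {
        // Non-compacted case: inner messages all share the same LogAppendTime.
        System.out.println(wrapperLogAppendTime(Arrays.asList(100L, 100L, 100L))); // 100
        // Compacted case: the latest (last surviving) inner timestamp wins.
        System.out.println(wrapperLogAppendTime(Arrays.asList(100L, 250L, 180L))); // 250
    }
}
```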
A corner case for LogAppendTime(LAT) - Leader migration:
Suppose we have broker0 and broker1. Broker0 is the current leader of a partition and broker1 is a follower. Consider the following scenario:
1. Message m0 is produced to broker0 at time T0 (LAT = T0, where T0 is the clock on broker0).
2. broker1, as a follower, replicates m0 and appends it to its own log without changing the LogAppendTime of m0.
3. broker0 goes down and broker1 becomes the new leader.
4. Message m1 is produced to broker1 at time T1 (LAT = T1, where T1 is the clock on broker1).
In step 4, it is possible that T1 < T0 because of the clock difference between the two brokers. If we naively take T1 as the timestamp of m1, the timestamps will be out of order, i.e. a message with an earlier timestamp appears later in the log. To avoid this problem, at step 4 broker1 can take max(T1, T0) as the timestamp for m1, so the timestamps in the log never go backward. broker1 will keep using T0 as the timestamp until its own clock advances past T0.
More generally, when a message is appended, the broker (whether leader or follower) should remember the timestamp of the last appended message; if a later message has a timestamp earlier than that of the last appended message, the timestamp of the last appended message is used instead.
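The monotonic assignment rule above can be sketched in a few lines. This is a simplified illustration under the KIP's description, not the actual broker implementation; names are hypothetical:

```java
public class MonotonicTimestamps {
    private long lastAppendedTimestamp = -1L;

    // On append, the broker takes the max of the incoming timestamp and the
    // last appended one, so timestamps in the log never go backward even when
    // leadership moves to a broker with a slower clock.
    public long assign(long candidateTimestamp) {
        lastAppendedTimestamp = Math.max(lastAppendedTimestamp, candidateTimestamp);
        return lastAppendedTimestamp;
    }

    public static void main(String[] args) {
        MonotonicTimestamps log = new MonotonicTimestamps();
        System.out.println(log.assign(1000L)); // m0 appended on broker0: T0 = 1000
        // broker1 takes over with a slower clock: T1 = 990 < T0
        System.out.println(log.assign(990L));  // m1 gets 1000, not 990
        System.out.println(log.assign(1005L)); // clock passed T0, normal again: 1005
    }
}
```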
Change time based log rolling and retention to use LogAppendTime
Time based log rolling and retention currently use the file creation time and last modified time. This does not work for newly created replicas because the time attributes of the files differ from the true message append time. The following changes address this issue.
- The time based log rolling will be based on the timestamp of the first message in the log segment file.
- The time based log retention will be based on the timestamp of the last message in the log segment file.
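The two policies above can be sketched as simple predicates. This is a hedged illustration of the rules as stated, not the broker's actual segment code; the class and parameter names are made up:

```java
public class SegmentTimePolicy {
    // Time based rolling: roll when the first message in the active segment
    // is older than the configured roll interval.
    public static boolean shouldRoll(long firstMessageTimestamp, long now, long rollMs) {
        return now - firstMessageTimestamp > rollMs;
    }

    // Time based retention: a segment is eligible for deletion when its last
    // (newest) message is older than the retention period.
    public static boolean shouldDelete(long lastMessageTimestamp, long now, long retentionMs) {
        return now - lastMessageTimestamp > retentionMs;
    }

    public static void main(String[] args) {
        long now = 10_000L;
        System.out.println(shouldRoll(1_000L, now, 5_000L));   // true: first message is 9s old
        System.out.println(shouldDelete(9_500L, now, 5_000L)); // false: newest message is recent
    }
}
```

Using the first message's timestamp for rolling and the last message's timestamp for retention keeps both decisions independent of file modification times, so a freshly replicated segment behaves the same as the original.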
ConsumerRecord / ProducerRecord format change
- Add a CreateTime field to ProducerRecord. This field can be used by the application to set the send time. It will also allow mirror maker to maintain the send time easily.
- Add both CreateTime and LogAppendTime to ConsumerRecord.
  - The CreateTime is useful in use cases such as stream processing.
  - The LogAppendTime is useful in use cases such as log rolling and log retention.
Option discussions with use cases
This section discusses how the proposed and rejected options work with a few use cases.
Mirror maker
The behavior of the broker is the same for all options: the broker will always override the LogAppendTime (if it exists) when a message arrives at the broker, and keep the CreateTime (if it exists) untouched.
The broker does not distinguish mirror maker from other producers. The following example explains what the timestamps will look like when mirror maker is in the picture (CT = CreateTime, LAT = LogAppendTime).
- The application producer produces a message at T0. ([CT = T0, LAT = -1])
- The broker in cluster1 receives the message at T1 = T0 + latency1 and appends it to the log. latency1 includes linger.ms and some other latency. ([CT = T0, LAT = T1])
- Mirror maker copies the message to a broker in cluster2. ([CT = T0, LAT = T1])
- The broker in cluster2 receives the message at T2 = T1 + latency2 and appends it to the log. ([CT = T0, LAT = T2])
The CreateTime of a message in the source cluster and the target cluster will be the same, i.e. the timestamp is passed across clusters.
The LogAppendTime of a message in the source cluster and the target cluster will be different.
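The flow above can be simulated in a few lines. This is a toy model of the timestamp semantics described here, not client or broker code; all names are hypothetical:

```java
public class MirrorFlow {
    // A message carries an immutable CreateTime and a broker-assigned LogAppendTime.
    public static final class Msg {
        public final long createTime;
        public final long logAppendTime;
        public Msg(long ct, long lat) { createTime = ct; logAppendTime = lat; }
    }

    // On append, every broker overrides the LogAppendTime and keeps the
    // CreateTime untouched; mirror maker is treated like any other producer.
    public static Msg append(Msg m, long brokerNow) {
        return new Msg(m.createTime, brokerNow);
    }

    public static void main(String[] args) {
        Msg produced = new Msg(0L, -1L);          // application sends at T0 = 0
        Msg inCluster1 = append(produced, 5L);    // cluster1 broker at T1 = T0 + latency1
        Msg inCluster2 = append(inCluster1, 12L); // cluster2 broker at T2 = T1 + latency2
        System.out.println(inCluster2.createTime);    // 0:  CreateTime preserved across clusters
        System.out.println(inCluster2.logAppendTime); // 12: LogAppendTime rewritten per cluster
    }
}
```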
Comparison
| Use case | Proposed option (message contains CreateTime + LogAppendTime) | Rejected option 1 (message contains LogAppendTime only) | Rejected option 2 (message contains CreateTime only; broker keeps LogAppendTime in log index) | Comparison |
|---|---|---|---|---|
| Mirror maker | Broker overrides the LAT and keeps the CT as is. | Broker overrides the LAT. | Broker keeps the CT as is and adds an index entry with the LAT to the log index file. | The proposed option provides the most information to the user; the only concern is whether we should expose the LAT to the user. Rejected option 1 loses the CreateTime information. Rejected option 2 has the same amount of information as the proposed option from the broker's point of view, but does not expose the LAT to the user. |
| Log retention | Broker will use the LAT of the last message in a segment to enforce the policy. | Same as the proposed option. | Broker will use the LAT of the last entry in the log index file to enforce the retention policy. Because the leader is the source of truth for the LAT, followers need to get the LAT from the leader when they replicate messages, which means introducing a new wire protocol to fetch the time based log index file as well. When log recovery happens, the rebuilt time index would have LATs different from the actual arrival times of the messages in the log, and the LATs in the index file will be very close or even the same. | The proposed option and rejected option 1 work with the existing replication design and solve the log retention issue we have now. Rejected option 2 alone cannot solve the problem; we need an additional replication protocol. |
| Log rolling | Broker will use the LAT of the first message in a log segment to enforce the policy. | Same as the proposed option. | Broker will use the LAT of the first entry in the log index file to enforce the rolling policy. Similar to the log retention case, the followers need to replicate the time index as well, and when log recovery happens the log rolling might not be honored either. | The proposed option and rejected option 1 solve the log rolling issue. Rejected option 2 does not solve the problem and needs an additional replication protocol. |
| Stream processing | Applications don't need to include the CreateTime in the payload but can simply use the CreateTime field. | Applications have to put the CreateTime into the payload. | Applications don't need to include the CreateTime in the payload but can simply use the CreateTime field. | The benefit of having a CreateTime with each message rather than putting it into the payload is that the application protocol can be simplified. It is convenient for the infrastructure to provide the timestamp, so each application does not have to worry about it. |
| Latency measurement | User can get end-to-end latency and lag in time. | User can get the lag in time. | User can get end-to-end latency. | The proposed option has the most information for the user. |
| Search message by timestamp | Detailed discussion in KIP-33. | Detailed discussion in KIP-33. | Detailed discussion in KIP-33. | Detailed discussion in KIP-33. |
Discussion: should we use CreateTime OR LogAppendTime for log retention and time based log rolling?
To discuss the usage of CreateTime and LogAppendTime, it is useful to summarize the latency patterns of the messages flowing through the pipeline:
- the messages flow through the pipeline with small latency.
- the messages flow through the pipeline with a similar large latency.
- the messages flow through the pipeline with large latency differences.
It would also be useful to think about the impact of a completely wrong timestamp set by a client, i.e. the robustness of the system.
Log Retention
There are both pros and cons for log retention to be based on CreateTime or LogAppendTime.
- Pattern 1: Because the latency is small, CreateTime and LogAppendTime will be close and there won't be much difference.
- Pattern 2: If the log retention is based on the message creation time, it will not be affected by the latency in the data pipeline, because the send time does not change. If the log retention is based on the LogAppendTime, it will be affected by the latency in the pipeline: because of the latency difference, some data can be deleted on one cluster but not on another cluster in the pipeline.
- Pattern 3: When messages with significantly different timestamps go into a cluster at around the same time, the retention policy is hard to follow if we use CreateTime. For example, imagine two mirror makers copying data from two source clusters to the same target cluster. If MirrorMaker1 is copying messages with CreateTime around 1:00 PM today, and MirrorMaker2 is copying messages with CreateTime around 1:00 PM yesterday, those messages can go into the same log segment in the target cluster. It will be difficult for the broker to apply a retention policy to that log segment: the broker would need to maintain knowledge of the latest CreateTime of all messages in a log segment and persist that information somewhere.
- Robustness: If there is a message with CreateTime set in the future, the log might be kept for a very long time. The broker needs to sanity-check the timestamp when it receives the message, and it can be tricky to determine which timestamps are invalid.
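One plausible shape for such a sanity check is a bounded clock skew, sketched below. The KIP only notes that a check is needed and tricky; the method name and threshold here are invented for illustration:

```java
public class TimestampSanityCheck {
    // Hypothetical guard: accept a CreateTime only if it is within maxDiffMs
    // of the broker's clock, rejecting timestamps far in the future (which
    // would otherwise pin the log segment in retention indefinitely).
    public static boolean isPlausible(long createTime, long brokerNow, long maxDiffMs) {
        return Math.abs(brokerNow - createTime) <= maxDiffMs;
    }

    public static void main(String[] args) {
        long now = 1_000_000L;
        System.out.println(isPlausible(now + 1_000L, now, 60_000L));      // true: 1s ahead, fine
        System.out.println(isPlausible(now + 86_400_000L, now, 60_000L)); // false: a day in the future
    }
}
```

The tricky part the text alludes to is choosing maxDiffMs: too tight and legitimately lagged messages are rejected, too loose and a bad clock can still distort retention.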
Comparison:
| | Pattern 1 | Pattern 2 | Pattern 3 | Robustness |
|---|---|---|---|---|
| Preference | CT or LAT | CT | LAT | LAT |
In reality, we usually don't see pipelines where every stage has the same large latency, so LogAppendTime looks preferable to CreateTime for log retention.
Time based log rolling
The main purpose of time based log rolling is to avoid the situation where a low volume topic always has only one segment, which is also the active segment. By its nature, server side log rolling makes more sense.
- Pattern 1: Because the latency is small, CreateTime and LogAppendTime will be close and there won't be much difference.
- Pattern 2: When the latency is large, it is possible that by the time a new message is produced to a broker, its CreateTime has already reached the rolling criteria. This might cause a segment with only one message.
- Pattern 3: Similar to pattern 2, a lagged message might result in a single-message log segment.
- Robustness: Similar to patterns 2 and 3. A CreateTime in the future might break the log rolling as well.
| | Pattern 1 | Pattern 2 | Pattern 3 | Robustness |
|---|---|---|---|---|
| Preference | CT or LAT | LAT | LAT | LAT |
Compatibility, Deprecation, and Migration Plan
NOTE: This part is drafted based on the assumption that KIP-31 and KIP-32 will be implemented in one patch.
The proposed protocol is not backward compatible. The migration plan is as below:
- Bump up ProducerRequest and FetchRequest version to V2, which uses MessageAndOffset V1.
- Upgrade broker to support both ProducerRequest and FetchRequest V2 which uses magic byte 1 for MessageAndOffset.
  - When the broker sees a producer request V1 (MessageAndOffset = V0), it will decompress the message, assign offsets using relative offsets, re-compress the message, and set CreateTime = -1, i.e. up-convert the message format to magic byte 1.
  - When the broker sees a producer request V2 (MessageAndOffset = V1), it will decompress the message for verification, assign the offset to the outer message, and NOT recompress.
  - When the broker sees a fetch request V1 (MessageAndOffset = V0), because the data format on disk is MessageAndOffset V1, it will not use zero-copy transfer but will read the messages into memory, down-convert them, and send a fetch response V1.
  - When the broker sees a fetch request V2 (MessageAndOffset = V1), it will use zero-copy transfer to reply with a fetch response V2.
- Upgrade consumer to support both V0 and V1.
- Upgrade producer to send MessageAndOffset V1.
For producers, there will be no impact.
For consumers using MessageAndOffset V0, there will be some performance penalty because zero-copy transfer cannot be used.
During steps 2 and 3, the majority of consumers may still be using MessageAndOffset V0, so the broker could consume more memory for down-conversion.
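The broker's conversion decision during migration can be summarized as a toy version check. This is a sketch of the plan above, not broker code; the class and method names are hypothetical:

```java
public class FormatConversion {
    // Message format (magic byte) used on disk by an upgraded broker.
    public static final byte ON_DISK_MAGIC = 1;

    // Magic byte implied by a ProduceRequest/FetchRequest version:
    // V2 requests carry MessageAndOffset V1 (magic 1), older requests magic 0.
    public static byte magicForRequestVersion(int requestVersion) {
        return requestVersion >= 2 ? (byte) 1 : (byte) 0;
    }

    // Conversion is needed whenever the client's format differs from the
    // on-disk format: up-conversion on produce, down-conversion (losing
    // zero-copy transfer) on fetch.
    public static boolean needsConversion(int requestVersion) {
        return magicForRequestVersion(requestVersion) != ON_DISK_MAGIC;
    }

    public static void main(String[] args) {
        System.out.println(needsConversion(1)); // true:  old client, broker must convert
        System.out.println(needsConversion(2)); // false: new client, zero-copy fetch possible
    }
}
```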
Rejected Alternatives
Option 1 - Adding only LogAppendTime to the message
This proposal is pretty much the same as the selected proposal, except it does not include CreateTime in the message format.
MessageAndOffset => MessageSize Offset Message
  MessageSize => int32
  Offset => int64
  Message => Crc MagicByte Attributes LogAppendTime KeyLength Key ValueLength Value
    Crc => int32
    MagicByte => int8
    Attributes => int8
    LogAppendTime => int64 <---------------------- NEW
    KeyLength => int32
    Key => bytes
    ValueLength => int32
    Value => bytes
The downsides of this proposal are:
- If the CreateTime is not in the message itself, applications need to include the timestamp in the payload. Instead of asking each application to do this, it is better to include the timestamp in the message.
- The broker is not able to report latency metrics. While we could let the application measure the end-to-end latency, we would lose the latency for each hop in the pipeline.
Option 2 - Adding only CreateTime to the message
While the time based log index has to be based on LogAppendTime, there is some concern about exposing the LogAppendTime (which is a broker internal concept) to the user. So this approach includes the following changes:
Wire protocol change
Change the MessageAndOffset format to:
MessageAndOffset => MessageSize Offset Message MessageSize => int32 Offset => int64 Message => Crc MagicByte Attributes Timestamp KeyLength Key ValueLength Value Crc => int32 MagicByte => int8 Attributes => int8 CreateTime => int64 <---------------------- NEW KeyLength => int32 Key => bytes ValueLength => int32 Value => bytes
Build time based log index using LogAppendTime
The broker will still build the time based index using LogAppendTime; the LogAppendTime will appear only in the time index file, not in the message format, i.e. it is not exposed to the user.
Change time based log retention and log rolling to use LogAppendTime in index file
The time based log retention and log rolling still need to use LogAppendTime. Because the leader is the source of truth for LogAppendTime, when followers fetch data from the leader they would have to replicate the time index file as well.
Therefore, this approach by itself does not solve the time based log retention and log rolling issues that are the motivation of this KIP; we would need to introduce a separate wire protocol to propagate the log segment create time and last modified time among brokers. While that is doable, we feel the additional complication to replica fetching outweighs the concern of exposing the LogAppendTime to the user, considering the LogAppendTime is still useful to clients in a few use cases.
Also, during a log recovery, the LogAppendTimes in the rebuilt time index will be almost the same, and will differ from the actual times when the messages arrived at the brokers.