Status
Current state: Under Discussion
Discussion thread: here
JIRA: KAFKA-2511
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
This KIP is trying to resolve the following important issues in Kafka:
- The broker needs to decompress compressed messages, assign an offset to each inner message, and then recompress the messages. This incurs additional CPU cost on the broker.
- Log retention might not be honored: Log retention is currently at the log segment level, and is driven off the last modification time of a log segment. This approach does not quite work when a replica reassignment happens because the newly created log segment will effectively have its modification time reset to now.
- Searching for an offset by timestamp has very coarse granularity (log segment level), and it also does not work well when a replica is reassigned.
This KIP is a distilled/improved version of an earlier discussion that we started.
Public Interfaces
We propose the following change to the message format
MessageAndOffset => MessageSize Offset Message
  MessageSize => int32
  Offset => int64
  Message => Crc MagicByte Attributes Timestamp KeyLength Key ValueLength Value
    Crc => int32
    MagicByte => int8
    Attributes => int8
    Timestamp => int64 <---------------------- NEW
    KeyLength => int32
    Key => bytes
    ValueLength => int32
    Value => bytes
- The magic byte (int8) contains the version id of the message (which is currently zero).
- The attribute byte (int8) holds metadata attributes about the message. The lowest two bits contain the compression codec used for the message. The other bits are currently set to zero.
- The key / value field can be omitted if the keylength / valuelength field is set to -1.
- For a compressed message, the offset field of the wrapper message stores the offset of the last inner message. (A serialization sketch of the new format follows this list.)
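As a rough illustration of the V1 Message layout above, here is a minimal serialization sketch in Java. The field order and sizes follow the format definition; the class name, helper logic, and the assumption that the CRC covers everything after the CRC field are ours, not the actual Kafka code. (This covers only the Message part; MessageSize and Offset belong to the MessageAndOffset wrapper.)

import java.nio.ByteBuffer;
import java.util.zip.CRC32;

// Hypothetical sketch of serializing one V1 message (not the actual Kafka code).
public class MessageV1Example {
    public static ByteBuffer serialize(long timestamp, byte attributes, byte[] key, byte[] value) {
        int keyLength = (key == null) ? -1 : key.length;
        int valueLength = (value == null) ? -1 : value.length;
        int size = 4 + 1 + 1 + 8 + 4 + Math.max(keyLength, 0) + 4 + Math.max(valueLength, 0);
        ByteBuffer buffer = ByteBuffer.allocate(size);
        buffer.position(4);                 // reserve space for the CRC
        buffer.put((byte) 1);               // MagicByte: version 1
        buffer.put(attributes);             // Attributes: lowest two bits = compression codec
        buffer.putLong(timestamp);          // Timestamp: the new int64 field
        buffer.putInt(keyLength);           // KeyLength: -1 means no key
        if (key != null) buffer.put(key);
        buffer.putInt(valueLength);         // ValueLength: -1 means no value
        if (value != null) buffer.put(value);
        // Assumed here: the CRC covers everything after the CRC field itself.
        CRC32 crc = new CRC32();
        crc.update(buffer.array(), 4, size - 4);
        buffer.putInt(0, (int) crc.getValue());
        buffer.rewind();
        return buffer;
    }
}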
Proposed Changes
Wire protocol change
Add a timestamp field to message
- The timestamp will be assigned by the broker upon receiving the message. If the message comes from mirror maker, the original timestamp will be discarded. In essence, we treat the timestamp as another offset-like field. If an application needs its own timestamp, it can be put in the message payload.
- The timestamp will be used to build the log index.
- The timestamp accuracy will be milliseconds.
- The timestamp of the outer message of compressed messages will be the latest timestamp of all its inner messages.
- If the compressed message is not compacted, the relative offsets of inner messages will be contiguous and share the same timestamp.
- If the compressed message is compacted, the relative offsets of inner messages may not be contiguous. The outer message's timestamp will be the timestamp of the last inner message.
- The followers will not reassign timestamps; they simply update an in-memory lastAppendedTimestamp and append the message to the log.
- To handle leader migration where the new leader has a slower clock than the old leader, every leader should append max(lastAppendedTimestamp, currentTimeMillis) as the timestamp (see the sketch after this list).
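As a rough illustration of the last two points, here is a minimal sketch of how a leader could keep appended timestamps monotonic across leader migration; the class and method names are hypothetical:

// Hypothetical sketch of monotonic timestamp assignment on the leader.
public class LogTimestampAssigner {
    // Initialized from the last appended message when a replica becomes leader.
    private volatile long lastAppendedTimestamp = -1L;

    // The leader assigns the timestamp; a new leader with a slower clock
    // never appends a timestamp smaller than the previous leader's.
    public synchronized long nextTimestamp() {
        long now = System.currentTimeMillis();
        lastAppendedTimestamp = Math.max(lastAppendedTimestamp, now);
        return lastAppendedTimestamp;
    }

    // Followers do not reassign timestamps; they only track what they append.
    public void onFollowerAppend(long messageTimestamp) {
        lastAppendedTimestamp = messageTimestamp;
    }
}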
Change the usage of offset field
- When the producer compresses messages, it writes the relative offset value into each raw (inner) message's offset field and leaves the wrapper message's offset blank.
- When the broker receives a compressed message, it only needs to:
  - Decompress the message to verify the CRC and the relative offsets.
    NOTE: If the relative offsets are not contiguous (e.g., if this is a mirrored compacted topic), the broker will reassign them. There were two ways to handle this: (i) reject the ProducerRequest, or (ii) just reassign the relative offsets. We chose to reassign the offsets rather than reject the request because there is a useful use case where mirror maker can do a direct copy from the source cluster to the destination cluster without even decompressing the messages. In this case, the compressed messages can have non-contiguous relative offsets (for compacted topics).
  - Set the outer message's base offset. (Since the broker only needs to update the message-set header, there is no need to re-compress the message sets.)
- When the log cleaner compacts log segments, it needs to update the inner messages' relative offset values. (This will leave "holes" inside the new wrapped message; see the sketch below for how offsets are resolved.)
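To make the new offset semantics concrete, here is a minimal sketch of how the absolute offsets of inner messages can be recovered, assuming (per the format above) that the wrapper's offset field holds the absolute offset of the last inner message; the class and method names are ours:

// Hypothetical sketch: recovering the absolute offsets of inner messages.
public class RelativeOffsets {
    // wrapperOffset: absolute offset of the LAST inner message (set by the broker).
    // relativeOffsets: relative offsets stored in the inner messages by the producer;
    // they may be non-contiguous for compacted topics.
    public static long[] toAbsolute(long wrapperOffset, int[] relativeOffsets) {
        int lastRelative = relativeOffsets[relativeOffsets.length - 1];
        long base = wrapperOffset - lastRelative;   // base offset of the wrapped set
        long[] absolute = new long[relativeOffsets.length];
        for (int i = 0; i < relativeOffsets.length; i++)
            absolute[i] = base + relativeOffsets[i];
        return absolute;
    }
}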
Change time based log rolling and retention to use timestamp
Time based log rolling and retention currently use the file creation time and last modified time. This does not work for newly created replicas because the time attributes of the files differ from the true message append time. The following changes will address this issue.
- The time based log rolling will be based on the timestamp of the first message in the log segment file.
- The time based log retention will be based on the timestamp of the last message in the log segment file. (A sketch of both checks follows this list.)
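A minimal sketch of the two checks above, assuming hypothetical accessors for a segment's first and last message timestamps:

// Hypothetical sketch of timestamp-based log rolling and retention checks.
interface LogSegment {
    long firstMessageTimestamp();  // timestamp of the first message in the segment
    long lastMessageTimestamp();   // timestamp of the last message in the segment
}

class TimeBasedPolicy {
    // Roll the active segment when its first message is older than the roll time.
    static boolean shouldRoll(LogSegment active, long nowMs, long rollMs) {
        return nowMs - active.firstMessageTimestamp() > rollMs;
    }

    // Delete a segment when its last (newest) message is past the retention time.
    static boolean shouldDelete(LogSegment segment, long nowMs, long retentionMs) {
        return nowMs - segment.lastMessageTimestamp() > retentionMs;
    }
}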
New time-based log index
In order to enable timestamp based search at finer granularity, we need to add the timestamp to log indices as well.
Because all the index files are memory-mapped, the main consideration here is to avoid significantly increasing the memory consumption.
The time index file needs to be built per log segment file, just like the log index file.
Use a time index for each log segment to store the timestamp -> log offset mapping at minute granularity
Create another index file for each log segment, named SegmentBaseOffset.time.index, to provide an index at minute granularity. The time index entry format is:
Time Index Entry => Timestamp Offset
  Timestamp => int64
  Offset => int32
The time index granularity does not change the actual granularity of timestamp search; it only affects the time needed for searching. The lookup works the same way as an offset search: find the closest timestamp and its corresponding offset, then do a linear scan over the log from there until the target message is found. The reason we prefer minute-level indexing is that timestamp-based search is usually rare, so it is probably not worth investing a significant amount of memory in it.
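A minimal sketch of that lookup, assuming the time index entries are held as sorted (timestamp, offset) pairs in memory; the types and names are ours:

import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical sketch of a timestamp lookup against a minute-granularity time index.
public class TimeIndexLookup {
    // timestamp (ms) -> first log offset at or after that timestamp
    private final NavigableMap<Long, Long> index = new TreeMap<>();

    public void maybeAppend(long timestampMs, long offset) {
        index.put(timestampMs, offset);
    }

    // Returns the offset from which a linear scan should start for the target timestamp.
    public long lookup(long targetTimestampMs) {
        Map.Entry<Long, Long> entry = index.floorEntry(targetTimestampMs);
        // If the target is before the first index entry, scan from the beginning.
        return (entry == null) ? 0L : entry.getValue();
    }
}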
The time index will be built based on the log index file. Every time a new entry is inserted into the log index file, we look at the timestamp of the message, and if it falls into the next minute, we insert an entry into the time index as well. The following table summarizes the memory consumption at different granularities. The numbers are calculated for a broker with 3500 partitions.
Granularity | Index entries per partition per day | Memory consumption (3500 partitions) |
Second | 86400 | 3.4 GB |
Minute | 1440 | 57 MB |
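For reference, the arithmetic behind these numbers: each time index entry takes 12 bytes (an int64 timestamp plus an int32 offset), so for one day's worth of entries per partition, second granularity costs 3500 partitions x 86400 entries x 12 bytes ≈ 3.4 GB, while minute granularity costs 3500 x 1440 x 12 bytes ≈ 57 MB.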
Users don't typically need to look up offsets with seconds granularity.
Compatibility, Deprecation, and Migration Plan
The proposed protocol is not backward compatible. The migration plan is as follows:
- Increment magic byte in MessageAndOffset from 0 to 1.
- Upgrade broker to support both V0 and V1 of MessageAndOffset.
  - When the broker sees a producer request using V0, it will still decompress the messages, assign offsets, and re-compress.
  - When the broker sees a producer request using V1, it will decompress the messages for verification, assign the offset to the outer message, and NOT re-compress.
- Upgrade consumer to support both V0 and V1.
- Upgrade producer to send MessageAndOffset V1.
For producers, there will be no impact.
For old consumers that cannot recognize V1, an exception will be thrown for the unsupported version when they see the new protocol. (The current code in ZookeeperConsumerConnector does not validate the magic byte; this is a bug and we will fix it in a separate ticket.)
Upgraded consumers can handle both V0 and V1.
Rejected Alternatives
Add a timestamp field to log index entry
The most straightforward approach to a time index is to let the log index files have a timestamp associated with each entry:
Log Index Entry => Offset Position Timestamp
  Offset => int32
  Position => int32
  Timestamp => int64
Because the index entry size becomes 16 bytes instead of 8 bytes, the index file size would also double. As an example, one of our brokers has ~3500 partitions, and its index files take about 16GB of memory. With this new format, the memory consumption would be 32GB.