Status

Current state: Accepted

Discussion thread: here

JIRA: KAFKA-2511

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

This KIP is trying to resolve the following important issues in Kafka:

...

Today the broker needs to decompress compressed messages, assign offsets to each message and recompress the messages again. This causes additional CPU cost.

...

This KIP is trying to avoid server side recompression.

This KIP is a distilled/improved version of an earlier discussion that we started.

Public Interfaces

We propose the following change to the message format

Code Block
languagejava
MessageAndOffset => Offset MessageSize Message
  Offset => int64 <------ CHANGE: Base offset for wrapper message of compressed message, relative offset for inner compressed message.
  MessageSize => int32
  
  Message => Crc MagicByte Attributes Timestamp KeyLength Key ValueLength Value
    Crc => int32
    MagicByte => int8
    Attributes => int8
    Timestamp => int64 <---------------------- NEW
    KeyLength => int32
    Key => bytes
    ValueLength => int32
    Value => bytes

  • The magic byte (int8) contains the version id of the message (which is currently zero).
  • The attribute byte (int8) holds metadata attributes about the message. The lowest two bits contain the compression codec used for the message. The other bits are currently set to zero.
  • The key / value field can be omitted if the KeyLength / ValueLength field is set to -1.
  • For a compressed message, the offset field stores the last wrapped message's offset.
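
As an illustration of the layout above, the following minimal sketch packs a single uncompressed entry into bytes. The function names are hypothetical, and two details are assumptions rather than statements of this proposal: that the CRC is a CRC32 covering every byte after the Crc field, and that fields are written in big-endian order.

```python
import struct
import zlib


def encode_message(offset, timestamp, key, value, magic=1, attributes=0):
    """Pack one MessageAndOffset entry (hypothetical helper, big-endian)."""

    def sized(field):
        # A length of -1 marks an omitted key or value.
        if field is None:
            return struct.pack(">i", -1)
        return struct.pack(">i", len(field)) + field

    # MagicByte (int8), Attributes (int8), Timestamp (int64), then key and value.
    body = struct.pack(">bbq", magic, attributes, timestamp) + sized(key) + sized(value)
    # Assumption: the CRC covers everything that follows the Crc field.
    crc = zlib.crc32(body) & 0xFFFFFFFF
    message = struct.pack(">I", crc) + body
    # Offset (int64) and MessageSize (int32) precede the message itself.
    return struct.pack(">qi", offset, len(message)) + message


def compression_codec(attributes):
    # The lowest two bits of the attribute byte hold the compression codec.
    return attributes & 0x03
```

With a null key and a 5-byte value, the message body is 27 bytes and the whole entry is 39 bytes, which gives a quick sanity check on the field widths.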

Proposed Changes

Wire protocol change

...

  • The timestamp will be assigned by the broker upon receiving the message. If the message is coming from mirror maker, the original timestamp will be discarded.
  • The timestamp will be used to build the log index.
  • The timestamp of the outer message of compressed messages will be the latest timestamp of all its inner messages.
    • If the compressed message is not compacted, the relative offsets of inner messages will be contiguous and share the same timestamp.
    • If the compressed message is compacted, the relative offsets of inner messages may not be contiguous. Its timestamp will be the timestamp of the last inner message.

Change the usage of offset field

  • When the producer compresses a message, write the relative offset value in the raw message's offset field. Leave the wrapped message's offset blank.
  • When broker receives a compressed message, it only needs to
    1. Decompress the message to verify the CRC and relative offset.
      NOTE: If the relative offsets are not contiguous (e.g., if this is a mirrored compacted topic), the broker will reassign the relative offsets. There are two ways to handle this: (i) reject the ProducerRequest or (ii) just assign the relative offsets. We chose to reassign offsets rather than reject the request because there is a useful use case where mirror maker can do a direct copy from source cluster to destination cluster without even decompressing the message. In this case, the compressed message can have non-contiguous relative offsets (for compacted topics).
    2. Set the outer message's base offset. The outer message's base offset will be the offset of the last inner message. (Since the broker only needs to update the message-set header, there is no need to re-compress message sets.)
  • When the log cleaner compacts log segments, if multiple message sets are compacted into one message set, the broker needs to update the inner messages' relative offset values. (This will leave "holes" inside the new wrapped message.)
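
The broker-side steps above can be sketched roughly as follows. This is not the broker implementation: the function name is hypothetical, and it assumes that "contiguous" means the inner relative offsets are exactly 0, 1, ..., n-1.

```python
def assign_offsets(inner_relative_offsets, next_offset):
    """Return (wrapper_offset, relative_offsets, rewritten) for one compressed message.

    next_offset is the absolute offset the log would give to the first inner message.
    """
    count = len(inner_relative_offsets)
    # Assumption: a well-formed producer batch carries relative offsets 0..n-1.
    expected = list(range(count))
    rewritten = list(inner_relative_offsets) != expected
    # The outer message carries the absolute offset of the last inner message.
    wrapper_offset = next_offset + count - 1
    return wrapper_offset, expected, rewritten
```

When `rewritten` is false, only the outer offset changes and the compressed payload can be stored as received. When it is true (for example, a batch copied from a compacted topic), the inner offsets have to be rewritten.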

Change time based log rolling and retention to use timestamp

Time based log rolling and retention currently use the file creation time and last modified time. This does not work for newly created replicas because the time attributes of the files are different from the true message append time. The following changes will address this issue.

  • The time based log rolling will be based on the timestamp of the first message in the log segment file.
  • The time based log retention will be based on the timestamp of the last message in the log segment file.
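
A minimal sketch of the two checks above, with hypothetical function and parameter names. `roll_ms` and `retention_ms` stand in for the configured rolling and retention intervals, and the exact comparison operators are an assumption.

```python
def should_roll(first_message_timestamp_ms, now_ms, roll_ms):
    # Rolling is driven by the timestamp of the FIRST message in the segment.
    return now_ms - first_message_timestamp_ms >= roll_ms


def is_expired(last_message_timestamp_ms, now_ms, retention_ms):
    # Retention is driven by the timestamp of the LAST message in the segment.
    return now_ms - last_message_timestamp_ms > retention_ms
```

Because both checks read timestamps stored in the messages, a freshly created replica reaches the same decision as the leader regardless of when its segment files were written.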

New time-based log index

In order to enable timestamp based search at finer granularity, we need to add the timestamp to log indices as well.

Because all the index files are memory mapped files the main consideration here is to avoid significantly increasing the memory consumption.

The time index file needs to be built just like the log index file based on each log segment file.

Use a time index for each log segment to save the timestamp -> log offset at minute granularity

Create another index file for each log segment with name SegmentBaseOffset.time.index to have index at minute level. The time index entry format is:

 

Code Block
languagejava
Time Index Entry => Timestamp Offset
  Timestamp => int64
  Offset => int32

The time index will be built based on the log index file. Every time a new entry is inserted into the log index file, we look at the timestamp of the message and, if it falls into the next minute, we insert an entry into the time index as well. The following table gives a summary of memory consumption at different granularities. The numbers are calculated based on a broker with 3500 partitions.

Granularity   Entries per partition per day   Memory consumption
Second        86400                           3.4 GB
Minute        1440                            57 MB
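
The sketch below shows one way to build such an index and reproduces the memory figures above. The function names are hypothetical. It assumes that the first indexed message always produces an entry, and that the memory figures correspond to one day of entries per partition (86400 seconds or 1440 minutes) at 12 bytes per entry.

```python
import struct

# Timestamp (int64) followed by Offset (int32): 12 bytes per entry.
TIME_INDEX_ENTRY = struct.Struct(">qi")


def build_time_index(log_index_entries, granularity_ms=60_000):
    """log_index_entries: (timestamp_ms, relative_offset) pairs in log order."""
    entries = []
    last_bucket = None
    for timestamp_ms, relative_offset in log_index_entries:
        bucket = timestamp_ms // granularity_ms
        # Add an entry only when the message falls into a later minute.
        if last_bucket is None or bucket > last_bucket:
            entries.append(TIME_INDEX_ENTRY.pack(timestamp_ms, relative_offset))
            last_bucket = bucket
    return entries


def index_memory_bytes(partitions, entries_per_partition):
    return partitions * entries_per_partition * TIME_INDEX_ENTRY.size


second_level = index_memory_bytes(3500, 86_400)  # 3,628,800,000 bytes, about 3.4 GB
minute_level = index_memory_bytes(3500, 1_440)   # 60,480,000 bytes, about 57 MB
```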

...

  • When the consumer receives a message, it converts the relative offset back to actual offset.
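
A minimal sketch of the consumer-side conversion, assuming (as described under "Change the usage of offset field") that the outer message carries the absolute offset of the last inner message. The function name is hypothetical.

```python
def to_absolute_offsets(wrapper_offset, relative_offsets):
    """Map inner relative offsets back to absolute log offsets."""
    last_relative = relative_offsets[-1]
    # The last inner message maps to the wrapper offset; earlier ones count back from it.
    return [wrapper_offset - (last_relative - rel) for rel in relative_offsets]
```

The same arithmetic works when compaction has left holes in the relative offsets, because each inner message keeps its distance from the last one.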

Add a message.format.version configuration to the broker

  • The message.format.version controls the message format written to disk. The value equals the magic byte of the message format. This configuration is introduced to avoid version up/down conversion for the majority of users.
  • If a consumer supports message.format.version, the broker will just use zero-copy transfer to send back the FetchResponse.
  • If a consumer does not support message.format.version, the broker will have to do down conversion and send FetchResponse without using zero-copy.
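
The fetch-path decision above can be sketched as a single comparison. The function name and the idea of representing consumer capability as a "highest supported format version" are illustrative assumptions.

```python
def fetch_response_mode(disk_format_version, consumer_max_format_version):
    """Decide how the broker answers a fetch (hypothetical helper)."""
    # If the consumer understands the on-disk format, bytes can be sent untouched.
    if consumer_max_format_version >= disk_format_version:
        return "zero-copy"
    # Otherwise the broker must read, down-convert and then send the messages.
    return "down-convert"
```

For example, a consumer that only understands format 0 reading from a broker with message.format.version=1 falls into the "down-convert" branch.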

Compatibility, Deprecation, and Migration Plan

NOTE: This part is drafted based on the assumption that KIP-31 and KIP-32 will be implemented in one patch.

The proposed protocol is not backward compatible. The migration plan is as follows:

Phase 1 (MessageAndOffset V0 on disk):

  1. Set message.format.version=0 on brokers. (Broker will write MessageAndOffset V0 to disk)
  2. Create internal ApiVersion 0.9.0-1** which uses ProducerRequest V2 and FetchRequest V2.
  3. Configure the broker to use ApiVersion 0.9.0 (ProduceRequest V1 and FetchRequest V1).
  4. Do a rolling upgrade of the brokers to let the broker pick up the new code supporting ApiVersion 0.9.0-1.
  5. Bump up ApiVersion of broker to 0.9.0-1 to let the broker use FetchRequest V2 for replication.
  6. Upgraded brokers support both ProducerRequest V2 and FetchRequest V2 which uses magic byte 1 for
  7. Increment magic byte in MessageAndOffset from 0 to 1.
  8. Upgrade broker to support both V0 and V1 of MessageAndOffset.
    1. When broker sees a producer request V1 (MessageAndOffset = V0), it will still decompress the message, assign offsets using absolute offsets and re-compress the message.
    2. When broker sees a producer request V2 (MessageAndOffset = V1), it will decompress the message, assign offsets using absolute offsets and do re-compression, i.e. down-convert the message format to MessageAndOffset V0. (This is no worse than what the brokers are currently doing.)
    3. When broker sees a fetch request V1 (Supporting MessageAndOffset = V0), because the data format on disk is MessageAndOffset V0, it will use the zero-copy transfer to reply with fetch response V1 with MessageAndOffset V0.
    4. When broker sees a fetch request V2 (Supporting MessageAndOffset = V0, V1), because the data format on disk is MessageAndOffset V0, it will use zero-copy transfer to reply with fetch response V2 with MessageAndOffset V0.
  9. Upgrade consumer to send FetchRequest V2.
  10. Upgrade producer to send ProducerRequest V2.

Phase 2 (MessageAndOffset V1 on disk):

  1. After most of the consumers are upgraded, bump message.format.version to 1 and do a rolling bounce of the brokers.
  2. Upgraded brokers do the following:
    1. When broker sees a producer request V1 (MessageAndOffset = V0), it will decompress the message, assign offsets using relative offsets and re-compress the message. i.e. up-convert the message format to MessageAndOffset V1.
    2. When broker sees a producer request V2 (MessageAndOffset = V1), it will decompress the message, assign offsets using relative offsets and NOT do re-compression.
    3. When broker sees a fetch request V1 (Supporting MessageAndOffset = V0), because the data format on disk is MessageAndOffset V1, it will NOT use the zero-copy transfer. Instead the broker will read the messages from disk, down-convert them to V0 and reply using fetch response V1 with MessageAndOffset V0.
    4. When broker sees a fetch request V2 (Supporting MessageAndOffset = V0, V1), because the data format on disk is MessageAndOffset V1, it will use zero-copy transfer to reply with fetch response V2 with
  3. Upgrade consumer to support both V0 and V1.
  4. Upgrade producer to send MessageAndOffset V1.
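
The produce-path behaviour in the two phases can be summarised in a small decision function. This is only a restatement of the steps listed above with hypothetical names, not broker code.

```python
def handle_produce(message_format_version, produced_message_version):
    """Return (offset_style, recompress) for a compressed produce request.

    message_format_version: broker setting (0 in phase 1, 1 in phase 2).
    produced_message_version: MessageAndOffset version sent by the producer.
    """
    # The on-disk format decides whether inner offsets are absolute or relative.
    offset_style = "absolute" if message_format_version == 0 else "relative"
    # Re-compression is avoided only when a V1 message lands on a V1 log.
    recompress = not (message_format_version == 1 and produced_message_version == 1)
    return offset_style, recompress
```

Only one of the four combinations skips re-compression, which is why the saving appears once both the brokers and the producers have moved to the new format.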

For producer, there will be no impact.

For old consumers that cannot recognize V1, if they see the new protocol an exception will be thrown for unsupported version. (The current code in ZookeeperConsumerConnector does not validate the magic byte. This is a bug and we will fix it in a separate ticket)

For upgraded consumers, they can handle both V0 and V1.

Rejected Alternatives

 

Add a timestamp field to log index entry

The most straightforward approach to have a time index is to let the log index files have a timestamp associated with each entry.

Code Block
languagejava
Log Index Entry => Offset Position Timestamp
  Offset => int32
  Position => int32
  Timestamp => int64

In phase 1, there will be no impact for consumers.

In phase 2, there will be some performance penalty for consumers that only support MessageAndOffset V0, because there is no zero-copy transfer.

At the beginning of phase 2, there will be a period during which log segments contain both MessageAndOffset V0 and V1. The broker will always do down conversion for FetchRequest V1 and zero-copy transfer for FetchRequest V2.

** We introduce the internal ApiVersion here to help users who are running on trunk to upgrade in the future. Otherwise the interim ApiVersion between two official releases will require users to downgrade ApiVersion and then upgrade.

To canary a broker

After phase 1, it is possible for a user to canary a broker in phase 2 and roll back if something goes wrong. The procedure is:

  1. Set message.format.version=1 on one of the brokers (broker B).
  2. Broker B will start to act as described in phase 2.
    1. It will send FetchRequest V2 to other brokers for replication.
    2. It will only see ProduceRequest/FetchRequest V1 from other brokers and clients.
  3. If something goes wrong, we can do the following to roll back:
    1. shutdown broker B
    2. nuke the data of the topics it was serving as leader before shutdown
    3. set message.format.version=0
    4. restart the broker to let the broker replicate from leaders. At this point the data on disk will be in MessageAndOffset V0.

In step 2, it is recommended to put only a small number of leaders on the broker, because at that point the broker needs to do down conversion for all the fetch requests.

Rejected Alternatives

None

With a timestamp in each log index entry, the index entry size becomes 16 bytes instead of 8 bytes, so the index file size also doubles. As an example, one of our brokers has ~3500 partitions and its index files took about 16 GB of memory. With this new format, the memory consumption would be 32 GB.