...

  • When the producer compresses a message, write the relative offset value in the raw message's offset field. Leave the wrapped message's offset blank.
  • When the broker receives a compressed message, it only needs to:
    1. Decompress the message to verify the CRC and relative offset.
      NOTE: If the relative offsets are not contiguous (e.g., if this is a mirrored compacted topic), the broker will reassign the relative offsets. There are two ways to handle this: (i) reject the ProducerRequest or (ii) just assign the relative offsets. We chose to reassign offsets rather than reject the request because there is a useful use case where mirror maker can do a direct copy from the source cluster to the destination cluster without even decompressing the message. In this case, the compressed message can have non-contiguous relative offsets (for compacted topics).
    2. Set outer message's base offset. (Since the broker only needs to update the message-set header, there is no need to re-compress message sets.)
  • When the log cleaner compacts log segments, it needs to update the inner messages' relative offset values. (This will leave "holes" inside the new wrapped message.)
  • When the consumer receives a message, it converts the relative offset back to the actual offset.
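
The offset handling in the list above can be sketched as follows. This is a minimal illustration, not the broker's actual code: the class names, the `base_offset` field, and the rule "absolute offset = base offset + relative offset" are assumptions made for the example.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class InnerMessage:
    relative_offset: int  # offset relative to the wrapper's base offset
    payload: bytes


@dataclass
class WrappedMessage:
    # Hypothetical field: the absolute offset that relative offset 0 maps to.
    base_offset: int
    inner: List[InnerMessage]


def reassign_relative_offsets(inner: List[InnerMessage]) -> List[InnerMessage]:
    """Broker side: renumber 0..n-1 if the incoming relative offsets are not contiguous."""
    if all(m.relative_offset == i for i, m in enumerate(inner)):
        return inner  # already contiguous, nothing to rewrite
    return [InnerMessage(i, m.payload) for i, m in enumerate(inner)]


def absolute_offsets(wrapped: WrappedMessage) -> List[int]:
    """Consumer side: convert relative offsets back to absolute offsets."""
    return [wrapped.base_offset + m.relative_offset for m in wrapped.inner]
```

Under these assumptions, a wrapped message that the log cleaner has compacted keeps its holes: inner relative offsets 0 and 2 with a base offset of 100 map to absolute offsets 100 and 102.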

Add a message.format.version config to broker configuration

This change is to facilitate protocol migration. Because it is unlikely that all consumers can be migrated to the new message format at the same time, it is important for the broker to support both the new and the old message format.

One approach is to let the broker adapt the message format to the consumer's requirement. It requires the following changes:

  1. Add a message.format.version configuration on the broker.
  2. Bump the ProduceRequest/FetchRequest version from V1 to V2 to indicate a new client.

It works in the following way:

  1. The producer sends message V0 or V1 to the broker.
  2. The broker converts the message into the version specified by message.format.version and writes it to disk.
  3. When consumers send a FetchRequest to the broker:
    1. If the FetchRequest version is equal to message.format.version, the broker will use zero-copy transfer to send the messages, because the client should be able to parse them. (Today the client will simply use the highest version to parse the response, which is )
    2. If the FetchRequest version is lower than message.format.version, the broker will not use zero-copy transfer. Instead it will read the messages from disk, down-convert them, and send them back to the consumer.
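
The fetch-path decision described above can be sketched as a small function. This is only an illustration: the function name and the returned labels are hypothetical, and the case where the FetchRequest version is higher than message.format.version is not covered by the text above, so the sketch raises an error for it.

```python
def plan_fetch_response(fetch_version: int, message_format_version: int) -> str:
    """Decide how the broker serves a fetch, following the two cases in the text."""
    if fetch_version == message_format_version:
        # On-disk format matches what the client can parse: send bytes as-is.
        return "zero-copy"
    if fetch_version < message_format_version:
        # Older client: read into memory, down-convert, then send.
        return "down-convert"
    # Assumption: behaviour for a newer client is not specified here.
    raise NotImplementedError("fetch version newer than on-disk message format")
```

The cost asymmetry is the point of the design: the matching-version path avoids copying message data through broker memory, while the down-conversion path pays for a read and a rewrite on every fetch.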

Compatibility, Deprecation, and Migration Plan

...

The proposed protocol is not backward compatible. The migration plan is as follows:

  1. Increment the magic byte in MessageAndOffset from 0 to 1. Bump the ProducerRequest and FetchRequest versions to V2, which uses MessageAndOffset V1.
  2. Upgrade the broker to support both MessageAndOffset V0 and V1, including ProducerRequest and FetchRequest V2, which use magic byte 1 for MessageAndOffset.
    1. When the broker sees a producer request using V1 (MessageAndOffset = V0), it will still decompress the message, assign offsets using relative offsets, and re-compress the message, i.e., upconvert the message format to mag.
    2. When the broker sees a producer request using V2 (MessageAndOffset = V1), it will decompress the message for verification, assign the offset to the outer message, and NOT do recompression.
    3. When the broker sees a fetch request V1 (MessageAndOffset = V0), because the data format on disk is MessageAndOffset V1, it will not use zero-copy transfer, but will read the message into memory, do down-conversion, and then send fetch response V1.
    4. When the broker sees a fetch request V2 (MessageAndOffset = V1), it will use zero-copy transfer to reply with fetch response V2.
  3. Upgrade consumer to support both V0 and V1.
  4. Upgrade producer to send MessageAndOffset V1.
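
The four broker cases in step 2 can be summarised as a lookup table. The sketch below is illustrative only: the step names and the `broker_steps` helper are hypothetical labels for the actions described in the list, not identifiers from the broker code.

```python
from typing import Dict, List, Tuple

# (request type, request version) -> ordered broker actions, as described in step 2.
BROKER_STEPS: Dict[Tuple[str, int], List[str]] = {
    ("produce", 1): ["decompress", "assign_relative_offsets", "upconvert", "recompress"],
    ("produce", 2): ["decompress_and_verify", "set_outer_offset"],
    ("fetch", 1): ["read_into_memory", "down_convert", "send_fetch_response_v1"],
    ("fetch", 2): ["zero_copy_send_fetch_response_v2"],
}


def broker_steps(request_type: str, version: int) -> List[str]:
    """Return the actions for a request, or fail for versions outside the plan."""
    try:
        return BROKER_STEPS[(request_type, version)]
    except KeyError:
        raise ValueError(f"unsupported {request_type} request version: {version}")
```

Read this way, only the V2 paths avoid extra work: V2 produce requests skip recompression and V2 fetch requests keep zero-copy transfer.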

For producers, there will be no impact.

For old consumers that cannot recognize V1, an exception will be thrown for the unsupported version if they see the new protocol. (The current code in ZookeeperConsumerConnector does not validate the magic byte. This is a bug and we will fix it in a separate ticket.) For consumers using MessageAndOffset V0, there will be some performance penalty because there is no zero-copy transfer.

During steps 2 and 3, the majority of consumers may still be using MessageAndOffset V0, so the broker could consume more memory. Upgraded consumers can handle both V0 and V1.

Rejected Alternatives

None

...