...

  • When the producer compresses a message, write the relative offset value in the raw message's offset field. Leave the wrapped message's offset blank.
  • When the broker receives a compressed message, it only needs to:
    1. Decompress the message to verify the CRC and relative offset.
      NOTE: If the relative offsets are not contiguous (e.g., if this is a mirrored compacted topic), the broker will reassign the relative offsets. There are two ways to handle this: (i) reject the ProducerRequest or (ii) reassign the relative offsets. We chose to reassign offsets rather than reject the request because there is a useful use case where mirror maker can copy directly from the source cluster to the destination cluster without even decompressing the message. In this case, the compressed message can have non-contiguous relative offsets (for compacted topics).
    2. Set the outer message's base offset. The outer message's base offset will be the offset of the last inner message. (Since the broker only needs to update the message-set header, there is no need to re-compress message sets.)
  • When the log cleaner compacts log segments, if multiple message sets are compacted into one message set, the broker needs to update the inner messages' relative offset values. (This will leave "holes" inside the new wrapped message.)
  • When the consumer receives a message, it converts the relative offset back to actual offset.
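
The offset arithmetic described above can be sketched as follows. This is a minimal illustration, not the broker or consumer implementation: the class and method names (RelativeOffsets, outerOffset, toAbsolute) are hypothetical, and it assumes only what the list states, namely that the outer message carries the absolute offset of the last inner message and that inner messages carry relative offsets, possibly with holes after compaction.

```java
import java.util.Arrays;

// Hypothetical helper names; a sketch of the offset arithmetic only.
class RelativeOffsets {

    // Broker side, contiguous case: the outer (wrapper) message gets the
    // absolute offset of the LAST inner message.
    static long outerOffset(long nextLogOffset, int innerCount) {
        return nextLogOffset + innerCount - 1;
    }

    // Consumer side: recover absolute offsets from the outer offset and the
    // relative offsets. Works with holes because only the distance to the
    // last inner message matters.
    static long[] toAbsolute(long outerOffset, long[] relativeOffsets) {
        long lastRelative = relativeOffsets[relativeOffsets.length - 1];
        long[] absolute = new long[relativeOffsets.length];
        for (int i = 0; i < relativeOffsets.length; i++) {
            absolute[i] = outerOffset - (lastRelative - relativeOffsets[i]);
        }
        return absolute;
    }

    public static void main(String[] args) {
        // Three contiguous inner messages appended at log offset 100.
        long outer = outerOffset(100L, 3);
        System.out.println(Arrays.toString(toAbsolute(outer, new long[] {0, 1, 2})));
        // prints [100, 101, 102]

        // A compacted message set with holes; the outer offset is 105.
        System.out.println(Arrays.toString(toAbsolute(105L, new long[] {0, 2, 5})));
        // prints [100, 102, 105]
    }
}
```

Because the conversion depends only on the outer offset and the last relative offset, the broker can assign offsets by rewriting the wrapper header alone.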

Add a message.format.version configuration to the broker

  • The message.format.version controls the message format written to disk. This configuration is introduced to avoid message format up/down conversion for the majority of users.
  • If a consumer supports message.format.version, the broker will just use zero-copy transfer to send back the FetchResponse.
  • If a consumer does not support message.format.version, the broker will have to do down conversion and send FetchResponse without using zero-copy.
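
The fetch-path choice described above can be sketched as a small decision function. This is an illustrative sketch under stated assumptions, not broker code: the names FetchPathSketch, highestFormatFor and chooseTransfer are hypothetical, and the mapping from FetchRequest version to supported message format (V1 reads MessageAndOffset V0 only, V2 reads V0 and V1) is taken from the migration plan in this document.

```java
// Hypothetical names; sketches only the zero-copy vs. down-conversion choice.
class FetchPathSketch {

    enum Transfer { ZERO_COPY, DOWN_CONVERT }

    // Highest MessageAndOffset version a FetchRequest version can read:
    // FetchRequest V2 reads V0 and V1, earlier versions read V0 only.
    static int highestFormatFor(int fetchRequestVersion) {
        return fetchRequestVersion >= 2 ? 1 : 0;
    }

    // Zero-copy is possible whenever the consumer can read the on-disk
    // format as is; otherwise the broker must down-convert in memory.
    static Transfer chooseTransfer(int messageFormatVersion, int fetchRequestVersion) {
        return highestFormatFor(fetchRequestVersion) >= messageFormatVersion
                ? Transfer.ZERO_COPY
                : Transfer.DOWN_CONVERT;
    }

    public static void main(String[] args) {
        System.out.println(chooseTransfer(0, 1)); // ZERO_COPY
        System.out.println(chooseTransfer(0, 2)); // ZERO_COPY
        System.out.println(chooseTransfer(1, 1)); // DOWN_CONVERT
        System.out.println(chooseTransfer(1, 2)); // ZERO_COPY
    }
}
```

Only the combination of a V1 on-disk format and an old consumer leaves the zero-copy path, which is why the format version is kept configurable.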

Compatibility, Deprecation, and Migration Plan

...

The proposed protocol is not backward compatible. The migration plan is as follows:

Phase 1 (MessageAndOffset V0 on disk):

  1. Set message.format.version=0 on brokers. (Broker will write MessageAndOffset V0 to disk.) Bump up ProducerRequest and FetchRequest version to V2, which uses MessageAndOffset V1.
  2. Create internal ApiVersion 0.9.0-1** which uses ProducerRequest V2 and FetchRequest V2.
  3. Configure the broker to use ApiVersion 0.9.0 (ProduceRequest V1 and FetchRequest V1).
  4. Do a rolling upgrade of the brokers to let the broker pick up the new code supporting ApiVersion 0.9.0-1.
  5. Bump up ApiVersion of broker to 0.9.0-1.
  6. Do a rolling bounce of the brokers to let the broker use FetchRequest V2 for replication.
  7. Bump up ProducerRequest and FetchRequest version to V2, which supports both MessageAndOffset V0 and V1.
  8. Upgraded brokers support both ProducerRequest V2 and FetchRequest V2, which use magic byte 1 for MessageAndOffset.
    1. When broker sees a producer request V1 (MessageAndOffset = V0), it will decompress the message, assign offsets using definitive offsets and re-compress the message.
    2. When broker sees a producer request V2 (MessageAndOffset = V1), it will decompress the message, assign offsets using definitive offsets and do re-compression, i.e. downconvert the message format to MessageAndOffset V0.
    3. When broker sees a fetch request V1 (Supporting MessageAndOffset = V0), because the data format on disk is MessageAndOffset V0, it will use the zero-copy transfer to reply with fetch response V1 with MessageAndOffset V0.
    4. When broker sees a fetch request V2 (Supporting MessageAndOffset = V0, V1), because the data format on disk is MessageAndOffset V0, it will use zero-copy transfer to reply with fetch response V2 with MessageAndOffset V0.
  9. Upgrade consumer to send FetchRequest V2.
  10. Upgrade producer to send ProducerRequest V2.

Phase 2 (MessageAndOffset V1 on disk):

  1. After most of the consumers are upgraded, set message.format.version=1 and do a rolling bounce of the brokers.
  2. Upgraded brokers do the following:
    1. When broker sees a producer request V1 (MessageAndOffset = V0), it will decompress the message, assign offsets using relative offsets and re-compress the message, i.e. upconvert the message format to MessageAndOffset V1.
    2. When broker sees a producer request V2 (MessageAndOffset = V1), it will decompress the message for verification, assign the offset to the outer message and NOT do recompression.
    3. When broker sees a fetch request V1 (Supporting MessageAndOffset = V0), because the data format on disk is MessageAndOffset V1, it will not use the zero-copy transfer, but read the message into memory, do down-conversion, then reply with fetch response V1 with MessageAndOffset V0.
    4. When broker sees a fetch request V2 (Supporting MessageAndOffset = V0, V1), because the data format on disk is MessageAndOffset V1, it will use zero-copy transfer to reply with fetch response V2 with MessageAndOffset V1.
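
The produce-path handling across the two phases can be summarized with a small decision function. This is a hedged sketch, not the proposal's implementation: it assumes the conversion is determined solely by the on-disk format named in each phase heading (V0 in phase 1, V1 in phase 2) and by the message format carried in the producer request, and the names ProducePathSketch, Action and chooseAction are hypothetical.

```java
// Hypothetical names; assumes the on-disk format alone drives the conversion.
class ProducePathSketch {

    enum Action {
        ASSIGN_DEFINITIVE_OFFSETS,   // V0 in, V0 on disk
        DOWN_CONVERT,                // V1 in, V0 on disk
        UP_CONVERT,                  // V0 in, V1 on disk
        VERIFY_AND_SET_OUTER_OFFSET  // V1 in, V1 on disk (no re-compression)
    }

    // onDiskFormat is the message.format.version value (0 or 1);
    // incomingFormat is the MessageAndOffset version in the producer request.
    static Action chooseAction(int onDiskFormat, int incomingFormat) {
        if (onDiskFormat == 0) {
            return incomingFormat == 0 ? Action.ASSIGN_DEFINITIVE_OFFSETS : Action.DOWN_CONVERT;
        }
        return incomingFormat == 0 ? Action.UP_CONVERT : Action.VERIFY_AND_SET_OUTER_OFFSET;
    }

    public static void main(String[] args) {
        // Phase 1 (V0 on disk)
        System.out.println(chooseAction(0, 0)); // ASSIGN_DEFINITIVE_OFFSETS
        System.out.println(chooseAction(0, 1)); // DOWN_CONVERT
        // Phase 2 (V1 on disk)
        System.out.println(chooseAction(1, 0)); // UP_CONVERT
        System.out.println(chooseAction(1, 1)); // VERIFY_AND_SET_OUTER_OFFSET
    }
}
```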

For producers, there will be no impact.

In phase 1, there will be no impact for consumers.

In phase 2, consumers that only support MessageAndOffset V0 will incur some performance penalty because there is no zero-copy transfer.

During step 7 and step 8, the majority of the consumers may still be using MessageAndOffset V0, so the broker could consume more memory.

At the beginning of phase 2, there will be some time during which log segments contain both MessageAndOffset V0 and V1. The broker will always do down conversion for FetchRequest V1 and zero-copy transfer for FetchRequest V2.

** We introduce an internal ApiVersion here to help users who are running on trunk to upgrade in the future. Otherwise, an interim ApiVersion between two official releases would require users to downgrade the ApiVersion and then upgrade.

...