...

  1. Allow the user to stamp the message with a timestamp when producing it.
  2. When a leader broker receives a message
    1. If message.timestamp.type=LogAppendTime, the server will override the timestamp with its current local time and append the message to the log.
      1. If the message is a compressed message, the timestamp in the compressed message will be updated to the current server time. The broker will check that all the inner messages have timestamp -1. If an inner message's timestamp is not -1, the broker will overwrite it with -1 and recompress the batch. We do this, instead of writing the current server time to each inner message, to avoid the recompression penalty when people are using LogAppendTime.
      2. If the message is a non-compressed message, the timestamp in the message will be overwritten with the current server time.
    2. If message.timestamp.type=CreateTime
      1. If the difference between the message timestamp and the broker's current time is within the configurable threshold max.message.time.difference.ms, the server will accept the message and append it to the log. For a compressed message, the server will set the timestamp of the compressed message to the largest timestamp of its inner messages.
      2. If the time difference exceeds the configured threshold max.message.time.difference.ms, the server will reject the entire batch with TimestampExceededThresholdException.
  3. When a follower broker receives a message
    1. If the message is a compressed message, the timestamp in the compressed message will be used to build the time index, i.e. the timestamp in a compressed message is always the largest timestamp of all its inner messages.
    2. If the message is a non-compressed message, the timestamp of this message will be used to build the time index.
  4. When a consumer receives a message
    1. If a message is a compressed message
      1. If the inner message timestamp is not -1, that timestamp will be used.
      2. If the inner message timestamp is -1, the timestamp of the compressed message will be used as its timestamp.
    2. If a message is a non-compressed message, the timestamp of the message will be used.
  5. message.timestamp.type and max.message.time.difference.ms will be per-topic configurations.
  6. The index will be built so that it has the following guarantees.
    1. If a user searches by a timestamp:
      1. all the messages after the searched timestamp will be consumed.
      2. the user might also see earlier messages.
    2. Log retention will look at the last entry in the time index file, because the last entry holds the latest timestamp in the entire log segment. If that entry expires, the log segment will be deleted.
    3. Log rolling has to depend on the earliest timestamp. In this case we may need to keep an in-memory timestamp only for the current active log segment. On recovery, we will need to read the active log segment to get the timestamp of the earliest message.
  7. The downsides of this proposal are:
    1. The timestamp might not be monotonically increasing.
    2. Log retention might become non-deterministic, i.e. when a message will be deleted now depends on the timestamps of the other messages in the same log segment, and those timestamps are provided by the user within a range that depends on the time difference threshold configuration.
    3. The semantic meaning of the timestamp is vague because some timestamps might have been overwritten and some might not.
  8. Although the proposal has some downsides, it gives the user flexibility in how to use the timestamp.
    1. If the time difference threshold is set to Long.MaxValue, the timestamp in the message is equivalent to CreateTime.
    2. If the time difference threshold is set to 0, the timestamp in the message is equivalent to LogAppendTime.
    3. If the time difference threshold is between 0 and Long.MaxValue, it ensures the messages will always have a timestamp within a certain range.
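
The leader-side and consumer-side timestamp rules above can be sketched in a few lines of Python. This is only an illustration under stated assumptions, not broker code: the function names are hypothetical, the "time difference" is assumed to be the absolute difference between the broker's clock and the message timestamp, and "within" the threshold is assumed to include equality.

```python
from enum import Enum

NO_TIMESTAMP = -1  # sentinel the proposal uses for "no timestamp set"


class TimestampType(Enum):
    CREATE_TIME = "CreateTime"
    LOG_APPEND_TIME = "LogAppendTime"


class TimestampExceededThresholdError(Exception):
    """Stand-in for the TimestampExceededThresholdException named in the text."""


def leader_timestamp(message_ts, broker_now_ms, ts_type, max_diff_ms):
    """Timestamp a leader would store for a non-compressed message (hypothetical helper)."""
    if ts_type is TimestampType.LOG_APPEND_TIME:
        # LogAppendTime: the broker overrides whatever the producer sent.
        return broker_now_ms
    # CreateTime: keep the producer timestamp only if it is close enough
    # to the broker clock (absolute difference is an assumption).
    if abs(broker_now_ms - message_ts) > max_diff_ms:
        raise TimestampExceededThresholdError(
            f"timestamp {message_ts} is more than {max_diff_ms} ms from {broker_now_ms}"
        )
    return message_ts


def wrapper_timestamp(inner_timestamps):
    """CreateTime: the compressed message carries the largest inner timestamp."""
    return max(inner_timestamps)


def consumer_timestamp(inner_ts, wrapper_ts):
    """Consumer side: fall back to the compressed message's timestamp when the inner one is -1."""
    return wrapper_ts if inner_ts == NO_TIMESTAMP else inner_ts
```

For example, `consumer_timestamp(NO_TIMESTAMP, 5000)` yields the wrapper value 5000, which is how a LogAppendTime batch can expose the server time without rewriting every inner message.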

...

  1. Set message.format.version=0.9.0 on brokers. (Broker will write MessageAndOffset V0 to disk)
  2. Create internal ApiVersion 0.9.0-1, which uses ProduceRequest V2 and FetchRequest V2.
  3. Configure the broker to use ApiVersion 0.9.0 (ProduceRequest V1 and FetchRequest V1).
  4. Do a rolling upgrade of the brokers to let the broker pick up the new code supporting ApiVersion 0.9.0-1.
  5. Bump up ApiVersion of broker to 0.9.0-1 to let the broker use FetchRequest V2 for replication.
  6. Upgraded brokers support both ProduceRequest V2 and FetchRequest V2, which use magic byte 1 for MessageAndOffset.
    1. When the broker sees a produce request V1 (MessageAndOffset = V0), it will decompress the message, assign offsets using absolute offsets and re-compress the message.
    2. When the broker sees a produce request V2 (MessageAndOffset = V1), it will decompress the message, assign offsets using absolute offsets, ignore the time field and re-compress the message, i.e. down-convert the message format to MessageAndOffset V0.
    3. When the broker sees a fetch request V1 (supporting MessageAndOffset = V0), because the data format on disk is MessageAndOffset V0, it will use zero-copy transfer to reply with fetch response V1 with MessageAndOffset V0.
    4. When the broker sees a fetch request V2 (supporting MessageAndOffset = V0, V1), because the data format on disk is MessageAndOffset V0, it will use zero-copy transfer to reply with fetch response V2 with MessageAndOffset V0.
  7. Upgrade consumers to send FetchRequest V2.
  8. Upgrade producers to send ProduceRequest V2.
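
The fetch-side behaviour in step 6 comes down to one question: does the fetch request version understand the format already on disk? The sketch below encodes that rule as a small lookup. The names `SUPPORTED_FORMATS` and `plan_fetch` are hypothetical, and the fallback branch (down-converting to the newest format the client understands) is an assumption about what happens once the on-disk format is newer than the client; it is never taken while the on-disk format is V0.

```python
# Message formats each fetch request version can accept, as listed in the text.
SUPPORTED_FORMATS = {
    1: {0},     # FetchRequest V1 understands MessageAndOffset V0 only
    2: {0, 1},  # FetchRequest V2 understands MessageAndOffset V0 and V1
}


def plan_fetch(fetch_version, disk_format):
    """Decide how a broker could serve a fetch (hypothetical helper)."""
    supported = SUPPORTED_FORMATS[fetch_version]
    if disk_format in supported:
        # The client can read the bytes as stored, so no conversion is needed
        # and the data can be sent with zero-copy transfer.
        return {"zero_copy": True, "response_format": disk_format}
    # Assumed fallback: read from disk and convert to the newest format
    # the client supports, which rules out zero-copy.
    return {"zero_copy": False, "response_format": max(supported)}
```

With the on-disk format at V0, both `plan_fetch(1, 0)` and `plan_fetch(2, 0)` report zero-copy with a V0 response, matching steps 6.3 and 6.4.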

...

  1. After most of the consumers are upgraded, bump up message.format.version to 0.10.0 and do a rolling bounce of the brokers.
  2. Upgraded brokers do the following:
    1. When the broker sees a produce request V1 (MessageAndOffset = V0), it will decompress the message, assign offsets using relative offsets, fill in the time field with -1 if message.timestamp.type=CreateTime or with the current server time if message.timestamp.type=LogAppendTime, and re-compress the message, i.e. up-convert the message format to MessageAndOffset V1.
    2. When the broker sees a produce request V2 (MessageAndOffset = V1), it will decompress the message, assign offsets using relative offsets, check and, if necessary, overwrite the time field, and will NOT re-compress the message.
    3. When the broker sees a fetch request V1 (supporting MessageAndOffset = V0), because the data format on disk is MessageAndOffset V1, it will NOT use zero-copy transfer. Instead the broker will read the messages from disk, down-convert them to V0 and reply using fetch response V1 with MessageAndOffset V0.
    4. When the broker sees a fetch request V2 (supporting MessageAndOffset = V0, V1), because the data format on disk is MessageAndOffset V1, it will use zero-copy transfer to reply with fetch response V2 with MessageAndOffset V1.
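
The up-conversion in step 2.1 and the down-conversion in step 2.3 can be illustrated with a toy message model. Everything here is a hypothetical sketch: the `Message` class is not the real wire format, relative offsets are assumed to count from 0 within a batch, and `base_offset` is assumed to be the absolute offset of the first message in the batch.

```python
from dataclasses import dataclass
from typing import Optional

NO_TIMESTAMP = -1  # value written when message.timestamp.type=CreateTime


@dataclass
class Message:
    magic: int                       # 0 = MessageAndOffset V0, 1 = V1
    offset: int
    payload: bytes
    timestamp: Optional[int] = None  # V0 has no time field


def up_convert(batch, timestamp_type, broker_now_ms):
    """V0 -> V1: relative offsets plus a filled-in time field (step 2.1)."""
    fill = broker_now_ms if timestamp_type == "LogAppendTime" else NO_TIMESTAMP
    return [
        Message(magic=1, offset=i, payload=m.payload, timestamp=fill)
        for i, m in enumerate(batch)
    ]


def down_convert(batch, base_offset):
    """V1 -> V0: restore absolute offsets and drop the time field (step 2.3)."""
    return [
        Message(magic=0, offset=base_offset + m.offset, payload=m.payload)
        for m in batch
    ]
```

Down-conversion discards the timestamp, so a consumer still on FetchRequest V1 never sees the time field even after the on-disk format has moved to V1.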

...

  1. Set message.format.version=0.10.0 on one of the brokers (broker B).
  2. Broker B will start to act as described in phase 2.
    1. It will send FetchRequest V2 to other brokers for replication.
    2. It will only see ProduceRequest/FetchRequest V1 from other brokers and clients.
  3. If something goes wrong, we can do the following to roll back:
    1. shutdown broker B
    2. nuke the data of the topics it was serving as leader before shutdown
    3. set message.format.version=0.9.0
    4. restart the broker to let the broker replicate from leaders. At this point the data on disk will be in MessageAndOffset V0.

...