...

The idea of the delete.retention.ms configuration for compacted topics is to prevent an application that has read a key from missing a subsequent deletion of that key because the tombstone is physically removed too early. To solve this problem, starting from the latest possible time (deleteHorizonMs) at which an application could have read a non-tombstone record for the key before the tombstone, we preserve the tombstone for at least delete.retention.ms and require the application to have consumed the tombstone by then.
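As a minimal sketch of this retention rule (the helper name is hypothetical, not Kafka's actual cleaner code), a tombstone becomes eligible for physical removal only once delete.retention.ms has elapsed past the delete horizon:

```python
# Hypothetical helper illustrating the retention rule; Kafka's actual
# cleaner logic is more involved than this single comparison.
def can_remove_tombstone(delete_horizon_ms: int,
                         delete_retention_ms: int,
                         now_ms: int) -> bool:
    # The tombstone must survive at least delete.retention.ms beyond the
    # latest time a reader could have seen a non-tombstone value for the key.
    return now_ms > delete_horizon_ms + delete_retention_ms
```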

deleteHorizonMs is no later than the time when the cleaner has cleaned up to the tombstone. After that time, no application can read a non-tombstone key before the tombstone, because all earlier records for the key have been removed through compaction. Since we currently don't explicitly store the time when a round of cleaning completes, deleteHorizonMs is estimated by the last modified time of the segment containing firstDirtyOffset. When multiple log segments are merged into a single one, the last modified time is inherited from the last merged segment, so the last modified time of the newly merged segment is not an accurate estimate of deleteHorizonMs. It could be arbitrarily earlier (KAFKA-4545 <https://issues.apache.org/jira/browse/KAFKA-4545>) or later (KAFKA-8522 <https://issues.apache.org/jira/browse/KAFKA-8522>). The former causes the tombstone to be deleted too early, which can cause an application to miss the deletion of a key; the latter causes the tombstone to be retained longer than needed, potentially forever.

...

We could store this cleaning time in a checkpoint file, which would then hold both the end offsets of cleaned segments and the time at which that cleaning occurred. In this manner, when the checkpoint file is queried for the last checkpointed offset, the last cleaned time can be retrieved alongside it and used to calculate the segment's delete horizon.
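The read/write path for such a checkpoint can be sketched as follows, assuming a simple text encoding and hypothetical helper names (the real on-disk format would be defined by Kafka's checkpoint code):

```python
import os

def write_clean_checkpoint(path: str, end_offset: int,
                           cleaning_time_ms: int) -> None:
    # Write via a temp file plus atomic rename so a crash never leaves
    # a half-written checkpoint behind.
    tmp = path + ".tmp"
    with open(tmp, "w") as f:
        f.write(f"{end_offset} {cleaning_time_ms}\n")
    os.replace(tmp, path)

def read_clean_checkpoint(path: str) -> tuple:
    # Returns (end_offset, cleaning_time_ms); the cleaning time is what
    # lets us derive the delete horizon on recovery.
    with open(path) as f:
        end_offset, cleaning_time_ms = f.read().split()
    return int(end_offset), int(cleaning_time_ms)
```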

The old checkpoint file format is more compact, using one file per disk. This has a couple of implications: a single file stores the end offsets of multiple partitions, so a typical file in the old system would look like this:

Old Organization:
(PARTITION1, OFFSET1)
(PARTITION2, OFFSET2)
...
(PARTITIONK, OFFSETM)
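Reading the old per-disk layout yields a map from partition to cleaned end offset; a parsing sketch matching only the illustration above (the real file format differs):

```python
def parse_old_checkpoint(lines: list) -> dict:
    # Each line holds one partition's cleaned end offset,
    # e.g. "(topic-0, 1234)".
    offsets = {}
    for line in lines:
        partition, offset = line.strip().strip("()").split(", ")
        offsets[partition] = int(offset)
    return offsets
```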

In the new checkpoint file system, we propose to assign each partition its own checkpoint file, which means multiple files would exist on one disk (usually the case, since the number of partitions exceeds the number of disks in most deployments). Since we wish to store cleaning times as well, we will checkpoint this information as a tuple consisting of an end offset and its respective cleaning time. A file would now look like the following:

New File System:
(END_OFFSET, CLEANING_TIME)
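Under the proposed layout, each per-partition file holds a single tuple, and the cleaning time read back can serve directly as the delete horizon estimate instead of a segment's last modified time. A parsing sketch for the illustration above (names and encoding hypothetical):

```python
def parse_new_checkpoint(content: str) -> tuple:
    # A per-partition file holds exactly one (END_OFFSET, CLEANING_TIME)
    # tuple, e.g. "(1234, 1690000000000)".
    end_offset, cleaning_time_ms = content.strip().strip("()").split(", ")
    return int(end_offset), int(cleaning_time_ms)
```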


Upgrade Path For Proposal 1

...