...

The configuration delete.retention.ms for compacted topics exists so that an application that has read a key does not miss a subsequent deletion of that key because the tombstone was physically removed too early. To achieve this, starting from the latest possible time (deleteHorizonMs) at which an application could have read a non-tombstone key before a tombstone, we preserve that tombstone for at least delete.retention.ms and require the application to finish reading the tombstone by then.

deleteHorizonMs is no later than the time when the cleaner has cleaned up to the tombstone. After that time, no application can read a non-tombstone key before the tombstone, because all such records have been cleaned away through compaction. Since we currently don't explicitly store the time when a round of cleaning completes, deleteHorizonMs is estimated by the last modified time of the segment containing firstDirtyOffset. When multiple log segments are merged into a single one, the last modified time is inherited from the last merged segment, so the last modified time of the newly merged segment is not an accurate estimate of deleteHorizonMs. It could be arbitrarily earlier (KAFKA-4545 <https://issues.apache.org/jira/browse/>) or later (KAFKA-8522 <https://issues.apache.org/jira/browse/KAFKA-8522>) than the true value. The former causes the tombstone to be deleted too early, which can cause an application to miss the deletion of a key. The latter causes the tombstone to be retained longer than needed, potentially forever.
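To make the two failure modes concrete, here is a minimal Python sketch of the retention check described above. The function name, the boundary condition, and the sample numbers are illustrative assumptions, not the cleaner's actual code; the sketch only shows how an inaccurate estimate of deleteHorizonMs shifts the moment at which a tombstone becomes removable.

```python
def tombstone_removable(now_ms: int, delete_horizon_ms: int, delete_retention_ms: int) -> bool:
    """Hypothetical check: a tombstone may be dropped once delete.retention.ms
    has elapsed since deleteHorizonMs."""
    return now_ms >= delete_horizon_ms + delete_retention_ms


DELETE_RETENTION_MS = 24 * 60 * 60 * 1000  # one day, chosen only for illustration

# Time at which the cleaner really cleaned up to the tombstone (made-up value).
true_horizon_ms = 1_700_000_000_000

# Estimates taken from a merged segment's last modified time can be off either way.
early_estimate_ms = true_horizon_ms - 2 * DELETE_RETENTION_MS  # KAFKA-4545 style
late_estimate_ms = true_horizon_ms + 2 * DELETE_RETENTION_MS   # KAFKA-8522 style

# Halfway through the real retention window the tombstone must still be kept,
# but the early estimate already allows its removal.
mid_window_ms = true_horizon_ms + DELETE_RETENTION_MS // 2
removed_too_early = tombstone_removable(mid_window_ms, early_estimate_ms, DELETE_RETENTION_MS)
correct_mid_window = tombstone_removable(mid_window_ms, true_horizon_ms, DELETE_RETENTION_MS)

# Well after the real window has passed, the late estimate still retains it.
after_window_ms = true_horizon_ms + 2 * DELETE_RETENTION_MS
correct_after_window = tombstone_removable(after_window_ms, true_horizon_ms, DELETE_RETENTION_MS)
still_retained = not tombstone_removable(after_window_ms, late_estimate_ms, DELETE_RETENTION_MS)
```

With these numbers, the early estimate permits removal while the accurate horizon does not, and the late estimate keeps the tombstone after the accurate horizon would have released it.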

This issue applies not only to tombstones but also to the deletion of transaction markers. This KIP intends to resolve both issues.

Proposed Approach

In the version 2 format, the batch header stores the base timestamp and the max timestamp. Each record in the batch contains a timestamp delta that is relative to the base timestamp. In other words, to get the record timestamp, you add the record's delta to the base timestamp. Typically there is no reason for the base timestamp to differ from the timestamp of the first message, but this is not an iron rule. We can still retrieve the record timestamp as long as the delta and the base timestamp add up to it. So the idea is to set the base timestamp to the delete horizon and adjust the deltas accordingly.
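The arithmetic behind this idea can be sketched in a few lines of Python. The `Batch` class and the `rebase_to_delete_horizon` helper are hypothetical names introduced only for illustration, and the model keeps just the timestamp fields; it is a sketch of the rebasing step, not Kafka's batch implementation. It also assumes that deltas may be negative after rebasing.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class Batch:
    """Toy model of a v2 record batch that keeps only the timestamp fields."""
    base_timestamp: int          # ms, stored once in the batch header
    timestamp_deltas: List[int]  # one delta per record, relative to base_timestamp

    def record_timestamps(self) -> List[int]:
        # A record's timestamp is the base timestamp plus its delta.
        return [self.base_timestamp + d for d in self.timestamp_deltas]


def rebase_to_delete_horizon(batch: Batch, delete_horizon_ms: int) -> Batch:
    """Return a copy whose base timestamp is the delete horizon.

    Every delta is shifted by the opposite amount, so base + delta still
    yields the original record timestamp.
    """
    shift = delete_horizon_ms - batch.base_timestamp
    return Batch(
        base_timestamp=delete_horizon_ms,
        timestamp_deltas=[d - shift for d in batch.timestamp_deltas],
    )


original = Batch(base_timestamp=1_000, timestamp_deltas=[0, 5, 12])
rebased = rebase_to_delete_horizon(original, delete_horizon_ms=61_000)
```

Here the rebased batch carries 61000 as its base timestamp and the deltas -60000, -59995, and -59988, while the record timestamps remain 1000, 1005, and 1012.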

This might be a problem for users who are sticking with an older version of the batch header, but we can add documentation noting this limitation of the fix (most users will have migrated to v2 batch headers anyway). We can set a magic bit (a delete horizon flag) in the batch attributes to indicate whether the batch header's timestamp and deltas were modified. In theory, cleaning the log would then look something like this:

Case 1: Normal batch

a. If the delete horizon flag is set, then retain tombstones as long as the
current time is before the horizon.
b. If no delete horizon is set, then retain tombstones and set the delete
horizon in the cleaned batch to current time +
log.cleaner.delete.retention.ms.

Case 2: Control batch

a. If the delete horizon flag is set, then retain the batch and the marker
as long as the current time is before the horizon.
b. If no delete horizon is set and there are no records remaining from the
transaction, then retain the marker and set the delete horizon in the
cleaned batch to current time + log.cleaner.delete.retention.ms.
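The two cases above can be sketched as a single decision function. This is a minimal Python sketch under stated assumptions: the function and parameter names are hypothetical, an unset delete horizon flag is modeled as `None`, and the behavior for a control batch whose transaction still has records (keep the marker, leave the horizon unset) is an assumption, since the list above does not spell it out.

```python
from typing import Optional, Tuple


def clean_batch(
    delete_horizon_ms: Optional[int],
    now_ms: int,
    delete_retention_ms: int,
    is_control_batch: bool = False,
    txn_has_remaining_records: bool = False,
) -> Tuple[bool, Optional[int]]:
    """Return (retain, delete_horizon_ms_after_cleaning).

    `retain` refers to the tombstones of a normal batch, or to the marker
    of a control batch.
    """
    if delete_horizon_ms is not None:
        # Cases 1a and 2a: the horizon is already set, so retain only
        # while the current time is before it.
        return now_ms < delete_horizon_ms, delete_horizon_ms
    if is_control_batch and txn_has_remaining_records:
        # Assumption: the marker is still needed, so keep it and do not
        # start the retention clock yet.
        return True, None
    # Cases 1b and 2b: retain and stamp the horizon into the cleaned batch.
    return True, now_ms + delete_retention_ms


RETENTION_MS = 60_000  # illustrative value for log.cleaner.delete.retention.ms

# First cleaning pass over a batch with no horizon: retained, horizon stamped.
first_pass = clean_batch(None, now_ms=1_000, delete_retention_ms=RETENTION_MS)
# Later pass before the horizon: still retained.
second_pass = clean_batch(first_pass[1], now_ms=30_000, delete_retention_ms=RETENTION_MS)
# Pass at or after the horizon: eligible for removal.
third_pass = clean_batch(first_pass[1], now_ms=61_000, delete_retention_ms=RETENTION_MS)
```

Because the horizon travels with the batch itself, the outcome no longer depends on the last modified time of whichever segment the batch ends up in after merging.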

Rejected Alternatives

Below are the proposals rejected to date. These approaches can only fix the delete horizon issue for tombstones; the deletion of transaction markers would still not be fixed. They were abandoned because they cannot address this second issue.

One way to help resolve the tombstone issue is to store the time at which the segment was last cleaned. However, there are multiple candidates for where this cleaning time could be stored. Below are the possibilities.

Proposal 1

We could store this cleaning time in a checkpoint file (which is used to store both the end offsets of cleaned segments as well as the time at which this cleaning occurred). In this manner, when the checkpoint file is queried for the last offset checkpointed, the last cleaned time can be retrieved with it and be used to calculate the segment's delete horizon.

...

The upgrade path for this alternative is relatively simple. All we have to do is check whether this new field exists in the tombstone (it does not exist in older versions). If it doesn't exist, we can fall back to the old system of using the last modified time to calculate deleteHorizonMs.
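As a rough illustration of this fallback, the Python sketch below picks between the two sources. The function and parameter names are hypothetical, and a missing field is modeled as `None`; how the field would actually be encoded is not specified here.

```python
from typing import Optional


def resolve_delete_horizon_ms(
    stored_field_ms: Optional[int],
    segment_last_modified_ms: int,
) -> int:
    """Prefer the new field when present; otherwise use the old estimate."""
    if stored_field_ms is not None:
        # Tombstone written by a version that knows about the new field.
        return stored_field_ms
    # Older tombstone: fall back to the segment's last modified time.
    return segment_last_modified_ms


from_new_format = resolve_delete_horizon_ms(5_000, segment_last_modified_ms=9_000)
from_old_format = resolve_delete_horizon_ms(None, segment_last_modified_ms=9_000)
```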
