
Status

Current state: Under Discussion

...

Note: A pull request has been created for this KIP (see PR#7175).

Changes

Currently, in Kafka Core, the log cleaner's checkpointing system works as follows: one checkpoint file is created per disk, and each topic partition is assigned a disk into which the log cleaner's cleaning offsets are written. In most use cases, the number of partitions far exceeds the number of disks, so one file ends up storing cleaning offsets from multiple topic partitions. We propose to improve this system by switching to a per-partition checkpoint file policy, in which each topic partition writes to its own designated checkpoint file (no other partition's cleaning offsets are written into the same file). This has a number of advantages, since a checkpoint file no longer needs to record which topic partition each cleaning offset belongs to.
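To make the per-partition policy concrete, here is a minimal sketch of how a checkpoint file could be located for a given partition. The naming scheme (a `cleaner-offset-checkpoint` file inside the partition's log directory) and the method name are assumptions for illustration only; the KIP does not fix a layout here.

```java
import java.nio.file.Path;
import java.nio.file.Paths;

public class CheckpointPaths {
    // Hypothetical scheme: one checkpoint file per topic partition, stored
    // inside that partition's log directory rather than one file per disk.
    static Path checkpointFileFor(Path logDir, String topic, int partition) {
        return logDir.resolve(topic + "-" + partition)
                     .resolve("cleaner-offset-checkpoint");
    }

    public static void main(String[] args) {
        // Each partition resolves to its own file, so no file is shared.
        System.out.println(checkpointFileFor(Paths.get("/var/kafka-logs"), "orders", 3));
    }
}
```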

Another change we wish to introduce is to store the cleaning time in the checkpoint file in addition to the offsets. This helps resolve a long-standing bug regarding the retention of tombstones in Kafka logs: the cleaning time serves as a more accurate indicator of when a segment was last modified (see KAFKA-8522 for more details).
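Since each file now belongs to exactly one partition, a checkpoint entry only needs to carry the cleaned end offset and the cleaning time. The following is a sketch under an assumed plain-text encoding (one whitespace-separated line); the actual on-disk format is not specified by this section.

```java
// Hypothetical checkpoint entry pairing the cleaned end offset with the
// wall-clock time at which the cleaning round completed.
public class CheckpointEntry {
    final long cleanedEndOffset;
    final long lastCleanTimeMs;

    CheckpointEntry(long cleanedEndOffset, long lastCleanTimeMs) {
        this.cleanedEndOffset = cleanedEndOffset;
        this.lastCleanTimeMs = lastCleanTimeMs;
    }

    // A per-partition file needs no topic/partition key in the entry itself.
    String encode() {
        return cleanedEndOffset + " " + lastCleanTimeMs;
    }

    static CheckpointEntry decode(String line) {
        String[] parts = line.trim().split("\\s+");
        return new CheckpointEntry(Long.parseLong(parts[0]),
                                   Long.parseLong(parts[1]));
    }
}
```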

Problem

The purpose of the delete.retention.ms configuration for compacted topics is to prevent an application that has read a key from missing a subsequent deletion of that key because the tombstone was physically removed too early. To solve this problem, starting from the latest possible time (deleteHorizonMs) at which an application could have read a non-tombstone record for the key before the tombstone, we preserve the tombstone for at least delete.retention.ms and require the application to finish reading the tombstone by then.

deleteHorizonMs is no later than the time when the cleaner has cleaned up to the tombstone. After that time, no application can read a non-tombstone record for the key before the tombstone, because all such records have been cleaned away through compaction. Since we currently don't explicitly store the time when a round of cleaning completes, deleteHorizonMs is estimated by the last modified time of the segment containing firstDirtyOffset. When merging multiple log segments into a single one, the last modified time is inherited from the last merged segment, so the last modified time of the newly merged segment is not an accurate estimate of deleteHorizonMs: it can be arbitrarily earlier (KAFKA-4545 <https://issues.apache.org/jira/browse/KAFKA-4545>) or later (KAFKA-8522 <https://issues.apache.org/jira/browse/KAFKA-8522>) than the true value. The former causes the tombstone to be deleted too early, which can cause an application to miss the deletion of a key; the latter causes the tombstone to be retained longer than needed, and potentially forever.

Proposed Approaches

One way to resolve this issue is to store the time at which a segment was last cleaned. However, there are multiple candidates for where this cleaning time could be stored; the possibilities are described below.

Proposal 1

We could store the cleaning time in the checkpoint file, which would then hold both the end offset of the cleaned segments and the time at which the cleaning occurred. In this manner, when the checkpoint file is queried for the last checkpointed offset, the last cleaning time can be retrieved along with it and used to calculate the segment's delete horizon.
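Under this proposal, the delete horizon for a tombstone that has already been cleaned comes straight from the checkpointed cleaning time. A minimal sketch, with illustrative names and assuming the checkpoint holds the (endOffset, lastCleanTimeMs) pair described above:

```java
public class Proposal1Horizon {
    // If the tombstone lies below the checkpointed end offset, it was covered
    // by the last cleaning round, so the recorded cleaning time is the horizon.
    // Otherwise fall back to the old estimate: the segment's last modified time.
    static long deleteHorizonMs(long tombstoneOffset,
                                long checkpointedEndOffset,
                                long lastCleanTimeMs,
                                long segmentLastModifiedMs) {
        return tombstoneOffset < checkpointedEndOffset
                ? lastCleanTimeMs
                : segmentLastModifiedMs;
    }
}
```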

Upgrade Path For Proposal 1

...

It is possible that when the user upgrades Kafka to a newer version, information is still stored in the old checkpoint files, in which case an upgrade path must be provided.

When a new log cleaner instance is created, we propose to read any remaining data stored in the old checkpoint files and rewrite the offsets into the new per-partition checkpoint files. Any operations carried out afterwards use the new checkpoint files rather than the old ones.
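The migration step can be sketched as follows, assuming the old per-disk file uses the "topic partition offset" line format of the existing cleaner-offset-checkpoint files (all class and method names here are illustrative):

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CheckpointMigration {
    // Parse the old per-disk checkpoint contents into a map keyed by
    // "topic-partition"; each entry is then rewritten into that partition's
    // own checkpoint file by the new log cleaner instance.
    static Map<String, Long> readOldCheckpoint(List<String> lines) {
        Map<String, Long> offsets = new LinkedHashMap<>();
        for (String line : lines) {
            String[] fields = line.trim().split("\\s+");
            if (fields.length == 3) {   // topic, partition, cleaned offset
                offsets.put(fields[0] + "-" + fields[1],
                            Long.parseLong(fields[2]));
            }
        }
        return offsets;
    }
}
```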

Proposal 2

Another option is to store an individual tombstone's deleteHorizonMs in an internal header field of the record. In this case, if the tombstone's offset is before the first dirty offset of the log, the header value is used to determine whether the tombstone should be removed whenever a log cleaner thread scans over it. This approach has some advantages over the first proposal: there is no need to change the checkpoint file format, and it should in theory be simpler to implement.

The upgrade path for this alternative is relatively simple: we check for the existence of the new header field in tombstones (it does not exist in records written by older versions). If the field is absent, we fall back to the old approach of using the last modified time to estimate deleteHorizonMs.
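The fallback logic for proposal 2 can be sketched in a few lines. This is a hedged illustration, not the actual cleaner code; the header lookup is modeled here as an `OptionalLong` that is empty for records written by older versions:

```java
import java.util.OptionalLong;

public class DeleteHorizon {
    // Prefer the deleteHorizonMs carried in the tombstone's header; if the
    // record predates the new format, fall back to estimating the horizon
    // from the segment's last modified time, as older versions do.
    static long effectiveDeleteHorizonMs(OptionalLong headerDeleteHorizonMs,
                                         long segmentLastModifiedMs) {
        return headerDeleteHorizonMs.orElse(segmentLastModifiedMs);
    }
}
```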


Rejected Alternatives

None at the moment.