Status

Current state: Accepted

Discussion thread:  [DISCUSS] KIP-354 Time-based log compaction policy

Vote thread: [VOTE] KIP-354 Time-based log compaction policy

JIRA: KAFKA-7321

Pull Request: pull-6009

Motivation

Compaction enables Kafka to remove old messages that are flagged for deletion while retaining other messages for a relatively long time.  Today, a log segment may remain un-compacted for an unbounded time, since eligibility for log compaction is determined by the dirty ratio ("min.cleanable.dirty.ratio") and the minimum compaction lag ("min.compaction.lag.ms") settings.  The ability to delete a record through compaction in a timely manner has become an important requirement in some use cases (e.g., GDPR).  For example, one use case is to delete PII (Personally Identifiable Information) data within a certain number of days (e.g., 7 days) while keeping non-PII data indefinitely in compacted form.  The goal of this change is to provide a configurable maximum compaction lag that ensures a record becomes eligible for compaction after the specified time interval.

...

  1. Force a roll of a non-empty active segment if its first record is older than "max.compaction.lag.ms", so that compaction can be done on that segment.  Today, the time to roll an active segment is controlled by "segment.ms".  However, to ensure messages currently in the active segment can be compacted in time, we need to roll the active segment when either "max.compaction.lag.ms" or "segment.ms" is reached.
    We define:

    Maximum time to roll an active segment:

    maxSegmentMs = if (the log has "compact" enabled) { min("segment.ms", "max.compaction.lag.ms") }
                   else { "segment.ms" }


  2. Estimate the earliest message timestamp of an un-compacted log segment. We only need to estimate the earliest message timestamp for un-compacted log segments, because the deletion requests that belong to compacted segments have already been processed. The estimated earliest timestamp of a log segment is set to the timestamp of its first message.  Message timestamps can be out of order within a log segment.  However, when "max.compaction.lag.ms" is combined with "log.message.timestamp.difference.max.ms", Kafka can guarantee that a new record will become eligible for log compaction after a bounded time, as determined by "max.compaction.lag.ms" and "log.message.timestamp.difference.max.ms"[1].

  3. Let the log cleaner pick up logs that have reached the max compaction lag.
    For any log partition with compaction enabled, if the estimated earliest message timestamp of the first un-compacted segment is more than "max.compaction.lag.ms" in the past, the log is picked up for compaction. Otherwise, Kafka uses "min.cleanable.dirty.ratio" and "min.compaction.lag.ms" to determine the log's eligibility for compaction, as it does today.

  4. Add a metric to track the maximum compaction delay (as described in the next section).
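The roll and eligibility logic in steps 1 and 3 can be sketched as follows. This is a minimal illustration of the proposed rules, not Kafka's actual LogCleaner code; the method and parameter names are hypothetical:

```java
// Sketch of the proposed roll/eligibility rules (illustrative, not Kafka source).
public class CompactionLagSketch {

    // Step 1: maximum time before a non-empty active segment must be rolled.
    // With compaction enabled, the roll time is capped by max.compaction.lag.ms.
    static long maxSegmentMs(boolean compactEnabled, long segmentMs, long maxCompactionLagMs) {
        return compactEnabled ? Math.min(segmentMs, maxCompactionLagMs) : segmentMs;
    }

    // Step 3: a log must be compacted once the estimated earliest timestamp of
    // its first un-compacted segment is more than max.compaction.lag.ms in the past.
    static boolean mustCompactNow(long earliestDirtyTimestampMs, long maxCompactionLagMs, long nowMs) {
        return nowMs - earliestDirtyTimestampMs > maxCompactionLagMs;
    }

    public static void main(String[] args) {
        long segmentMs = 7L * 24 * 60 * 60 * 1000; // segment.ms = 7 days
        long maxLagMs  = 24L * 60 * 60 * 1000;     // max.compaction.lag.ms = 1 day
        System.out.println(maxSegmentMs(true, segmentMs, maxLagMs));  // capped at 1 day
        System.out.println(mustCompactNow(0L, maxLagMs, 2 * maxLagMs));
    }
}
```

With compaction enabled, the active segment rolls after at most one day here, even though "segment.ms" allows seven; without compaction, "segment.ms" alone applies.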

...

  • One way to force compaction on any cleanable log segment is to set "min.cleanable.dirty.ratio" to 0. However, compacting a log partition whenever a segment becomes cleanable (controlled by "min.compaction.lag.ms") is very expensive.  We still want to accumulate some amount of log segments before compaction kicks in.  In addition, to honor the max compaction lag requirement, we also need to force a roll of the active segment once the required lag has passed. So the existing configuration does not meet the requirements for ensuring a maximum compaction lag.

  • In Item 2 of the proposed change section, if the first message timestamp is not available, we could use "segment.largestTimestamp - maxSegmentMs" as an estimate of the earliest timestamp. The actual timestamp of the first message might be later than this estimate, but it is safe to pick up the log for compaction earlier.  However, since this estimate is not very accurate and may cause unnecessary compaction, we decided to make this feature depend on the availability of the first message timestamp.

  • In Item 2 of the proposed change section, we could use the largestTimestamp of the previous segment as an estimate of the next segment's earliest timestamp.  Since that estimate may not be very accurate, we decided to keep it simple and always use the first message timestamp as the estimate of a log segment's earliest message timestamp.

[1] Assume a user sets "max.compaction.lag.ms" to M and "log.message.timestamp.difference.max.ms" to D, and the current time is "now".  In the worst case, the first message can have timestamp (now + D), and a second message written at the same time can have timestamp (now - D).  The segment becomes eligible for compaction at time (now + D + M), so the compaction delay for the second message is (D + M + D) = 2D + M.  Even with a huge timestamp shift between messages, a record is still bounded by (2D + M) to become eligible for compaction.  In general, if we do not expect huge timestamp shifts, we can rely on "max.compaction.lag.ms" alone to trigger compaction after the max lag.
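A worked instance of the bound in [1], using hypothetical values D = 1 hour and M = 24 hours (the class and method names are illustrative):

```java
// Worst-case delay before a record becomes eligible for compaction, per [1].
public class CompactionDelayBound {

    static long worstCaseDelayMs(long dMs, long mMs) {
        // First record timestamp: now + D, so the segment's estimated earliest
        // timestamp is now + D and it becomes eligible at now + D + M.
        // A second record with timestamp now - D therefore waits
        // (now + D + M) - (now - D) = 2D + M.
        return 2 * dMs + mMs;
    }

    public static void main(String[] args) {
        long d = 60L * 60 * 1000;       // log.message.timestamp.difference.max.ms = 1 hour
        long m = 24L * 60 * 60 * 1000;  // max.compaction.lag.ms = 24 hours
        System.out.println(worstCaseDelayMs(d, m)); // 93600000 ms = 26 hours
    }
}
```

So with these values a record is guaranteed to become eligible for compaction within 26 hours, even under the maximum allowed timestamp skew.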