KIP-354: Time-based log compaction policy
Status

Current state: Accepted

Discussion thread: [DISCUSS] KIP-354 Time-based log compaction policy

Vote thread: [VOTE] KIP-354 Time-based log compaction policy

JIRA: KAFKA-7321

Pull Request: pull-6009

Motivation

Compaction enables Kafka to remove old messages that are flagged for deletion while other messages can be retained for a relatively longer time.  Today, a log segment may remain un-compacted for an unbounded time, since the eligibility for log compaction is determined based on the dirty ratio ("min.cleanable.dirty.ratio") and min compaction lag ("min.compaction.lag.ms") settings.  The ability to delete a record through compaction in a timely manner has become an important requirement in some use cases (e.g., GDPR).  For example, one use case is to delete PII (Personally Identifiable Information) data within a certain number of days (e.g., 7 days) while keeping non-PII data indefinitely in compacted format.  The goal of this change is to provide a configurable maximum compaction lag that ensures a record is compacted after the specified time interval, regardless of the log dirty ratio.  The log dirty ratio is still used to determine the eligibility for compaction if the maximum compaction lag has not been reached or is not specified. In other words, if Kafka receives a deletion request on a key (e.g., a key with a null value), the corresponding log segment will be picked up for compaction after the configured time interval to remove the key.

Example

A compacted topic with user id as key and PII in the value:

1 => {name: "John Doe", phone: "5555555"}
2 => {name: "Jane Doe", phone: "6666666"}

# to remove the phone number we can replace the value with a new message
1 => {name: "John Doe"}

# to completely delete key 1 we can send a tombstone record
1 => null

# but until compaction runs (and some other conditions are met), reading the whole topic
# will get all three values for key 1, and the old values are still retained on disk.

This example mentions GDPR because it is widely known, but the requirement here is to provide some guarantees around a tombstone or a new value leading to deletion of old values within a maximum time.
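
For illustration, a tombstone like the one above can be produced with the standard Java producer by sending a null value; the topic name, serializers, and broker address below are assumptions for this sketch, not something the KIP specifies:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class SendTombstone {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumption: local broker
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (Producer<String, String> producer = new KafkaProducer<>(props)) {
            // A null value is a tombstone: once compaction runs, key "1" is removed.
            producer.send(new ProducerRecord<>("user-pii", "1", null));
            producer.flush();
        }
    }
}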


For a topic with log compaction enabled, Kafka today uses "min.cleanable.dirty.ratio" and "min.compaction.lag.ms" to determine which log segments it needs to pick up for compaction. "min.compaction.lag.ms" marks a log segment uncleanable until the segment is rolled and has remained un-compacted for the specified lag. The detailed information can be found in KIP-58.  "min.cleanable.dirty.ratio" is used to determine the eligibility of the entire partition for log compaction: only log partitions whose dirty ratio is larger than "min.cleanable.dirty.ratio" are picked up by the log cleaner for compaction.  In summary, with these two existing compaction configurations, Kafka cannot enforce a maximum lag on compacting an un-compacted record.

Proposed Changes

We propose adding a new topic level configuration, "max.compaction.lag.ms", which controls the maximum lag after which a record is required to be picked up for compaction (note that this lag interval includes the time the record resides in an active segment).  With this configuration and compaction enabled, the log cleaner is required to pick up all log segments that contain records older than "max.compaction.lag.ms" for compaction. A record therefore has a guaranteed upper bound on the time it takes to become mandatory for compaction, regardless of the min cleanable dirty ratio.  The lag is measured starting from when the record is appended to an active segment.  Since we rely on the message timestamp to tell when a record should be compacted, the feature provided in this KIP depends on the availability of message timestamps.
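
As an illustration of how a topic could be configured once this KIP is in place, the sketch below creates a compacted topic with a seven-day max compaction lag using the Java AdminClient; the topic name, partition/replication counts, bootstrap address, and the specific dirty-ratio and min-lag values are assumptions for the example:

import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateCompactedTopic {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // assumption: local broker

        try (AdminClient admin = AdminClient.create(props)) {
            NewTopic topic = new NewTopic("user-pii", 3, (short) 3).configs(Map.of(
                "cleanup.policy", "compact",
                // existing knobs: compaction eligibility via dirty ratio and min lag
                "min.cleanable.dirty.ratio", "0.5",
                "min.compaction.lag.ms", "0",
                // new knob from this KIP: compaction required within 7 days
                "max.compaction.lag.ms", String.valueOf(7L * 24 * 60 * 60 * 1000)));
            admin.createTopics(Collections.singleton(topic)).all().get();
        }
    }
}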

Here is a list of changes to enforce such a max compaction lag:

  1. Force a roll of a non-empty active segment if the first record is older than "max.compaction.lag.ms", so that compaction can be done on that segment.  The time to roll an active segment is controlled by "segment.ms" today.  However, to ensure messages currently in the active segment can be compacted in time, we need to roll the active segment when either "max.compaction.lag.ms" or "segment.ms" is reached.
    We define the maximum time to roll an active segment as:

    maxSegmentMs = if (the log has "compact" enabled and "max.compaction.lag.ms" is non-zero)
                       min("segment.ms", "max.compaction.lag.ms")
                   else
                       "segment.ms"


  2. Estimate the earliest message timestamp of an un-compacted log segment. We only need to estimate the earliest message timestamp for un-compacted log segments to ensure timely compaction, because the deletion requests that belong to compacted segments have already been processed.
    The estimated earliest timestamp of a log segment is set to the timestamp of its first message, if a timestamp is present in that message. Since this KIP relies on message timestamps, the feature is only effective when message timestamps are available (the estimation fallbacks that were considered for segments without timestamps are listed under Rejected Alternatives).
    Note that timestamps can be out of order within a log segment. However, when combining "max.compaction.lag.ms" with "log.message.timestamp.difference.max.ms", Kafka can still guarantee that a new record becomes eligible for log compaction within a bounded time, as determined by "max.compaction.lag.ms" and "log.message.timestamp.difference.max.ms" [1].

  3. Let the log cleaner pick up all logs that have reached the max compaction lag.
    The rule is simple: for any log partition with compaction enabled, if the estimated earliest message timestamp of the first un-compacted log segment is earlier than "now - max.compaction.lag.ms", the log is picked up for compaction (see the sketch after this list). Otherwise, Kafka uses "min.cleanable.dirty.ratio" and "min.compaction.lag.ms" to determine the log's eligibility for compaction as it does today.
  4. Add one metric to track the max compaction delay (as described in the next section).
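
As a minimal sketch tying the rules above together (illustrative only; the method names are hypothetical and this is not the actual broker code):

// Illustrative sketch of the proposed rules; all names here are hypothetical.
public final class MaxCompactionLagSketch {

    // Rule 1: bound on how long an active segment may stay open before rolling.
    static long maxSegmentMs(boolean compactEnabled, long maxCompactionLagMs, long segmentMs) {
        if (compactEnabled && maxCompactionLagMs > 0)
            return Math.min(segmentMs, maxCompactionLagMs);
        return segmentMs;
    }

    // Rule 3: a log must be compacted now if the estimated earliest timestamp of
    // its first un-compacted segment (per Rule 2, the first message's timestamp)
    // is older than now - max.compaction.lag.ms; otherwise the existing
    // dirty-ratio and min-lag rules decide, as they do today.
    static boolean mustCompactNow(long earliestUncompactedTimestampMs,
                                  long maxCompactionLagMs, long nowMs) {
        return earliestUncompactedTimestampMs < nowMs - maxCompactionLagMs;
    }
}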

Public Interfaces

  • Add a topic level configuration, "max.compaction.lag.ms", and a corresponding broker configuration, "log.cleaner.max.compaction.lag.ms", which is set to MAX_LONG by default. If both "max.compaction.lag.ms" and "min.compaction.lag.ms" are provided at topic creation, Kafka validates that "max.compaction.lag.ms" is no less than "min.compaction.lag.ms".  A record may remain un-compacted for at most this lag, after which the corresponding log partition becomes eligible for log compaction. This configuration only applies to topics that have compaction enabled, and the broker default can be overridden on a per-topic basis.

  • Add the following metric:

    kafka.log:type=LogCleaner,name=max-compaction-delay-secs
    type: gauge
    value: max(now - earliest timestamp of the first un-compacted segment - max.compaction.lag.ms, 0) / 1000
    This value is calculated across all compact-able partitions, where "max.compaction.lag.ms" can be overridden on a per-topic basis.
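
If it helps to see the metric in use, the sketch below reads the gauge over remote JMX. The JMX port and the "Value" attribute name (the usual convention for Kafka's Yammer-based gauges) are assumptions here, not something this KIP specifies:

import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class ReadMaxCompactionDelay {
    public static void main(String[] args) throws Exception {
        // Assumes the broker was started with JMX enabled on port 9999.
        JMXServiceURL url =
            new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            MBeanServerConnection mbs = connector.getMBeanServerConnection();
            ObjectName gauge =
                new ObjectName("kafka.log:type=LogCleaner,name=max-compaction-delay-secs");
            // Assumption: the gauge's current value is exposed as the "Value" attribute.
            Object delaySecs = mbs.getAttribute(gauge, "Value");
            System.out.println("max-compaction-delay-secs = " + delaySecs);
        }
    }
}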

Compatibility, Deprecation, and Migration Plan

  • By default "max.compaction.lag.ms" is set to MAX_LONG, so the max compaction lag rule will not lead to additional log compaction.  There are no compatibility issues and no migration is required.

Performance impact

  • Kafka already collects compaction metrics (CleanerStats) that include how many bytes are read/written during each compaction run and how long it takes to compact a log partition. Those metrics can be used to measure the performance impact when adopting this KIP.  If a log partition already gets compacted once per day before this KIP, setting "max.compaction.lag.ms" to more than one day should have little impact on the amount of resources spent on compaction, since the existing log compaction configuration (e.g., min dirty ratio) will trigger log compaction before "max.compaction.lag.ms" is reached.  The added metric "max-compaction-delay-secs" can be used to determine whether any partitions are actually picked up for compaction because of "max.compaction.lag.ms".  The log4j log generated by "recordStats" includes information about how many bytes are actually read/written by log compaction, which can also be used to evaluate the compaction workload before/after this KIP.

Rejected Alternatives

  • One way to force compaction on any cleanable log segment is to set "min.cleanable.dirty.ratio" to 0. However, compacting a log partition whenever a segment becomes cleanable (controlled by "min.compaction.lag.ms") is very expensive.  We still want to accumulate some amount of log segments before compaction is kicked off.  In addition, in order to honor the max compaction lag requirement, we also need to force a roll on the active segment if the required lag has passed. So the existing configurations do not meet the requirement to ensure a maximum compaction lag.

  • In Item 2 of the proposed changes section, if the first message timestamp is not available, we could use "segment.largestTimestamp - maxSegmentMs" as an estimation of the earliest timestamp (segment.largestTimestamp is the lastModified time of the log segment, or the max timestamp we have seen for the log segment). The actual timestamp of the first message might be later than the estimation, but it is safe to pick up the log for compaction earlier.  However, since this estimation is not very accurate and may cause unnecessary compaction, we decided to make this feature depend on the availability of the first message timestamp.

  • In Item 2 of the proposed changes section, we could use the largestTimestamp of the previous segment as an estimation of the next segment's earliest timestamp.  Since this estimation may not be very accurate, we decided to keep it simple and always use the first message timestamp as the estimation of a log segment's earliest message timestamp.

[1] Assume a user sets "max.compaction.lag.ms" to M and "log.message.timestamp.difference.max.ms" to D, and the current time is "now".  In the worst case, the first message of a segment can have timestamp (now + D) and the second message can have timestamp (now - D).  This segment becomes eligible for compaction at time (now + D + M), so the compaction delay for the second message, measured from its timestamp, is (D + M + D).  Even with such a large timestamp shift between messages, a record is still bounded by (D + M + D) to become eligible for compaction.  In general, if we don't expect large timestamp shifts, we can rely on "max.compaction.lag.ms" alone to trigger a compaction after the max lag.
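
As an illustrative worked example (the numbers are assumptions, not from the KIP): with M = 7 days and D = 1 hour, the segment above becomes eligible for compaction at (now + 1 hour + 7 days), so the second message's delay from its timestamp is bounded by D + M + D = 7 days and 2 hours.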