
Status

Current state: "Under Discussion"

Discussion thread: [DISCUSS] KIP-354 Time-based log compaction policy

Vote thread: [VOTE] KIP-354 Time-based log compaction policy

JIRA: KAFKA-7321

Motivation

Compaction enables Kafka to remove old messages that are flagged for deletion while retaining other messages for a relatively longer time.  Today, a log segment may remain un-compacted for an unbounded time, because eligibility for log compaction is determined by the dirty ratio ("min.cleanable.dirty.ratio") and the minimum compaction lag ("min.compaction.lag.ms") settings.  The ability to delete a record through compaction in a timely manner has become an important requirement in some use cases (e.g., GDPR).  For example, one use case is to delete PII (Personally Identifiable Information) within a certain number of days (e.g., 7 days) while keeping non-PII data indefinitely in compacted format.  The goal of this change is to provide a configurable maximum compaction lag that ensures a record is compacted after the specified time interval regardless of the log's dirty ratio.  The dirty ratio is still used to determine eligibility for compaction if the maximum compaction lag has not been reached or is not specified. In other words, if Kafka receives a deletion request on a key (e.g., a key with a null value), the corresponding log segment will be picked up for compaction after the configured time interval to remove the key.

Example

A compacted topic with user id as key and PII in the value:

1 => {name: "John Doe", phone: "5555555"}
2 => {name: "Jane Doe", phone: "6666666"}

# to remove the phone number we can replace the value with a new message
1 => {name: "John Doe"}

# to completely delete key 1 we can send a tombstone record
1 => null

# but until compaction runs (and some other conditions are met), reading the whole topic will get all three values for key 1, and the old values are still retained on disk.
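
To make the example concrete, here is a minimal Python sketch of what a reader sees before and after compaction. It is a toy model, not Kafka's log cleaner: the `compact` helper is hypothetical, and it drops tombstoned keys immediately, whereas Kafka retains tombstones for a period before removing them.

```python
# Toy model of the topic above: (key, value) records in offset order.
log = [
    (1, {"name": "John Doe", "phone": "5555555"}),
    (2, {"name": "Jane Doe", "phone": "6666666"}),
    (1, {"name": "John Doe"}),  # new value without the phone number
    (1, None),                  # tombstone for key 1
]


def compact(records):
    """Keep only the latest value per key; drop keys whose latest value is a tombstone."""
    latest = {}
    for key, value in records:
        latest[key] = value  # later offsets overwrite earlier ones
    return [(k, v) for k, v in latest.items() if v is not None]


# Before compaction, a full read still returns every value written for key 1.
values_for_key_1 = [v for k, v in log if k == 1]

# After compaction, only the latest surviving record per key remains.
compacted = compact(log)
```

Until `compact` runs, the phone number for key 1 is still readable in the toy log, which is the exposure window this proposal aims to bound.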

If there is a requirement to guarantee a maximum time an old record can exist (for example, under an interpretation of GDPR PII rules), a new topic setting is needed, because existing configurations either focus only on the minimum time a record should live or sacrifice the efficiencies gained by the dirty ratio settings.

This example mentions GDPR because it is widely known, but the requirement here is to provide some guarantees around a tombstone or a new value leading to deletion of old values within a maximum time.

Note: This change focuses on when to compact a log segment, and it does not conflict with KIP-280, which focuses on how to compact a log.

Current Behavior

For a topic with log compaction enabled, Kafka today uses "min.cleanable.dirty.ratio" and "min.compaction.lag.ms" to determine which log segments it needs to pick up for compaction. "min.compaction.lag.ms" marks a log segment uncleanable until the segment is rolled and has remained un-compacted for the specified lag. Detailed information can be found in KIP-58.  "min.cleanable.dirty.ratio" is used to determine the eligibility of the entire partition for log compaction. Only log partitions whose dirty ratio is higher than "min.cleanable.dirty.ratio" are picked up by the log cleaner for compaction.  In summary, with these two existing compaction configurations, Kafka cannot enforce timely log compaction.
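
As a rough illustration of the dirty-ratio check described above, the following Python sketch assumes the dirty ratio is the dirty (not yet compacted) bytes divided by the total of clean and dirty bytes. The function names and the byte counts are hypothetical, and the threshold of 0.5 is only an illustrative value.

```python
def dirty_ratio(clean_bytes, dirty_bytes):
    """Fraction of the log that has not been compacted yet (assumed definition)."""
    total = clean_bytes + dirty_bytes
    return dirty_bytes / total if total > 0 else 0.0


def eligible_today(clean_bytes, dirty_bytes, min_cleanable_dirty_ratio):
    """A partition is picked up only when its dirty ratio exceeds the threshold."""
    return dirty_ratio(clean_bytes, dirty_bytes) > min_cleanable_dirty_ratio


# A large, mostly clean partition that receives few writes (sizes in MB).
ratio = dirty_ratio(clean_bytes=900, dirty_bytes=100)
picked_up = eligible_today(clean_bytes=900, dirty_bytes=100,
                           min_cleanable_dirty_ratio=0.5)
```

In this sketch the partition stays at a ratio of 0.1 and is not picked up, however old its dirty records become, which is the gap the proposal addresses.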

Proposed Changes

We propose adding a new topic-level configuration, "max.compaction.lag.ms", which controls the maximum time interval for which a message/segment can be skipped for log compaction (note that this interval includes the time the message resides in an active segment).  With this configuration and compaction enabled, the log cleaner is required to pick up all log segments that contain messages older than "max.compaction.lag.ms" for compaction. A log segment therefore has a guaranteed upper bound on the time before it becomes mandatory for compaction, regardless of the min cleanable dirty ratio. The clock starts when a log segment is first created as an active segment.

Here is a list of changes to enforce such a max compaction lag:

  1. Force a roll of a non-empty active segment if its first record is older than "max.compaction.lag.ms" (or if the creation time of the active segment is older than "max.compaction.lag.ms" when the record timestamp is not available) so that compaction can be done on that segment.  The time to roll an active segment is controlled by "segment.ms" today.  However, to ensure messages currently in the active segment can be compacted in time, we need to roll the active segment when either "max.compaction.lag.ms" or "segment.ms" is reached.  
    We define: 

    maximum time to roll an active segment:

    maxSegmentMs =  if (the log has "compact" enabled and "max.compaction.lag.ms" is non-zero) {min("segment.ms", "max.compaction.lag.ms")}
                    else {"segment.ms"}



  2. Estimate the earliest message timestamp of an un-compacted log segment. We only need to estimate the earliest message timestamp for un-compacted log segments to ensure timely compaction, because the deletion requests that belong to compacted segments have already been processed.

    1. For the first (earliest) log segment: the estimated earliest timestamp is set to the timestamp of the first message if a timestamp is present in the message. Otherwise, the estimated earliest timestamp is set to "segment.largestTimestamp - maxSegmentMs" (segment.largestTimestamp is the lastModified time of the log segment or the max timestamp we see for the log segment). In the latter case, the actual timestamp of the first message might be later than the estimate, but it is safe to pick up the log for compaction earlier.  

    2. From the second log segment onwards: there are two methods to estimate the earliest message timestamp of a log segment. The first method is to use the largestTimestamp (lastModified time) of the previous segment as an estimate. The second method is to use the timestamp of the first message if a timestamp is present in the message.  Since getting the timestamp of a message requires additional I/O, the first method of estimation is sufficient in practice.

  3. Let the log cleaner pick up all logs with an estimated earliest message timestamp earlier than "now - max.compaction.lag.ms" for compaction.  
    The rule is simple: as long as there is an un-compacted log segment whose estimated timestamp is earlier than "now - max.compaction.lag.ms", the log is picked up for compaction. Otherwise, Kafka uses "min.cleanable.dirty.ratio" and "min.compaction.lag.ms" to determine a log's eligibility for compaction as it does today.  The logs to be compacted are currently sorted by dirty ratio. With the change, the logs are sorted by "must clean ratio" first and then by dirty ratio.  The "must clean ratio" is calculated similarly to the dirty ratio, except that only segments that are required to be compacted contribute to it. More specifically, the "must clean ratio" is the total size of cleanable segments whose records are older than "max.compaction.lag.ms" divided by the total size (clean segment size + cleanable segment size). The reason for this compaction ordering is to prioritize compaction of the logs that are required to be cleaned by the max compaction lag rule. 
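
The three steps above can be sketched in Python as follows. This is an illustrative model under stated assumptions, not the broker implementation: the `Segment` class and all function names are hypothetical, every un-compacted segment is treated as cleanable, and the cheaper previous-segment estimate is used from the second segment onwards.

```python
from dataclasses import dataclass
from typing import Optional

DAY = 24 * 60 * 60 * 1000  # one day in milliseconds


@dataclass
class Segment:  # hypothetical stand-in for a log segment
    size_bytes: int
    largest_timestamp_ms: int  # lastModified time or max record timestamp
    first_record_timestamp_ms: Optional[int] = None  # None if records lack timestamps


def max_segment_ms(segment_ms, max_compaction_lag_ms, compact_enabled):
    """Step 1: maximum time before an active segment is rolled."""
    if compact_enabled and max_compaction_lag_ms > 0:
        return min(segment_ms, max_compaction_lag_ms)
    return segment_ms


def estimate_earliest_timestamps(uncompacted, roll_ms):
    """Step 2: estimated earliest message timestamp per un-compacted segment."""
    estimates = []
    for i, seg in enumerate(uncompacted):
        if i > 0:
            # Cheap estimate: largest timestamp of the previous segment.
            estimates.append(uncompacted[i - 1].largest_timestamp_ms)
        elif seg.first_record_timestamp_ms is not None:
            estimates.append(seg.first_record_timestamp_ms)
        else:
            estimates.append(seg.largest_timestamp_ms - roll_ms)
    return estimates


def must_clean_ratio(clean_bytes, uncompacted, estimates, now_ms, max_lag_ms):
    """Step 3: share of the log that the max compaction lag forces to be cleaned."""
    if max_lag_ms <= 0:  # 0 means the feature is disabled
        return 0.0
    cutoff = now_ms - max_lag_ms
    must_clean = sum(s.size_bytes for s, t in zip(uncompacted, estimates) if t < cutoff)
    total = clean_bytes + sum(s.size_bytes for s in uncompacted)
    return must_clean / total if total > 0 else 0.0


now = 10 * DAY
segments = [
    Segment(100, 2 * DAY, first_record_timestamp_ms=1 * DAY),
    Segment(300, 8 * DAY),
    Segment(100, 9 * DAY),
]
roll = max_segment_ms(segment_ms=7 * DAY, max_compaction_lag_ms=3 * DAY, compact_enabled=True)
estimates = estimate_earliest_timestamps(segments, roll)
ratio = must_clean_ratio(500, segments, estimates, now, max_lag_ms=3 * DAY)

# Logs are ordered by (must clean ratio, dirty ratio), highest first.
logs = {"topic-a-0": (ratio, 0.2), "topic-b-0": (0.0, 0.9)}
order = sorted(logs, key=lambda name: logs[name], reverse=True)
```

In this sketch, "topic-a-0" is cleaned first even though "topic-b-0" has a much higher dirty ratio, because part of "topic-a-0" has exceeded the maximum compaction lag.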

Public Interfaces

  • Add a topic-level configuration "max.compaction.lag.ms" and a corresponding broker configuration "log.cleaner.max.compaction.lag.ms", which is set to 0 (disabled) by default.  If both "max.compaction.lag.ms" and "min.compaction.lag.ms" are provided at topic creation, Kafka enforces that "max.compaction.lag.ms" is no less than "min.compaction.lag.ms".
    -- Note that an alternative configuration is to use -1 as "disabled" and 0 as "immediate compaction". Because the compaction lag is still determined by "min.compaction.lag.ms" and by how long it takes to roll an active segment, the actual lag for compaction is undetermined if we use "0".  On the other hand, we can set "min.cleanable.dirty.ratio" to achieve the same goal.  So here we choose "0" as "disabled".

  • "segment.ms": no change in meaning.  The active segment is forced to roll when either "max.compaction.lag.ms" or "segment.ms" (log.roll.ms and log.roll.hours) is reached.  

  • "min.cleanable.dirty.ratio": no change in meaning. This ratio still bounds the maximum space wasted in the log by duplicates.  However, compaction on a partition may happen before "min.cleanable.dirty.ratio" is reached when a non-zero "max.compaction.lag.ms" is set.

  • "min.compaction.lag.ms": no change in meaning. However, when determining the eligibility for compaction, "max.compaction.lag.ms" has higher priority than "min.compaction.lag.ms".  

  • All of the above changes apply only to topics with compaction enabled.
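
As an illustration of the constraint between the two lag settings, here is a small Python sketch of the check described above. The `validate_compaction_lags` function is hypothetical and not part of Kafka. It treats a missing "max.compaction.lag.ms" as 0 (disabled), following this proposal, and assumes that a missing "min.compaction.lag.ms" also means 0.

```python
def validate_compaction_lags(topic_config):
    """Return True if the max compaction lag is enabled; raise if the lags are inconsistent."""
    min_lag = int(topic_config.get("min.compaction.lag.ms", 0))
    max_lag = int(topic_config.get("max.compaction.lag.ms", 0))  # 0 = disabled
    if min_lag < 0 or max_lag < 0:
        raise ValueError("compaction lags must not be negative")
    if max_lag > 0 and max_lag < min_lag:
        raise ValueError(
            "max.compaction.lag.ms must be no less than min.compaction.lag.ms"
        )
    return max_lag > 0


# Example topic configuration: compact within 7 days, but not before 1 hour.
topic_config = {
    "cleanup.policy": "compact",
    "min.compaction.lag.ms": "3600000",
    "max.compaction.lag.ms": "604800000",
}
enabled = validate_compaction_lags(topic_config)
```

A configuration that omits "max.compaction.lag.ms" passes the check and leaves the timely compaction rule disabled, which matches the default behavior described in the compatibility section.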

Compatibility, Deprecation, and Migration Plan

  • By default "max.compaction.lag.ms" is set to 0 and this timely compaction rule is disabled.  There are no compatibility issues and no migration is required. 

Performance impact

  • Kafka already collects compaction metrics (CleanerStats) that include how many bytes are read and written during each compaction run and how long it takes to compact a log partition. Those metrics can be used to measure the performance impact when adopting this KIP.  For example, if most log partitions already get compacted each day before this KIP, setting the log compaction time interval to more than one day should have little impact on the amount of resources spent on compaction.

Rejected Alternatives

  • One way to force compaction on any cleanable log segment is to set "min.cleanable.dirty.ratio" to 0. However, compacting a log partition whenever a segment becomes cleanable (controlled by "min.compaction.lag.ms") is very expensive.  We still want to accumulate some number of log segments before compaction is kicked off.

  • If compaction and time-based retention are both enabled on a topic, compaction might prevent records from being deleted on time.  The reason is that when multiple segments are compacted into a single segment, the newly created segment has the same lastModified timestamp as the latest original segment. We lose the timestamps of all original segments except the last one. As a result, records might not be deleted as they should be through time-based retention.  We decided not to address this issue in this KIP because 1) in practice it is very unlikely that compaction will keep preventing retention from deleting a specific record indefinitely, and 2) we don't have obvious use cases in which users must enable both time-based retention and log compaction. Addressing this issue can be left as future work.  One solution is to look at record timestamps during log compaction in order to delete expired records. This can be done in the compaction logic itself or by using AdminClient.deleteRecords(). But this solution assumes we have record timestamps.  Further investigation is needed if we have to deal with on-time retention on log-compacted topics. 
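
The timestamp loss described in the last item can be illustrated with a short Python sketch. It follows the text's description, in which retention is judged by a segment's lastModified time. The helper names and the day-based numbers are hypothetical.

```python
DAY = 24 * 60 * 60 * 1000  # one day in milliseconds


def merged_last_modified(last_modified_times_ms):
    """A merged segment inherits the lastModified time of the latest original segment."""
    return max(last_modified_times_ms)


def expired(last_modified_ms, now_ms, retention_ms):
    """Time-based retention check on a segment, judged by its lastModified time."""
    return now_ms - last_modified_ms > retention_ms


now = 9 * DAY
retention = 7 * DAY
originals = [1 * DAY, 6 * DAY]  # lastModified times of two original segments

# Before compaction, the older segment is already past retention.
old_segment_expired = expired(originals[0], now, retention)

# After compaction merges both segments, the merged segment looks only 3 days old.
merged = merged_last_modified(originals)
merged_segment_expired = expired(merged, now, retention)
```

In this sketch the records from day 1 survive past their retention deadline because the merged segment carries the day 6 timestamp.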


