...

For log-compaction-enabled topics, Kafka today uses "min.cleanable.dirty.ratio" and "min.compaction.lag.ms" to determine which log segments to pick up for compaction. "min.compaction.lag.ms" marks a log segment uncleanable until the segment is sealed and has remained un-compacted for the specified lag; detailed information can be found in KIP-58. "min.cleanable.dirty.ratio" determines the eligibility of the entire partition for log compaction: only log partitions whose dirty ratio is higher than "min.cleanable.dirty.ratio" are picked up by the log cleaner for compaction. In addition, the number of records that can be compacted in each compaction run is limited by the size of the offset map. In summary, with these two existing compaction configurations, Kafka cannot enforce a timely log compaction.
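The dirty-ratio gate described above can be sketched as follows. This is an illustrative sketch, not Kafka's actual cleaner code; the class and method names are hypothetical:

```java
// Sketch (names are illustrative, not Kafka internals): how
// "min.cleanable.dirty.ratio" gates a partition's eligibility for
// compaction. Sizes are in bytes.
public class DirtyRatioCheck {

    /** dirty ratio = dirty bytes / (clean bytes + dirty bytes) */
    static double dirtyRatio(long cleanBytes, long dirtyBytes) {
        long total = cleanBytes + dirtyBytes;
        return total == 0 ? 0.0 : (double) dirtyBytes / total;
    }

    /** A log is picked up only when its dirty ratio exceeds the threshold. */
    static boolean eligibleForCompaction(long cleanBytes, long dirtyBytes,
                                         double minCleanableDirtyRatio) {
        return dirtyRatio(cleanBytes, dirtyBytes) > minCleanableDirtyRatio;
    }

    public static void main(String[] args) {
        // 40 MB dirty out of 100 MB total -> ratio 0.4, below the default 0.5
        System.out.println(eligibleForCompaction(60_000_000, 40_000_000, 0.5)); // false
        // 60 MB dirty out of 100 MB total -> ratio 0.6, above 0.5
        System.out.println(eligibleForCompaction(40_000_000, 60_000_000, 0.5)); // true
    }
}
```

Note that nothing in this check involves time: a partition with a low write rate can stay below the threshold indefinitely, which is exactly why a maximum compaction lag is needed.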

...

Here is the list of changes needed to enforce such a maximum compaction lag:

  1. Force a roll of a non-empty active segment if the first record is older than "max.compaction.lag.ms" (or, when record timestamps are not available, if the creation time of the active segment is older than "max.compaction.lag.ms"), so that compaction can be done on that segment.  Today the time to roll an active segment is controlled by "segment.ms".  However, to ensure messages currently in the active segment can be compacted in time, we need to seal the active segment when either "max.compaction.lag.ms" or "segment.ms" is reached.
    We define the maximum time to seal an active segment:

    maxSegmentMs = if (the log has "compact" enabled and "max.compaction.lag.ms" is non-zero)
                     { min("segment.ms", "max.compaction.lag.ms") }
                   else { "segment.ms" }



  2. Estimate the earliest message timestamp of an un-compacted log segment. We only need to estimate the earliest message timestamp for un-compacted log segments to ensure timely compaction, because the deletion requests that belong to compacted segments have already been processed.

    1. For the first (earliest) log segment: the estimated earliest timestamp is set to the timestamp of the first message, if a timestamp is present in the message. Otherwise, the estimated earliest timestamp is set to "segment.largestTimestamp - maxSegmentMs" (segment.largestTimestamp is the lastModified time of the log segment, or the maximum timestamp we have seen for the log segment). In the latter case, the actual timestamp of the first message might be later than the estimate, but it is safe to pick up the log for compaction earlier.

    2. From the second log segment onwards: there are two methods to estimate the earliest message timestamp of a log segment. The first method is to use the largestTimestamp (lastModified time) of the previous segment as an estimate. The second method is to use the timestamp of the first message, if a timestamp is present in the message. Since getting the timestamp of a message requires additional I/Os, the first method of estimation is sufficient in practice.

  3. Let the log cleaner pick up, for compaction, all logs whose estimated earliest message timestamp is earlier than "now - max.compaction.lag.ms".
    The rule is simple: as long as there is an un-compacted log segment whose estimated earliest timestamp is earlier than "now - max.compaction.lag.ms", the log is picked up for compaction. Otherwise, Kafka uses "min.cleanable.dirty.ratio" and "min.compaction.lag.ms" to determine a log's eligibility for compaction, as it does today. The logs to be compacted are currently sorted by dirty ratio. With this change, the logs are sorted first by "must clean dirty ratio" and then by dirty ratio. The "must clean dirty ratio" is calculated similarly to the dirty ratio, except that only segments that are required to be compacted contribute to it. More specifically, the "must clean dirty ratio" is the total size of cleanable segments whose records are older than "max.compaction.lag.ms", divided by the total size (clean segment size + cleanable segment size). The reason for this ordering is to prioritize compaction of the logs that are required to be cleaned by the max compaction lag rule.
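The two quantities introduced by the steps above, the segment-roll deadline and the "must clean dirty ratio", can be sketched as below. This is a sketch under the definitions in this KIP; the names are illustrative, not Kafka's actual internals:

```java
// Sketch of the two computations this KIP introduces (illustrative names):
//  - maxSegmentMs: the maximum time before an active segment must be sealed
//  - mustCleanDirtyRatio: size of cleanable segments that exceeded
//    max.compaction.lag.ms, over total (clean + cleanable) size
public class MaxCompactionLag {

    /** Roll deadline for the active segment; a lag of 0 means "disabled". */
    static long maxSegmentMs(boolean compactEnabled, long segmentMs,
                             long maxCompactionLagMs) {
        if (compactEnabled && maxCompactionLagMs > 0)
            return Math.min(segmentMs, maxCompactionLagMs);
        return segmentMs;
    }

    /**
     * "Must clean dirty ratio": only cleanable segments whose estimated
     * earliest timestamp is older than (now - max.compaction.lag.ms)
     * contribute to the numerator. Parallel arrays hold each cleanable
     * segment's size in bytes and its estimated earliest timestamp.
     */
    static double mustCleanDirtyRatio(long cleanBytes,
                                      long[] cleanableSegBytes,
                                      long[] cleanableSegEarliestTs,
                                      long nowMs, long maxCompactionLagMs) {
        long cleanableBytes = 0, mustCleanBytes = 0;
        for (int i = 0; i < cleanableSegBytes.length; i++) {
            cleanableBytes += cleanableSegBytes[i];
            if (cleanableSegEarliestTs[i] < nowMs - maxCompactionLagMs)
                mustCleanBytes += cleanableSegBytes[i];
        }
        long total = cleanBytes + cleanableBytes;
        return total == 0 ? 0.0 : (double) mustCleanBytes / total;
    }
}
```

Sorting logs by this ratio first (and by plain dirty ratio as a tie-breaker) means a log with even a small amount of overdue data jumps ahead of logs that are merely dirty.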

Public Interfaces

  • Add a topic-level configuration "max.compaction.lag.ms" and a corresponding broker configuration "log.cleaner.max.compaction.lag.ms", which is set to 0 (disabled) by default.  If both "max.compaction.lag.ms" and "min.compaction.lag.ms" are provided at topic creation, Kafka enforces that "max.compaction.lag.ms" is no less than "min.compaction.lag.ms".
    -- Note that an alternative configuration is to use -1 as "disabled" and 0 as "immediate compaction". Because the compaction lag is still determined by "min.compaction.lag.ms" and by how long it takes to roll an active segment, the actual lag for compaction is undetermined if we use "0".  On the other hand, we can already set "min.cleanable.dirty.ratio" to achieve the same goal.  So here we choose "0" to mean "disabled".

  • "segment.ms" : no change in meaning.  The active segment is forced to roll when either "max.compaction.lag.ms" or "segment.ms" (log.roll.ms and log.roll.hours) is reached.

  • "min.cleanable.dirty.ratio" : no change in meaning. This ratio still bounds the maximum space wasted in the log by duplicates.  However, compaction on a partition may happen before "min.cleanable.dirty.ratio" is reached when a non-zero "max.compaction.lag.ms" is set.

  • "min.compaction.lag.ms" : no change in meaning. However, when determining the eligibility for compaction, "max.compaction.lag.ms" has higher priority than "min.compaction.lag.ms".

  • All of the above changes apply only to topics with compaction enabled.
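The constraint that "max.compaction.lag.ms", when enabled, must be no less than "min.compaction.lag.ms" could be enforced with a check like the following. This is an illustrative sketch of the validation rule, not Kafka's actual config-validation code:

```java
// Sketch (illustrative, not Kafka's actual validation code): reject a topic
// config where max.compaction.lag.ms is enabled (> 0) but smaller than
// min.compaction.lag.ms.
public class CompactionLagConfig {

    static void validate(long minCompactionLagMs, long maxCompactionLagMs) {
        // A max lag of 0 means the feature is disabled, so no check applies.
        if (maxCompactionLagMs > 0 && maxCompactionLagMs < minCompactionLagMs)
            throw new IllegalArgumentException(
                "max.compaction.lag.ms (" + maxCompactionLagMs
                + ") must not be less than min.compaction.lag.ms ("
                + minCompactionLagMs + ")");
    }
}
```

With this shape, an existing topic that only sets "min.compaction.lag.ms" keeps working unchanged, since the default max lag of 0 skips the check.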

...