...

For a topic with log compaction enabled, Kafka today uses "min.cleanable.dirty.ratio" and "min.compaction.lag.ms" to determine which log segments to pick up for compaction. "min.compaction.lag.ms" marks a log segment as uncleanable until the segment has been rolled and has remained un-compacted for the specified lag. The details can be found in KIP-58. "min.cleanable.dirty.ratio" determines the eligibility of an entire partition for log compaction: only log partitions whose dirty ratio is higher than "min.cleanable.dirty.ratio" are picked up by the log cleaner. In summary, with only these two existing compaction configurations, Kafka cannot enforce timely log compaction.

...

  1. Force a roll of a non-empty active segment if its first record is older than "max.compaction.lag.ms" (or, when record timestamps are not available, if the creation time of the active segment is older than "max.compaction.lag.ms") so that compaction can be done on that segment. The time to roll an active segment is controlled by "segment.ms" today. However, to ensure that messages currently in the active segment can be compacted in time, we need to roll the active segment when either "max.compaction.lag.ms" or "segment.ms" is reached.
    We define:

    Maximum time to roll an active segment:

    maxSegmentMs = if (the log has "compact" enabled and "max.compaction.lag.ms" is non-zero) { min("segment.ms", "max.compaction.lag.ms") }
                   else { "segment.ms" }
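
    The definition above can be sketched in Python as follows. This is only an illustration of the rule as stated here, not Kafka's actual implementation; the function name `max_segment_ms` and its parameters are hypothetical stand-ins for the topic configurations.

```python
def max_segment_ms(segment_ms: int, max_compaction_lag_ms: int, compact_enabled: bool) -> int:
    """Return the maximum time (ms) before the active segment must be rolled.

    Hypothetical helper: the arguments stand in for the "segment.ms" and
    "max.compaction.lag.ms" configs and for whether the log has "compact" enabled.
    """
    if compact_enabled and max_compaction_lag_ms != 0:
        # Roll as soon as either limit is reached.
        return min(segment_ms, max_compaction_lag_ms)
    # Without compaction (or with the lag limit unset), only "segment.ms" applies.
    return segment_ms


# Example: a 7-day segment.ms with a 1-day max compaction lag.
SEVEN_DAYS_MS = 7 * 24 * 60 * 60 * 1000
ONE_DAY_MS = 24 * 60 * 60 * 1000
roll_after = max_segment_ms(SEVEN_DAYS_MS, ONE_DAY_MS, compact_enabled=True)
```

    In this example the compacted log would roll its active segment after one day rather than seven, because the smaller of the two limits wins.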



  2. Estimate the earliest message timestamp of an un-compacted log segment. We only need to estimate the earliest message timestamp for un-compacted log segments to ensure timely compaction, because the deletion requests that belong to compacted segments have already been processed.

    1. For the first (earliest) log segment: the estimated earliest timestamp is set to the timestamp of the first message if a timestamp is present in the message. Otherwise, the estimated earliest timestamp is set to "segment.largestTimestamp - maxSegmentMs" (segment.largestTimestamp is the last-modified time of the log segment or the maximum timestamp seen for the log segment). In the latter case, the actual timestamp of the first message might be later than the estimate, but it is safe to pick up the log for compaction earlier.

    2. From the second log segment onwards: there are two methods to estimate the earliest message timestamp of a log segment. The first method is to use the largestTimestamp (last-modified time) of the previous segment as the estimate. The second method is to use the timestamp of the first message, if a timestamp is present in the message. Since reading the timestamp of a message requires additional I/O, the first method is sufficient in practice.
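
    The estimation rules in this step can be sketched as below. This is a minimal illustration under stated assumptions, not the log cleaner's real code: the `Segment` class, its field names, and `estimate_earliest_timestamps` are hypothetical, and for the second segment onwards the sketch uses only the first method (the previous segment's largest timestamp).

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class Segment:
    # Last-modified time or max record timestamp of the segment, in ms (hypothetical field).
    largest_timestamp: int
    # Timestamp of the first record, or None when records carry no timestamp.
    first_record_timestamp: Optional[int] = None


def estimate_earliest_timestamps(segments: List[Segment], max_segment_ms: int) -> List[int]:
    """Estimate the earliest message timestamp of each un-compacted segment, oldest first."""
    estimates = []
    for i, seg in enumerate(segments):
        if i == 0:
            if seg.first_record_timestamp is not None:
                # First segment, timestamp available: use it directly.
                estimates.append(seg.first_record_timestamp)
            else:
                # First segment, no timestamp: conservative (possibly too early) estimate.
                estimates.append(seg.largest_timestamp - max_segment_ms)
        else:
            # Later segments: reuse the previous segment's largest timestamp (no extra I/O).
            estimates.append(segments[i - 1].largest_timestamp)
    return estimates


segments = [Segment(1000), Segment(2000), Segment(3500)]
estimates = estimate_earliest_timestamps(segments, max_segment_ms=400)
```

    With these made-up values, the first segment has no record timestamp, so its estimate falls back to 1000 - 400 = 600, and each later segment inherits the largest timestamp of its predecessor.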

  3. Let the log cleaner pick up for compaction all logs whose estimated earliest message timestamp is earlier than "now - max.compaction.lag.ms".
    The rule is simple: as long as there is an un-compacted log segment whose estimated earliest timestamp is earlier than "now - max.compaction.lag.ms", the log is picked up for compaction. Otherwise, Kafka uses "min.cleanable.dirty.ratio" and "min.compaction.lag.ms" to determine a log's eligibility for compaction, as it does today. The logs to be compacted are currently sorted by dirty ratio. With this change, the logs are sorted by "must clean ratio" first and then by dirty ratio. The "must clean ratio" is calculated similarly to the dirty ratio, except that only segments that are required to be compacted contribute to it. More specifically, the "must clean ratio" is the total size of cleanable segments whose records are older than "max.compaction.lag.ms" divided by the total size (clean segment size + cleanable segment size). The reason for this ordering is to prioritize compaction of the logs that must be cleaned under the max compaction lag rule.
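
    A rough sketch of the "must clean ratio" and the resulting ordering is given below. All names (`DirtySegment`, `LogState`, `must_clean_ratio`, `order_for_compaction`) are hypothetical, the sketch assumes that logs with the highest ratios are cleaned first, and it omits the eligibility checks based on "min.cleanable.dirty.ratio" and "min.compaction.lag.ms".

```python
from dataclasses import dataclass
from typing import List


@dataclass
class DirtySegment:
    size_bytes: int
    estimated_earliest_timestamp: int  # ms, estimated as described in step 2


@dataclass
class LogState:
    name: str
    clean_bytes: int                   # total size of already-compacted segments
    dirty_segments: List[DirtySegment]  # cleanable (un-compacted) segments


def dirty_ratio(log: LogState) -> float:
    cleanable = sum(s.size_bytes for s in log.dirty_segments)
    total = log.clean_bytes + cleanable
    return cleanable / total if total else 0.0


def must_clean_ratio(log: LogState, now_ms: int, max_compaction_lag_ms: int) -> float:
    # Only segments older than "now - max.compaction.lag.ms" count toward the numerator.
    cutoff = now_ms - max_compaction_lag_ms
    must_clean = sum(s.size_bytes for s in log.dirty_segments
                     if s.estimated_earliest_timestamp < cutoff)
    total = log.clean_bytes + sum(s.size_bytes for s in log.dirty_segments)
    return must_clean / total if total else 0.0


def order_for_compaction(logs: List[LogState], now_ms: int, max_compaction_lag_ms: int) -> List[LogState]:
    # Sort by must clean ratio first, then by dirty ratio (assumed: highest first).
    return sorted(
        logs,
        key=lambda l: (must_clean_ratio(l, now_ms, max_compaction_lag_ms), dirty_ratio(l)),
        reverse=True,
    )


log_a = LogState("A", clean_bytes=100, dirty_segments=[DirtySegment(100, 9500)])
log_b = LogState("B", clean_bytes=900, dirty_segments=[DirtySegment(100, 8000)])
ordered = order_for_compaction([log_a, log_b], now_ms=10_000, max_compaction_lag_ms=1_000)
```

    In this made-up example, log B has a much lower dirty ratio than log A, but it is ordered first because it contains a segment older than the cutoff, which is the prioritization the rule above is meant to achieve.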

...