...

  1. Force a roll of a non-empty active segment if its first record is older than "max.compaction.lag.ms", so that compaction can be done on that segment. Today, the time to roll an active segment is controlled by "segment.ms". However, to ensure that messages currently in the active segment can be compacted in time, we need to roll the active segment when either "max.compaction.lag.ms" or "segment.ms" is reached (a sketch of this rule appears after this list).
    We define: 

    Maximum time to roll an active segment:

    maxSegmentMs = if (the log has "compact" enabled and "max.compaction.lag.ms" is non-zero)
                       min("segment.ms", "max.compaction.lag.ms")
                   else
                       "segment.ms"



  2. Estimate the earliest message timestamp of an un-compacted log segment (a sketch of this estimation appears after this list). We only need to estimate the earliest message timestamp for un-compacted log segments to ensure timely compaction, because the deletion requests that belong to compacted segments have already been processed.

    1. For the first (earliest) log segment: the estimated earliest timestamp is set to the timestamp of the first message, if a timestamp is present in the message. Otherwise, the estimated earliest timestamp is set to "segment.largestTimestamp - maxSegmentMs" (segment.largestTimestamp is the lastModified time of the log segment, or the maximum timestamp seen for the log segment). In the latter case, the actual timestamp of the first message might be later than the estimate, but it is safe to pick up the log for compaction earlier.

    2. From the second log segment onwards: there are two methods to estimate the earliest message timestamp of a log segment. The first is to use the largestTimestamp (lastModified time) of the previous segment as the estimate. The second is to use the timestamp of the first message, if a timestamp is present in the message. Since getting the timestamp of a message requires additional I/Os, the first method is sufficient in practice.

  3. Let the log cleaner pick up all logs with an estimated earliest message timestamp earlier than "now - max.compaction.lag.ms" for compaction.
    The rule is simple: as long as there is an un-compacted log segment whose estimated earliest timestamp is earlier than "now - max.compaction.lag.ms", the log is picked up for compaction (a sketch of this check appears after this list). Otherwise, Kafka uses "min.cleanable.dirty.ratio" and "min.compaction.lag.ms" to determine a log's eligibility for compaction, as it does today.
  4. If both log compaction and log retention are enabled for the topic partition, delete records that are older than both "max.compaction.lag.ms" and the retention time.
    If compaction and time-based retention are both enabled on a topic, compaction might prevent records from being deleted on time. The reason is that when multiple segments are compacted into a single segment, the newly created segment has the same lastModified timestamp as the latest original segment; the timestamps of all other original segments are lost. As a result, records might not be deleted as they should be through time-based retention. Therefore, we need to explicitly delete those expired records. This deletion only applies to message records that have a timestamp.
  5. Add two metrics (a short example of reading them over JMX appears after this list):
    1) kafka.log:type=LogCleaner,name=num-logs-compacted-by-max-compaction-lag
    type: gauge
    value: the total number of logs that need to be immediately compacted as determined by "max.compaction.lag.ms".
    2) kafka.log:type=LogCleaner,name=max-delay-after-max-compaction-lag
    type: gauge
    value: the maximum value of "now - earliest_timestamp_of_uncompacted_segment - max.compaction.lag.ms" among all segments that need to be compacted.
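
Below is a minimal Java sketch of the segment-roll rule from step 1. The names here (SegmentRollPolicy, maxSegmentMs, shouldRoll) are illustrative stand-ins, not the actual broker code:

    // Hypothetical sketch of the roll rule in step 1, using the maxSegmentMs
    // definition above; not the actual Log/LogCleaner implementation.
    class SegmentRollPolicy {
        final boolean compactEnabled;   // topic has the "compact" cleanup policy
        final long segmentMs;           // "segment.ms"
        final long maxCompactionLagMs;  // "max.compaction.lag.ms"

        SegmentRollPolicy(boolean compactEnabled, long segmentMs, long maxCompactionLagMs) {
            this.compactEnabled = compactEnabled;
            this.segmentMs = segmentMs;
            this.maxCompactionLagMs = maxCompactionLagMs;
        }

        // maxSegmentMs: the tighter of the two bounds when compaction is
        // enabled and "max.compaction.lag.ms" is non-zero.
        long maxSegmentMs() {
            return (compactEnabled && maxCompactionLagMs > 0)
                    ? Math.min(segmentMs, maxCompactionLagMs)
                    : segmentMs;
        }

        // Roll a non-empty active segment once its first record is older
        // than maxSegmentMs.
        boolean shouldRoll(long nowMs, long firstRecordTimestampMs, long segmentSizeBytes) {
            return segmentSizeBytes > 0 && nowMs - firstRecordTimestampMs > maxSegmentMs();
        }
    }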
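
The estimation in step 2 can be sketched as follows; Segment here is a hypothetical stand-in for the broker's segment type, holding only the fields the estimate needs:

    // Hypothetical sketch of step 2: estimate the earliest message timestamp
    // of the i-th un-compacted segment.
    class Segment {
        long largestTimestamp;       // lastModified time, or max timestamp seen
        Long firstMessageTimestamp;  // null when messages carry no timestamp
    }

    static long estimateEarliestTimestamp(Segment[] uncompacted, int i, long maxSegmentMs) {
        if (i == 0) {
            Segment first = uncompacted[0];
            // First segment: use the first message's timestamp when present;
            // otherwise fall back to largestTimestamp - maxSegmentMs, which can
            // only err on the early side (safe: compaction may start sooner).
            return first.firstMessageTimestamp != null
                    ? first.firstMessageTimestamp
                    : first.largestTimestamp - maxSegmentMs;
        }
        // Later segments: the previous segment's largestTimestamp is a cheap
        // estimate that avoids the extra I/O of reading the first message.
        return uncompacted[i - 1].largestTimestamp;
    }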
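
Step 3 then reduces to a simple check over those estimates. A sketch, again with hypothetical names:

    // Hypothetical sketch of step 3: a log must be compacted now if any
    // un-compacted segment's estimated earliest timestamp exceeds the lag.
    static boolean mustCompactNow(long[] estimatedEarliestTimestamps,
                                  long nowMs, long maxCompactionLagMs) {
        for (long estimate : estimatedEarliestTimestamps) {
            if (estimate < nowMs - maxCompactionLagMs) {
                return true;  // compact regardless of the dirty ratio
            }
        }
        // Otherwise the existing "min.cleanable.dirty.ratio" and
        // "min.compaction.lag.ms" checks apply (not shown).
        return false;
    }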
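
Since the two gauges in step 5 are registered under kafka.log:type=LogCleaner, they can be read over JMX like any other broker metric. A small example, assuming the broker exposes JMX on localhost:9999 (the port is an assumption of this sketch) and that the gauges expose the standard "Value" attribute:

    import javax.management.MBeanServerConnection;
    import javax.management.ObjectName;
    import javax.management.remote.JMXConnector;
    import javax.management.remote.JMXConnectorFactory;
    import javax.management.remote.JMXServiceURL;

    public class MaxCompactionLagMetrics {
        public static void main(String[] args) throws Exception {
            // Broker JMX endpoint; localhost:9999 is an assumed setup.
            JMXServiceURL url = new JMXServiceURL(
                    "service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi");
            try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
                MBeanServerConnection conn = connector.getMBeanServerConnection();
                Object numLogs = conn.getAttribute(new ObjectName(
                        "kafka.log:type=LogCleaner,name=num-logs-compacted-by-max-compaction-lag"),
                        "Value");
                Object maxDelay = conn.getAttribute(new ObjectName(
                        "kafka.log:type=LogCleaner,name=max-delay-after-max-compaction-lag"),
                        "Value");
                System.out.println("logs needing immediate compaction: " + numLogs);
                System.out.println("max delay past max.compaction.lag.ms: " + maxDelay);
            }
        }
    }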

...

  • Kafka already collects compaction metrics (CleanerStats) that include how many bytes are read/written during each compaction run and how long it takes to compact a log partition. Those metrics can be used to measure the performance impact of adopting this KIP. In addition, if a log partition already gets compacted once per day before this KIP, setting the log compaction time interval to more than one day should have little impact on the amount of resources spent on compaction, since the existing log compaction configuration (e.g., min dirty ratio) will trigger log compaction before "max.compaction.lag.ms". The added metric "num-logs-compacted-by-max-compaction-lag" can be used to determine how many log partitions are actually selected for compaction by "max.compaction.lag.ms". The log4j log generated by "recordStats" includes information about how many bytes are actually read/written by log compaction, which can also be used to evaluate the compaction workload before/after this KIP.

...