...

  1. Force a roll of a non-empty active segment if its first record is older than "max.compaction.lag.ms", so that compaction can be done on that segment. The time to roll an active segment is controlled by "segment.ms" today. However, to ensure that messages currently in the active segment can be compacted in time, we need to roll the active segment when either "max.compaction.lag.ms" or "segment.ms" is reached.
    We define:

    Maximum time to roll an active segment:

    maxSegmentMs = if (the log has "compact" enabled) { min("segment.ms", "max.compaction.lag.ms") }
                   else { "segment.ms" }


  2. Estimate the earliest message timestamp of an un-compacted log segment. We only need to estimate the earliest message timestamp for un-compacted log segments to ensure timely compaction, because the deletion requests that belong to compacted segments have already been processed.

    1. For the first (earliest) log segment: the estimated earliest timestamp is set to the timestamp of the first message. The feature provided in this KIP depends on the availability of message timestamps (see the rejected alternatives section for alternate solutions).

    2. From the second log segment onwards: we can use the largestTimestamp (last modified time) of the previous segment as an estimate, since getting the earliest timestamp of a message requires additional IOs.

  3. Let the log cleaner pick up logs that have reached the max compaction lag for compaction.
    For any given log partition with compaction enabled, as long as the estimated earliest message timestamp of the first un-compacted segment is more than "max.compaction.lag.ms" in the past, the log is picked up for compaction. Otherwise, Kafka uses "min.cleanable.dirty.ratio" and "min.compaction.lag.ms" to determine the log's eligibility for compaction, as it does today.

  4. Add one metric to track the max compaction delay (as described in the next section).
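The first three steps above can be sketched roughly as follows. This is only an illustration of the rules as stated in this KIP, not the broker's actual code: the class `CompactionLagSketch`, the `Segment` holder, and all method names are hypothetical, and timestamps are assumed to be epoch milliseconds.

```java
import java.util.Arrays;
import java.util.List;

public class CompactionLagSketch {

    // Hypothetical stand-in for a log segment; only the two timestamps used by the KIP.
    static final class Segment {
        final long firstMessageTimestampMs;
        final long largestTimestampMs;

        Segment(long firstMessageTimestampMs, long largestTimestampMs) {
            this.firstMessageTimestampMs = firstMessageTimestampMs;
            this.largestTimestampMs = largestTimestampMs;
        }
    }

    // Step 1: the active segment is rolled once this much time has passed.
    static long maxSegmentMs(boolean compactEnabled, long segmentMs, long maxCompactionLagMs) {
        return compactEnabled ? Math.min(segmentMs, maxCompactionLagMs) : segmentMs;
    }

    // Step 2: estimate the earliest message timestamp of the first un-compacted segment.
    // For the first segment of the log, use the timestamp of its first message;
    // otherwise use the largestTimestamp of the previous segment to avoid extra IO.
    static long estimateEarliestTimestampMs(List<Segment> segments, int firstUncompactedIndex) {
        if (firstUncompactedIndex == 0) {
            return segments.get(0).firstMessageTimestampMs;
        }
        return segments.get(firstUncompactedIndex - 1).largestTimestampMs;
    }

    // Step 3: the log is picked up for compaction when the estimated earliest
    // timestamp lies more than maxCompactionLagMs in the past.
    static boolean mustCompact(long nowMs, long estimatedEarliestTimestampMs, long maxCompactionLagMs) {
        return nowMs - estimatedEarliestTimestampMs > maxCompactionLagMs;
    }

    public static void main(String[] args) {
        List<Segment> segments = Arrays.asList(new Segment(100L, 200L), new Segment(250L, 300L));
        long earliest = estimateEarliestTimestampMs(segments, 1);
        System.out.println("roll after ms: " + maxSegmentMs(true, 1000L, 500L));
        System.out.println("must compact: " + mustCompact(1000L, earliest, 500L));
    }
}
```

When `mustCompact` returns false, the existing "min.cleanable.dirty.ratio" and "min.compaction.lag.ms" checks would still decide eligibility; that path is omitted from the sketch.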

Public Interfaces

  • Add the topic-level configuration "max.compaction.lag.ms" and the corresponding broker configuration "log.cleaner.max.compaction.lag.ms", which is set to MAX_LONG by default. Kafka validates that "max.compaction.lag.ms" is no less than "min.compaction.lag.ms". A record may remain un-compacted for up to this max lag, after which the corresponding log partition becomes eligible for log compaction. This configuration only applies to topics that have compaction enabled.

  • Add the following metric:  

    1) kafka.log:type=LogCleaner,name=max-compaction-delay
    type: gauge
    value: Math.max(now - earliest_timestamp_of_uncompacted_segment - max.compaction.lag.ms, 0)
    This metric reports the max compaction delay: the time between the moment a log is required for compaction, as determined by max.compaction.lag.ms, and the moment the compaction is done for that log.
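As a rough illustration of the configuration check and the gauge value described above, the sketch below computes the delay for a single log. The class and method names are hypothetical and are not part of Kafka's API; timestamps are assumed to be epoch milliseconds. The delay is computed in two steps rather than as a single subtraction so that the default lag of MAX_LONG cannot cause a long overflow.

```java
public class MaxCompactionDelaySketch {

    // Mirrors the validation stated in the KIP: max lag must be no less than min lag.
    static void validateLags(long minCompactionLagMs, long maxCompactionLagMs) {
        if (maxCompactionLagMs < minCompactionLagMs) {
            throw new IllegalArgumentException(
                "max.compaction.lag.ms must be no less than min.compaction.lag.ms");
        }
    }

    // Same value as Math.max(now - earliestTimestampOfUncompactedSegment - maxCompactionLagMs, 0),
    // written so that maxCompactionLagMs == Long.MAX_VALUE does not overflow.
    static long compactionDelayMs(long nowMs, long earliestTimestampOfUncompactedSegmentMs,
                                  long maxCompactionLagMs) {
        long ageMs = nowMs - earliestTimestampOfUncompactedSegmentMs;
        if (ageMs <= maxCompactionLagMs) {
            return 0L; // the log is not yet overdue for compaction
        }
        return ageMs - maxCompactionLagMs;
    }

    public static void main(String[] args) {
        validateLags(0L, Long.MAX_VALUE);
        System.out.println(compactionDelayMs(10_000L, 2_000L, 5_000L));
    }
}
```

With the default lag of MAX_LONG, the delay computed this way stays at 0, which matches the intent that the feature is inactive unless the configuration is set.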

Compatibility, Deprecation, and Migration Plan

...