...

We propose adding a new topic-level configuration, "max.compaction.lag.ms", which controls the maximum lag after which a record is required to be picked up for compaction (note that this lag interval includes the time the record resides in an active segment). The lag is measured from the time the record is appended to an active segment.
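
For illustration only: assuming the new configuration is exposed like other topic-level settings, it could be set on an existing compacted topic with the Java AdminClient as sketched below. The topic name and the 7-day value are placeholders, not part of this KIP.

    import java.util.Collection;
    import java.util.Collections;
    import java.util.Map;
    import java.util.Properties;
    import org.apache.kafka.clients.admin.AdminClient;
    import org.apache.kafka.clients.admin.AlterConfigOp;
    import org.apache.kafka.clients.admin.ConfigEntry;
    import org.apache.kafka.common.config.ConfigResource;

    public class SetMaxCompactionLag {
        public static void main(String[] args) throws Exception {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            try (AdminClient admin = AdminClient.create(props)) {
                ConfigResource topic = new ConfigResource(ConfigResource.Type.TOPIC, "my-compacted-topic");
                // Require every record to be picked up for compaction within 7 days of being appended.
                AlterConfigOp setMaxLag = new AlterConfigOp(
                        new ConfigEntry("max.compaction.lag.ms", String.valueOf(7L * 24 * 60 * 60 * 1000)),
                        AlterConfigOp.OpType.SET);
                Map<ConfigResource, Collection<AlterConfigOp>> request =
                        Collections.singletonMap(topic, Collections.singletonList(setMaxLag));
                admin.incrementalAlterConfigs(request).all().get();
            }
        }
    }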

...

  1. Force a roll of a non-empty active segment if its first record is older than "max.compaction.lag.ms", so that compaction can be done on that segment (steps 1-3 are illustrated in the sketch after this list). The time to roll an active segment is controlled by "segment.ms" today. However, to ensure messages currently in the active segment can be compacted in time, we need to roll the active segment when either "max.compaction.lag.ms" or "segment.ms" is reached.
    We define:

    Maximum time to roll an active segment:

    maxSegmentMs = if (the log has "compact" enabled) {min("segment.ms", "max.compaction.lag.ms")}
                   else {"segment.ms"}



  2. Estimate the earliest message timestamp of an un-compacted log segment. We only need to estimate the earliest message timestamp for un-compacted log segments to ensure timely compaction, because the deletion requests that belong to compacted segments have already been processed.

    1. for the first (earliest) log segment: The estimated earliest timestamp is set to the timestamp of the first message, if a timestamp is present in the message. The feature provided in this KIP depends on the availability of the message timestamp (see the rejected alternatives section for alternate solutions).

    2. from the second log segment onwards: we can use the largestTimestamp (lastModified time) of the previous segment as an estimation, since getting the earliest timestamp of a message requires additional IOs.

  3. Let the log cleaner pick up logs that have reached the max compaction lag.
    For any given log partition with compaction enabled, as long as the estimated earliest message timestamp of the first un-compacted segment is older than "max.compaction.lag.ms", the log is picked up for compaction. Otherwise, Kafka uses "min.cleanable.dirty.ratio" and "min.compaction.lag.ms" to determine a log's eligibility for compaction, as it does today.
  4. Add one metric to track the max compaction delay (as described in the next section).
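
To make steps 1-3 above concrete, the following simplified Java sketch shows how the roll deadline, the earliest-timestamp estimate, and the max-lag check could fit together. The types and method names are hypothetical illustrations, not the broker's actual log cleaner code.

    // Hypothetical, simplified segment view used only for this illustration.
    class SegmentInfo {
        long firstMessageTimestampMs; // timestamp of the first record in the segment
        long largestTimestampMs;      // max timestamp seen for the segment (or its lastModified time)
    }

    class CompactionLagPolicy {
        long segmentMs;           // "segment.ms"
        long maxCompactionLagMs;  // "max.compaction.lag.ms" (MAX_LONG when not set)
        boolean compactEnabled;   // cleanup.policy includes "compact"

        // Step 1: the longest a non-empty active segment may stay open before being
        // rolled, so that its records can become cleanable in time.
        long maxSegmentMs() {
            return compactEnabled ? Math.min(segmentMs, maxCompactionLagMs) : segmentMs;
        }

        // Step 2-a: for the first (earliest) log segment, the timestamp of its first
        // message is used as the estimated earliest timestamp.
        long earliestTimestampOfFirstSegment(SegmentInfo firstSegment) {
            return firstSegment.firstMessageTimestampMs;
        }

        // Step 2-b: from the second segment onwards, the previous segment's
        // largestTimestamp is used, avoiding the extra IO of reading the first record.
        long earliestTimestampOfLaterSegment(SegmentInfo previousSegment) {
            return previousSegment.largestTimestampMs;
        }

        // Step 3: the log must be picked up for compaction now if the estimated
        // earliest un-compacted record has waited longer than max.compaction.lag.ms;
        // otherwise the existing dirty-ratio and min-lag rules apply as today.
        boolean mustCompactNow(long estimatedEarliestTimestampMs, long nowMs) {
            return compactEnabled && nowMs - estimatedEarliestTimestampMs > maxCompactionLagMs;
        }
    }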


...

  • Adding topic-level configuration "max.compaction.lag.ms" and the corresponding broker configuration "log.cleaner.max.compaction.lag.ms", which is set to MAX_LONG by default. If both "max.compaction.lag.ms" and "min.compaction.lag.ms" are provided in topic creation, Kafka validates that "max.compaction.lag.ms" is no less than "min.compaction.lag.ms". A record may remain un-compacted for at most this max lag, after which the corresponding log partition becomes eligible for log compaction. This configuration only applies to topics that have compaction enabled.

  • Add the following metric:

    kafka.log:type=LogCleaner,name=max-compaction-delay
    type: gauge
    value: the maximum value of Math.max(now - earliest_timestamp_of_first_uncompacted_segment - max.compaction.lag.ms, 0) in the last cleaner run from each cleaner thread.
    This metric tells the maximum delay past the time at which a log is required to be picked up for compaction, as determined by max.compaction.lag.ms.
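
As an illustration of the gauge defined above, a cleaner thread could compute the reported value after each run as sketched below. The per-log formula is taken from this KIP; the surrounding helper class is hypothetical, and a single max.compaction.lag.ms is assumed for all logs for simplicity.

    import java.util.List;

    // Hypothetical helper: compute the max-compaction-delay value for one cleaner run,
    // given the estimated earliest timestamp of the first un-compacted segment of each
    // compacted log handled by this cleaner thread.
    class MaxCompactionDelay {
        static long valueMs(List<Long> earliestUncompactedTimestampsMs,
                            long maxCompactionLagMs, long nowMs) {
            long maxDelay = 0L;
            for (long earliestTs : earliestUncompactedTimestampsMs) {
                // Per log: Math.max(now - earliest_timestamp_of_first_uncompacted_segment - max.compaction.lag.ms, 0)
                maxDelay = Math.max(maxDelay, Math.max(nowMs - earliestTs - maxCompactionLagMs, 0L));
            }
            return maxDelay; // 0 means no log has exceeded its max compaction lag
        }
    }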


Compatibility, Deprecation, and Migration Plan

...

  • If a log partition already gets compacted once per day before this KIP, setting the log compaction time interval to more than one day should have little impact on the amount of resources spent on compaction, since the existing log compaction configurations (e.g., min dirty ratio) will trigger log compaction before "max.compaction.lag.ms". The added metric "max-compaction-delay" can be used to determine whether there are partitions whose compaction is actually driven by "max.compaction.lag.ms".

...

  • One way to force compaction on any cleanable log segment is to set "min.cleanable.dirty.ratio" to 0. However, compacting a log partition whenever a segment becomes cleanable (controlled by "min.compaction.lag.ms") is very expensive. We still want to accumulate some amount of log segments before compaction is kicked off. In addition, in order to honor the max compaction lag requirement, we also need to force a roll of the active segment if the required lag has passed. So the existing configurations do not meet the requirement of ensuring a maximum compaction lag.

  • In 2-a of the proposed change section, if the first message timestamp is not available, we could use "segment.largestTimestamp - maxSegmentMs" as an estimation of the earliest timestamp. The actual timestamp of the first message might be later than the estimation, but it is safe to pick up the log for compaction earlier. However, since this estimation is not very accurate and may cause unnecessary compaction, we decided to make this feature depend on the availability of the first message timestamp.

  • Use the timestamp of the first message to estimate the earliest timestamp of un-compacted segments (2-b in the proposed change section). Since getting the timestamp of a message requires additional IOs, the estimation based on the previous segment's largestTimestamp is sufficient.

...