...

  1. Force a roll of a non-empty active segment if its first record is older than "max.compaction.lag.ms" (or, when record timestamps are not available, if the creation time of the active segment is older than "max.compaction.lag.ms") so that compaction can be performed on that segment.  Today, the time to roll an active segment is controlled by "segment.ms".  However, to ensure that messages currently in the active segment can be compacted in time, we need to seal the active segment when either "max.compaction.lag.ms" or "segment.ms" is reached (see the roll-condition sketch after this list).

  2. Estimate the earliest message timestamp of an un-compacted log segment.

    1. For the first (earliest) log segment:  the estimated earliest timestamp is set to the timestamp of the first message if a timestamp is present in the message. Otherwise, the estimated earliest timestamp is set to segment.largestTimestamp - min("segment.ms", "max.compaction.lag.ms"), where segment.largestTimestamp is the lastModified time of the log segment or the maximum timestamp seen for the log segment. (Because record timestamps may be absent, segment.largestTimestamp might be earlier than the actual timestamp of the latest record in that segment.) In the second case, the actual timestamp of the first message might be later than the estimate, but it is safe to pick up the log for compaction sooner.  Note that we only need to estimate the earliest message timestamp for un-compacted log segments, because deletion requests that belong to compacted segments have already been processed.

    2. From the second log segment onwards:  there are two methods to estimate the earliest message timestamp of a log segment. The first is to use the largestTimestamp (lastModified time) of the previous segment as the estimate. The second is to use the timestamp of the first message, if a timestamp is present in the message.  Since reading the timestamp of a message requires additional I/O, the first method may be sufficient in practice (see the timestamp-estimation sketch after this list).

  3. Let the log cleaner pick up, for compaction, all logs whose estimated earliest timestamp is earlier than "now - max.compaction.lag.ms".
    The rule is simple: as long as there is an un-compacted log segment whose estimated earliest timestamp is earlier than "now - max.compaction.lag.ms", the log is picked up for compaction. Otherwise, Kafka uses "min.cleanable.dirty.ratio" and "min.compaction.lag.ms" to determine a log's eligibility for compaction, as it does today. The logs to be compacted are currently sorted by dirty ratio. With this change, the logs are sorted by "must clean dirty ratio" first and then by dirty ratio.  The "must clean dirty ratio" is calculated similarly to the dirty ratio, except that only the segments that are required to be compacted contribute to it. More specifically, the "must clean dirty ratio" is the total size of cleanable segments whose records are older than "max.compaction.lag.ms" divided by the total size (clean segment size + cleanable segment size). This ordering ensures that logs required to be cleaned by the time-based policy are compacted first (see the must-clean-ratio sketch after this list).
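
The following is a minimal sketch, in Scala, of the forced-roll condition from item 1. The names and types here (ActiveSegmentInfo, shouldRoll, and their fields) are illustrative assumptions for this discussion, not Kafka's actual LogSegment API.

// Sketch of the roll decision from item 1. All names are illustrative.
case class ActiveSegmentInfo(
  sizeInBytes: Long,
  createTimeMs: Long,                  // creation time of the active segment
  firstRecordTimestampMs: Option[Long] // None if records carry no timestamp
)

def shouldRoll(segment: ActiveSegmentInfo,
               nowMs: Long,
               segmentMs: Long,
               maxCompactionLagMs: Long): Boolean = {
  if (segment.sizeInBytes == 0) {
    false // never roll an empty active segment
  } else {
    // Age is measured from the first record's timestamp when available,
    // otherwise from the segment's creation time.
    val ageMs = segment.firstRecordTimestampMs match {
      case Some(ts) => nowMs - ts
      case None     => nowMs - segment.createTimeMs
    }
    // Roll when either segment.ms or max.compaction.lag.ms has elapsed.
    ageMs > segmentMs || ageMs > maxCompactionLagMs
  }
}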
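
The next sketch illustrates the earliest-timestamp estimation from item 2, again with hypothetical names (SegmentView, estimateEarliestTimestamp). It shows only the estimation rule, not how segments are loaded or which segments count as un-compacted.

// Sketch of the earliest-timestamp estimation from item 2. Names are illustrative.
case class SegmentView(
  firstRecordTimestampMs: Option[Long], // None when records lack timestamps
  largestTimestampMs: Long              // max record timestamp, or lastModified time
)

def estimateEarliestTimestamp(segments: Seq[SegmentView],
                              segmentMs: Long,
                              maxCompactionLagMs: Long): Seq[Long] =
  segments.zipWithIndex.map { case (seg, i) =>
    if (i == 0) {
      // First (earliest) segment: prefer the first record's timestamp,
      // otherwise fall back to largestTimestamp - min(segment.ms, max.compaction.lag.ms).
      seg.firstRecordTimestampMs.getOrElse(
        seg.largestTimestampMs - math.min(segmentMs, maxCompactionLagMs))
    } else {
      // Later segments: use the previous segment's largestTimestamp as the
      // estimate, avoiding the extra I/O of reading the first record.
      segments(i - 1).largestTimestampMs
    }
  }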
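
Finally, a sketch of the "must clean dirty ratio" ordering from item 3. LogToClean here is a simplified stand-in for the cleaner's internal bookkeeping; a real implementation would presumably derive mustCleanBytes from the per-segment timestamp estimates above.

// Sketch of the "must clean dirty ratio" ordering from item 3. Names are illustrative.
case class LogToClean(
  cleanBytes: Long,     // total size of already-compacted (clean) segments
  cleanableBytes: Long, // total size of cleanable (dirty) segments
  mustCleanBytes: Long  // size of cleanable segments with records older than
                        // now - max.compaction.lag.ms
) {
  private val totalBytes = cleanBytes + cleanableBytes

  def dirtyRatio: Double =
    if (totalBytes == 0) 0.0 else cleanableBytes.toDouble / totalBytes

  def mustCleanDirtyRatio: Double =
    if (totalBytes == 0) 0.0 else mustCleanBytes.toDouble / totalBytes
}

// Order by must-clean dirty ratio first, then by dirty ratio, so that logs
// required to be cleaned by the time-based policy are compacted first.
def orderForCleaning(logs: Seq[LogToClean]): Seq[LogToClean] =
  logs.sortWith { (a, b) =>
    if (a.mustCleanDirtyRatio != b.mustCleanDirtyRatio)
      a.mustCleanDirtyRatio > b.mustCleanDirtyRatio
    else
      a.dirtyRatio > b.dirtyRatio
  }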

...