Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Table of Contents

Status

Current state[One of "Under Discussion", "Accepted", "Rejected"]

Discussion thread: here

JIRA: here KAFKA-1981

Released: <Kafka Version> 0.10.1.0

Motivation

Currently Kafka's log compaction gives minimal control over what portion of the log remains uncompacted. There is a setting that prevents compaction until a certain dirty ratio has been reached but this does not provide any upper bound on how much of the log's head will remain uncompacted once it runs. Although the segment currently being written will never be compacted, this could leave as little as one message uncompacted.

...

One particularly relevant use case is database state replication through change data capture. This use case is specifically called out in the Kafka documentation for the compaction feature under "Database change subscription". It is convenient to produce this data in multiple topics (e.g. one per source table) and/or partitions. However, in order to be able to recreate a database state at a point of transactional consitency consistency some coordination across topics/partions is required (e.g. a separate _checkpoint_ topic with the offsets at each transaction commit). If the table topics are all independently compacting there is currently no way to be assured that any given checkpoint can be materialized as the checkpointed offset for any given topic may have been compacted such that some keys may be taking on some subsequently inserted values. (Details: https://gist.github.com/ewasserman/f8c892c2e7a9cf26ee46)

Another use case is to handle application mistakes. For example, a compacted topic could be the source of truth for certain type of data. If there is an application error, some incorrect data may be published to the topic. When compaction is triggered, those incorrect data can wipe out the last correct message associated with a key. Being able to delay the compaction based on a configurable amount of time will allow a user to preserve those last known correct messages after an application error is discovered but before compaction destroys the correct data.

Public Interfaces

This proposal includes new configurations for controlling compaction. The log cleaner can be configured retain a minimum amount of the uncompacted head of the log. This is enabled by setting one or more of the compaction lagslag:

    log.cleaner.min.compaction.lag.ms
for setting minimum message age in log.cleaner.min.compaction.lag.bytes
    log.cleaner.min.compaction.lag.messages

...

milliseconds. This has a similar per-topic configuration:
    min.compaction.lag.ms
    min.compaction.lag.bytes
    min.compaction.lag.messages

...

This lag configuration defaults to zero so that if not set, all log segments are eligible for compaction except for the last segment (i.e. the one currently being written). The active segment will not be compacted even if all of the compaction

...

 lag constraint is satisfied. This leaves unchanged the current behavior. If

...

 the compaction

...

 lag is greater than zero then compaction of the segments of the logs containing any messages that do not satisfy

...

 the lag constraint will not be compacted. In particular this allows for the example use case like: "any consumer that is no more than 1 hour behind will get every message."

Proposed Changes

Introduce an additional configurations configuration to topics that guarantee a minimum portion of the head of the log will remain uncompacted. That is, offer guarantees a guarantee that a consumer that does not lag too far behind will get every update to a compacted topic. These This can be used to set constraints on the minimum _distance_ from the topic head that will remain uncompacted, where distance is defined in terms of :

...

time since insertion (i.e. message age)

...

.

The basic behavior of the compaction ratio to trigger and prioritize compaction order will not be altered. However, the ratio's definition will be expanded to become the ratio of "compactable" to compactable plus compacted message sizes. Where compactable includes log segments that are neither the active segment nor those prohibited from being compacted because they contain messages that do not satisfy all the new lag constraintslag constraint.

The time lag guarantee can be satisfied by preventing compaction of any segment containing a message or messages within the time lag. KIP-33 - Add a time based log index provides a mechanism for this to be accurately computed for a log segment.

Compatibility, Deprecation, and Migration Plan

...

The database replication use case could be satisfied using a combination of "snapshot" and "journal" topics for each table. The journal topics could use regular time-based deletion. There would need to be some external process periodically creating new snapshots from the most recent snapshots and the journals.

While it would be straightforward to introduce other types of "lag" (e.g. aggregate message size, or message count) there were not sufficient motivating use cases to justify their inclusion at this time.