Status
Current state: Under Discussion
Discussion thread: here
JIRA: here
Released: <Kafka Version>
Motivation
Currently, Kafka's log compaction gives minimal control over what portion of the log remains uncompacted. There is a setting that prevents compaction from running until a certain dirty ratio has been reached, but it offers no guarantee of how much of the log's head will remain uncompacted once compaction does run. Although the segment currently being written is never compacted, this could leave as little as one message uncompacted.
As a result, it is impossible to guarantee that a consumer will see every update to a compacted topic: even a consumer that falls behind by a single message might receive the compacted version.
One particularly relevant use case is database state replication through change data capture. This use case is specifically called out in the Kafka documentation for the compaction feature under "Database change subscription". It is convenient to produce this data in multiple topics (e.g. one per source table) and/or partitions. However, in order to recreate a database state at a point of transactional consistency, some coordination across topics/partitions is required (e.g. a separate _checkpoint_ topic recording the offsets at each transaction commit). If the table topics are all independently compacting, there is currently no way to be assured that any given checkpoint can be materialized: the checkpointed offset for any given topic may have been compacted past, so that some keys may already reflect subsequently inserted values. (Details: https://gist.github.com/ewasserman/f8c892c2e7a9cf26ee46)
Public Interfaces
This proposal includes new configurations for controlling compaction. The log cleaner can be configured to retain a minimum uncompacted head of the log. This is enabled by setting one or more of the compaction lags:
log.cleaner.min.compaction.lag.ms
log.cleaner.min.compaction.lag.bytes
log.cleaner.min.compaction.lag.messages
which set the minimum message age in milliseconds, the cumulative size in bytes of subsequent messages, and the subsequent message count, respectively. Each has an analogous per-topic configuration:
min.compaction.lag.ms
min.compaction.lag.bytes
min.compaction.lag.messages
Each of these configured lags defaults to zero, so if none are set, all log segments are eligible for compaction except for the active segment (i.e. the one currently being written); the active segment is never compacted, even when all of the compaction lag constraints are satisfied. This leaves the current behavior unchanged. If one or more of the compaction lags is greater than zero, then any log segment containing messages that do not satisfy all of the constraints will not be compacted. In particular, this makes possible guarantees like: "any consumer that is no more than 1 hour behind will get every message."
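For example, to realize the one-hour guarantee above, a deployment might use something like the following (a sketch: the value and the topic name my-table-topic are illustrative, and the .bytes/.messages settings exist only if this proposal is adopted as written). Broker-wide, in server.properties:

    # do not compact any message younger than one hour
    log.cleaner.min.compaction.lag.ms=3600000

and as a per-topic override via the existing kafka-configs.sh tool:

    bin/kafka-configs.sh --zookeeper localhost:2181 --alter \
      --entity-type topics --entity-name my-table-topic \
      --add-config min.compaction.lag.ms=3600000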
Proposed Changes
Introduce additional configurations that guarantee a minimum portion of the head of the log will remain uncompacted. That is, offer guarantees that a consumer that does not lag too far behind will get every update to a compacted topic. These configurations constrain the minimum _distance_ from the topic head that must remain uncompacted (a sketch combining all three constraints follows the list below), where distance is defined in terms of:
- Time since insertion (i.e. message age)
- Aggregate message size
- Message count
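The following is an illustrative Java sketch, at segment granularity and with entirely hypothetical names, of how the three distance constraints might be combined. Kafka's actual log cleaner is implemented in Scala inside the broker; this is not the proposed implementation, only a sketch of the eligibility rule described above:

    import java.util.List;

    // Illustrative sketch: decide, at segment granularity, which prefix of
    // the log is far enough behind the head to be compacted under all three
    // lag constraints. All names here are hypothetical.
    final class CompactionLagSketch {

        static final class Segment {
            final long sizeBytes;          // total bytes of messages in the segment
            final long messageCount;       // number of messages in the segment
            final long largestTimestampMs; // timestamp of the newest message
            Segment(long sizeBytes, long messageCount, long largestTimestampMs) {
                this.sizeBytes = sizeBytes;
                this.messageCount = messageCount;
                this.largestTimestampMs = largestTimestampMs;
            }
        }

        // Returns the index of the newest segment all of whose messages satisfy
        // the three lag constraints; that segment and every older one may be
        // compacted. Returns -1 if nothing may be compacted yet. The input is
        // ordered oldest first and excludes the active segment.
        static int newestCompactableIndex(List<Segment> segments,
                                          long minLagMs, long minLagBytes,
                                          long minLagMessages, long nowMs) {
            long bytesAhead = 0;    // bytes in segments newer than the one inspected
            long messagesAhead = 0; // messages in segments newer than the one inspected
            for (int i = segments.size() - 1; i >= 0; i--) {
                Segment s = segments.get(i);
                boolean oldEnough   = nowMs - s.largestTimestampMs >= minLagMs;
                boolean bytesBehind = bytesAhead >= minLagBytes;
                boolean countBehind = messagesAhead >= minLagMessages;
                if (oldEnough && bytesBehind && countBehind) {
                    return i; // every older segment trails the head by at least as much
                }
                bytesAhead += s.sizeBytes;
                messagesAhead += s.messageCount;
            }
            return -1;
        }

        public static void main(String[] args) {
            long now = System.currentTimeMillis();
            List<Segment> segs = List.of(
                    new Segment(1 << 20, 1_000, now - 7_200_000L),  // 2 hours old
                    new Segment(1 << 20, 1_000, now - 1_800_000L)); // 30 minutes old
            // With only a 1-hour minimum lag set, just the older segment qualifies.
            System.out.println(newestCompactableIndex(segs, 3_600_000L, 0, 0, now)); // 0
        }
    }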
The basic behavior of using the compaction ratio to trigger compaction and to prioritize its order will not be altered. However, the ratio's definition will be expanded to the ratio of "compactable" message bytes to compactable plus already-compacted message bytes, where a segment counts as compactable only if it is neither the active segment nor prohibited from compaction because it contains messages that do not yet satisfy all of the new lag constraints.
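A minimal sketch of this expanded ratio, with hypothetical names, in the same illustrative style as above:

    // Illustrative sketch: the expanded ratio counts only compactable bytes
    // in the numerator, so segments held back by the lag constraints neither
    // trigger compaction nor inflate its priority.
    final class CleanableRatioSketch {
        static double cleanableRatio(long compactableBytes, long compactedBytes) {
            long total = compactableBytes + compactedBytes;
            return total == 0 ? 0.0 : (double) compactableBytes / total;
        }

        public static void main(String[] args) {
            // e.g. 400 MB of compactable head vs. 600 MB of already-compacted tail
            System.out.println(cleanableRatio(400L << 20, 600L << 20)); // 0.4
        }
    }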
Compatibility, Deprecation, and Migration Plan
The proposed change is backward compatible: all of the new lags default to zero, which preserves the current compaction behavior.
Rejected Alternatives
The database replication use case could instead be satisfied using a combination of "snapshot" and "journal" topics for each table. The journal topics could use regular time-based deletion, and an external process would periodically create new snapshots from the most recent snapshot and the journals.