
Status

Current state: Under Discussion

Discussion thread: here

JIRA: here

Released: 0.10.1

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

For some usages, e.g., join windows in Kafka Streams, it is desirable to have logs that are both compacted and deleted. In these applications you may have windows of time with many versions of a key; during the window you only want to retain the latest version of the key, but once the window has expired you would like the segments for the window to be deleted. Kafka does not currently support these semantics, as compaction and deletion are mutually exclusive.


Public Interfaces

A new valid option, compact_and_delete, is added to the cleanup.policy configuration.
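
For illustration, a topic could be switched to the new policy using the 0.10-era AdminUtils API. This is a sketch only, assuming the option value lands exactly as proposed here; the ZooKeeper address, topic name, and retention value are made up:

    import java.util.Properties

    import kafka.admin.AdminUtils
    import kafka.utils.ZkUtils

    object SetCompactAndDelete extends App {
      val zkUtils = ZkUtils("localhost:2181", 30000, 30000, isZkSecurityEnabled = false)
      try {
        val props = new Properties()
        props.put("cleanup.policy", "compact_and_delete")
        // Deletion still honours the retention configs, so bound the window here.
        props.put("retention.ms", "86400000") // 1 day, illustrative
        AdminUtils.changeTopicConfig(zkUtils, "join-window-changelog", props)
      } finally zkUtils.close()
    }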

Proposed Changes

Introduce a new cleanup policy, compact_and_delete. When set, both the compact and delete cleanup strategies will run.
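
As a rough sketch of the dispatch this implies (the names below are illustrative, not the actual Kafka code), the cleaner would treat the new value as enabling both strategies:

    object CleanupPolicies {
      sealed trait CleanupPolicy
      case object Delete extends CleanupPolicy
      case object Compact extends CleanupPolicy
      case object CompactAndDelete extends CleanupPolicy

      def parsePolicy(value: String): CleanupPolicy = value match {
        case "delete"             => Delete
        case "compact"            => Compact
        case "compact_and_delete" => CompactAndDelete
        case other => throw new IllegalArgumentException(s"Unknown cleanup.policy: $other")
      }

      // The new value simply turns both strategies on.
      def shouldCompact(p: CleanupPolicy): Boolean = p == Compact || p == CompactAndDelete
      def shouldDelete(p: CleanupPolicy): Boolean  = p == Delete  || p == CompactAndDelete
    }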

Implementation outline

Currently, for topics with cleanup.policy = delete, deletion is triggered from a scheduled thread in LogManager. On every interval it calls cleanupLogs, which deletes segments so that both the retention.bytes and retention.ms configs are respected.
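
The following is a minimal, self-contained model of that retention logic, not the actual LogManager code. It assumes segments are ordered oldest-first and that -1 disables a rule, matching the semantics of the retention configs:

    object RetentionModel {
      case class Segment(baseOffset: Long, lastModifiedMs: Long, sizeBytes: Long)

      // `segments` must be ordered oldest-first, as they are on disk.
      def segmentsToDelete(segments: Seq[Segment], nowMs: Long,
                           retentionMs: Long, retentionBytes: Long): Seq[Segment] = {
        // Time-based rule: a segment is expired once its last modification
        // is older than retention.ms (-1 disables the rule).
        val expired =
          segments.filter(s => retentionMs >= 0 && nowMs - s.lastModifiedMs > retentionMs)

        // Size-based rule: drop the oldest segments while the log would
        // still stay at or above retention.bytes afterwards (-1 disables).
        var excess =
          if (retentionBytes >= 0) segments.map(_.sizeBytes).sum - retentionBytes else -1L
        val oversize = segments.takeWhile { s =>
          val drop = excess - s.sizeBytes >= 0
          if (drop) excess -= s.sizeBytes
          drop
        }
        (expired ++ oversize).distinct.sortBy(_.baseOffset)
      }
    }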

In this approach we would remove this scheduled thread and the relevant functions in LogManager, and perform the cleanup via LogCleaner.CleanerThread. In its cleanOrSleep method we would first run compaction and then run deletion. We would need to add some code to Log.scala to check whether any segments are ready to be deleted, add those that are to the inProgress map (so that multiple threads do not try to delete the same segments), run the delete operation, and then remove them from the inProgress map (this is how it already works for compacted logs).
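
A minimal sketch of that claim/release protocol around the inProgress map might look like the following; the object and method names are hypothetical, and the real bookkeeping is more involved (keyed per topic-partition and tracking compaction state as well):

    import java.util.concurrent.ConcurrentHashMap

    object CleanupCoordinator {
      // Partitions currently being cleaned or deleted.
      private val inProgress = ConcurrentHashMap.newKeySet[String]()

      // Runs `deleteSegments` only if no other thread holds the partition.
      def cleanIfIdle(topicPartition: String)(deleteSegments: () => Unit): Boolean = {
        if (inProgress.add(topicPartition)) { // claim succeeded
          try deleteSegments()
          finally inProgress.remove(topicPartition) // always release the claim
          true
        } else false // another thread is already cleaning this partition
      }
    }

With this shape, both the compaction pass and the deletion pass funnel through the same claim, which is what prevents a delete from racing a concurrent compaction of the same segments.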


Compatibility, Deprecation, and Migration Plan

  • No impact on existing users

Rejected Alternatives

Add another lock to Log.scala. We considered adding a ReentrantLock to Log.scala; the CleanerThread and LogManager would try to acquire this lock before attempting to clean LogSegments. We rejected this approach because there is already a fairly complex locking hierarchy across partition / replica / log / log segment, and we would prefer not to add another lock.

Move all cleanup code to LogCleaner and use the locking approach above. This is a hybrid of our proposed solution and the rejected alternative above; it was rejected for the same reason.

Introduce another config, log.compact, and deprecate cleanup.policy. This would have been a backwards-compatible change, but one requiring a migration path for existing users. It also introduced some awkwardness around supporting the existing usage of cleanup.policy=compact, i.e., you would also need to ensure that retention.ms and retention.bytes were set to -1.
