...

For some use cases, e.g., join windows in Kafka Streams, it is desirable to have logs that are both compacted and deleted. In these applications a window of time may contain many versions of a key. While the window is open, you only want to retain the latest version of each key; once the window has expired, you would like the segments for that window to be deleted. Kafka doesn’t currently support these semantics, because compaction and deletion are mutually exclusive.


Public Interfaces

A new valid option, compact_and_delete, is added to the cleanup.policy configuration.
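As a minimal sketch, assuming the broker maps each cleanup.policy value to a pair of flags (run compaction, run deletion), the new option could be modeled as below. The CleanupPolicy enum, its fields, and fromConfig are hypothetical names for illustration, not Kafka's actual classes.

```java
import java.util.Arrays;

public class CleanupPolicyDemo {

    /**
     * Hypothetical model of cleanup.policy once compact_and_delete is accepted.
     * Illustrative only; this enum is not part of Kafka.
     */
    enum CleanupPolicy {
        DELETE("delete", false, true),
        COMPACT("compact", true, false),
        COMPACT_AND_DELETE("compact_and_delete", true, true);

        final String configValue;
        final boolean compact; // should the log cleaner compact this log?
        final boolean delete;  // should retention-based segment deletion run?

        CleanupPolicy(String configValue, boolean compact, boolean delete) {
            this.configValue = configValue;
            this.compact = compact;
            this.delete = delete;
        }

        /** Maps a cleanup.policy string to a policy, rejecting unknown values. */
        static CleanupPolicy fromConfig(String value) {
            return Arrays.stream(values())
                    .filter(p -> p.configValue.equals(value))
                    .findFirst()
                    .orElseThrow(() -> new IllegalArgumentException(
                            "Invalid cleanup.policy: " + value));
        }
    }

    public static void main(String[] args) {
        CleanupPolicy p = CleanupPolicy.fromConfig("compact_and_delete");
        System.out.println(p + " compact=" + p.compact + " delete=" + p.delete);
    }
}
```

Modeling the policy as two independent flags keeps the existing compact and delete values unchanged while letting the new value switch both strategies on.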

Proposed Changes

Introduce a new cleanup policy, compact_and_delete. When it is set, both the compact and delete cleanup strategies will run.
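The combined semantics can be illustrated with a simplified, record-level model. This is an assumption made for illustration: real cleanup works on whole log segments, not individual records. Compaction keeps only the latest record per key, and deletion then drops records older than the retention period. The Rec record and the compact and deleteExpired helpers are hypothetical.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CompactAndDeleteModel {

    /** One log record: offset, key, value and timestamp in ms (simplified, hypothetical). */
    record Rec(long offset, String key, String value, long timestampMs) {}

    /** Compaction: keep only the latest record (highest offset) for each key. */
    static List<Rec> compact(List<Rec> log) {
        Map<String, Rec> latest = new LinkedHashMap<>();
        for (Rec r : log) {
            latest.put(r.key(), r); // later offsets overwrite earlier ones
        }
        List<Rec> out = new ArrayList<>(latest.values());
        out.sort(Comparator.comparingLong(Rec::offset)); // restore log order
        return out;
    }

    /** Deletion: drop records older than the retention period. */
    static List<Rec> deleteExpired(List<Rec> log, long nowMs, long retentionMs) {
        List<Rec> out = new ArrayList<>();
        for (Rec r : log) {
            if (nowMs - r.timestampMs() <= retentionMs) {
                out.add(r);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Rec> log = List.of(
                new Rec(0, "key-a", "a", 1_000),
                new Rec(1, "key-a", "b", 2_000),
                new Rec(2, "key-b", "c", 9_000),
                new Rec(3, "key-b", "d", 10_000));
        // compact_and_delete: compaction first, then retention-based deletion.
        List<Rec> result = deleteExpired(compact(log), 10_000, 5_000);
        System.out.println(result);
    }
}
```

In this run the older key-a record is removed by compaction, the remaining key-a record is then removed by deletion because it is older than the 5,000 ms retention, and only the latest key-b record survives.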

Implementation Outline

Currently, for topics with cleanup.policy=delete, deletion is triggered from a scheduled thread in LogManager. At every interval it calls cleanupLogs, which deletes segments such that both the retention.bytes and retention.ms configs are respected.
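The retention check that cleanupLogs performs can be sketched as follows. This is a simplified illustration, not LogManager's actual code: it assumes each segment is summarized by its size and the timestamp of its newest record, treats the last segment as the active one and never selects it, and interprets a negative limit as "no limit" (as with -1 for retention.ms and retention.bytes). The Segment record and segmentsToDelete are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

public class RetentionSketch {

    /** Simplified view of a log segment: base offset, size in bytes, newest record timestamp. */
    record Segment(long baseOffset, long sizeBytes, long largestTimestampMs) {}

    /**
     * Returns the oldest segments that breach the time limit (retention.ms) or the
     * size limit (retention.bytes). Segments are assumed ordered oldest first; the last
     * one is treated as the active segment and never selected. A negative limit disables that check.
     */
    static List<Segment> segmentsToDelete(List<Segment> segments, long nowMs,
                                          long retentionMs, long retentionBytes) {
        long totalBytes = segments.stream().mapToLong(Segment::sizeBytes).sum();
        List<Segment> toDelete = new ArrayList<>();
        for (int i = 0; i < segments.size() - 1; i++) { // skip the active segment
            Segment s = segments.get(i);
            boolean expired = retentionMs >= 0 && nowMs - s.largestTimestampMs() > retentionMs;
            // Deleting s is allowed by the size limit only if the log stays at or above retentionBytes.
            boolean overSize = retentionBytes >= 0 && totalBytes - s.sizeBytes() >= retentionBytes;
            if (!expired && !overSize) {
                break; // only a prefix of the log is ever deleted
            }
            toDelete.add(s);
            totalBytes -= s.sizeBytes();
        }
        return toDelete;
    }

    public static void main(String[] args) {
        List<Segment> segments = List.of(
                new Segment(0, 100, 1_000),
                new Segment(100, 100, 5_000),
                new Segment(200, 100, 9_000),
                new Segment(300, 50, 10_000)); // active segment
        System.out.println(segmentsToDelete(segments, 10_000, 6_000, 150));
    }
}
```

With these inputs the first segment breaches the time limit and the second is then removed to bring the log down to the size limit; the third breaches neither, so the scan stops there.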

In this approach we’d remove this scheduled thread and the relevant functions from LogManager, and instead perform the cleanup via LogCleaner.CleanerThread. In the cleanOrSleep method we’d first run compaction and then run deletion. We’d need to add some code to Log.scala to check whether any segments are ready to be deleted, add those that are to the inProgress map (so that multiple threads don’t try to delete the same segments), run the delete operation, and then remove them from the inProgress map. This mirrors how compacted logs are currently handled.
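The inProgress bookkeeping described above can be sketched as a claim/release guard around the two cleanup steps. This assumes a ConcurrentHashMap keyed by log name stands in for the inProgress map; CleanerThreadSketch, tryClaim, release, and cleanLog are hypothetical names, and the actual compaction and deletion work is passed in as callbacks rather than implemented.

```java
import java.util.concurrent.ConcurrentHashMap;

public class CleanerThreadSketch {

    // Hypothetical stand-in for the inProgress map: logs currently claimed by a cleaner thread.
    private final ConcurrentHashMap<String, Boolean> inProgress = new ConcurrentHashMap<>();

    /** Atomically claims a log; returns false if another thread already holds it. */
    boolean tryClaim(String log) {
        return inProgress.putIfAbsent(log, Boolean.TRUE) == null;
    }

    /** Releases a previously claimed log. */
    void release(String log) {
        inProgress.remove(log);
    }

    /**
     * One cleaning pass for a compact_and_delete log: compaction first, then deletion,
     * all while the log is claimed. Returns false if the log was skipped.
     */
    boolean cleanLog(String log, Runnable compaction, Runnable deletion) {
        if (!tryClaim(log)) {
            return false; // another thread is already cleaning this log
        }
        try {
            compaction.run();
            deletion.run();
            return true;
        } finally {
            release(log); // always release, even if compaction or deletion throws
        }
    }

    public static void main(String[] args) {
        CleanerThreadSketch cleaner = new CleanerThreadSketch();
        boolean cleaned = cleaner.cleanLog("orders-0",
                () -> System.out.println("compacting orders-0"),
                () -> System.out.println("deleting expired segments of orders-0"));
        System.out.println("cleaned=" + cleaned);
    }
}
```

Because putIfAbsent is atomic, a second thread that finds the log already claimed skips it instead of deleting the same segments, and the finally block releases the claim even if compaction or deletion throws.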


Compatibility, Deprecation, and Migration Plan
  • No impact on existing users

...

Add another lock to Log.scala. We considered adding a ReentrantLock to Log.scala. The CleanerThread and LogManager would try to acquire this lock before attempting to clean LogSegments. We rejected this approach because there is already a fairly complex locking hierarchy in partition / replica / log / logsegment, and we’d prefer not to add another lock.

...

Introduce another config, log.compact, and deprecate cleanup.policy. Although this would have been a backward-compatible change, it would still have required a migration path for existing users. It would also have introduced some awkwardness around supporting the existing usage of cleanup.policy=compact, i.e., you would also need to ensure that retention.ms and retention.bytes were set to -1.