
Status

Current state: Accepted

Discussion thread: here

JIRA: KAFKA-4015

Released: 0.10.1.0

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

For some uses, such as join windows in Kafka Streams, it is desirable to have logs that are both compacted and deleted. In these applications you may have windows of time with many versions of a key. During the window you only want to retain the latest version of the key; however, once the window has expired you would like the segments for the window to be deleted. With both compact and delete enabled, retention.ms of the changelog would be set to a value greater than the retention of the window. Although old windows won't automatically be removed on expiration, they will eventually be removed by the broker as the old segments expire. Kafka doesn't currently support these semantics, as compaction and deletion are mutually exclusive.

Enabling this will also be useful in other scenarios, e.g., any ingestion of data where you only care about the latest value for a particular key but disk constraints mean you can't keep the entire keyset.
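To make the intended semantics concrete, here is a minimal, self-contained sketch (illustrative only, not broker code; the class and method names are made up) of what running both policies over a changelog amounts to: compaction keeps only the latest value per key, and deletion then drops anything that has fallen out of the retention window.

```java
import java.util.*;
import java.util.stream.*;

// Illustrative model of a changelog record: (key, value, timestamp).
class Record {
    final String key, value;
    final long timestamp;
    Record(String key, String value, long timestamp) {
        this.key = key; this.value = value; this.timestamp = timestamp;
    }
}

class CompactAndDelete {
    // Compaction: retain only the latest record for each key.
    static List<Record> compact(List<Record> log) {
        Map<String, Record> latest = new HashMap<>();
        for (Record r : log) {
            Record cur = latest.get(r.key);
            if (cur == null || r.timestamp > cur.timestamp) latest.put(r.key, r);
        }
        return latest.values().stream()
                .sorted(Comparator.comparingLong(r -> r.timestamp))
                .collect(Collectors.toList());
    }

    // Deletion: drop records older than the retention window.
    static List<Record> delete(List<Record> log, long nowMs, long retentionMs) {
        return log.stream()
                .filter(r -> nowMs - r.timestamp <= retentionMs)
                .collect(Collectors.toList());
    }

    // cleanup.policy=compact,delete: run both strategies in sequence.
    static List<Record> compactAndDelete(List<Record> log, long nowMs, long retentionMs) {
        return delete(compact(log), nowMs, retentionMs);
    }
}
```

For example, with two versions of key "a" inside the window, only the latest survives compaction, and once the whole window ages past retention.ms the remaining records are deleted as well.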


Public Interfaces

cleanup.policy is modified to accept a comma-separated list of valid policies, e.g., cleanup.policy=compact,delete

Proposed Changes

Modify cleanup.policy to take a comma-separated list of valid policies. When cleanup.policy=compact,delete is set, both the compact and delete cleanup strategies will run.
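A sketch of what validating such a comma-separated value might look like (illustrative only; the real validation lives in the broker's config handling, and the class and method names here are made up):

```java
import java.util.*;
import java.util.stream.*;

// Hypothetical validation of a comma-separated cleanup.policy value.
class CleanupPolicy {
    static final Set<String> VALID = new HashSet<>(Arrays.asList("compact", "delete"));

    // Parse "compact,delete" into a set of policies, rejecting unknown entries.
    static Set<String> parse(String value) {
        Set<String> policies = Arrays.stream(value.split(","))
                .map(String::trim)
                .map(String::toLowerCase)
                .filter(s -> !s.isEmpty())
                .collect(Collectors.toSet());
        if (policies.isEmpty() || !VALID.containsAll(policies))
            throw new IllegalArgumentException(
                    "Invalid cleanup.policy '" + value + "': entries must be one of " + VALID);
        return policies;
    }
}
```

Treating the value as a set also means ordering and duplicates in the list are irrelevant: compact,delete and delete,compact mean the same thing.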

Implementation outline

Currently, for topics with cleanup.policy=delete, the delete is triggered from a scheduled thread in LogManager: every interval it calls cleanupLogs, which deletes segments such that both the retention.bytes and retention.ms configs are respected. The LogCleaner.CleanerThread is responsible for triggering the cleaning of compacted topics. We will extend the LogCleaner.CleanerThread to also support cleanup.policy=compact,delete. In the cleanOrSleep method we'd first run compaction and then run deletion. We'd need to add some extra code to Log.scala to check whether we have segments ready to be deleted, add any that are ready to the inProgress map (so we don't get multiple threads trying to delete the same segments), run the delete operation, and then remove them from the inProgress map (this is the same as it currently works for compacted logs).
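The cleaning pass described above can be sketched roughly as follows (a hypothetical outline, not Kafka source; Segment, Cleaner, and the method names are invented for illustration): compact first, then delete expired segments, with an in-progress set guarding each partition so two cleaner threads never clean the same segments concurrently.

```java
import java.util.*;
import java.util.function.Consumer;

// Illustrative stand-in for a log segment.
class Segment {
    final long baseOffset, lastModifiedMs;
    Segment(long baseOffset, long lastModifiedMs) {
        this.baseOffset = baseOffset; this.lastModifiedMs = lastModifiedMs;
    }
}

class Cleaner {
    private final Set<String> inProgress = new HashSet<>(); // partitions being cleaned

    boolean cleanCompactAndDelete(String topicPartition,
                                  List<Segment> segments,
                                  Consumer<List<Segment>> compact,
                                  long nowMs, long retentionMs) {
        synchronized (inProgress) {
            // 1. claim the partition; bail out if another thread owns it
            if (!inProgress.add(topicPartition)) return false;
        }
        try {
            compact.accept(segments); // 2. run the compaction strategy first
            // 3. then delete segments that have fallen out of retention.ms
            segments.removeIf(s -> nowMs - s.lastModifiedMs > retentionMs);
            return true;
        } finally {
            synchronized (inProgress) { inProgress.remove(topicPartition); } // 4. release
        }
    }
}
```

The in-progress set plays the role the text describes for the inProgress map: it is the only coordination needed, which is why the approach requires no additional locking.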

There is no change for topics with cleanup.policy=delete, i.e., the cleanup will still be scheduled via LogManager. The benefits of this approach are that it requires no further locking, all compacted-topic cleaning is triggered from the LogCleaner.CleanerThread, and topics with cleanup.policy=delete are not impacted.


Compatibility, Deprecation, and Migration Plan
  • No impact on existing users

...

Add another lock to Log.scala. We considered adding a ReentrantLock to Log.scala. The CleanerThread and LogManager would try to acquire this lock before attempting to clean LogSegments. We rejected this approach because there is already a fairly complex locking hierarchy across partition / replica / log / log segment, and we'd prefer not to add another lock.

...

Introduce another config, log.compact, and deprecate cleanup.policy. This would have been a backwards-incompatible change requiring a migration path for existing users. It also introduced some awkwardness around supporting the existing usage of cleanup.policy=compact, i.e., you would also need to ensure that retention.ms and retention.bytes were set to -1.