...

However, Kafka doesn’t provide any mechanism to delete data after it has been consumed by downstream jobs. It provides only time-based and size-based log retention policies, both of which are agnostic to consumers' behavior. If we set a small time-based log retention for intermediate data, the data may be deleted before it is consumed by downstream jobs. If we set a large time-based log retention, the data will occupy a large amount of disk space for a long time. Neither option is good for Kafka users. To address this problem, we propose to add a new admin API that users can call to purge data that is no longer needed.

Note that this KIP is related to and supersedes KIP-47.


Public Interfaces

1) Java API

...
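The exact API definition is elided above. As a hedged illustration only, the sketch below assumes a signature along the lines of Future&lt;Map&lt;TopicPartition, PurgeDataResult&gt;&gt; purgeDataBefore(Map&lt;TopicPartition, Long&gt;); PurgeDataResult and the PurgingAdminClient interface are declared locally because they are assumptions about the proposed API, not released Kafka client code.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.Future;

import org.apache.kafka.common.TopicPartition;

// Hedged sketch only: PurgeDataResult and purgeDataBefore() mirror the shape
// of the API proposed in this KIP, but are declared locally here because the
// API does not exist in released Kafka clients.
interface PurgeDataResult {}

interface PurgingAdminClient {
    Future<Map<TopicPartition, PurgeDataResult>> purgeDataBefore(
        Map<TopicPartition, Long> offsetForPartition);
}

class PurgeExample {
    // Purge everything below a known-consumed offset on one partition.
    static void purgeConsumedData(PurgingAdminClient adminClient) throws Exception {
        Map<TopicPartition, Long> offsets = new HashMap<>();
        offsets.put(new TopicPartition("intermediate-topic", 0), 1000L);

        // Block until the brokers acknowledge the purge (see the guarantee
        // discussion in Rejected Alternatives below).
        adminClient.purgeDataBefore(offsets).get();
    }
}
```

A caller that needs to know the data is really gone (e.g. before re-producing clean data) would block on the returned future as shown.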


- Using committed offsets, instead of an extra API, to trigger the data purge operation: purge data whose offset is smaller than the committed offsets of all consumer groups that need to consume from this partition.
This approach is discussed in KIP-68. Its advantage is that it doesn't require coordination among user applications to determine when purgeDataBefore() can be called, which can be hard to achieve if multiple consumer groups are interested in consuming this topic. Its disadvantage is that it is less flexible than the purgeDataBefore() API, because it re-uses committed offsets to trigger the data purge operation. It also adds complexity to the broker and would be harder to implement than the purgeDataBefore() API. An alternative is to implement this logic in an external service that calls the purgeDataBefore() API based on the committed offsets of consumer groups, as sketched below.
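As a rough illustration of that external-service alternative, the hedged sketch below computes the minimum committed offset of a partition across a known set of consumer groups (via KafkaConsumer.committed()) and purges below it. The group list, topic name, and the PurgingAdminClient interface from the earlier sketch are assumptions for illustration, not part of this KIP.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

// Hedged sketch of the external-service alternative: data below the smallest
// committed offset of all interested groups is no longer needed by anyone.
class CommittedOffsetPurger {

    // Assumes a non-empty, externally maintained list of interested group ids.
    static long minCommittedOffset(String bootstrapServers,
                                   List<String> groupIds,
                                   TopicPartition tp) {
        long min = Long.MAX_VALUE;
        for (String groupId : groupIds) {
            Properties props = new Properties();
            props.put("bootstrap.servers", bootstrapServers);
            props.put("group.id", groupId);
            props.put("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
                OffsetAndMetadata committed = consumer.committed(tp);
                // A group with no committed offset has consumed nothing yet,
                // so nothing can be purged.
                if (committed == null) {
                    return 0L;
                }
                min = Math.min(min, committed.offset());
            }
        }
        return min;
    }

    static void purgeConsumedData(PurgingAdminClient adminClient,
                                  String bootstrapServers,
                                  List<String> groupIds,
                                  TopicPartition tp) throws Exception {
        long cutoff = minCommittedOffset(bootstrapServers, groupIds, tp);
        adminClient.purgeDataBefore(Collections.singletonMap(tp, cutoff)).get();
    }
}
```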

- Leader sends PurgeResponse without waiting for the low_watermark of all followers to increase above the cutoff offset
This approach would be simpler to implement since it doesn't require a DelayedOperationPurgatory for PurgeRequest, and the leader could reply to a PurgeRequest faster since it wouldn't need to wait for followers. However, the purgeDataBefore() API would provide a weaker guarantee, because the data may not be deleted if the leader crashes right after it sends the PurgeResponse. It is useful to know for sure that the data has been deleted, e.g. when a user wants to delete problematic data upstream so that downstream applications can re-consume clean data, or when a user wants to delete sensitive data.
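For contrast, here is a simplified sketch of the completion criterion that the chosen design implies: the leader holds the PurgeResponse until every replica's low_watermark has reached the cutoff offset. All names here are illustrative assumptions, not actual broker code.

```java
import java.util.Map;

// Illustrative only: in the chosen design, the leader completes a pending
// purge operation (and sends the PurgeResponse) only once every replica's
// low_watermark has reached the cutoff offset, so the purge survives a
// leader failover.
class DelayedPurge {
    private final long cutoffOffset;

    DelayedPurge(long cutoffOffset) {
        this.cutoffOffset = cutoffOffset;
    }

    // Called whenever a follower reports a new low_watermark.
    boolean tryComplete(Map<Integer, Long> lowWatermarkByReplica) {
        for (long lowWatermark : lowWatermarkByReplica.values()) {
            if (lowWatermark < cutoffOffset) {
                return false; // some replica hasn't purged up to the cutoff yet
            }
        }
        return true; // safe to send PurgeResponse to the client
    }
}
```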

- Purge data on only one partition per call to purgeDataBefore(...)
This approach would make the implementation of this API simpler, and would be consistent with the existing seek(TopicPartition partition, long offset) API. The downside is that it either increases the time to purge data when the number of partitions is large, or requires users to make an extra effort to parallelize the purgeDataBefore(...) calls. For a given partition, this API may take longer than seek(), since the broker needs to wait for followers to act before responding to a PurgeDataRequest. Thus we allow users to specify a map of partitions, to make this API easier to use, as sketched below.
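Because the adopted API takes a map, purging many partitions costs one call in which the waiting on followers happens for all partitions concurrently, rather than one blocking call per partition. A hedged sketch, reusing the hypothetical PurgingAdminClient interface from the earlier example:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.TopicPartition;

// Hedged sketch: one purgeDataBefore() call covers every partition of a topic,
// instead of N sequential calls that each wait on followers.
class MultiPartitionPurge {
    static void purgeAllPartitions(PurgingAdminClient adminClient,
                                   String topic,
                                   int partitionCount,
                                   long cutoffOffset) throws Exception {
        Map<TopicPartition, Long> offsets = new HashMap<>();
        for (int p = 0; p < partitionCount; p++) {
            offsets.put(new TopicPartition(topic, p), cutoffOffset);
        }
        // The waiting on followers overlaps across partitions within one call.
        adminClient.purgeDataBefore(offsets).get();
    }
}
```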