...

However, Kafka doesn’t provide any mechanism to delete data after it is consumed by downstream jobs. It provides only time-based and size-based log retention policies, both of which are agnostic to the consumer’s behavior. If we set a small time-based log retention for intermediate data, the data may be deleted even before it is consumed by downstream jobs. If we set a large time-based log retention, the data will take a large amount of disk space for a long time. Neither solution is good for Kafka users. To address this problem, we propose to add a new admin API which can be called by the user to delete data that is no longer needed.

...

Future<Map<TopicPartition, DeleteDataResult>> deleteDataBefore(Map<TopicPartition, Long> offsetForPartition)

- DeleteDataResult has the following two fields, which tell the user whether the data has been successfully deleted for the corresponding partition.

DeleteDataResult(long: low_watermark, error: Exception)
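
For illustration, below is a minimal sketch of how an application might call the proposed API. The AdminClient constructor, its config key, and the field access on DeleteDataResult are assumptions made for this example; only the deleteDataBefore() signature and the DeleteDataResult fields above are defined by this KIP.

Code Block
titleDeleteDataExample (illustrative)
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.Future;

import org.apache.kafka.common.TopicPartition;

public class DeleteDataExample {
    public static void main(String[] args) throws Exception {
        // AdminClient here is the client proposed in KIP-4; the config key and
        // constructor are assumptions for illustration.
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        AdminClient adminClient = new AdminClient(props);

        // Delete everything before offset 12000 on partition 0, and everything
        // below the current high_watermark on partition 1 (offset -1L).
        Map<TopicPartition, Long> offsetForPartition = new HashMap<>();
        offsetForPartition.put(new TopicPartition("intermediate-topic", 0), 12000L);
        offsetForPartition.put(new TopicPartition("intermediate-topic", 1), -1L);

        Future<Map<TopicPartition, DeleteDataResult>> future =
            adminClient.deleteDataBefore(offsetForPartition);

        // get() blocks until a DeleteDataResult is available for every requested partition.
        for (Map.Entry<TopicPartition, DeleteDataResult> entry : future.get().entrySet()) {
            DeleteDataResult result = entry.getValue();
            if (result.error == null)
                System.out.println(entry.getKey() + " low_watermark=" + result.low_watermark);
            else
                System.out.println(entry.getKey() + " error=" + result.error);
        }
    }
}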

2) Protocol

Create DeleteRequest

 

Code Block
titleDeleteRequest
DeleteRequest => topics timeout
  topics => [DeleteRequestTopic]
  timeout => int32
 
DeleteRequestTopic => topic partitions
  topic => str
  partitions => [DeleteRequestPartition]
 
DeleteRequestPartition => partition offset
  partition => int32
  offset => int64  // offset -1L will be translated into high_watermark of the partition when leader receives the request.

Create DeleteResponse

 

Code Block
titleDeleteResponse
DeleteResponse => topics
  topics => [DeleteResponseTopic]
 
DeleteResponseTopic => topic partitions
  topic => str
  partitions => [DeleteResponsePartition]
 
DeleteResponsePartition => partition low_watermark error_code
  partition => int32
  low_watermark => int64
  error_code => int16

 

...

We create one more checkpoint file, named "log-begin-offset-checkpoint", in every log directory. The checkpoint file will have the same format as existing checkpoint files (e.g. replication-offset-checkpoint) which map TopicPartition to Long.
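
For illustration, assuming the new file uses the same layout as the existing checkpoint files (a version line, an entry count, then one "topic partition offset" line per partition), its contents might look like the following. The topic names and offsets are made up.

Code Block
titlelog-begin-offset-checkpoint (illustrative contents)
0
2
intermediate-topic 0 12000
intermediate-topic 1 8500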

4) Script

Add kafka-delete-data.sh that allows the user to delete data from the command line. The script takes the following arguments:

- bootstrap-server. This config is required from the user. It is used to identify the Kafka cluster.
- command-config. This is an optional property file containing configs to be passed to the Admin Client.
- delete-offset-json-file. This config is required from the user. It allows the user to specify the offsets of partitions whose data should be deleted. The file has the following format:

...
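
A hypothetical invocation of the script, using only the arguments listed above (the file path and server address are placeholders):

Code Block
titlekafka-delete-data.sh (hypothetical invocation)
bin/kafka-delete-data.sh --bootstrap-server localhost:9092 \
    --delete-offset-json-file /tmp/delete-offsets.json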

The idea is to add new APIs in the Admin Client (see KIP-4) that can be called by the user to delete data that is no longer needed. A new request and response need to be added to communicate this request between client and broker. Given the impact of this API on the data, the API should be protected by Kafka’s authorization mechanism described in KIP-11 to prevent malicious or unintended data deletion. Furthermore, we adopt the soft delete approach because it is expensive to delete data in the middle of a segment. Those segments whose maximum offset < offset-to-delete can be deleted safely. Brokers can increment the log_start_offset of a partition to offset-to-delete so that data with offset < offset-to-delete will not be exposed to consumers even if it is still on disk. The log_start_offset will be checkpointed periodically, similar to the high_watermark, to make it persistent.

Note that the way the broker handles DeleteRequest is similar to how it handles ProduceRequest with ack = all and isr = all_live_replicas, e.g. the leader waits for all followers to catch up with its log_start_offset, doesn't expose messages below log_start_offset, and checkpoints log_start_offset periodically. The low_watermark of a partition will be the minimum log_start_offset of all replicas of this partition, and this value will be returned to the user in the DeleteResponse.

Please refer to the public interface section for our design of the API, request and response. In this section we will describe how the broker maintains the low watermark per partition, how the client communicates with the broker to delete old data, and how this API can be protected by authorization.

...

1) The user application determines the maximum offset of data that can be deleted per partition. This information is provided to deleteDataBefore() as Map<TopicPartition, Long>. If the user application only knows the timestamp of the data that can be deleted per partition, it can use the offsetsForTimes() API to convert the cutoff timestamp into an offsetToDelete per partition before providing the map to the deleteDataBefore() API.
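
A minimal sketch of this conversion, assuming the application has a KafkaConsumer that can query the relevant partitions; the helper method and variable names are made up for illustration, while offsetsForTimes() is the existing consumer API referenced above.

Code Block
titleCutoffToOffsets (illustrative)
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

public class CutoffToOffsets {
    // Convert per-partition cutoff timestamps into the offsetToDelete map
    // expected by deleteDataBefore(). The consumer is only used for the lookup.
    public static Map<TopicPartition, Long> offsetsBefore(KafkaConsumer<?, ?> consumer,
                                                          Map<TopicPartition, Long> cutoffTimestamps) {
        Map<TopicPartition, Long> offsetToDelete = new HashMap<>();
        Map<TopicPartition, OffsetAndTimestamp> offsets = consumer.offsetsForTimes(cutoffTimestamps);
        for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : offsets.entrySet()) {
            // offsetsForTimes() returns the earliest offset whose timestamp is >= the cutoff,
            // so all data before this offset is older than the cutoff timestamp.
            if (entry.getValue() != null)
                offsetToDelete.put(entry.getKey(), entry.getValue().offset());
        }
        return offsetToDelete;
    }
}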

2) The Admin Client builds a DeleteRequest using the offsetToDelete from the deleteDataBefore() parameter; the requestTimeoutMs is taken from the AdminClient constructor. One DeleteRequest is sent to each broker that acts as the leader of any partition in the request. The request should only include partitions which the broker leads.

3) After receiving the DeleteRequest, for each partition in the DeleteRequest, the leader first sets offsetToDelete to high_watermark if offsetToDelete is -1L. It then sets the log_start_offset of the leader replica to max(log_start_offset, offsetToDelete) if offsetToDelete <= high_watermark. Those segments whose largest offset < log_start_offset will be deleted by the leader.
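
A sketch of the leader-side handling described in this step, using simplified stand-ins for the broker's log structures (PartitionLog and LogSegment below are not the broker's actual classes):

Code Block
titleLeader handling of DeleteRequest (illustrative)
import java.util.Iterator;
import java.util.List;

// Simplified stand-ins for the broker's log structures, for illustration only.
class LogSegment {
    long baseOffset;
    long largestOffset;
}

class PartitionLog {
    List<LogSegment> segments;   // ordered by baseOffset
    long logStartOffset;
    long highWatermark;

    // Handle one partition of a DeleteRequest on the leader.
    // Returns the new log_start_offset, or throws if the offset is out of range.
    long deleteDataBefore(long offsetToDelete) {
        if (offsetToDelete == -1L)
            offsetToDelete = highWatermark;   // -1L means "delete up to the high watermark"
        if (offsetToDelete > highWatermark)
            throw new IllegalArgumentException("OffsetOutOfRange: " + offsetToDelete);

        // Soft delete: only advance log_start_offset; data below it is no longer exposed.
        logStartOffset = Math.max(logStartOffset, offsetToDelete);

        // Segments whose largest offset is below log_start_offset can be physically removed.
        Iterator<LogSegment> it = segments.iterator();
        while (it.hasNext()) {
            if (it.next().largestOffset < logStartOffset)
                it.remove();
        }
        return logStartOffset;
    }
}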

4) The leader puts the DeleteRequest into a DelayedOperationPurgatory. The DeleteRequest can be completed when results for all partitions specified in the DeleteRequest are available. The result of a partition will be available within RequestTimeoutMs, and it is determined using the following logic (sketched below the list):

  • If log_start_offset of this partition on all live followers is larger than or equal to the offsetToDelete, the result of this partition will be its low_watermark, which is the minimum log_start_offset of all its live replicas.
  • If high_watermark of this partition is smaller than the offsetToDelete, the result of this partition will be OffsetOutOfRangeException.
  • If the leadership of this partition moves to another broker, the result of this partition will be NotLeaderException.
  • If the result of this partition is not available after RequestTimeoutMs, the result of this partition will be TimeoutException.
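
A sketch of this per-partition result determination, again using simplified stand-ins (Replica, the exception messages, and the method shape are illustrative; the actual logic lives in the broker's DelayedOperationPurgatory handling):

Code Block
titleResult determination for one partition (illustrative)
import java.util.List;

// Illustrative stand-ins; not the broker's actual classes.
class Replica {
    long logStartOffset;
    boolean isLive;
}

class PartitionDeleteResult {
    Long lowWatermark;    // set when the delete has completed on all live replicas
    Exception error;      // set when the delete failed for this partition
}

class DelayedDeleteCheck {
    // Try to complete the result for one partition of a delayed DeleteRequest.
    // Returns null if the result is not yet available (the operation stays in purgatory).
    PartitionDeleteResult tryComplete(List<Replica> replicas, long highWatermark,
                                      long offsetToDelete, boolean isStillLeader,
                                      boolean requestTimedOut) {
        PartitionDeleteResult result = new PartitionDeleteResult();
        if (!isStillLeader) {
            result.error = new Exception("NotLeaderException");
            return result;
        }
        if (highWatermark < offsetToDelete) {
            result.error = new Exception("OffsetOutOfRangeException");
            return result;
        }
        long lowWatermark = Long.MAX_VALUE;
        for (Replica replica : replicas) {
            if (!replica.isLive)
                continue;
            if (replica.logStartOffset < offsetToDelete) {
                // Some live replica has not yet advanced past offsetToDelete.
                if (requestTimedOut) {
                    result.error = new Exception("TimeoutException");
                    return result;
                }
                return null;   // keep waiting in purgatory
            }
            lowWatermark = Math.min(lowWatermark, replica.logStartOffset);
        }
        // All live replicas have advanced past offsetToDelete; report the low watermark,
        // i.e. the minimum log_start_offset of all live replicas.
        result.lowWatermark = lowWatermark;
        return result;
    }
}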

...

8) The leader updates the log_start_offset of each follower. If the DeleteRequest can be completed, the leader removes the DeleteRequest from the DelayedOperationPurgatory and sends a DeleteResponse with the results (i.e. low_watermark or error) for the specified set of partitions.

9) If the admin client does not receive a DeleteResponse from a broker within RequestTimeoutMs, the DeleteDataResult of the partitions on that broker will be DeleteDataResult(low_watermark = -1, error = TimeoutException). Otherwise, the DeleteDataResult of each partition will be constructed using the low_watermark and the error of the corresponding partition, which is read from the DeleteResponse received from its leader broker. deleteDataBefore(...).get() will unblock and return Map<TopicPartition, DeleteDataResult> when the DeleteDataResult of all partitions specified in the offsetForPartition param is available.

...

Given the potential damage that can be caused if this API is used by mistake, it is important that we limit its usage to only authorized users. For this matter, we can take advantage of the existing authorization framework implemented in KIP-11. deleteDataBefore() will have the same authorization setting as deleteTopic(). Its operation type is DELETE and its resource type is TOPIC.
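
For example, an ACL granting the DELETE operation on a topic could be added with the existing kafka-acls.sh tool (the principal, topic name, and ZooKeeper address below are placeholders):

Code Block
titleGranting the DELETE operation (illustrative)
bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 \
    --add --allow-principal User:Alice \
    --operation Delete --topic intermediate-topic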

...


- Using committed offset instead of an extra API to trigger the data delete operation. Delete data if its offset is smaller than the committed offset of all consumer groups that need to consume from this partition.
This approach is discussed in KIP-68. The advantage of this approach is that it doesn't need coordination of user applications to determine when deleteDataBefore() can be called, which can be hard to do if there are multiple consumer groups interested in consuming this topic. The disadvantage of this approach is that it is less flexible than the deleteDataBefore() API because it re-uses the committed offset to trigger the data delete operation. Also, it adds complexity to the broker implementation and would be more complex to implement than the deleteDataBefore() API. An alternative approach is to implement this logic by running an external service which calls the deleteDataBefore() API based on the committed offsets of consumer groups.

- Leader sends DeleteResponse without waiting for low_watermark of all followers to increase above the cutoff offset
This approach would be simpler to implement since it doesn't require a DelayedOperationPurgatory for the DeleteRequest. The leader can reply to the DeleteRequest faster since it doesn't need to wait for followers. However, the deleteDataBefore() API would provide a weaker guarantee in this approach because the data may not be deleted if the leader crashes right after it sends the DeleteResponse. It is useful to know for sure whether the data has been deleted, e.g. when the user wants to delete problematic data from upstream so that the downstream application can re-consume clean data, or when the user wants to delete some sensitive data.

- Delete data on only one partition per call to deleteDataBefore(...)
This approach would make the implementation of this API simpler, and would be consistent with the existing seek(TopicPartition partition, long offset) API. The downside of this approach is that it either increases the time to delete data if the number of partitions is large, or it requires the user to take extra effort to parallelize the deleteDataBefore(...) calls. This API may take longer than seek() for a given partition since the broker needs to wait for the followers' action before responding to the DeleteRequest. Thus we allow the user to specify a map of partitions to make this API easy to use.