...

DeleteDataResult(long low_watermark, Exception error)
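
For illustration, a minimal Java sketch of this result type is given below; only the constructor shape comes from the signature above, while the accessor names are assumptions.

Code Block
title: DeleteDataResult (illustrative sketch)
// Minimal sketch of the proposed result type; accessor names are
// illustrative assumptions, only the constructor shape is from this KIP.
public class DeleteDataResult {
    private final long lowWatermark;  // smallest offset still retained, or -1 on timeout
    private final Exception error;    // null if deletion succeeded for this partition

    public DeleteDataResult(long lowWatermark, Exception error) {
        this.lowWatermark = lowWatermark;
        this.error = error;
    }

    public long lowWatermark() { return lowWatermark; }
    public Exception error() { return error; }
}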

2) Protocol

Create DeleteRecordsRequest

 

Code Block
title: DeleteRecordsRequest
DeleteRecordsRequest => topics timeout
  topics => [DeleteRecordsRequestTopic]
  timeout => int32
 
DeleteRecordsRequestTopic => topic partitions
  topic => str
  partitions => [DeleteRecordsRequestPartition]
 
DeleteRecordsRequestPartition => partition offset
  partition => int32
  offset => int64  // offset -1L will be translated into the high_watermark of the partition when the leader receives the request.

 

Create DeleteRecordsResponse

 

Code Block
title: DeleteRecordsResponse
DeleteRecordsResponse => topics
  topics => [DeleteRecordsResponseTopic]
 
DeleteRecordsResponseTopic => topic partitions
  topic => str
  partitions => [DeleteRecordsResponsePartition]
 
DeleteRecordsResponsePartition => partition low_watermark error_code
  partition => int32
  low_watermark => int64
  error_code => int16

 

...

Note that the way the broker handles DeleteRecordsRequest is similar to how it handles ProduceRequest with ack = all and isr = all_live_replicas: the leader waits for all followers to catch up with its log_start_offset, does not expose messages below log_start_offset, and checkpoints log_start_offset periodically. The low_watermark of a partition is the minimum log_start_offset across all replicas of the partition, and this value is returned to the user in the DeleteRecordsResponse.
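
As an illustration of the low_watermark computation described above, a hedged sketch follows; the Replica interface is a hypothetical stand-in for the broker's replica objects.

Code Block
title: low_watermark computation (illustrative sketch)
import java.util.List;

// Hypothetical stand-in for the broker's replica objects.
interface Replica {
    long logStartOffset();
}

class LowWatermarkExample {
    // low_watermark of a partition = minimum log_start_offset over all replicas.
    static long lowWatermark(List<Replica> replicas) {
        long min = Long.MAX_VALUE;
        for (Replica replica : replicas)
            min = Math.min(min, replica.logStartOffset());
        return min;
    }
}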

Please refer to the public interface section for the design of the API, request and response. In this section we describe how the broker maintains the low watermark per partition, how the client communicates with the broker to delete old data, and how this API can be protected by authorization.

...

1) The user application determines the maximum offset of data that can be deleted per partition. This information is provided to deleteDataBefore() as a Map<TopicPartition, Long>. If the application only knows the timestamp of the data that can be deleted per partition, it can use the offsetsForTimes() API to convert the cutoff timestamp into an offsetToDelete per partition before providing the map to deleteDataBefore(), as in the sketch below.
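
For example, the timestamp-to-offset conversion could look like the following; offsetsForTimes() is the existing consumer API, while passing the resulting map to deleteDataBefore() assumes the API proposed in this KIP.

Code Block
title: timestamp-to-offset conversion (illustrative sketch)
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

class CutoffConversionExample {
    // Convert per-partition cutoff timestamps into the per-partition offsets
    // that would be passed to the proposed deleteDataBefore() API.
    static Map<TopicPartition, Long> toOffsets(KafkaConsumer<?, ?> consumer,
                                               Map<TopicPartition, Long> cutoffTimestamps) {
        Map<TopicPartition, Long> offsetsToDelete = new HashMap<>();
        Map<TopicPartition, OffsetAndTimestamp> byTime =
                consumer.offsetsForTimes(cutoffTimestamps);
        for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : byTime.entrySet()) {
            if (entry.getValue() != null)  // null if no message at or after the timestamp
                offsetsToDelete.put(entry.getKey(), entry.getValue().offset());
        }
        return offsetsToDelete;
    }
}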

2) The admin client builds a DeleteRecordsRequest using the offsetToDelete map from the deleteDataBefore() parameter; requestTimeoutMs is taken from the AdminClient constructor. One DeleteRecordsRequest is sent to each broker that acts as leader of any partition in the request, and each request includes only the partitions that the broker leads, as in the grouping sketch below.
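
A sketch of the per-broker grouping follows; the leaders map parameter is an assumption standing in for the admin client's internal metadata lookup.

Code Block
title: grouping partitions by leader (illustrative sketch)
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.TopicPartition;

class GroupByLeaderExample {
    // Split the offsetToDelete map into one map per leader broker, so each
    // broker receives a DeleteRecordsRequest covering only the partitions it leads.
    static Map<Node, Map<TopicPartition, Long>> groupByLeader(
            Map<TopicPartition, Long> offsetsToDelete,
            Map<TopicPartition, Node> leaders) {  // leader per partition, from cluster metadata
        Map<Node, Map<TopicPartition, Long>> perBroker = new HashMap<>();
        for (Map.Entry<TopicPartition, Long> entry : offsetsToDelete.entrySet()) {
            Node leader = leaders.get(entry.getKey());
            perBroker.computeIfAbsent(leader, node -> new HashMap<>())
                     .put(entry.getKey(), entry.getValue());
        }
        return perBroker;
    }
}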

3) After receiving the DeleteRecordsRequest, for each partition in the request the leader first sets offsetToDelete to high_watermark if offsetToDelete is -1L. It then sets the log_start_offset of the leader replica to max(log_start_offset, offsetToDelete) if offsetToDelete <= high_watermark. Segments whose largest offset < log_start_offset are then deleted by the leader, as sketched below.
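
A hedged sketch of this leader-side logic follows; the method and parameter names are illustrative rather than actual broker code.

Code Block
title: leader-side offset handling (illustrative sketch)
class LeaderDeleteExample {
    // Step 3 on the leader, sketched: translate -1L to the high watermark,
    // then advance log_start_offset if the requested offset is in range.
    static long newLogStartOffset(long offsetToDelete, long highWatermark,
                                  long logStartOffset) {
        if (offsetToDelete == -1L)
            offsetToDelete = highWatermark;  // -1L means "delete up to the high watermark"
        if (offsetToDelete <= highWatermark)
            logStartOffset = Math.max(logStartOffset, offsetToDelete);
        // Segments whose largest offset < logStartOffset are then deleted.
        return logStartOffset;
    }
}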

4) The leader puts the DeleteRecordsRequest into a DelayedOperationPurgatory. The DeleteRecordsRequest can be completed when results for all partitions specified in the request are available. The result for a partition will be available within RequestTimeoutMs, and it is determined using the following logic:

...

8) The leader updates the log_start_offset of each follower. If the DeleteRecordsRequest can be completed, the leader removes it from the DelayedOperationPurgatory and sends a DeleteRecordsResponse with the results (i.e. low_watermark or error) for the specified set of partitions.

9) If the admin client does not receive a DeleteRecordsResponse from a broker within RequestTimeoutMs, the DeleteDataResult of the partitions on that broker will be DeleteDataResult(low_watermark = -1, error = TimeoutException). Otherwise, the DeleteDataResult of each partition will be constructed using the low_watermark and the error of the corresponding partition, read from the DeleteRecordsResponse received from its leader broker. deleteDataBefore(...).get() will unblock and return Map<TopicPartition, DeleteDataResult> when the DeleteDataResult of all partitions specified in the offsetForPartition param are available, as in the sketch below.
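
A sketch of this client-side result assembly is given below; the helper and its parameters are assumptions, with DeleteDataResult as defined in the public interface section.

Code Block
title: client-side result assembly (illustrative sketch)
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;

class ResultAssemblyExample {
    // Fill in a timeout result for every requested partition whose leader
    // broker did not answer within RequestTimeoutMs.
    static Map<TopicPartition, DeleteDataResult> assemble(
            Set<TopicPartition> requested,
            Map<TopicPartition, DeleteDataResult> received) {
        Map<TopicPartition, DeleteDataResult> results = new HashMap<>();
        for (TopicPartition partition : requested) {
            DeleteDataResult result = received.get(partition);
            if (result == null)
                result = new DeleteDataResult(-1L,
                        new TimeoutException("No DeleteRecordsResponse from leader"));
            results.put(partition, result);
        }
        return results;
    }
}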

...


- Using committed offsets instead of an extra API to trigger the data delete operation. Delete data if its offset is smaller than the committed offset of all consumer groups that need to consume from this partition.
This approach is discussed in KIP-68. Its advantage is that it doesn't need coordination of user applications to determine when deleteDataBefore() can be called, which can be hard to do if there are multiple consumer groups interested in consuming this topic. Its disadvantage is that it is less flexible than the deleteDataBefore() API because it re-uses committed offsets to trigger the data delete operation, and it adds complexity to the broker implementation. An alternative is to implement this logic in an external service that calls the deleteDataBefore() API based on the committed offsets of consumer groups, as sketched below.
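
A sketch of such an external service's core computation follows; how the service obtains each group's committed offsets is left out, and deleteDataBefore() remains the API proposed in this KIP.

Code Block
title: cutoffs from committed offsets (illustrative sketch)
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.common.TopicPartition;

class CommittedOffsetServiceExample {
    // For each partition, data below the minimum committed offset across all
    // interested consumer groups is safe to delete; the resulting map would
    // be passed to the proposed deleteDataBefore() API.
    static Map<TopicPartition, Long> safeCutoffs(
            List<Map<TopicPartition, Long>> committedOffsetsPerGroup) {
        Map<TopicPartition, Long> cutoffs = new HashMap<>();
        for (Map<TopicPartition, Long> groupOffsets : committedOffsetsPerGroup)
            for (Map.Entry<TopicPartition, Long> entry : groupOffsets.entrySet())
                cutoffs.merge(entry.getKey(), entry.getValue(), Math::min);
        return cutoffs;
    }
}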

- Leader sends DeleteRecordsResponse without waiting for the low_watermark of all followers to increase above the cutoff offset
This approach would be simpler to implement since it doesn't require a DelayedOperationPurgatory for the DeleteRecordsRequest, and the leader can reply to the DeleteRecordsRequest faster since it doesn't need to wait for followers. However, the deleteDataBefore() API would provide a weaker guarantee in this approach, because the data may not actually be deleted if the leader crashes right after it sends the DeleteRecordsResponse. It is useful to know for sure whether the data has been deleted, e.g. when a user wants to delete problematic data from upstream so that a downstream application can re-consume clean data, or when a user wants to delete sensitive data.

- Delete data on only one partition per call to deleteDataBefore(...)
This approach would make the implementation of this API simpler, and would be consistent with the existing seek(TopicPartition partition, long offset) API. The downside is that it either increases the time to delete data when the number of partitions is large, or requires the user to take extra effort to parallelize deleteDataBefore(...) calls. This API may also take longer than seek() for a given partition, since the broker needs to wait for the followers' action before responding to the DeleteRecordsRequest. Thus we allow the user to specify a map of partitions to make this API easy to use.