...
DeleteDataResult(long: low_watermark, error: Exception)
2) Protocol
Create DeleteRecordsRequest

```
DeleteRecordsRequest => topics timeout
  topics => [DeleteRecordsRequestTopic]
  timeout => int32

DeleteRecordsRequestTopic => topic partitions
  topic => str
  partitions => [DeleteRecordsRequestPartition]

DeleteRecordsRequestPartition => partition offset
  partition => int32
  offset => int64 // offset -1L will be translated into high_watermark of the partition when the leader receives the request.
```
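To make the wire layout of the DeleteRecordsRequest schema concrete, here is a minimal Python sketch that serializes the request body. It assumes Kafka's classic (non-flexible) primitive encoding: big-endian integers, a str as an int16 length followed by UTF-8 bytes, and an array as an int32 element count followed by the elements. The function name encode_delete_records_request and the dictionary input shape are hypothetical, and the request header (api_key, api_version, correlation_id, client_id) is deliberately left out.

```python
import struct
from typing import Dict, List, Tuple

# Hypothetical input shape: topic -> list of (partition, offset) pairs.
# An offset of -1 is sent as-is; the leader translates it into high_watermark.
TopicOffsets = Dict[str, List[Tuple[int, int]]]


def _encode_str(value: str) -> bytes:
    # str => int16 length prefix followed by UTF-8 bytes (assumed classic encoding)
    data = value.encode("utf-8")
    return struct.pack(">h", len(data)) + data


def encode_delete_records_request(topics: TopicOffsets, timeout_ms: int) -> bytes:
    """Serialize the DeleteRecordsRequest body: topics first, then timeout."""
    out = bytearray()
    out += struct.pack(">i", len(topics))  # topics => [DeleteRecordsRequestTopic]
    for topic, partitions in topics.items():
        out += _encode_str(topic)  # topic => str
        out += struct.pack(">i", len(partitions))  # partitions => [DeleteRecordsRequestPartition]
        for partition, offset in partitions:
            out += struct.pack(">i", partition)  # partition => int32
            out += struct.pack(">q", offset)  # offset => int64
    out += struct.pack(">i", timeout_ms)  # timeout => int32
    return bytes(out)
```

For example, encode_delete_records_request({"t": [(0, 5)]}, 1000) yields a 27-byte body: 4 (topic count) + 3 (topic name) + 4 (partition count) + 4 (partition) + 8 (offset) + 4 (timeout).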
Create DeleteRecordsResponse

```
DeleteRecordsResponse => topics
  topics => [DeleteRecordsResponseTopic]

DeleteRecordsResponseTopic => topic partitions
  topic => str
  partitions => [DeleteRecordsResponsePartition]

DeleteRecordsResponsePartition => partition low_watermark error_code
  partition => int32
  low_watermark => int64
  error_code => int16
```
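The matching decoder for the DeleteRecordsResponse body can be sketched the same way. As before, this assumes the classic big-endian encoding (int16-prefixed strings, int32-counted arrays) and excludes the response header; _Reader and decode_delete_records_response are hypothetical names, and the result is flattened into a dictionary keyed by (topic, partition).

```python
import struct
from typing import Dict, Tuple

# Hypothetical output shape: (topic, partition) -> (low_watermark, error_code)
PartitionResults = Dict[Tuple[str, int], Tuple[int, int]]


class _Reader:
    """Sequential big-endian reader over a response body (assumed classic encoding)."""

    def __init__(self, data: bytes) -> None:
        self.data = data
        self.pos = 0

    def _unpack(self, fmt: str) -> int:
        (value,) = struct.unpack_from(fmt, self.data, self.pos)
        self.pos += struct.calcsize(fmt)
        return value

    def int16(self) -> int:
        return self._unpack(">h")

    def int32(self) -> int:
        return self._unpack(">i")

    def int64(self) -> int:
        return self._unpack(">q")

    def string(self) -> str:
        length = self.int16()
        value = self.data[self.pos:self.pos + length].decode("utf-8")
        self.pos += length
        return value


def decode_delete_records_response(body: bytes) -> PartitionResults:
    """Parse topics => [topic partitions => [partition low_watermark error_code]]."""
    reader = _Reader(body)
    results: PartitionResults = {}
    for _ in range(reader.int32()):  # topics array
        topic = reader.string()
        for _ in range(reader.int32()):  # partitions array
            partition = reader.int32()
            low_watermark = reader.int64()
            error_code = reader.int16()
            results[(topic, partition)] = (low_watermark, error_code)
    return results
```

In Kafka, an error_code of 0 means no error, so a successful partition decodes to (low_watermark, 0).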
...
Note that the way the broker handles DeleteRecordsRequest is similar to how it handles ProduceRequest with acks = all and isr = all_live_replicas, i.e. the leader waits for all followers to catch up with its log_start_offset, doesn't expose messages below log_start_offset, and checkpoints log_start_offset periodically. The low_watermark of a partition is the minimum log_start_offset of all replicas of this partition, and this value is returned to the user in DeleteRecordsResponse.
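As a minimal sketch of the low_watermark rule, assuming the leader tracks the latest log_start_offset it knows for each replica of a partition (the function name and the broker-id keyed dictionary are illustrative, not part of the proposal):

```python
from typing import Dict


def partition_low_watermark(log_start_offsets: Dict[int, int]) -> int:
    """Return the minimum log_start_offset across all replicas of one partition.

    log_start_offsets maps a (hypothetical) replica/broker id to that replica's
    log_start_offset as last known by the leader.
    """
    if not log_start_offsets:
        raise ValueError("a partition has at least one replica (the leader)")
    return min(log_start_offsets.values())
```

A single follower that has not yet advanced its log_start_offset keeps the partition's low_watermark down, which is why the leader waits for all followers before answering.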
Please refer to the public interfaces section for our design of the API, request, and response. In this section we describe how the broker maintains the low watermark per partition, how the client communicates with the broker to delete old data, and how this API can be protected by authorization.
...
1) The user application determines the maximum offset of data that can be deleted per partition. This information is provided to deleteDataBefore() as Map<TopicPartition, Long>. If the user application only knows the timestamp of data that can be deleted per partition, it can use the offsetsForTimes() API to convert the cutoff timestamp into offsetToDelete per partition before providing the map to the deleteDataBefore() API.
2) The AdminClient builds a DeleteRecordsRequest using the offsetToDelete from the deleteDataBefore() parameter and the requestTimeoutMs taken from the AdminClient constructor. One DeleteRecordsRequest is sent to each broker that acts as leader of any partition in the request. Each request should include only the partitions which that broker leads.
3) After receiving the DeleteRecordsRequest, for each partition in the DeleteRecordsRequest, the leader first sets offsetToDelete to high_watermark if offsetToDelete is -1L. It then sets the log_start_offset of the leader replica to max(log_start_offset, offsetToDelete) if offsetToDelete <= high_watermark. Those segments whose largest offset < log_start_offset will be deleted by the leader.
4) The leader puts the DeleteRecordsRequest into a DelayedOperationPurgatory. The DeleteRecordsRequest can be completed when results for all partitions specified in the DeleteRecordsRequest are available. The result of a partition will be available within RequestTimeoutMs, and it is determined using the following logic:
...
8) The leader updates the log_start_offset of each follower. If the DeleteRecordsRequest can be completed, the leader removes the DeleteRecordsRequest from the DelayedOperationPurgatory and sends a DeleteRecordsResponse with the results (i.e. low_watermark or error) for the specified set of partitions.
9) If the admin client does not receive a DeleteRecordsResponse from a broker within RequestTimeoutMs, the DeleteDataResult of the partitions on that broker will be DeleteDataResult(low_watermark = -1, error = TimeoutException). Otherwise, the DeleteDataResult of each partition will be constructed using the low_watermark and the error of the corresponding partition, which are read from the DeleteRecordsResponse received from its leader broker. deleteDataBefore(...).get() will unblock and return Map<TopicPartition, DeleteDataResult> when the DeleteDataResult of every partition specified in the offsetForPartition param is available.
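Step 1 above leaves the timestamp-to-offset conversion to the application. The following Python sketch models it without a real consumer: each partition's log is a hypothetical list of (offset, timestamp_ms) pairs, and offset_for_time() mimics the documented offsetsForTimes() semantics of returning the earliest offset whose timestamp is greater than or equal to the target. Partitions with no record at or after the cutoff are simply left out here; whether to delete them entirely is an application decision.

```python
from typing import Dict, List, Optional, Tuple

# Hypothetical stand-ins: a partition is identified by (topic, partition) and its
# log is a list of (offset, timestamp_ms) records ordered by offset.
TopicPartition = Tuple[str, int]
Log = List[Tuple[int, int]]


def offset_for_time(log: Log, cutoff_ms: int) -> Optional[int]:
    """Earliest offset whose timestamp is >= cutoff_ms, like offsetsForTimes()."""
    for offset, timestamp_ms in log:
        if timestamp_ms >= cutoff_ms:
            return offset
    return None  # no record at or after the cutoff


def offsets_to_delete(logs: Dict[TopicPartition, Log], cutoff_ms: int) -> Dict[TopicPartition, int]:
    """Build the Map<TopicPartition, Long> argument for deleteDataBefore()."""
    result: Dict[TopicPartition, int] = {}
    for tp, log in logs.items():
        offset = offset_for_time(log, cutoff_ms)
        if offset is not None:
            # Delete everything before the first record at or after the cutoff.
            result[tp] = offset
    return result
```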
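Step 2 above amounts to grouping the offsetToDelete map by partition leader. A minimal sketch, assuming leader ids come from a plain dictionary standing in for cluster metadata; the function name and the dict layout (which mirrors the DeleteRecordsRequest schema) are illustrative only.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

TopicPartition = Tuple[str, int]


def build_requests_per_leader(
    offsets: Dict[TopicPartition, int],
    leaders: Dict[TopicPartition, int],
    request_timeout_ms: int,
) -> Dict[int, dict]:
    """Group offsetToDelete entries by leader broker, one request per broker.

    Each value mirrors DeleteRecordsRequest: topics => [topic partitions], timeout.
    """
    by_leader: Dict[int, Dict[str, List[dict]]] = defaultdict(lambda: defaultdict(list))
    for (topic, partition), offset in offsets.items():
        leader = leaders[(topic, partition)]  # KeyError if the leader is unknown
        by_leader[leader][topic].append({"partition": partition, "offset": offset})
    return {
        broker: {
            "topics": [{"topic": t, "partitions": parts} for t, parts in topics.items()],
            "timeout": request_timeout_ms,
        }
        for broker, topics in by_leader.items()
    }
```

An unknown leader simply raises KeyError in this sketch; a real client would need its own policy for missing metadata.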
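The per-partition leader logic of step 3 above can be modeled as a pure function. This sketch assumes a segment is represented only by its (base_offset, largest_offset) pair; apply_delete() is a hypothetical name, and raising ValueError when offsetToDelete exceeds high_watermark is a placeholder, since the step only says log_start_offset is advanced when offsetToDelete <= high_watermark.

```python
from typing import List, Tuple

# Hypothetical model of a leader replica: a segment is (base_offset, largest_offset).
Segment = Tuple[int, int]


def apply_delete(
    log_start_offset: int,
    high_watermark: int,
    offset_to_delete: int,
    segments: List[Segment],
) -> Tuple[int, List[Segment]]:
    """Apply step 3 to one partition; return (new log_start_offset, remaining segments)."""
    if offset_to_delete == -1:
        offset_to_delete = high_watermark  # -1L means "up to the high watermark"
    if offset_to_delete > high_watermark:
        # Placeholder for whatever error the broker reports in this case.
        raise ValueError("offsetToDelete is above high_watermark")
    new_start = max(log_start_offset, offset_to_delete)
    # Only segments whose largest offset is below the new log_start_offset are removed.
    remaining = [seg for seg in segments if seg[1] >= new_start]
    return new_start, remaining
```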
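Steps 4 and 8 above describe parking the request in a DelayedOperationPurgatory and completing it once every partition has a result. The dict-based sketch below illustrates only that completion condition. PendingDelete, SimplePurgatory, and the request_id key are hypothetical; the rule that decides each partition's result is not modeled (the caller supplies it through set_result()), and neither is the RequestTimeoutMs expiry.

```python
from typing import Dict, Optional, Set, Tuple

TopicPartition = Tuple[str, int]
# Per-partition result: (low_watermark, error), where error is None on success.
Result = Tuple[int, Optional[str]]


class PendingDelete:
    """Hypothetical stand-in for a DeleteRecordsRequest parked in a purgatory."""

    def __init__(self, partitions: Set[TopicPartition]) -> None:
        self.partitions = set(partitions)
        self.results: Dict[TopicPartition, Result] = {}

    def set_result(self, tp: TopicPartition, result: Result) -> None:
        if tp in self.partitions:
            self.results[tp] = result

    def can_complete(self) -> bool:
        # Complete only once every partition in the request has a result.
        return self.partitions.issubset(self.results)


class SimplePurgatory:
    """A dict-based sketch; Kafka's DelayedOperationPurgatory is far more involved."""

    def __init__(self) -> None:
        self.pending: Dict[int, PendingDelete] = {}

    def park(self, request_id: int, op: PendingDelete) -> None:
        self.pending[request_id] = op

    def try_complete(self, request_id: int) -> Optional[Dict[TopicPartition, Result]]:
        """Remove and return the response contents if the request can be completed."""
        op = self.pending.get(request_id)
        if op is None or not op.can_complete():
            return None
        del self.pending[request_id]
        return dict(op.results)
```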
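Step 9 above, on the client side, is a merge of per-broker responses with a timeout fallback. In this sketch DeleteDataResult is modeled as a frozen dataclass, Python's built-in TimeoutError stands in for the Java TimeoutException, and responses is assumed to contain only the brokers that answered within requestTimeoutMs; assemble_results() is a hypothetical name.

```python
from dataclasses import dataclass
from typing import Dict, Optional, Tuple

TopicPartition = Tuple[str, int]


@dataclass(frozen=True)
class DeleteDataResult:
    """Mirrors DeleteDataResult(long: low_watermark, error: Exception)."""
    low_watermark: int
    error: Optional[Exception]


def assemble_results(
    partitions_by_broker: Dict[int, Tuple[TopicPartition, ...]],
    responses: Dict[int, Dict[TopicPartition, Tuple[int, Optional[Exception]]]],
) -> Dict[TopicPartition, DeleteDataResult]:
    """Build the final Map<TopicPartition, DeleteDataResult> per step 9.

    responses holds only brokers that answered within requestTimeoutMs.
    """
    results: Dict[TopicPartition, DeleteDataResult] = {}
    for broker, partitions in partitions_by_broker.items():
        response = responses.get(broker)
        for tp in partitions:
            if response is None:
                # No DeleteRecordsResponse from this broker in time.
                results[tp] = DeleteDataResult(-1, TimeoutError("no response from broker"))
            else:
                low_watermark, error = response[tp]
                results[tp] = DeleteDataResult(low_watermark, error)
    return results
```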
...
- Using committed offset instead of an extra API to trigger the data delete operation. Delete data if its offset is smaller than the committed offset of all consumer groups that need to consume from this partition. The advantage of this approach is that it does not require user applications to coordinate to determine when deleteDataBefore() can be called, which can be hard to do if there are multiple consumer groups interested in consuming this topic. The disadvantage of this approach is that it is less flexible than the deleteDataBefore() API because it re-uses committed offset to trigger the data delete operation. Also, it adds complexity to the broker implementation and would be more complex to implement than the deleteDataBefore() API. An alternative approach is to implement this logic by running an external service which calls the deleteDataBefore() API based on the committed offsets of consumer groups.
...low_watermark of all followers to increase above the cutoff offset
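The external-service alternative in the committed-offset item above boils down to taking, per partition, the smallest committed offset among the consumer groups that read it, and passing the result to deleteDataBefore(). A minimal sketch, assuming the service has already fetched the committed offsets of the relevant groups into a dictionary keyed by group id (fetching them is out of scope here, and offsets_from_committed() is a hypothetical name):

```python
from typing import Dict, Tuple

TopicPartition = Tuple[str, int]


def offsets_from_committed(
    committed: Dict[str, Dict[TopicPartition, int]],
) -> Dict[TopicPartition, int]:
    """Smallest committed offset per partition across the given consumer groups.

    committed maps a consumer group id to that group's committed offsets. A partition
    is only included if every group has committed an offset for it, because data that
    some group has not yet consumed must not be deleted.
    """
    groups = list(committed.values())
    if not groups:
        return {}
    common = set(groups[0])
    for offsets in groups[1:]:
        common &= set(offsets)
    return {tp: min(offsets[tp] for offsets in groups) for tp in common}
```

Partitions for which some group has no committed offset are skipped, which errs on the side of keeping data.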