...

 

Code Block
titlePurgeRequest
PurgeRequest => topics timeout
  topics => [PurgeRequestTopic]
  timeout => int32
 
PurgeRequestTopic => topic partitions
  topic => str
  partitions => [PurgeRequestPartition]
 
PurgeRequestPartition => partition offset
  partition => int32
  offset => int64  // offset -1L will be translated into the high_watermark of the partition when the leader receives the request.

 

Create PurgeResponse

 

Code Block
titlePurgeResponse
PurgeResponse => topics
  topics => [PurgeResponseTopic]
 
PurgeResponseTopic => topic partitions
  topic => str
  partitions => [PurgeResponsePartition]
 
PurgeResponsePartition => partition low_watermark error_code
  partition => int32
  low_watermark => int64
  error_code => int16

 

Add a log_begin_offset field to FetchRequestPartition

Code Block
titleFetchRequestPartition
FetchRequestPartition => partition fetch_offset log_begin_offset max_bytes
  partition => int32
  fetch_offset => int64
  log_begin_offset => int64  <-- NEW. If the request is issued by a consumer, the value is -1. Otherwise, this is the log_begin_offset of this partition on the follower.
  max_bytes => int32

 

Add a log_begin_offset field to FetchResponsePartitionHeader

...

Code Block
titleFetchResponsePartitionHeader
FetchResponsePartitionHeader => partition error_code high_watermark log_begin_offset
  partition => int32
  error_code => int16
  high_watermark => int64
  log_begin_offset => int64  <-- NEW. This is the log_begin_offset of this partition on the leader.

...

We create one more checkpoint file, named "log-begin-offset-checkpoint", in every log directory. The checkpoint file will have the same format as existing checkpoint files (e.g. replication-offset-checkpoint), which map TopicPartition to Long.
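As a rough illustration, the checkpoint file can be read and written with logic like the following. This is a sketch, not broker code: it assumes the version / entry-count / one-entry-per-line text layout used by Kafka's existing checkpoint files, and the class and method names are made up for this example.

```java
import java.io.*;
import java.nio.file.*;
import java.util.*;

// Illustrative sketch of the checkpoint file layout:
// line 1: format version, line 2: entry count, then one "topic partition offset" per line.
public class LogBeginOffsetCheckpoint {

    // keys are "topic partition" strings, e.g. "my-topic 0"; values are log_begin_offset
    public static void write(Path file, Map<String, Long> offsets) throws IOException {
        try (BufferedWriter w = Files.newBufferedWriter(file)) {
            w.write("0\n");                          // format version
            w.write(offsets.size() + "\n");          // number of entries
            for (Map.Entry<String, Long> e : offsets.entrySet()) {
                w.write(e.getKey() + " " + e.getValue() + "\n");
            }
        }
    }

    public static Map<String, Long> read(Path file) throws IOException {
        List<String> lines = Files.readAllLines(file);
        int count = Integer.parseInt(lines.get(1)); // line 0 is the version, line 1 the count
        Map<String, Long> offsets = new HashMap<>();
        for (int i = 2; i < 2 + count; i++) {
            String line = lines.get(i);
            int idx = line.lastIndexOf(' ');        // offset is the last space-separated token
            offsets.put(line.substring(0, idx), Long.parseLong(line.substring(idx + 1)));
        }
        return offsets;
    }
}
```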

...

The idea is to add new APIs to the Admin Client (see KIP-4) that users can call to purge data that is no longer needed. A new request and response need to be added to communicate this operation between client and broker. Given the impact of this API on the data, it should be protected by Kafka's authorization mechanism described in KIP-11 to prevent malicious or unintended data deletion. Furthermore, we adopt a soft-delete approach because it is expensive to purge data in the middle of a segment. Segments whose largest offset < offset-to-purge can be deleted safely. Brokers can advance the log_begin_offset of a partition to offset-to-purge so that data with offset < offset-to-purge will not be exposed to consumers even if it is still on disk. The log_begin_offset will be checkpointed periodically, similar to high_watermark, so that it is persistent.
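The soft-delete rule above can be sketched as follows. The segment representation (a list of per-segment largest offsets) and all names are illustrative assumptions, not broker code:

```java
import java.util.*;

// Sketch of soft delete: whole segments whose largest offset is below
// offset-to-purge are deleted; data after that point stays on disk but is
// hidden from consumers by advancing log_begin_offset.
public class SoftDelete {

    // segmentMaxOffsets: the largest offset of each segment, assumed ascending.
    // Returns the segments that can be deleted safely.
    public static List<Long> segmentsToDelete(List<Long> segmentMaxOffsets, long offsetToPurge) {
        List<Long> deletable = new ArrayList<>();
        for (long maxOffset : segmentMaxOffsets) {
            if (maxOffset < offsetToPurge) deletable.add(maxOffset);
        }
        return deletable;
    }

    // log_begin_offset only ever moves forward.
    public static long advanceLogBeginOffset(long logBeginOffset, long offsetToPurge) {
        return Math.max(logBeginOffset, offsetToPurge);
    }
}
```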

Note that the way the broker handles PurgeRequest is similar to how it handles ProduceRequest with ack = -1 and isr=all_replicas, e.g. the leader waits for all followers to catch up with its log_begin_offset, doesn't expose messages below log_begin_offset, and checkpoints log_begin_offset periodically. The low_watermark of a partition will be the minimum log_begin_offset of all replicas of this partition, and this value will be returned to the user in PurgeResponse.

Please refer to public interface section for our design of the API, request and response. In this section we will describe how broker maintains low watermark per partition, how client communicates with broker to purge old data, and how this API can be protected by authorization.

...

3) After receiving the PurgeRequest, for each partition in the PurgeRequest, the leader first sets offsetToPurge to high_watermark if offsetToPurge is -1L. It then sets the log_begin_offset of the leader replica to max(log_begin_offset, offsetToPurge) if offsetToPurge <= high_watermark. Segments whose largest offset < log_begin_offset will be deleted by the leader.
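Step 3 can be sketched as a pure function. Names are illustrative, and the IllegalArgumentException stands in for the OffsetOutOfRangeException returned by the broker:

```java
// Sketch of the leader's handling of one partition in a PurgeRequest:
// resolve offsetToPurge (-1L means "use high_watermark") and advance
// the leader replica's log_begin_offset, rejecting purges beyond the HW.
public class LeaderPurgeHandler {

    public static long resolveAndAdvance(long offsetToPurge, long highWatermark, long logBeginOffset) {
        long target = (offsetToPurge == -1L) ? highWatermark : offsetToPurge;
        if (target > highWatermark) {
            // broker would answer with OffsetOutOfRangeException
            throw new IllegalArgumentException("offsetToPurge is beyond high_watermark");
        }
        return Math.max(logBeginOffset, target);
    }
}
```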

4) The leader puts the PurgeRequest into a DelayedOperationPurgatory. The PurgeRequest can be completed when results for all partitions specified in the PurgeRequest are available. The result of a partition will be available within RequestTimeoutMs and it is determined using the following logic:

  • If the log_begin_offset of this partition on all followers is larger than or equal to offsetToPurge, the result of this partition will be its low_watermark, i.e. the minimum log_begin_offset of all its replicas.
  • If high_watermark of this partition is smaller than the offsetToPurge, the result of this partition will be OffsetOutOfRangeException.
  • If any replica of this partition goes offline, the result of this partition will be NotEnoughReplicasException.
  • If the leadership of this partition moves to another broker, the result of this partition will be NotLeaderException.
  • If the result of this partition is not available after RequestTimeoutMs, the result of this partition will be TimeoutException.
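The completion checks above can be sketched as follows; the method signature, boolean flags, and string results are illustrative stand-ins for the broker's internal state and error codes:

```java
import java.util.*;

// Sketch of the per-partition result determination in step 4.
public class PurgeResultCheck {

    public static String result(List<Long> followerLogBeginOffsets, long offsetToPurge,
                                long highWatermark, long leaderLowWatermark,
                                boolean replicaOffline, boolean leaderMoved, boolean timedOut) {
        if (highWatermark < offsetToPurge) return "OffsetOutOfRangeException";
        if (replicaOffline) return "NotEnoughReplicasException";
        if (leaderMoved) return "NotLeaderException";
        // all followers caught up: answer with the partition's low_watermark
        boolean allCaughtUp = followerLogBeginOffsets.stream().allMatch(o -> o >= offsetToPurge);
        if (allCaughtUp) return "low_watermark=" + leaderLowWatermark;
        return timedOut ? "TimeoutException" : "pending";
    }
}
```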

5) The leader sends FetchResponse with its log_begin_offset to the followers.

6) The follower sets its replica's log_begin_offset to max(log_begin_offset of the leader, log_begin_offset of the local replica). It also deletes those segments whose largest offset < log_begin_offset.
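Step 6 can be sketched as below (illustrative names; segments are again represented by their largest offsets):

```java
import java.util.*;

// Sketch of the follower's reaction to the leader's log_begin_offset in FetchResponse.
public class FollowerPurge {

    // The follower never moves its log_begin_offset backwards.
    public static long updateLogBeginOffset(long leaderLogBeginOffset, long localLogBeginOffset) {
        return Math.max(leaderLogBeginOffset, localLogBeginOffset);
    }

    // Segments fully below log_begin_offset are dropped; the rest are kept.
    public static List<Long> remainingSegments(List<Long> segmentMaxOffsets, long logBeginOffset) {
        List<Long> kept = new ArrayList<>();
        for (long maxOffset : segmentMaxOffsets) {
            if (maxOffset >= logBeginOffset) kept.add(maxOffset);
        }
        return kept;
    }
}
```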

7) The follower sends FetchRequest with its replica's log_begin_offset to the leader.

8) The leader updates the log_begin_offset of each follower. If the PurgeRequest can be completed, the leader removes it from the DelayedOperationPurgatory and sends PurgeResponse with the results (i.e. low_watermark or error) for the specified set of partitions.

9) If the admin client does not receive PurgeResponse from a broker within RequestTimeoutMs, the PurgeDataResult of the partitions on that broker will be PurgeDataResult(low_watermark = -1, error = TimeoutException). Otherwise, the PurgeDataResult of each partition will be constructed using the low_watermark and the error of the corresponding partition read from the PurgeResponse received from its leader broker. purgeDataBefore(...).get() will unblock and return Map&lt;TopicPartition, PurgeDataResult&gt; when the PurgeDataResult of all partitions specified in the offsetForPartition param are available.
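The client-side assembly in step 9 can be sketched as follows. PurgeDataResult follows the KIP text; the assemble helper and the String keys standing in for TopicPartition are illustrative assumptions:

```java
import java.util.*;

// Sketch of step 9: partitions whose broker never answered within the request
// timeout get PurgeDataResult(-1, TimeoutException); the rest are filled from
// the received PurgeResponse.
public class PurgeDataResult {
    public final long lowWatermark;
    public final String error;   // null when the purge succeeded

    public PurgeDataResult(long lowWatermark, String error) {
        this.lowWatermark = lowWatermark;
        this.error = error;
    }

    public static Map<String, PurgeDataResult> assemble(Set<String> requestedPartitions,
                                                        Map<String, PurgeDataResult> received) {
        Map<String, PurgeDataResult> out = new HashMap<>();
        for (String tp : requestedPartitions) {
            out.put(tp, received.getOrDefault(tp, new PurgeDataResult(-1L, "TimeoutException")));
        }
        return out;
    }
}
```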

...

- The broker will delete those segments whose largest offset < log_begin_offset.

- Only messages with offset >= log_begin_offset can be sent to consumers.

- When a segment is deleted due to log retention, the broker updates log_begin_offset to max(log_begin_offset, smallest offset in the replica's log).

- The broker will checkpoint log_begin_offset for all replicas periodically in the file "log-begin-offset-checkpoint", in the same way it checkpoints the high_watermark of replicas. The checkpoint file will have the same format as existing checkpoint files, which map TopicPartition to Long.

...

Given the potential damage that can be caused if this API is used by mistake, it is important that we limit its usage to authorized users only. For this we can take advantage of the existing authorization framework implemented in KIP-11. purgeDataBefore() will have the same authorization setting as deleteTopic(): its operation type is DELETE and its resource type is TOPIC.

4) ListOffsetRequest

The log_begin_offset of a partition will be used to decide the smallest offset of the partition that is exposed to consumers. It will be returned when the smallest_offset option is used in the ListOffsetRequest.

...