...
    PurgeResponse => topics
      topics => [PurgeResponseTopic]

    PurgeResponseTopic => topic partitions
      topic => str
      partitions => [PurgeResponsePartition]

    PurgeResponsePartition => partition low_watermark error_code
      partition => int32
      low_watermark => int64
      error_code => int16
Add a log_start_offset field to FetchRequestPartition
    FetchRequestPartition => partition fetch_offset log_start_offset max_bytes
      partition => int32
      fetch_offset => int64
      log_start_offset => int64  <-- NEW. If the request is issued by a consumer, the value is -1. Otherwise, this is the log_start_offset of this partition on the follower.
      max_bytes => int32
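As an illustration of the field layout above, here is a minimal Python sketch that packs and unpacks one FetchRequestPartition entry. The helper names are hypothetical and not part of Kafka. The sketch assumes the usual big-endian encoding of fixed-width integers in the Kafka wire protocol and covers only this one struct, not a complete FetchRequest.

```python
import struct

# Field order follows the schema: partition (int32), fetch_offset (int64),
# log_start_offset (int64), max_bytes (int32). ">" means big-endian, no padding.
_FETCH_PARTITION_FORMAT = ">iqqi"


def encode_fetch_request_partition(partition, fetch_offset, max_bytes,
                                   log_start_offset=-1):
    """Hypothetical helper: serialize one FetchRequestPartition entry.

    log_start_offset defaults to -1, the value a consumer sends.
    A follower passes the log_start_offset of its local replica instead.
    """
    return struct.pack(_FETCH_PARTITION_FORMAT,
                       partition, fetch_offset, log_start_offset, max_bytes)


def decode_fetch_request_partition(buf):
    """Hypothetical helper: parse the bytes produced by the encoder above."""
    partition, fetch_offset, log_start_offset, max_bytes = struct.unpack(
        _FETCH_PARTITION_FORMAT, buf)
    return {
        "partition": partition,
        "fetch_offset": fetch_offset,
        "log_start_offset": log_start_offset,
        "max_bytes": max_bytes,
    }


# A consumer fetch carries -1; a follower fetch carries its own log_start_offset.
consumer_entry = encode_fetch_request_partition(0, 100, 1048576)
follower_entry = encode_fetch_request_partition(0, 100, 1048576,
                                                log_start_offset=42)
```

The leader can tell the two kinds of fetch apart by the value of this field alone, which is why -1 is reserved for consumers.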
...
    FetchResponsePartitionHeader => partition error_code high_watermark log_start_offset
      partition => int32
      error_code => int16
      high_watermark => int64
      log_start_offset => int64  <-- NEW. This is the log_start_offset of this partition on the leader.
...
The idea is to add new APIs to the Admin Client (see KIP-4) that users can call to purge data that is no longer needed. A new request and response need to be added to communicate this operation between client and broker. Given the impact of this API on the data, it should be protected by Kafka's authorization mechanism described in KIP-11 to prevent malicious or unintended data deletion. Furthermore, we adopt the soft delete approach because it is expensive to purge data in the middle of a segment. Segments whose maximum offset < offset-to-purge can be deleted safely. Brokers can increment the log_start_offset of a partition to offset-to-purge so that data with offset < offset-to-purge is not exposed to consumers even if it is still on disk. The log_start_offset will be checkpointed periodically, in the same way as high_watermark, so that it is persistent.
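The soft delete approach can be sketched as follows. This is an illustrative model only: `Segment`, `soft_purge`, and `visible_offsets` are hypothetical names, and a real broker works on log segment files rather than in-memory lists.

```python
from dataclasses import dataclass


@dataclass
class Segment:
    base_offset: int  # smallest offset stored in the segment
    last_offset: int  # largest offset stored in the segment


def soft_purge(segments, log_start_offset, offset_to_purge):
    """Advance log_start_offset and drop only whole segments below it.

    A segment is deleted only if its largest offset is below the new
    log_start_offset; a segment that straddles the offset stays on disk.
    """
    new_start = max(log_start_offset, offset_to_purge)
    retained = [s for s in segments if s.last_offset >= new_start]
    return retained, new_start


def visible_offsets(segments, log_start_offset):
    """Offsets a consumer may read: on disk and >= log_start_offset."""
    return [
        offset
        for s in segments
        for offset in range(s.base_offset, s.last_offset + 1)
        if offset >= log_start_offset
    ]


segments = [Segment(0, 9), Segment(10, 19), Segment(20, 29)]
retained, start = soft_purge(segments, log_start_offset=0, offset_to_purge=15)
# The segment 0..9 is deleted. The segment 10..19 stays on disk, but
# offsets 10..14 are hidden from consumers because they are below 15.
```

Hiding offsets 10..14 instead of rewriting the middle segment is what keeps the purge cheap.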
Note that the way the broker handles PurgeRequest is similar to how it handles ProduceRequest with ack = all and isr = all_live_replicas, e.g. the leader waits for all followers to catch up with its log_start_offset, doesn't expose messages below log_start_offset, and checkpoints log_start_offset periodically. The low_watermark of a partition will be the minimum log_start_offset of all replicas of this partition, and this value will be returned to the user in PurgeResponse.
...
3) After receiving the PurgeRequest, for each partition in the PurgeRequest, the leader first sets offsetToPurge to high_watermark if offsetToPurge is -1L. If offsetToPurge <= high_watermark, it then sets the log_start_offset of the leader replica to max(log_start_offset, offsetToPurge). The leader deletes those segments whose largest offset < log_start_offset.
4) The leader puts the PurgeRequest into a DelayedOperationPurgatory. The PurgeRequest can be completed when results for all partitions specified in the PurgeRequest are available. The result of a partition will be available within RequestTimeoutMs and is determined using the following logic:
- If log_start_offset of this partition on all live followers is larger than or equal to offsetToPurge, the result of this partition will be its low_watermark, which is the minimum log_start_offset of all its live replicas.
- If high_watermark of this partition is smaller than offsetToPurge, the result of this partition will be OffsetOutOfRangeException.
- If the leadership of this partition moves to another broker, the result of this partition will be NotLeaderException.
- If the result of this partition is not available after RequestTimeoutMs, the result of this partition will be TimeoutException.
5) The leader sends FetchResponse with its log_start_offset to followers.
6) The follower sets its replica's log_start_offset to max(log_start_offset of leader, log_start_offset of local replica). It also deletes those segments whose largest offset < log_start_offset.
7) The follower sends FetchRequest with its replica's log_start_offset to the leader.
8) The leader updates the log_start_offset of each follower. If the PurgeRequest can be completed, the leader removes the PurgeRequest from the DelayedOperationPurgatory and sends PurgeResponse with the results (i.e. low_watermark or error) for the specified set of partitions.
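The per-partition logic in steps 3) to 8) can be sketched as below. This is a simplified model, not broker code: all function names are hypothetical, errors are represented as plain strings, and the order in which the four conditions of step 4) are checked is an assumption, since the steps above do not specify it.

```python
def leader_apply_purge(log_start_offset, offset_to_purge, high_watermark):
    """Step 3: resolve -1 to high_watermark, then advance the leader's offset.

    Returns (new log_start_offset of the leader, resolved offset_to_purge).
    """
    target = high_watermark if offset_to_purge == -1 else offset_to_purge
    if target <= high_watermark:
        log_start_offset = max(log_start_offset, target)
    return log_start_offset, target


def follower_apply_fetch_response(local_start, leader_start):
    """Step 6: a follower never moves its log_start_offset backwards."""
    return max(local_start, leader_start)


def partition_result(target, high_watermark, leader_start, follower_starts,
                     is_leader=True, timed_out=False):
    """Step 4: result for one partition, or None while it is still pending.

    follower_starts holds the log_start_offset reported by each live follower
    in its latest FetchRequest (steps 7 and 8).
    """
    if not is_leader:
        return "NotLeaderException"
    if high_watermark < target:
        return "OffsetOutOfRangeException"
    if all(start >= target for start in follower_starts):
        # low_watermark: minimum log_start_offset across all live replicas.
        return min([leader_start] + list(follower_starts))
    if timed_out:
        return "TimeoutException"
    return None  # stays in the purgatory until followers catch up


# Walk-through: purge up to offset 50 on a partition with high_watermark 100.
leader_start, target = leader_apply_purge(0, 50, 100)
followers = [0, 0]
pending = partition_result(target, 100, leader_start, followers)
followers = [follower_apply_fetch_response(f, leader_start) for f in followers]
done = partition_result(target, 100, leader_start, followers)
```

In the walk-through, `pending` is `None` because no follower has reported an advanced offset yet, and `done` is the low_watermark once both followers have applied the leader's log_start_offset.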
...
- Broker will delete those segments whose largest offset < log_start_offset.
- Only messages with offset >= log_start_offset can be sent to consumers.
- When a segment is deleted due to log retention, the broker updates log_start_offset to max(log_start_offset, smallest offset in the replica's log).
- Broker will checkpoint log_start_offset for all replicas periodically in the file "log-begin-offset-checkpoint", in the same way it checkpoints the high_watermark of replicas. The checkpoint file will have the same format as existing checkpoint files, which map TopicPartition to Long.
...
Given the potential damage that can be caused if this API is used by mistake, it is important that we limit its usage to authorized users only. For this, we can take advantage of the existing authorization framework implemented in KIP-11. purgeDataBefore() will have the same authorization setting as deleteTopic(). Its operation type is DELETE and its resource type is TOPIC.
4) ListOffsetRequest
The log_start_offset of a partition will be used to decide the smallest offset of the partition that will be exposed to consumers. It will be returned when the smallest_offset option is used in the ListOffsetRequest.
...