Status
Current state: Under Discussion
Discussion thread: here
JIRA: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Kafka can be used in a stream processing pipeline to pass intermediate data between processing jobs. The intermediate data generated by stream processing jobs can take a large amount of disk space in Kafka. It is important that we can delete this data soon after it is consumed by downstream jobs; otherwise we have to pay a significant cost to purchase disks for Kafka clusters to keep this data.
However, Kafka does not provide any mechanism to delete data after it is consumed by downstream jobs. It provides only time-based and size-based log retention policies, both of which are agnostic to the consumer's behavior. If we set a small time-based log retention for intermediate data, the data may be deleted even before it is consumed by downstream jobs. If we set a large time-based log retention, the data will take a large amount of disk space for a long time. Neither solution is good for Kafka users. To address this problem, we propose to add a new admin API which can be called by the user to purge data that is no longer needed.
Note that this KIP is related to, and supersedes, KIP-47.
Public Interfaces
1) Java API
- Add the following API to the Admin Client. This API returns a future object whose result will be available within RequestTimeoutMs, which is configured when the user constructs the AdminClient.
Future<Map<TopicPartition, PurgeDataResult>> purgeDataBefore(Map<TopicPartition, Long> offsetForPartition)
- PurgeDataResult has the following two fields, which tell the user whether the data has been successfully purged for the corresponding partition.
PurgeDataResult(long low_watermark, Exception error)
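As an illustration, here is a minimal client-side sketch of the API, assuming an already-constructed KafkaConsumer (consumer) and KIP-4 AdminClient (adminClient). The topic name and cutoff timestamp are made up; KafkaConsumer#offsetsForTimes() is an existing API, and the purgeDataBefore() call follows the signature above:

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

TopicPartition tp = new TopicPartition("intermediate-topic", 0);

// Optionally convert a cutoff timestamp into an offset using the existing
// KafkaConsumer#offsetsForTimes() API (see also step 1 of the Proposed Changes).
OffsetAndTimestamp cutoff = consumer.offsetsForTimes(
    Collections.singletonMap(tp, System.currentTimeMillis() - 3600 * 1000L)).get(tp);

Map<TopicPartition, Long> offsetForPartition = new HashMap<>();
if (cutoff != null)
    offsetForPartition.put(tp, cutoff.offset());

// Blocks for at most RequestTimeoutMs, which is configured on the AdminClient.
Map<TopicPartition, PurgeDataResult> results =
    adminClient.purgeDataBefore(offsetForPartition).get();

// Field access follows the PurgeDataResult definition above.
PurgeDataResult result = results.get(tp);
if (result.error == null)
    System.out.println("Purged up to low_watermark " + result.low_watermark);
else
    System.out.println("Purge failed: " + result.error);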
2) Protocol
Create PurgeRequest
PurgeRequest => topics timeout
  topics => [PurgeRequestTopic]
  timeout => int32

PurgeRequestTopic => topic partitions
  topic => str
  partitions => [PurgeRequestPartition]

PurgeRequestPartition => partition offset
  partition => int32
  offset => int64  // offset -1L will be translated into high_watermark of the partition when the leader receives the request.
Create PurgeResponse

PurgeResponse => topics
  topics => [PurgeResponseTopic]

PurgeResponseTopic => topic partitions
  topic => str
  partitions => [PurgeResponsePartition]

PurgeResponsePartition => partition low_watermark error_code
  partition => int32
  low_watermark => int64
  error_code => int16
Add a low_watermark field to FetchRequestPartition
FetchRequestPartition => partition fetch_offset low_watermark max_bytes
  partition => int32
  fetch_offset => int64
  low_watermark => int64  <-- NEW. If the request is issued by a consumer, the value is -1. Otherwise, this is the low_watermark of this partition on the follower.
  max_bytes => int32
Add a low_watermark field to FetchResponsePartitionHeader

FetchResponsePartitionHeader => partition error_code high_watermark low_watermark
  partition => int32
  error_code => int16
  high_watermark => int64
  low_watermark => int64  <-- NEW. This is the low_watermark of this partition on the leader.
3) Checkpoint file
We create one more checkpoint file, named "replication-low-watermark-checkpoint", in every log directory. The checkpoint file will have the same format as existing checkpoint files (e.g. replication-offset-checkpoint) which map TopicPartition to Long.
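For illustration, its content would follow the existing checkpoint file layout: a version number, the number of entries, and one "topic partition offset" line per partition (the values below are made up):

0
2
my_topic 0 100
my_topic 1 200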
Proposed Changes
The idea is to add new APIs in the Admin Client (see KIP-4) that can be called by the user to purge data that is no longer needed. A new request and response need to be added to communicate this request between client and broker. Given the impact of this API on the data, the API should be protected by Kafka's authorization mechanism described in KIP-11 to prevent malicious or unintended data deletion. Furthermore, we adopt a soft-delete approach, because it is expensive to purge data in the middle of a segment: segments whose maximum offset < offset-to-purge can be deleted safely, and brokers can increment the low_watermark of a partition above the offset-to-purge so that data with offset < offset-to-purge will not be exposed to consumers even if it is still on disk. The low_watermark will be checkpointed periodically, similar to the high_watermark, so that it is persistent.
Note that the way the broker handles a PurgeRequest is similar to how it handles a ProduceRequest with ack = -1 and isr=all_replicas: it waits for all followers to catch up with the low_watermark, doesn't expose messages below the low_watermark, and checkpoints the low_watermark periodically.
Please refer to the Public Interfaces section for our design of the API, request and response. In this section we describe how the broker maintains a low watermark per partition, how the client communicates with the broker to purge old data, and how this API is protected by authorization.
1) Interaction between user application and brokers
1) The user application determines the maximum offset of data that can be purged per partition. This information is provided to purgeDataBefore() as Map<TopicPartition, Long>. If the user application only knows the timestamp of data that can be purged per partition, it can use the offsetsForTimes() API to convert the cutoff timestamp into an offsetToPurge per partition before providing the map to the purgeDataBefore() API.
2) The Admin Client builds a PurgeRequest using the offsetToPurge map from the purgeDataBefore() parameter and the requestTimeoutMs taken from the AdminClient constructor. One PurgeRequest is sent to each broker that acts as leader of any partition in the request. Each request only includes the partitions which that broker leads.
3) After receiving the PurgeRequest, for each partition in the PurgeRequest, the leader first sets offsetToPurge to high_watermark if offsetToPurge is -1L. It then sets the low_watermark of the leader replica to max(low_watermark, offsetToPurge) if offsetToPurge <= high_watermark. Those segments whose largest offset < low_watermark will be deleted by the leader (a sketch of this leader-side logic follows step 9 below).
4) The leader puts the PurgeRequest into a DelayedOperationPurgatory. The PurgeRequest can be completed when results for all partitions specified in the PurgeRequest are available. The result of a partition will be available within RequestTimeoutMs and is determined using the following logic:
- If the low_watermark of this partition on all followers is larger than or equal to the offsetToPurge, the result of this partition will be the low_watermark of the leader replica.
- If the high_watermark of this partition is smaller than the offsetToPurge, the result of this partition will be OffsetOutOfRangeException.
- If any replica of this partition goes offline, the result of this partition will be NotEnoughReplicasException.
- If the leadership of this partition moves to another broker, the result of this partition will be NotLeaderException.
- If the result of this partition is not available after RequestTimeoutMs, the result of this partition will be TimeoutException.
5) The leader sends a FetchResponse with its low_watermark to followers.
6) The follower sets its replica's low_watermark to max(low_watermark read from the FetchResponse, replica's low_watermark). It also deletes those segments whose largest offset < low_watermark.
7) The follower sends a FetchRequest with its replica's low_watermark to the leader.
8) The leader updates the low_watermark of each follower. If the PurgeRequest can be completed, the leader removes the PurgeRequest from the DelayedOperationPurgatory and sends a PurgeResponse with the results (i.e. low_watermark or error) for the specified set of partitions.
9) If the admin client does not receive a PurgeResponse from a broker within RequestTimeoutMs, the PurgeDataResult of the partitions on that broker will be PurgeDataResult(low_watermark = -1, error = TimeoutException). Otherwise, the PurgeDataResult of each partition will be constructed using the low_watermark and the error of the corresponding partition read from the PurgeResponse received from its leader broker. purgeDataBefore(...).get() will unblock and return Map<TopicPartition, PurgeDataResult> when the PurgeDataResult of all partitions specified in the offsetForPartition param are available.
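To make the leader-side handling in steps 3), 4) and 8) concrete, here is a simplified sketch. All names are hypothetical; this is not the actual broker implementation:

import java.util.Map;

class PurgeRequestHandlerSketch {

    // Hypothetical view of a partition's replication state on the leader.
    interface PartitionState {
        long highWatermark();
        long lowWatermark();
        void setLowWatermark(long offset);
        void deleteSegmentsBelow(long offset);        // delete segments whose largest offset < the given offset
        Map<Integer, Long> followerLowWatermarks();   // followerId -> that follower's low_watermark
    }

    // Step 3: the leader advances its low_watermark and deletes fully-purged segments.
    static long handlePurge(PartitionState partition, long offsetToPurge) {
        if (offsetToPurge == -1L)
            offsetToPurge = partition.highWatermark();   // -1L means "purge up to high_watermark"
        if (offsetToPurge <= partition.highWatermark()) {
            partition.setLowWatermark(Math.max(partition.lowWatermark(), offsetToPurge));
            partition.deleteSegmentsBelow(partition.lowWatermark());
        }
        // Otherwise the partition's result will be OffsetOutOfRangeException (step 4).
        return partition.lowWatermark();
    }

    // Steps 4 and 8: the delayed operation completes for this partition once every
    // follower's low_watermark (reported in FetchRequests) has reached offsetToPurge.
    static boolean isPurgeComplete(PartitionState partition, long offsetToPurge) {
        return partition.followerLowWatermarks().values().stream()
                .allMatch(lw -> lw >= offsetToPurge);
    }
}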
2) Routine operation in the broker
- Broker will delete those segments whose largest offset < low_watermark.
- Only messages with offset >= low_watermark can be sent to consumers.
- When a segment is deleted due to log retention, the broker updates low_watermark to max(low_watermark, smallest offset in the replica's log).
- Broker will checkpoint the low_watermark of all replicas periodically in the file "replication-low-watermark-checkpoint", in the same way it checkpoints the high_watermark of replicas. The checkpoint file will have the same format as existing checkpoint files which map TopicPartition to Long.
3) API Authorization
Given the potential damage that can be caused if this API is used by mistake, it is important that we limit its usage to authorized users only. For this matter, we can take advantage of the existing authorization framework implemented in KIP-11. purgeDataBefore() will have the same authorization setting as deleteTopic(): its operation type is DELETE and its resource type is TOPIC.
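For example, a user could be granted this permission with the existing kafka-acls.sh tool (the principal, topic and ZooKeeper address below are illustrative):

bin/kafka-acls.sh --authorizer-properties zookeeper.connect=localhost:2181 \
  --add --allow-principal User:admin --operation Delete --topic my_topic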
4) ListOffsetRequest
The low_watermark of a partition will be used to decide the smallest offset of the partition that is exposed to consumers. It will be returned when the smallest_offset option is used in the ListOffsetRequest.
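For example, the existing KafkaConsumer#beginningOffsets() API, which uses the earliest-offset option of ListOffsetRequest, would then report the low_watermark (a sketch; the topic name is illustrative and consumer is an already-constructed KafkaConsumer):

import java.util.Collections;
import java.util.Map;
import org.apache.kafka.common.TopicPartition;

TopicPartition tp = new TopicPartition("intermediate-topic", 0);
// With this KIP, the smallest offset reported here would be the partition's low_watermark.
Map<TopicPartition, Long> earliest = consumer.beginningOffsets(Collections.singletonList(tp));
System.out.println("smallest exposed offset: " + earliest.get(tp));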
Compatibility, Deprecation, and Migration Plan
This KIP is a pure addition, so there is no backward compatibility concern.
The KIP changes the inter-broker protocol. Therefore the migration requires two rolling bounces. In the first rolling bounce we deploy the new code, but brokers still communicate using the existing protocol. In the second rolling bounce we change the config so that brokers start to communicate with each other using the new protocol.
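For example, using the standard inter.broker.protocol.version mechanism (the version strings below are placeholders for the releases before and after this KIP):

# First rolling bounce: deploy the new code, keep brokers on the existing protocol.
inter.broker.protocol.version=<current version>
# Second rolling bounce: switch brokers to the new protocol.
inter.broker.protocol.version=<version containing this KIP>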
Test Plan
- Unit tests to validate that all the individual components work as expected.
- Integration tests to ensure that the feature works correctly end-to-end.
Rejected Alternatives
- Using committed offset instead of an extra API to trigger the data purge operation: purge data if its offset is smaller than the committed offset of all consumer groups that need to consume from this partition. The advantage of this approach is that user applications would not need to coordinate on when purgeDataBefore() can be called, which can be hard to do if there are multiple consumer groups interested in consuming this topic. The disadvantage is that it is less flexible than the purgeDataBefore() API because it re-uses committed offsets to trigger the data purge operation. It also adds complexity to the broker implementation and would be more complex to implement than the purgeDataBefore() API. An alternative is to implement this logic in an external service which calls the purgeDataBefore() API based on the committed offsets of consumer groups.
- Completing the purge operation as soon as the leader has purged the data, instead of waiting for the low_watermark of all followers to increase above the cutoff offset.