Status

Current state: Accepted

Discussion thread: here

...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Kafka can be used in a stream processing pipeline to pass intermediate data between processing jobs. The intermediate data generated by stream processing jobs can take up a large amount of disk space in Kafka. It is important that we can delete this data soon after it is consumed by the downstream application; otherwise we have to pay a significant cost to purchase disks for Kafka clusters to keep that data.

...

Note that this KIP is related to and supersedes KIP-47.


Public Interfaces

1) Java API

- Add the following API in Admin Client. This API returns a future object whose result will be available within RequestTimeoutMs, which is configured when user constructs the AdminClient.

Future<Map<TopicPartition, DeleteDataResult>> deleteRecordsBefore(Map<TopicPartition, Long> offsetForPartition)

...

DeleteDataResult(long: low_watermark, error: Exception)

2) Protocol

Create DeleteRecordsRequest

...

Code Block
titleFetchResponsePartitionHeader
FetchResponsePartitionHeader => partition error_code high_watermark log_start_offset
  partition => int32
  error_code => int16
  high_watermark => int64
  log_start_offset => int64  <-- NEW. This is the low_watermark of this partition on the leader.
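The field layout above can be illustrated with a small round-trip sketch. It assumes the four fields are written back to back in big-endian order; the class name FetchPartitionHeaderSketch and its methods are hypothetical and are not part of Kafka's protocol classes.

```java
import java.nio.ByteBuffer;

// Hypothetical helper (not a Kafka class): packs and unpacks the four header fields.
final class FetchPartitionHeaderSketch {
    static final int SIZE_BYTES = 4 + 2 + 8 + 8; // int32 + int16 + int64 + int64

    final int partition;       // int32
    final short errorCode;     // int16
    final long highWatermark;  // int64
    final long logStartOffset; // int64, the new field

    FetchPartitionHeaderSketch(int partition, short errorCode,
                               long highWatermark, long logStartOffset) {
        this.partition = partition;
        this.errorCode = errorCode;
        this.highWatermark = highWatermark;
        this.logStartOffset = logStartOffset;
    }

    // Serializes the fields in the order given by the schema.
    ByteBuffer write() {
        ByteBuffer buf = ByteBuffer.allocate(SIZE_BYTES); // big-endian by default
        buf.putInt(partition).putShort(errorCode).putLong(highWatermark).putLong(logStartOffset);
        buf.flip();
        return buf;
    }

    // Reads the fields back in the same order.
    static FetchPartitionHeaderSketch read(ByteBuffer buf) {
        int partition = buf.getInt();
        short errorCode = buf.getShort();
        long highWatermark = buf.getLong();
        long logStartOffset = buf.getLong();
        return new FetchPartitionHeaderSketch(partition, errorCode, highWatermark, logStartOffset);
    }
}
```

Because log_start_offset is appended as the last field, the first three fields keep their existing positions in this sketch.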

3) Checkpoint file

We create one more checkpoint file, named "log-begin-offset-checkpoint", in every log directory. The checkpoint file will have the same format as existing checkpoint files (e.g. replication-offset-checkpoint) which map TopicPartition to Long.
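As an illustration of that format, here is a minimal sketch. It assumes the existing checkpoint layout is a version line, an entry-count line, and then one "topic partition offset" line per entry; that layout is stated here as an assumption and should be checked against the broker code. The class name is hypothetical, and partitions are keyed by the string form "topic-partition" for brevity.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the assumed checkpoint layout; not the broker's implementation.
final class LogStartOffsetCheckpointSketch {
    static final int VERSION = 0; // assumed version number

    // Renders the map as: version, entry count, then "topic partition offset" lines.
    static List<String> toLines(Map<String, Long> offsets) {
        List<String> lines = new ArrayList<>();
        lines.add(Integer.toString(VERSION));
        lines.add(Integer.toString(offsets.size()));
        for (Map.Entry<String, Long> e : offsets.entrySet()) {
            int dash = e.getKey().lastIndexOf('-'); // topic names may contain '-'
            String topic = e.getKey().substring(0, dash);
            String partition = e.getKey().substring(dash + 1);
            lines.add(topic + " " + partition + " " + e.getValue());
        }
        return lines;
    }

    // Parses the lines back and validates the version and the entry count.
    static Map<String, Long> fromLines(List<String> lines) {
        int version = Integer.parseInt(lines.get(0).trim());
        if (version != VERSION) {
            throw new IllegalStateException("Unsupported checkpoint version " + version);
        }
        int expected = Integer.parseInt(lines.get(1).trim());
        Map<String, Long> offsets = new LinkedHashMap<>();
        for (String line : lines.subList(2, lines.size())) {
            String[] parts = line.trim().split("\\s+");
            offsets.put(parts[0] + "-" + Integer.parseInt(parts[1]), Long.parseLong(parts[2]));
        }
        if (offsets.size() != expected) {
            throw new IllegalStateException("Expected " + expected + " entries, found " + offsets.size());
        }
        return offsets;
    }
}
```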

4) Script

Add kafka-delete-data.sh that allows user to delete data in the command line. The script requires the following arguments:

...

Code Block
{
  "version" : int,
  "partitions" : [
    {
      "topic": str,
      "partition": int,
      "offset": long
    },
    ...
  ]
}
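A file in this shape could be generated from an offset map. The sketch below is one way to do so; the class name is hypothetical, the version value is left to the caller because this section does not state which number the script expects, and topic names are assumed to need no JSON escaping. Partitions are keyed by the string form "topic-partition".

```java
import java.util.Map;

// Hypothetical helper that renders the JSON layout shown above.
final class DeleteOffsetsJsonSketch {
    static String toJson(int version, Map<String, Long> offsetForPartition) {
        StringBuilder sb = new StringBuilder();
        sb.append("{\"version\":").append(version).append(",\"partitions\":[");
        boolean first = true;
        for (Map.Entry<String, Long> e : offsetForPartition.entrySet()) {
            // Split "topic-partition" at the last dash, since topic names may contain '-'.
            int dash = e.getKey().lastIndexOf('-');
            String topic = e.getKey().substring(0, dash);
            int partition = Integer.parseInt(e.getKey().substring(dash + 1));
            if (!first) {
                sb.append(',');
            }
            first = false;
            sb.append("{\"topic\":\"").append(topic)
              .append("\",\"partition\":").append(partition)
              .append(",\"offset\":").append(e.getValue())
              .append('}');
        }
        return sb.append("]}").toString();
    }
}
```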


Proposed Changes

The idea is to add new APIs in Admin Client (see KIP-4) that can be called by the user to delete data that is no longer needed. A new request and response need to be added to communicate this request between client and broker. Given the impact of this API on the data, the API should be protected by Kafka’s authorization mechanism described in KIP-11 to prevent malicious or unintended data deletion. Furthermore, we adopt the soft delete approach because it is expensive to delete data in the middle of a segment. Those segments whose maximum offset < offset-to-delete can be deleted safely. Brokers can increment log_start_offset of a partition to offset-to-delete so that data with offset < offset-to-delete will not be exposed to consumers even if it is still on disk. The log_start_offset will be checkpointed periodically, in the same way as high_watermark, so that it is persistent.
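The soft delete idea can be sketched with a toy model of one partition. This is a simplified illustration, not the broker's log implementation: the class and method names are hypothetical, each segment is represented only by its largest offset, and the sketch assumes log_start_offset never moves backwards.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of one partition's log; names are hypothetical.
final class SoftDeleteSketch {
    private final List<Long> segmentLastOffsets; // largest offset of each segment, oldest first
    private long logStartOffset;

    SoftDeleteSketch(List<Long> segmentLastOffsets, long logStartOffset) {
        this.segmentLastOffsets = new ArrayList<>(segmentLastOffsets);
        this.logStartOffset = logStartOffset;
    }

    // Soft delete: only the start offset moves; no bytes are removed here.
    void advanceLogStartOffset(long offsetToDelete) {
        logStartOffset = Math.max(logStartOffset, offsetToDelete); // assumption: never decreases
    }

    // Data below log_start_offset is hidden even if it is still on disk.
    boolean isVisible(long offset) {
        return offset >= logStartOffset;
    }

    // Removes whole segments whose largest offset < log_start_offset and returns them.
    List<Long> deleteObsoleteSegments() {
        List<Long> deleted = new ArrayList<>();
        while (!segmentLastOffsets.isEmpty() && segmentLastOffsets.get(0) < logStartOffset) {
            deleted.add(segmentLastOffsets.remove(0));
        }
        return deleted;
    }

    long logStartOffset() {
        return logStartOffset;
    }
}
```

In this model a segment that straddles the cutoff stays on disk, which is why the start offset, rather than the file layout, decides what consumers can read.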

...

Please refer to the Public Interfaces section for our design of the API, request and response. In this section we describe how the broker maintains the low watermark per partition, how the client communicates with the broker to delete old data, and how this API can be protected by authorization.

1) Interaction between user application and brokers

1) User application determines the maximum offset of data that can be deleted per partition. This information is provided to deleteRecordsBefore() as Map<TopicPartition, Long>. If the user application only knows the timestamp of data that can be deleted per partition, it can use the offsetsForTimes() API to convert the cutoff timestamp into offsetToDelete per partition before providing the map to the deleteRecordsBefore() API.

2) Admin Client builds DeleteRecordsRequest using the offsetToDelete from the deleteRecordsBefore() parameter and the requestTimeoutMs taken from the AdminClient constructor. One DeleteRecordsRequest is sent to each broker that acts as leader of any partition in the request. Each request should only include partitions which that broker leads.

...

9) If admin client does not receive DeleteRecordsResponse from a broker within RequestTimeoutMs, the DeleteDataResult of the partitions on that broker will be DeleteDataResult(low_watermark = -1, error = TimeoutException). Otherwise, the DeleteDataResult of each partition will be constructed using the low_watermark and the error of the corresponding partition, which are read from the DeleteRecordsResponse received from its leader broker. deleteRecordsBefore(...).get() will unblock and return Map<TopicPartition, DeleteDataResult> when the DeleteDataResult of every partition specified in the offsetForPartition param is available.
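Steps 2 and 9 can be sketched on the client side as follows. This is an illustrative model, not the AdminClient implementation: the class and method names are hypothetical, partitions are keyed by the string form "topic-partition", brokers are identified by an integer id, and a broker missing from the response map stands in for a broker that did not answer within RequestTimeoutMs.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeoutException;

// Hypothetical client-side sketch of request fan-out and result assembly.
final class DeleteRecordsClientSketch {
    // Mirrors DeleteDataResult(low_watermark, error); a null error means success in this sketch.
    static final class DeleteDataResult {
        final long lowWatermark;
        final Exception error;

        DeleteDataResult(long lowWatermark, Exception error) {
            this.lowWatermark = lowWatermark;
            this.error = error;
        }
    }

    // Step 2: one request per leader, containing only the partitions that broker leads.
    static Map<Integer, Map<String, Long>> groupByLeader(
            Map<String, Long> offsetForPartition, Map<String, Integer> leaderForPartition) {
        Map<Integer, Map<String, Long>> perBroker = new HashMap<>();
        for (Map.Entry<String, Long> e : offsetForPartition.entrySet()) {
            Integer leader = leaderForPartition.get(e.getKey());
            if (leader == null) {
                throw new IllegalArgumentException("No known leader for " + e.getKey());
            }
            perBroker.computeIfAbsent(leader, id -> new HashMap<>()).put(e.getKey(), e.getValue());
        }
        return perBroker;
    }

    // Step 9: partitions of brokers that did not respond get low_watermark = -1 and a TimeoutException.
    static Map<String, DeleteDataResult> collectResults(
            Map<Integer, Map<String, Long>> requests,
            Map<Integer, Map<String, DeleteDataResult>> responses) {
        Map<String, DeleteDataResult> results = new HashMap<>();
        for (Map.Entry<Integer, Map<String, Long>> request : requests.entrySet()) {
            Map<String, DeleteDataResult> response = responses.get(request.getKey());
            for (String partition : request.getValue().keySet()) {
                if (response == null) {
                    results.put(partition, new DeleteDataResult(-1L,
                            new TimeoutException("No response from broker " + request.getKey())));
                } else {
                    results.put(partition, response.get(partition));
                }
            }
        }
        return results;
    }
}
```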

2) Routine operation in the broker

- Broker will delete those segments whose largest offset < log_start_offset.

...

- Broker will checkpoint log_start_offset for all replicas periodically in the file "log-begin-offset-checkpoint", in the same way it checkpoints high_watermark of replicas. The checkpoint file will have the same format as existing checkpoint files which map TopicPartition to Long.

3) API Authorization

Given the potential damage that can be caused if this API is used by mistake, it is important that we limit its usage to only authorized users. For this matter, we can take advantage of the existing authorization framework implemented in KIP-11. deleteRecordsBefore() will have the same authorization setting as deleteTopic(). Its operation type is DELETE and its resource type is TOPIC.
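The check this implies can be sketched as a lookup for a (principal, DELETE, TOPIC, topic) grant. The sketch below is a deliberately simplified stand-in for the KIP-11 authorizer: the class, its enums (a subset chosen for illustration) and the grant store are hypothetical, and wildcards, hosts and deny rules are not modelled.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified ACL store; not the KIP-11 Authorizer interface.
final class DeleteRecordsAclSketch {
    enum Operation { READ, WRITE, DELETE }
    enum ResourceType { TOPIC, GROUP, CLUSTER }

    private final Set<String> grants = new HashSet<>();

    private static String key(String principal, Operation op, ResourceType type, String name) {
        return principal + "|" + op + "|" + type + "|" + name;
    }

    // Records that the principal may perform the operation on the named resource.
    void allow(String principal, Operation op, ResourceType type, String name) {
        grants.add(key(principal, op, type, name));
    }

    // deleteRecordsBefore() is allowed only with DELETE on the TOPIC resource.
    boolean mayDeleteRecords(String principal, String topic) {
        return grants.contains(key(principal, Operation.DELETE, ResourceType.TOPIC, topic));
    }
}
```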

4) ListOffsetRequest

log_start_offset of a partition will be used to decide the smallest offset of the partition that will be exposed to consumer. It will be returned when smallest_offset option is used in the ListOffsetRequest.

...

The KIP changes the inter-broker protocol. Therefore the migration requires two rolling bounces. In the first rolling bounce we will deploy the new code but brokers will still communicate using the existing protocol. In the second rolling bounce we will change the config so that brokers start to communicate with each other using the new protocol.

Test Plan

- Unit tests to validate that all the individual components work as expected.
- Integration tests to ensure that the feature works correctly end-to-end. 

Rejected Alternatives


- Using committed offset instead of an extra API to trigger data delete operation. Delete data if its offset is smaller than committed offset of all consumer groups that need to consume from this partition.
This approach is discussed in KIP-68. The advantage of this approach is that it doesn't need coordination of user applications to determine when deleteRecordsBefore() can be called, which can be hard to do if there are multiple consumer groups interested in consuming this topic. The disadvantage of this approach is that it is less flexible than the deleteRecordsBefore() API because it re-uses the committed offset to trigger the data delete operation. Also, it adds complexity to the broker implementation and would be more complex to implement than the deleteRecordsBefore() API. An alternative approach is to implement this logic by running an external service which calls the deleteRecordsBefore() API based on the committed offsets of consumer groups.

- Leader sends DeleteRecordsResponse without waiting for low_watermark of all followers to increase above the cutoff offset
This approach would be simpler to implement since it doesn't require DelayedOperationPurgatory for DeleteRecordsRequest. The leader can reply to DeleteRecordsRequest faster since it doesn't need to wait for followers. However, the deleteRecordsBefore() API would provide a weaker guarantee in this approach because the data may not be deleted if the leader crashes right after it sends DeleteRecordsResponse. It will be useful to know for sure whether the data has been deleted, e.g. when user wants to delete problematic data from upstream so that downstream application can re-consume clean data, or if user wants to delete some sensitive data.

- Delete data on only one partition by each call to deleteRecordsBefore(...)
This approach would make the implementation of this API simpler, and would be consistent with the existing seek(TopicPartition partition, long offset) API. The downside of this approach is that it either increases the time to delete data if the number of partitions is large, or it requires user to take extra effort to parallelize the deleteRecordsBefore(...) calls. This API may take longer than seek() for a given partition since the broker needs to wait for the followers' action before responding to DeleteRecordsRequest. Thus we allow user to specify a map of partitions to make this API easy to use.
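The external service mentioned under the committed-offset alternative could compute its cutoffs roughly as follows. This is a sketch under stated assumptions, not a proposed implementation: the class and method names are hypothetical, partitions are keyed by the string form "topic-partition", and a partition is only considered safe when every listed consumer group has a committed offset for it.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper for an external purge service.
final class CommittedOffsetPurgeSketch {
    // Input: consumer group -> (partition -> committed offset).
    // Output: partition -> smallest committed offset across all groups,
    // which could then be passed to deleteRecordsBefore().
    static Map<String, Long> safeDeleteOffsets(Map<String, Map<String, Long>> committedByGroup) {
        Map<String, Long> minOffset = new HashMap<>();
        Map<String, Integer> groupsSeen = new HashMap<>();
        for (Map<String, Long> committed : committedByGroup.values()) {
            for (Map.Entry<String, Long> e : committed.entrySet()) {
                minOffset.merge(e.getKey(), e.getValue(), Long::min);
                groupsSeen.merge(e.getKey(), 1, Integer::sum);
            }
        }
        Map<String, Long> safe = new HashMap<>();
        for (Map.Entry<String, Long> e : minOffset.entrySet()) {
            // Skip partitions for which some group has not committed anything yet.
            if (groupsSeen.get(e.getKey()) == committedByGroup.size()) {
                safe.put(e.getKey(), e.getValue());
            }
        }
        return safe;
    }
}
```

Taking the minimum keeps every record that at least one group has not yet consumed, at the cost of retaining data for as long as the slowest group lags.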