Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Table of Contents

Status

Current state"Under DiscussionAccepted"

Discussion thread: here 

JIRA: KAFKA-5925 

...

Code Block
languagejava
deleteRecords(Map<TopicPartition, DeleteRecordsTarget>RecordsToDelete> partitionsAndOffsetsrecordsToDelete)

Proposed Changes

AdminClient : deleteRecords()

Code Block
languagejava
public DeleteRecordsResult deleteRecords(Map<TopicPartition, DeleteRecordsTarget>RecordsToDelete> partitionsAndOffsetsrecordsToDelete)
public DeleteRecordsResult deleteRecords(Map<TopicPartition, DeleteRecordsTarget>RecordsToDelete> partitionsAndOffsetsrecordsToDelete, DeleteRecordsOptions options)

...

TopicPartition comes from org.apache.kafka.common package

DeleteRecordsTarget RecordsToDelete, DeleteRecordsOptions and DeleteRecordsResult are defined as follow. 

Code Block
languagejava
/**
 * Options for {@link AdminClient#deleteRecords(Map, DeleteRecordsOptions)}.
 */
public class DeleteRecordsOptions extends AbstractOptions<DeleteRecordsOptions> {

}
 
/**
 * Describe records as targetto delete in a call to delete{@link AdminClient#deleteRecords(Map)}
 */
public class DeleteRecordsTargetRecordsToDelete {
	private long offset;
	
	/** 
	 * Delete all the records before the given {@code offset}
	 *
	 * @param offset    the offset before which all records will be deleted
	 */
	public static DeleteRecordsTargetRecordsToDelete deleteBeforebeforeOffset(long offset) { ... }
}

/**
 * The result of the {@link AdminClient#deleteRecords(Map)} call.
 */ 
public class DeleteRecordsResult {
    // package access constructor
    Map<TopicPartition, KafkaFuture<Long>>KafkaFuture<DeleteRecords>> values() { ... }
    KafkaFuture<Long>KafkaFuture<DeleteRecords> all() { ... }
}
 
/**
 * Represents information about deleted records
 */
public class DeletedRecords {
	private final long lowWatermark;
 
	/**
 	 * Create an instance of this class with the provided parameters.
 	 *
	 * @param lowWatermark  "low watermark" for the topic partition on which the deletion was executed
	 */
	public DeletedRecords(long lowWatermark) {
    	this.lowWatermark = lowWatermark;
	}

	/**
	 * Return the "low watermark" for the topic partition on which the deletion was executed
	 */
	public long lowWatermark() {
    	return lowWatermark;
	}
}

In the DeleteRecordsResult, the Long value accessed by values() and all() method specifies the low watermark as described in the KIP-107.

The deleteRecords() will have the same authorization setting as deleteTopic(). Its operation type is be DELETE and its resource type is TOPIC.

Compatibility, Deprecation, and Migration Plan

This is a new API and won't directly affect existing users.

It should replace the deleteRecordsBefore() method provided by the Scala based Admin Client. So existing users who are using such method, should migrate to use the new Java based Admin Client for this feature.

Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.