Status
Current state: Under Discussion vote
Discussion thread: https://lists.apache.org/thread.html/r2dc1cabfb75df6117bc464c29f0700dd1214fd1352bacfa99381b9bf%40%3Cdev.kafka.apache.org%3E
JIRA:
Motivation
- Currently, a user has to specify each input topic name in the Streams reset tool in order to purge its offsets.
This KIP proposes adding a new option that automatically detects all external topics and resets their corresponding offsets.
Public Interfaces
- A new `resetAllExternalTopicsOption` will be added to `StreamsResetter` so that users can clean up all externally committed offsets.
- A new `--reset-all-external-topics` flag will be added.
- The `--input-topics` and `--intermediate-topics` flags will accept regular expressions.
Add the reset-all-external-topics option:
public class StreamsResetter {
    private static OptionSpecBuilder resetAllExternalTopicsOption;

    private void parseArguments(final String[] args) {
        ...
        inputTopicsOption = optionParser.accepts("input-topics", "Comma-separated list of user input topics (enable regular expressions). For these topics, the tool will reset the offset to the earliest available offset.")
            .withRequiredArg()
            .ofType(String.class)
            .withValuesSeparatedBy(',')
            .describedAs("list");
        intermediateTopicsOption = optionParser.accepts("intermediate-topics", "Comma-separated list of intermediate user topics (topics used in the through() method) (enable regular expressions). For these topics, the tool will skip to the end.")
            .withRequiredArg()
            .ofType(String.class)
            .withValuesSeparatedBy(',')
            .describedAs("list");
        resetAllExternalTopicsOption = optionParser.accepts("reset-all-external-topics", "Reset tool such that when enabled, delete offsets for all involved topics");
    }

    private int maybeResetInputAndSeekToEndIntermediateTopicOffsets(final Map consumerConfig, final boolean dryRun) {
        ...
        if (resetAllExternalTopics) {
            final Set<String> externalTopics = allTopics.stream()
                .filter(topic -> !isInputTopic(topic) && !isIntermediateTopic(topic))
                .collect(Collectors.toSet());
            final List<TopicPartition> externalPartitions = externalTopics.stream()
                .map(client::partitionsFor)
                .flatMap(Collection::stream)
                .map(info -> new TopicPartition(info.topic(), info.partition()))
                .collect(Collectors.toList());
            client.assign(externalPartitions);
            inputTopicPartitions.addAll(externalPartitions);
            partitions.addAll(externalPartitions);
        }
        ...
    }

    private boolean isInputTopic(final String topic) {
        final List<String> inputTopics = options.valuesOf(inputTopicsOption);
        return inputTopics.stream().anyMatch(inputTopic -> {
            final Pattern pattern = Pattern.compile(inputTopic);
            return inputTopic.equals(topic) || pattern.matcher(topic).matches();
        });
    }

    private boolean isIntermediateTopic(final String topic) {
        final List<String> intermediateTopics = options.valuesOf(intermediateTopicsOption);
        return intermediateTopics.stream().anyMatch(intermediateTopic -> {
            final Pattern pattern = Pattern.compile(intermediateTopic);
            return intermediateTopic.equals(topic) || pattern.matcher(topic).matches();
        });
    }
}
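The matching and filtering logic above can be tried in isolation. The following is a minimal sketch, assuming only the JDK: the class `ExternalTopicFilterSketch`, its helper names, and the sample topic names are hypothetical and not part of `StreamsResetter`. It mirrors the idea in `isInputTopic` and `isIntermediateTopic` (an entry matches a topic either literally or as a full-match regular expression) and then keeps the topics that match neither list.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical stand-alone class for illustration; not part of StreamsResetter.
class ExternalTopicFilterSketch {

    // True if the topic equals one of the entries, or fully matches an entry
    // when that entry is interpreted as a regular expression.
    static boolean matchesAny(final List<String> namesOrPatterns, final String topic) {
        return namesOrPatterns.stream().anyMatch(entry ->
            entry.equals(topic) || Pattern.compile(entry).matcher(topic).matches());
    }

    // Topics that are matched by neither the input list nor the intermediate list.
    static Set<String> externalTopics(final Set<String> allTopics,
                                      final List<String> inputTopics,
                                      final List<String> intermediateTopics) {
        return allTopics.stream()
            .filter(t -> !matchesAny(inputTopics, t) && !matchesAny(intermediateTopics, t))
            .collect(Collectors.toCollection(TreeSet::new));
    }

    public static void main(final String[] args) {
        // Sample topic names are made up for this sketch.
        final Set<String> all = new TreeSet<>(
            Arrays.asList("orders-eu", "orders-us", "repartitioned", "audit"));
        final Set<String> external = externalTopics(
            all, Arrays.asList("orders-.*"), Arrays.asList("repartitioned"));
        System.out.println(external); // prints [audit]
    }
}
```

One consequence of using `matches()` is that a pattern must cover the whole topic name, so `orders` alone does not match `orders-eu`. Conversely, because `.` is a regex wildcard, an entry such as `a.c` also matches a topic named `abc`.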
Compatibility, Deprecation, and Migration Plan
Because this change only adds a new option, there are no compatibility, deprecation, or migration concerns.
Rejected Alternatives
Not applicable.