THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
- A new `resetAllExternalTopicsOption` will be added so that users can clean up all externally committed offsets.
- A new `--all-input-topics` flag is added.
- The `--input-topics` and `--intermediate-topics` flags accept regular expressions.
...
Code Block | ||||
---|---|---|---|---|
| ||||
public class StreamsResetter { /* 'reset-all-externalinput-topics' add flag */ private static OptionSpecBuilder resetAllExternalTopicsOptionallInputTopicsOption; private void parseArguments(final String[] args) { ... inputTopicsOption = optionParser.accepts("input-topics", "Comma-separated list of user input topics (enable regular expressions). For these topics, the tool will reset the offset to the earliest available offset.") .withRequiredArg() .ofType(String.class) .withValuesSeparatedBy(',') .describedAs("list"); intermediateTopicsOption = optionParser.accepts("intermediate-topics", "Comma-separated list of intermediate user topics (topics used in the through() method) (enable regular expressions). For these topics, the tool will skip to the end.") .withRequiredArg() .ofType(String.class) .withValuesSeparatedBy(',') .describedAs("list"); resetAllExternalTopicsOptionallInputTopicsOption = optionParser.accepts("reset-all-externalinput-topics", "Reset tool such that when enabled, delete offsets for all involved topics"); } private int maybeResetInputAndSeekToEndIntermediateTopicOffsets(final Map consumerConfig, final boolean dryRun) { ... if(resetAllExternalTopicsallInputTopics) { final Set<String> externalTopics = allTopics.stream() .filter(topic -> !isInputTopic(topic) && !isIntermediateTopic(topic)) .collect(Collectors.toSet()); final List<TopicPartition> externalPartitions = externalTopics.stream().map(client::partitionsFor) .flatMap(Collection::stream) .map(info -> new TopicPartition(info.topic(), info.partition())) .collect(Collectors.toList()); client.assign(externalPartitions); inputTopicPartitions.addAll(externalPartitions); partitions.addAll(externalPartitions); } ... 
} private boolean isInputTopic(final String topic) { final List<String> inputTopics = options.valuesOf(inputTopicsOption); return inputTopics.stream().anyMatch(inputTopic -> { final Pattern pattern = Pattern.compile(inputTopic); return inputTopic.equals(topic) || pattern.matcher(topic).matches(); }); } private boolean isIntermediateTopic(final String topic) { final List<String> intermediateTopics = options.valuesOf(intermediateTopicsOption); return intermediateTopics.stream().anyMatch(intermediateTopic -> { final Pattern pattern = Pattern.compile(intermediateTopic); return intermediateTopic.equals(topic) || pattern.matcher(topic).matches(); }); } } |
...