THIS IS A TEST INSTANCE. ALL YOUR CHANGES WILL BE LOST!!!!
...
Code Block | ||||
---|---|---|---|---|
| ||||
public class StreamsResetter {
/*
 * Option handle for the 'reset-all-external-topics' command-line flag.
 * It is assigned in parseArguments(), where the flag is registered on the option parser.
 */
private static OptionSpecBuilder resetAllExternalTopicsOption;
/**
 * Registers the tool's command-line options on {@code optionParser}.
 * Part of the body is elided in this excerpt; only the topic-related options are shown.
 *
 * @param args the raw command-line arguments (their use is not visible in this excerpt)
 */
private void parseArguments(final String[] args) {
...
// 'input-topics': requires a String argument; several values may be supplied, separated by ','.
inputTopicsOption = optionParser.accepts("input-topics", "Comma-separated list of user input topics (enable regular expressions). For these topics, the tool will reset the offset to the earliest available offset.")
.withRequiredArg()
.ofType(String.class)
.withValuesSeparatedBy(',')
.describedAs("list");
// 'intermediate-topics': same shape as 'input-topics' (required String argument, ','-separated values).
intermediateTopicsOption = optionParser.accepts("intermediate-topics", "Comma-separated list of intermediate user topics (topics used in the through() method) (enable regular expressions). For these topics, the tool will skip to the end.")
.withRequiredArg()
.ofType(String.class)
.withValuesSeparatedBy(',')
.describedAs("list");
// 'reset-all-external-topics': registered without any argument spec, i.e. a plain on/off flag.
resetAllExternalTopicsOption = optionParser.accepts("reset-all-external-topics", "Reset tool such that when enabled, delete offsets for all involved topics");
}
/**
 * Excerpt of the offset-reset routine; the code before and after the visible section is elided.
 * The visible section adds the partitions of all "external" topics (topics that are neither
 * input nor intermediate topics) to the partition collections when {@code resetAllExternalTopics}
 * is set.
 *
 * @param consumerConfig consumer configuration (its use is not visible in this excerpt)
 * @param dryRun dry-run switch (its use is not visible in this excerpt)
 * @return an int whose meaning is not visible in this excerpt
 */
private int maybeResetInputAndSeekToEndIntermediateTopicOffsets(final Map consumerConfig,
final boolean dryRun) {
...
// NOTE(review): resetAllExternalTopics is not declared in the visible code; presumably it is
// derived from the 'reset-all-external-topics' option — confirm.
if(resetAllExternalTopics) {
// External topics: every entry of allTopics that matches neither the input-topic
// nor the intermediate-topic option values.
final Set<String> externalTopics = allTopics.stream()
.filter(topic -> !isInputTopic(topic) && !isIntermediateTopic(topic))
.collect(Collectors.toSet());
// Look up the partitions of each external topic, flatten them into one stream,
// and convert each entry into a TopicPartition.
final List<TopicPartition> externalPartitions = externalTopics.stream().map(client::partitionsFor)
.flatMap(Collection::stream)
.map(info -> new TopicPartition(info.topic(), info.partition()))
.collect(Collectors.toList());
// NOTE(review): if client is a Kafka consumer, assign() replaces the current assignment
// rather than extending it — confirm the elided code does not depend on an earlier assignment.
client.assign(externalPartitions);
// External partitions are added to the input-topic partitions as well as to the overall set;
// presumably so they are reset the same way as input topics — verify against the elided code.
inputTopicPartitions.addAll(externalPartitions);
partitions.addAll(externalPartitions);
}
...
}
/**
 * Tells whether {@code topic} is covered by the values supplied for the input-topics option.
 *
 * Every option value is compiled as a regular expression. The topic counts as covered when it
 * equals a value literally or when that value's expression matches the whole topic name.
 *
 * @param topic the topic name to test
 * @return {@code true} if at least one option value equals or fully matches {@code topic}
 */
private boolean isInputTopic(final String topic) {
    final List<String> configuredInputs = options.valuesOf(inputTopicsOption);
    for (final String candidate : configuredInputs) {
        // Compile before comparing, so a malformed expression is reported even on a literal match.
        final Pattern compiled = Pattern.compile(candidate);
        if (candidate.equals(topic)) {
            return true;
        }
        if (compiled.matcher(topic).matches()) {
            return true;
        }
    }
    return false;
}
/**
 * Tells whether {@code topic} is covered by the values supplied for the intermediate-topics option.
 *
 * Every option value is compiled as a regular expression. The topic counts as covered when it
 * equals a value literally or when that value's expression matches the whole topic name.
 *
 * @param topic the topic name to test
 * @return {@code true} if at least one option value equals or fully matches {@code topic}
 */
private boolean isIntermediateTopic(final String topic) {
    final List<String> configuredIntermediates = options.valuesOf(intermediateTopicsOption);
    for (final String candidate : configuredIntermediates) {
        // Compile before comparing, so a malformed expression is reported even on a literal match.
        final Pattern compiled = Pattern.compile(candidate);
        final boolean covered = candidate.equals(topic) || compiled.matcher(topic).matches();
        if (covered) {
            return true;
        }
    }
    return false;
}
} |
...