Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Code Block
language: java
title: Add reset-all-external-topics option
public class StreamsResetter {
	 /*
     * Option spec for the 'reset-all-external-topics' command-line flag.
     * Assigned in parseArguments(); the flag is declared without an argument.
     */
    private static OptionSpecBuilder resetAllExternalTopicsOption;

    /**
     * Registers the topic-related command-line options on {@code optionParser} and stores the
     * resulting option specs in the corresponding fields. Part of the method body is elided in
     * this excerpt ("...").
     *
     * @param args raw command-line arguments; not referenced in the visible part of the body
     */
    private void parseArguments(final String[] args) {
        ...
        // "input-topics": takes a required argument, parsed as String values split on ','.
        inputTopicsOption = optionParser.accepts("input-topics", "Comma-separated list of user input topics (enable regular expressions). For these topics, the tool will reset the offset to the earliest available offset.")
            .withRequiredArg()
            .ofType(String.class)
            .withValuesSeparatedBy(',')
            .describedAs("list");
        // "intermediate-topics": same shape as "input-topics" (required, ','-separated Strings).
        intermediateTopicsOption = optionParser.accepts("intermediate-topics", "Comma-separated list of intermediate user topics (topics used in the through() method) (enable regular expressions). For these topics, the tool will skip to the end.")
            .withRequiredArg()
            .ofType(String.class)
            .withValuesSeparatedBy(',')
            .describedAs("list");
        // "reset-all-external-topics": plain flag, no argument is declared for it.
        resetAllExternalTopicsOption = optionParser.accepts("reset-all-external-topics", "Reset tool such that when enabled, delete offsets for all involved topics");
    }

    /**
     * When {@code resetAllExternalTopics} is set, collects the partitions of every topic in
     * {@code allTopics} that matches neither the input-topics nor the intermediate-topics option,
     * assigns them to {@code client}, and adds them to both {@code inputTopicPartitions} and
     * {@code partitions}. The code before and after this step is elided in this excerpt ("...").
     *
     * @param consumerConfig not referenced in the visible part of the body
     * @param dryRun not referenced in the visible part of the body
     * @return not visible in this excerpt
     */
    private int maybeResetInputAndSeekToEndIntermediateTopicOffsets(final Map consumerConfig,
                                                                    final boolean dryRun) {
			...
            if(resetAllExternalTopics) {
                // "External" here means: every known topic that is neither an input topic
                // nor an intermediate topic according to the command-line options.
                final Set<String> externalTopics = allTopics.stream()
                    .filter(topic -> !isInputTopic(topic) && !isIntermediateTopic(topic))
                    .collect(Collectors.toSet());

                // Expand each external topic into its individual topic-partitions.
                final List<TopicPartition> externalPartitions = externalTopics.stream().map(client::partitionsFor)
                    .flatMap(Collection::stream)
                    .map(info -> new TopicPartition(info.topic(), info.partition()))
                    .collect(Collectors.toList());

                // NOTE(review): if client is a KafkaConsumer, assign() replaces any earlier
                // assignment and cannot be combined with subscribe() — confirm against the
                // elided code above.
                client.assign(externalPartitions);
                // Presumably routes the external partitions through the same handling as the
                // input-topic partitions — TODO confirm in the elided code below.
                inputTopicPartitions.addAll(externalPartitions);
                // NOTE(review): requires partitions to be a mutable collection — verify where
                // it is created.
                partitions.addAll(externalPartitions);
            }
			...

	}

    /**
     * Tells whether {@code topic} is selected by the {@code input-topics} option.
     *
     * @param topic topic name to test
     * @return {@code true} if any option value equals {@code topic} literally or, read as a
     *         regular expression, matches the whole of {@code topic}
     * @throws java.util.regex.PatternSyntaxException if an option value that is not literally
     *         equal to {@code topic} is reached and is not a valid regular expression
     */
    private boolean isInputTopic(final String topic) {
        final List<String> inputTopics = options.valuesOf(inputTopicsOption);
        return matchesAnyTopicSpec(inputTopics, topic);
    }

    /**
     * Tells whether {@code topic} is selected by the {@code intermediate-topics} option.
     *
     * @param topic topic name to test
     * @return {@code true} if any option value equals {@code topic} literally or, read as a
     *         regular expression, matches the whole of {@code topic}
     * @throws java.util.regex.PatternSyntaxException if an option value that is not literally
     *         equal to {@code topic} is reached and is not a valid regular expression
     */
    private boolean isIntermediateTopic(final String topic) {
        final List<String> intermediateTopics = options.valuesOf(intermediateTopicsOption);
        return matchesAnyTopicSpec(intermediateTopics, topic);
    }

    /**
     * Shared matching rule for the two topic predicates above: an entry selects {@code topic}
     * when it is literally equal to it or when it matches it in full as a regular expression.
     * Entries are tried in order and evaluation stops at the first hit.
     */
    private boolean matchesAnyTopicSpec(final List<String> topicSpecs, final String topic) {
        for (final String topicSpec : topicSpecs) {
            // Literal comparison first: an exact name must match even if it would not compile
            // as a regular expression, and no Pattern needs to be built for that case.
            if (topicSpec.equals(topic) || Pattern.compile(topicSpec).matcher(topic).matches()) {
                return true;
            }
        }
        return false;
    }
}

...