Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

  • A new resetAllExternalTopicsOption will be added for user to cleanup all externally committed offsets
  • `--all-inputuser-topics` flag add
  • `--input-topic`and `--intermediate-topic` flag allowing regular expressions


Code Block
languagejava
titleadd reset-all-external-topics option
public class StreamsResetter {
	 /*
    'all-inputuser-topics' add flag
     */
    private static OptionSpecBuilder allInputTopicsOption;

    private void parseArguments(final String[] args) {
        ...
        inputTopicsOption = optionParser.accepts("input-topics", "Comma-separated list of user input topics (enable regular expressions). For these topics, the tool will reset the offset to the earliest available offset.")
            .withRequiredArg()
            .ofType(String.class)
            .withValuesSeparatedBy(',')
            .describedAs("list");
        intermediateTopicsOption = optionParser.accepts("intermediate-topics", "Comma-separated list of intermediate user topics (topics used in the through() method) (enable regular expressions). For these topics, the tool will skip to the end.")
            .withRequiredArg()
            .ofType(String.class)
            .withValuesSeparatedBy(',')
            .describedAs("list");
        allInputTopicsOption = optionParser.accepts("all-input-topics", "Reset tool such that when enabled, delete offsets for all involved topics");
    }

    private int maybeResetInputAndSeekToEndIntermediateTopicOffsets(final Map consumerConfig,
                                                                    final boolean dryRun) {
			...
            if(allInputTopics) {
                final Set<String> externalTopics = allTopics.stream()
                    .filter(topic -> !isInputTopic(topic) && !isIntermediateTopic(topic))
                    .collect(Collectors.toSet());

                final List<TopicPartition> externalPartitions = externalTopics.stream().map(client::partitionsFor)
                    .flatMap(Collection::stream)
                    .map(info -> new TopicPartition(info.topic(), info.partition()))
                    .collect(Collectors.toList());

                client.assign(externalPartitions);
                inputTopicPartitions.addAll(externalPartitions);
                partitions.addAll(externalPartitions);
            }
			...

	}

    private boolean isInputTopic(final String topic) {
        final List<String> inputTopics = options.valuesOf(inputTopicsOption);
        return inputTopics.stream().anyMatch(inputTopic -> {
            final Pattern pattern = Pattern.compile(inputTopic);
            return inputTopic.equals(topic) || pattern.matcher(topic).matches();
        });
    }

    private boolean isIntermediateTopic(final String topic) {
        final List<String> intermediateTopics = options.valuesOf(intermediateTopicsOption);
        return intermediateTopics.stream().anyMatch(intermediateTopic -> {
            final Pattern pattern = Pattern.compile(intermediateTopic);
            return intermediateTopic.equals(topic) || pattern.matcher(topic).matches();
        });
    }
}allUserTopicsOption;
}

Proposed Changes

  • --all-user-topics Reset tool to delete offsets for all topics related to available groups

Compatibility, Deprecation, and Migration Plan

...