Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Current stateUnder Discussion vote

Discussion thread: https://lists.apache.org/thread.html/r2dc1cabfb75df6117bc464c29f0700dd1214fd1352bacfa99381b9bf%40%3Cdevrdc2c6426e957f68c311fde0752c6611666d3e00e5b7b8dcd69386906@%3Cdev.kafka.apache.org%3E

JIRA:

Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyKAFKA-9042

...

  • A new resetAllExternalTopicsOption will be added for user to cleanup all externally committed offsets
  • `reset--all-externaluser-topics` flag add`--input-topic`and `--intermediate-topic` flag allowing regular expressions


Code Block
languagejava
titleadd reset-all-external-topics option
public class StreamsResetter {
    private static OptionSpecBuilder resetAllExternalTopicsOption;
	 /*
    private void parseArguments(final String[] args) {
        ...
        inputTopicsOption = optionParser.accepts("input-topics", "Comma-separated list of user input topics (enable regular expressions). For these topics, the tool will reset the offset to the earliest available offset.")'all-user-topics' add flag
            .withRequiredArg()*/
    private static       .ofType(String.class)
            .withValuesSeparatedBy(',')
            .describedAs("list");
        intermediateTopicsOption = optionParser.accepts("intermediate-topics", "Comma-separated list of intermediate user topics (topics used in the through() method) (enable regular expressions). For these topics, the tool will skip to the end.")
            .withRequiredArg()
            .ofType(String.class)
            .withValuesSeparatedBy(',')
            .describedAs("list");
        resetAllExternalTopicsOption = optionParser.accepts("reset-all-external-topics", "Reset tool such that when enabled, delete offsets for all involved topics");
    }

    private int maybeResetInputAndSeekToEndIntermediateTopicOffsets(final Map consumerConfig,
                                                                    final boolean dryRun) {
			...
            if(resetAllExternalTopics) {
                final Set<String> externalTopics = allTopics.stream()
                    .filter(topic -> !isInputTopic(topic) && !isIntermediateTopic(topic))
                    .collect(Collectors.toSet());

                final List<TopicPartition> externalPartitions = externalTopics.stream().map(client::partitionsFor)
                    .flatMap(Collection::stream)
                    .map(info -> new TopicPartition(info.topic(), info.partition()))
                    .collect(Collectors.toList());

                client.assign(externalPartitions);
                inputTopicPartitions.addAll(externalPartitions);
                partitions.addAll(externalPartitions);
            }
			...

	}

    private boolean isInputTopic(final String topic) {
        final List<String> inputTopics = options.valuesOf(inputTopicsOption);
        return inputTopics.stream().anyMatch(inputTopic -> {
            final Pattern pattern = Pattern.compile(inputTopic);
            return inputTopic.equals(topic) || pattern.matcher(topic).matches();
        });
    }

    private boolean isIntermediateTopic(final String topic) {
        final List<String> intermediateTopics = options.valuesOf(intermediateTopicsOption);
        return intermediateTopics.stream().anyMatch(intermediateTopic -> {
            final Pattern pattern = Pattern.compile(intermediateTopic);
            return intermediateTopic.equals(topic) || pattern.matcher(topic).matches();
        });
    }
}OptionSpecBuilder allUserTopicsOption;
}

Proposed Changes

  • --all-user-topics Reset tool to delete offsets for all topics related to available groups

Compatibility, Deprecation, and Migration Plan

...