Status

Current state: Under Discussion vote

Discussion thread: https://lists.apache.org/thread.html/r2dc1cabfb75df6117bc464c29f0700dd1214fd1352bacfa99381b9bf%40%3Cdev.kafka.apache.org%3E

Motivation

  • Currently, users must explicitly specify input topic names in the Streams reset tool in order to purge the offsets committed for them.
  • This KIP adds a new option that automatically detects all external topics and resets their corresponding offsets.

Public Interfaces

  • A new resetAllExternalTopicsOption will be added so that users can clean up all externally committed offsets.
  • A new `--reset-all-external-topics` flag will be added.
  • The `--input-topics` and `--intermediate-topics` flags will accept regular expressions.
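To illustrate how a comma-separated option value containing regular expressions might be interpreted, here is a minimal sketch. The class `TopicOptionMatcher` and its method names are hypothetical and not part of the reset tool. It assumes each entry is first compared literally and then treated as a regular expression that must match the whole topic name.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical helper for illustration only; not part of StreamsResetter.
final class TopicOptionMatcher {

    // Splits a raw option value on commas, similar in spirit to withValuesSeparatedBy(',').
    static List<String> splitOptionValue(final String optionValue) {
        return Arrays.asList(optionValue.split(","));
    }

    // A topic is selected if an entry equals it literally, or if the entry,
    // read as a regular expression, matches the entire topic name.
    static boolean matchesAny(final List<String> entries, final String topic) {
        return entries.stream().anyMatch(entry ->
            entry.equals(topic) || Pattern.matches(entry, topic));
    }

    public static void main(final String[] args) {
        final List<String> entries = splitOptionValue("orders,clicks-.*");
        System.out.println(matchesAny(entries, "orders"));        // prints true
        System.out.println(matchesAny(entries, "clicks-eu"));     // prints true
        System.out.println(matchesAny(entries, "raw-clicks-eu")); // prints false
    }
}
```

Two consequences of this interpretation are worth keeping in mind. Because the value is split on commas first, a pattern that itself contains a comma would be broken apart before it is compiled. And because `.` is a regex metacharacter, an entry such as `a.b` would also select a topic named `a-b`.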


Add reset-all-external-topics option
public class StreamsResetter {
    private static OptionSpecBuilder resetAllExternalTopicsOption;

    private void parseArguments(final String[] args) {
        ...
        inputTopicsOption = optionParser.accepts("input-topics", "Comma-separated list of user input topics (enable regular expressions). For these topics, the tool will reset the offset to the earliest available offset.")
            .withRequiredArg()
            .ofType(String.class)
            .withValuesSeparatedBy(',')
            .describedAs("list");
        intermediateTopicsOption = optionParser.accepts("intermediate-topics", "Comma-separated list of intermediate user topics (topics used in the through() method) (enable regular expressions). For these topics, the tool will skip to the end.")
            .withRequiredArg()
            .ofType(String.class)
            .withValuesSeparatedBy(',')
            .describedAs("list");
        resetAllExternalTopicsOption = optionParser.accepts("reset-all-external-topics", "When enabled, the tool deletes offsets for all involved topics.");
    }

    private int maybeResetInputAndSeekToEndIntermediateTopicOffsets(final Map consumerConfig,
                                                                    final boolean dryRun) {
            ...
            if (resetAllExternalTopics) {
                final Set<String> externalTopics = allTopics.stream()
                    .filter(topic -> !isInputTopic(topic) && !isIntermediateTopic(topic))
                    .collect(Collectors.toSet());

                final List<TopicPartition> externalPartitions = externalTopics.stream().map(client::partitionsFor)
                    .flatMap(Collection::stream)
                    .map(info -> new TopicPartition(info.topic(), info.partition()))
                    .collect(Collectors.toList());

                client.assign(externalPartitions);
                inputTopicPartitions.addAll(externalPartitions);
                partitions.addAll(externalPartitions);
            }
            ...

    }

    // Returns true if the topic equals, or is fully matched by, one of the input-topics entries.
    private boolean isInputTopic(final String topic) {
        final List<String> inputTopics = options.valuesOf(inputTopicsOption);
        return inputTopics.stream().anyMatch(inputTopic -> {
            final Pattern pattern = Pattern.compile(inputTopic);
            return inputTopic.equals(topic) || pattern.matcher(topic).matches();
        });
    }

    // Returns true if the topic equals, or is fully matched by, one of the intermediate-topics entries.
    private boolean isIntermediateTopic(final String topic) {
        final List<String> intermediateTopics = options.valuesOf(intermediateTopicsOption);
        return intermediateTopics.stream().anyMatch(intermediateTopic -> {
            final Pattern pattern = Pattern.compile(intermediateTopic);
            return intermediateTopic.equals(topic) || pattern.matcher(topic).matches();
        });
    }
}
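The filtering step in the snippet above can be sketched in isolation, without a Kafka client. This is a simplified illustration under stated assumptions, not the tool's implementation. The class `ExternalTopicSelector` is hypothetical, `allTopics` is assumed to be a plain set of topic names, and patterns are compiled once up front instead of on every check.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical helper for illustration only; not part of StreamsResetter.
final class ExternalTopicSelector {

    // Compiles every option entry once so that matching does not recompile per topic.
    static List<Pattern> compileAll(final List<String> entries) {
        return entries.stream().map(Pattern::compile).collect(Collectors.toList());
    }

    // True if any pattern matches the whole topic name.
    static boolean matchesAny(final List<Pattern> patterns, final String topic) {
        return patterns.stream().anyMatch(p -> p.matcher(topic).matches());
    }

    // "External" here means: neither an input topic nor an intermediate topic.
    static Set<String> externalTopics(final Set<String> allTopics,
                                      final List<String> inputEntries,
                                      final List<String> intermediateEntries) {
        final List<Pattern> input = compileAll(inputEntries);
        final List<Pattern> intermediate = compileAll(intermediateEntries);
        return allTopics.stream()
            .filter(topic -> !matchesAny(input, topic) && !matchesAny(intermediate, topic))
            .collect(Collectors.toCollection(TreeSet::new)); // sorted for stable output
    }

    public static void main(final String[] args) {
        final Set<String> allTopics =
            new TreeSet<>(Arrays.asList("orders", "clicks-eu", "audit", "metrics"));
        final Set<String> external = externalTopics(
            allTopics, Arrays.asList("orders"), Arrays.asList("clicks-.*"));
        System.out.println(external); // prints [audit, metrics]
    }
}
```

The sketch drops the literal `equals` comparison, on the assumption that topic names contain only letters, digits, `.`, `_`, and `-`, in which case a name read as a regular expression always matches itself.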

Compatibility, Deprecation, and Migration Plan

  • Because this KIP only adds a new option, there are no compatibility, deprecation, or migration concerns.

Rejected Alternatives

Not applicable.
