Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

There is one additional change that needs to be performed in ThreadMetadata. Currently, this class exposes uses the newly deprecated TaskMetada class in its constructor and it's returned in activeTasks() and standbyTasks() methods. We should deprecate those and add a new pair of methods returning and constructor referencing the new Interface:

Code Block
languagejava
titleorg.apache.kafka.streams.processor.ThreadMetadata
linenumberstrue
import org.apache.kafka.streams.TaskMetadata;

public class ThreadMetadata {

	....

    /**
     * @deprecated since 3.0 use {@link #ThreadMetadata(String, String, String, String, Set, Set, Set, String)} instead
     */
    @Deprecated
    public ThreadMetadata(final String threadName,
                          final String threadState,
                          final String mainConsumerClientId,
                          final String restoreConsumerClientId,
                          final Set<String> producerClientIds,
                          final String adminClientId,
                          final Set<org.apache.kafka.streams.processor.TaskMetadata> activeTasks,
                          final Set<org.apache.kafka.streams.processor.TaskMetadata> standbyTasks) {
        this.mainConsumerClientId = mainConsumerClientId;
        this.restoreConsumerClientId = restoreConsumerClientId;
        this.producerClientIds = producerClientIds;
        this.adminClientId = adminClientId;
        this.threadName = threadName;
        this.threadState = threadState;
        this.activeTasks = Collections.unmodifiableSet(activeTasks.stream().map(metadata -> new TaskMetadataImpl(TaskId.parse(metadata.taskId()),
                metadata.topicPartitions(),
                metadata.committedOffsets(),
                metadata.endOffsets(),
                metadata.timeCurrentIdlingStarted())).collect(Collectors.toSet()));
        this.standbyTasks = Collections.unmodifiableSet(standbyTasks.stream().map(metadata -> new TaskMetadataImpl(TaskId.parse(metadata.taskId()),
                metadata.topicPartitions(),
                metadata.committedOffsets(),
                metadata.endOffsets(),
                metadata.timeCurrentIdlingStarted())).collect(Collectors.toSet()));
    }

    public ThreadMetadata(final String threadName,
                          final String threadState,
                          final String mainConsumerClientId,
                          final String restoreConsumerClientId,
                          final Set<String> producerClientIds,
                          final Set<TaskMetadata> activeTasks,
                          final Set<TaskMetadata> standbyTasks,
                          final String adminClientId) {
        this.mainConsumerClientId = mainConsumerClientId;
        this.restoreConsumerClientId = restoreConsumerClientId;
        this.producerClientIds = producerClientIds;
        this.adminClientId = adminClientId;
        this.threadName = threadName;
        this.threadState = threadState;
        this.activeTasks = Collections.unmodifiableSet(activeTasks);
        this.standbyTasks = Collections.unmodifiableSet(standbyTasks);
    }

    public Set<TaskMetadata> getActiveTasks() {
        return activeTasks;
    }

    public Set<TaskMetadata> getStandbyTasks() {
        return standbyTasks;
    }

    /**
     * @deprecated since 3.0 use {@link #getActiveTasks()}
     */
    @Deprecated
    public Set<org.apache.kafka.streams.processor.TaskMetadata> activeTasks() {
        return activeTasks.stream().map(metadata ->
            new org.apache.kafka.streams.processor.TaskMetadata(metadata.taskId().toString(),
                    metadata.topicPartitions(),
                    metadata.committedOffsets(),
                    metadata.endOffsets(), metadata.timeCurrentIdlingStarted())).collect(Collectors.toSet());
    }

    /**
     * @deprecated since 3.0 use {@link #getStandbyTasks()}
     */
    @Deprecated
    public Set<org.apache.kafka.streams.processor.TaskMetadata> standbyTasks() {
        return standbyTasks.stream().map(metadata ->
                new org.apache.kafka.streams.processor.TaskMetadata(metadata.taskId().toString(),
                        metadata.topicPartitions(),
                        metadata.committedOffsets(),
                        metadata.endOffsets(), metadata.timeCurrentIdlingStarted())).collect(Collectors.toSet());
    }

	....
}

...