...

Code Block (Java): org.apache.kafka.streams.processor.TaskMetadata
// This method will be removed in this KIP
/**
 * @return the basic task metadata such as subtopology and partition id
*/
public TaskId getTaskId() {
   return taskId;
}

One additional change is needed in ThreadMetadata. Currently, this class exposes the newly deprecated TaskMetadata class through its activeTasks() and standbyTasks() methods. We should deprecate those and add a new pair of methods that return the new interface:

Code Block (Java): org.apache.kafka.streams.processor.ThreadMetadata
import java.util.Set;
import java.util.stream.Collectors;

import org.apache.kafka.streams.TaskMetadata;

public class ThreadMetadata {

	....

    public Set<TaskMetadata> getActiveTasks() {
        return activeTasks;
    }

    public Set<TaskMetadata> getStandbyTasks() {
        return standbyTasks;
    }

    /**
     * @deprecated since 3.0 use {@link #getActiveTasks()}
     */
    @Deprecated
    public Set<org.apache.kafka.streams.processor.TaskMetadata> activeTasks() {
        return activeTasks.stream().map(metadata ->
            new org.apache.kafka.streams.processor.TaskMetadata(metadata.taskId().toString(),
                    metadata.topicPartitions(),
                    metadata.committedOffsets(),
                    metadata.endOffsets(), metadata.timeCurrentIdlingStarted())).collect(Collectors.toSet());
    }

    /**
     * @deprecated since 3.0 use {@link #getStandbyTasks()}
     */
    @Deprecated
    public Set<org.apache.kafka.streams.processor.TaskMetadata> standbyTasks() {
        return standbyTasks.stream().map(metadata ->
                new org.apache.kafka.streams.processor.TaskMetadata(metadata.taskId().toString(),
                        metadata.topicPartitions(),
                        metadata.committedOffsets(),
                        metadata.endOffsets(), metadata.timeCurrentIdlingStarted())).collect(Collectors.toSet());
    }

	....
}

This last change could be avoided if we allowed ourselves to break source compatibility for this particular section. Otherwise, we need to go through a deprecation cycle for activeTasks() and standbyTasks(), and only afterwards can we move the non-conforming getActiveTasks() and getStandbyTasks() back to the proper naming scheme.


Proposed Changes

By introducing a new interface in a different package, we can keep the same name (no need to come up with a forced alternative). We are also freed from carrying over changes like the one in https://github.com/apache/kafka/pull/10755, where a method getTaskId was introduced (going against the Kafka naming convention) because taskId was already taken and needed to be deprecated.
Together with this change, a new class TaskMetadataImpl will be created under org.apache.kafka.streams.processor.internals. It will implement the aforementioned interface, reusing the implementation currently in org.apache.kafka.streams.processor.TaskMetadata. The rest of the Kafka code base that uses the newly deprecated TaskMetadata will be migrated to the new classes.
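
The split between a public interface and an internal implementation described above could look roughly like the following sketch. It is not the actual Kafka source: to stay self-contained it uses plain String values in place of Kafka's TaskId and TopicPartition types, the Optional<Long> return type of timeCurrentIdlingStarted() is an assumption, and the TaskMetadataSketch class is purely hypothetical. Only the accessor names are taken from the ThreadMetadata code block above.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;

// Stand-in for the proposed public interface org.apache.kafka.streams.TaskMetadata.
// Assumption: String replaces Kafka's TaskId and TopicPartition so the sketch compiles alone.
interface TaskMetadata {
    String taskId();                            // follows the naming convention, no "get" prefix
    Set<String> topicPartitions();
    Map<String, Long> committedOffsets();
    Map<String, Long> endOffsets();
    Optional<Long> timeCurrentIdlingStarted();  // return type is an assumption
}

// Stand-in for org.apache.kafka.streams.processor.internals.TaskMetadataImpl.
// Callers depend only on the interface; this class stays an internal detail.
final class TaskMetadataImpl implements TaskMetadata {
    private final String taskId;
    private final Set<String> topicPartitions;
    private final Map<String, Long> committedOffsets;
    private final Map<String, Long> endOffsets;
    private final Optional<Long> timeCurrentIdlingStarted;

    TaskMetadataImpl(final String taskId,
                     final Set<String> topicPartitions,
                     final Map<String, Long> committedOffsets,
                     final Map<String, Long> endOffsets,
                     final Optional<Long> timeCurrentIdlingStarted) {
        this.taskId = Objects.requireNonNull(taskId, "taskId");
        // Defensive copies wrapped as read-only views, so callers cannot mutate the metadata.
        this.topicPartitions = Collections.unmodifiableSet(new HashSet<>(topicPartitions));
        this.committedOffsets = Collections.unmodifiableMap(new HashMap<>(committedOffsets));
        this.endOffsets = Collections.unmodifiableMap(new HashMap<>(endOffsets));
        this.timeCurrentIdlingStarted = Objects.requireNonNull(timeCurrentIdlingStarted, "timeCurrentIdlingStarted");
    }

    @Override public String taskId() { return taskId; }
    @Override public Set<String> topicPartitions() { return topicPartitions; }
    @Override public Map<String, Long> committedOffsets() { return committedOffsets; }
    @Override public Map<String, Long> endOffsets() { return endOffsets; }
    @Override public Optional<Long> timeCurrentIdlingStarted() { return timeCurrentIdlingStarted; }
}

// Hypothetical driver, only here to show that user code sees the interface type.
class TaskMetadataSketch {
    public static void main(final String[] args) {
        final TaskMetadata metadata = new TaskMetadataImpl(
            "0_1",
            Set.of("input-0"),
            Map.of("input-0", 5L),
            Map.of("input-0", 9L),
            Optional.empty());
        System.out.println(metadata.taskId() + " reads " + metadata.topicPartitions());
    }
}
```

Because user code only ever holds a TaskMetadata reference, the implementing class can later change or move without another deprecation cycle on the public API.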

...