...

At the same time, the existing TaskId class is a bit awkward, as it's clearly intended to be a simple metadata class to expose the topicGroupId (subtopology) and partition number fields. Yet it has plenty of totally irrelevant (to a user) utility methods, such as de/serialization. It also has completely public fields, and no actual getters for them. We should take this opportunity to migrate to a new public interface with only the getters for the relevant fields, and move all the implementation details and utilities to an internal implementation class.

Public Interfaces

New TaskIdInfo interface added to the public API:

Code Block
languagejava
titleTaskId
package org.apache.kafka.streams.processor;

/**
 * A wrapper class for basic taskId info
 */
public interface TaskIdInfo {
    /**
     * @return The ID of the subtopology that this task executes, i.e. the topicGroupId
     */
    int subtopologyId();

    /**
     * @return The ID number of the partition.
     */
    int partition();
}
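
As a rough illustration of how user code might consume this interface, here is a minimal sketch. It declares a local copy of the proposed TaskIdInfo interface so that it compiles without Kafka on the classpath; SimpleTaskIdInfo and describe() are hypothetical names introduced only for this example and are not part of the proposal.

```java
public class TaskIdInfoSketch {

    /** Local copy of the interface proposed above, so the sketch compiles on its own. */
    public interface TaskIdInfo {
        int subtopologyId();
        int partition();
    }

    /** Hypothetical stand-in for the internal implementation class. */
    public static final class SimpleTaskIdInfo implements TaskIdInfo {
        private final int subtopologyId;
        private final int partition;

        public SimpleTaskIdInfo(final int subtopologyId, final int partition) {
            this.subtopologyId = subtopologyId;
            this.partition = partition;
        }

        @Override
        public int subtopologyId() {
            return subtopologyId;
        }

        @Override
        public int partition() {
            return partition;
        }
    }

    /** Hypothetical helper: user code only ever needs the two getters. */
    public static String describe(final TaskIdInfo info) {
        return "subtopology " + info.subtopologyId() + ", partition " + info.partition();
    }

    public static void main(final String[] args) {
        final TaskIdInfo info = new SimpleTaskIdInfo(0, 3);
        System.out.println(describe(info)); // prints "subtopology 0, partition 3"
    }
}
```

Because callers depend only on the two getters, the de/serialization utilities and field layout of the implementing class can change without affecting them.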

...

Code Block
languagejava
titleTaskIdImpl
package org.apache.kafka.streams.processor.internals;

class TaskId implements TaskIdInfo {
    // This is just the existing TaskId class & implementation, but
    // moved to the internals package
}

...

Deprecate the TaskId getter on the StateStoreContext and ProcessorContext interfaces, and replace it with a getter that returns the TaskIdInfo interface:

Code Block
languagejava
public interface ProcessorContext {

    /**
     * Returns the task id.
     *
     * @return the task id
     */
    TaskIdInfo taskIdInfo();

    /**
     * Returns the task id.
     *
     * @return the task id
     * @deprecated use {@link #taskIdInfo()} instead.
     */
    @Deprecated
    TaskId taskId();
}

public interface StateStoreContext {
    /**
     * Returns the task id.
     *
     * @return the task id
     */
    TaskIdInfo taskIdInfo();
    
    /**
     * Returns the task id.
     *
     * @return the task id
     * @deprecated use {@link #taskIdInfo()} instead.
     */
    @Deprecated
    TaskId taskId();
}

Deprecate the taskId() getter that returns the task id as a string, and add an API to return the actual TaskIdInfo:

Code Block
languagejava
titleTaskMetadata
package org.apache.kafka.streams.processor;

class TaskMetadata {

    private final TaskId taskId;

    /**
     * @return the basic task metadata such as subtopology and partition id
     */
    public TaskIdInfo taskIdInfo() {
        return taskId;
    }

    /**
     * @return the task id as a string
     * @deprecated please use {@link #taskIdInfo()} instead.
     */
    @Deprecated
    public String taskId() {
        return taskId.toString();
    }
}

...

To separate the implementation from the public API, we'll deprecate the existing o.a.k.streams.processor.TaskId class and replace it with a public interface, TaskIdInfo. The existing TaskId class implementation will be moved to a new o.a.k.streams.processor.internals.TaskId class, which will implement the TaskIdInfo interface.

To migrate from the String to TaskIdInfo in TaskMetadata, we'll need to deprecate the existing taskId() getter method and add a taskIdInfo() getter in its place. Similarly, we will need to deprecate the existing taskId() getters on the ProcessorContext and StateStoreContext interfaces, and replace them with taskIdInfo() getters that return the new public interface instead of the now-deprecated class.
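
To make the TaskMetadata migration concrete, the following sketch contrasts caller code before and after the change. All types here are simplified local stand-ins rather than the real Kafka classes; the helper names partitionFromString() and partitionFromInfo() are hypothetical, and the sketch assumes the string form of a task id is "<subtopologyId>_<partition>".

```java
public class TaskMetadataMigrationSketch {

    /** Local copy of the proposed public interface. */
    public interface TaskIdInfo {
        int subtopologyId();
        int partition();
    }

    /** Simplified stand-in for the internal TaskId implementation. */
    public static final class TaskId implements TaskIdInfo {
        private final int subtopologyId;
        private final int partition;

        public TaskId(final int subtopologyId, final int partition) {
            this.subtopologyId = subtopologyId;
            this.partition = partition;
        }

        @Override
        public int subtopologyId() {
            return subtopologyId;
        }

        @Override
        public int partition() {
            return partition;
        }

        // Assumed string form: "<subtopologyId>_<partition>"
        @Override
        public String toString() {
            return subtopologyId + "_" + partition;
        }
    }

    /** Simplified stand-in for TaskMetadata exposing both the old and new getters. */
    public static final class TaskMetadata {
        private final TaskId taskId;

        public TaskMetadata(final TaskId taskId) {
            this.taskId = taskId;
        }

        public TaskIdInfo taskIdInfo() {
            return taskId;
        }

        @Deprecated
        public String taskId() {
            return taskId.toString();
        }
    }

    /** Before: callers had to parse the partition out of the string form. */
    public static int partitionFromString(final String taskId) {
        return Integer.parseInt(taskId.substring(taskId.indexOf('_') + 1));
    }

    /** After: callers read the partition directly from the getter. */
    public static int partitionFromInfo(final TaskIdInfo info) {
        return info.partition();
    }

    @SuppressWarnings("deprecation")
    public static void main(final String[] args) {
        final TaskMetadata metadata = new TaskMetadata(new TaskId(0, 3));
        System.out.println(partitionFromString(metadata.taskId()));   // old path
        System.out.println(partitionFromInfo(metadata.taskIdInfo())); // new path
    }
}
```

Both paths yield the same partition, but the new getter removes the dependency on the string format of the task id.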

...