The TaskMetadata class in Kafka Streams encodes information about a particular task, such as its offsets, topic partitions, and taskId. Currently the taskId is stored and exposed as a String rather than as the actual TaskId class. We should move towards using TaskId here, since that is what the value represents, and because if/when we add additional fields to TaskId, parsing the string encoding to extract the actual information about the task will become more and more unwieldy.
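To make the parsing burden concrete, here is a minimal sketch of what a caller has to do with the string form. It assumes the encoding is `<topicGroupId>_<partition>`; the class name `TaskIdParsingSketch` and the `parse` helper are hypothetical and not part of Kafka Streams.

```java
public class TaskIdParsingSketch {

    /**
     * Splits a task id string by hand.
     * Assumption: the encoding is "<topicGroupId>_<partition>".
     * Returns {topicGroupId, partition}.
     */
    public static int[] parse(final String taskId) {
        final int separator = taskId.indexOf('_');
        if (separator < 0) {
            throw new IllegalArgumentException("Malformed task id: " + taskId);
        }
        final int topicGroupId = Integer.parseInt(taskId.substring(0, separator));
        final int partition = Integer.parseInt(taskId.substring(separator + 1));
        return new int[] {topicGroupId, partition};
    }

    public static void main(final String[] args) {
        final int[] parsed = parse("0_3");
        System.out.println("topicGroupId=" + parsed[0] + ", partition=" + parsed[1]);
    }
}
```

Every caller that wants the partition or subtopology has to repeat this logic, and any new field added to the encoding would break it. With a typed id, the same information would be a plain accessor call.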

Public Interfaces

New TaskIdMetadata interface added to the public API:

Code Block (language: java, title: TaskId)
package org.apache.kafka.streams.processor;

/**
 * A basic wrapper class for task id metadata
 */
public interface TaskIdMetadata {
    /**
     * @return The ID of the topic group, ie the subtopology that this task executes
     */
    int topicGroupId();

    /**
     * @return The ID number of the partition.
     */
    int partition();
}

"New" internal TaskId class that implements the public TaskIdMetadata interface:

Code Block (language: java, title: org.apache.kafka.streams.processor.internals.TaskId)
package org.apache.kafka.streams.processor.internals;

import org.apache.kafka.streams.processor.TaskIdMetadata;

class TaskId implements TaskIdMetadata {
    // This is just the existing TaskId class & implementation, but
    // moved to the internals package
}


Deprecate the current public TaskId class:

Code Block (language: java, title: org.apache.kafka.streams.processor.TaskId)
package org.apache.kafka.streams.processor;

/**
 * The task ID representation composed as topic group ID plus the assigned partition ID.
 */
@Deprecated
public class TaskId {
}

Deprecate the taskId() getter on the StateStoreContext and ProcessorContext interfaces, and replace it with a getter that returns TaskIdMetadata:

Code Block (language: java, title: TaskId)
public interface ProcessorContext {
    /**
     * Returns the task id metadata.
     *
     * @return the task id metadata
     */
    TaskIdMetadata taskIdMetadata();
    
    /**
     * Returns the task id.
     *
     * @return the task id
     * @deprecated use {@link #taskIdMetadata()} instead.
     */
    @Deprecated
    TaskId taskId();
}

public interface StateStoreContext {
    /**
     * Returns the task id metadata.
     *
     * @return the task id metadata
     */
    TaskIdMetadata taskIdMetadata();
    
    /**
     * Returns the task id.
     *
     * @return the task id
     * @deprecated use {@link #taskIdMetadata()} instead.
     */
    @Deprecated
    TaskId taskId();
}

Deprecate the taskId() getter that returns the task id as a string, and add an API to return the actual TaskIdMetadata object:

Code Block (language: java, title: TaskMetadata)
package org.apache.kafka.streams.processor;

class TaskMetadata {

    private final TaskIdMetadata taskId;

    /**
     * @return the TaskId with additional task metadata such as partition and group id
     */
    public TaskIdMetadata id() {
        return taskId;
    }

    /**
     * @return a string representing the TaskId metadata such as partition and group id
     * @deprecated please use {@link #id()} instead.
     */
    @Deprecated
    public String taskId() {
        return taskId.toString();
    }
}

Proposed Changes

To separate the implementation from the public API, we'll need to deprecate the existing o.a.k.streams.processor.TaskId class and replace it with a public interface o.a.k.streams.processor.TaskIdMetadata. The existing TaskId class implementation will be moved to a new o.a.k.streams.processor.internals.TaskId class, which will implement the added TaskIdMetadata interface.

Since the existing TaskId class is not really a public API, and currently has a handful of public utility methods, we don't want to explicitly pull it into the public API. Instead, we'll introduce a new TaskIdMetadata interface in the public API that's implemented by the current TaskId class, and move the TaskId class to an internal package to clarify that it's not public.
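The split between a narrow public interface and an internal implementation can be sketched as follows. This is an illustration under stated assumptions rather than the actual Kafka code: `InternalTaskId` stands in for the relocated o.a.k.streams.processor.internals.TaskId, and its `parse` utility and the `<topicGroupId>_<partition>` encoding are hypothetical examples of what stays out of the public API.

```java
public class TaskIdSeparationSketch {

    /** Stand-in for the proposed public, read-only interface. */
    public interface TaskIdMetadata {
        int topicGroupId();
        int partition();
    }

    /** Stand-in for the internal class that implements the public interface. */
    public static final class InternalTaskId implements TaskIdMetadata {
        private final int topicGroupId;
        private final int partition;

        public InternalTaskId(final int topicGroupId, final int partition) {
            this.topicGroupId = topicGroupId;
            this.partition = partition;
        }

        @Override
        public int topicGroupId() {
            return topicGroupId;
        }

        @Override
        public int partition() {
            return partition;
        }

        /** Hypothetical internal-only utility; not reachable through TaskIdMetadata. */
        public static InternalTaskId parse(final String encoded) {
            final int separator = encoded.indexOf('_');
            if (separator < 0) {
                throw new IllegalArgumentException("Malformed task id: " + encoded);
            }
            return new InternalTaskId(
                Integer.parseInt(encoded.substring(0, separator)),
                Integer.parseInt(encoded.substring(separator + 1)));
        }
    }

    /** User-facing code depends only on the interface type. */
    public static String describe(final TaskIdMetadata id) {
        return "subtopology " + id.topicGroupId() + ", partition " + id.partition();
    }

    public static void main(final String[] args) {
        System.out.println(describe(InternalTaskId.parse("1_4")));
    }
}
```

Because `describe` accepts only `TaskIdMetadata`, utility methods on the implementing class can change or disappear without affecting user code.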

To migrate from String to TaskIdMetadata in TaskMetadata, we'll need to deprecate the existing taskId() getter and add a new getter that returns TaskIdMetadata in its place. Similarly, we will need to deprecate the existing taskId() getters on the ProcessorContext and StateStoreContext interfaces, and replace them with taskIdMetadata() getters that return the new public interface instead of the now-deprecated class.
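From a caller's point of view, the TaskMetadata migration might look like the sketch below. The types are simplified stand-ins, not the real Kafka classes; the new getter is named `id()` here, following the TaskMetadata listing under Public Interfaces, and the `<topicGroupId>_<partition>` string form is an assumption.

```java
public class TaskMetadataMigrationSketch {

    /** Stand-in for the proposed public interface. */
    public interface TaskIdMetadata {
        int topicGroupId();
        int partition();
    }

    /** Simplified stand-in for TaskMetadata with the old and new getters side by side. */
    public static final class TaskMetadata {
        private final TaskIdMetadata taskId;

        public TaskMetadata(final int topicGroupId, final int partition) {
            this.taskId = new TaskIdMetadata() {
                @Override
                public int topicGroupId() {
                    return topicGroupId;
                }

                @Override
                public int partition() {
                    return partition;
                }

                // Assumption: the string form is "<topicGroupId>_<partition>".
                @Override
                public String toString() {
                    return topicGroupId + "_" + partition;
                }
            };
        }

        /** New typed getter. */
        public TaskIdMetadata id() {
            return taskId;
        }

        /** Old getter, kept during the deprecation period and backed by the same object. */
        @Deprecated
        public String taskId() {
            return taskId.toString();
        }
    }

    public static void main(final String[] args) {
        final TaskMetadata metadata = new TaskMetadata(2, 5);
        // Before: callers received a String and had to parse it themselves.
        System.out.println("old: " + metadata.taskId());
        // After: callers read the fields directly.
        System.out.println("new: partition " + metadata.id().partition());
    }
}
```

Having the deprecated getter delegate to the typed object keeps the two views consistent until the old method is removed.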

Compatibility, Deprecation, and Migration Plan

The TaskMetadata#taskId(), ProcessorContext#taskId(), and StateStoreContext#taskId() methods will be deprecated and removed in a future release. The entire o.a.k.streams.processor.TaskId class will also be deprecated and later removed.

Rejected Alternatives

N/A
