Status

Current state: Under Discussion

Discussion thread: here 

JIRA:

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

The TaskMetadata class in Kafka Streams encodes information about a particular task, such as its offsets, topic partitions, and task ID. However, the task ID is stored and exposed as a String rather than as an instance of the actual TaskId class. We should return it as a typed TaskId object instead, because that is what it is, and because if/when we add fields to TaskId, parsing the String encoding to extract information about the task will become more and more unwieldy.
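To make the parsing concern concrete, here is a minimal sketch of what a caller has to do today with the String returned by TaskMetadata#taskId(). It assumes the "<topicGroupId>_<partition>" encoding produced by the current TaskId#toString(); the LegacyTaskIdParsing class and its methods are hypothetical and not part of Kafka Streams.

```java
// Hypothetical helper showing what callers must do today with the String
// returned by TaskMetadata#taskId(). Assumes the "<topicGroupId>_<partition>"
// encoding produced by the current TaskId#toString().
final class LegacyTaskIdParsing {
    private LegacyTaskIdParsing() { }

    // Splits an encoded task id such as "1_3" into its two numeric fields.
    private static String[] fields(String encodedTaskId) {
        String[] parts = encodedTaskId.split("_");
        if (parts.length != 2) {
            // Any extra field in the encoding breaks every caller-side parser like this one.
            throw new IllegalArgumentException("Unexpected task id format: " + encodedTaskId);
        }
        return parts;
    }

    // Returns the subtopology (topicGroupId) part of the encoded task id.
    static int subtopologyOf(String encodedTaskId) {
        return Integer.parseInt(fields(encodedTaskId)[0]);
    }

    // Returns the partition part of the encoded task id.
    static int partitionOf(String encodedTaskId) {
        return Integer.parseInt(fields(encodedTaskId)[1]);
    }
}
```

With a hypothetical three-field encoding such as "extra_1_3", both helpers throw IllegalArgumentException; exposing typed getters removes this class of breakage from user code.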

At the same time, the existing TaskId class is awkward. It is clearly intended to be a simple metadata class that exposes the topicGroupId (subtopology) and partition number, yet it also carries utility methods, such as serialization and deserialization, that are irrelevant to users. Its fields are public, and it has no getters for them. We should take this opportunity to migrate to a new public interface that contains only getters for the relevant fields, and move all implementation details and utilities into an internal implementation class.

Public Interfaces

New TaskInfo interface added to the public API:

TaskInfo
package org.apache.kafka.streams;

/**
 * Basic task info: the subtopology ID and the partition number.
 */
public interface TaskInfo {
    /**
     * @return The ID of the subtopology that this task executes, i.e., the topicGroupId
     */
    int subtopologyId();

    /**
     * @return The ID number of the partition.
     */
    int partition();
}
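Because TaskInfo exposes only getters, user code can depend on the interface alone. The sketch below re-declares TaskInfo locally (without a package) so it compiles on its own; the TaskInfoUsage class, describe(), and the of() test-double factory are hypothetical illustrations, not proposed API.

```java
// Local copy of the proposed org.apache.kafka.streams.TaskInfo interface,
// declared without a package so this sketch compiles standalone.
interface TaskInfo {
    int subtopologyId();
    int partition();
}

final class TaskInfoUsage {
    private TaskInfoUsage() { }

    // Caller code reads the typed getters and never touches the encoding.
    static String describe(TaskInfo task) {
        return "subtopology " + task.subtopologyId() + ", partition " + task.partition();
    }

    // Hypothetical factory for a test double: since TaskInfo is an interface,
    // user tests can stub it without depending on any internal class.
    static TaskInfo of(int subtopologyId, int partition) {
        return new TaskInfo() {
            @Override public int subtopologyId() { return subtopologyId; }
            @Override public int partition() { return partition; }
        };
    }
}
```

For example, `TaskInfoUsage.describe(TaskInfoUsage.of(2, 5))` yields "subtopology 2, partition 5".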

"New" internal TaskId class that implements the public TaskInfo interface:

org.apache.kafka.streams.processor.internals.TaskId
package org.apache.kafka.streams.processor.internals;

import org.apache.kafka.streams.TaskInfo;

public class TaskId implements TaskInfo {
    // This is just the existing TaskId class & implementation, but
    // moved to the internals package
}
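As a rough illustration of the split between public surface and internal utilities, here is a simplified, hypothetical stand-in for the internal class. It is named InternalTaskId only to keep the sketch standalone; its fields are private, the TaskInfo getters are the public view, and helpers such as parse() stay internal. The "<topicGroupId>_<partition>" format is assumed from the current TaskId#toString(), and the real class's serialization helpers are omitted.

```java
import java.util.Objects;

// Local copy of the proposed public interface so the sketch compiles standalone.
interface TaskInfo {
    int subtopologyId();
    int partition();
}

// Hypothetical, simplified stand-in for
// org.apache.kafka.streams.processor.internals.TaskId.
final class InternalTaskId implements TaskInfo, Comparable<InternalTaskId> {
    private final int topicGroupId;
    private final int partition;

    InternalTaskId(int topicGroupId, int partition) {
        this.topicGroupId = topicGroupId;
        this.partition = partition;
    }

    @Override
    public int subtopologyId() {
        return topicGroupId;
    }

    @Override
    public int partition() {
        return partition;
    }

    // Internal utility: inverse of toString(), assuming "<topicGroupId>_<partition>".
    static InternalTaskId parse(String encoded) {
        int idx = encoded.indexOf('_');
        if (idx <= 0 || idx != encoded.lastIndexOf('_')) {
            throw new IllegalArgumentException("Unexpected task id format: " + encoded);
        }
        return new InternalTaskId(Integer.parseInt(encoded.substring(0, idx)),
                                  Integer.parseInt(encoded.substring(idx + 1)));
    }

    // Orders by subtopology first, then by partition.
    @Override
    public int compareTo(InternalTaskId other) {
        int bySubtopology = Integer.compare(topicGroupId, other.topicGroupId);
        return bySubtopology != 0 ? bySubtopology : Integer.compare(partition, other.partition);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof InternalTaskId)) return false;
        InternalTaskId that = (InternalTaskId) o;
        return topicGroupId == that.topicGroupId && partition == that.partition;
    }

    @Override
    public int hashCode() {
        return Objects.hash(topicGroupId, partition);
    }

    @Override
    public String toString() {
        return topicGroupId + "_" + partition;
    }
}
```

Users only ever see the two TaskInfo getters; everything else can change freely inside the internals package.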

Deprecate the current public TaskId class:

org.apache.kafka.streams.processor.TaskId
package org.apache.kafka.streams.processor;

/**
 * The task ID representation composed as topic group ID plus the assigned partition ID.
 *
 * @deprecated use {@link org.apache.kafka.streams.TaskInfo} instead.
 */
@Deprecated
public class TaskId {
}

Deprecate the taskId() getter on the StateStoreContext and ProcessorContext interfaces, and replace it with a taskInfo() getter that returns the TaskInfo interface:

public interface ProcessorContext {

    /**
     * Returns the task info.
     *
     * @return the task info
     */
    TaskInfo taskInfo();

    /**
     * Returns the task id.
     *
     * @return the task id
     * @deprecated use {@link #taskInfo()} instead.
     */
    @Deprecated
    TaskId taskId();
}

public interface StateStoreContext {
    /**
     * Returns the task info.
     *
     * @return the task info
     */
    TaskInfo taskInfo();
    
    /**
     * Returns the task id.
     *
     * @return the task id
     * @deprecated use {@link #taskInfo()} instead.
     */
    @Deprecated
    TaskId taskId();
}
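On the user side, migrating means reading the subtopology and partition through taskInfo() instead of the deprecated taskId(). In this sketch, TaskContext is a hypothetical, trimmed-down stand-in for ProcessorContext and StateStoreContext that keeps only the proposed getter, and TaskAwareProcessor is an illustrative user class, not Kafka API.

```java
// Local copy of the proposed public interface so the sketch compiles standalone.
interface TaskInfo {
    int subtopologyId();
    int partition();
}

// Hypothetical stand-in for ProcessorContext / StateStoreContext,
// reduced to the proposed taskInfo() getter.
interface TaskContext {
    TaskInfo taskInfo();
}

// Illustrative user code after migration: it derives a log prefix from
// taskInfo() rather than from the deprecated taskId().
final class TaskAwareProcessor {
    private String logPrefix = "";

    void init(TaskContext context) {
        TaskInfo task = context.taskInfo();
        logPrefix = "[subtopology=" + task.subtopologyId() + ", partition=" + task.partition() + "] ";
    }

    String format(String message) {
        return logPrefix + message;
    }
}
```

Because the processor only sees the TaskInfo interface, it keeps compiling once the deprecated TaskId class is removed.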

Deprecate the TaskMetadata#taskId() getter, which returns the task ID as a String, and add a taskInfo() getter that returns the actual TaskInfo:

TaskMetadata
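package org.apache.kafka.streams.processor;

import org.apache.kafka.streams.TaskInfo;

public class TaskMetadata {

    // the internal TaskId, which implements the public TaskInfo interface
    private final org.apache.kafka.streams.processor.internals.TaskId taskId;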

    /**
     * @return the basic task metadata such as subtopology and partition id
     */
    public TaskInfo taskInfo() {
        return taskId;
    }

    /**
     * @return the task ID, encoded as a String
     * @deprecated please use {@link #taskInfo()} instead.
     */
    @Deprecated
    public String taskId() {
        return taskId.toString();
    }
}
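With taskInfo() available, code that inspects task metadata no longer needs to parse strings. As a sketch, the hypothetical helper below groups partitions by subtopology; SimpleTaskMetadata is an illustrative stand-in for TaskMetadata that keeps only the task identity, and TaskGrouping is not part of Kafka Streams.

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Local copy of the proposed public interface so the sketch compiles standalone.
interface TaskInfo {
    int subtopologyId();
    int partition();
}

// Hypothetical stand-in for TaskMetadata, reduced to the new taskInfo() getter.
final class SimpleTaskMetadata {
    private final TaskInfo taskInfo;

    SimpleTaskMetadata(int subtopologyId, int partition) {
        // Plays the role of the internal TaskId that the real class would hold.
        this.taskInfo = new TaskInfo() {
            @Override public int subtopologyId() { return subtopologyId; }
            @Override public int partition() { return partition; }
        };
    }

    TaskInfo taskInfo() {
        return taskInfo;
    }
}

final class TaskGrouping {
    private TaskGrouping() { }

    // Groups partitions by subtopology using typed getters; both levels are sorted.
    static Map<Integer, Set<Integer>> partitionsBySubtopology(List<SimpleTaskMetadata> tasks) {
        Map<Integer, Set<Integer>> result = new TreeMap<>();
        for (SimpleTaskMetadata metadata : tasks) {
            TaskInfo task = metadata.taskInfo();
            result.computeIfAbsent(task.subtopologyId(), k -> new TreeSet<>()).add(task.partition());
        }
        return result;
    }
}
```

For tasks (1, 2), (0, 5), and (1, 0), the result is {0=[5], 1=[0, 2]}.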

Proposed Changes

To separate the implementation from the public API, we'll deprecate the existing o.a.k.streams.processor.TaskId class and replace it with a public interface, TaskInfo. The existing TaskId implementation will be moved to a new o.a.k.streams.processor.internals.TaskId class, which will implement the TaskInfo interface.

To migrate TaskMetadata from exposing a String to exposing a TaskInfo, we'll deprecate the existing taskId() getter and add a taskInfo() getter in its place. Similarly, we'll deprecate the existing taskId() getters on the ProcessorContext and StateStoreContext interfaces and replace them with taskInfo() getters that return the new public interface instead of the now-deprecated class.

Compatibility, Deprecation, and Migration Plan

The TaskMetadata#taskId(), ProcessorContext#taskId(), and StateStoreContext#taskId() methods will be deprecated and removed in a future release. The entire o.a.k.streams.processor.TaskId class will also be deprecated and later removed.

Rejected Alternatives

N/A

