Status

Current state: Under Discussion

...

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast). 

Motivation

The TaskMetadata class in KafkaStreams is used for encoding information about a particular task, such as its offsets, topic partitions, and taskId. For some reason, the taskId is stored and exposed as a String object, rather than using the actual TaskId class. We should move towards returning this as a TaskId object, since that's literally what it is, and because if/when we add additional fields to the TaskId it will become more and more unwieldy to parse the string encoding in order to extract the actual information about the task.
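To make the parsing burden concrete, here is a minimal sketch of what a caller has to do with the String form. It assumes the encoding is `<topicGroupId>_<partition>` (for example `0_1`); `ParsedTaskId` is a hypothetical name used only for illustration and is not part of Kafka Streams.

```java
// Hypothetical helper, not part of Kafka Streams: shows the work a caller
// must do to recover task information from the String form of a task id.
final class ParsedTaskId {
    final int topicGroupId;
    final int partition;

    ParsedTaskId(final int topicGroupId, final int partition) {
        this.topicGroupId = topicGroupId;
        this.partition = partition;
    }

    // Assumes the encoding "<topicGroupId>_<partition>", e.g. "0_1".
    static ParsedTaskId parse(final String taskIdString) {
        final int separator = taskIdString.indexOf('_');
        if (separator <= 0 || separator == taskIdString.length() - 1) {
            throw new IllegalArgumentException("Unexpected task id format: " + taskIdString);
        }
        try {
            final int topicGroupId = Integer.parseInt(taskIdString.substring(0, separator));
            final int partition = Integer.parseInt(taskIdString.substring(separator + 1));
            return new ParsedTaskId(topicGroupId, partition);
        } catch (final NumberFormatException e) {
            // Any extra field in the encoding ends up here.
            throw new IllegalArgumentException("Unexpected task id format: " + taskIdString, e);
        }
    }
}
```

A parser like this rejects any encoding that carries an additional field, which is exactly the fragility a typed TaskId avoids.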

For example, we are currently working on a feature that will make Kafka Streams more resilient to changes in independent modules of the topology, in order to enable some kinds of topological upgrades. This feature requires adding an additional "namedTopology" field to the TaskId, but as we are still in the experimental and design phase, all APIs for this feature must be internal until we feel it's ready for a KIP and a solidified API. However, we do want to be able to access this information in some way before this KIP is accepted, as part of the reason we are rolling it out in phases like this is so we can do stringent testing beyond just unit/integration tests. Therefore we need some way to access new fields without adding them as public APIs, for example by using reflection on a TaskId object.
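As a rough illustration of the reflection approach mentioned above, the sketch below reads a private field from an object without any public accessor. `ExperimentalTaskId` and `TaskIdReflection` are hypothetical stand-ins; only the field name "namedTopology" comes from the text, and the real class layout may differ.

```java
import java.lang.reflect.Field;

// Hypothetical stand-in for a TaskId that carries an experimental field
// with no public accessor. The class shape is an assumption.
final class ExperimentalTaskId {
    private final String namedTopology;
    private final int subtopology;
    private final int partition;

    ExperimentalTaskId(final String namedTopology, final int subtopology, final int partition) {
        this.namedTopology = namedTopology;
        this.subtopology = subtopology;
        this.partition = partition;
    }
}

// Hypothetical test-side helper that reads a declared field by name.
final class TaskIdReflection {
    private TaskIdReflection() { }

    static Object readField(final Object target, final String fieldName) {
        try {
            final Field field = target.getClass().getDeclaredField(fieldName);
            field.setAccessible(true); // bypass the private modifier
            return field.get(target);
        } catch (final ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot read field " + fieldName, e);
        }
    }
}
```

This only works when the caller holds an actual object rather than its String encoding, which is why the typed getter matters for testing internal fields.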

At the same time, throughout this unexpectedly long and complicated analysis, it came to our attention how awkward the existing TaskId class is for a public API. It's clearly intended to be a simple metadata class to expose the topicGroupId (subtopology) and partition number fields, yet it has plenty of totally irrelevant (to a user) utility methods, such as de/serialization. Instead of exposing the fields via getters, it just makes them public. We should take this opportunity to clean up the TaskId class and move some of those inappropriate methods to an internal utility class.

Public Interfaces

Clean up the public TaskId class by deprecating inappropriate APIs and replacing topicGroupId with the more common and useful term "subtopology":

Code Block
language: java
title: TaskId
package org.apache.kafka.streams.processor;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.nio.ByteBuffer;

/**
 * The task ID representation composed as subtopology (aka topic group ID) plus the assigned partition ID.
 */
public class TaskId {
    // Signatures only: the constructor and method bodies are omitted.

    /** The ID of the subtopology, aka topicGroupId. */
    @Deprecated
    public final int topicGroupId;

    /** The ID of the partition. */
    @Deprecated
    public final int partition;

    /**
     * @return the ID of the subtopology that this task executes, i.e. the topicGroupId
     */
    public int subtopology() {
        return topicGroupId;
    }

    /**
     * @return the ID number of the partition
     */
    public int partition() {
        return partition;
    }

    /**
     * @deprecated since 3.0, for internal use, will be removed
     */
    @Deprecated
    public void writeTo(final DataOutputStream out, final int version);

    /**
     * @deprecated since 3.0, for internal use, will be removed
     */
    @Deprecated
    public static TaskId readFrom(final DataInputStream in, final int version);

    /**
     * @deprecated since 3.0, for internal use, will be removed
     */
    @Deprecated
    public void writeTo(final ByteBuffer buf, final int version);

    /**
     * @deprecated since 3.0, for internal use, will be removed
     */
    @Deprecated
    public static TaskId readFrom(final ByteBuffer buf, final int version);
}

Deprecate the taskId() getter that returns the task id as a String, and add an API to return the actual TaskId:

Code Block
language: java
title: TaskMetadata
package org.apache.kafka.streams.processor;

class TaskMetadata {

    private final TaskId taskId;

    /**
     * @return the basic task metadata such as subtopology and partition id
     */
    public TaskId getTaskId() {
        return taskId;
    }

    /**
     * @return the string encoding of the task id
     * @deprecated please use {@link #getTaskId()} instead.
     */
     */
    @Deprecated
    public String taskId() {
        return taskId.toString();
    }
}

Note that since the name taskId() was already taken, we have to choose a different name for the getter on TaskMetadata, namely getTaskId(). Since this is outside the Kafka standards (which do not use the 'get' prefix for getters), we will migrate back to the plain taskId() getter later: once the deprecation period has elapsed, instead of simply removing the deprecated `String taskId()` we will replace it with a new `TaskId taskId()` API and then deprecate the temporary getTaskId().
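From a caller's point of view, the migration might look like the sketch below. `TaskIdStandIn` and `TaskMetadataStandIn` are hypothetical stand-ins that mirror the signatures proposed above so the example compiles on its own; the real classes live in org.apache.kafka.streams.processor, and the `<subtopology>_<partition>` string encoding is assumed.

```java
// Stand-in mirroring the proposed TaskId getters (not the real class).
final class TaskIdStandIn {
    private final int subtopology;
    private final int partition;

    TaskIdStandIn(final int subtopology, final int partition) {
        this.subtopology = subtopology;
        this.partition = partition;
    }

    public int subtopology() {
        return subtopology;
    }

    public int partition() {
        return partition;
    }

    @Override
    public String toString() {
        return subtopology + "_" + partition; // assumed encoding
    }
}

// Stand-in mirroring the proposed TaskMetadata API (not the real class).
final class TaskMetadataStandIn {
    private final TaskIdStandIn taskId;

    TaskMetadataStandIn(final TaskIdStandIn taskId) {
        this.taskId = taskId;
    }

    public TaskIdStandIn getTaskId() {
        return taskId;
    }

    @Deprecated
    public String taskId() {
        return taskId.toString();
    }
}

final class TaskMetadataMigration {
    private TaskMetadataMigration() { }

    // Before: recover the partition by parsing the deprecated String getter.
    @SuppressWarnings("deprecation")
    static int partitionFromString(final TaskMetadataStandIn metadata) {
        final String encoded = metadata.taskId();
        return Integer.parseInt(encoded.substring(encoded.indexOf('_') + 1));
    }

    // After: read the partition directly from the typed getter.
    static int partitionFromTaskId(final TaskMetadataStandIn metadata) {
        return metadata.getTaskId().partition();
    }
}
```

Both helpers return the same value for the same metadata; the second no longer depends on the string format.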

Proposed Changes

To separate the implementation from the public API, we will deprecate the various functional methods on the TaskId class and move their implementation to an internal utility class. Similarly, we will deprecate the public fields and introduce getters for those fields in their place. Those fields will be made private once the deprecation period has passed.
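One possible shape for that separation is sketched below. The KIP does not name the internal utility class, so `TaskIdSerdeUtils` is hypothetical, as is the `SimpleTaskId` stand-in and the wire format (two ints); the `version` parameter of the existing methods is left out for brevity.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Stand-in for the public class after cleanup: private fields, getters only.
final class SimpleTaskId {
    private final int subtopology;
    private final int partition;

    SimpleTaskId(final int subtopology, final int partition) {
        this.subtopology = subtopology;
        this.partition = partition;
    }

    public int subtopology() {
        return subtopology;
    }

    public int partition() {
        return partition;
    }
}

// Hypothetical internal utility that owns de/serialization, so the public
// class no longer exposes it. The two-int wire format is an assumption.
final class TaskIdSerdeUtils {
    private TaskIdSerdeUtils() { }

    static void writeTo(final SimpleTaskId taskId, final DataOutputStream out) throws IOException {
        out.writeInt(taskId.subtopology());
        out.writeInt(taskId.partition());
    }

    static SimpleTaskId readFrom(final DataInputStream in) throws IOException {
        final int subtopology = in.readInt();
        final int partition = in.readInt();
        return new SimpleTaskId(subtopology, partition);
    }

    // Serializes and deserializes in memory; handy for checking symmetry.
    static SimpleTaskId roundTrip(final SimpleTaskId taskId) {
        try {
            final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                writeTo(taskId, out);
            }
            try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return readFrom(in);
            }
        } catch (final IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Because the utility uses only the public getters, the fields can later become private without touching the serialization code.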

To migrate from the String to TaskId in TaskMetadata, we'll need to deprecate the existing taskId() getter method and add a TaskId getter in its place. Unfortunately the appropriate name for this getter in the Kafka standard is just `taskId()`, which is already taken. We plan to introduce a temporary getter called `getTaskId()` for the deprecation cycle of the `String taskId()` API, and then once that is over we can migrate back to a `TaskId taskId()` getter and deprecate the temporary getter. It's a bit awkward, but we feel it's better to leave things in the ideal state than to go out of the way to avoid deprecations, especially when it's just one relatively uncommon method.

Compatibility, Deprecation, and Migration Plan

The TaskId readFrom/writeTo methods and the public topicGroupId and partition fields will all be deprecated and removed in a future release. The TaskMetadata.taskId() method will also be deprecated. However, once enough time has passed for it to be removed, we will instead replace it with a new method signature that returns a TaskId instead of a String, and then deprecate the temporary TaskMetadata.getTaskId() API introduced in this KIP.

Rejected Alternatives

Quite a few alternatives were discussed during this seemingly simple KIP. At a high level, they either involved restructuring the existing TaskId class into a hierarchy, or removing any form of TaskId from the API altogether and decomposing any APIs into separate methods for each of its fields. The former was rejected because it would mean that users could not rely on fundamental public contracts like the equals(), toString(), and compareTo() APIs, as they would become implementation details in the internal subclass/implementing class and therefore have no public contract. The latter was rejected because the TaskId is and has been a fundamental concept in Kafka Streams, and something we regularly explain to users as a first-class citizen with a specific string representation that may help them make sense of anything from the logs to the local state and directory structure. Also, we feel that the TaskId is a natural key on the task space of Kafka Streams and a natural way to think about the tasks themselves, more than just a simple data container class.