Status

Current state: Adopted (3.0)

Discussion thread: here 

JIRA: KAFKA-12779

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast). 

Motivation

The TaskMetadata class in Kafka Streams encodes information about a particular task, such as its offsets, topic partitions, and taskId. For some reason, the taskId is stored and exposed as a String rather than as the actual TaskId class. We should move towards returning it as a TaskId object, since that is literally what it is. There is also a practical reason: if and when we add fields to TaskId, it will become increasingly unwieldy to parse the string encoding to extract the actual information about the task.

For example, we are currently working on a feature that will make Kafka Streams more resilient to changes in independent modules of the topology, in order to enable some kinds of topological upgrades. This feature requires adding a "namedTopology" field to the TaskId. Because we are still in the experimental and design phase, all APIs for this feature must remain internal until we feel the feature is ready for a KIP and a solidified API. However, we do want some way to access this information before that KIP is accepted. Part of the reason we are rolling the feature out in phases is so that we can do stringent testing beyond unit and integration tests. We therefore need some way to access new fields without adding them as public APIs, for example by using reflection on a TaskId object.
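The sketch below is a rough illustration of the reflection approach mentioned above. It reads a private field from an object that exposes no public getter for it. ExperimentalTaskId and its namedTopology field are hypothetical stand-ins, not the real Kafka Streams classes; java.lang.reflect.Field is the only real API used.

```java
import java.lang.reflect.Field;

// Hypothetical stand-in for an internal class carrying an extra, not-yet-public field.
// This is NOT the real Kafka Streams TaskId.
final class ExperimentalTaskId {
    private final int subtopology;
    private final int partition;
    // Hypothetical experimental field with no public getter.
    private final String namedTopology;

    ExperimentalTaskId(final int subtopology, final int partition, final String namedTopology) {
        this.subtopology = subtopology;
        this.partition = partition;
        this.namedTopology = namedTopology;
    }

    int subtopology() { return subtopology; }

    int partition() { return partition; }
}

final class ReflectionAccess {
    private ReflectionAccess() { }

    // Reads a private field by name, wrapping reflection failures in an unchecked exception.
    static Object readPrivateField(final Object target, final String fieldName) {
        try {
            final Field field = target.getClass().getDeclaredField(fieldName);
            field.setAccessible(true); // bypass the private modifier
            return field.get(target);
        } catch (final NoSuchFieldException | IllegalAccessException e) {
            throw new IllegalStateException("Cannot read field " + fieldName, e);
        }
    }
}
```

Reflection like this is brittle. Renaming the private field breaks callers at runtime rather than at compile time, which is why it only suits internal testing and not a public contract.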

At the same time, this unexpectedly long and complicated analysis brought to our attention how awkward the existing TaskId class is as a public API. It is clearly intended to be a simple metadata class that exposes the topicGroupId (subtopology) and partition number. Yet it carries plenty of utility methods that are irrelevant to users, such as de/serialization, and instead of exposing its fields via getters it simply makes them public. We should take this opportunity to clean up the TaskId class and move the inappropriate methods to an internal utility class.

Public Interfaces

Clean up the public TaskId class by deprecating inappropriate APIs, introducing the required new ones, and replacing topicGroupId with the more common and useful term "subtopology":

```java
package org.apache.kafka.streams.processor;

import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.nio.ByteBuffer;

/**
 * The task ID representation composed as subtopology (aka topic group ID) plus the assigned partition ID.
 */
public class TaskId {
    // API listing only: method bodies are omitted.

    /** The ID of the subtopology, aka topicGroupId. */
    @Deprecated
    public final int topicGroupId;

    /** The ID of the partition. */
    @Deprecated
    public final int partition;

    public int subtopology();

    public int partition();

    /**
     * Experimental feature: currently always returns null.
     */
    public String topologyName();

    /**
     * @deprecated since 3.0, for internal use, will be removed
     */
    @Deprecated
    public void writeTo(final DataOutputStream out, final int version);

    /**
     * @deprecated since 3.0, for internal use, will be removed
     */
    @Deprecated
    public static TaskId readFrom(final DataInputStream in, final int version);

    /**
     * @deprecated since 3.0, for internal use, will be removed
     */
    @Deprecated
    public void writeTo(final ByteBuffer buf, final int version);

    /**
     * @deprecated since 3.0, for internal use, will be removed
     */
    @Deprecated
    public static TaskId readFrom(final ByteBuffer buf, final int version);
}
```
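For users, the main migration is from direct field access to the new getters. Below is a minimal sketch of that shape, using a hypothetical TaskIdSketch class rather than the real TaskId. In it, the deprecated public fields stay readable during the deprecation period, the getters return the same values, and toString() uses the familiar subtopology_partition format. The ordering in compareTo() (subtopology first, then partition) is an assumption of this sketch.

```java
import java.util.Objects;

// Hypothetical stand-in for the cleaned-up public TaskId; not the real Kafka Streams class.
final class TaskIdSketch implements Comparable<TaskIdSketch> {
    /** @deprecated use {@link #subtopology()} */
    @Deprecated
    public final int topicGroupId;

    /** @deprecated use {@link #partition()} */
    @Deprecated
    public final int partition;

    TaskIdSketch(final int subtopology, final int partition) {
        this.topicGroupId = subtopology;
        this.partition = partition;
    }

    // New getters that replace direct field access.
    public int subtopology() { return topicGroupId; }

    public int partition() { return partition; }

    // Same "subtopology_partition" string form that users see in logs and state directories.
    @Override
    public String toString() { return topicGroupId + "_" + partition; }

    @Override
    public boolean equals(final Object o) {
        if (this == o) return true;
        if (!(o instanceof TaskIdSketch)) return false;
        final TaskIdSketch other = (TaskIdSketch) o;
        return topicGroupId == other.topicGroupId && partition == other.partition;
    }

    @Override
    public int hashCode() { return Objects.hash(topicGroupId, partition); }

    // Orders by subtopology first, then by partition (an assumption of this sketch).
    @Override
    public int compareTo(final TaskIdSketch other) {
        final int bySubtopology = Integer.compare(topicGroupId, other.topicGroupId);
        return bySubtopology != 0 ? bySubtopology : Integer.compare(partition, other.partition);
    }
}
```

Callers replace `id.topicGroupId` with `id.subtopology()` and the `id.partition` field with the `id.partition()` method. Java keeps fields and methods in separate namespaces, so both can coexist on the same class until the fields are made private.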

Deprecate the taskId() getter that returns the task ID as a String and add an API that returns the actual TaskId. Also deprecate the String-based constructor and replace it with one that accepts a TaskId:

```java
package org.apache.kafka.streams.processor;

import java.util.Map;
import java.util.Optional;
import java.util.Set;

import org.apache.kafka.common.TopicPartition;

public class TaskMetadata {
    // API listing only: the deprecated constructor's body is omitted.

    private final TaskId taskId;

    /**
     * @deprecated since 3.0, not intended for public use
     */
    @Deprecated
    public TaskMetadata(final String taskId,
                        final Set<TopicPartition> topicPartitions,
                        final Map<TopicPartition, Long> committedOffsets,
                        final Map<TopicPartition, Long> endOffsets,
                        final Optional<Long> timeCurrentIdlingStarted);

    /**
     * @return the basic task metadata such as subtopology and partition id
     */
    public TaskId getTaskId() {
        return taskId;
    }

    /**
     * @return the basic task metadata such as subtopology and partition id
     * @deprecated please use {@link #getTaskId()} instead.
     */
    @Deprecated
    public String taskId() {
        return taskId.toString();
    }
}
```
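The case for returning a TaskId from TaskMetadata is easiest to see from the caller's side. In this sketch, SimpleTaskId and TaskMetadataSketch are hypothetical stand-ins for TaskId and TaskMetadata, not the real classes. The "before" method parses the String form; the "after" method reads the value from the returned object.

```java
// Hypothetical stand-in for TaskId.
final class SimpleTaskId {
    private final int subtopology;
    private final int partition;

    SimpleTaskId(final int subtopology, final int partition) {
        this.subtopology = subtopology;
        this.partition = partition;
    }

    int subtopology() { return subtopology; }

    int partition() { return partition; }

    @Override
    public String toString() { return subtopology + "_" + partition; }
}

// Hypothetical stand-in for TaskMetadata with both getters during the deprecation period.
final class TaskMetadataSketch {
    private final SimpleTaskId taskId;

    TaskMetadataSketch(final SimpleTaskId taskId) { this.taskId = taskId; }

    // New getter: returns the structured id.
    SimpleTaskId getTaskId() { return taskId; }

    /** @deprecated use {@link #getTaskId()} instead */
    @Deprecated
    String taskId() { return taskId.toString(); }
}

final class CallerMigration {
    private CallerMigration() { }

    // Before: callers had to parse the "subtopology_partition" string themselves.
    @SuppressWarnings("deprecation")
    static int partitionFromString(final TaskMetadataSketch metadata) {
        final String[] parts = metadata.taskId().split("_");
        return Integer.parseInt(parts[1]);
    }

    // After: read the value straight from the returned object.
    static int partitionFromObject(final TaskMetadataSketch metadata) {
        return metadata.getTaskId().partition();
    }
}
```

The string-parsing version hard-codes the subtopology_partition layout. It would break as soon as an extra field, such as a named topology, became part of the string encoding.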

Note that since the name taskId() is already taken, we have to choose a different name for the new getter on TaskMetadata, namely getTaskId(). This departs from the Kafka convention of not using the "get" prefix for getters, so we plan to migrate back to a plain taskId() getter later. Once the deprecation period has elapsed, instead of removing the deprecated `String taskId()`, we will replace it with a new `TaskId taskId()` API and then deprecate the temporary getTaskId().

Proposed Changes

To separate the implementation from the public API, we will deprecate the various functional methods on the TaskId class, such as the readFrom/writeTo serialization methods, and move their implementation to an internal utility class. Similarly, we will deprecate the public fields and introduce getters for those fields in their place. Those fields will be made private once the deprecation period has passed.
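Moving serialization out of the public class could look roughly like the sketch below. PlainTaskId and TaskIdSerdeUtil are hypothetical names. The fixed two-int layout is also an assumption for illustration; the real wire format is version-dependent and owned by Kafka Streams internals, so the version parameter is omitted here.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical value type standing in for the public TaskId: data and getters only.
final class PlainTaskId {
    private final int subtopology;
    private final int partition;

    PlainTaskId(final int subtopology, final int partition) {
        this.subtopology = subtopology;
        this.partition = partition;
    }

    int subtopology() { return subtopology; }

    int partition() { return partition; }
}

// Hypothetical internal utility that owns the wire format instead of the public class.
// Assumption: a fixed layout of two ints (subtopology, then partition).
final class TaskIdSerdeUtil {
    private TaskIdSerdeUtil() { }

    static void writeTo(final PlainTaskId id, final DataOutputStream out) throws IOException {
        out.writeInt(id.subtopology());
        out.writeInt(id.partition());
    }

    static PlainTaskId readFrom(final DataInputStream in) throws IOException {
        final int subtopology = in.readInt();
        final int partition = in.readInt();
        return new PlainTaskId(subtopology, partition);
    }

    // Convenience wrappers over an in-memory buffer; IOExceptions become unchecked.
    static byte[] toBytes(final PlainTaskId id) {
        final ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(buffer)) {
            writeTo(id, out);
        } catch (final IOException e) {
            throw new UncheckedIOException(e);
        }
        return buffer.toByteArray();
    }

    static PlainTaskId fromBytes(final byte[] bytes) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            return readFrom(in);
        } catch (final IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

With this split, the public class carries only data and getters. The wire format can then evolve with the protocol version without touching the public API.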

To migrate from a String to a TaskId in TaskMetadata, we need to deprecate the existing taskId() getter and add a TaskId getter in its place. Unfortunately, the appropriate name for this getter under Kafka conventions is just `taskId()`, which is already taken. We plan to introduce a temporary getter called `getTaskId()` for the deprecation cycle of the `String taskId()` API. Once that cycle is over, we can migrate back to a `TaskId taskId()` getter and then deprecate the temporary one. This is a bit awkward, but we feel it is better to leave the API in its ideal state than to go out of our way to avoid deprecations, especially when only one relatively uncommon method is affected.

Compatibility, Deprecation, and Migration Plan

The TaskId readFrom/writeTo methods and the public topicGroupId and partition fields will all be deprecated and removed in a future release. The TaskMetadata constructor that accepts a String task ID is likewise deprecated. The TaskMetadata.taskId() method will also be deprecated, but once enough time has passed for it to be removed, we will instead replace it with a new method signature that returns a TaskId instead of a String. We will then deprecate the temporary TaskMetadata.getTaskId() API introduced in this KIP.

Rejected Alternatives

Quite a few alternatives were discussed during this seemingly simple KIP. At a high level, they involved either restructuring the existing TaskId class into a hierarchy, or removing any form of TaskId from the API altogether and decomposing any affected APIs into separate methods for each of its fields.

The former was rejected because users could no longer rely on fundamental public contracts such as the equals(), toString(), and compareTo() APIs. These would become implementation details of the internal subclass or implementing class, and therefore carry no public contract.

The latter was rejected because the TaskId is, and has long been, a fundamental concept in Kafka Streams. We regularly explain it to users as a first-class citizen with a specific string representation that may help them make sense of anything from the logs to the local state directory structure. We also feel that the TaskId is a natural key on the task space of Kafka Streams and a natural way to think about the tasks themselves, rather than just a simple data container class.