Status
Current state: Adopted (3.0)
Discussion thread: here
JIRA:
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
The TaskMetadata class in Kafka Streams encodes information about a particular task, such as its offsets, topic partitions, and taskId. For some reason, the taskId is stored and exposed as a String rather than as an instance of the actual TaskId class. We should return it as a TaskId object, since that is exactly what it is, and because if/when we add fields to TaskId it will become increasingly unwieldy to parse the string encoding to extract information about the task.
For example, we are currently working on a feature that will make Kafka Streams more resilient to changes in independent modules of the topology, in order to enable some kinds of topological upgrades. This feature requires adding a "namedTopology" field to TaskId, but as it is still in the experimental and design phase, all APIs for this feature must remain internal until we feel it is ready for a KIP and a solidified API. However, we do want to be able to access this information before that KIP is accepted, since part of the reason for rolling it out in phases is so that we can do stringent testing beyond unit and integration tests. Therefore we need some way to access new fields without adding them as public APIs, for example by using reflection on a TaskId object.
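The reflection approach only works if callers hold an actual object rather than its String encoding. A minimal sketch of the idea, assuming a hypothetical stand-in class `ExperimentalTaskId` with a private `namedTopology` field (these names are illustrative and are not Kafka's internals):

```java
import java.lang.reflect.Field;

// Hypothetical stand-in for a TaskId carrying an experimental field
// that has no public getter yet. Not the real Kafka class.
final class ExperimentalTaskId {
    private final int subtopology;
    private final int partition;
    // Experimental field, deliberately not exposed as public API.
    private final String namedTopology;

    ExperimentalTaskId(final int subtopology, final int partition, final String namedTopology) {
        this.subtopology = subtopology;
        this.partition = partition;
        this.namedTopology = namedTopology;
    }

    int subtopology() { return subtopology; }
    int partition() { return partition; }
}

final class ReflectiveAccess {
    private ReflectiveAccess() { }

    // Reads a private field by name; intended only for test code that needs
    // experimental data before it becomes a public API.
    static Object readPrivateField(final Object target, final String fieldName) {
        try {
            final Field field = target.getClass().getDeclaredField(fieldName);
            field.setAccessible(true);
            return field.get(target);
        } catch (final NoSuchFieldException | IllegalAccessException e) {
            throw new IllegalStateException("Cannot read field " + fieldName, e);
        }
    }
}
```

With a String taskId there is no object to reflect on, so any new field would have to be parsed out of the string instead.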
At the same time, over the course of this unexpectedly long and complicated analysis, it came to our attention how awkward the existing TaskId class is as a public API. It is clearly intended to be a simple metadata class exposing the topicGroupId (subtopology) and partition number, yet it carries utility methods that are irrelevant to users, such as de/serialization, and it exposes its fields by making them public instead of providing getters. We should take this opportunity to clean up the TaskId class and move the inappropriate methods to an internal utility class.
Public Interfaces
Clean up the public TaskId class by deprecating inappropriate APIs, introducing the required new ones, and replacing topicGroupId with the more common and useful term "subtopology":
package org.apache.kafka.streams.processor;
/**
 * The task ID representation composed as subtopology (aka topic group ID) plus the assigned partition ID.
 */
public class TaskId {
    /** The ID of the subtopology, aka topicGroupId. */
    @Deprecated
    public final int topicGroupId;
    /** The ID of the partition. */
    @Deprecated
    public final int partition;
    public int subtopology();
    public int partition();
    /**
     * Experimental feature -- will return null
     */
    public String topologyName();
    /**
     * @deprecated since 3.0, for internal use, will be removed
     */
    @Deprecated
    public void writeTo(final DataOutputStream out, final int version);
    /**
     * @deprecated since 3.0, for internal use, will be removed
     */
    @Deprecated
    public static TaskId readFrom(final DataInputStream in, final int version);
    /**
     * @deprecated since 3.0, for internal use, will be removed
     */
    @Deprecated
    public void writeTo(final ByteBuffer buf, final int version);
    /**
     * @deprecated since 3.0, for internal use, will be removed
     */
    @Deprecated
    public static TaskId readFrom(final ByteBuffer buf, final int version);
}
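For callers, the cleaned-up TaskId API mostly means replacing direct field access with the new getters. The sketch below uses a hypothetical stand-in class, `TaskIdSketch`, that mirrors the shape of the proposed API but is not the real Kafka class; its `toString()` follows the familiar `<subtopology>_<partition>` representation used when no topology name is set.

```java
// Hypothetical stand-in mirroring the proposed public TaskId shape.
// It is NOT org.apache.kafka.streams.processor.TaskId.
final class TaskIdSketch {
    /** @deprecated use {@link #subtopology()} instead. */
    @Deprecated
    public final int topicGroupId;
    /** @deprecated use {@link #partition()} instead. */
    @Deprecated
    public final int partition;

    TaskIdSketch(final int subtopology, final int partition) {
        this.topicGroupId = subtopology;
        this.partition = partition;
    }

    public int subtopology() { return topicGroupId; }
    public int partition() { return partition; }

    // Experimental: always null here, matching the proposed contract.
    public String topologyName() { return null; }

    @Override
    public String toString() { return topicGroupId + "_" + partition; }
}

final class TaskIdUsage {
    private TaskIdUsage() { }

    // Before: reads the deprecated public fields directly.
    @SuppressWarnings("deprecation")
    static String describeOld(final TaskIdSketch id) {
        return "subtopology " + id.topicGroupId + ", partition " + id.partition;
    }

    // After: uses the getters, which keep working once the fields become private.
    static String describeNew(final TaskIdSketch id) {
        return "subtopology " + id.subtopology() + ", partition " + id.partition();
    }
}
```

Both methods produce the same result today; only the getter-based one survives the removal of the public fields.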
Deprecate the taskId() getter that returns the task ID as a String and add an API that returns the actual TaskId. Also deprecate the String-based constructor and replace it with one that accepts a TaskId:
package org.apache.kafka.streams.processor;
public class TaskMetadata {
    /**
     * @deprecated since 3.0, not intended for public use
     */
    @Deprecated
    public TaskMetadata(final String taskId,
                        final Set<TopicPartition> topicPartitions,
                        final Map<TopicPartition, Long> committedOffsets,
                        final Map<TopicPartition, Long> endOffsets,
                        final Optional<Long> timeCurrentIdlingStarted);
    /**
     * @return the basic task metadata such as subtopology and partition id
     */
    public TaskId getTaskId() {
        return taskId;
    }
    /**
     * @return the basic task metadata such as subtopology and partition id
     * @deprecated please use {@link #getTaskId()} instead.
     */
    @Deprecated
    public String taskId() {
        return taskId.toString();
    }
}
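To illustrate the caller-side benefit, here is a sketch contrasting parsing the deprecated String with reading the typed TaskId. `SimpleTaskId`, `TaskMetadataSketch`, and `CallerMigration` are hypothetical stand-ins that only mimic the shape of the TaskMetadata API above.

```java
// Hypothetical stand-ins mimicking the proposed TaskMetadata shape; not Kafka classes.
final class SimpleTaskId {
    private final int subtopology;
    private final int partition;

    SimpleTaskId(final int subtopology, final int partition) {
        this.subtopology = subtopology;
        this.partition = partition;
    }

    int subtopology() { return subtopology; }
    int partition() { return partition; }

    @Override
    public String toString() { return subtopology + "_" + partition; }
}

final class TaskMetadataSketch {
    private final SimpleTaskId taskId;

    TaskMetadataSketch(final SimpleTaskId taskId) { this.taskId = taskId; }

    /** Typed accessor, as proposed in this KIP. */
    SimpleTaskId getTaskId() { return taskId; }

    /** @deprecated use {@link #getTaskId()} instead. */
    @Deprecated
    String taskId() { return taskId.toString(); }
}

final class CallerMigration {
    private CallerMigration() { }

    // Old style: recover the partition by parsing the string encoding.
    // This silently breaks if the encoding gains another component.
    @SuppressWarnings("deprecation")
    static int partitionFromString(final TaskMetadataSketch metadata) {
        final String[] parts = metadata.taskId().split("_");
        return Integer.parseInt(parts[1]);
    }

    // New style: read the field directly from the typed TaskId.
    static int partitionFromTaskId(final TaskMetadataSketch metadata) {
        return metadata.getTaskId().partition();
    }
}
```

If, for example, a topology name were ever prepended to the string, `parts[1]` would no longer be the partition, whereas the typed getter is unaffected.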
Note that since the name taskId() is already taken, we have to choose a different name for the new getter on TaskMetadata, namely getTaskId(). Since this departs from the Kafka convention of not using the 'get' prefix for getters, we will migrate back to a plain taskId() getter later: once the deprecation period has elapsed, instead of removing the deprecated `String taskId()` we will replace it with a new `TaskId taskId()` API and then deprecate the temporary getTaskId().
Proposed Changes
To separate the implementation from the public API, we will deprecate the various functional methods on the TaskId class and move their implementation to an internal utility class. Similarly, we will deprecate the public fields and introduce getters in their place. Those fields will be made private once the deprecation period has passed.
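One way to picture this split is a public class whose old serialization methods survive only as deprecated delegates to an internal helper. This is a minimal sketch under stated assumptions: `PublicTaskId` and `TaskIdCodecSketch` are hypothetical names, the `version` parameter is dropped for brevity, and the two-int wire format is assumed for illustration rather than taken from Kafka's code.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;

// Hypothetical public class: old entry points kept only as deprecated delegates.
final class PublicTaskId {
    private final int subtopology;
    private final int partition;

    PublicTaskId(final int subtopology, final int partition) {
        this.subtopology = subtopology;
        this.partition = partition;
    }

    public int subtopology() { return subtopology; }
    public int partition() { return partition; }

    /** @deprecated for internal use only; delegates to the internal utility. */
    @Deprecated
    public void writeTo(final DataOutputStream out) throws IOException {
        TaskIdCodecSketch.write(this, out);
    }

    /** @deprecated for internal use only; delegates to the internal utility. */
    @Deprecated
    public static PublicTaskId readFrom(final DataInputStream in) throws IOException {
        return TaskIdCodecSketch.read(in);
    }
}

// Hypothetical internal utility that now owns the (de)serialization logic.
final class TaskIdCodecSketch {
    private TaskIdCodecSketch() { }

    static void write(final PublicTaskId id, final DataOutputStream out) throws IOException {
        out.writeInt(id.subtopology());
        out.writeInt(id.partition());
    }

    static PublicTaskId read(final DataInputStream in) throws IOException {
        // Java evaluates arguments left to right, matching the write order.
        return new PublicTaskId(in.readInt(), in.readInt());
    }

    // Serializes and deserializes without touching the deprecated public API.
    static PublicTaskId roundTrip(final PublicTaskId id) {
        try {
            final ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (DataOutputStream out = new DataOutputStream(bytes)) {
                write(id, out);
            }
            try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return read(in);
            }
        } catch (final IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Internal callers switch to the utility immediately, so removing the deprecated public methods later does not require touching them again.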
To migrate from a String to a TaskId in TaskMetadata, we need to deprecate the existing taskId() getter and add a TaskId getter in its place. Unfortunately, the appropriate name for this getter under the Kafka conventions is just `taskId()`, which is already taken. We plan to introduce a temporary getter called `getTaskId()` for the deprecation cycle of the `String taskId()` API; once that is over, we can migrate back to a `TaskId taskId()` getter and deprecate the temporary one. It's a bit awkward, but we feel it's better to leave things in the ideal state than to go out of our way to avoid deprecations, especially when only one relatively uncommon method is affected.
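The end state this plan aims for, after `String taskId()` has been removed, can be sketched as follows. `PlannedTaskId` and `PlannedTaskMetadata` are hypothetical stand-ins; this illustrates the intended shape, not a committed API.

```java
// Hypothetical stand-ins for the planned end state; not Kafka classes.
final class PlannedTaskId {
    private final int subtopology;
    private final int partition;

    PlannedTaskId(final int subtopology, final int partition) {
        this.subtopology = subtopology;
        this.partition = partition;
    }

    int subtopology() { return subtopology; }
    int partition() { return partition; }
}

final class PlannedTaskMetadata {
    private final PlannedTaskId taskId;

    PlannedTaskMetadata(final PlannedTaskId taskId) { this.taskId = taskId; }

    // Final step: the conventional getter name returns the typed id again.
    PlannedTaskId taskId() { return taskId; }

    /** @deprecated temporary name from the first phase; use {@link #taskId()} instead. */
    @Deprecated
    PlannedTaskId getTaskId() { return taskId(); }
}
```

Because the temporary getter simply delegates, code written against `getTaskId()` during the first phase keeps working unchanged until that getter's own deprecation period ends.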
Compatibility, Deprecation, and Migration Plan
The TaskId readFrom/writeTo methods and the public topicGroupId and partition fields will all be deprecated and removed in a future release. The TaskMetadata.taskId() method will also be deprecated. However, once enough time has passed for it to be removed, we will instead replace it with a new method signature that returns a TaskId instead of a String, and then deprecate the temporary TaskMetadata.getTaskId() API introduced in this KIP.
Rejected Alternatives
Quite a few alternatives were discussed during this seemingly simple KIP. At a high level, they either involved restructuring the existing TaskId class into a hierarchy, or removing any form of TaskId from the API altogether and decomposing any such APIs into separate methods for each of its fields. The former was rejected because users could no longer rely on fundamental public contracts like equals(), toString(), and compareTo(), as these would become implementation details of the internal subclass or implementing class and therefore carry no public contract. The latter was rejected because TaskId is, and has long been, a fundamental concept in Kafka Streams, one we regularly explain to users as a first-class citizen with a specific string representation that helps them make sense of everything from the logs to the local state directory structure. We also feel that the TaskId is a natural key on the task space of Kafka Streams and a natural way to think about the tasks themselves, rather than just a simple data container class.
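The argument for keeping a single concrete TaskId rests on it having public equality and ordering contracts, which is what makes it usable as a natural key. A minimal sketch using a hypothetical `TaskKey` value class (not the real TaskId), with an ordering by subtopology then partition that is assumed here for illustration:

```java
import java.util.Objects;

// Hypothetical value class showing the contracts a natural key needs:
// equals/hashCode for map keys and compareTo for stable ordering.
final class TaskKey implements Comparable<TaskKey> {
    private final int subtopology;
    private final int partition;

    TaskKey(final int subtopology, final int partition) {
        this.subtopology = subtopology;
        this.partition = partition;
    }

    @Override
    public boolean equals(final Object o) {
        if (this == o) return true;
        if (!(o instanceof TaskKey)) return false;
        final TaskKey other = (TaskKey) o;
        return subtopology == other.subtopology && partition == other.partition;
    }

    @Override
    public int hashCode() { return Objects.hash(subtopology, partition); }

    // Orders by subtopology first, then by partition.
    @Override
    public int compareTo(final TaskKey other) {
        final int bySubtopology = Integer.compare(subtopology, other.subtopology);
        return bySubtopology != 0 ? bySubtopology : Integer.compare(partition, other.partition);
    }

    @Override
    public String toString() { return subtopology + "_" + partition; }
}
```

Two independently constructed keys for the same task are equal, so they can look up the same map entry, and sorting a collection of keys groups tasks by subtopology. If these methods lived only on an internal subclass, none of this would be part of a public contract.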