Status
Current state: Adopted
Discussion thread: https://lists.apache.org/x/thread.html/r1d20fb6dbd6b01bb84cbb17e992f4d08308980dfc5f2e0a68d674413@%3Cdev.kafka.apache.org%3E
...
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
This KIP could be seen as a spin-off of KIP-740: Clean up public API in TaskId and fix TaskMetadata#taskId(). During one of the pull requests created to implement that KIP, it became apparent that TaskMetadata was never a class that users would need to instantiate, but rather an API for exposing metadata. For this reason, it makes sense to separate the public interface from the internal implementation, hence this proposal to create the TaskMetadata interface. See https://github.com/apache/kafka/pull/10755#discussion_r639338584 for further context on this topic.
Within this KIP discussion, it was also pointed out that ThreadMetadata shares the same problems as TaskMetadata. Hence, this KIP also proposes converting ThreadMetadata into an interface and keeping the implementation as an internal class. The reasoning is the same as for TaskMetadata: ThreadMetadata was never meant to be instantiated by end users, but was rather meant as an API.
It is also important to note that this KIP must remain consistent with the goal of KIP-740, which was to not expose TaskId as a String in the public APIs.
For reasons described in further detail under "Public Interfaces", this KIP should be part of release 3.0.0, so we can keep the APIs consistent without waiting for a full deprecation cycle.
Public Interfaces
Create a new interface named TaskMetadata under package org.apache.kafka.streams, which will contain all currently non-deprecated methods:
/**
* Represents the state of a single task running within a {@link KafkaStreams} application.
*/
public interface TaskMetadata {
/**
* @return the basic task metadata such as subtopology and partition id
*/
TaskId taskId();
/**
* This function will return a set of the current TopicPartitions
*/
Set<TopicPartition> topicPartitions();
/**
* This function will return a map of TopicPartitions and the highest committed offset seen so far
*/
Map<TopicPartition, Long> committedOffsets();
/**
* This function will return a map of TopicPartitions and the highest offset seen so far in the Topic
*/
Map<TopicPartition, Long> endOffsets();
/**
* This function will return the time task idling started, if the task is not currently idling it will return empty
*/
Optional<Long> timeCurrentIdlingStarted();
/**
* Compares the specified object with this TaskMetadata. Returns {@code true} if and only if the specified object is
* also a TaskMetadata and both {@code taskId()} and {@code topicPartitions()} are equal.
*/
boolean equals(final Object o);
/**
* Returns the hash code value for this TaskMetadata. The hash code is defined to be the result of the following calculation:
* <pre>
* {@code
* Objects.hash(taskId(), topicPartitions());
* }
* </pre>
*/
int hashCode();
}
Additionally, TaskMetadata under org.apache.kafka.streams.processor will be deprecated:
/**
* Represents the state of a single task running within a {@link KafkaStreams} application.
* @deprecated since 3.0, use {@link org.apache.kafka.streams.TaskMetadata} instead
*/
@Deprecated
public class TaskMetadata {
...
}
Furthermore, the method getTaskId, newly introduced in KIP-740 (but not yet released), will be removed, as it won't be needed any more. For this reason, it makes sense to include this KIP in release 3.0.0, so we can avoid a full deprecation cycle for this newly introduced method.
// This method will be removed in this KIP
/**
* @return the basic task metadata such as subtopology and partition id
*/
public TaskId getTaskId() {
return taskId;
}
ThreadMetadata class under org.apache.kafka.streams.processor will be deprecated:
/**
 * Represents the state of a single thread running within a {@link KafkaStreams} application.
 * @deprecated since 3.0 use {@link org.apache.kafka.streams.ThreadMetadata} instead
 */
@Deprecated
public class ThreadMetadata {
    ...
}
A new interface named ThreadMetadata will be introduced under org.apache.kafka.streams:
/**
 * Represents the state of a single thread running within a {@link KafkaStreams} application.
 */
public interface ThreadMetadata {
    /**
     * @return the state of the Thread
     */
    String threadState();
    /**
     * @return the name of the Thread
     */
    String threadName();
    /**
     * This function will return the set of the {@link TaskMetadata} for the current active tasks
     */
    Set<TaskMetadata> activeTasks();
    /**
     * This function will return the set of the {@link TaskMetadata} for the current standby tasks
     */
    Set<TaskMetadata> standbyTasks();
    /**
     * @return the consumer Client Id
     */
    String consumerClientId();
    /**
     * @return the restore consumer Client Id
     */
    String restoreConsumerClientId();
    /**
     * This function will return the set of Client Ids for the producers
     */
    Set<String> producerClientIds();
    /**
     * @return the admin Client Id
     */
    String adminClientId();
    /**
     * Compares the specified object with this ThreadMetadata. Returns {@code true} if and only if the specified object is
     * also a ThreadMetadata and both {@code threadName()} are equal, {@code threadState()} are equal, {@code activeTasks()} contain the same
     * elements, {@code standbyTasks()} contain the same elements, {@code consumerClientId()} are equal, {@code restoreConsumerClientId()}
     * are equal, {@code producerClientIds()} contain the same elements, and {@code adminClientId()} are equal.
     */
    boolean equals(Object o);
    /**
     * Returns the hash code value for this ThreadMetadata. The hash code is defined to be the result of the following calculation:
     * <pre>
     * {@code
     * Objects.hash(
     *     threadName(),
     *     threadState(),
     *     activeTasks(),
     *     standbyTasks(),
     *     consumerClientId(),
     *     restoreConsumerClientId(),
     *     producerClientIds(),
     *     adminClientId());
     * }
     * </pre>
     */
    int hashCode();
}
Additionally, the KafkaStreams class needs to be adapted to return the new ThreadMetadata interface, and the old method (returning the old class) needs to be deprecated, as follows:
/**
 * Returns runtime information about the local threads of this {@link KafkaStreams} instance.
 *
 * @return the set of {@link org.apache.kafka.streams.processor.ThreadMetadata}.
 * @deprecated since 3.0 use {@link #threadsMetadata()}
 */
@Deprecated
public Set<org.apache.kafka.streams.processor.ThreadMetadata> localThreadsMetadata() {
    return threadsMetadata().stream().map(threadMetadata -> new org.apache.kafka.streams.processor.ThreadMetadata(
            threadMetadata.threadName(),
            threadMetadata.threadState(),
            threadMetadata.consumerClientId(),
            threadMetadata.restoreConsumerClientId(),
            threadMetadata.producerClientIds(),
            threadMetadata.adminClientId(),
            // Fully qualified: inside org.apache.kafka.streams, a bare TaskMetadata would be the new interface
            threadMetadata.activeTasks().stream().map(taskMetadata -> new org.apache.kafka.streams.processor.TaskMetadata(
                    taskMetadata.taskId().toString(),
                    taskMetadata.topicPartitions(),
                    taskMetadata.committedOffsets(),
                    taskMetadata.endOffsets(),
                    taskMetadata.timeCurrentIdlingStarted())
            ).collect(Collectors.toSet()),
            threadMetadata.standbyTasks().stream().map(taskMetadata -> new org.apache.kafka.streams.processor.TaskMetadata(
                    taskMetadata.taskId().toString(),
                    taskMetadata.topicPartitions(),
                    taskMetadata.committedOffsets(),
                    taskMetadata.endOffsets(),
                    taskMetadata.timeCurrentIdlingStarted())
            ).collect(Collectors.toSet())))
        .collect(Collectors.toSet());
}
/**
 * Returns runtime information about the local threads of this {@link KafkaStreams} instance.
 *
 * @return the set of {@link ThreadMetadata}.
 */
public Set<ThreadMetadata> threadsMetadata() {
    final Set<ThreadMetadata> threadMetadata = new HashSet<>();
    processStreamThread(thread -> {
        synchronized (thread.getStateLock()) {
            if (thread.state() != StreamThread.State.DEAD) {
                threadMetadata.add(thread.getThreadMetadata());
            }
        }
    });
    return threadMetadata;
}
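The deprecated localThreadsMetadata() method is essentially an adapter. The new interfaces are the source of truth, and instances of the old classes are rebuilt from them on every call, with the typed TaskId turned back into a String via toString(). The sketch below isolates that pattern using hypothetical stand-ins (AdapterTaskId, AdapterTaskMetadata, LegacyTaskMetadata, LegacyMetadataAdapter) instead of the real Kafka classes. For illustration only, it assumes that a task id renders as "<subtopology>_<partition>".

```java
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical stand-in for Kafka's TaskId. toString() is ASSUMED to render "<subtopology>_<partition>".
final class AdapterTaskId {
    private final int subtopology;
    private final int partition;

    AdapterTaskId(final int subtopology, final int partition) {
        this.subtopology = subtopology;
        this.partition = partition;
    }

    @Override
    public String toString() {
        return subtopology + "_" + partition;
    }
}

// Stand-in for the new public interface: the task id is typed.
interface AdapterTaskMetadata {
    AdapterTaskId taskId();
    Set<String> topicPartitions();
}

// Stand-in for the deprecated class: the task id is a plain String.
final class LegacyTaskMetadata {
    private final String taskId;
    private final Set<String> topicPartitions;

    LegacyTaskMetadata(final String taskId, final Set<String> topicPartitions) {
        this.taskId = taskId;
        this.topicPartitions = topicPartitions;
    }

    String taskId() { return taskId; }
    Set<String> topicPartitions() { return topicPartitions; }
}

final class LegacyMetadataAdapter {
    private LegacyMetadataAdapter() { }

    // Same shape as the deprecated localThreadsMetadata(): map every new-style object to a
    // freshly built legacy object, stringifying the typed task id on the way.
    static Set<LegacyTaskMetadata> toLegacy(final Set<? extends AdapterTaskMetadata> tasks) {
        return tasks.stream()
            .map(task -> new LegacyTaskMetadata(task.taskId().toString(), task.topicPartitions()))
            .collect(Collectors.toSet());
    }
}
```

Because the legacy objects are rebuilt on each call, they are detached snapshots. That is acceptable for a read-only metadata API that is being phased out.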
UPDATE
During the vote thread discussion, it was asked whether this KIP could also be aligned with
Jira
The first change converts StreamsMetadata into an interface under org.apache.kafka.streams with an internal implementation:
public interface StreamsMetadata {
/**
* The value of {@link StreamsConfig#APPLICATION_SERVER_CONFIG} configured for the streams
* instance, which is typically host/port
*
* @return {@link HostInfo} corresponding to the streams instance
*/
HostInfo hostInfo();
/**
* State stores owned by the instance as an active replica
*
* @return set of active state store names
*/
Set<String> stateStoreNames();
/**
* Topic partitions consumed by the instance as an active replica
*
* @return set of active topic partitions
*/
Set<TopicPartition> topicPartitions();
/**
* (Source) Topic partitions for which the instance acts as standby.
*
* @return set of standby topic partitions
*/
Set<TopicPartition> standbyTopicPartitions();
/**
* State stores owned by the instance as a standby replica
*
* @return set of standby state store names
*/
Set<String> standbyStateStoreNames();
/**
* This method is equivalent to calling {@code StreamsMetadata.hostInfo().host();}
*/
String host();
/**
* This method is equivalent to calling {@code StreamsMetadata.hostInfo().port();}
*/
int port();
/**
* Compares the specified object with this StreamsMetadata. Returns {@code true} if and only if the specified object is
* also a StreamsMetadata and both {@code hostInfo()} are equal, and {@code stateStoreNames()}, {@code topicPartitions()},
* {@code standbyStateStoreNames()}, and {@code standbyTopicPartitions()} contain the same elements.
*/
boolean equals(Object o);
/**
* Returns the hash code value for this StreamsMetadata. The hash code is defined to be the result of the following calculation:
* <pre>
* {@code
* Objects.hash(hostInfo(), stateStoreNames(), topicPartitions(), standbyStateStoreNames(), standbyTopicPartitions());
* }
* </pre>
*/
int hashCode();
}
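One possible implementation shape for the StreamsMetadata interface is sketched next. host() and port() are derived from hostInfo(), as the Javadoc describes, so they can never disagree with it. equals and hashCode use exactly the five components the Javadoc lists. SketchHostInfo and SketchStreamsMetadataImpl are hypothetical stand-ins for HostInfo and StreamsMetadataImpl, and topic partitions are modelled as Strings so that no Kafka dependency is needed.

```java
import java.util.Collections;
import java.util.Objects;
import java.util.Set;

// Hypothetical stand-in for Kafka's HostInfo (host and port taken from application.server).
final class SketchHostInfo {
    private final String host;
    private final int port;

    SketchHostInfo(final String host, final int port) {
        this.host = host;
        this.port = port;
    }

    String host() { return host; }
    int port() { return port; }

    @Override
    public boolean equals(final Object o) {
        if (this == o) return true;
        if (!(o instanceof SketchHostInfo)) return false;
        final SketchHostInfo other = (SketchHostInfo) o;
        return port == other.port && host.equals(other.host);
    }

    @Override
    public int hashCode() {
        return Objects.hash(host, port);
    }
}

// Hypothetical sketch in the spirit of StreamsMetadataImpl; topic partitions are Strings here.
final class SketchStreamsMetadataImpl {
    private final SketchHostInfo hostInfo;
    private final Set<String> stateStoreNames;
    private final Set<String> topicPartitions;
    private final Set<String> standbyStateStoreNames;
    private final Set<String> standbyTopicPartitions;

    SketchStreamsMetadataImpl(final SketchHostInfo hostInfo,
                              final Set<String> stateStoreNames,
                              final Set<String> topicPartitions,
                              final Set<String> standbyStateStoreNames,
                              final Set<String> standbyTopicPartitions) {
        this.hostInfo = hostInfo;
        this.stateStoreNames = Collections.unmodifiableSet(stateStoreNames);
        this.topicPartitions = Collections.unmodifiableSet(topicPartitions);
        this.standbyStateStoreNames = Collections.unmodifiableSet(standbyStateStoreNames);
        this.standbyTopicPartitions = Collections.unmodifiableSet(standbyTopicPartitions);
    }

    SketchHostInfo hostInfo() { return hostInfo; }
    Set<String> stateStoreNames() { return stateStoreNames; }

    // Convenience accessors derived from hostInfo(), so they always agree with it.
    String host() { return hostInfo.host(); }
    int port() { return hostInfo.port(); }

    // Equal iff hostInfo is equal and all four sets contain the same elements.
    @Override
    public boolean equals(final Object o) {
        if (this == o) return true;
        if (!(o instanceof SketchStreamsMetadataImpl)) return false;
        final SketchStreamsMetadataImpl other = (SketchStreamsMetadataImpl) o;
        return hostInfo.equals(other.hostInfo)
            && stateStoreNames.equals(other.stateStoreNames)
            && topicPartitions.equals(other.topicPartitions)
            && standbyStateStoreNames.equals(other.standbyStateStoreNames)
            && standbyTopicPartitions.equals(other.standbyTopicPartitions);
    }

    @Override
    public int hashCode() {
        return Objects.hash(hostInfo, stateStoreNames, topicPartitions,
            standbyStateStoreNames, standbyTopicPartitions);
    }
}
```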
Along with this change, the class org.apache.kafka.streams.state.StreamsMetadata is deprecated:
/**
* Represents the state of an instance (process) in a {@link KafkaStreams} application.
* It contains the user supplied {@link HostInfo} that can be used by developers to build
* APIs and services to connect to other instances, the Set of state stores available on
* the instance and the Set of {@link TopicPartition}s available on the instance.
* NOTE: This is a point in time view. It may change when rebalances happen.
* @deprecated since 3.0.0 use {@link org.apache.kafka.streams.StreamsMetadata}
*/
@Deprecated
public class StreamsMetadata {
...
}
For this change, the KafkaStreams class needs to be adapted to return the new StreamsMetadata interface instead, while at the same time deprecating the old methods:
/**
* Find all currently running {@code KafkaStreams} instances (potentially remotely) that use the same
* {@link StreamsConfig#APPLICATION_ID_CONFIG application ID} as this instance (i.e., all instances that belong to
* the same Kafka Streams application) and return {@link StreamsMetadata} for each discovered instance.
* <p>
* Note: this is a point in time view and it may change due to partition reassignment.
*
* @return {@link StreamsMetadata} for each {@code KafkaStreams} instance of this application
* @deprecated since 3.0.0 use {@link KafkaStreams#allRunningMetadata}
*/
@Deprecated
public Collection<org.apache.kafka.streams.state.StreamsMetadata> allMetadata() {
validateIsRunningOrRebalancing();
return streamsMetadataState.getAllMetadata().stream().map(streamsMetadata ->
new org.apache.kafka.streams.state.StreamsMetadata(streamsMetadata.hostInfo(),
streamsMetadata.stateStoreNames(),
streamsMetadata.topicPartitions(),
streamsMetadata.standbyStateStoreNames(),
streamsMetadata.standbyTopicPartitions()))
.collect(Collectors.toSet());
}
/**
* Find all currently running {@code KafkaStreams} instances (potentially remotely) that use the same
* {@link StreamsConfig#APPLICATION_ID_CONFIG application ID} as this instance (i.e., all instances that belong to
* the same Kafka Streams application) and return {@link StreamsMetadata} for each discovered instance.
* <p>
* Note: this is a point in time view and it may change due to partition reassignment.
*
* @return {@link StreamsMetadata} for each {@code KafkaStreams} instance of this application
*/
public Collection<StreamsMetadata> allRunningMetadata() {
validateIsRunningOrRebalancing();
return streamsMetadataState.getAllMetadata();
}
/**
* Find all currently running {@code KafkaStreams} instances (potentially remotely) that
* <ul>
* <li>use the same {@link StreamsConfig#APPLICATION_ID_CONFIG application ID} as this instance (i.e., all
* instances that belong to the same Kafka Streams application)</li>
* <li>and that contain a {@link StateStore} with the given {@code storeName}</li>
* </ul>
* and return {@link StreamsMetadata} for each discovered instance.
* <p>
* Note: this is a point in time view and it may change due to partition reassignment.
*
* @param storeName the {@code storeName} to find metadata for
* @return {@link StreamsMetadata} for each {@code KafkaStreams} instance with the provided {@code storeName} of
* this application
* @deprecated since 3.0.0 use {@link KafkaStreams#allMetadataForGivenStore} instead
*/
@Deprecated
public Collection<org.apache.kafka.streams.state.StreamsMetadata> allMetadataForStore(final String storeName) {
validateIsRunningOrRebalancing();
return streamsMetadataState.getAllMetadataForStore(storeName).stream().map(streamsMetadata ->
new org.apache.kafka.streams.state.StreamsMetadata(streamsMetadata.hostInfo(),
streamsMetadata.stateStoreNames(),
streamsMetadata.topicPartitions(),
streamsMetadata.standbyStateStoreNames(),
streamsMetadata.standbyTopicPartitions()))
.collect(Collectors.toSet());
}
/**
* Find all currently running {@code KafkaStreams} instances (potentially remotely) that
* <ul>
* <li>use the same {@link StreamsConfig#APPLICATION_ID_CONFIG application ID} as this instance (i.e., all
* instances that belong to the same Kafka Streams application)</li>
* <li>and that contain a {@link StateStore} with the given {@code storeName}</li>
* </ul>
* and return {@link StreamsMetadata} for each discovered instance.
* <p>
* Note: this is a point in time view and it may change due to partition reassignment.
*
* @param storeName the {@code storeName} to find metadata for
* @return {@link StreamsMetadata} for each {@code KafkaStreams} instance with the provided {@code storeName} of
* this application
*/
public Collection<StreamsMetadata> allMetadataForGivenStore(final String storeName) {
validateIsRunningOrRebalancing();
return streamsMetadataState.getAllMetadataForStore(storeName);
}
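The following is a simplified sketch of the semantics of allMetadataForGivenStore. It first performs a state check analogous to validateIsRunningOrRebalancing() and then keeps only the instances that host the requested store. This is an assumption-laden model, not the real StreamsMetadataState lookup. SketchInstance and SketchMetadataLookup are hypothetical, client states are plain Strings, only active store names are checked, and IllegalStateException is a placeholder for whatever the real check throws.

```java
import java.util.Collection;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical, trimmed-down view of one instance: its endpoint and its active store names.
final class SketchInstance {
    final String hostAndPort;
    final Set<String> stateStoreNames;

    SketchInstance(final String hostAndPort, final Set<String> stateStoreNames) {
        this.hostAndPort = hostAndPort;
        this.stateStoreNames = stateStoreNames;
    }
}

final class SketchMetadataLookup {
    private SketchMetadataLookup() { }

    // Stand-in for validateIsRunningOrRebalancing(); the real check and exception type may differ.
    private static void validateIsRunningOrRebalancing(final String clientState) {
        if (!"RUNNING".equals(clientState) && !"REBALANCING".equals(clientState)) {
            throw new IllegalStateException("client is " + clientState);
        }
    }

    // Simplified: keep instances whose active store names include storeName.
    static Collection<SketchInstance> allMetadataForGivenStore(final String clientState,
                                                               final Collection<SketchInstance> all,
                                                               final String storeName) {
        validateIsRunningOrRebalancing(clientState);
        return all.stream()
            .filter(instance -> instance.stateStoreNames.contains(storeName))
            .collect(Collectors.toList());
    }
}
```

Interactive-query code would typically call such a lookup, pick one of the returned instances, and send the query to its host and port.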
Proposed Changes
By introducing a new TaskMetadata interface in a different package, we are able to keep the same name (there is no need to come up with an artificial new name). We are also freed from carrying over changes like the one introduced in https://github.com/apache/kafka/pull/10755, where a method getTaskId was introduced (going against the Kafka naming convention) because taskId was already taken and needed to be deprecated.
Together with this change, a new class TaskMetadataImpl will be created under org.apache.kafka.streams.processor.internals. It will implement the aforementioned interface by reusing the implementation currently present in org.apache.kafka.streams.processor.TaskMetadata. The rest of the Kafka code base using the newly deprecated TaskMetadata will be migrated to use the new classes instead.
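One way to picture TaskMetadataImpl is the minimal, self-contained model below. It is not the actual Kafka class. SketchTaskId, SketchTaskMetadata and SketchTaskMetadataImpl are hypothetical names, and topic partitions are plain Strings so the code compiles without Kafka on the classpath. What it illustrates is the equality contract stated in the TaskMetadata Javadoc: two instances are equal when their task id and topic partitions match, while offsets and idling time are ignored.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
import java.util.Set;

// Hypothetical stand-in for Kafka's TaskId so the sketch compiles without Kafka.
final class SketchTaskId {
    private final int subtopology;
    private final int partition;

    SketchTaskId(final int subtopology, final int partition) {
        this.subtopology = subtopology;
        this.partition = partition;
    }

    @Override
    public boolean equals(final Object o) {
        if (this == o) return true;
        if (!(o instanceof SketchTaskId)) return false;
        final SketchTaskId other = (SketchTaskId) o;
        return subtopology == other.subtopology && partition == other.partition;
    }

    @Override
    public int hashCode() {
        return Objects.hash(subtopology, partition);
    }
}

// Mirrors the shape of the proposed public interface; topic partitions are Strings such as "orders-0".
interface SketchTaskMetadata {
    SketchTaskId taskId();
    Set<String> topicPartitions();
    Map<String, Long> committedOffsets();
    Map<String, Long> endOffsets();
    Optional<Long> timeCurrentIdlingStarted();
}

// Hypothetical internal implementation in the spirit of TaskMetadataImpl.
final class SketchTaskMetadataImpl implements SketchTaskMetadata {
    private final SketchTaskId taskId;
    private final Set<String> topicPartitions;
    private final Map<String, Long> committedOffsets;
    private final Map<String, Long> endOffsets;
    private final Optional<Long> timeCurrentIdlingStarted;

    SketchTaskMetadataImpl(final SketchTaskId taskId,
                           final Set<String> topicPartitions,
                           final Map<String, Long> committedOffsets,
                           final Map<String, Long> endOffsets,
                           final Optional<Long> timeCurrentIdlingStarted) {
        this.taskId = taskId;
        // Read-only views: callers of the accessors cannot mutate the metadata.
        this.topicPartitions = Collections.unmodifiableSet(topicPartitions);
        this.committedOffsets = Collections.unmodifiableMap(committedOffsets);
        this.endOffsets = Collections.unmodifiableMap(endOffsets);
        this.timeCurrentIdlingStarted = timeCurrentIdlingStarted;
    }

    @Override public SketchTaskId taskId() { return taskId; }
    @Override public Set<String> topicPartitions() { return topicPartitions; }
    @Override public Map<String, Long> committedOffsets() { return committedOffsets; }
    @Override public Map<String, Long> endOffsets() { return endOffsets; }
    @Override public Optional<Long> timeCurrentIdlingStarted() { return timeCurrentIdlingStarted; }

    // Equal iff taskId() and topicPartitions() are equal; offsets and idling time are ignored.
    @Override
    public boolean equals(final Object o) {
        if (this == o) return true;
        if (!(o instanceof SketchTaskMetadata)) return false;
        final SketchTaskMetadata other = (SketchTaskMetadata) o;
        return taskId.equals(other.taskId()) && topicPartitions.equals(other.topicPartitions());
    }

    // Consistent with equals: Objects.hash(taskId(), topicPartitions()).
    @Override
    public int hashCode() {
        return Objects.hash(taskId, topicPartitions);
    }
}
```

Restricting equality to the identifying fields means that two snapshots of the same task compare equal even if their offsets moved between calls. The read-only wrapping of collections is a choice made in this sketch, not something the KIP specifies.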
The changes related to ThreadMetadata follow a similar pattern. A new class ThreadMetadataImpl will be created under org.apache.kafka.streams.processor.internals, which will implement the newly created interface org.apache.kafka.streams.ThreadMetadata. The rest of the Kafka code base will be migrated to use the newly created classes.
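The detail of ThreadMetadataImpl worth illustrating is that the collection-valued parts of equals (active tasks, standby tasks, producer client ids) are compared by content. Two snapshots that list the same tasks in a different order are therefore equal and hash identically. In the hypothetical sketch below, SketchThreadMetadataImpl is an invented name, and tasks are represented by task-id Strings rather than TaskMetadata objects to keep it self-contained.

```java
import java.util.Collections;
import java.util.Objects;
import java.util.Set;

// Hypothetical, trimmed-down sketch in the spirit of ThreadMetadataImpl.
// Active/standby tasks are task-id Strings (e.g. "0_1") instead of TaskMetadata objects.
final class SketchThreadMetadataImpl {
    private final String threadName;
    private final String threadState;
    private final Set<String> activeTasks;
    private final Set<String> standbyTasks;
    private final String consumerClientId;
    private final String restoreConsumerClientId;
    private final Set<String> producerClientIds;
    private final String adminClientId;

    SketchThreadMetadataImpl(final String threadName,
                             final String threadState,
                             final Set<String> activeTasks,
                             final Set<String> standbyTasks,
                             final String consumerClientId,
                             final String restoreConsumerClientId,
                             final Set<String> producerClientIds,
                             final String adminClientId) {
        this.threadName = threadName;
        this.threadState = threadState;
        this.activeTasks = Collections.unmodifiableSet(activeTasks);
        this.standbyTasks = Collections.unmodifiableSet(standbyTasks);
        this.consumerClientId = consumerClientId;
        this.restoreConsumerClientId = restoreConsumerClientId;
        this.producerClientIds = Collections.unmodifiableSet(producerClientIds);
        this.adminClientId = adminClientId;
    }

    String threadName() { return threadName; }
    Set<String> activeTasks() { return activeTasks; }
    // Remaining accessors omitted for brevity.

    // Sets are compared with Set.equals ("contain the same elements"), so iteration order
    // never matters; scalar fields use Objects.equals to tolerate nulls.
    @Override
    public boolean equals(final Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        final SketchThreadMetadataImpl that = (SketchThreadMetadataImpl) o;
        return Objects.equals(threadName, that.threadName)
            && Objects.equals(threadState, that.threadState)
            && activeTasks.equals(that.activeTasks)
            && standbyTasks.equals(that.standbyTasks)
            && Objects.equals(consumerClientId, that.consumerClientId)
            && Objects.equals(restoreConsumerClientId, that.restoreConsumerClientId)
            && producerClientIds.equals(that.producerClientIds)
            && Objects.equals(adminClientId, that.adminClientId);
    }

    // Set.hashCode sums element hashes, so it is order-independent as well.
    @Override
    public int hashCode() {
        return Objects.hash(threadName, threadState, activeTasks, standbyTasks,
            consumerClientId, restoreConsumerClientId, producerClientIds, adminClientId);
    }
}
```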
The changes related to StreamsMetadata follow a similar pattern. A new class StreamsMetadataImpl will be created under org.apache.kafka.streams.state.internals, which will implement the newly created interface org.apache.kafka.streams.StreamsMetadata. The rest of the Kafka code base will be migrated to use the newly created classes.
With this change, KafkaStreams#threadsMetadata returns a Set of the new ThreadMetadata interface, instead of a Set of the deprecated org.apache.kafka.streams.processor.ThreadMetadata class.
Also, by introducing new interfaces altogether, we ease the migration of any possible current user of the newly deprecated classes, offering a smooth transition to the new APIs.
Compatibility, Deprecation, and Migration Plan
As we are only creating new interfaces and deprecating (not removing) the existing classes, these changes are fully backward compatible. Users who currently use org.apache.kafka.streams.processor.TaskMetadata should migrate to org.apache.kafka.streams.TaskMetadata.
Likewise, users who currently use org.apache.kafka.streams.processor.ThreadMetadata and org.apache.kafka.streams.state.StreamsMetadata should migrate to org.apache.kafka.streams.ThreadMetadata and org.apache.kafka.streams.StreamsMetadata, respectively.
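Users of KafkaStreams#localThreadsMetadata should migrate to KafkaStreams#threadsMetadata. Users of KafkaStreams#allMetadata and KafkaStreams#allMetadataForStore should migrate to KafkaStreams#allRunningMetadata and KafkaStreams#allMetadataForGivenStore, respectively.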
Classes org.apache.kafka.streams.processor.TaskMetadata, org.apache.kafka.streams.processor.ThreadMetadata, and org.apache.kafka.streams.state.StreamsMetadata will be deprecated and removed in future releases, as will the methods KafkaStreams#localThreadsMetadata, KafkaStreams#allMetadata, and KafkaStreams#allMetadataForStore.
Rejected Alternatives
One rejected alternative was to directly transform org.apache.kafka.streams.processor.TaskMetadata into an interface. This was seen as a breaking change: the current constructor would no longer exist, so any existing code calling it would fail to compile.
Additionally, this alternative would have forced us to keep the method `getTaskId` for a while, until we could deprecate and remove the current `taskId` method.
The inclusion of more complex changes mentioned in
Jira