Status
Current state: Accepted
Discussion thread: here
JIRA: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
This KIP adds a new method that gives access to runtime information, i.e. thread and task details of the local streams instance.
Also, exposing both active and standby tasks is important, as it can be used to debug partition assignments when num.standby.replicas != 0.
...
This KIP will add the method Set<ThreadMetadata> KafkaStreams#localThreadsMetadata(). This method will return a set of ThreadMetadata representing the threads currently running in the local streams instance.
...
```java
import java.util.Set;

/**
 * Represents the state of a thread running within a {@link KafkaStreams} application.
 */
public class ThreadMetadata {

    private final String threadName;

    private final String threadState;

    private final Set<TaskMetadata> activeTasks;

    private final Set<TaskMetadata> standbyTasks;

    public ThreadMetadata(String threadName,
                          String threadState,
                          Set<TaskMetadata> activeTasks,
                          Set<TaskMetadata> standbyTasks) {
        this.threadName = threadName;
        this.threadState = threadState;
        this.activeTasks = activeTasks;
        this.standbyTasks = standbyTasks;
    }

    public String threadState() {
        return threadState;
    }

    public String threadName() {
        return threadName;
    }

    public Set<TaskMetadata> activeTasks() {
        return activeTasks;
    }

    public Set<TaskMetadata> standbyTasks() {
        return standbyTasks;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;

        ThreadMetadata that = (ThreadMetadata) o;

        if (!threadName.equals(that.threadName)) return false;
        if (!threadState.equals(that.threadState)) return false;
        if (!activeTasks.equals(that.activeTasks)) return false;
        return standbyTasks.equals(that.standbyTasks);
    }

    @Override
    public int hashCode() {
        int result = threadName.hashCode();
        result = 31 * result + threadState.hashCode();
        result = 31 * result + activeTasks.hashCode();
        result = 31 * result + standbyTasks.hashCode();
        return result;
    }

    @Override
    public String toString() {
        return "ThreadMetadata{" +
                "threadName=" + threadName +
                ", threadState=" + threadState +
                ", activeTasks=" + activeTasks +
                ", standbyTasks=" + standbyTasks +
                '}';
    }
}
```
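To illustrate how a caller might consume this metadata when debugging task assignment, the following is a minimal sketch rather than part of the proposal. The nested `TaskMetadata` and `ThreadMetadata` classes are simplified stand-ins so the example compiles without Kafka on the classpath: the stand-in `TaskMetadata` carries only a hypothetical `taskId`, and the helper names `describe` and `activeAndStandby` are invented for this example.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

class ThreadReport {

    /** Hypothetical stand-in for TaskMetadata: carries only a task id. */
    public static final class TaskMetadata {
        private final String taskId;
        public TaskMetadata(String taskId) { this.taskId = taskId; }
        public String taskId() { return taskId; }
    }

    /** Simplified stand-in with the same accessors as the proposed ThreadMetadata. */
    public static final class ThreadMetadata {
        private final String threadName;
        private final String threadState;
        private final Set<TaskMetadata> activeTasks;
        private final Set<TaskMetadata> standbyTasks;

        public ThreadMetadata(String threadName, String threadState,
                              Set<TaskMetadata> activeTasks, Set<TaskMetadata> standbyTasks) {
            this.threadName = threadName;
            this.threadState = threadState;
            this.activeTasks = activeTasks;
            this.standbyTasks = standbyTasks;
        }

        public String threadName() { return threadName; }
        public String threadState() { return threadState; }
        public Set<TaskMetadata> activeTasks() { return activeTasks; }
        public Set<TaskMetadata> standbyTasks() { return standbyTasks; }
    }

    /** Collects task ids into a sorted set so that printed output is stable. */
    static TreeSet<String> ids(Set<TaskMetadata> tasks) {
        TreeSet<String> ids = new TreeSet<>();
        for (TaskMetadata task : tasks) ids.add(task.taskId());
        return ids;
    }

    /** Builds one summary line per thread, sorted lexicographically. */
    public static List<String> describe(Collection<ThreadMetadata> threads) {
        List<String> lines = new ArrayList<>();
        for (ThreadMetadata t : threads) {
            lines.add(t.threadName() + " [" + t.threadState() + "]"
                    + " active=" + ids(t.activeTasks())
                    + " standby=" + ids(t.standbyTasks()));
        }
        Collections.sort(lines);
        return lines;
    }

    /** Returns the task ids that this instance hosts as both active and standby. */
    public static Set<String> activeAndStandby(Collection<ThreadMetadata> threads) {
        Set<String> active = new TreeSet<>();
        Set<String> standby = new TreeSet<>();
        for (ThreadMetadata t : threads) {
            active.addAll(ids(t.activeTasks()));
            standby.addAll(ids(t.standbyTasks()));
        }
        active.retainAll(standby); // keep only ids present in both sets
        return active;
    }

    public static void main(String[] args) {
        Set<TaskMetadata> active = new HashSet<>();
        active.add(new TaskMetadata("0_0"));
        Set<TaskMetadata> standby = new HashSet<>();
        standby.add(new TaskMetadata("0_1"));
        List<ThreadMetadata> threads = Collections.singletonList(
                new ThreadMetadata("thread-1", "RUNNING", active, standby));

        for (String line : describe(threads)) System.out.println(line);
        System.out.println("overlap: " + activeAndStandby(threads));
    }
}
```

With the real classes, the collection passed to these helpers would be the set returned by `KafkaStreams#localThreadsMetadata()`. Keeping the active and standby sets separate is what makes a check like `activeAndStandby` possible when num.standby.replicas != 0.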
...
This new feature requires adding a new method to KafkaStreams to expose thread and task details.
```java
/**
 * Returns information about the local stream threads running in a {@link KafkaStreams} application.
 *
 * @return the set of {@link ThreadMetadata}.
 */
public Set<ThreadMetadata> localThreadsMetadata() {
    validateIsRunning();
    Set<ThreadMetadata> threadMetadata = new HashSet<>();
    for (int i = 0; i < threads.length; i++)
        threadMetadata.add(threads[i].threadMetadata());
    return threadMetadata;
}
```
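The caller-visible behaviour of this method can be sketched with a small stand-alone model. Everything here is hypothetical scaffolding: `LocalThreadsSketch`, `FakeThread`, and the `start`/`close` toggles are invented for this example, thread metadata is reduced to a plain name, and the sketch assumes that `validateIsRunning()` rejects calls with an `IllegalStateException` when the instance is not running.

```java
import java.util.HashSet;
import java.util.Set;

class LocalThreadsSketch {

    /** Hypothetical stand-in for a stream thread; its metadata is just its name. */
    static final class FakeThread {
        private final String name;
        FakeThread(String name) { this.name = name; }
        String threadMetadata() { return name; }
    }

    private final FakeThread[] threads;
    private boolean running = false;

    LocalThreadsSketch(String... threadNames) {
        threads = new FakeThread[threadNames.length];
        for (int i = 0; i < threadNames.length; i++) {
            threads[i] = new FakeThread(threadNames[i]);
        }
    }

    void start() { running = true; }

    void close() { running = false; }

    // Assumption: calls made while the instance is not running are rejected.
    private void validateIsRunning() {
        if (!running) {
            throw new IllegalStateException("instance is not running");
        }
    }

    /** Mirrors the shape of the proposed method: guard, then copy into a fresh set. */
    Set<String> localThreadsMetadata() {
        validateIsRunning();
        Set<String> threadMetadata = new HashSet<>();
        for (int i = 0; i < threads.length; i++) {
            threadMetadata.add(threads[i].threadMetadata());
        }
        return threadMetadata;
    }

    public static void main(String[] args) {
        LocalThreadsSketch instance = new LocalThreadsSketch("thread-1", "thread-2");
        instance.start();
        System.out.println(instance.localThreadsMetadata());
        instance.close();
        try {
            instance.localThreadsMetadata();
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Because a new set is built on every call, a caller that modifies the returned set does not affect later calls; each result is a point-in-time snapshot of the local threads.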
In addition, the current toString() method should be deprecated, as it could return information that is inconsistent with the new API.
A straightforward first pass is GitHub PR 2612
...
No compatibility issues foreseen.
Rejected Alternatives
If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.
1. Add a new method threadStates to the public API of StreamsMetadata to expose the current states of running threads and tasks. This alternative was rejected because the method would return null for a remote application.