Status
Current state: Accepted
Discussion thread: here
JIRA: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
Currently, there is no way to get details about the states of either active or standby tasks: neither their partition assignments nor their consumed offsets.
To improve Streams' debuggability, we propose to expose the states of active tasks through the public KafkaStreams API.
The closest existing API is StreamsMetadata; however, it aggregates tasks across all threads and only presents the aggregated set of assigned partitions.
This KIP adds a new method that gives access to runtime information (i.e., thread and task details of the local stream instance).
Exposing both active and standby tasks is also important, as this can be used to debug partition assignments when num.standby.replicas != 0.
The task-level information can be polled programmatically for monitoring purposes.
For instance, this will allow applications to expose a REST API that reports the global state of a Kafka Streams topology. In addition, it could encourage the community to develop Kafka Streams UI tooling.
...
This KIP will add the method Set<ThreadMetadata> KafkaStreams#localThreadsMetadata(). This method will return a set of ThreadMetadata instances representing the threads currently running in the local stream instance.
Below are the new public classes. These classes will be declared as inner classes of KafkaStreams:
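A minimal sketch of how a caller might consume the returned set for monitoring. To stay self-contained, it declares simplified, hypothetical stand-in records for TopicPartition, TaskMetadata and ThreadMetadata that mirror the accessors proposed in this KIP; the ThreadSummary helper and the thread names and state strings used with it are illustrative assumptions, not part of the API.

```java
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical, simplified stand-ins: the real classes would use
// org.apache.kafka.common.TopicPartition and be provided by Kafka Streams.
record TopicPartition(String topic, int partition) {}

record TaskMetadata(String taskId, Set<TopicPartition> assignedPartitions) {}

record ThreadMetadata(String threadName, String threadState,
                      Set<TaskMetadata> activeTasks, Set<TaskMetadata> standbyTasks) {}

final class ThreadSummary {
    private ThreadSummary() {}

    /** One entry per thread (sorted by thread name): its state and active/standby task counts. */
    static Map<String, String> summarize(Set<ThreadMetadata> threads) {
        Map<String, String> summary = new TreeMap<>();
        for (ThreadMetadata t : threads) {
            summary.put(t.threadName(), String.format("state=%s active=%d standby=%d",
                    t.threadState(), t.activeTasks().size(), t.standbyTasks().size()));
        }
        return summary;
    }
}
```

With the proposed API, the same loop would run over the result of localThreadsMetadata() on a running KafkaStreams instance, for example from a periodic monitoring job or a REST handler.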
```java
import java.util.Set;

/**
 * Represents the state of a single thread running within a {@link KafkaStreams} application.
 */
public class ThreadMetadata {

    private final String threadName;
    private final String threadState;
    private final Set<TaskMetadata> activeTasks;
    private final Set<TaskMetadata> standbyTasks;

    public ThreadMetadata(String threadName, String threadState,
                          Set<TaskMetadata> activeTasks, Set<TaskMetadata> standbyTasks) {
        this.threadName = threadName;
        this.threadState = threadState;
        this.activeTasks = activeTasks;
        this.standbyTasks = standbyTasks;
    }

    public String threadState() {
        return threadState;
    }

    public String threadName() {
        return threadName;
    }

    public Set<TaskMetadata> activeTasks() {
        return activeTasks;
    }

    public Set<TaskMetadata> standbyTasks() {
        return standbyTasks;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        ThreadMetadata that = (ThreadMetadata) o;
        if (!threadName.equals(that.threadName)) return false;
        if (!threadState.equals(that.threadState)) return false;
        if (!activeTasks.equals(that.activeTasks)) return false;
        return standbyTasks.equals(that.standbyTasks);
    }

    @Override
    public int hashCode() {
        int result = threadName.hashCode();
        result = 31 * result + threadState.hashCode();
        result = 31 * result + activeTasks.hashCode();
        result = 31 * result + standbyTasks.hashCode();
        return result;
    }

    @Override
    public String toString() {
        return "ThreadMetadata{" +
                "threadName=" + threadName +
                ", threadState=" + threadState +
                ", activeTasks=" + activeTasks +
                ", standbyTasks=" + standbyTasks +
                '}';
    }
}
```
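Because the proposed classes define value-based equals() and hashCode(), a monitoring loop could detect changes between two polls with plain set operations. The sketch below assumes simplified, hypothetical stand-in records (Java records generate equivalent value-based equality) and a hypothetical SnapshotDiff helper.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical, simplified stand-ins for TopicPartition and the proposed metadata classes.
record TopicPartition(String topic, int partition) {}

record TaskMetadata(String taskId, Set<TopicPartition> assignedPartitions) {}

record ThreadMetadata(String threadName, String threadState,
                      Set<TaskMetadata> activeTasks, Set<TaskMetadata> standbyTasks) {}

final class SnapshotDiff {
    private SnapshotDiff() {}

    /** Entries of {@code current} that are not equal to any entry of {@code previous}. */
    static Set<ThreadMetadata> changedOrNew(Set<ThreadMetadata> previous, Set<ThreadMetadata> current) {
        Set<ThreadMetadata> changed = new HashSet<>(current);
        changed.removeAll(previous); // relies on value-based equals/hashCode
        return changed;
    }
}
```

A thread whose state or task assignment changed between polls shows up in the result, while an unchanged thread, even if represented by a freshly created object, does not.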
```java
import java.util.Set;

import org.apache.kafka.common.TopicPartition;

/**
 * Represents the state of a single task running within a {@link KafkaStreams} application.
 */
public class TaskMetadata {

    private final String taskId;
    private final Set<TopicPartition> assignedPartitions;

    public TaskMetadata(String taskId, Set<TopicPartition> assignedPartitions) {
        this.taskId = taskId;
        this.assignedPartitions = assignedPartitions;
    }

    public String taskId() {
        return taskId;
    }

    public Set<TopicPartition> assignedPartitions() {
        return assignedPartitions;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        TaskMetadata that = (TaskMetadata) o;
        if (!taskId.equals(that.taskId)) return false;
        return assignedPartitions.equals(that.assignedPartitions);
    }

    @Override
    public int hashCode() {
        int result = taskId.hashCode();
        result = 31 * result + assignedPartitions.hashCode();
        return result;
    }

    @Override
    public String toString() {
        return "TaskMetadata{" +
                "taskId=" + taskId +
                ", assignedPartitions=" + assignedPartitions +
                '}';
    }
}
```
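To illustrate the standby-replica debugging use case, the following sketch flattens a set of thread metadata into the partitions listed by active tasks and by standby tasks on the local instance. The records and the PartitionReport helper are hypothetical simplifications of the proposed classes.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Function;

// Hypothetical, simplified stand-ins for TopicPartition and the proposed metadata classes.
record TopicPartition(String topic, int partition) {}

record TaskMetadata(String taskId, Set<TopicPartition> assignedPartitions) {}

record ThreadMetadata(String threadName, String threadState,
                      Set<TaskMetadata> activeTasks, Set<TaskMetadata> standbyTasks) {}

final class PartitionReport {
    private PartitionReport() {}

    /** Union of the partitions listed by the active tasks of all local threads. */
    static Set<TopicPartition> activePartitions(Set<ThreadMetadata> threads) {
        return collect(threads, ThreadMetadata::activeTasks);
    }

    /** Union of the partitions listed by the standby tasks of all local threads. */
    static Set<TopicPartition> standbyPartitions(Set<ThreadMetadata> threads) {
        return collect(threads, ThreadMetadata::standbyTasks);
    }

    private static Set<TopicPartition> collect(Set<ThreadMetadata> threads,
                                               Function<ThreadMetadata, Set<TaskMetadata>> tasksOf) {
        Set<TopicPartition> result = new HashSet<>();
        for (ThreadMetadata thread : threads) {
            for (TaskMetadata task : tasksOf.apply(thread)) {
                result.addAll(task.assignedPartitions());
            }
        }
        return result;
    }
}
```

Such a union is close to what StreamsMetadata already offers; the per-thread breakdown is what the new API adds, so a report could just as well group these partitions by threadName().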
...
Proposed Changes
This new feature requires adding a new method to KafkaStreams to expose thread and task details:
```java
/**
 * Returns information about the local stream threads running in a {@link KafkaStreams} application.
 *
 * @return the set of {@link ThreadMetadata}.
 */
public Set<ThreadMetadata> localThreadsMetadata() {
    validateIsRunning();
    Set<ThreadMetadata> threadMetadata = new HashSet<>();
    for (StreamThread thread : threads) {
        threadMetadata.add(thread.threadMetadata());
    }
    return threadMetadata;
}
```
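The method above relies on a per-thread threadMetadata() call that this KIP does not spell out. One hypothetical way a thread could build it is to copy its current task assignment into an immutable snapshot, so callers never observe later rebalances through objects they already hold. SketchThread, its assign method and the map-based assignment are illustrative assumptions, not the actual StreamThread internals, and the stand-in records are simplified.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical, simplified stand-ins for TopicPartition and the proposed metadata classes.
record TopicPartition(String topic, int partition) {}

record TaskMetadata(String taskId, Set<TopicPartition> assignedPartitions) {}

record ThreadMetadata(String threadName, String threadState,
                      Set<TaskMetadata> activeTasks, Set<TaskMetadata> standbyTasks) {}

/** Hypothetical thread that only tracks its task-id -> partitions assignment. */
final class SketchThread {
    private final String name;
    private final String state;
    private final Map<String, Set<TopicPartition>> active = new HashMap<>();
    private final Map<String, Set<TopicPartition>> standby = new HashMap<>();

    SketchThread(String name, String state) {
        this.name = name;
        this.state = state;
    }

    /** Replaces the current assignment, e.g. after a rebalance. */
    synchronized void assign(Map<String, Set<TopicPartition>> activeTasks,
                             Map<String, Set<TopicPartition>> standbyTasks) {
        active.clear();
        active.putAll(activeTasks);
        standby.clear();
        standby.putAll(standbyTasks);
    }

    /** Builds an immutable snapshot of the current assignment. */
    synchronized ThreadMetadata threadMetadata() {
        return new ThreadMetadata(name, state, toMetadata(active), toMetadata(standby));
    }

    private static Set<TaskMetadata> toMetadata(Map<String, Set<TopicPartition>> tasks) {
        Set<TaskMetadata> result = new HashSet<>();
        tasks.forEach((id, partitions) -> result.add(new TaskMetadata(id, Set.copyOf(partitions))));
        return Set.copyOf(result);
    }
}
```

Returning immutable copies is a choice of this sketch: the KIP's method already returns a fresh HashSet per call, but isolating callers fully also requires the element objects and their sets not to be shared with the running thread.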
In addition, the current KafkaStreams#toString() method should be deprecated, as it would return information inconsistent with the new API.
A straightforward first pass is available as GitHub PR 2612.
...
No compatibility issues are foreseen.
Rejected Alternatives
1. Add a new method threadStates to the public StreamsMetadata API to expose the current states of running threads and tasks. This alternative was rejected because the method would return null for remote application instances.