Current state: Under Discussion
Discussion thread: here
JIRA: here
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
In Kafka 0.10.1.0, toString() methods have been added to the public API of the KafkaStreams class to print useful information about the representation of the topology DAG.
While these methods can be used to debug topologies during development, they cannot be used to monitor a KafkaStreams application in a production environment.
Currently, there is no way to retrieve details about the state of active tasks, such as their partition assignments or consumed offsets.
To address this use case, we propose to expose the state of active tasks through the public KafkaStreams API.
For instance, this would allow applications to expose a REST API reporting the global state of a Kafka Streams topology. It could also encourage the community to develop UI tooling for Kafka Streams.
This KIP will add the method KafkaStreams#threadStates(). This method will return a collection of ThreadState instances.
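As an illustration of the intended use, a monitoring component could walk the collection returned by the proposed threadStates() to find which thread and task own each partition. This is a sketch under stated assumptions: ThreadState and TaskState below are hypothetical, dependency-free stand-ins that mirror the proposed getters, with partitions as "topic-partition" strings instead of TopicPartition and the thread state as a String instead of StreamThread.State. The AssignmentReport helper is also hypothetical. With the real classes, the input would come from KafkaStreams#threadStates().

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical stand-ins mirroring the proposed getters, so the sketch runs
// without Kafka: partitions are "topic-partition" strings (not TopicPartition)
// and the thread state is a String (not StreamThread.State).
final class TaskState {
    private final String id;
    private final Set<String> assignments;

    TaskState(String id, Set<String> assignments) {
        this.id = id;
        this.assignments = assignments;
    }

    String getId() { return id; }
    Set<String> getAssignments() { return assignments; }
}

final class ThreadState {
    private final String name;
    private final String state;
    private final Collection<TaskState> activeTasks;

    ThreadState(String name, String state, Collection<TaskState> activeTasks) {
        this.name = name;
        this.state = state;
        this.activeTasks = activeTasks;
    }

    String getName() { return name; }
    String getState() { return state; }
    Collection<TaskState> getActiveTasks() { return activeTasks; }
}

final class AssignmentReport {
    // Maps each assigned partition to "threadName/taskId", sorted by partition name.
    static Map<String, String> ownerByPartition(Collection<ThreadState> threads) {
        Map<String, String> owners = new TreeMap<>();
        for (ThreadState thread : threads) {
            for (TaskState task : thread.getActiveTasks()) {
                for (String partition : task.getAssignments()) {
                    owners.put(partition, thread.getName() + "/" + task.getId());
                }
            }
        }
        return owners;
    }
}
```

For example, with StreamThread-1 running tasks 0_0 (orders-0) and 0_1 (orders-1) and StreamThread-2 running task 1_0 (payments-0), ownerByPartition returns {orders-0=StreamThread-1/0_0, orders-1=StreamThread-1/0_1, payments-0=StreamThread-2/1_0}.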
The new public classes are listed below. They will be declared as static nested classes of KafkaStreams:
```java
public static class ThreadState {
    private final String name;
    private final StreamThread.State state;
    private final Collection<TaskState> activeTasks;

    public ThreadState(String name, StreamThread.State state, Collection<TaskState> activeTasks) {
        this.name = name;
        this.state = state;
        this.activeTasks = activeTasks;
    }

    public String getName() {
        return name;
    }

    public StreamThread.State getState() {
        return state;
    }

    public Collection<TaskState> getActiveTasks() {
        return activeTasks;
    }

    /**
     * Produce a string representation containing useful information about this {@code ThreadState} instance such as
     * thread name, thread state and a representation of active tasks.
     *
     * @return a JSON string representation of this {@code ThreadState}.
     */
    @Override
    public String toString() {
        final StringBuilder sb = new StringBuilder("{");
        sb.append("\"name\":\"").append(name).append('\"');
        sb.append(",\"state\":\"").append(state).append('\"');
        // For standard JDK collections, toString() renders "[a, b]": a JSON array of TaskState objects.
        sb.append(",\"activeTasks\":").append(activeTasks);
        sb.append('}');
        return sb.toString();
    }
}
```
```java
public static class TaskState {
    private final String id;
    private final Set<TopicPartition> assignments;
    private final Map<TopicPartition, Long> consumedOffsetsByPartition;

    public TaskState(String id, Set<TopicPartition> assignments, Map<TopicPartition, Long> consumedOffsetsByPartition) {
        this.id = id;
        this.assignments = assignments;
        this.consumedOffsetsByPartition = consumedOffsetsByPartition;
    }

    public String getId() {
        return id;
    }

    public Set<TopicPartition> getAssignments() {
        return assignments;
    }

    public Map<TopicPartition, Long> getConsumedOffsetsByPartition() {
        return consumedOffsetsByPartition;
    }

    @Override
    public String toString() {
        final StringBuilder sb = new StringBuilder("{");
        sb.append("\"id\":\"").append(id).append('\"');
        sb.append(", \"assignments\":[");
        Iterator<TopicPartition> partitionIterator = assignments.iterator();
        while (partitionIterator.hasNext()) {
            sb.append('\"').append(partitionIterator.next()).append('\"');
            if (partitionIterator.hasNext())
                sb.append(',');
        }
        sb.append(']');
        sb.append(", \"consumedOffsetsByPartition\":[");
        Iterator<Map.Entry<TopicPartition, Long>> consumedIterator = consumedOffsetsByPartition.entrySet().iterator();
        while (consumedIterator.hasNext()) {
            Map.Entry<TopicPartition, Long> entry = consumedIterator.next();
            sb.append("{\"topicPartition\":\"").append(entry.getKey()).append('\"')
              // The offset is the map value (not the key), written as a JSON number.
              .append(",\"offset\":").append(entry.getValue()).append('}');
            if (consumedIterator.hasNext())
                sb.append(',');
        }
        sb.append(']');
        sb.append('}');
        return sb.toString();
    }
}
```
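Because monitoring tools would parse this output, it helps to pin down the exact JSON that TaskState#toString() emits once the offset is taken from the map value. The following is a dependency-free sketch, not the proposed implementation: TaskStateJson is a hypothetical helper, and partitions are passed as strings in the "topic-partition" form that TopicPartition#toString() produces. It uses java.util.StringJoiner instead of the manual hasNext() comma logic, which would be an equally valid way to write the real method.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;
import java.util.StringJoiner;

// Hypothetical helper mirroring TaskState#toString() with the offset fix applied.
// Partitions are plain "topic-partition" strings instead of TopicPartition.
final class TaskStateJson {
    static String render(String id, Set<String> assignments, Map<String, Long> consumedOffsets) {
        // StringJoiner inserts the separators, so no manual hasNext() comma logic is needed.
        StringJoiner partitions = new StringJoiner(",", "[", "]");
        for (String partition : assignments) {
            partitions.add("\"" + partition + "\"");
        }
        StringJoiner offsets = new StringJoiner(",", "[", "]");
        for (Map.Entry<String, Long> entry : consumedOffsets.entrySet()) {
            // The offset is the map value, emitted as a JSON number.
            offsets.add("{\"topicPartition\":\"" + entry.getKey() + "\",\"offset\":" + entry.getValue() + "}");
        }
        return "{\"id\":\"" + id + "\""
                + ", \"assignments\":" + partitions
                + ", \"consumedOffsetsByPartition\":" + offsets
                + "}";
    }
}
```

For task 0_0 assigned orders-0 and orders-1 with consumed offsets 42 and 7 (passed in insertion order, e.g. via LinkedHashSet and LinkedHashMap), render returns {"id":"0_0", "assignments":["orders-0","orders-1"], "consumedOffsetsByPartition":[{"topicPartition":"orders-0","offset":42},{"topicPartition":"orders-1","offset":7}]}.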
This feature requires adding a new method to StreamTask to retrieve the last consumed offsets:
```java
/**
 * @return latest consumed offsets by partition
 */
public Map<TopicPartition, Long> consumedOffsets() {
    return Collections.unmodifiableMap(this.consumedOffsets);
}
```
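The proposed accessor returns Collections.unmodifiableMap(...), which is a read-only view of the task's live map rather than a copy. The sketch below contrasts that view with a point-in-time copy. It assumes a hypothetical OffsetTracker standing in for StreamTask's bookkeeping, with string partitions and hypothetical method names recordProcessed and consumedOffsetsSnapshot.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for StreamTask's consumed-offset bookkeeping;
// partitions are "topic-partition" strings instead of TopicPartition.
final class OffsetTracker {
    private final Map<String, Long> consumedOffsets = new HashMap<>();

    // Called on the stream thread after a record from `partition` is processed.
    synchronized void recordProcessed(String partition, long offset) {
        consumedOffsets.put(partition, offset);
    }

    // Mirrors the proposed StreamTask#consumedOffsets(): a read-only *view*
    // of the live map. It reflects later updates and rejects modification,
    // but is not safe to read from another thread while writes are ongoing.
    Map<String, Long> consumedOffsets() {
        return Collections.unmodifiableMap(consumedOffsets);
    }

    // Alternative: a read-only point-in-time copy, taken under the same
    // lock as the writer so it can be handed to a monitoring thread.
    synchronized Map<String, Long> consumedOffsetsSnapshot() {
        return Collections.unmodifiableMap(new HashMap<>(consumedOffsets));
    }
}
```

StreamTask's map is updated by its stream thread, while threadStates() would typically be called from another thread (for example a REST handler). Because of this, returning a copy taken under a lock shared with the writer may be worth weighing against the live view, at the cost of one allocation per call.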
A straightforward first pass is available in GitHub PR 2612.
No compatibility issues foreseen.