...

Currently, there is no way to get details about the state of either active or standby tasks: neither their partition assignments nor their consumed offsets are exposed.

To address this use case and improve Streams' debuggability, we propose to expose the states of active and standby tasks through the KafkaStreams public API.

 

The closest existing API is StreamsMetadata; however, it aggregates tasks across all threads and only presents the aggregated set of assigned partitions.

This KIP adds a new method that gives access to the thread and task details of the local Streams instance.

Also, exposing both active and standby tasks is important, as this can be used to debug partition assignments when num.standby.replicas != 0.
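As one illustration of such debugging, a developer could check whether any task appears on this instance both as an active and as a standby task. This assumes, as with the default task assignor, that a task's standby is not expected to be co-located with its active copy, so a non-empty result would point at an assignment worth investigating. The sketch uses hypothetical stand-ins (TaskInfo, ThreadInfo) in place of the proposed TaskMetadata and ThreadMetadata so that it runs without Kafka on the classpath.

```java
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

public class StandbyCheck {

    // Hypothetical stand-in for the proposed TaskMetadata (partitions omitted).
    static final class TaskInfo {
        final String taskId;

        TaskInfo(String taskId) {
            this.taskId = taskId;
        }
    }

    // Hypothetical stand-in for the proposed ThreadMetadata.
    static final class ThreadInfo {
        final String threadName;
        final Set<TaskInfo> activeTasks;
        final Set<TaskInfo> standbyTasks;

        ThreadInfo(String threadName, Set<TaskInfo> activeTasks, Set<TaskInfo> standbyTasks) {
            this.threadName = threadName;
            this.activeTasks = activeTasks;
            this.standbyTasks = standbyTasks;
        }
    }

    /** Task ids hosted on this instance both as an active and as a standby task. */
    static Set<String> colocatedTaskIds(Collection<ThreadInfo> threads) {
        Set<String> active = new TreeSet<>();
        Set<String> standby = new TreeSet<>();
        for (ThreadInfo thread : threads) {
            for (TaskInfo task : thread.activeTasks) active.add(task.taskId);
            for (TaskInfo task : thread.standbyTasks) standby.add(task.taskId);
        }
        active.retainAll(standby); // keep only ids present in both sets
        return active;
    }

    public static void main(String[] args) {
        TaskInfo task00 = new TaskInfo("0_0");
        TaskInfo task01 = new TaskInfo("0_1");
        List<ThreadInfo> threads = List.of(
                new ThreadInfo("StreamThread-1", Set.of(task00), Set.of(task01)),
                new ThreadInfo("StreamThread-2", Set.of(task01), Set.of()));
        System.out.println(colocatedTaskIds(threads)); // prints [0_1]
    }
}
```

Comparing task ids rather than partitions keeps the check independent of how many input partitions each task consumes.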

 

The task-level information could be polled programmatically for monitoring purposes.

For instance, this will allow applications to expose a REST API that returns the global state of a Kafka Streams topology. In addition, this could encourage the community to develop UI tooling for Kafka Streams.
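A REST endpoint of that kind would mostly be a matter of serializing the per-thread metadata. The sketch below renders a simplified, hypothetical ThreadSnapshot view (task ids only, not the proposed ThreadMetadata class) into a JSON array by hand, so it needs no JSON library; the field names in the output are illustrative choices, not part of this proposal.

```java
import java.util.List;
import java.util.StringJoiner;

public class ThreadsJson {

    // Hypothetical, simplified view of the proposed ThreadMetadata: task ids only.
    static final class ThreadSnapshot {
        final String threadName;
        final List<String> activeTaskIds;
        final List<String> standbyTaskIds;

        ThreadSnapshot(String threadName, List<String> activeTaskIds, List<String> standbyTaskIds) {
            this.threadName = threadName;
            this.activeTaskIds = activeTaskIds;
            this.standbyTaskIds = standbyTaskIds;
        }
    }

    /** Renders the snapshots as a JSON array, one object per stream thread. */
    static String toJson(List<ThreadSnapshot> threads) {
        StringJoiner out = new StringJoiner(",", "[", "]");
        for (ThreadSnapshot t : threads) {
            out.add("{\"threadName\":" + quote(t.threadName)
                    + ",\"activeTasks\":" + array(t.activeTaskIds)
                    + ",\"standbyTasks\":" + array(t.standbyTaskIds) + "}");
        }
        return out.toString();
    }

    private static String array(List<String> values) {
        StringJoiner joiner = new StringJoiner(",", "[", "]");
        for (String value : values) joiner.add(quote(value));
        return joiner.toString();
    }

    // Escapes only backslashes and double quotes; a real endpoint should use a JSON library.
    private static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    public static void main(String[] args) {
        System.out.println(toJson(List.of(
                new ThreadSnapshot("StreamThread-1", List.of("0_0"), List.of("0_1")))));
        // prints [{"threadName":"StreamThread-1","activeTasks":["0_0"],"standbyTasks":["0_1"]}]
    }
}
```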

...

This KIP will add the method Set<ThreadMetadata> StreamsMetadata#streamThreads(). This method will return a set of ThreadMetadata instances representing the threads currently running in the local Streams instance.

Below are the new public classes. These classes will be declared as inner classes of KafkaStreams:

```java
import java.util.Set;

/**
 * Represents the state of a single thread running within a {@link KafkaStreams} application.
 */
public class ThreadMetadata {

    private final String threadName;

    private final Set<TaskMetadata> activeTasks;

    private final Set<TaskMetadata> standbyTasks;

    public ThreadMetadata(String threadName, Set<TaskMetadata> activeTasks, Set<TaskMetadata> standbyTasks) {
        this.threadName = threadName;
        this.activeTasks = activeTasks;
        this.standbyTasks = standbyTasks;
    }

    public String threadName() {
        return threadName;
    }

    public Set<TaskMetadata> activeTasks() {
        return activeTasks;
    }

    public Set<TaskMetadata> standbyTasks() {
        return standbyTasks;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;

        ThreadMetadata that = (ThreadMetadata) o;

        if (!threadName.equals(that.threadName)) return false;
        if (!activeTasks.equals(that.activeTasks)) return false;
        return standbyTasks.equals(that.standbyTasks);
    }

    @Override
    public int hashCode() {
        int result = threadName.hashCode();
        result = 31 * result + activeTasks.hashCode();
        result = 31 * result + standbyTasks.hashCode();
        return result;
    }

    @Override
    public String toString() {
        return "ThreadMetadata{" +
                "threadName=" + threadName +
                ", activeTasks=" + activeTasks +
                ", standbyTasks=" + standbyTasks +
                '}';
    }
}
```
```java
import java.util.Set;

import org.apache.kafka.common.TopicPartition;

/**
 * Represents the state of a single task running within a {@link KafkaStreams} application.
 */
public class TaskMetadata {

    private final String taskId;

    private final Set<TopicPartition> assignedPartitions;

    public TaskMetadata(String taskId, Set<TopicPartition> assignedPartitions) {
        this.taskId = taskId;
        this.assignedPartitions = assignedPartitions;
    }

    public String taskId() {
        return taskId;
    }

    public Set<TopicPartition> assignedPartitions() {
        return assignedPartitions;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;

        TaskMetadata that = (TaskMetadata) o;

        if (!taskId.equals(that.taskId)) return false;
        return assignedPartitions.equals(that.assignedPartitions);
    }

    @Override
    public int hashCode() {
        int result = taskId.hashCode();
        result = 31 * result + assignedPartitions.hashCode();
        return result;
    }

    @Override
    public String toString() {
        return "TaskMetadata{" +
                "taskId=" + taskId +
                ", assignedPartitions=" + assignedPartitions +
                '}';
    }
}
```
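Because both classes define value-based equals and hashCode, a monitoring loop could detect assignment changes by comparing each polled set with the previous one instead of relying on object identity. A minimal sketch, assuming a poller that calls update() on every snapshot; AssignmentChangeDetector and the TaskMeta stand-in are hypothetical names, and TaskMeta mirrors TaskMetadata's equality (with partitions as plain strings) so the example runs without Kafka.

```java
import java.util.Objects;
import java.util.Set;

public class AssignmentChangeDetector<T> {

    private Set<T> previous; // null until the first snapshot is seen

    /** Records the snapshot; returns true if it differs from the previous one. */
    public boolean update(Set<T> current) {
        boolean changed = previous != null && !previous.equals(current);
        previous = Set.copyOf(current); // defensive copy of the observed snapshot
        return changed;
    }

    // Hypothetical stand-in for TaskMetadata with the same value-based equality.
    static final class TaskMeta {
        final String taskId;
        final Set<String> assignedPartitions;

        TaskMeta(String taskId, Set<String> assignedPartitions) {
            this.taskId = taskId;
            this.assignedPartitions = assignedPartitions;
        }

        @Override
        public boolean equals(Object o) {
            if (this == o) return true;
            if (o == null || getClass() != o.getClass()) return false;
            TaskMeta that = (TaskMeta) o;
            return taskId.equals(that.taskId) && assignedPartitions.equals(that.assignedPartitions);
        }

        @Override
        public int hashCode() {
            return Objects.hash(taskId, assignedPartitions);
        }
    }

    public static void main(String[] args) {
        AssignmentChangeDetector<TaskMeta> detector = new AssignmentChangeDetector<>();
        detector.update(Set.of(new TaskMeta("0_0", Set.of("orders-0")))); // first poll
        // A new but equal instance is not reported as a change.
        System.out.println(detector.update(Set.of(new TaskMeta("0_0", Set.of("orders-0"))))); // false
        System.out.println(detector.update(Set.of(new TaskMeta("0_1", Set.of("orders-1"))))); // true
    }
}
```

Treating the first snapshot as "no change" is a design choice of this sketch; a poller that wants to report the initial assignment could check for the null baseline separately.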

 

 

Proposed Changes

This new feature requires adding a new method to StreamsMetadata to expose thread and task details.

```java
/**
 * Returns information about the stream threads running in a {@link KafkaStreams} application.
 * Note that this method will return <code>null</code> if called
 * on a {@link StreamsMetadata} instance that represents a remote application.
 *
 * @return the set of {@link ThreadMetadata}.
 */
public Set<ThreadMetadata> streamThreads() {
    return streamThreads;
}
```
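Since streamThreads() returns null for metadata describing a remote application, callers iterating over every instance (for example, the collection returned by KafkaStreams#allMetadata()) need a null check. A sketch of that pattern, using a hypothetical InstanceMeta stand-in for StreamsMetadata whose threads are reduced to their names, so it compiles without Kafka:

```java
import java.util.Collection;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

public class LocalThreadsLookup {

    // Hypothetical stand-in for StreamsMetadata; threads are reduced to their names.
    static final class InstanceMeta {
        final String host;
        final Set<String> streamThreads; // null when the instance is remote

        InstanceMeta(String host, Set<String> streamThreads) {
            this.host = host;
            this.streamThreads = streamThreads;
        }

        Set<String> streamThreads() {
            return streamThreads;
        }
    }

    /** Collects thread names from local instances, skipping remote ones (null threads). */
    static Set<String> localThreadNames(Collection<InstanceMeta> instances) {
        Set<String> names = new TreeSet<>();
        for (InstanceMeta instance : instances) {
            Set<String> threads = instance.streamThreads();
            if (threads == null) {
                continue; // remote instance: thread details are not available
            }
            names.addAll(threads);
        }
        return names;
    }

    public static void main(String[] args) {
        List<InstanceMeta> instances = List.of(
                new InstanceMeta("localhost:8080", Set.of("StreamThread-1", "StreamThread-2")),
                new InstanceMeta("remote-host:8080", null));
        System.out.println(localThreadNames(instances)); // prints [StreamThread-1, StreamThread-2]
    }
}
```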

 

A straightforward first pass is GitHub PR 2612

...