Status

Current state: Accepted

Discussion thread: here

JIRA: here 

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Currently there is no way to get details about the state of active or standby tasks, including their partition assignments and consumed offsets.

To address this use case and improve Streams' debuggability, we propose to expose the states of active tasks through the public API KafkaStreams.

 

The closest existing API is StreamsMetadata; however, it aggregates the tasks across all threads and only presents the aggregated set of assigned partitions.

This KIP adds a new method that gives access to runtime information (i.e. thread and task details of the local stream instance).

Exposing both active and standby tasks is also important, as this can be used to debug partition assignments when num.standby.replicas != 0.

 

The task-level information could be polled in a programmatic way for monitoring purposes.

For instance, this will allow applications to expose a REST API to get the global state of a kstreams topology. In addition, this could encourage the community to develop some KafkaStreams UI tooling.
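As a rough illustration of the monitoring use case above, the sketch below renders one thread's task-level information as a JSON string that a REST endpoint could return. ThreadInfo and TaskInfo are hypothetical stand-ins that mirror the fields proposed in this KIP, so that the example compiles without Kafka on the classpath; a real application would populate them from the values returned by the new KafkaStreams method. The JSON layout and field names are assumptions, not part of the proposal.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class ThreadStatusReport {

    /** Hypothetical stand-in for a task: its id and assigned partitions (e.g. "orders-0"). */
    public static final class TaskInfo {
        public final String taskId;
        public final List<String> partitions;

        public TaskInfo(String taskId, List<String> partitions) {
            this.taskId = taskId;
            this.partitions = partitions;
        }
    }

    /** Hypothetical stand-in for a stream thread with its active and standby tasks. */
    public static final class ThreadInfo {
        public final String name;
        public final String state;
        public final List<TaskInfo> activeTasks;
        public final List<TaskInfo> standbyTasks;

        public ThreadInfo(String name, String state,
                          List<TaskInfo> activeTasks, List<TaskInfo> standbyTasks) {
            this.name = name;
            this.state = state;
            this.activeTasks = activeTasks;
            this.standbyTasks = standbyTasks;
        }
    }

    /** Wraps a value in double quotes, escaping backslashes and quotes. */
    public static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    /** Renders a list of tasks as a JSON array of {taskId, partitions} objects. */
    public static String tasksToJson(List<TaskInfo> tasks) {
        List<String> items = new ArrayList<>();
        for (TaskInfo task : tasks) {
            List<String> parts = new ArrayList<>();
            for (String p : task.partitions) {
                parts.add(quote(p));
            }
            items.add("{\"taskId\":" + quote(task.taskId)
                    + ",\"partitions\":[" + String.join(",", parts) + "]}");
        }
        return "[" + String.join(",", items) + "]";
    }

    /** Renders one thread, including its active and standby tasks, as a JSON object. */
    public static String toJson(ThreadInfo t) {
        return "{\"threadName\":" + quote(t.name)
                + ",\"threadState\":" + quote(t.state)
                + ",\"activeTasks\":" + tasksToJson(t.activeTasks)
                + ",\"standbyTasks\":" + tasksToJson(t.standbyTasks) + "}";
    }

    public static void main(String[] args) {
        TaskInfo task = new TaskInfo("0_0", Arrays.asList("orders-0"));
        ThreadInfo thread = new ThreadInfo("StreamThread-1", "RUNNING",
                Arrays.asList(task), Collections.<TaskInfo>emptyList());
        System.out.println(toJson(thread));
    }
}
```

Keeping the rendering outside the metadata classes leaves their toString() free to use whatever format suits debugging, while the endpoint owns its own wire format.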

...

This KIP will add the method Set<ThreadMetadata> KafkaStreams#localThreadsMetadata(). This method will return a set of ThreadMetadata instances representing the threads currently running in the local stream instance.

Below are the new public classes. These classes will be declared as inner classes of KafkaStreams:

Code Block
languagejava
titleThreadState
/**
 * Represents the state of a thread running within a {@link KafkaStreams} application.
 */
public class ThreadMetadata {

    private final String threadName;

    private final String threadState;

    private final Set<TaskMetadata> activeTasks;

    private final Set<TaskMetadata> standbyTasks;

    public ThreadMetadata(String threadName, String threadState, Set<TaskMetadata> activeTasks, Set<TaskMetadata> standbyTasks) {
        this.threadName = threadName;
        this.threadState = threadState;
        this.activeTasks = activeTasks;
        this.standbyTasks = standbyTasks;
    }

    public String threadState() {
        return threadState;
    }

    public String threadName() {
        return threadName;
    }

    public Set<TaskMetadata> activeTasks() {
        return activeTasks;
    }

    public Set<TaskMetadata> standbyTasks() {
        return standbyTasks;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;

        ThreadMetadata that = (ThreadMetadata) o;

        if (!threadName.equals(that.threadName)) return false;
        if (!threadState.equals(that.threadState)) return false;
        if (!activeTasks.equals(that.activeTasks)) return false;
        return standbyTasks.equals(that.standbyTasks);
    }

    @Override
    public int hashCode() {
        int result = threadName.hashCode();
        result = 31 * result + threadState.hashCode();
        result = 31 * result + activeTasks.hashCode();
        result = 31 * result + standbyTasks.hashCode();
        return result;
    }

    @Override
    public String toString() {
        return "ThreadMetadata{" +
                "threadName=" + threadName +
                ", threadState=" + threadState +
                ", activeTasks=" + activeTasks +
                ", standbyTasks=" + standbyTasks +
                '}';
    }
}
Code Block
languagejava
titleTaskState
/**
 * Represents the state of a single task running within a {@link KafkaStreams} application.
 */
public class TaskMetadata {

    private final String taskId;

    private final Set<TopicPartition> assignedPartitions;

    public TaskMetadata(String taskId, Set<TopicPartition> assignedPartitions) {
        this.taskId = taskId;
        this.assignedPartitions = assignedPartitions;
    }

    public String taskId() {
        return taskId;
    }

    public Set<TopicPartition> assignedPartitions() {
        return assignedPartitions;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;

        TaskMetadata that = (TaskMetadata) o;

        if (!taskId.equals(that.taskId)) return false;
        return assignedPartitions.equals(that.assignedPartitions);
    }

    @Override
    public int hashCode() {
        int result = taskId.hashCode();
        result = 31 * result + assignedPartitions.hashCode();
        return result;
    }

    @Override
    public String toString() {
        return "TaskMetadata{" +
                "taskId=" + taskId +
                ", assignedPartitions=" + assignedPartitions +
                '}';
    }
}

...

 

Proposed Changes

This new feature requires adding a new method to KafkaStreams to expose thread and task details.

Code Block
languagejava
/**
 * Returns information about the local stream threads running in a {@link KafkaStreams} application.
 *
 * @return the set of {@link ThreadMetadata}.
 */
public Set<ThreadMetadata> localThreadsMetadata() {
    validateIsRunning();
    Set<ThreadMetadata> threadMetadata = new HashSet<>();
    for (int i = 0; i < threads.length; i++)
        threadMetadata.add(threads[i].threadMetadata());
    return threadMetadata;
}
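To illustrate how a caller might consume the returned set when debugging assignments, here is a minimal sketch that builds an index from each actively processed partition to the thread that owns it. ThreadView is a hypothetical, simplified stand-in that flattens a thread's tasks into plain partition names so the example runs without Kafka on the classpath; it is not part of the proposed API.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

class AssignmentInspector {

    /** Hypothetical stand-in for one thread: its name plus active and standby partitions. */
    public static final class ThreadView {
        public final String threadName;
        public final Set<String> activePartitions;
        public final Set<String> standbyPartitions;

        public ThreadView(String threadName, Set<String> activePartitions,
                          Set<String> standbyPartitions) {
            this.threadName = threadName;
            this.activePartitions = activePartitions;
            this.standbyPartitions = standbyPartitions;
        }
    }

    /** Maps each actively processed partition to the name of the thread that owns it. */
    public static Map<String, String> activeOwners(List<ThreadView> threads) {
        // TreeMap keeps the partitions sorted, which makes the output stable for logging.
        Map<String, String> owners = new TreeMap<>();
        for (ThreadView thread : threads) {
            for (String partition : thread.activePartitions) {
                owners.put(partition, thread.threadName);
            }
        }
        return owners;
    }

    public static void main(String[] args) {
        List<ThreadView> threads = Arrays.asList(
                new ThreadView("StreamThread-1",
                        new TreeSet<>(Arrays.asList("orders-0", "orders-1")),
                        Collections.<String>emptySet()),
                new ThreadView("StreamThread-2",
                        new TreeSet<>(Arrays.asList("orders-2")),
                        new TreeSet<>(Arrays.asList("orders-0"))));
        System.out.println(activeOwners(threads));
    }
}
```

Standby partitions are deliberately left out of the index, since a standby task does not process records for its partitions; a similar loop over standbyPartitions would give the standby view.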

 

In addition, the current toString() method should be deprecated, as it could return information that is inconsistent with the new API.

A straightforward first pass is GitHub PR 2612

...

No compatibility issues foreseen.

Rejected Alternatives

1. Add a new method threadStates to the public API of StreamsMetadata to expose the current states of running threads and tasks. This alternative was rejected because the method would return null for remote application instances.