Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

This KIP add a new method to have allow access to runtime information (i.e threads/tasks details of the local stream instance).

AsoAlso, exposing both active and standby tasks is important as this can be used to debug partition assigments when num.standby.replicas != 0.

...

This KIP will add the method Set<ThreadMetadata> StreamsMetadata#streamThreadsKafkaSreams#localThreadsMetadata(). This method will return a set ThreadMetadata representing the current threads running into the local stream instance.

...

Code Block
languagejava
titleThreadState
/**
 * Represents the state of a thread-thread running within a {@link KafkaStreams} application.
 */
public class ThreadMetadata {

    private final String threadName;

    private final String threadState;

    private final Set<TaskMetadata> activeTasks;

    private final Set<TaskMetadata> standbyTasks;

    public ThreadMetadata(String threadName, String threadState, Set<TaskMetadata> activeTasks, Set<TaskMetadata> standbyTasks) {
        this.threadName = threadName;
        this.threadState = threadState;
        this.activeTasks = activeTasks;
        this.standbyTasks = standbyTasks;
    }

    public String threadState() {
        return threadState;
    }

    public String threadName() {
        return threadName;
    }

    public Set<TaskMetadata> activeTasks() {
        return activeTasks;
    }

    public Set<TaskMetadata> standbyTasks() {
        return standbyTasks;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;

        ThreadMetadata that = (ThreadMetadata) o;

        if (!threadName.equals(that.threadName)) return false;
        if (!threadState.equals(that.threadState)) return false;
        if (!activeTasks.equals(that.activeTasks)) return false;
        return  standbyTasks.equals(that.standbyTasks);

    }

    @Override
    public int hashCode() {
        int result = threadName.hashCode();
        result = 31 * result + threadState.hashCode();
        result = 31 * result + activeTasks.hashCode();
        result = 31 * result + standbyTasks.hashCode();
        return result;
    }

    @Override
    public String toString() {
        return "ThreadMetadata{" +
                "threadName=" + threadName +
                ", threadState=" + threadState +
                ", activeTasks=" + activeTasks +
                ", standbyTasks=" + standbyTasks +
                '}';
    }
}

...

This new feature require to add a new method to KafkaStreams to StreamsMetadata to expose thread/tasks details.

Code Block
languagejava
/**
 * Returns information about the local stream threads running in a {@link KafkaStreams} application.
 *
 Note this method will return <code>null</code> if called
 * on {@link StreamsMetadata} which represent a remote application.
 *
 * @return the set of {@link ThreadMetadata}.
 */
public Set<ThreadMetadata> streamThreads() {* @return the set of {@link ThreadMetadata}.
 */
public Set<ThreadMetadata> localThreadsMetadata() {
    validateIsRunning();
    Set<ThreadMetadata> threadMetadata = new HashSet<>();
    for (int i = 0; i < threads.length; i++)
        threadMetadata.add(threads[i].threadMetadata());
    return this.streamThreadsthreadMetadata;
}

 

A straightforward first pass is GitHub PR 2612

...

No compatibility issues foreseen.

Rejected Alternatives

 1.  Directly add Add a new method threadStates to public API of KafkaStreams StreamsMetadata to expose current states of running threads and tasks. This alternative was rejected because the method would return null for remote application.