Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Table of Contents

Status

Current stateUnder DiscussionAccepted

Discussion thread: here

JIRA: here 

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

This KIP add a new method to have allow access to runtime information (i.e threads/tasks details of the local stream instance).

AsoAlso, exposing both active and standby tasks is important as this can be used to debug partition assigments when num.standby.replicas != 0.

...

This KIP will add the method Set<ThreadMetadata> StreamsMetadata#streamThreadsKafkaStreams#localThreadsMetadata(). This method will return a set ThreadMetadata representing the current threads running into the local stream instance.

...

Code Block
languagejava
titleThreadState
/**
 * Represents the state of a thread-thread running within a {@link KafkaStreams} application.
 */
public class ThreadMetadata {

    private final String threadName;

    private final String threadState;

    private final Set<TaskMetadata> activeTasks;

    private final Set<TaskMetadata> standbyTasks;

    public ThreadMetadata(String threadName, String threadState, Set<TaskMetadata> activeTasks, Set<TaskMetadata> standbyTasks) {
        this.threadName = threadName;
        this.threadState = threadState;
        this.activeTasks = activeTasks;
        this.standbyTasks = standbyTasks;
    }

    public String threadState() {
        return threadState;
    }

    public String threadName() {
        return threadName;
    }

    public Set<TaskMetadata> activeTasks() {
        return activeTasks;
    }

    public Set<TaskMetadata> standbyTasks() {
        return standbyTasks;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;

        ThreadMetadata that = (ThreadMetadata) o;

        if (!threadName.equals(that.threadName)) return false;
        if (!threadState.equals(that.threadState)) return false;
        if (!activeTasks.equals(that.activeTasks)) return false;
        return  standbyTasks.equals(that.standbyTasks);

    }

    @Override
    public int hashCode() {
        int result = threadName.hashCode();
        result = 31 * result + threadState.hashCode();
        result = 31 * result + activeTasks.hashCode();
        result = 31 * result + standbyTasks.hashCode();
        return result;
    }

    @Override
    public String toString() {
        return "ThreadMetadata{" +
                "threadName=" + threadName +
                ", threadState=" + threadState +
                ", activeTasks=" + activeTasks +
                ", standbyTasks=" + standbyTasks +
                '}';
    }
}

...

This new feature require to add a new method to KafkaStreams to StreamsMetadata to expose thread/tasks details.

Code Block
languagejava
/**
 * Returns information about the local stream threads running in a {@link KafkaStreams} application.
 *
 * @return Notethe thisset methodof will return <code>null</code> if called
 * on {@link StreamsMetadata} which represent a remote application.
 *
 * @return the set of {@link ThreadMetadata}.
 */
public Set<ThreadMetadata> streamThreads() {{@link ThreadMetadata}.
 */
public Set<ThreadMetadata> localThreadsMetadata() {
    validateIsRunning();
    Set<ThreadMetadata> threadMetadata = new HashSet<>();
    for (int i = 0; i < threads.length; i++)
        threadMetadata.add(threads[i].threadMetadata());
    return this.streamThreadsthreadMetadata;
}

 

In addition, the current toString() method should be deprecated as it would result to return inconsistent information with the new API.

A straightforward first pass is GitHub PR 2612

...

No compatibility issues foreseen.

Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way. 1. Add a new method threadStates to public API of StreamsMetadata to expose current states of running threads and tasks. This alternative was rejected because the method would return null for remote application.