Status

Current state: Under Discussion

Discussion thread: here

JIRA: here 

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

In Kafka 0.10.1.0, toString() methods were added to the public API of the KafkaStreams class to print useful information about the topology DAG.

While these methods can be used to debug topologies during development, they cannot be used to monitor a KafkaStreams application in a production environment.

 

Currently there is no way to obtain details about the states of active tasks, such as their partition assignments or consumed offsets.

To address this use case, we propose to expose the states of active tasks through the public API of KafkaStreams.

 

For instance, this would allow applications to expose a REST API that reports the global state of a Kafka Streams topology. It could also encourage the community to develop UI tooling for KafkaStreams.

Public Interfaces

This KIP adds the method KafkaStreams#threadStates(), which returns a collection of ThreadState instances.

The new public classes are shown below. They will be declared as static nested classes of KafkaStreams:

ThreadState
public static class ThreadState {

    private String name;
    private StreamThread.State state;
    private Collection<TaskState> activeTasks;

    public ThreadState(String name, StreamThread.State state, Collection<TaskState> activeTasks) {
        this.name = name;
        this.state = state;
        this.activeTasks = activeTasks;
    }

    public String getName() {
        return name;
    }

    public StreamThread.State getState() {
        return state;
    }

    public Collection<TaskState> getActiveTasks() {
        return activeTasks;
    }

    /**
     * Produce a string representation containing useful information about this {@code ThreadState} instance such as
     * thread name, thread state and a representation of active tasks.
     *
     * @return a JSON string representation of this {@code ThreadState}.
     */
    @Override
    public String toString() {
        final StringBuilder sb = new StringBuilder("{");
        sb.append("\"name\":\"").append(name).append('\"');
        sb.append(",\"state\":\"").append(state).append('\"');
        sb.append(",\"activeTasks\":").append(activeTasks);
        sb.append('}');
        return sb.toString();
    }
}
TaskState
public static class TaskState {
    private String id;
    private Set<TopicPartition> assignments;
    private Map<TopicPartition, Long> consumedOffsetsByPartition;

    public TaskState(String id, Set<TopicPartition> assignments, Map<TopicPartition, Long> consumedOffsetsByPartition) {
        this.id = id;
        this.assignments = assignments;
        this.consumedOffsetsByPartition = consumedOffsetsByPartition;
    }

    public String getId() {
        return id;
    }

    public Set<TopicPartition> getAssignments() {
        return assignments;
    }

    public Map<TopicPartition, Long> getConsumedOffsetsByPartition() {
        return consumedOffsetsByPartition;
    }

    @Override
    public String toString() {
        final StringBuilder sb = new StringBuilder("{");
        sb.append("\"id\":\"").append(id).append('\"');
        sb.append(", \"assignments\":[");

        Iterator<TopicPartition> partitionIterator = assignments.iterator();
        while (partitionIterator.hasNext()) {
            sb.append("\"" + partitionIterator.next() + "\"");
            if (partitionIterator.hasNext())
                sb.append(",");
        }
        sb.append("]");
        sb.append(", \"consumedOffsetsByPartition\":[");
        Iterator<Map.Entry<TopicPartition, Long>> consumedIterator = consumedOffsetsByPartition.entrySet().iterator();
        while (consumedIterator.hasNext()) {
            Map.Entry<TopicPartition, Long> entry = consumedIterator.next();
            // Emit each partition with the offset consumed so far (the map value, not the key).
            sb.append("{\"topicPartition\":\"" + entry.getKey() + "\"")
              .append(",\"offset\":\"" + entry.getValue() + "\"}");
            if (consumedIterator.hasNext())
                sb.append(",");
        }
        sb.append("]");
        sb.append('}');
        return sb.toString();
    }
}
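
As an illustration of how a monitoring endpoint might consume the proposed API, the sketch below flattens the consumed offsets of all active tasks into a single sorted map. Because KafkaStreams#threadStates() is only proposed in this KIP, the sketch does not use the real Kafka classes: TopicPartition, TaskState and ThreadState are simplified stand-ins that mirror the accessors above, and ThreadStateReport, consumedOffsets and sampleThreadStates are hypothetical names introduced for the example.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class ThreadStateReport {

    // Stand-in for org.apache.kafka.common.TopicPartition (assumption: topic and partition only).
    public static final class TopicPartition {
        final String topic;
        final int partition;

        TopicPartition(String topic, int partition) {
            this.topic = topic;
            this.partition = partition;
        }

        @Override
        public String toString() {
            return topic + "-" + partition;
        }
    }

    // Stand-in mirroring part of the proposed TaskState (assignments omitted for brevity).
    public static final class TaskState {
        private final String id;
        private final Map<TopicPartition, Long> consumedOffsetsByPartition;

        TaskState(String id, Map<TopicPartition, Long> consumedOffsetsByPartition) {
            this.id = id;
            this.consumedOffsetsByPartition = consumedOffsetsByPartition;
        }

        public String getId() {
            return id;
        }

        public Map<TopicPartition, Long> getConsumedOffsetsByPartition() {
            return consumedOffsetsByPartition;
        }
    }

    // Stand-in mirroring part of the proposed ThreadState (thread state omitted for brevity).
    public static final class ThreadState {
        private final String name;
        private final Collection<TaskState> activeTasks;

        ThreadState(String name, Collection<TaskState> activeTasks) {
            this.name = name;
            this.activeTasks = activeTasks;
        }

        public String getName() {
            return name;
        }

        public Collection<TaskState> getActiveTasks() {
            return activeTasks;
        }
    }

    /** Flattens every active task's consumed offsets into one sorted map keyed by "topic-partition". */
    public static Map<String, Long> consumedOffsets(Collection<ThreadState> threads) {
        Map<String, Long> report = new TreeMap<>();
        for (ThreadState thread : threads) {
            for (TaskState task : thread.getActiveTasks()) {
                for (Map.Entry<TopicPartition, Long> entry : task.getConsumedOffsetsByPartition().entrySet()) {
                    report.put(entry.getKey().toString(), entry.getValue());
                }
            }
        }
        return report;
    }

    /** Hypothetical sample data standing in for the result of KafkaStreams#threadStates(). */
    public static Collection<ThreadState> sampleThreadStates() {
        TaskState t0 = new TaskState("0_0", Collections.singletonMap(new TopicPartition("orders", 0), 42L));
        TaskState t1 = new TaskState("0_1", Collections.singletonMap(new TopicPartition("orders", 1), 17L));
        List<ThreadState> threads = new ArrayList<>();
        threads.add(new ThreadState("StreamThread-1", Collections.singletonList(t0)));
        threads.add(new ThreadState("StreamThread-2", Collections.singletonList(t1)));
        return threads;
    }

    public static void main(String[] args) {
        // Prints {orders-0=42, orders-1=17}
        System.out.println(consumedOffsets(sampleThreadStates()));
    }
}
```

A REST handler could serialize the resulting map directly instead of parsing the toString() output of the state classes.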

 

 

Proposed Changes

This new feature requires adding a new method to StreamTask in order to retrieve the last consumed offsets.

/**
 * @return latest consumed offsets by partition
 */
public Map<TopicPartition, Long> consumedOffsets() {
    return Collections.unmodifiableMap(this.consumedOffsets);
}
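
One property of this accessor is worth noting: Collections.unmodifiableMap returns a read-only view of the backing map, not a copy, so callers cannot modify it but will observe later updates. The sketch below demonstrates the difference between such a view and a defensive snapshot. ConsumedOffsetsView, record and consumedOffsetsSnapshot are hypothetical names, String keys stand in for TopicPartition, and whether the real StreamTask updates its map while other threads read it is an assumption, not something this KIP states.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class ConsumedOffsetsView {

    // Stand-in for the consumedOffsets field of StreamTask (String keys replace TopicPartition).
    private final Map<String, Long> consumedOffsets = new HashMap<>();

    /** Records the latest consumed offset for a partition. */
    public void record(String partition, long offset) {
        consumedOffsets.put(partition, offset);
    }

    /** Read-only live view, as in the proposed method: later updates remain visible. */
    public Map<String, Long> consumedOffsets() {
        return Collections.unmodifiableMap(consumedOffsets);
    }

    /** Read-only snapshot: copies the entries, so later updates are not visible. */
    public Map<String, Long> consumedOffsetsSnapshot() {
        return Collections.unmodifiableMap(new HashMap<>(consumedOffsets));
    }

    public static void main(String[] args) {
        ConsumedOffsetsView task = new ConsumedOffsetsView();
        task.record("orders-0", 10L);

        Map<String, Long> view = task.consumedOffsets();
        Map<String, Long> snapshot = task.consumedOffsetsSnapshot();

        task.record("orders-0", 11L);

        System.out.println("view:     " + view);     // reflects the update made after it was obtained
        System.out.println("snapshot: " + snapshot); // still holds the value at copy time
    }
}
```

If the states are read from a thread other than the one updating the offsets, a snapshot (or a thread-safe map) may be the safer choice; the live view only prevents modification by the caller.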

 

Compatibility, Deprecation, and Migration Plan

No compatibility issues foreseen.

Rejected Alternatives

If there are alternative ways of accomplishing the same thing, what were they? The purpose of this section is to motivate why the design is the way it is and not some other way.
