Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

 

Code Block
package org.apache.kafka.connect.health;
/**
 * Read-only view of the connectors known to a Connect cluster and of their health.
 */
interface ConnectClusterState {

    /**
     * Returns the names of the connectors currently running in this cluster. The result is the
     * full list of connectors in the cluster, gathered from the current configuration.
     *
     * @return the collection of connector names
     */
    Collection<String> connectors();

    /**
     * Looks up the current status of a single connector.
     *
     * @param connName name of the connector
     * @return the health information for the connector called {@code connName}
     */
    ConnectorHealth connectorHealth(String connName);
}
 
package org.apache.kafka.connect.health;
/**
 * Value holder describing one connector: its name, the state of the connector itself,
 * the states of its tasks, and the connector type.
 *
 * <p>All fields are assigned once in the constructor. The task map is stored and returned
 * by reference, without copying.
 */
public class ConnectorHealth {

    private final String name;
    private final ConnectorState connectorState;
    private final Map<Integer, TaskState> taskStates;
    private final ConnectorType type;

    /**
     * Creates a health record from its four components.
     *
     * @param name      name of the connector
     * @param connector state of the connector itself
     * @param tasks     task states keyed by an integer (presumably the task id; confirm with callers)
     * @param type      type of the connector
     */
    public ConnectorHealth(
            String name,
            ConnectorState connector,
            Map<Integer, TaskState> tasks,
            ConnectorType type) {
        this.name = name;
        this.connectorState = connector;
        this.taskStates = tasks;
        this.type = type;
    }

    /**
     * @return the connector name given at construction
     */
    public String name() {
        return this.name;
    }

    /**
     * @return the state of the connector given at construction
     */
    public ConnectorState connectorState() {
        return this.connectorState;
    }

    /**
     * @return the same task-state map instance that was passed to the constructor
     */
    public Map<Integer, TaskState> tasksState() {
        return this.taskStates;
    }

    /**
     * @return the connector type given at construction
     */
    public ConnectorType type() {
        return this.type;
    }
}
 
/**
 * Base class for state descriptions. It carries three strings: the state itself,
 * the id of the worker it relates to, and a trace message.
 *
 * <p>Values are stored exactly as supplied; no validation is performed.
 */
public abstract class AbstractState {

    private final String state;
    private final String workerId;
    private final String trace;

    /**
     * Stores the three components of the state.
     *
     * @param state    the state value
     * @param workerId the worker id
     * @param trace    the trace message
     */
    public AbstractState(final String state, final String workerId, final String trace) {
        this.state = state;
        this.workerId = workerId;
        this.trace = trace;
    }

    /**
     * @return the state value supplied at construction
     */
    public String state() {
        return this.state;
    }

    /**
     * @return the worker id supplied at construction
     */
    public String workerId() {
        return this.workerId;
    }

    /**
     * @return the trace message supplied at construction
     */
    public String trace() {
        return this.trace;
    }
}


/**
 * State of a connector. Adds nothing to {@link AbstractState}; it only gives the
 * connector-level state its own type.
 */
public class ConnectorState extends AbstractState {

    /**
     * Passes all three values straight through to the {@link AbstractState} constructor.
     *
     * @param state  the state value
     * @param worker forwarded as the superclass's worker id
     * @param msg    forwarded as the superclass's trace message
     */
    public ConnectorState(final String state, final String worker, final String msg) {
        super(state, worker, msg);
    }
}
 
/**
 * State of a single task, identified by an integer task id.
 *
 * <p>Ordering ({@link #compareTo}), equality and hash code are all derived from the task id
 * alone; the state, worker id and trace message inherited from {@link AbstractState} do not
 * take part in any of them.
 */
public class TaskState extends AbstractState implements Comparable<TaskState> {

    private final int taskId;

    /**
     * Creates a task state.
     *
     * @param taskId   id of the task
     * @param state    the state value, forwarded to {@link AbstractState}
     * @param workerId the worker id, forwarded to {@link AbstractState}
     * @param msg      forwarded to {@link AbstractState} as the trace message
     */
    public TaskState(int taskId, String state, String workerId, String msg) {
        super(state, workerId, msg);
        this.taskId = taskId;
    }

    /**
     * @return the id of the task
     */
    public int taskId() {
        return taskId;
    }

    /**
     * Orders task states by ascending task id.
     *
     * @param that the task state to compare against
     * @return a negative, zero or positive value as this task id is less than, equal to,
     *         or greater than the other task id
     */
    @Override
    public int compareTo(TaskState that) {
        return Integer.compare(this.taskId, that.taskId);
    }

    /**
     * Two task states are equal when they have the same task id, which keeps equality
     * consistent with {@link #compareTo}.
     */
    @Override
    public boolean equals(Object o) {
        if (o == this) {
            return true;
        }
        if (!(o instanceof TaskState)) {
            return false;
        }
        TaskState other = (TaskState) o;
        return this.taskId == other.taskId;
    }

    /**
     * Hash code based only on the task id, matching {@link #equals}.
     */
    @Override
    public int hashCode() {
        return Objects.hash(taskId);
    }
}



This also introduces a new configuration, rest.extension.classes, that allows configuring a comma-separated list of REST extension implementations.

...