Table of Contents |
---|
Status
Current state: Voting in progress Accepted
Discussion thread: here
JIRA:
Jira | ||||||
---|---|---|---|---|---|---|
|
PR : https://github.com/apache/kafka/pull/4931
Released: 2.0.0
Motivation
Connect Framework offers REST API that is used to mange the lifecycle of the connector. Its imperative in most enterprises to secure the API and also add authorization to the end points. We could add the ability for authentication and authorization in the framework. But the security requirements are so broad that it's not practical to support all of them in the framework. Hence we must provide ability for users to plug resources that help achieve the required capabilities.
...
Code Block |
---|
package org.apache.kafka.connect.health; interface ConnectClusterState{ /** * Get a list of connectors currently running in this cluster. This is a full list of connectors in the cluster gathered * from the current configuration. */ Collection<String> connectors(); /** * Lookup the current status of a connector. * @param connName name of the connector */ ConnectorHealth connectorHealth(String connName); } package org.apache.kafka.connect.health; public class ConnectorHealth { private final String name; private final ConnectorState connector; private final Map<Integer, TaskState> tasks; private final ConnectorType type; public ConnectorHealth(String name, ConnectorState connector, Map<Integer, TaskState> tasks, ConnectorType type) { this.name = name; this.connector = connector; this.tasks = tasks; this.type = type; } public String name() { return name; } public ConnectorState connectorState() { return connector; } public Map<Integer, TaskState> tasksState() { return tasks; } public ConnectorType type() { return type; } } public abstract class AbstractState { private final String state; private final String trace; private final String workerId; public AbstractState(String state, String workerId, String trace) { this.state = state; this.workerId = workerId; this.trace = trace; } public String state() { return state; } public String workerId() { return workerId; } public String trace() { return trace; } } public class ConnectorState extends AbstractState { public ConnectorState(String state, String worker, String msg) { super(state, worker, msg); } } public class TaskState extends AbstractState implements Comparable<TaskState> { private final int taskId; public TaskState(int taskId, String state, String workerworkerId, String msg) { super(state, workerworkerId, msg); this.taskId = taskId; } public int taskId() { return taskId; } @Override public int compareTo(TaskState that) { return Integer.compare(this.taskId, that.taskId); } @Override public boolean equals(Object o) { if (o == this) { return true; } if (!(o instanceof TaskState)) { return false; } TaskState other = (TaskState) o; return compareTo(other) == 0; } @Override public int hashCode() { return Objects.hash(taskId); } } |
This also introduces a new configuration that rest.extension.classes that allows to configure a comma separated list of Rest extension implementations.
...