Status

Current state: Accepted

Discussion thread: here 

JIRA: KAFKA-6776

PR: https://github.com/apache/kafka/pull/4931

Released: 2.0.0

Motivation

The Connect framework offers a REST API that is used to manage the lifecycle of connectors. In most enterprises it is imperative to secure this API and to add authorization to its endpoints. We could build authentication and authorization into the framework itself, but security requirements vary so widely that it is not practical to support all of them there. Hence we must give users the ability to plug in resources that provide the required capabilities.

...

 

Code Block
package org.apache.kafka.connect.health;

import java.util.Collection;

public interface ConnectClusterState {
    /**
     * Get a list of connectors currently running in this cluster. This is a full list of connectors in the cluster gathered
     * from the current configuration.
     */
    Collection<String> connectors();
 
    /**
     * Lookup the current status of a connector.
     * @param connName name of the connector
     */
    ConnectorHealth connectorHealth(String connName);
 
}
 
package org.apache.kafka.connect.health;

import java.util.Map;

public class ConnectorHealth {

    private final String name;
    private final ConnectorState connector;
    private final Map<Integer, TaskState> tasks;
    private final ConnectorType type;


    public ConnectorHealth(String name,
                           ConnectorState connector,
                           Map<Integer, TaskState> tasks,
                           ConnectorType type) {
        this.name = name;
        this.connector = connector;
        this.tasks = tasks;
        this.type = type;
    }


    public String name() {
        return name;
    }


    public ConnectorState connectorState() {
        return connector;
    }


    public Map<Integer, TaskState> tasksState() {
        return tasks;
    }


    public ConnectorType type() {
        return type;
    }

 
}
 
public abstract class AbstractState {

    private final String state;
    private final String trace;
    private final String workerId;

    public AbstractState(String state, String workerId, String trace) {
        this.state = state;
        this.workerId = workerId;
        this.trace = trace;
    }

    public String state() {
        return state;
    }

    public String workerId() {
        return workerId;
    }

    public String trace() {
        return trace;
    }
}


public class ConnectorState extends AbstractState {

    public ConnectorState(String state, String worker, String msg) {
        super(state, worker, msg);
    }
}
 
public class TaskState extends AbstractState implements Comparable<TaskState> {

    private final int taskId;

    public TaskState(int taskId, String state, String workerId, String msg) {
        super(state, workerId, msg);
        this.taskId = taskId;
    }

    public int taskId() {
        return taskId;
    }

    @Override
    public int compareTo(TaskState that) {
        return Integer.compare(this.taskId, that.taskId);
    }

    @Override
    public boolean equals(Object o) {
        if (o == this) {
            return true;
        }
        if (!(o instanceof TaskState)) {
            return false;
        }
        TaskState other = (TaskState) o;
        return compareTo(other) == 0;
    }

    @Override
    public int hashCode() {
        return Objects.hash(taskId);
    }
}
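
As an illustration of how an extension might consume the cluster state, here is a minimal, self-contained sketch. The types ClusterState and Health are simplified stand-ins that mirror only the parts of ConnectClusterState and ConnectorHealth used here (states are reduced to plain strings); HealthSketch, fixedCluster and unhealthyConnectors are hypothetical names, and the "FAILED" state value is an assumption about what the state string contains.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: the nested types are stand-ins, not the real
// org.apache.kafka.connect.health classes.
class HealthSketch {

    // Mirrors the two methods of ConnectClusterState.
    interface ClusterState {
        Collection<String> connectors();
        Health connectorHealth(String connName);
    }

    // Simplified ConnectorHealth: states are kept as plain strings.
    static final class Health {
        final String connectorState;
        final Map<Integer, String> taskStates; // task id -> state

        Health(String connectorState, Map<Integer, String> taskStates) {
            this.connectorState = connectorState;
            this.taskStates = taskStates;
        }
    }

    // Builds a ClusterState backed by a fixed map, for demonstration only.
    static ClusterState fixedCluster(Map<String, Health> byName) {
        return new ClusterState() {
            @Override
            public Collection<String> connectors() {
                return byName.keySet();
            }

            @Override
            public Health connectorHealth(String connName) {
                return byName.get(connName);
            }
        };
    }

    // Returns connectors whose connector state or any task state is "FAILED".
    static List<String> unhealthyConnectors(ClusterState cluster) {
        List<String> unhealthy = new ArrayList<>();
        for (String name : cluster.connectors()) {
            Health health = cluster.connectorHealth(name);
            if ("FAILED".equals(health.connectorState)
                    || health.taskStates.containsValue("FAILED")) {
                unhealthy.add(name);
            }
        }
        return unhealthy;
    }

    public static void main(String[] args) {
        Map<Integer, String> sinkTasks = new HashMap<>();
        sinkTasks.put(0, "RUNNING");
        sinkTasks.put(1, "FAILED");
        Map<Integer, String> sourceTasks = new HashMap<>();
        sourceTasks.put(0, "RUNNING");

        // TreeMap keeps connector names in a stable, sorted order.
        Map<String, Health> byName = new TreeMap<>();
        byName.put("example-sink", new Health("RUNNING", sinkTasks));
        byName.put("example-source", new Health("RUNNING", sourceTasks));

        System.out.println(unhealthyConnectors(fixedCluster(byName)));
    }
}
```

An extension that exposes a health-check resource could apply the same loop to the ConnectClusterState it is given, reading state() from ConnectorState and TaskState instead of the plain strings used in this sketch.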



This KIP also introduces a new worker configuration, rest.extension.classes, which takes a comma-separated list of REST extension implementation classes.
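
For illustration, a worker configuration using this setting might look like the fragment below. The two class names are hypothetical placeholders, not classes shipped with Kafka; each one is assumed to be an extension implementation available to the worker.

```properties
# Worker configuration (for example, the distributed worker properties file).
# Comma-separated list of REST extension implementation classes.
# Both class names below are hypothetical examples.
rest.extension.classes=com.example.connect.AuthExtension,com.example.connect.HealthCheckExtension
```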

...