...
All admin messages listed below must be sent only to the Controller broker, because only the controller processes them. If an admin message is sent to an ordinary broker, a special error code is returned (code 22). If any other failure occurs while processing the message, AdminRequestFailedError is returned [2].
Error | Code | Description |
---|---|---|
AdminRequestFailedError | 21 | Unexpected error occurred while processing Admin request. |
NotControllerForAdminRequest | 22 | Target broker (id=<this_broker_id>) is not serving a controller's role. |
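On the client side, the two codes can be mapped to named constants so that calling code can tell "wrong broker" apart from "request failed". The following is a minimal sketch, not part of the proposal: the enum name `AdminError` and the helpers `fromCode` and `shouldRetryOnController` are hypothetical; only the two error names and their numeric codes come from the text above.

```java
import java.util.Optional;

// Hypothetical client-side view of the two admin error codes described above.
enum AdminError {
    ADMIN_REQUEST_FAILED(21),
    NOT_CONTROLLER_FOR_ADMIN_REQUEST(22);

    private final short code;

    AdminError(int code) {
        this.code = (short) code;
    }

    short code() {
        return code;
    }

    // Returns the matching admin error, or empty for any other code.
    static Optional<AdminError> fromCode(short code) {
        for (AdminError e : values()) {
            if (e.code == code) {
                return Optional.of(e);
            }
        }
        return Optional.empty();
    }

    // Only code 22 indicates that the request reached a broker that is not the controller.
    boolean shouldRetryOnController() {
        return this == NOT_CONTROLLER_FOR_ADMIN_REQUEST;
    }
}
```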
ClusterMetadata Schema
Cluster Metadata Request
ClusterMetadataRequest =>
Cluster Metadata Response
ClusterMetadataResponse => ErrorCode [Broker] ?(Controller)
Broker => NodeId Host Port
NodeId => int32
Port => int32
Controller => Broker
ClusterMetadataRequest is a request with no arguments.
ClusterMetadataResponse holds an error code (0 on success), the list of brokers in the cluster and, optionally, the broker serving the Controller's role. An empty Controller most likely means either an error during request processing or that the cluster is in some intermediate state.
Admin clients need ClusterMetadataRequest to discover the Kafka brokers, and specifically the controller's location, because only the controller may execute admin commands [2].
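To illustrate how a client might treat the optional Controller field, here is a small sketch of an in-memory form of the response. The class names `BrokerInfo` and `ClusterMetadata` and the method `requireController` are hypothetical and not part of the proposal; representing Host as a Java `String` is also an assumption.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Optional;

// Hypothetical in-memory form of a Broker entry (NodeId, Host, Port).
final class BrokerInfo {
    final int nodeId;
    final String host; // assumption: Host is carried as a string
    final int port;

    BrokerInfo(int nodeId, String host, int port) {
        this.nodeId = nodeId;
        this.host = host;
        this.port = port;
    }
}

// Hypothetical in-memory form of ClusterMetadataResponse.
final class ClusterMetadata {
    final short errorCode;
    final List<BrokerInfo> brokers;
    private final BrokerInfo controller; // null when the response carried no Controller

    ClusterMetadata(short errorCode, List<BrokerInfo> brokers, BrokerInfo controller) {
        this.errorCode = errorCode;
        this.brokers = Collections.unmodifiableList(new ArrayList<>(brokers));
        this.controller = controller;
    }

    Optional<BrokerInfo> controller() {
        return Optional.ofNullable(controller);
    }

    // Returns the broker that admin requests should be sent to, or fails if it is unknown.
    BrokerInfo requireController() {
        if (errorCode != 0) {
            throw new IllegalStateException("Cluster metadata request failed, error code " + errorCode);
        }
        return controller().orElseThrow(() ->
            new IllegalStateException("Controller unknown; cluster may be in an intermediate state"));
    }
}
```

A caller that gets an exception from `requireController` would typically wait and re-request metadata rather than send the admin command to an arbitrary broker.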
Topic Admin Schema
...
The admin client will use the Kafka NetworkClient facility from /clients for cluster communication. Besides admin commands, the client will maintain a cluster metadata cache and give the user a convenient way to handle long-running commands (e.g. reassign partitions).
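One way the metadata cache and the controller-only rule could interact is sketched below: when a broker answers with error code 22, the client refreshes its view of the controller and retries. This is an illustration under stated assumptions, not the proposed implementation. The class `ControllerRouting`, the method `sendToController`, and both callback parameters are hypothetical; only the error code 22 and the success code 0 come from this document.

```java
import java.util.function.IntSupplier;
import java.util.function.IntUnaryOperator;

// Hypothetical retry helper: re-resolve the controller when a broker reports it is not the controller.
final class ControllerRouting {
    static final short NOT_CONTROLLER_FOR_ADMIN_REQUEST = 22;

    private ControllerRouting() {
    }

    /**
     * @param controllerLookup refreshes cluster metadata and returns the current controller node id (hypothetical)
     * @param send             sends the admin request to the given node id and returns the response error code (hypothetical)
     * @param maxAttempts      upper bound on send attempts, must be at least 1
     * @return the error code of the last response (0 on success)
     */
    static short sendToController(IntSupplier controllerLookup, IntUnaryOperator send, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        short error = NOT_CONTROLLER_FOR_ADMIN_REQUEST;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            int controllerId = controllerLookup.getAsInt();
            error = (short) send.applyAsInt(controllerId);
            if (error != NOT_CONTROLLER_FOR_ADMIN_REQUEST) {
                return error; // success or a failure that a retry on another broker will not fix
            }
        }
        return error;
    }
}
```

Bounding the number of attempts keeps the client from looping forever while a controller election is in progress.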
Proposed API:
public class AdminClient {
/**
* An admin client is instantiated by providing a set of key-value pairs as configuration. Most
* of the settings will be related to NetworkClient
*
* @param properties settings related to Network client and at least one broker from KafkaCluster to connect to
*/
public AdminClient(Properties properties);
/**
* Create topic with given number of partitions and replication factor, replica assignment will be handled by Kafka cluster
*
* @throws ApiException
*/
public void createTopic(String topicName, int partitions, int replicationFactor, List<ConfigEntry> configs) throws ApiException;
/**
* Create topic with specified replica assignment (number of partitions and replication factor will be taken
* from replica assignment string)
*
* @throws ApiException
*/
public void createTopic(String topicName, String replicaAssignment, List<ConfigEntry> configs) throws ApiException;
/**
* Alter existing topic partitions and/or replica assignment among Kafka brokers
*
* @throws ApiException
*/
public void alterTopic(String topicName, Integer partitions, String replicaAssignment,
List<ConfigEntry> addedConfigs, List<String> deletedConfigs) throws ApiException;
/**
* Delete Kafka topic by name
*
* @throws ApiException
*/
public void deleteTopic(String topicName) throws ApiException;
/**
* List all existing topics in Kafka cluster
*
* @throws ApiException
*/
public List<String> listTopics() throws ApiException;
/**
* Request replication information about Kafka topic
*
* @throws ApiException
*/
public DescribeTopicOutput describeTopic(String topicName) throws ApiException;
/**
* Initiate long-running reassign partitions procedure
*
* @param partitionsReassignment manual partitions assignment string (according to ReassignPartitionsCommand)
* @return future of the reassignment result which is completed once server-side partitions reassignment has succeeded or
* an error occurred so that partitions reassignment cannot be started
* @throws ApiException
*/
public Future<ReassignPartitionsResponse> reassignPartitions(String partitionsReassignment) throws ApiException;
/**
* Check the interim status of the partitions reassignment
*
* @param partitionsReassignment manual partitions assignment string (according to ReassignPartitionsCommand)
* @return partition to reassignment result code (completed, in-progress, failed)
* @throws ApiException
*/
public Map<TopicPartition, Short> verifyReassignPartitions(String partitionsReassignment) throws ApiException;
/**
* Initiate long-running preferred replica leader election procedure
*
* @param partitions serialized partitions for which preferred replica leader election will be started
* (according to PreferredReplicaLeaderElectionCommand)
* @return future of the election result which is completed once server-side preferred replica is elected for provided partitions or
* an error has occurred
* @throws ApiException
*/
public Future<PreferredReplicaLeaderElectionResponse> preferredReplicaLeaderElection(String partitions) throws ApiException;
/**
* Check the interim status of the preferred replica leader election
*
* @param partitions for which preferred replica leader election was started (according to PreferredReplicaLeaderElectionCommand)
* @return election result code per partition (completed, in-progress, failed)
* @throws ApiException
*/
public VerifyPreferredReplicaLeaderElectionResponse verifyPreferredReplicaLeaderElection(String partitions)
throws ApiException;
/**
* A generic facility to send Admin request and return response counterpart
*
* @param adminRequest AdminRequest message
* @param <T> concrete AdminRequest type
* @return response counterpart
* @throws ApiException
*/
private <T extends AbstractAdminResponse> T sendAdminRequest(AbstractAdminRequest<T> adminRequest) throws ApiException;
/**
* Refreshes cluster metadata cache - list of brokers and controller
*
* @throws ApiException
*/
private void updateClusterMetadata() throws ApiException;
}
4. Interactive Shell / CLI tool
This component will wrap AdminClient and provide an interactive shell-like environment for executing administrative commands. The goal of these changes is to let people use the existing standalone tools from a single script, optionally running commands in a shell, so command names and arguments are unchanged from the existing tools where possible.
...
sudo bin/kafka.sh --list-topics --broker <host : port>
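As a rough illustration of how such a command line could be split into a command name and a broker address, consider the sketch below. The class `CliArgs` and its `parse` method are hypothetical, and the real tool may parse arguments differently; only the `--list-topics` and `--broker` flags are taken from the example above.

```java
// Hypothetical parser for invocations such as: --list-topics --broker localhost:9092
final class CliArgs {
    final String command; // flag name without the leading "--", e.g. "list-topics"
    final String host;
    final int port;

    private CliArgs(String command, String host, int port) {
        this.command = command;
        this.host = host;
        this.port = port;
    }

    static CliArgs parse(String[] args) {
        String command = null;
        String broker = null;
        for (int i = 0; i < args.length; i++) {
            if (args[i].equals("--broker")) {
                if (i + 1 >= args.length) {
                    throw new IllegalArgumentException("--broker requires host:port");
                }
                broker = args[++i];
            } else if (args[i].startsWith("--") && command == null) {
                command = args[i].substring(2);
            }
            // Any further tokens (command-specific arguments) are ignored in this sketch.
        }
        if (command == null || broker == null) {
            throw new IllegalArgumentException("expected --<command> and --broker host:port");
        }
        int colon = broker.lastIndexOf(':');
        if (colon <= 0 || colon == broker.length() - 1) {
            throw new IllegalArgumentException("broker must be host:port, got: " + broker);
        }
        int port = Integer.parseInt(broker.substring(colon + 1));
        return new CliArgs(command, broker.substring(0, colon), port);
    }
}
```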
Open questions:
1
Compatibility, Deprecation, and Migration Plan
...