Versions Compared


  • This line was added.
  • This line was removed.
  • Formatting was changed.


All these commands are already implemented as standalone CLI tools, so there is no need to re-implement them. Unfortunately most of command classes are strongly coupled with CLI logic and can hardly be refactored, so for now (before we remove standalone CLI commands)  most likely the logic from those classes will be extracted and copied  to separate classes (as proposed - TopicCommandHelper[32] etc).

3. Admin Client

This component is intended to be a Kafka out-of-box client implementation for Admin commands.

Admin client will use Kafka NetworkClient facility from /clients for cluster communication. Besides Admin commands, client will handle cluster metadata cache and will provide user with a convenient way of handling long running commands (e.g. reassign partitions).

Proposed API [4]:

titleAdminClient API
public class AdminClient {
     * A client is instantiated by providing a set of key-value pairs as configuration. Most
	 * of the settings will be related to NetworkClient
     * @param properties settings related to Network client and at least one broker from KafkaCluster to connect to
    public AdminClient(Properties properties) 
     * Create topic with given number of partitions and replication factor, replica assignment will be handled by Kafka cluster
     * @throws ApiException
    public void createTopic(String topicName, int partitions, int replicationFactor, List<ConfigEntry> configs) throws ApiException;
     * Create topic with specified replica assignment (number of partitions and replication factor will be taken
     * from replica assignment string)
     * @throws ApiException
    public void createTopic(String topicName, String replicaAssignment, List<ConfigEntry> configs) throws ApiException;
     * Alter existing topic partitions and/or replica assignment among Kafka brokers
     * @throws ApiException
    public void alterTopic(String topicName, Integer partitions, String replicaAssignment,
                                    List<ConfigEntry> addedConfigs, List<String> deletedConfigs) throws ApiException;
     * Delete Kafka topic by name
     * @throws ApiException
    public void deleteTopic(String topicName) throws ApiException;
     * List all existing topics in Kafka cluster
     * @throws ApiException
    public List<String> listTopics() throws ApiException;
     * Request replication information about Kafka topic
     * @throws ApiException
    public DescribeTopicOutput describeTopic(String topicName) throws ApiException;
     * Initiate long-running reassign partitions procedure
     * @param partitionsReassignment manual partitions assignment string (according to ReassignPartitionsCommand)
     * @return future of the reassignment result which is completed once server-side partitions reassignment has succeeded or
     * an error occurred so that partitions reassignment cannot be started
     * @throws ApiException
    public Future<ReassignPartitionsResponse> reassignPartitions(String partitionsReassignment) throws ApiException;

     * Check the interim status of the partitions reassignment
     * @param partitionsReassignment manual partitions assignment string (according to ReassignPartitionsCommand)
     * @return partition to reassignment result code (completed, in-progress, failed)
     * @throws ApiException
    public Map<TopicPartition, Short> verifyReassignPartitions(String partitionsReassignment) throws ApiException;
     * Initiate long-running preferred replica leader election procedure
     * @param partitions serialized partitions for which preferred replica leader election will be started
     *                   (according to PreferredReplicaLeaderElectionCommand)
     * @return future of the election result which is completed once server-side preferred replica is elected for provided partitions or
     * an error has occurred
     * @throws ApiException
    public Future<PreferredReplicaLeaderElectionResponse> preferredReplicaLeaderElection(String partitions) throws ApiException;

     * Check the interim status of the preferred replica leader election
     * @param partitions for which preferred replica leader election was started (according to PreferredReplicaLeaderElectionCommand)
     * @return partition to reassignment result code (completed, in-progress, failed)
     * @throws ApiException
    public VerifyPreferredReplicaLeaderElectionResponse verifyPreferredReplicaLeaderElection(String partitions)
            throws ApiException;
     * A generic facility to send Admin request and return response counterpart
     * @param adminRequest AdminRequest message
     * @param <T>          concrete AdminRequest type
     * @return response counterpart
     * @throws ApiException
    private <T extends AbstractAdminResponse> T sendAdminRequest(AbstractAdminRequest<T> adminRequest) throws ApiException;

     * Refreshes cluster metadata cache - list of brokers and controller
     * @throws ApiException
    private void updateClusterMetadata() throws Exception;



# Start in shell mode
bin/ --shell --broker-list <host1 : port1>
Connected to Kafka Controller at <host2 : port2>.

kafka> create-topic --topic my_topic --partitions 5 --replication-factor 3
Topic "my_topic" is created.

kafka> alter-topic --topic my_topic --partitions 10
Topic "my_topic" is changed.
# Switch topic context
kafka> topic my_topic
Switched to "my_topic" topic.

# Execute topic command for switched topic
kafka my_topic> describe-topic
"my-topic" details:
Topic: my_topic Partitions: 10 ...

# Switch off topic context
kafka my_topic> topic

Open questions:

  1. ClusterMetadata duplicates TopicMetadata - we can extend TopicMetadata with controllerId information and probably something else. Other alternative - is a generic server-side re-routing facility (see KAFKA-1912 for details).
  2. We might extend error codes to fulfill all possible failures and give up using outcome / errorDescription field as a generic result descriptionExtension to TopicMetadaRequest were also proposed in KIP-11 - it probably makes sense to sync our changes.
  3. It is proposed to create a separate ticket to rework topic command to execute commands directly by the controller instead of using zookeeper admin path to notify controller about the change.AdminClient may need to support batching admin operations. It is considered whether we can cover it with allowing user to supply a regexp for topic name in AlterTopic, DeleteTopic, DescribeTopic requests (similarly to TopicCommand.scala)

Compatibility, Deprecation, and Migration Plan
