...
All these commands are already implemented as standalone CLI tools, so there is no need to re-implement them. Unfortunately, most of the command classes are tightly coupled with CLI logic and can hardly be refactored, so for now (until the standalone CLI commands are removed) the logic from those classes will most likely be extracted and copied into separate classes (as proposed: TopicCommandHelper[32], etc.).
3. Admin Client
This component is intended to be Kafka's out-of-the-box client implementation for Admin commands.
The Admin client will use the Kafka NetworkClient facility from /clients for cluster communication. Besides Admin commands, the client will maintain a cluster metadata cache and will give the user a convenient way of handling long-running commands (e.g. reassign partitions).
Proposed API [4]:
    public class AdminClient {

        /**
         * A client is instantiated by providing a set of key-value pairs as configuration. Most
         * of the settings will be related to NetworkClient
         *
         * @param properties settings related to Network client and at least one broker from KafkaCluster to connect to
         */
        public AdminClient(Properties properties);

        /**
         * Create topic with given number of partitions and replication factor, replica assignment will be handled by Kafka cluster
         *
         * @throws ApiException
         */
        public void createTopic(String topicName, int partitions, int replicationFactor, List<ConfigEntry> configs) throws ApiException;

        /**
         * Create topic with specified replica assignment (number of partitions and replication factor will be taken
         * from replica assignment string)
         *
         * @throws ApiException
         */
        public void createTopic(String topicName, String replicaAssignment, List<ConfigEntry> configs) throws ApiException;

        /**
         * Alter existing topic partitions and/or replica assignment among Kafka brokers
         *
         * @throws ApiException
         */
        public void alterTopic(String topicName, Integer partitions, String replicaAssignment, List<ConfigEntry> addedConfigs, List<String> deletedConfigs) throws ApiException;

        /**
         * Delete Kafka topic by name
         *
         * @throws ApiException
         */
        public void deleteTopic(String topicName) throws ApiException;

        /**
         * List all existing topics in Kafka cluster
         *
         * @throws ApiException
         */
        public List<String> listTopics() throws ApiException;

        /**
         * Request replication information about Kafka topic
         *
         * @throws ApiException
         */
        public DescribeTopicOutput describeTopic(String topicName) throws ApiException;

        /**
         * Initiate long-running reassign partitions procedure
         *
         * @param partitionsReassignment manual partitions assignment string (according to ReassignPartitionsCommand)
         * @return future of the reassignment result which is completed once server-side partitions reassignment has succeeded or
         * an error occurred so that partitions reassignment cannot be started
         * @throws ApiException
         */
        public Future<ReassignPartitionsResponse> reassignPartitions(String partitionsReassignment) throws ApiException;

        /**
         * Check the interim status of the partitions reassignment
         *
         * @param partitionsReassignment manual partitions assignment string (according to ReassignPartitionsCommand)
         * @return partition to reassignment result code (completed, in-progress, failed)
         * @throws ApiException
         */
        public Map<TopicPartition, Short> verifyReassignPartitions(String partitionsReassignment) throws ApiException;

        /**
         * Initiate long-running preferred replica leader election procedure
         *
         * @param partitions serialized partitions for which preferred replica leader election will be started
         * (according to PreferredReplicaLeaderElectionCommand)
         * @return future of the election result which is completed once server-side preferred replica is elected for provided partitions or
         * an error has occurred
         * @throws ApiException
         */
        public Future<PreferredReplicaLeaderElectionResponse> preferredReplicaLeaderElection(String partitions) throws ApiException;

        /**
         * Check the interim status of the preferred replica leader election
         *
         * @param partitions for which preferred replica leader election was started (according to PreferredReplicaLeaderElectionCommand)
         * @return partition to reassignment result code (completed, in-progress, failed)
         * @throws ApiException
         */
        public VerifyPreferredReplicaLeaderElectionResponse verifyPreferredReplicaLeaderElection(String partitions) throws ApiException;

        /**
         * A generic facility to send Admin request and return response counterpart
         *
         * @param adminRequest AdminRequest message
         * @param <T> concrete AdminRequest type
         * @return response counterpart
         * @throws ApiException
         */
        private <T extends AbstractAdminResponse> T sendAdminRequest(AbstractAdminRequest<T> adminRequest) throws ApiException;

        /**
         * Refreshes cluster metadata cache - list of brokers and controller
         *
         * @throws ApiException
         */
        private void updateClusterMetadata() throws Exception;
    }
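The future-returning methods in the proposed API pair with a separate "verify" call for interim status. The sketch below illustrates that calling pattern only. It does not use the proposed AdminClient: `ReassignSketch`, `startReassignment`, `verify`, and the numeric status codes are hypothetical stand-ins, and the server-side work is simulated in-process with a `CompletableFuture`.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class ReassignSketch {
    // Hypothetical status codes; the proposal only names the three states.
    public static final short COMPLETED = 0;
    public static final short IN_PROGRESS = 1;
    public static final short FAILED = 2;

    // Partition name -> status, standing in for state the controller would hold.
    private final Map<String, Short> status = new ConcurrentHashMap<>();

    /** Starts a simulated reassignment; the future completes when it finishes. */
    public Future<String> startReassignment(String partition) {
        status.put(partition, IN_PROGRESS);
        return CompletableFuture.supplyAsync(() -> {
            // A real client would wait on the cluster here instead of finishing at once.
            status.put(partition, COMPLETED);
            return partition + " reassigned";
        });
    }

    /** Returns the interim status; unknown partitions are reported as FAILED in this sketch. */
    public short verify(String partition) {
        return status.getOrDefault(partition, FAILED);
    }

    public static void main(String[] args) throws Exception {
        ReassignSketch client = new ReassignSketch();
        Future<String> result = client.startReassignment("my_topic-0");
        // Wait with a timeout rather than blocking indefinitely.
        System.out.println(result.get(5, TimeUnit.SECONDS));
        System.out.println("status code: " + client.verify("my_topic-0"));
    }
}
```

A caller that does not want to block can skip `get` and poll `verify` periodically, which is the role `verifyReassignPartitions` plays in the proposal.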
...
    # Start kafka.sh in shell mode
    bin/kafka.sh --shell --broker-list <host1 : port1>
    Connected to Kafka Controller at <host2 : port2>.
    kafka> create-topic --topic my_topic --partitions 5 --replication-factor 3
    Topic "my_topic" is created.
    kafka> alter-topic --topic my_topic --partitions 10
    Topic "my_topic" is changed.
    # Switch topic context
    kafka> topic my_topic
    Switched to "my_topic" topic.
    # Execute topic command for switched topic
    kafka my_topic> describe-topic
    "my_topic" details:
    Topic: my_topic Partitions: 10 ...
    # Switch off topic context
    kafka my_topic> topic
    kafka>
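The topic-context behaviour in the session above can be modelled as a small piece of shell state. The following is a minimal sketch under assumptions: `ShellContextSketch`, `handle`, and `prompt` are hypothetical names, and only the `topic` command is handled; it is not the proposed shell implementation.

```java
public class ShellContextSketch {
    // Name of the topic in context, or null when no context is active.
    private String currentTopic;

    /** Handles "topic [name]": with a name it switches context, without one it clears it. */
    public String handle(String line) {
        String[] parts = line.trim().split("\\s+");
        if (parts[0].equals("topic")) {
            if (parts.length > 1) {
                currentTopic = parts[1];
                return "Switched to \"" + currentTopic + "\" topic.";
            }
            currentTopic = null;
            return "";
        }
        // Other commands would be dispatched here, using currentTopic as the default topic.
        return "Unknown command: " + parts[0];
    }

    /** Builds the prompt, including the topic name while a context is active. */
    public String prompt() {
        return currentTopic == null ? "kafka> " : "kafka " + currentTopic + "> ";
    }

    public static void main(String[] args) {
        ShellContextSketch shell = new ShellContextSketch();
        System.out.println(shell.prompt() + "topic my_topic");
        System.out.println(shell.handle("topic my_topic"));
        System.out.println(shell.prompt() + "topic");
        shell.handle("topic");
        System.out.println(shell.prompt());
    }
}
```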
Open questions:
- ClusterMetadata duplicates TopicMetadata: we could extend TopicMetadata with controllerId information and probably something else. Another alternative is a generic server-side re-routing facility (see KAFKA-1912 for details).
- We might extend the error codes to cover all possible failures and give up using the outcome/errorDescription field as a generic result description.
- Extensions to TopicMetadataRequest were also proposed in KIP-11, so it probably makes sense to sync our changes.
- It is proposed to create a separate ticket to rework the topic command so that commands are executed directly by the controller instead of using the ZooKeeper admin path to notify the controller about the change.
- AdminClient may need to support batching admin operations. We are considering whether this can be covered by allowing the user to supply a regexp for the topic name in AlterTopic, DeleteTopic, and DescribeTopic requests (similarly to TopicCommand.scala).
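To make the regexp idea in the last question concrete, here is a minimal sketch of expanding a user-supplied pattern into a batch of topic names. `TopicRegexSketch` and `select` are hypothetical names, and full-name matching via `java.util.regex` is an assumption; the exact matching rules of TopicCommand.scala are not reproduced here.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

public class TopicRegexSketch {
    /** Returns the topics whose full name matches the user-supplied regular expression. */
    public static List<String> select(List<String> allTopics, String regex) {
        Pattern pattern = Pattern.compile(regex);
        List<String> matched = new ArrayList<>();
        for (String topic : allTopics) {
            // matches() requires the whole name to match, not just a substring.
            if (pattern.matcher(topic).matches()) {
                matched.add(topic);
            }
        }
        return matched;
    }

    public static void main(String[] args) {
        List<String> topics = Arrays.asList("logs_a", "logs_b", "metrics");
        // Each selected topic would then receive the same alter/delete/describe request.
        System.out.println(select(topics, "logs_.*"));
    }
}
```

Resolving the pattern into an explicit list before sending requests keeps the set of affected topics visible to the caller, which matters for destructive operations such as delete.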
Compatibility, Deprecation, and Migration Plan
...