...
CreateTopicResponse => [TopicName ErrorCode]
ErrorCode => int16
TopicName => string
CreateTopicResponse contains a map between topic and topic creation result error code (see New Protocol Errors).
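To make the schema notation concrete, here is a minimal sketch that serializes and parses a `[TopicName ErrorCode]` array. It assumes the primitive encodings of the Kafka wire protocol: an int32 element count for arrays, an int16 length prefix for strings, and big-endian integers. The class and method names are hypothetical, and request/response headers are omitted.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of the proposal: encodes and decodes the body
// CreateTopicResponse => [TopicName ErrorCode].
final class CreateTopicResponseCodec {

    static ByteBuffer encode(Map<String, Short> results) {
        // First pass: compute the exact size of the encoded body.
        int size = 4; // int32 array element count
        for (String topic : results.keySet()) {
            size += 2 + topic.getBytes(StandardCharsets.UTF_8).length; // TopicName => string
            size += 2;                                                 // ErrorCode => int16
        }
        ByteBuffer buf = ByteBuffer.allocate(size); // ByteBuffer is big-endian by default
        buf.putInt(results.size());
        for (Map.Entry<String, Short> e : results.entrySet()) {
            byte[] name = e.getKey().getBytes(StandardCharsets.UTF_8);
            buf.putShort((short) name.length);
            buf.put(name);
            buf.putShort(e.getValue());
        }
        buf.flip(); // make the buffer readable from the start
        return buf;
    }

    static Map<String, Short> decode(ByteBuffer buf) {
        Map<String, Short> results = new LinkedHashMap<>();
        int count = buf.getInt();
        for (int i = 0; i < count; i++) {
            byte[] name = new byte[buf.getShort()];
            buf.get(name);
            results.put(new String(name, StandardCharsets.UTF_8), buf.getShort());
        }
        return results;
    }
}
```

Calling `CreateTopicResponseCodec.decode(CreateTopicResponseCodec.encode(results))` returns a map equal to `results`; the error code values themselves are defined elsewhere in the proposal (see New Protocol Errors).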
Alter Topic Request
AlterTopicRequest => [TopicName Partitions ReplicationFactor ReplicaAssignment [AddedConfigEntry] [DeletedConfig]]
TopicName => string
ReplicationFactor => int32
Partitions => int32
ReplicaAssignment => [PartitionId [ReplicaId]]
AddedConfigEntry => ConfigKey ConfigValue
ConfigKey => string
ConfigValue => string
DeletedConfig => string
AlterTopicRequest is a batch asynchronous request to initiate topic alteration: changing replication parameters or replica assignment, or adding/removing topic-level configuration.
Request semantics:
1. If ReplicaAssignment is defined
ReplicationFactor and Partitions arguments are ignored in this case.
For each partition in ReplicaAssignment:
1.1 If such partition exists and assignment is different from the current replica assignment
It's a "reassign partition" request - add it to the reassign-partitions JSON
1.2 If such partition doesn't exist
It's an "add partition" request - change topic metadata in zookeeper to trigger increase partition logic
2. Else if ReplicationFactor is defined
2.1 If Partitions is defined
Regenerate replica assignment for all existing and newly added partitions, goto 1.
2.2 If Partitions is not defined
Regenerate replica assignment only for existing partitions, goto 1.
3. Else if Partitions is defined (ReplicaAssignment and ReplicationFactor are not defined):
3.1 If Partitions is less than the current number of partitions, return error code IncreasePartitionsNotAllowed
3.2 Otherwise, automatically generate replica assignment for newly added partitions, goto 1.
If one request contains multiple instructions for the same topic, only the last one in the list will be executed; the earlier ones are silently ignored.
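The branching above can be sketched as a small decision function. This is only an illustration: the class, enum, and method names are hypothetical, and it assumes that an "undefined" argument is represented as `null` (or an empty map for ReplicaAssignment), which the schema itself does not specify.

```java
import java.util.List;
import java.util.Map;

// Hypothetical sketch of the AlterTopicRequest semantics; not the server implementation.
final class AlterTopicPlanner {

    enum Action {
        USE_GIVEN_ASSIGNMENT,           // case 1
        REGENERATE_ALL_PARTITIONS,      // case 2.1
        REGENERATE_EXISTING_PARTITIONS, // case 2.2
        REJECT_PARTITION_DECREASE,      // case 3.1
        ASSIGN_NEW_PARTITIONS,          // case 3.2
        NO_REPLICATION_CHANGE           // none of the three arguments is defined
    }

    enum PartitionAction { REASSIGN, ADD, UNCHANGED }

    // Picks the top-level branch. Assumption: null / empty map means "not defined".
    static Action plan(Map<Integer, List<Integer>> replicaAssignment,
                       Integer replicationFactor,
                       Integer partitions,
                       int currentPartitions) {
        if (replicaAssignment != null && !replicaAssignment.isEmpty()) {
            // ReplicationFactor and Partitions are ignored in this case.
            return Action.USE_GIVEN_ASSIGNMENT;
        }
        if (replicationFactor != null) {
            return partitions != null
                    ? Action.REGENERATE_ALL_PARTITIONS
                    : Action.REGENERATE_EXISTING_PARTITIONS;
        }
        if (partitions != null) {
            return partitions < currentPartitions
                    ? Action.REJECT_PARTITION_DECREASE
                    : Action.ASSIGN_NEW_PARTITIONS;
        }
        return Action.NO_REPLICATION_CHANGE;
    }

    // Steps 1.1 and 1.2: classifies one entry of ReplicaAssignment against the current state.
    static PartitionAction classify(int partitionId,
                                    List<Integer> requestedReplicas,
                                    Map<Integer, List<Integer>> currentAssignment) {
        List<Integer> existing = currentAssignment.get(partitionId);
        if (existing == null) {
            return PartitionAction.ADD; // 1.2: partition doesn't exist yet
        }
        return existing.equals(requestedReplicas)
                ? PartitionAction.UNCHANGED
                : PartitionAction.REASSIGN; // 1.1: assignment differs
    }
}
```

In cases 2.1, 2.2 and 3.2 the generated assignment would then be processed per partition exactly as in case 1, which is what `classify` models.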
Alter Topic Response
AlterTopicResponse => [TopicName ErrorCode]
ErrorCode => int16
TopicName => string
AlterTopicResponse is similar to CreateTopicResponse.
Delete Topic Request
DeleteTopicRequest => [TopicName]
TopicName => string
DeleteTopicRequest requires only the names of the topics to be deleted.
One DeleteTopicRequest may include only one deletion command per topic name in a batch; otherwise MultipleTopicInstructionsInOneBatch is returned.
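A minimal sketch of the duplicate check this rule implies is shown below. The class and method names are hypothetical; whether MultipleTopicInstructionsInOneBatch is reported per offending topic or for the whole batch is not stated here, so the sketch only identifies the offending names.

```java
import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical validation helper, not part of the proposal.
final class DeleteBatchValidator {

    // Returns the topic names that occur more than once in the batch,
    // in the order in which their first repetition is seen.
    static Set<String> duplicatedTopics(List<String> topics) {
        Set<String> seen = new HashSet<>();
        Set<String> duplicated = new LinkedHashSet<>();
        for (String topic : topics) {
            if (!seen.add(topic)) { // add() returns false if the name was already present
                duplicated.add(topic);
            }
        }
        return duplicated;
    }
}
```

An empty result means the batch contains at most one deletion command per topic.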
Delete Topic Response
DeleteTopicResponse => [TopicName ErrorCode]
ErrorCode => int16
TopicName => string
DeleteTopicResponse is similar to CreateTopicResponse.
TopicMetadataRequest_V1 => [TopicName]
TopicName => string
TopicMetadataRequest_V1 is an evolved version of TopicMetadataRequest. This request is intended to support two admin operations: getting actual topic metadata, and verifying whether a particular asynchronous admin request (Create, Alter, Delete) has completed by comparing the expected and actual state of the topic.
TopicMetadataRequest_V1 requires only topic names. As with the first version, an empty topic name set results in returning information for all existing topics.
Also, unlike the previous version, TopicMetadataRequest_V1 doesn't trigger topic creation automatically if the topic with the given name doesn't exist.
TopicMetadataResponse_V1 => [Broker][TopicMetadata]
Broker => NodeId Host Port (any number of brokers may be returned)
NodeId => int32
Host => string
Port => int32
TopicMetadata => TopicErrorCode TopicName [PartitionMetadata] [ConfigEntry]
TopicErrorCode => int16
PartitionMetadata => PartitionErrorCode PartitionId Leader ReplicasLag Isr
PartitionErrorCode => int16
PartitionId => int32
Leader => int32
ReplicasLag => [int32 int32]
Isr => [int32]
ConfigEntry => string string
The new version of TopicMetadataResponse, in addition to the contents of TopicMetadataResponse_V0, will include topic-level configuration for each topic and the replica fetch lag per partition, i.e. how far each partition replica is behind the leader replica.
Replication Commands Schema
Preferred Replica Leader Election Request
PreferredReplicaLeaderElectionRequest => [Topic [PartitionId]]
Topic => string
PartitionId => int32
PreferredReplicaLeaderElectionRequest initiates the preferred replica leader election procedure. Similar to Topic Admin requests, this request is intended to be non-blocking. The schema consists of one field: an array of partitions for which the preferred replica leader should be elected.
To start the preferred replica leader election procedure for all existing partitions, an empty partitions array should be sent.
Preferred Replica Leader Election Response
PreferredReplicaLeaderElectionResponse => [Topic ErrorCode]
Topic => string
ErrorCode => int16
PreferredReplicaLeaderElectionResponse is similar to ReassignPartitionsResponse.
Status of the procedure may be checked with TopicMetadataRequest: the head of the replicas list field and the leader broker should be the same.
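The completion check described above reduces to comparing the leader with the first entry of the replica list. A minimal sketch, with hypothetical class and method names, and assuming the replica list and leader id have already been read from the metadata response:

```java
import java.util.List;

// Hypothetical helper, not part of the proposal.
final class PreferredLeaderCheck {

    // True if the current leader is the preferred replica,
    // i.e. the head of the partition's replica list.
    static boolean isPreferredLeader(List<Integer> replicas, int leader) {
        return !replicas.isEmpty() && replicas.get(0) == leader;
    }
}
```

For example, with replicas `[3, 1, 2]` the election is complete once the reported leader is broker 3.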
ISR data is removed because only the leader broker maintains this metadata accurately.
2. Server-side Admin Request handlers
All incoming requests will be handled by specific helper classes called from KafkaApis: TopicCommandHelper for topic admin commands, ReassignPartitionsCommandHelper and PreferredReplicaLeaderElectionCommandHelper.
All these commands are already implemented as standalone CLI tools, so there is no need to re-implement them. Unfortunately, most of the command classes are strongly coupled with CLI logic and can hardly be refactored, so for now (before we remove the standalone CLI commands) the logic from those classes will most likely be extracted and copied to separate classes (as proposed, TopicCommandHelper etc.).
3. Admin Client
This component is intended to be an out-of-the-box Kafka client implementation for Admin commands.
The Admin client will use the Kafka NetworkClient facility from /clients for cluster communication. Besides Admin commands, the client will handle the cluster metadata cache and will provide the user with a convenient way of handling long-running commands (e.g. reassign partitions).
Since Topic commands will support batching (and so will AdminClient), the user will be provided, besides the Admin API, with request builders which help to create requests correctly.
Proposed API:
public class AdminClient {
    /**
     * A client is instantiated by providing a set of key-value pairs as configuration. Most
     * of the settings will be related to NetworkClient
     *
     * @param properties settings related to Network client and at least one broker from KafkaCluster to connect to
     */
    public AdminClient(Properties properties);

    /**
     * Initiates topics creation.
     * This is an asynchronous call, it returns immediately once the server has accepted request and stored respective data in zookeeper.
     * To simulate a simple blocking call Future.get can be called. This will ensure that metadata about newly created topics was propagated
     * to all brokers
     *
     * @param createTopicRequestBody holder (built by means of respective Builder) of all required arguments to create topics
     * @return java.util.concurrent.Future which holds topics creation result - a map topic-name - error code
     *
     * @throws ApiException in case of global error, which means topic creation was not even started
     */
    public Future<Map<String, Errors>> createTopics(CreateTopicRequestBody createTopicRequestBody) throws ApiException;

    /**
     * Initiates topics alteration.
     * This is an asynchronous call, it returns immediately once the server has accepted request and stored/changed respective data in zookeeper.
     * To simulate a simple blocking call Future.get can be called. This will ensure that updated metadata about altered topics was propagated
     * to all brokers
     *
     * @param alterTopicRequestBody holder (built by means of respective Builder) of all required arguments to alter topics
     * @return java.util.concurrent.Future which holds topics alteration result - a map topic-name - error code
     *
     * @throws ApiException in case of global error, which means topic alteration was not even started
     */
    public Future<Map<String, Errors>> alterTopics(AlterTopicRequestBody alterTopicRequestBody) throws ApiException;

    /**
     * Initiates topic deletion.
     * This is an asynchronous call, it returns immediately once server has accepted request and marked requested topics for deletion in zookeeper.
     * To simulate a simple blocking call Future.get can be called. This will ensure that metadata with updated topic list was propagated to
     * all brokers
     *
     * @param topics topic names to be deleted
     * @return java.util.concurrent.Future which holds topics deletion result - a map topic-name - error code
     *
     * @throws ApiException in case of global error, which means topic deletion was not even started
     */
    public Future<Map<String, Errors>> deleteTopics(List<String> topics) throws ApiException;

    /**
     * Lists all available topics in Kafka cluster.
     * Topic is considered available if all brokers in cluster have received and cached metadata about it
     *
     * @return list of topic names
     *
     * @throws ApiException
     */
    public List<String> listTopics() throws ApiException;

    /**
     * TODO: not finalized yet
     * Request replication information about Kafka topics
     *
     * @return a mapping between topic name and topic description
     * @throws ApiException in case of global error, which means topic description cannot be fetched for all topics
     */
    public Map<String, DescribeTopicOutput> describeTopics(List<String> topicNames) throws ApiException;

    /**
     * Initiates long-running reassign partitions procedure.
     * This is an asynchronous call, it returns immediately once server has accepted request, and created admin path in zookeeper.
     * To simulate a simple blocking call Future.get can be called. This will ensure that all partitions reassignments have completed.
     * Note: currently there are only two possible states for reassigned partition: Completed, In Progress.
     *
     * @param reassignmentData schema among which replicas partitions will be reassigned
     *
     * @return java.util.concurrent.Future which is completed once all partitions have been reassigned
     *
     * @throws ApiException in case partition reassignment wasn't initiated on server
     */
    public Future<Void> reassignPartitions(PartitionReassignmentData reassignmentData) throws ApiException;

    /**
     * Checks the interim status of the partitions reassignment.
     * Reassignment for concrete partition is considered completed if partition has been removed from
     * admin zookeeper path and all cluster brokers have received and cached relevant AR metadata for the
     * given partition
     *
     * @param reassignmentData schema same as was used for reassign partitions request
     *
     * @return two maps - completed and partitions for which reassignment is still in progress
     * @throws ApiException in case reassignment verification wasn't initiated on server
     */
    public ReassignmentResult verifyReassignPartitions(PartitionReassignmentData reassignmentData) throws ApiException;

    /**
     * Initiates long-running preferred replica leader election procedure.
     * This is an asynchronous call, it returns immediately once server has accepted request, and created admin path in zookeeper.
     * To simulate a simple blocking call Future.get can be called. This will ensure that leaders of all partitions have moved to
     * the preferred replica.
     * Note: currently there are only two possible states for preferred replica leader election: Completed, In Progress.
     *
     * @param preferredReplicaElectionData partitions that need to move leader to preferred replica
     *
     * @return java.util.concurrent.Future which is completed once all partitions have moved leader to preferred replica
     *
     * @throws ApiException in case preferred replica leader election wasn't initiated on server
     */
    public Future<Void> preferredReplicaLeaderElection(PreferredReplicaLeaderElectionData preferredReplicaElectionData) throws ApiException;

    /**
     * Checks the interim status of the preferred replica leader election.
     * Preferred replica leader election for concrete partition is considered completed if partition has been removed from
     * admin zookeeper path and all cluster brokers have received and cached relevant metadata for the
     * given partition
     *
     * @param preferredReplicaElectionData same partitions as for preferred replica leader election request
     *
     * @return two maps - completed and partitions for which procedure is still in progress
     * @throws ApiException in case preferred replica election verification wasn't initiated on server
     */
    public PreferredReplicaLeaderElectionResult verifyPreferredReplicaLeaderElection(PreferredReplicaLeaderElectionData preferredReplicaElectionData) throws ApiException;
}
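The javadoc above notes that a blocking call can be simulated with Future.get. The sketch below shows that pattern for topic deletion. Since AdminClient is only a proposal, the `TopicDeleter` interface, the `Errors` enum, and the in-memory fake client are hypothetical stand-ins that exist only to make the example self-contained; only `java.util.concurrent` is real API here.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class BlockingAdminCalls {

    // Stand-in for the Errors type referenced by the proposed API (values are assumptions).
    enum Errors { NONE, UNKNOWN }

    // Stand-in for the deleteTopics part of the proposed AdminClient.
    interface TopicDeleter {
        Future<Map<String, Errors>> deleteTopics(List<String> topics);
    }

    // Blocks until the deletion result is available or the timeout expires,
    // then returns only the topics whose error code is not NONE.
    static Map<String, Errors> deleteAndWait(TopicDeleter client, List<String> topics, long timeoutMs)
            throws InterruptedException, ExecutionException, TimeoutException {
        Map<String, Errors> result = client.deleteTopics(topics).get(timeoutMs, TimeUnit.MILLISECONDS);
        Map<String, Errors> failed = new LinkedHashMap<>();
        for (Map.Entry<String, Errors> e : result.entrySet()) {
            if (e.getValue() != Errors.NONE) {
                failed.put(e.getKey(), e.getValue());
            }
        }
        return failed;
    }

    // In-memory fake that completes immediately with a canned result;
    // used only to make the sketch runnable without a cluster.
    static TopicDeleter fakeClient(Map<String, Errors> cannedResult) {
        return topics -> CompletableFuture.completedFuture(cannedResult);
    }
}
```

Using the timed variant of `get` is a design choice of this sketch: it keeps a caller from blocking indefinitely if metadata propagation stalls.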
4. Interactive Shell / CLI tool
...