...

 

CreateTopicResponse => [TopicName ErrorCode]
ErrorCode => int16
TopicName => string

CreateTopicResponse maps each topic to the error code of its creation result (see New Protocol Errors).
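For illustration, the following Java sketch encodes and decodes the [TopicName ErrorCode] array. It assumes the classic Kafka protocol primitives (arrays prefixed with an int32 element count, strings with an int16 byte length, integers big-endian). The class name CreateTopicResponseCodec is hypothetical, and the error-code values in any example are arbitrary; the real codes are the ones defined in New Protocol Errors.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical codec for CreateTopicResponse => [TopicName ErrorCode].
// Assumes Kafka's classic primitives: int32 array count, int16-length strings.
class CreateTopicResponseCodec {

    // Encodes a map of topic name -> error code as an array of (string, int16) pairs.
    static ByteBuffer encode(Map<String, Short> results) {
        // First pass: compute the exact size so the buffer cannot overflow.
        int size = 4; // int32 array length
        for (String topic : results.keySet()) {
            size += 2 + topic.getBytes(StandardCharsets.UTF_8).length + 2;
        }
        ByteBuffer buf = ByteBuffer.allocate(size); // big-endian by default
        buf.putInt(results.size());
        for (Map.Entry<String, Short> e : results.entrySet()) {
            byte[] name = e.getKey().getBytes(StandardCharsets.UTF_8);
            buf.putShort((short) name.length); // TopicName: int16 length + UTF-8 bytes
            buf.put(name);
            buf.putShort(e.getValue());        // ErrorCode => int16
        }
        buf.flip(); // prepare the buffer for reading
        return buf;
    }

    // Decodes the same layout back into an insertion-ordered map.
    static Map<String, Short> decode(ByteBuffer buf) {
        int count = buf.getInt();
        Map<String, Short> results = new LinkedHashMap<>();
        for (int i = 0; i < count; i++) {
            byte[] name = new byte[buf.getShort()];
            buf.get(name);
            results.put(new String(name, StandardCharsets.UTF_8), buf.getShort());
        }
        return results;
    }
}
```

AlterTopicResponse and DeleteTopicResponse share the same [TopicName ErrorCode] layout, so the same codec would apply to them.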

Alter Topic Request

 

AlterTopicRequest => [TopicName Partitions ReplicationFactor ReplicaAssignment [AddedConfigEntry] [DeletedConfig]]
TopicName => string
ReplicationFactor => int32
Partitions => int32
ReplicaAssignment => [PartitionId [ReplicaId]]
AddedConfigEntry => ConfigKey ConfigValue
 ConfigKey => string
 ConfigValue => string
DeletedConfig => string

AlterTopicRequest is a batch asynchronous request that initiates topic alteration: changing replication parameters or replica assignment, or adding/removing topic-level configuration.
Request semantics:

1. If ReplicaAssignment is defined

    The ReplicationFactor and Partitions arguments are ignored in this case.

    For each partition in ReplicaAssignment:

    1.1 If the partition exists and its assignment differs from the current replica assignment

        It's a "reassign partition" request - add it to the reassign-partitions JSON

    1.2 If the partition doesn't exist

        It's an "add partition" request - change the topic metadata in zookeeper to trigger the increase-partitions logic

2. Else if ReplicationFactor is defined

    2.1 If Partitions is defined

        Regenerate the replica assignment for all existing and newly added partitions, goto 1.

    2.2 If Partitions is not defined

        Regenerate the replica assignment only for existing partitions, goto 1.

3. Else if Partitions is defined (ReplicaAssignment and ReplicationFactor are not defined):

    3.1 If Partitions is less than the current number of partitions, return error code IncreasePartitionsNotAllowed

    3.2 Otherwise, automatically generate a replica assignment for the newly added partitions, goto 1.
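The decision rules above can be sketched in Java as follows. This is a hypothetical planner, not the proposed server code: for each partition it reports whether step 1 would treat it as a reassignment or a partition addition. It places replicas with a naive round-robin that is NOT Kafka's actual assignment algorithm, assumes in step 3.2 that new partitions reuse the replication factor of the existing ones, and signals IncreasePartitionsNotAllowed with an exception rather than an error code.

```java
import java.util.*;

// Hypothetical sketch of the AlterTopicRequest decision rules (steps 1-3).
class AlterTopicPlanner {

    // Outcome of step 1 for each partition of the target assignment.
    enum Action { REASSIGN, ADD_PARTITION, UNCHANGED }

    static Map<Integer, Action> plan(Map<Integer, List<Integer>> current,
                                     Map<Integer, List<Integer>> replicaAssignment,
                                     Integer replicationFactor,
                                     Integer partitions,
                                     List<Integer> brokers) {
        Map<Integer, List<Integer>> target;
        if (replicaAssignment != null) {
            // Step 1: explicit assignment wins; ReplicationFactor and Partitions are ignored.
            target = replicaAssignment;
        } else if (replicationFactor != null) {
            // Step 2.1 / 2.2: regenerate for existing partitions, plus new ones if Partitions is set.
            int count = partitions != null ? partitions : current.size();
            target = roundRobin(0, count, replicationFactor, brokers);
        } else if (partitions != null) {
            // Step 3.1: decreasing the partition count is rejected.
            if (partitions < current.size()) {
                throw new IllegalArgumentException("IncreasePartitionsNotAllowed");
            }
            // Step 3.2: assign only the new partitions (assumed: same RF as existing ones).
            int rf = current.isEmpty() ? 1 : current.values().iterator().next().size();
            target = roundRobin(current.size(), partitions, rf, brokers);
        } else {
            return Collections.emptyMap(); // nothing to change
        }
        // "goto 1": classify every partition of the target assignment.
        Map<Integer, Action> actions = new TreeMap<>();
        for (Map.Entry<Integer, List<Integer>> e : target.entrySet()) {
            List<Integer> existing = current.get(e.getKey());
            if (existing == null) {
                actions.put(e.getKey(), Action.ADD_PARTITION);
            } else if (!existing.equals(e.getValue())) {
                actions.put(e.getKey(), Action.REASSIGN);
            } else {
                actions.put(e.getKey(), Action.UNCHANGED);
            }
        }
        return actions;
    }

    // Naive placement: partition p gets brokers p, p+1, ... (mod broker count).
    static Map<Integer, List<Integer>> roundRobin(int from, int to, int rf, List<Integer> brokers) {
        Map<Integer, List<Integer>> out = new TreeMap<>();
        for (int p = from; p < to; p++) {
            List<Integer> replicas = new ArrayList<>();
            for (int r = 0; r < rf; r++) {
                replicas.add(brokers.get((p + r) % brokers.size()));
            }
            out.put(p, replicas);
        }
        return out;
    }
}
```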

 

If a request contains multiple instructions for the same topic, all but the last are silently ignored: only the last one in the list is executed.
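A minimal sketch of this last-instruction-wins rule, assuming a hypothetical AlterInstruction holder for one request entry: a LinkedHashMap keyed by topic name keeps only the final instruction per topic.

```java
import java.util.*;

// Hypothetical sketch of "last instruction for a topic wins" batching semantics.
class BatchDedup {

    // Simplified stand-in for one AlterTopicRequest entry (hypothetical).
    static final class AlterInstruction {
        final String topic;
        final int partitions;

        AlterInstruction(String topic, int partitions) {
            this.topic = topic;
            this.partitions = partitions;
        }
    }

    static List<AlterInstruction> lastWins(List<AlterInstruction> batch) {
        // put() replaces the value for an existing key but keeps the key's
        // original insertion position, so topics stay in first-seen order.
        Map<String, AlterInstruction> byTopic = new LinkedHashMap<>();
        for (AlterInstruction i : batch) {
            byTopic.put(i.topic, i);
        }
        return new ArrayList<>(byTopic.values());
    }
}
```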

   

Alter Topic Response

 

AlterTopicResponse => [TopicName ErrorCode]
ErrorCode => int16
TopicName => string

AlterTopicResponse is similar to CreateTopicResponse.

Delete Topic Request

 

DeleteTopicRequest => [TopicName]
TopicName => string

 

DeleteTopicRequest requires only the names of the topics to be deleted.
A DeleteTopicRequest may include at most one deletion command per topic name in a batch; otherwise MultipleTopicInstructionsInOneBatch is returned.
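Unlike AlterTopicRequest, where duplicates are silently collapsed, a duplicate here is an error. A sketch of the check a broker could run before executing a DeleteTopicRequest; the class and method names are hypothetical, and reporting the error for each duplicated topic (rather than for the whole batch) is an assumption.

```java
import java.util.*;

// Hypothetical validator: returns the topic names that appear more than once.
// A non-empty result corresponds to MultipleTopicInstructionsInOneBatch.
class DeleteBatchValidator {

    static Set<String> duplicates(List<String> topics) {
        Set<String> seen = new HashSet<>();
        Set<String> dups = new TreeSet<>(); // sorted for stable reporting
        for (String t : topics) {
            if (!seen.add(t)) { // add() returns false if t was already present
                dups.add(t);
            }
        }
        return dups;
    }
}
```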
Delete Topic Response

 

DeleteTopicResponse => [TopicName ErrorCode]
ErrorCode => int16
TopicName => string

DeleteTopicResponse is similar to CreateTopicResponse.

Topic Metadata Request V1

 

TopicMetadataRequest_V1 => [TopicName]
TopicName => string
TopicMetadataRequest_V1 is an evolved version of TopicMetadataRequest. This request is intended to support two admin operations: getting the actual topic metadata, and verifying whether a particular admin command (Create, Alter and Delete are all asynchronous) has completed, by comparing the expected and actual state of the topic.
TopicMetadataRequest_V1 requires only topic names. As with the first version, an empty topic name set results in returning information for all existing topics.
Also, unlike the previous version, TopicMetadataRequest_V1 doesn't trigger topic creation automatically if the topic with the given name doesn't exist.
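Because V1 never auto-creates topics, a client can poll it to verify an asynchronous command without side effects. The sketch below shows one way such an expected-versus-actual comparison could look; TopicState is a hypothetical, simplified view of a topic (partition count plus topic-level config), not a type from this proposal.

```java
import java.util.*;

// Hypothetical completion checks based on TopicMetadataResponse_V1 contents.
class CommandCompletion {

    // Simplified view of one topic as reported by the metadata response (hypothetical).
    static final class TopicState {
        final int partitionCount;
        final Map<String, String> config;

        TopicState(int partitionCount, Map<String, String> config) {
            this.partitionCount = partitionCount;
            this.config = config;
        }
    }

    // Create/Alter: done when the topic is reported with the expected partition
    // count and every expected config entry has the expected value.
    static boolean matches(TopicState expected, TopicState actual) {
        if (actual == null || actual.partitionCount != expected.partitionCount) {
            return false;
        }
        for (Map.Entry<String, String> e : expected.config.entrySet()) {
            if (!e.getValue().equals(actual.config.get(e.getKey()))) {
                return false;
            }
        }
        return true;
    }

    // Delete: done when the topic is no longer reported. This relies on V1
    // not auto-creating unknown topics when they are queried.
    static boolean deleted(Map<String, TopicState> metadata, String topic) {
        return !metadata.containsKey(topic);
    }
}
```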
Topic Metadata Response V1

 

TopicMetadataResponse_V1 => [Broker][TopicMetadata]
Broker => NodeId Host Port  (any number of brokers may be returned)
NodeId => int32
Host => string
Port => int32
TopicMetadata => TopicErrorCode TopicName [PartitionMetadata] [ConfigEntry]
TopicErrorCode => int16
PartitionMetadata => PartitionErrorCode PartitionId Leader ReplicasLag Isr
PartitionErrorCode => int16
PartitionId => int32
Leader => int32
ReplicasLag => [int32 int32]
Isr => [int32]
ConfigEntry => string string

In addition to the contents of TopicMetadataResponse_V0, the new version of TopicMetadataResponse includes the topic-level configuration for each topic and the replica fetch lag per partition, i.e. how far each partition replica is behind the leader replica.
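To show how a client might consume the new per-partition lag data, here is a hypothetical in-memory model of one PartitionMetadata entry. It assumes each ReplicasLag element is a (ReplicaId, Lag) pair; the pair's meaning and the lag unit are assumptions, since the schema only gives [int32 int32].

```java
import java.util.*;

// Hypothetical model of one PartitionMetadata entry from TopicMetadataResponse_V1.
// ReplicasLag is ASSUMED to be a list of (ReplicaId, Lag) pairs.
class PartitionMetadataV1 {
    final int partitionId;
    final int leader;
    final Map<Integer, Integer> replicasLag; // replica id -> lag behind the leader

    PartitionMetadataV1(int partitionId, int leader, Map<Integer, Integer> replicasLag) {
        this.partitionId = partitionId;
        this.leader = leader;
        this.replicasLag = replicasLag;
    }

    // Replica ids (ascending) whose lag exceeds maxLag.
    List<Integer> laggingReplicas(int maxLag) {
        List<Integer> out = new ArrayList<>();
        for (Map.Entry<Integer, Integer> e : new TreeMap<>(replicasLag).entrySet()) {
            if (e.getValue() > maxLag) {
                out.add(e.getKey());
            }
        }
        return out;
    }
}
```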

Replication Commands Schema

Preferred Replica Leader Election Request

 

PreferredReplicaLeaderElectionRequest => [Topic [PartitionId]]
Topic => string
PartitionId => int32
PreferredReplicaLeaderElectionRequest initiates the preferred replica leader election procedure. Like the topic admin requests, this request is intended to be non-blocking. The schema consists of one field: an array of partitions (grouped by topic) for which the preferred replica should be elected leader.

To start the preferred replica leader election procedure for all existing partitions, send an empty partitions array.

Preferred Replica Leader Election Response

 

PreferredReplicaLeaderElectionResponse => [Topic ErrorCode]
Topic => string
ErrorCode => int16

 

PreferredReplicaLeaderElectionResponse is similar to ReassignPartitionsResponse.

The status of the procedure can be checked with TopicMetadataRequest: once the election has completed for a partition, the head of its replica list and its leader broker should be the same.
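The completion check just described can be sketched as follows, assuming the leader and replica list of each partition have already been read from a TopicMetadataRequest response (the class and method names are hypothetical):

```java
import java.util.*;

// Hypothetical helper: preferred replica leader election is complete for a
// partition when its leader equals the first (preferred) replica in its list.
class PreferredLeaderCheck {

    static boolean isPreferredLeader(int leader, List<Integer> replicas) {
        // Integer == int unboxes, so this is a numeric comparison.
        return !replicas.isEmpty() && replicas.get(0) == leader;
    }

    // Partition ids (ascending) whose election has not completed yet.
    static List<Integer> pending(Map<Integer, Integer> leaders, Map<Integer, List<Integer>> replicas) {
        List<Integer> out = new ArrayList<>();
        for (Integer p : new TreeSet<>(replicas.keySet())) {
            Integer leader = leaders.get(p);
            if (leader == null || !isPreferredLeader(leader, replicas.get(p))) {
                out.add(p); // no known leader, or leader is not the preferred replica
            }
        }
        return out;
    }
}
```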

ISR data is removed because only the leader broker manages this metadata accurately.

2. Server-side Admin Request handlers

All incoming requests will be handled by specific helper classes called from KafkaApis: TopicCommandHelper for topic admin commands, ReassignPartitionsCommandHelper and PreferredReplicaLeaderElectionCommandHelper.

All these commands are already implemented as standalone CLI tools, so there is no need to re-implement them. Unfortunately, most of the command classes are tightly coupled with CLI logic and can hardly be refactored, so for now (until the standalone CLI commands are removed) the logic from those classes will most likely be extracted and copied into separate classes (as proposed, TopicCommandHelper etc.).

3. Admin Client

This component is intended to be Kafka's out-of-the-box client implementation for admin commands.

The admin client will use the Kafka NetworkClient facility from /clients for cluster communication. Besides admin commands, the client will maintain a cluster metadata cache and will provide users with a convenient way of handling long-running commands (e.g. reassign partitions).

Since topic commands will support batching (and so will AdminClient), users will be provided, in addition to the Admin API, with request builders that help construct requests correctly.
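As a rough illustration of such a builder, the sketch below uses a hypothetical CreateTopicsBatch as a stand-in for CreateTopicRequestBody; its fields (name, partitions, replication factor) and validation rules are assumptions. Rejecting duplicate topics on the client mirrors the MultipleTopicInstructionsInOneBatch rule of DeleteTopicRequest, but applying it to creation is likewise an assumption.

```java
import java.util.*;

// Hypothetical stand-in for CreateTopicRequestBody and its Builder.
final class CreateTopicsBatch {

    // One topic-creation instruction within the batch.
    static final class TopicSpec {
        final String name;
        final int partitions;
        final int replicationFactor;

        TopicSpec(String name, int partitions, int replicationFactor) {
            this.name = name;
            this.partitions = partitions;
            this.replicationFactor = replicationFactor;
        }
    }

    private final List<TopicSpec> topics;

    private CreateTopicsBatch(List<TopicSpec> topics) {
        this.topics = Collections.unmodifiableList(topics);
    }

    List<TopicSpec> topics() {
        return topics;
    }

    static Builder builder() {
        return new Builder();
    }

    static final class Builder {
        // Keyed by topic name so duplicate instructions are caught before sending.
        private final Map<String, TopicSpec> specs = new LinkedHashMap<>();

        Builder addTopic(String name, int partitions, int replicationFactor) {
            if (name == null || name.isEmpty()) {
                throw new IllegalArgumentException("topic name is required");
            }
            if (partitions <= 0 || replicationFactor <= 0) {
                throw new IllegalArgumentException("partitions and replication factor must be positive");
            }
            if (specs.containsKey(name)) {
                throw new IllegalArgumentException("duplicate instruction for topic " + name);
            }
            specs.put(name, new TopicSpec(name, partitions, replicationFactor));
            return this;
        }

        CreateTopicsBatch build() {
            if (specs.isEmpty()) {
                throw new IllegalStateException("at least one topic is required");
            }
            return new CreateTopicsBatch(new ArrayList<>(specs.values()));
        }
    }
}
```

Validating in the builder lets malformed batches fail fast on the client instead of costing a round trip to the broker.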

Proposed API:

public class AdminClient {
/**
* A client is instantiated by providing a set of key-value pairs as configuration. Most
* of the settings will be related to NetworkClient
*
* @param properties settings related to Network client and at least one broker from KafkaCluster to connect to
*/
public AdminClient(Properties properties);
/**
* Initiates topic creation.
* This is an asynchronous call: it returns immediately once the server has accepted the request and stored the respective data in zookeeper.
* To simulate a simple blocking call, Future.get can be called. This ensures that metadata about the newly created topics has been propagated
* to all brokers.
*
* @param createTopicRequestBody holder (built by means of the respective Builder) of all arguments required to create topics
* @return java.util.concurrent.Future which holds the topic creation result - a map of topic name to error code
*
* @throws ApiException in case of a global error, which means topic creation was not even started
*/
public Future<Map<String, Errors>> createTopics(CreateTopicRequestBody createTopicRequestBody) throws ApiException;

/**
* Initiates topic alteration.
* This is an asynchronous call: it returns immediately once the server has accepted the request and stored/changed the respective data in zookeeper.
* To simulate a simple blocking call, Future.get can be called. This ensures that updated metadata about the altered topics has been propagated
* to all brokers.
*
* @param alterTopicRequestBody holder (built by means of the respective Builder) of all arguments required to alter topics
* @return java.util.concurrent.Future which holds the topic alteration result - a map of topic name to error code
*
* @throws ApiException in case of a global error, which means topic alteration was not even started
*/
public Future<Map<String, Errors>> alterTopics(AlterTopicRequestBody alterTopicRequestBody) throws ApiException;

/**
* Initiates topic deletion.
* This is an asynchronous call: it returns immediately once the server has accepted the request and marked the requested topics for deletion in zookeeper.
* To simulate a simple blocking call, Future.get can be called. This ensures that metadata with the updated topic list has been propagated
* to all brokers.
*
* @param topics names of the topics to be deleted
* @return java.util.concurrent.Future which holds the topic deletion result - a map of topic name to error code
*
* @throws ApiException in case of a global error, which means topic deletion was not even started
*/
public Future<Map<String, Errors>> deleteTopics(List<String> topics) throws ApiException;

/**
* Lists all available topics in the Kafka cluster.
* A topic is considered available if all brokers in the cluster have received and cached metadata about it.
*
* @return list of topic names
*
* @throws ApiException
*/
public List<String> listTopics() throws ApiException;

/**
* TODO: not finalized yet
* Requests replication information about Kafka topics.
*
* @return a mapping between topic name and topic description
* @throws ApiException in case of a global error, which means topic descriptions cannot be fetched for any of the topics
*/
public Map<String, DescribeTopicOutput> describeTopics(List<String> topicNames) throws ApiException;

/**
* Initiates the long-running reassign partitions procedure.
* This is an asynchronous call: it returns immediately once the server has accepted the request and created the admin path in zookeeper.
* To simulate a simple blocking call, Future.get can be called. This ensures that all partition reassignments have completed.
* Note: currently there are only two possible states for a reassigned partition: Completed and In Progress.
*
* @param reassignmentData schema describing to which replicas the partitions will be reassigned
*
* @return java.util.concurrent.Future which is completed once all partitions have been reassigned
*
* @throws ApiException in case partition reassignment wasn't initiated on the server
*/
public Future<Void> reassignPartitions(PartitionReassignmentData reassignmentData) throws ApiException;

/**
* Checks the interim status of the partition reassignment.
* Reassignment of a given partition is considered completed once the partition has been removed from the
* admin zookeeper path and all cluster brokers have received and cached the relevant AR (assigned replicas) metadata for it.
*
* @param reassignmentData the same schema as was used for the reassign partitions request
*
* @return two maps - partitions whose reassignment has completed and partitions whose reassignment is still in progress
* @throws ApiException in case reassignment verification wasn't initiated on the server
*/
public ReassignmentResult verifyReassignPartitions(PartitionReassignmentData reassignmentData) throws ApiException;

/**
* Initiates the long-running preferred replica leader election procedure.
* This is an asynchronous call: it returns immediately once the server has accepted the request and created the admin path in zookeeper.
* To simulate a simple blocking call, Future.get can be called. This ensures that the leader of every partition has moved to
* its preferred replica.
* Note: currently there are only two possible states for preferred replica leader election: Completed and In Progress.
*
* @param preferredReplicaElectionData partitions whose leader needs to be moved to the preferred replica
*
* @return java.util.concurrent.Future which is completed once all partitions have moved their leader to the preferred replica
*
* @throws ApiException in case preferred replica leader election wasn't initiated on the server
*/
public Future<Void> preferredReplicaLeaderElection(PreferredReplicaLeaderElectionData preferredReplicaElectionData) throws ApiException;

/**
* Checks the interim status of the preferred replica leader election.
* Preferred replica leader election for a given partition is considered completed once the partition has been removed from the
* admin zookeeper path and all cluster brokers have received and cached the relevant AR metadata for it.
*
* @param preferredReplicaElectionData the same partitions as were used for the preferred replica leader election request
*
* @return two maps - partitions for which the procedure has completed and partitions for which it is still in progress
* @throws ApiException in case preferred replica election verification wasn't initiated on the server
*/
public PreferredReplicaLeaderElectionResult verifyPreferredReplicaLeaderElection(PreferredReplicaLeaderElectionData preferredReplicaElectionData) throws ApiException;

}
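Several methods above return a Future that should complete only once the server-side operation has actually finished (e.g. metadata has propagated to all brokers). One possible way to implement this is sketched below, assuming the client polls a completion check (such as a TopicMetadataRequest comparison) on a schedule and completes a CompletableFuture when the check passes; AsyncOperationWaiter and this polling design are hypothetical, not part of the proposal.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;

// Hypothetical helper that turns a repeatable completion check into a Future.
class AsyncOperationWaiter {
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();

    // Polls isDone every periodMs and completes the future with result once it returns true.
    <T> CompletableFuture<T> completeWhen(BooleanSupplier isDone, T result, long periodMs) {
        CompletableFuture<T> future = new CompletableFuture<>();
        ScheduledFuture<?> task = scheduler.scheduleAtFixedRate(() -> {
            try {
                if (isDone.getAsBoolean()) {
                    future.complete(result);
                }
            } catch (RuntimeException e) {
                future.completeExceptionally(e); // surface check failures to the caller
            }
        }, 0, periodMs, TimeUnit.MILLISECONDS);
        // Stop polling once the future completes normally, exceptionally or by cancellation.
        future.whenComplete((r, e) -> task.cancel(false));
        return future;
    }

    void shutdown() {
        scheduler.shutdownNow();
    }
}
```

A caller wanting blocking semantics would then call get() (optionally with a timeout) on the returned future, which matches the "simulate a simple blocking call" usage described in the Javadoc above.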

4. Interactive Shell / CLI tool

...