...

Changes that should be considered shortly after this KIP, or that are enabled by it, include:

  • General 

  • Metadata Schema

    • Consider supporting regex topic filters in the request 
    • Filter internal topics using the returned metadata
  • Topic Admin Schema

    • Support cluster consistent blocking to wait until all relevant brokers have the required metadata
    • Implement auto-topic creation client side (KAFKA-2410)
    • Add topic creation to the MirrorMaker client?
  • ACL Admin Schema

    • Review privileges for listing and altering ACLs to be more fine grained.
    • Provide an Authorizer interface using the new Java classes used by the ACL requests/responses (KAFKA-3509)
      • Deprecate the old one to encourage transition
    • Define standard Exceptions that can be thrown by the Authorizer in the interface (KAFKA-3266)
      • Otherwise all exceptions are unknown server exception to the client
    • Consider building a sync call into the Authorizer to ensure changes are propagated

Details

1. Wire Protocol Extensions

Schema

Overall the idea is to extend the Wire Protocol to cover all existing admin commands so that a user does not need to talk directly to Zookeeper and all commands can be authenticated via Kafka. At the same time, since the Wire Protocol is a public API to the Kafka cluster, it was agreed that the new Admin schema needs to be "orthogonal", i.e. new messages shouldn't duplicate each other or existing requests if those already cover particular use cases.

...

New Protocol Errors

It is proposed to use existing / add these error codes to the protocol.

Error | Description | Requests
TopicExistsException | Topic with this name already exists. | CreateTopic
InvalidTopic (existing) | Topic name contains invalid characters or doesn't exist. | CreateTopic, AlterTopic, DeleteTopic
InvalidPartitionsException | Partitions field is invalid (e.g. negative or decreasing number of partitions in existing topic). | CreateTopic, AlterTopic
InvalidReplicationFactorException | ReplicationFactor field is invalid (e.g. negative). | CreateTopic, AlterTopic
InvalidReplicaAssignmentException | ReplicaAssignment field is invalid (e.g. contains duplicates). | CreateTopic, AlterTopic
InvalidConfigurationException | Either topic-level config setting or value is incorrect. | CreateTopic
NotControllerException | The request was routed to a broker that wasn't the active controller. |

Generally, a client should have enough context to provide descriptive error message.

The same notation as in  A Guide To The Kafka Protocol is used here. 

...

 

MetadataResponse => [brokers] controllerId [topic_metadata]
  brokers => node_id host port rack
    node_id => INT32
    host => STRING
    port => INT32
    rack => NULLABLE_STRING
  controllerId => INT32
  topic_metadata => topic_error_code topic is_internal [partition_metadata]
    topic_error_code => INT16
    topic => STRING
    is_internal => BOOLEAN
    partition_metadata => partition_error_code partition_id leader [replicas] [isr]
      partition_error_code => INT16
      partition_id => INT32
      leader => INT32
      replicas => INT32
      isr => INT32

Adds rack, controller_id, and is_internal to the version 0 response.

The behavior of the replicas and isr arrays will be changed in order to support the admin tools, and better represent the state of the cluster:

  • In version 0, if a broker is down, the replicas and isr arrays will omit the broker's entry and a REPLICA_NOT_AVAILABLE error code will be set.
  • In version 1, no error code will be set and the broker id will be included in the replicas and isr arrays.
    • Note: A user can still detect if the replica is not available by checking whether the broker is in the returned broker list.
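
The check described in the note can be sketched on the client side as follows. This is only an illustration in Java: the class and method names are hypothetical and are not part of any Kafka client API. It assumes the caller has already extracted the broker ids from the brokers array and the replica ids from one partition_metadata entry of a version 1 response.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Hypothetical helper for illustration; not part of the Kafka client API.
final class ReplicaAvailability {

    /**
     * Returns the replica ids that are absent from the broker list
     * returned in the same MetadataResponse.
     */
    static List<Integer> unavailableReplicas(Set<Integer> liveBrokerIds, List<Integer> replicas) {
        List<Integer> missing = new ArrayList<>();
        for (int replica : replicas) {
            if (!liveBrokerIds.contains(replica)) {
                missing.add(replica);
            }
        }
        return missing;
    }
}
```

The same helper could be applied to the isr array, since both arrays keep the ids of unavailable brokers in version 1.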

...

Topic Admin Schema

...

Create Topic Request

 

CreateTopic Request (Version: 0) => [create_topic_requests] timeout 
  create_topic_requests => topic partitions replication_factor [replica_assignment] [configs] 
    topic => STRING
    partitions => INT32
    replication_factor => INT32
    replica_assignment => partition_id [replicas] 
      partition_id => INT32
      replicas => INT32
    configs => config_key config_value 
      config_key => STRING
      config_value => STRING
  timeout => INT32

CreateTopicRequest is a batch request to initiate topic creation with either predefined or automatic replica assignment and optionally topic configuration.

Request semantics:

  1. Must be sent to the controller broker
  2. Multiple instructions for the same topic in one request will be silently ignored; only the last one in the list will be executed.
    • This is because the list of topics is modeled server side as a map with TopicName as the key
  3. The principal must be authorized to the "Create" Operation on the "Cluster" resource to create topics. 
    • Unauthorized requests will receive a ClusterAuthorizationException
  4. Only one of ReplicaAssignment or (Partitions + ReplicationFactor) can be defined in one instruction. If both are specified, ReplicaAssignment takes precedence.
    • If ReplicaAssignment is defined, the number of partitions and replicas will be calculated from the supplied ReplicaAssignment.
    • If (Partitions + ReplicationFactor) is defined, the replica assignment will be automatically generated by the server.
    • One or the other must be defined. The existing broker side auto create defaults will not be used (default.replication.factor, num.partitions). The client implementation can have defaults for these options when generating the messages.
  5. Setting a timeout > 0 will allow the request to block until the topic metadata is "complete" on the controller node.
    • Complete means the topic metadata has been completely populated (leaders, replicas, ISRs)
    • If a timeout error occurs, the topic could still be created successfully at a later time. It's up to the client to query for the state at that point.
  6. The request is not transactional. 
    1. If an error occurs on one topic, the other could still be created.
    2. Errors are reported independently.
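
Two of the rules above (last instruction per topic wins, and ReplicaAssignment takes precedence over Partitions + ReplicationFactor) can be sketched in Java as below. All types here are hypothetical stand-ins, not classes from the Kafka code base, and the use of -1 and an empty map to mean "not supplied" is an assumption made for the example.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper types for illustration; not classes from the Kafka code base.
final class CreateTopicSemantics {

    enum AssignmentSource { MANUAL, AUTOMATIC }

    static final class Instruction {
        final String topic;
        final int partitions;         // assumption: -1 means "not supplied"
        final int replicationFactor;  // assumption: -1 means "not supplied"
        // partition id -> replica broker ids; assumption: empty means "not supplied"
        final Map<Integer, List<Integer>> replicaAssignment;

        Instruction(String topic, int partitions, int replicationFactor,
                    Map<Integer, List<Integer>> replicaAssignment) {
            this.topic = topic;
            this.partitions = partitions;
            this.replicationFactor = replicationFactor;
            this.replicaAssignment = replicaAssignment;
        }
    }

    /** Rule 2: instructions are keyed by topic name, so the last one for a topic wins. */
    static Map<String, Instruction> lastInstructionPerTopic(List<Instruction> batch) {
        Map<String, Instruction> byTopic = new LinkedHashMap<>();
        for (Instruction instruction : batch) {
            byTopic.put(instruction.topic, instruction);
        }
        return byTopic;
    }

    /** Rule 4: ReplicaAssignment takes precedence; otherwise both counts are required. */
    static AssignmentSource assignmentSource(Instruction instruction) {
        if (!instruction.replicaAssignment.isEmpty()) {
            return AssignmentSource.MANUAL;
        }
        if (instruction.partitions > 0 && instruction.replicationFactor > 0) {
            return AssignmentSource.AUTOMATIC;
        }
        throw new IllegalArgumentException(
            "Either ReplicaAssignment or (Partitions + ReplicationFactor) must be defined");
    }
}
```

Keying the batch by topic name mirrors the server-side map mentioned in rule 2; authorization, timeouts and per-topic error reporting are left out of the sketch.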

QA:

  • Why is CreateTopicRequest a batch request?
    • Scenarios where tools or admins want to create many topics should be able to with fewer requests
    • Example: MirrorMaker may want to create the topics downstream
  • Why implement "partial blocking" instead of fully async or fully consistent?
  • What happens if some topics error immediately? Will it return immediately?
    • The request will block until all topics have either been created or failed with an error, or the timeout has been hit. 
    • There is no "short circuiting" where 1 error stops the other topics from being created. 
Create Topic Response

 

CreateTopic Response (Version: 0) => [topic_error_codes] 
  topic_error_codes => topic error_code 
    topic => STRING
    error_code => INT16

CreateTopicResponse contains a map between topic and topic creation result error code (see New Protocol Errors). 
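
As an illustration of how the response schema above maps to bytes, the following Java sketch decodes the response body into a topic-to-error-code map. It assumes the primitive encodings from A Guide To The Kafka Protocol (arrays prefixed by an INT32 count, STRING as an INT16 length followed by UTF-8 bytes, big-endian integers) and ignores the common response header. The class name is hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative decoder for the CreateTopic response body only; the class is hypothetical.
final class CreateTopicResponseDecoder {

    static Map<String, Short> decode(ByteBuffer body) {
        int count = body.getInt();                 // [topic_error_codes] array length (INT32)
        Map<String, Short> errorsByTopic = new LinkedHashMap<>();
        for (int i = 0; i < count; i++) {
            short length = body.getShort();        // topic STRING length (INT16)
            byte[] raw = new byte[length];
            body.get(raw);                         // topic STRING bytes (UTF-8)
            String topic = new String(raw, StandardCharsets.UTF_8);
            short errorCode = body.getShort();     // error_code (INT16)
            errorsByTopic.put(topic, errorCode);
        }
        return errorsByTopic;
    }
}
```

A ByteBuffer is big-endian by default, which matches the protocol's byte order, so no explicit order() call is needed.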

Delete Topic Request

 

DeleteTopicRequest => [TopicName]
TopicName => string

 

DeleteTopicRequest requires only the names of the topics to be deleted.
Multiple instructions for the same topic in one request will be silently ignored; only the last one in the list will be executed.
Delete Topic Response

 

DeleteTopicResponse => [TopicName ErrorCode]
ErrorCode => int16
TopicName => string

DeleteTopicResponse is similar to CreateTopicResponse.

Alter Topic Request

 

AlterTopicRequest => [TopicName Partitions ReplicationFactor ReplicaAssignment]
TopicName => string
Replicas => int32
Partitions => int32
ReplicaAssignment => [PartitionId [ReplicaId]]
AlterTopicRequest is a batch asynchronous request to initiate topic alteration: replication parameters and replica assignment.
Request semantics:

1. If ReplicaAssignment is defined

    ReplicationFactor and Partitions arguments are ignored in this case.

    For each partition in ReplicaAssignment:

    1.1 If such partition exists and assignment is different from the current replica assignment

        It's a "reassign partition" request - add it to reassign-partitions json

    1.2 If such partition doesn't exist

        It's an "add partition" request - change topic metadata in zookeeper to trigger increase partition logic

2. Else if ReplicationFactor is defined

    2.1 If Partitions is defined    

        Regenerate replica assignment for all existing and newly added partitions, goto 1.

    2.2 If Partitions is not defined     

        Regenerate replica assignment only for existing partitions, goto 1.

3. Else if Partitions is defined (ReplicaAssignment and ReplicationFactor are not defined):

    3.1 If Partitions is less than the current number of partitions, return error code InvalidPartitions (since decreasing the number of partitions is not allowed).

    3.2 Otherwise, automatically generate the replica assignment for the newly added partitions, goto 1.

 

Multiple instructions for the same topic in one request will be silently ignored; only the last one in the list will be executed.
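
The per-partition classification in step 1 and the partition-count check in step 3.1 can be sketched in Java as follows. The class, enum and method names are hypothetical. The NO_CHANGE outcome for an identical assignment is an assumption, since the steps above do not say what happens in that case.

```java
import java.util.List;
import java.util.Map;

// Hypothetical planner for illustration; not code from the Kafka controller.
final class AlterTopicPlanner {

    enum Action { REASSIGN_PARTITION, ADD_PARTITION, NO_CHANGE }

    /** Step 1: classify one partition of a supplied ReplicaAssignment. */
    static Action classify(Map<Integer, List<Integer>> currentAssignment,
                           int partitionId, List<Integer> requestedReplicas) {
        List<Integer> existing = currentAssignment.get(partitionId);
        if (existing == null) {
            return Action.ADD_PARTITION;          // step 1.2: partition does not exist yet
        }
        if (!existing.equals(requestedReplicas)) {
            return Action.REASSIGN_PARTITION;     // step 1.1: assignment differs
        }
        return Action.NO_CHANGE;                  // assumption: identical assignment is a no-op
    }

    /** Step 3.1: a requested partition count below the current one is rejected. */
    static void validatePartitionCount(int currentCount, int requestedCount) {
        if (requestedCount < currentCount) {
            throw new IllegalArgumentException(
                "InvalidPartitions: requested " + requestedCount + " < current " + currentCount);
        }
    }
}
```

Steps 2 and 3.2 would first generate an assignment and then feed each partition through classify, which is the "goto 1" in the description.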

   

Alter Topic Response

...

 

AlterTopicResponse => [TopicName ErrorCode]
ErrorCode => int16
TopicName => string
AlterTopicResponse is similar to CreateTopicResponse.

2. Server-side Admin Request handlers

At the highest level, admin requests will be handled on the brokers the same way that all message types are. However, because admin messages modify cluster metadata they should be handled by the controller. This allows the controller to propagate the changes to the rest of the cluster. However, the fact that the messages need to be handled by the controller does not necessarily mean they need to be sent directly to the controller. A message forwarding mechanism can be used to forward the message from any broker to the correct broker for handling. 

Because supporting all of this is quite the undertaking, I will describe the "ideal functionality" and then the "intermediate functionality" that gets us some basic administrative support quickly while working towards the optimal state. 

Ideal Functionality:

  1. A client sends an admin request to any broker
  2. The admin request is forwarded to the required broker (likely the controller)
  3. The request is handled and the server blocks until a timeout is reached or the requested operation is completed (failure or success)
    1. An operation is considered complete/successful when all required nodes have the correct/current state
    2. Immediate follow up requests to any broker will succeed.
    3. Requests that timeout may still be completed after the timeout. The users would need to poll to check the state. 
  4. The response is generated and forwarded back to the broker that received the request.
  5. A response is sent back to the client. 

Intermediate Functionality:

  1. A client sends admin write requests to the controller broker. Read requests can still go to any broker. 
    1. As a follow up, request forwarding can be added transparently. (see below)
  2. The request is handled and the server blocks until a timeout is reached or the requested operation is completed (failure or success)
    1. An operation is considered complete/successful when the controller node has the correct/current state.
    2. Immediate follow up requests to the controller will succeed. Others (not to the controller) are likely to succeed or cause a retriable exception that would eventually succeed. 
    3. Requests that timeout may still be completed after the timeout. The users would need to poll to check the state. 
  3. A response is sent back to the client. 

The ideal functionality has 2 features that are more challenging initially. For that reason those features will be removed from the initial changes, but will be tracked as follow up improvements. However, this intermediate solution should allow for a relatively transparent  transition to the ideal functionality. 

Request Forwarding (KAFKA-1912)

Request forwarding is relevant to any message that needs to be sent to the "correct" broker (ex: partition leader, group coordinator, etc). Though at first it may seem simple, it has many technical challenges that need to be decided with regard to connections, failure, retries, etc. Today, we depend on the client to choose the correct broker, and clients that want to utilize the cluster "optimally" would likely continue to do so. For those reasons it can be handled generically as an independent feature. 

Cluster Consistent Blocking:

Blocking an admin request until the entire cluster is aware of the correct/current state is difficult based on Kafka's current approach for propagating metadata. This approach varies based on the metadata being changed.

  • Topic metadata changes are propagated via UpdateMetadata and LeaderAndIsr requests
  • Config changes are propagated via zookeeper and listeners
  • ACL changes depend on the implementation of the Authorizer interface 
    • The default SimpleACLAuthorizer uses zookeeper and listeners

Though all of these mechanisms are different, they are all "eventually consistent". None of the mechanisms, as currently implemented, will block until the metadata has been propagated successfully. Changing this behavior would require a large amount of change to the KafkaController, additional inter-broker messages, and potentially a change to the Authorizer interface. These are all changes that should not block the implementation of KIP-4.

The intermediate changes in KIP-4 should allow an easy transition to "complete blocking" when the work can be done. This is supported by providing optional local blocking in the meantime. This local blocking only blocks until the local state on the controller is correct. We will still provide a polling mechanism for users that do not want to block at all. A polling mechanism is required in the optimal implementation too, because operations like "create topic" are not transactional and users still need a way to check state after a timeout occurs. Local blocking has the added benefit of avoiding wasted poll requests to other brokers when it's impossible for the request to be completed: if the controller's state is not correct, then the other brokers' state can't be either. Clients who don't want to validate that the entire cluster state is correct can block on the controller and avoid polling altogether, with reasonable confidence that, though they may get a retriable error on follow up requests, the requested change was successful and the cluster will be accurate eventually.

Because we already add a timeout field to the request wire protocols, changing the behavior in the future to block until the cluster is consistent would not require a protocol change, though the version could be bumped to indicate a behavior change. 
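
The polling mechanism mentioned above, for clients that do not block or that hit a timeout, could look roughly like the Java sketch below. It is generic on purpose: the state check is passed in as a BooleanSupplier, and the class name, attempt limit and back-off parameter are all assumptions made for the example rather than part of the proposal.

```java
import java.util.function.BooleanSupplier;

// Hypothetical polling helper for illustration; not part of the proposed AdminClient.
final class StatePoller {

    /**
     * Calls the check until it returns true or maxAttempts is exhausted,
     * sleeping backoffMs between attempts. Returns whether the check succeeded.
     */
    static boolean pollUntil(BooleanSupplier check, int maxAttempts, long backoffMs) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            if (check.getAsBoolean()) {
                return true;
            }
            if (attempt < maxAttempts - 1) {
                try {
                    Thread.sleep(backoffMs);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt(); // preserve the interrupt for the caller
                    return false;
                }
            }
        }
        return false;
    }
}
```

A caller would supply a check that issues a metadata request and returns true once the topic shows the expected state, treating retriable errors as "not yet".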

3. Admin Client

This component is intended to be a Kafka out-of-box client implementation for Admin commands.

Admin client will use the Kafka NetworkClient facility from /clients for cluster communication. Besides Admin commands, the client will handle the cluster metadata cache and will provide the user with a convenient way of handling long running commands (e.g. reassign partitions).

Since Topic commands will support batching (and so will AdminClient), the user will be provided, besides the Admin API, with request builders which will help to create requests correctly.

Proposed API:

...

AdminClient API

...

public Map<String, DescribeTopicOutput> describeTopics(List<String> topicNames) throws ApiException;

/**
 * Initiates config alteration. This is an asynchronous call; it returns immediately once the server has accepted the request and stored/changed the respective data in zookeeper.
 * To simulate a simple blocking call Future.get can be called. This will ensure that updated configs were persisted.
 *
 * @param entityType Type of entity being described (topic, client etc.)
 * @param alterConfigRequest holder (built by means of the respective Builder) of all required arguments to alter topics
 * @return java.util.concurrent.Future which holds the topics alteration result: a map from topic name to the complete after-image of the entity configs
 *
 * @throws ApiException in case the configs cannot be altered for all topics
 */
public Future<Map<String, EntityConfig>> alterConfig(String entityType, AlterConfigRequest alterConfigRequest) throws ApiException;

/**
 * Describes config for any entity
 *
 * @param entityType Type of entity being described (topic, client etc.)
 * @param entityNames List of entity names to describe (topic names, client ids etc.)
 * @return a mapping between entity name and its config. If config cannot be fetched for a particular entity, an error value of EntityConfig is returned
 * @throws ApiException in case config cannot be fetched for all topics
 */
public Map<String, EntityConfig> describeConfig(String entityType, List<String> entityNames) throws ApiException;

...

ACL Admin Schema

List ACLs Request

 

ListAclsRequest => principal resource 
  resource => resource_type resource_name 
    resource_type => INT8
    resource_name => STRING
  principal => NULLABLE_STRING
Request semantics:
  1. Can be sent to any broker
  2. If a non-null principal is provided, the returned ACLs will be filtered by that principal; otherwise ACLs for all principals will be listed. 
  3. If a resource with a resource_type != -1 is provided, ACLs will be filtered by that resource; otherwise ACLs for all resources will be listed.
  4. Any principal can list their own ACLs where the permission type is "Allow". Otherwise the principal must be authorized to the "All" Operation on the "Cluster" resource to list ACLs.
    • Unauthorized requests will receive a ClusterAuthorizationException
    • This will be reviewed as a follow up ACLs review after this KIP. See Follow Up Changes.
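
The filtering in rules 2 and 3 can be sketched in Java as follows. The in-memory AclEntry model and the class name are hypothetical, and the numeric resource type used in any call is only a placeholder since the actual codes are not defined here. The authorization check from rule 4 is deliberately left out.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model for illustration; real ACL storage depends on the Authorizer.
final class ListAclsFilter {

    /** resource_type value meaning "do not filter by resource" (rule 3). */
    static final byte ANY_RESOURCE_TYPE = -1;

    static final class AclEntry {
        final byte resourceType;
        final String resourceName;
        final String principal;

        AclEntry(byte resourceType, String resourceName, String principal) {
            this.resourceType = resourceType;
            this.resourceName = resourceName;
            this.principal = principal;
        }
    }

    /** A null principal matches every principal; ANY_RESOURCE_TYPE matches every resource. */
    static List<AclEntry> filter(List<AclEntry> all, String principal,
                                 byte resourceType, String resourceName) {
        List<AclEntry> matches = new ArrayList<>();
        for (AclEntry entry : all) {
            boolean principalMatches = principal == null || principal.equals(entry.principal);
            boolean resourceMatches = resourceType == ANY_RESOURCE_TYPE
                || (resourceType == entry.resourceType && resourceName.equals(entry.resourceName));
            if (principalMatches && resourceMatches) {
                matches.add(entry);
            }
        }
        return matches;
    }
}
```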
List ACLs Response

 

ListAclsResponse => [responses] error_code 
  responses => resource [acls] 
    resource => resource_type resource_name 
      resource_type => INT8
      resource_name => STRING
    acls => acl_principle acl_permission_type acl_host acl_operation 
      acl_principle => STRING
      acl_permission_type => INT8
      acl_host => STRING
      acl_operation => INT8
  error_code => INT16
Alter ACLs Request

 

AlterAclsRequest => [requests] 
  requests => resource [actions] 
    resource => resource_type resource_name 
      resource_type => INT8
      resource_name => STRING
    actions => action acl 
      acl => acl_principle acl_permission_type acl_host acl_operation 
        acl_principle => STRING
        acl_permission_type => INT8
        acl_host => STRING
        acl_operation => INT8
      action => INT8
Request semantics:
  1. Can be sent to the controller broker
  2. Multiple instructions for the same resource in one request will be silently ignored; only the last one in the list will be executed.
  3. ACLs with a delete action will be processed first
  4. The request is not transactional. One failure won't stop others from running.
  5. The principal must be authorized to the "All" Operation on the "Cluster" resource to alter ACLs.
    • Unauthorized requests will receive a ClusterAuthorizationException
    • This will be reviewed as a follow up ACLs review after this KIP. See Follow Up Changes.
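
Rule 3 (delete actions are processed first) can be illustrated with the Java sketch below. The Action enum and AclChange type are hypothetical, since the numeric values of the action field are not defined in this section. The relative order within deletes and within non-deletes is kept as submitted, which is an assumption.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical types for illustration; the wire-level action codes are not defined here.
final class AlterAclsOrdering {

    enum Action { ADD, DELETE }

    static final class AclChange {
        final Action action;
        final String acl; // placeholder for the acl fields (principal, permission type, host, operation)

        AclChange(Action action, String acl) {
            this.action = action;
            this.acl = acl;
        }
    }

    /** Returns the changes for one resource with all deletes moved ahead of the rest. */
    static List<AclChange> processingOrder(List<AclChange> requested) {
        List<AclChange> ordered = new ArrayList<>();
        for (AclChange change : requested) {
            if (change.action == Action.DELETE) {
                ordered.add(change);
            }
        }
        for (AclChange change : requested) {
            if (change.action != Action.DELETE) {
                ordered.add(change);
            }
        }
        return ordered;
    }
}
```

Because the request is not transactional (rule 4), a caller applying this order would record an error code per change and continue with the remaining ones.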
Alter ACLs Response

 

AlterAclsResponse => [responses] 
  responses => resource [results] 
    resource => resource_type resource_name 
      resource_type => INT8
      resource_name => STRING
    results => action acl error_code 
      acl => acl_principle acl_permission_type acl_host acl_operation 
        acl_principle => STRING
        acl_permission_type => INT8
        acl_host => STRING
        acl_operation => INT8
      action => INT8
      error_code => INT16

 


Compatibility, Deprecation, and Migration Plan

...