Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

 

CreateTopics Response (Version: 0) => [topic_error_codes] 
  topic_error_codes => topic error_code 
    topic => STRING
    error_code => INT16

CreateTopicsResponse contains a map between topic and topic creation result error code (see New Protocol Errors). 

Response semantics:

  1. When a request hits the timeout, the topics that are not "complete" will have the TimeoutException error code. 
    • The topics that did complete successfully with have no error. 
Delete Topics Request (KAFKA-2946)

 

DeleteTopics Request (Version: 0) => [topics] timeout 
  topics => STRING
  timeout => INT32

DeleteTopicsRequest is a batch request to initiate topic deletion.

Request semantics:

  1. Must be sent to the controller broker
  2. If there are multiple instructions for the same topic in one request the extra request will be ingnored
    • This is because the list of topics is modeled server side as a set
    • Multiple deletes results in the same end goal, so handling this error for the user should be okay
  3. When requesting to delete a topic that does not exist, a an InvalidTopic error will be returned for that topic.
  4. When requesting to delete a topic that is already marked for deletion, the request will wait up to the timeout until the delete is "complete" and return as usual.
    • This is to avoid errors due to concurrent delete requests. The end result is the same, the topic is deleted.
  5. The principal must be authorized to the "Delete" Operation on the "Topic" resource to delete the topic. 
    • Unauthorized requests will receive a TopicAuthorizationException if they are authorized to the "Describe" Operation on the "Topic" resource
    • Otherwise they will receive an InvalidTopicException as if the topic does not exist. 
  6. Setting a timeout > 0 will allow the request to block until the delete is "complete" on the controller node.
    • Complete means the local topic metadata cache no longer contains the topic
      • The topic metadata is updated when the controller sends out update metadata requests to the brokers
    • If a timeout error occurs, the topic could still be deleted successfully at a later time. Its up to the client to query for the state at that point.
  7. Setting a timeout <= 0 will validate arguments and trigger the delete topics and return immediately. 
    • This is essentially the fully asynchronous mode we have in the Zookeeper tools today. 
    • The error code in the response will either contain an argument validation exception or a timeout exception. If you receive a timeout exception, because you asked for 0 timeout, you can assume the message was valid and the topic deletion was triggered. 
  8. The request is not transactional. 
    1. If an error occurs on one topic, the others could still be deleted.
    2. Errors are reported independently.

QA:

  • Why is DeleteTopicsRequest a batch request?
    • Scenarios where tools or admins want to delete many topics should be able to with fewer requests
    • Example: Removing all cluster topics
  • What happens if some topics error immediately? Will it return immediately? 
    • The request will block until all topics have either been deleted, errors, or the timeout has been hit
    • There is no "short circuiting" where 1 error stops the other topics from being deleted
  • Why have a timeout at all? Deletes could take a while?
    • True some deletes may take a while or never finish, however some admin tools may want extended blocking regardless. 
    • If you don't want any blocking setting a timeout of 0 works.
    • Future changes may make deletes much faster. See the Follow Up Changes section above.
  • Why implement "partial blocking" instead of fully async or fully consistent?
  • Why require the request to go to the controller?
    • The controller is responsible for the cluster metadata and its propagation 
    • See Request Forwarding below
Delete Topics Response

 

DeleteTopics Response (Version: 0) => [topic_error_codes] 
  topic_error_codes => topic error_code 
    topic => STRING
    error_code => INT16

DeleteTopicsResponse contains a map between topic and topic creation result error code (see New Protocol Errors). 

Response semantics:

  1. When a request hits the timeout, the topics that are not "complete" will have the TimeoutException error code. 
    • The topics that did complete successfully with have no error. 
Alter Topics Request

 

AlterTopicsRequest => [TopicName Partitions ReplicationFactor ReplicaAssignment]
TopicName => string
Replicas => int32
Partitions => int32
ReplicaAssignment => [PartitionId [ReplicaId]]
AlterTopicsRequest is a batch asynchronous request to initiate topic alteration: replication parameters and replica assignment.
Request semantics:

1. If ReplicaAssignment is defined

    ReplicationFactor and Partitions arguments are ignored in this case.

    For each partition in ReplicaAssignment:

    1.1 If such partition exists and assignment is different from the current replica assignment

        It's a "reassign partition" request - add it to reassign-partitions json

    1.2 If such partition doesn't exist

        It's an "add partition" request - change topic metadata in zookeeper to trigger increase partition logic

2. Else if ReplicationFactor is defined

    2.1 If Partitions is defined    

        Regenerate replica assignment for all existing and newly added partitions, goto 1.

    2.2 If Partitions is not defined     

        Regenerate replica assignment only for existing partitions, goto 1.

3. Else if Partitions is defined (ReplicaAssignment and ReplicationFactor are not defined):

    3.1 If Partitions is less than current number of partitions return error code InvalidPartitions (since increasing number of partitions is not allowed).

    3.2 Otherwise, generate automatically replica assignment for newly added partitions, goto 1.

 

Multiple instructions for the same topic in one request will be silently ignored, only the last from the list will be executed.

Alter Topics Response

 

AlterTopicsResponse => [TopicName ErrorCode]
ErrorCode => int16
TopicName => string

AlterTopicsResponse is similar to CreateTopicsResponse.

ACL Admin Schema (KAFKA-3266)

List ACLs Request

 

ListAclsRequest
ListAcls Request (Version: 0) => principal resource
 
principal => NULLABLE_STRING
resource => resource_type resource_name resource_type => INT8 resource_name
=> STRING principal
 => 
NULLABLE_
STRING
Request semantics:
  1. Can be sent to any broker
  2. If a non-null principal is provided the returned ACLs will be filtered by that principle, otherwise ACLs for all principals will be listed. 
  3. If a resource with a resource_type != -1 is provided ACLs will be filtered by that resource, otherwise ACLs for all resources will be listed.
  4. Any principle can list their own ACLs where the permission type is "Allow", Otherwise the principle must be authorized to the "All" Operation on the "Cluster" resource to list ACLs.
    • Unauthorized requests will receive a ClusterAuthorizationException
    • This avoids adding a new operation that an existing authorizer implementation may not be aware of.
    • This can be reviewed and further refined/restricted as a follow up ACLs review after this KIP. See Follow Up Changes.
  5. Requesting an invalid resource_type will result in an InvalidRequest error being returned
  6. Requesting a resource or principle that does not have acls will not result in an error, instead empty response list is returned

List ACLs Response

 

ListAclsResponse
ListAcls Response (Version: 0) => [responses] error_code 
responses => resource [acls] resource => resource_type resource_name resource_type => INT8 resource_name => STRING acls => acl_principle acl_permission_type acl_host acl_operation acl_principle => STRING acl_permission_type => INT8 acl_host => STRING acl_operation => INT8 error_code => INT16
Alter ACLs Request

 

AlterAclsRequest => [requests]   requests => resource [actions] 
    resource => resource_type resource_name 
      resource_type => INT8
      resource_name => STRING
    actions => action acl 
      acl => acl_principle acl_permission_type acl_host acl_operation 
        acl_principle => STRING
        acl_permission_type => INT8
        acl_host => STRING
        acl_operation => INT8
      action => INT8
Request semantics:
  1. Can be sent to the controller broker
  2. Multiple instructions for the same resource in one request will be silently ignored, only the last from the list will be executed.
  3. ACLs with a delete action will be processed first

  4. The request is not transactional. One failure wont stop others from running.

  5. The principle must be authorized to the "All" Operation on the "Cluster" resource to alter ACLs.
    • Unauthorized requests will receive a ClusterAuthorizationException
    • This will be reviewed as a follow up ACLs review after this KIP. See Follow Up Changes.
Alter ACLs Response

 

AlterAclsResponse => [responses]       
responses => resource [results]
resource => resource_type resource_name resource_type => INT8 resource_name => STRING results => action acl error_code acl => acl_principle acl_permission_type acl_host acl_operation acl_principle => STRING acl_permission_type => INT8 acl_host => STRING acl_operation => INT8 action => INT8 error_code => INT16

 

Config Admin Schema

Describe Configs Request

 

DescribeConfigs Request (Version: 0) => [entities]   
entities => entity_type entity_name entity_type => INT8 entity_name => STRING
Request semantics:
  1. Can be sent to any broker
  2. If there are multiple instructions for the same entity in one request the extra request will be ingnored
    • This is because the list of entities is modeled server side as a set
    • Multiple describes results in the same end goal, so handling this error for the user should be okay
    • This is similar to how delete topics handles requests
  3. Entity types are "Topic" (existing), "Client" (existing), and "Broker" (new). 
    1. Broker type is read only
  4. Below are the authorization requirements for each type:
    • Broker: Must be authorized to the "Describe" Operation on the "Cluster" resource
    • Topic: Must be authorized to the "Describe" Operation on the "Topic" resource
    • Client: Must be authorized to the "Describe" Operation on the "Client" resource
      • This is a new resource needed
      • TODO: best way to handle this...
  5. Arbitrary configurations are allowed
    1. This provides flexibility for custom clients, and allows all "plugin" or extra configs to be shown
    2. The user can validate the configs after the describe command in their client to heck for errors, but the wire protocol should relay all information.
Describe Configs Response

 

DescribeConfigs Response (Version: 0) => [responses]   
responses => entity config error_code entity => entity_type entity_name entity_type => INT8 entity_name => STRING config => config_key config_value config_key => STRING config_value => STRING error_code => INT16

Response semantics:

 

Alter Configs Request
Alter Configs Response

...