...

  • General 

  • Metadata Schema

    • Consider supporting regex topic filters in the request 
    • Filter internal topics using the returned metadata
  • Topic Admin Schema

    • Improve the broker side delete topic implementation
      • Delete is likely to be used more once creation/deletion is made easier with the client. The broker side implementation has had many JIRAs.
      • Currently can't delete unhealthy topics. 
    • Support cluster consistent blocking to wait until all relevant brokers have the required metadata
    • Implement auto-topic creation client side (KAFKA-2410)
    • Add topic creation to the MirrorMaker client?
    • Support renaming topics (KAFKA-2333)
      • This might require unique ids for topics instead of using the name (this would improve delete too)
    • Improve reliability and speed of topic deletion
      • Support force deleting unhealthy topics
      • Support marking for deletion and async data cleanup
        • This would require unique ids for topics instead of using the name (this is needed for rename too)
        • The topic can then be marked as deleted instead of requiring all data to be removed immediately, and in the meantime a new topic with the same name can be created.
  • ACL Admin Schema

    • Review privileges for listing and altering ACLs to be more fine grained.
    • Provide an Authorizer interface using the new Java classes used by the ACL requests/responses (KAFKA-3509)
      • Deprecate the old one to encourage transition
    • Define standard Exceptions that can be thrown by the Authorizer in the interface (KAFKA-3266)
      • Otherwise all exceptions reach the client as an unknown server exception
    • Consider building a sync call into the Authorizer to ensure changes are propagated

Details

1. Wire Protocol Extensions

Schema

Overall the idea is to extend the Wire Protocol to cover all existing admin commands, so that a user does not need to talk directly to Zookeeper and all commands can be authenticated via Kafka. At the same time, since the Wire Protocol is a public API to the Kafka cluster, it was agreed that the new Admin schema needs to be "orthogonal", i.e. new messages shouldn't duplicate each other or existing requests that already cover particular use cases.

Finally, admin requests are likely to be used beyond CLI tools, where the common use case is to create/change/delete a single entity. Since Kafka is able to maintain a huge number of topics, it is vital that a user can efficiently issue many commands at one time. That is why all admin messages are essentially batch requests, i.e. it is possible to group commands of one type for many topics in one batch, reducing network calls. At the same time, to make Schema usage transparent and compliant with existing requests (such as Produce and Fetch), if a batch request includes more than one instruction for a specific topic, only the last one in the list will be executed; the others will be silently ignored.

 

New Protocol Errors

It is proposed to use the existing error codes and add the new ones listed below to the protocol.

Error | Description
TopicExistsException | Topic with this name already exists
InvalidTopic (existing) | Topic name contains invalid characters or doesn't exist
InvalidPartitionsException | Partitions field is invalid (e.g. negative, or decreasing the number of partitions of an existing topic)
InvalidReplicationFactorException | ReplicationFactor field is invalid (e.g. negative)
InvalidReplicaAssignmentException | ReplicaAssignment field is invalid (e.g. contains duplicates)
InvalidConfigurationException | Configuration setting or value is incorrect
NotControllerException | The request was routed to a broker that wasn't the active controller

Generally, a client should have enough context to provide a descriptive error message.

The same notation as in  A Guide To The Kafka Protocol is used here. 

Metadata Schema (Voted and Adopted in 0.10.0.0)

Metadata Request (version 1)

 

MetadataRequest => [topics] 
The schema stays the same as version 0, but the behavior changes.
In version 0 there was no way to request no topics, and an empty list signified all topics.
In version 1 a null topics list (size -1 on the wire) indicates that the user wants ALL topic metadata, whereas an empty list (size 0) indicates that metadata for NO topics should be returned.
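
The difference between a null and an empty topics list can be illustrated with a minimal Python sketch of the request body encoding. It assumes the array and string encodings from A Guide To The Kafka Protocol (a big-endian INT32 element count, and strings as an INT16 length followed by UTF-8 bytes). The function names are hypothetical, and the request header is left out.

```python
import struct
from typing import List, Optional


def encode_string(value: str) -> bytes:
    """Encode a protocol STRING: INT16 length followed by UTF-8 bytes."""
    data = value.encode("utf-8")
    return struct.pack(">h", len(data)) + data


def encode_metadata_request_v1_body(topics: Optional[List[str]]) -> bytes:
    """Encode only the [topics] array of a version 1 MetadataRequest.

    None  -> size -1 on the wire, meaning ALL topics
    []    -> size 0 on the wire, meaning NO topics
    """
    if topics is None:
        return struct.pack(">i", -1)
    encoded = b"".join(encode_string(topic) for topic in topics)
    return struct.pack(">i", len(topics)) + encoded
```

With this sketch, `encode_metadata_request_v1_body(None)` produces the four bytes `ff ff ff ff`, while `encode_metadata_request_v1_body([])` produces `00 00 00 00`.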
Metadata Response (version 1)

 

MetadataResponse => [brokers] controllerId [topic_metadata]
  brokers => node_id host port rack
    node_id => INT32
    host => STRING
    port => INT32
    rack => NULLABLE_STRING
  controllerId => INT32
  topic_metadata => topic_error_code topic is_internal [partition_metadata]
    topic_error_code => INT16
    topic => STRING
    is_internal => BOOLEAN
    partition_metadata => partition_error_code partition_id leader [replicas] [isr]
      partition_error_code => INT16
      partition_id => INT32
      leader => INT32
      replicas => INT32
      isr => INT32

Adds rack, controller_id, and is_internal to the version 0 response.

The behavior of the replicas and isr arrays will be changed in order to support the admin tools, and better represent the state of the cluster:

  • In version 0, if a broker is down the replicas and isr arrays will omit the broker's entry and add a REPLICA_NOT_AVAILABLE error code.
  • In version 1, no error code will be set and the broker id will be included in the replicas and isr arrays.
    • Note: A user can still detect if the replica is not available, by checking if the broker is in the returned broker list.
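
The check described in the note can be sketched in Python as follows. This is only an illustration: the response is assumed to be already decoded into plain dictionaries keyed by the field names of the schema above, and the function name is hypothetical.

```python
from typing import Dict, List, Tuple


def find_unavailable_replicas(response: dict) -> Dict[Tuple[str, int], List[int]]:
    """Map (topic, partition_id) to replica broker ids missing from the broker list."""
    live_brokers = {broker["node_id"] for broker in response["brokers"]}
    unavailable = {}
    for topic in response["topic_metadata"]:
        for partition in topic["partition_metadata"]:
            down = [b for b in partition["replicas"] if b not in live_brokers]
            if down:
                unavailable[(topic["topic"], partition["partition_id"])] = down
    return unavailable
```

A partition whose replicas are all present in the returned broker list does not appear in the result.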

...

 

CreateTopics Request (Version: 0) => [create_topic_requests] timeout 
  create_topic_requests => topic num_partitions replication_factor [replica_assignment] [configs] 
    topic => STRING
    num_partitions => INT32
    replication_factor => INT16
    replica_assignment => partition_id [replicas] 
      partition_id => INT32
      replicas => INT32
    configs => config_key config_value 
      config_key => STRING
      config_value => STRING
  timeout => INT32

CreateTopicsRequest is a batch request to initiate topic creation with either predefined or automatic replica assignment and optionally topic configuration.

Request semantics:

  1. Must be sent to the controller broker
  2. If there are multiple instructions for the same topic in one request an InvalidRequestException will be logged on the broker and the client will be disconnected. 
    • This is because the list of topics is modeled server side as a map with TopicName as the key
  3. The principal must be authorized to the "Create" Operation on the "Cluster" resource to create topics. 
    • Unauthorized requests will receive a ClusterAuthorizationException
  4. Only one of replica_assignment or (num_partitions + replication_factor) can be defined in one instruction.

    • If both are specified, an InvalidRequestException will be logged on the broker and the client will be disconnected.
    • If replica_assignment is defined, the number of partitions and replicas will be calculated from the supplied replica_assignment.
    • If (num_partitions + replication_factor) is defined, the replica assignment will be automatically generated by the server.
    • One or the other must be defined. The existing broker side auto create defaults will not be used (default.replication.factor, num.partitions). The client implementation can have defaults for these options when generating the messages.
    • The first replica in [replicas] is assumed to be the preferred leader. This matches current behavior elsewhere.
  5. Setting a timeout > 0 will allow the request to block until the topic metadata is "complete" on the controller node.
    • Complete means the local topic metadata cache has been completely populated and all partitions have leaders
      • The topic metadata is updated when the controller sends out update metadata requests to the brokers
    • If a timeout error occurs, the topic could still be created successfully at a later time. It is up to the client to query for the state at that point.
  6. Setting a timeout <= 0 will validate arguments, trigger the topic creation and return immediately.
    • This is essentially the fully asynchronous mode we have in the Zookeeper tools today. 
    • The error code in the response will either contain an argument validation exception or a timeout exception. If you receive a timeout exception, because you asked for 0 timeout, you can assume the message was valid and the topic creation was triggered. 
  7. The request is not transactional. 
    1. If an error occurs on one topic, the others could still be created.
    2. Errors are reported independently.
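
Rules 2 and 4 above can be sketched in Python as a request-level check. Several things here are assumptions and not part of the proposal: the -1 sentinel for an unset num_partitions or replication_factor, the dictionary representation of an instruction, the use of the same error when neither option is given, and the names resolve_layout and check_batch.

```python
from typing import Dict, List, Optional, Tuple

UNSET = -1  # assumption: sentinel for an unset num_partitions / replication_factor

# (num_partitions, replication_factor, preferred leader per partition or None)
Layout = Tuple[int, int, Optional[Dict[int, int]]]


class InvalidRequestError(Exception):
    """Stands in for the InvalidRequestException named in the text."""


def resolve_layout(instruction: dict) -> Layout:
    """Apply rule 4: exactly one of replica_assignment or the two counts."""
    assignment = instruction.get("replica_assignment") or {}
    num_partitions = instruction.get("num_partitions", UNSET)
    replication_factor = instruction.get("replication_factor", UNSET)
    has_counts = num_partitions != UNSET or replication_factor != UNSET
    if assignment and has_counts:
        raise InvalidRequestError("replica_assignment and counts are mutually exclusive")
    if not assignment and not has_counts:
        # Assumption: the text does not name the error for this case.
        raise InvalidRequestError("replica_assignment or counts must be defined")
    if assignment:
        # Counts are derived from the assignment; the first replica is the preferred leader.
        leaders = {pid: replicas[0] for pid, replicas in assignment.items()}
        derived_rf = len(next(iter(assignment.values())))
        return len(assignment), derived_rf, leaders
    return num_partitions, replication_factor, None


def check_batch(instructions: List[dict]) -> Dict[str, Layout]:
    """Apply rule 2: a topic may appear only once per request."""
    layouts: Dict[str, Layout] = {}
    for instruction in instructions:
        topic = instruction["topic"]
        if topic in layouts:
            raise InvalidRequestError("duplicate topic in request: " + topic)
        layouts[topic] = resolve_layout(instruction)
    return layouts
```

Per-field validation (for example negative counts or duplicate replicas) is deliberately left out; those cases map to the per-topic error codes and are reported independently.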

QA:

  • Why is CreateTopicsRequest a batch request?
    • Scenarios where tools or admins want to create many topics should be able to with fewer requests
    • Example: MirrorMaker may want to create the topics downstream
  • What happens if some topics error immediately? Will it return immediately?
    • The request will block until every topic has either been created or returned an error, or the timeout has been hit
    • There is no "short circuiting" where 1 error stops the other topics from being created
  • Why implement "partial blocking" instead of fully async or fully consistent?
  • Why require the request to go to the controller?
    • The controller is responsible for the cluster metadata and its propagation 
    • See Request Forwarding below
Create Topics Response

 

CreateTopics Response (Version: 0) => [topic_error_codes] 
  topic_error_codes => topic error_code 
    topic => STRING
    error_code => INT16

CreateTopicsResponse contains a map between topic and topic creation result error code (see New Protocol Errors). 
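
As an illustration, the response body above could be decoded into such a map with the following Python sketch. It assumes the standard array and string encodings from A Guide To The Kafka Protocol, covers only the body (no response header), and uses a hypothetical function name.

```python
import struct
from typing import Dict


def decode_topic_error_codes(body: bytes) -> Dict[str, int]:
    """Decode [topic_error_codes] into a topic -> error_code map."""
    (count,) = struct.unpack_from(">i", body, 0)  # INT32 array length
    offset = 4
    result: Dict[str, int] = {}
    for _ in range(count):
        (length,) = struct.unpack_from(">h", body, offset)  # STRING length
        offset += 2
        topic = body[offset:offset + length].decode("utf-8")
        offset += length
        (error_code,) = struct.unpack_from(">h", body, offset)  # INT16 error code
        offset += 2
        result[topic] = error_code
    return result
```

Because errors are reported independently, a caller would typically inspect every entry of the returned map instead of stopping at the first non-zero code.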

Delete Topics Request

 

DeleteTopics Request (Version: 0) => [topics] timeout 
  topics => STRING
  timeout => INT32

DeleteTopicsRequest is a batch request to initiate topic deletion.

Request semantics:

  1. Must be sent to the controller broker
  2. If there are multiple instructions for the same topic in one request an InvalidRequestException will be logged on the broker and the client will be disconnected. 
    • This is because the list of topics is modeled server side as a set
  3. When requesting to delete a topic that does not exist, an InvalidTopic error will be returned for that topic.
  4. When requesting to delete a topic that is already marked for deletion, the request will wait for the timeout and return as usual.
    • This is to avoid errors due to concurrent delete requests. The end result is the same, the topic is deleted.
  5. The principal must be authorized to the "Delete" Operation on the "Topic" resource to delete the topic. 
    • Unauthorized requests will receive a TopicAuthorizationException
  6. Setting a timeout > 0 will allow the request to block until the delete is "complete" on the controller node.
    • Complete means the local topic metadata cache no longer contains the topic
      • The topic metadata is updated when the controller sends out update metadata requests to the brokers
    • If a timeout error occurs, the topic could still be deleted successfully at a later time. It is up to the client to query for the state at that point.
  7. Setting a timeout <= 0 will validate arguments, trigger the topic deletion and return immediately.
    • This is essentially the fully asynchronous mode we have in the Zookeeper tools today. 
    • The error code in the response will either contain an argument validation exception or a timeout exception. If you receive a timeout exception, because you asked for 0 timeout, you can assume the message was valid and the topic deletion was triggered. 
  8. The request is not transactional. 
    1. If an error occurs on one topic, the others could still be deleted.
    2. Errors are reported independently.
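
The timeout semantics in rules 6 and 7 can be sketched in Python as a polling loop over the local metadata cache. This is a simplified model and not the broker implementation: the cache lookup is passed in as a callable, the result labels are hypothetical placeholders for the real error codes, and polling is an assumption made only to keep the example short.

```python
import time
from typing import Callable, Dict, Iterable

NONE = "NONE"                    # hypothetical label: delete completed
TIMED_OUT = "REQUEST_TIMED_OUT"  # hypothetical label: still present at timeout


def wait_for_deletes(
    topics: Iterable[str],
    cache_contains: Callable[[str], bool],
    timeout_ms: int,
    poll_interval_s: float = 0.01,
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> Dict[str, str]:
    """Block until no requested topic is in the metadata cache, or until timeout."""
    topic_list = list(topics)
    pending = set(topic_list)
    deadline = clock() + max(timeout_ms, 0) / 1000.0
    while True:
        # "Complete" means the local metadata cache no longer contains the topic.
        pending = {topic for topic in pending if cache_contains(topic)}
        if not pending or clock() >= deadline:
            break
        sleep(poll_interval_s)
    return {topic: (TIMED_OUT if topic in pending else NONE) for topic in topic_list}
```

With timeout_ms <= 0 the loop checks the cache once and returns, so a topic whose deletion was only triggered comes back with the timeout label, which matches the asynchronous mode described in rule 7.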

QA:

  • Why is DeleteTopicsRequest a batch request?
    • Scenarios where tools or admins want to delete many topics should be able to with fewer requests
    • Example: Removing all cluster topics
  • What happens if some topics error immediately? Will it return immediately? 
    • The request will block until every topic has either been deleted or returned an error, or the timeout has been hit
    • There is no "short circuiting" where 1 error stops the other topics from being deleted
  • Why have a timeout at all? Deletes could take a while?
    • True, some deletes may take a while or never finish; however, some admin tools may want extended blocking regardless.
    • If you don't want any blocking, setting a timeout of 0 works.
    • Future changes may make deletes much faster. See the Follow Up Changes section above.
  • Why implement "partial blocking" instead of fully async or fully consistent?
  • Why require the request to go to the controller?
    • The controller is responsible for the cluster metadata and its propagation 
    • See Request Forwarding below
Delete Topics Response

 

DeleteTopics Response (Version: 0) => [topic_error_codes] 
  topic_error_codes => topic error_code 
    topic => STRING
    error_code => INT16

 

Alter Topics Request

 

AlterTopicsRequest => [TopicName Partitions ReplicationFactor ReplicaAssignment]
TopicName => string
ReplicationFactor => int32
Partitions => int32
ReplicaAssignment => [PartitionId [ReplicaId]]
AlterTopicsRequest is a batch asynchronous request to initiate topic alteration: replication parameters and replica assignment.
Request semantics:

1. If ReplicaAssignment is defined

    ReplicationFactor and Partitions arguments are ignored in this case.

    For each partition in ReplicaAssignment:

    1.1 If such partition exists and assignment is different from the current replica assignment

        It's a "reassign partition" request - add it to reassign-partitions json

    1.2 If such partition doesn't exist

        It's an "add partition" request - change topic metadata in zookeeper to trigger increase partition logic

2. Else if ReplicationFactor is defined

    2.1 If Partitions is defined    

        Regenerate replica assignment for all existing and newly added partitions, goto 1.

    2.2 If Partitions is not defined     

        Regenerate replica assignment only for existing partitions, goto 1.

3. Else if Partitions is defined (ReplicaAssignment and ReplicationFactor are not defined):

    3.1 If Partitions is less than the current number of partitions, return error code InvalidPartitions (since decreasing the number of partitions is not allowed).

    3.2 Otherwise, automatically generate the replica assignment for newly added partitions, goto 1.

 

If a request contains multiple instructions for the same topic, only the last one in the list will be executed; the others will be silently ignored.
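
The decision steps for a single topic can be sketched in Python as follows. This is a sketch under stated assumptions: round_robin is a hypothetical stand-in for the server's replica assignment generator, partition ids are assumed to be 0..n-1, the topic is assumed to have at least one partition, and in step 3.2 the replication factor of the new partitions is taken from an existing partition.

```python
from typing import Dict, Iterable, List, Optional, Tuple

Assignment = Dict[int, List[int]]
Action = Tuple[str, int, List[int]]  # ("reassign" | "add", partition_id, replicas)


class InvalidPartitionsError(Exception):
    """Stands in for the InvalidPartitions error code."""


def round_robin(partition_ids: Iterable[int], brokers: List[int], rf: int) -> Assignment:
    """Hypothetical generator; not the algorithm the broker actually uses."""
    return {p: [brokers[(p + i) % len(brokers)] for i in range(rf)] for p in partition_ids}


def plan_alter(
    current: Assignment,
    brokers: List[int],
    partitions: Optional[int] = None,
    replication_factor: Optional[int] = None,
    replica_assignment: Optional[Assignment] = None,
) -> List[Action]:
    if replica_assignment is None:
        if replication_factor is not None:
            # Step 2: regenerate for existing partitions, plus new ones if Partitions is set.
            ids = set(current)
            if partitions is not None:
                ids |= set(range(partitions))
            replica_assignment = round_robin(sorted(ids), brokers, replication_factor)
        elif partitions is not None:
            # Step 3: only new partitions get a generated assignment.
            if partitions < len(current):
                raise InvalidPartitionsError("partition count cannot shrink")
            existing_rf = len(next(iter(current.values())))
            new_ids = range(len(current), partitions)
            replica_assignment = round_robin(new_ids, brokers, existing_rf)
        else:
            return []
    # Step 1: classify each partition of the (given or generated) assignment.
    actions: List[Action] = []
    for pid in sorted(replica_assignment):
        replicas = replica_assignment[pid]
        if pid not in current:
            actions.append(("add", pid, replicas))
        elif current[pid] != replicas:
            actions.append(("reassign", pid, replicas))
    return actions
```

Routing steps 2 and 3 back through the same classification loop mirrors the "goto 1" wording: every path ends as a set of reassign and add partition actions.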

Alter Topics Response

 

AlterTopicsResponse => [TopicName ErrorCode]
ErrorCode => int16
TopicName => string

AlterTopicsResponse is similar to CreateTopicsResponse.

ACL Admin Schema

List ACLs Request

 

ListAclsRequest => principal resource 
  resource => resource_type resource_name 
    resource_type => INT8
    resource_name => STRING
  principal => NULLABLE_STRING
Request semantics:
  1. Can be sent to any broker
  2. If a non-null principal is provided the returned ACLs will be filtered by that principal, otherwise ACLs for all principals will be listed.
  3. If a resource with a resource_type != -1 is provided ACLs will be filtered by that resource, otherwise ACLs for all resources will be listed.
  4. Any principal can list their own ACLs where the permission type is "Allow". Otherwise the principal must be authorized to the "All" Operation on the "Cluster" resource to list ACLs.
    • Unauthorized requests will receive a ClusterAuthorizationException
    • This will be reviewed as a follow up ACLs review after this KIP. See Follow Up Changes.
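
The filtering in rules 2 and 3 can be sketched in Python as below. The Acl tuple, the numeric codes used by a caller, and the function name are hypothetical; only the null principal and the resource_type of -1 as "no filter" markers come from the text.

```python
from typing import List, NamedTuple, Optional

ANY_RESOURCE_TYPE = -1  # from the text: resource_type == -1 means no resource filter


class Acl(NamedTuple):
    resource_type: int
    resource_name: str
    principal: str
    permission_type: int
    host: str
    operation: int


def filter_acls(
    acls: List[Acl],
    principal: Optional[str] = None,
    resource_type: int = ANY_RESOURCE_TYPE,
    resource_name: str = "",
) -> List[Acl]:
    """Return the ACLs matching the optional principal and resource filters."""
    matches = []
    for acl in acls:
        if principal is not None and acl.principal != principal:
            continue
        if resource_type != ANY_RESOURCE_TYPE and (
            (acl.resource_type, acl.resource_name) != (resource_type, resource_name)
        ):
            continue
        matches.append(acl)
    return matches
```

The authorization check in rule 4 is not modeled here; it would run before the filter is applied.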
List ACLs Response

 

ListAclsResponse => [responses] error_code 
  responses => resource [acls]
    resource => resource_type resource_name
      resource_type => INT8
      resource_name => STRING
    acls => acl_principle acl_permission_type acl_host acl_operation
      acl_principle => STRING
      acl_permission_type => INT8
      acl_host => STRING
      acl_operation => INT8
  error_code => INT16
Alter ACLs Request

 

AlterAclsRequest => [requests]
  requests => resource [actions]
    resource => resource_type resource_name 
      resource_type => INT8
      resource_name => STRING
    actions => action acl 
      acl => acl_principle acl_permission_type acl_host acl_operation 
        acl_principle => STRING
        acl_permission_type => INT8
        acl_host => STRING
        acl_operation => INT8
      action => INT8
Request semantics:
  1. Can be sent to the controller broker
  2. If a request contains multiple instructions for the same resource, only the last one in the list will be executed; the others will be silently ignored.
  3. ACLs with a delete action will be processed first

  4. The request is not transactional. One failure won't stop others from running.

  5. The principal must be authorized to the "All" Operation on the "Cluster" resource to alter ACLs.
    • Unauthorized requests will receive a ClusterAuthorizationException
    • This will be reviewed as a follow up ACLs review after this KIP. See Follow Up Changes.
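
Rules 3 and 4 can be sketched in Python as follows. The numeric action codes and the apply_one callback are hypothetical; the text only states that an action is an INT8, that deletes run first, and that one failure does not stop the others.

```python
from typing import Callable, List, Optional, Tuple

DELETE, ADD = 0, 1  # hypothetical action codes; the proposal does not list the values


def apply_acl_actions(
    actions: List[Tuple[int, object]],
    apply_one: Callable[[int, object], None],
) -> List[Tuple[int, object, Optional[Exception]]]:
    """Apply (action, acl) pairs with deletes first, recording one result per pair."""
    # sorted() is stable, so the relative order within deletes and within adds is kept.
    ordered = sorted(actions, key=lambda pair: 0 if pair[0] == DELETE else 1)
    results = []
    for action, acl in ordered:
        try:
            apply_one(action, acl)
            results.append((action, acl, None))
        except Exception as error:  # not transactional: record the error and continue
            results.append((action, acl, error))
    return results
```

Each result entry corresponds to one element of the results array in the response, with the recorded exception standing in for a non-zero error_code.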
Alter ACLs Response

 

AlterAclsResponse => [responses]
  responses => resource [results]
    resource => resource_type resource_name
      resource_type => INT8
      resource_name => STRING
    results => action acl error_code
      acl => acl_principle acl_permission_type acl_host acl_operation
        acl_principle => STRING
        acl_permission_type => INT8
        acl_host => STRING
        acl_operation => INT8
      action => INT8
      error_code => INT16

 

...