...
New Protocol Errors
It is proposed to reuse the existing error codes marked below and to add the remaining ones to the protocol.
Error | Description |
---|---|
TopicExistsException | Topic with this name already exists |
InvalidTopic (existing) | Topic name contains invalid characters or doesn't exist |
InvalidPartitionsException | Partitions field is invalid (e.g. negative, or decreasing the number of partitions in an existing topic) |
InvalidReplicationFactorException | ReplicationFactor field is invalid (e.g. negative) |
InvalidReplicaAssignmentException | ReplicaAssignment field is invalid (e.g. contains duplicates) |
InvalidConfigurationException | Configuration setting or value is incorrect |
NotControllerException | The request was routed to a broker that wasn't the active controller |
InvalidRequestException | Thrown when a request breaks basic wire protocol rules. (Existing but not mapped) |
Generally, a client should have enough context to provide a descriptive error message.
The same notation as in A Guide To The Kafka Protocol is used here.
...
CreateTopics Request (Version: 0) => [create_topic_requests] timeout
  create_topic_requests => topic num_partitions replication_factor [replica_assignment] [configs]
    topic => STRING
    num_partitions => INT32
    replication_factor => INT16
    replica_assignment => partition_id [replicas]
      partition_id => INT32
      replicas => INT32
    configs => config_key config_value
      config_key => STRING
      config_value => STRING
  timeout => INT32
CreateTopicsRequest is a batch request to initiate topic creation with either predefined or automatic replica assignment and, optionally, topic configuration.
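To make the schema concrete, here is a minimal sketch of how a client might serialize the CreateTopics request body. It assumes the primitive encodings described in A Guide To The Kafka Protocol (big-endian integers, STRING as an INT16 length plus UTF-8 bytes, arrays as an INT32 count plus elements). The function names and the dict layout of a topic instruction are hypothetical, and the common request header is not included.

```python
import struct

def encode_string(s: str) -> bytes:
    """STRING: INT16 length followed by the UTF-8 bytes."""
    data = s.encode("utf-8")
    return struct.pack(">h", len(data)) + data

def encode_array(items, encode_item) -> bytes:
    """[x]: INT32 element count followed by each encoded element."""
    items = list(items)
    return struct.pack(">i", len(items)) + b"".join(encode_item(i) for i in items)

def encode_create_topic(topic: dict) -> bytes:
    """Encode one create_topic_requests entry (hypothetical dict layout)."""
    def assignment(entry):
        partition_id, replicas = entry
        return struct.pack(">i", partition_id) + encode_array(
            replicas, lambda r: struct.pack(">i", r))

    def config(entry):
        key, value = entry
        return encode_string(key) + encode_string(value)

    return (
        encode_string(topic["topic"])
        + struct.pack(">i", topic["num_partitions"])
        + struct.pack(">h", topic["replication_factor"])
        + encode_array(sorted(topic.get("replica_assignment", {}).items()), assignment)
        + encode_array(sorted(topic.get("configs", {}).items()), config)
    )

def encode_create_topics_request(topics: list, timeout: int) -> bytes:
    """Request body only: [create_topic_requests] followed by timeout (INT32)."""
    return encode_array(topics, encode_create_topic) + struct.pack(">i", timeout)

body = encode_create_topics_request(
    [{"topic": "t", "num_partitions": 3, "replication_factor": 2}], timeout=1000)
```

For this single-topic example the body is 25 bytes: a 4-byte array count, the 3-byte topic name, 4 + 2 bytes for the partition count and replication factor, two empty 4-byte arrays, and the 4-byte timeout.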
Request semantics:
- Must be sent to the controller broker
- If there are multiple instructions for the same topic in one request, an InvalidRequestException will be logged on the broker and a single error code for that topic will be returned to the client.
- This is because the list of topics is modeled server side as a map with TopicName as the key
- The principal must be authorized to the "Create" Operation on the "Cluster" resource to create topics.
- Unauthorized requests will receive a ClusterAuthorizationException
- Only one of replica_assignment or (num_partitions + replication_factor) can be defined in one instruction.
- If both are specified, an InvalidRequestException will be logged on the broker and an error code for that topic will be returned to the client.
- If replica_assignment is defined, the number of partitions and replicas will be calculated from the supplied replica_assignment.
- If (num_partitions + replication_factor) is defined, the replica assignment will be generated automatically by the server.
- One or the other must be defined. The existing broker side auto create defaults will not be used (default.replication.factor, num.partitions). The client implementation can have defaults for these options when generating the messages.
- The first replica in [replicas] is assumed to be the preferred leader. This matches current behavior elsewhere.
- Setting a timeout > 0 will allow the request to block until the topic metadata is "complete" on the controller node.
- Complete means the local topic metadata cache has been completely populated and all partitions have leaders
- The topic metadata is updated when the controller sends out update metadata requests to the brokers
- If a timeout error occurs, the topic could still be created successfully at a later time. It's up to the client to query for the state at that point.
- Setting a timeout <= 0 will validate the arguments, trigger topic creation, and return immediately.
- This is essentially the fully asynchronous mode we have in the Zookeeper tools today.
- The error code in the response will either contain an argument validation exception or a timeout exception. If you receive a timeout exception, because you asked for 0 timeout, you can assume the message was valid and the topic creation was triggered.
- The request is not transactional.
- If an error occurs on one topic, the others could still be created.
- Errors are reported independently.
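The validation rules above can be sketched as a per-topic check that reports errors independently. This is an illustration only, not the broker implementation: the dict layout, the use of `None` for "not defined", and the function names are hypothetical. Treating a half-specified (num_partitions + replication_factor) pair as an InvalidRequestException and rejecting zero values are assumptions; authorization and the TopicExistsException check are left out.

```python
from collections import Counter

def check_instruction(instr: dict):
    """Return an error name for one instruction, or None if it looks valid."""
    assignment = instr.get("replica_assignment")
    partitions = instr.get("num_partitions")
    factor = instr.get("replication_factor")
    has_assignment = assignment is not None
    has_counts = partitions is not None or factor is not None

    # Exactly one of replica_assignment or (num_partitions + replication_factor).
    if has_assignment == has_counts:
        return "InvalidRequestException"
    if has_counts:
        if partitions is None or factor is None:
            return "InvalidRequestException"  # assumption: both counts are required
        if partitions < 1:  # the text names negative values; zero is an assumption
            return "InvalidPartitionsException"
        if factor < 1:
            return "InvalidReplicationFactorException"
        return None
    # Predefined assignment: reject duplicate replicas within a partition.
    if any(len(set(r)) != len(r) for r in assignment.values()):
        return "InvalidReplicaAssignmentException"
    return None

def validate_create_batch(instructions: list) -> dict:
    """Return {topic: error name or None}; one failure does not affect others."""
    counts = Counter(i["topic"] for i in instructions)
    results = {}
    for instr in instructions:
        topic = instr["topic"]
        if counts[topic] > 1:
            results[topic] = "InvalidRequestException"  # single error per topic
        else:
            results[topic] = check_instruction(instr)
    return results

results = validate_create_batch([
    {"topic": "a", "num_partitions": 3, "replication_factor": 2},
    {"topic": "b", "replica_assignment": {0: [1, 2]},
     "num_partitions": 1, "replication_factor": 2},
    {"topic": "c", "replica_assignment": {0: [1, 1]}},
    {"topic": "d", "num_partitions": -1, "replication_factor": 1},
    {"topic": "e", "num_partitions": 1, "replication_factor": 1},
    {"topic": "e", "num_partitions": 2, "replication_factor": 1},
])
```

Only topic `a` passes here; every other topic gets its own error while the batch as a whole still proceeds.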
QA:
- Why is CreateTopicsRequest a batch request?
- Scenarios where tools or admins want to create many topics should be able to with fewer requests
- Example: MirrorMaker may want to create the topics downstream
- What happens if some topics error immediately? Will it return immediately?
- The request will block until every topic has either been created or returned an error, or the timeout has been hit
- There is no "short circuiting" where 1 error stops the other topics from being created
- Why implement "partial blocking" instead of fully async or fully consistent?
- See Cluster Consistent Blocking below
- Why require the request to go to the controller?
- The controller is responsible for the cluster metadata and its propagation
- See Request Forwarding below
...
CreateTopics Response (Version: 0) => [topic_error_codes]
  topic_error_codes => topic error_code
    topic => STRING
    error_code => INT16
CreateTopicsResponse contains a map between topic and topic creation result error code (see New Protocol Errors).
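As a rough sketch, a client could decode the response body into a topic-to-error-code map as follows. It assumes the same primitive encodings as A Guide To The Kafka Protocol (INT32 array count, INT16-prefixed strings, big-endian integers) and skips the response header. The function name is hypothetical, and the non-zero code in the sample payload is an arbitrary placeholder, since this section does not assign numeric values to the new errors.

```python
import struct

def decode_topic_error_codes(buf: bytes) -> dict:
    """Decode [topic_error_codes] into {topic: error_code}."""
    (count,) = struct.unpack_from(">i", buf, 0)
    offset = 4
    result = {}
    for _ in range(count):
        (length,) = struct.unpack_from(">h", buf, offset)  # STRING length
        offset += 2
        topic = buf[offset:offset + length].decode("utf-8")
        offset += length
        (code,) = struct.unpack_from(">h", buf, offset)    # error_code INT16
        offset += 2
        result[topic] = code
    return result

# Two entries: topic "a" with code 0 (no error), topic "bb" with placeholder code 7.
sample = bytes.fromhex("00000002" "0001" "61" "0000" "0002" "6262" "0007")
decoded = decode_topic_error_codes(sample)
```

Because errors are reported per topic, the caller would typically iterate over the map and handle each non-zero code on its own rather than failing the whole batch.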
Delete Topics Request
DeleteTopics Request (Version: 0) => [topics] timeout
  topics => STRING
  timeout => INT32
DeleteTopicsRequest is a batch request to initiate topic deletion.
Request semantics:
- Must be sent to the controller broker
- If there are multiple instructions for the same topic in one request, the extra instructions will be ignored
- This is because the list of topics is modeled server side as a set
- Multiple deletes result in the same end goal, so handling this error for the user should be okay
- When requesting to delete a topic that does not exist, an InvalidTopic error will be returned for that topic.
- When requesting to delete a topic that is already marked for deletion, the request will wait for the timeout and return as usual.
- This is to avoid errors due to concurrent delete requests. The end result is the same, the topic is deleted.
- The principal must be authorized to the "Delete" Operation on the "Topic" resource to delete the topic.
- Unauthorized requests will receive a TopicAuthorizationException
- Setting a timeout > 0 will allow the request to block until the delete is "complete" on the controller node.
- Complete means the local topic metadata cache no longer contains the topic
- The topic metadata is updated when the controller sends out update metadata requests to the brokers
- If a timeout error occurs, the topic could still be deleted successfully at a later time. It's up to the client to query for the state at that point.
- Setting a timeout <= 0 will validate the arguments, trigger topic deletion, and return immediately.
- This is essentially the fully asynchronous mode we have in the Zookeeper tools today.
- The error code in the response will either contain an argument validation exception or a timeout exception. If you receive a timeout exception, because you asked for 0 timeout, you can assume the message was valid and the topic deletion was triggered.
- The request is not transactional.
- If an error occurs on one topic, the others could still be deleted.
- Errors are reported independently.
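The per-topic handling described above might be sketched like this. It is a simplified model under stated assumptions: `existing` and `marked_for_deletion` stand in for controller state, the function name is hypothetical, and authorization and the timeout wait are left out.

```python
def plan_deletes(requested, existing, marked_for_deletion):
    """Return (topics_to_trigger, {topic: error name or None})."""
    to_trigger, results = [], {}
    # Set semantics: duplicate topics in one request collapse to one entry.
    for topic in dict.fromkeys(requested):
        if topic in marked_for_deletion:
            # Already being deleted: no error, just wait for the timeout as usual.
            results[topic] = None
        elif topic not in existing:
            results[topic] = "InvalidTopic"
        else:
            to_trigger.append(topic)
            results[topic] = None
    return to_trigger, results

to_trigger, results = plan_deletes(
    ["a", "a", "b", "c"], existing={"a", "c"}, marked_for_deletion={"c"})
```

Here only `a` triggers a new deletion, `b` is rejected on its own, and `c` is treated as a normal in-progress delete.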
QA:
- Why is DeleteTopicsRequest a batch request?
- Scenarios where tools or admins want to delete many topics should be able to with fewer requests
- Example: Removing all cluster topics
- What happens if some topics error immediately? Will it return immediately?
- The request will block until every topic has either been deleted or returned an error, or the timeout has been hit
- There is no "short circuiting" where 1 error stops the other topics from being deleted
- Why have a timeout at all? Deletes could take a while?
- True, some deletes may take a while or never finish; however, some admin tools may want extended blocking regardless.
- If you don't want any blocking, setting a timeout of 0 works.
- Future changes may make deletes much faster. See the Follow Up Changes section above.
- Why implement "partial blocking" instead of fully async or fully consistent?
- See Cluster Consistent Blocking below
- Why require the request to go to the controller?
- The controller is responsible for the cluster metadata and its propagation
- See Request Forwarding below
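Since a timeout error does not mean the delete failed, a client that cares about the outcome has to query for the state afterwards. A minimal polling sketch is shown below. The `topic_exists` callback is hypothetical (for instance, it could be backed by a metadata request), as are the function name, retry count, and delay.

```python
import time

def wait_until_deleted(topic, topic_exists, attempts=5, delay_s=0.5, sleep=time.sleep):
    """Poll until the topic is gone; return True if it disappeared in time."""
    for attempt in range(attempts):
        if not topic_exists(topic):
            return True
        if attempt < attempts - 1:
            sleep(delay_s)  # back off before asking again
    return False

# Simulated state: the topic is still visible twice, then gone.
states = iter([True, True, False])
deleted = wait_until_deleted("t", lambda t: next(states), sleep=lambda s: None)
```

The `sleep` parameter is injectable only so the loop can be exercised without real delays.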
Delete Topics Response
DeleteTopics Response (Version: 0) => [topic_error_codes]
  topic_error_codes => topic error_code
    topic => STRING
    error_code => INT16
DeleteTopicsResponse is similar to CreateTopicsResponse.
Alter Topics Request
AlterTopicsRequest => [TopicName Partitions ReplicationFactor ReplicaAssignment]
  TopicName => string
  ReplicationFactor => int32
  Partitions => int32
  ReplicaAssignment => [PartitionId [ReplicaId]]
AlterTopicsRequest is a batch asynchronous request to initiate topic alteration: replication parameters and replica assignment.
1. If ReplicaAssignment is defined, the ReplicationFactor and Partitions arguments are ignored. For each partition in ReplicaAssignment:
1.1 If such a partition exists and the assignment is different from the current replica assignment, it's a "reassign partition" request: add it to the reassign-partitions json.
1.2 If such a partition doesn't exist, it's an "add partition" request: change the topic metadata in zookeeper to trigger the increase partition logic.
2. Else if ReplicationFactor is defined:
2.1 If Partitions is defined, regenerate the replica assignment for all existing and newly added partitions, goto 1.
2.2 If Partitions is not defined, regenerate the replica assignment only for existing partitions, goto 1.
3. Else if Partitions is defined (ReplicaAssignment and ReplicationFactor are not defined):
3.1 If Partitions is less than the current number of partitions, return error code InvalidPartitions (since decreasing the number of partitions is not allowed).
3.2 Otherwise, automatically generate the replica assignment for newly added partitions, goto 1.
If a request contains multiple instructions for the same topic, only the last one in the list is executed; the earlier ones are silently ignored.
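The decision steps for a single alter instruction can be sketched as below. This is an illustration under several assumptions: `current` maps partition id to its replica list, `round_robin` is a placeholder generator and not the server's assignment algorithm, step 3 reuses the replication factor of an existing partition, and the action tuples are a hypothetical stand-in for writing to the reassign-partitions json or to zookeeper.

```python
def round_robin(partition_ids, replication_factor, brokers):
    """Placeholder assignment generator (an assumption, not Kafka's algorithm)."""
    n = len(brokers)
    return {p: [brokers[(p + i) % n] for i in range(replication_factor)]
            for p in partition_ids}

def plan_alter(current, brokers, replica_assignment=None,
               replication_factor=None, partitions=None):
    """Return (actions, error) for one topic."""
    if replica_assignment is None:
        if replication_factor is not None:
            # Step 2: regenerate for existing (and, if given, new) partitions.
            count = partitions if partitions is not None else len(current)
            replica_assignment = round_robin(range(count), replication_factor, brokers)
        elif partitions is not None:
            # Step 3: only the partition count changes.
            if partitions < len(current):
                return [], "InvalidPartitions"
            factor = len(next(iter(current.values())))
            replica_assignment = round_robin(
                range(len(current), partitions), factor, brokers)
        else:
            return [], None  # nothing to alter
    # Step 1: classify each partition in the (given or generated) assignment.
    actions = []
    for partition, replicas in sorted(replica_assignment.items()):
        if partition not in current:
            actions.append(("add", partition, replicas))
        elif current[partition] != replicas:
            actions.append(("reassign", partition, replicas))
    return actions, None

current = {0: [1, 2], 1: [2, 3]}
brokers = [1, 2, 3]
explicit = plan_alter(current, brokers,
                      replica_assignment={0: [1, 2], 1: [3, 1], 2: [1, 3]})
shrink = plan_alter(current, brokers, partitions=1)
grow = plan_alter(current, brokers, partitions=3)
```

In the explicit case partition 0 is unchanged and produces no action, partition 1 is reassigned, and partition 2 is added; the shrink case is rejected before any action is planned.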
Alter Topics Response
AlterTopicsResponse => [TopicName ErrorCode]
  ErrorCode => int16
  TopicName => string
AlterTopicsResponse is similar to CreateTopicsResponse.
ACL Admin Schema
List ACLs Request
ListAclsRequest => principal resource
  resource => resource_type resource_name
    resource_type => INT8
    resource_name => STRING
  principal => NULLABLE_STRING
- Can be sent to any broker
- If a non-null principal is provided, the returned ACLs will be filtered by that principal; otherwise ACLs for all principals will be listed.
- If a resource with a resource_type != -1 is provided, ACLs will be filtered by that resource; otherwise ACLs for all resources will be listed.
- Any principal can list their own ACLs where the permission type is "Allow". Otherwise, the principal must be authorized to the "All" Operation on the "Cluster" resource to list ACLs.
- Unauthorized requests will receive a ClusterAuthorizationException
- This will be revisited in a follow-up ACLs review after this KIP. See Follow Up Changes.
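The two filters above could be modeled as follows. This is a sketch only: the `Acl` tuple, the numeric resource type values in the example, and the function name are hypothetical, and the authorization rule is not modeled.

```python
from typing import NamedTuple

class Acl(NamedTuple):
    principal: str
    resource_type: int   # hypothetical numeric codes
    resource_name: str
    operation: str

ANY_RESOURCE_TYPE = -1   # per the text, -1 means "do not filter by resource"

def filter_acls(acls, principal=None,
                resource_type=ANY_RESOURCE_TYPE, resource_name=None):
    """Apply the principal and resource filters of a ListAcls request."""
    matched = []
    for acl in acls:
        if principal is not None and acl.principal != principal:
            continue
        if resource_type != ANY_RESOURCE_TYPE and (
                (acl.resource_type, acl.resource_name)
                != (resource_type, resource_name)):
            continue
        matched.append(acl)
    return matched

acls = [
    Acl("alice", 2, "orders", "Read"),
    Acl("bob", 2, "orders", "Write"),
    Acl("alice", 2, "payments", "Read"),
]
by_principal = filter_acls(acls, principal="alice")
by_resource = filter_acls(acls, resource_type=2, resource_name="orders")
```

A null principal combined with resource_type -1 returns every ACL, which matches the "list everything" case described above.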
List ACLs Response
ListAclsResponse => [responses] error_code
Alter ACLs Request
AlterAclsRequest => [requests]
  requests => resource [actions]
    resource => resource_type resource_name
      resource_type => INT8
      resource_name => STRING
    actions => action acl
      acl => acl_principle acl_permission_type acl_host acl_operation
        acl_principle => STRING
        acl_permission_type => INT8
        acl_host => STRING
        acl_operation => INT8
      action => INT8
- Can be sent to the controller broker
- If a request contains multiple instructions for the same resource, only the last one in the list is executed; the earlier ones are silently ignored.
- ACLs with a delete action will be processed first
- The request is not transactional. One failure won't stop others from running.
- The principal must be authorized to the "All" Operation on the "Cluster" resource to alter ACLs.
- Unauthorized requests will receive a ClusterAuthorizationException
- This will be revisited in a follow-up ACLs review after this KIP. See Follow Up Changes.
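The ordering rules above might be sketched as follows. The numeric action codes are hypothetical, since the schema only says `action => INT8`, and applying "delete first" within each resource (rather than across the whole request) is an assumption.

```python
DELETE, ADD = 0, 1  # hypothetical action codes; the schema only defines INT8

def order_alter_acls(requests):
    """requests: list of (resource, [(action, acl), ...]) in arrival order."""
    last = {}
    for resource, actions in requests:
        # Only the last instruction for a resource is kept.
        last.pop(resource, None)
        last[resource] = actions
    plan = []
    for resource, actions in last.items():
        # Stable sort: deletes move to the front, relative order is preserved.
        ordered = sorted(actions, key=lambda item: item[0] != DELETE)
        plan.append((resource, ordered))
    return plan

plan = order_alter_acls([
    (("topic", "a"), [(ADD, "x")]),
    (("topic", "b"), [(ADD, "y"), (DELETE, "z")]),
    (("topic", "a"), [(ADD, "w"), (DELETE, "x")]),
])
```

The first instruction for `("topic", "a")` is dropped, and each remaining resource has its delete applied before its add.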
Alter ACLs Response
AlterAclsResponse => [responses]
...