The command line shell is fundamentally intended to provide centralized management of Kafka operations.
Status
Current state: Accepted
Discussion thread: here
JIRA:
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Users of Kafka have created dozens of different systems to work with Kafka. Providing a wire protocol that allows the brokers to execute administrative code, together with a public API and client, has many benefits, including:
- Allows clients in any language to administer Kafka
- Wire protocol is supported by any language
- Provides public client for performing admin operations
- Ensures integration test code in other projects and clients maintains compatibility
- Prevents users from needing to use the Command classes and work around standard output and system exits
- Removes the need for admin scripts (kafka-topics.sh, kafka-acls.sh, etc.) to talk directly to Zookeeper.
- Allows ZNodes to be completely locked down via ACLs
- Further hides the Zookeeper details of Kafka
Public Interfaces
- Changes to Wire Protocol:
- Adds the following new Request/Response messages:
- CreateTopic
- AlterTopic
- DeleteTopic
- ListAcls
- AlterAcls
- DescribeConfig
- AlterConfig
- Modifies Metadata Request/Response to allow polling for in-progress or complete admin operations. Added fields include:
- Add the ability to request no topics with a null topics list
- Boolean indicating if a topic is marked for deletion
- Boolean indicating if a topic is an internal topic
- Rack information (if not added by KIP-36 Rack aware replica assignment)
- Boolean indicating if a broker is the controller
- New Java client: AdminClient - a Wire Protocol client for administrative operations
Proposed Changes
Proposed changes include 4 parts:
- Wire protocol additions and changes
- Server-side message handlers and authorization
- New Java AdminClient implementation
- Refactor admin scripts and code to use new client where appropriate
Follow Up Changes
Changes that should be considered shortly after, or are enabled by, this KIP include:
- Review privileges for listing and altering ACLs to be more fine grained.
- Provide an Authorizer interface using the new Java classes used by the ACL requests/responses (KAFKA-3509)
- Deprecate the old one to encourage transition
- Define standard Exceptions that can be thrown by the Authorizer in the interface (KAFKA-3266)
- Otherwise all exceptions reach the client as an unknown server exception
- Consider building a sync call into the Authorizer to ensure changes are propagated
Details
1. Wire Protocol Extensions
Schema
Admin requests are likely to be used not only in CLI tools, where the common use case is to create, change, or delete a single entity. Since Kafka can maintain a huge number of topics, it is vital that users can efficiently issue many commands at one time. For that reason all admin messages are essentially batch requests: commands of one type for many topics can be grouped in one batch, reducing network calls. At the same time, to keep schema usage transparent and consistent with existing requests (such as Produce and Fetch), if a batch request includes more than one instruction for a specific topic, only the last one in the list is executed; the others are silently ignored.
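The "last instruction wins" rule for batches can be illustrated with a minimal Java sketch. The `BatchDeduplicator` and `Instruction` names are hypothetical and are not part of Kafka or of this proposal; the sketch only shows one way a handler might collapse a batch before executing it.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical helper, not part of Kafka: collapses a batch to one instruction per topic. */
final class BatchDeduplicator {

    /** Hypothetical stand-in for one instruction in a batch admin request. */
    static final class Instruction {
        final String topic;
        final int partitions;

        Instruction(String topic, int partitions) {
            this.topic = topic;
            this.partitions = partitions;
        }
    }

    /** Keeps only the last instruction seen for each topic. */
    static List<Instruction> lastPerTopic(List<Instruction> batch) {
        // LinkedHashMap keeps each topic at the position of its first appearance.
        Map<String, Instruction> byTopic = new LinkedHashMap<>();
        for (Instruction instruction : batch) {
            // put() overwrites any earlier instruction for the same topic.
            byTopic.put(instruction.topic, instruction);
        }
        return new ArrayList<>(byTopic.values());
    }

    public static void main(String[] args) {
        List<Instruction> batch = Arrays.asList(
                new Instruction("orders", 4),
                new Instruction("payments", 2),
                new Instruction("orders", 8)); // supersedes the first "orders" entry
        for (Instruction i : lastPerTopic(batch)) {
            System.out.println(i.topic + " -> " + i.partitions + " partitions");
        }
    }
}
```

Because earlier duplicates are dropped silently, a client that cares about every instruction would need to avoid repeating a topic within one batch.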
New Protocol Errors
The following error codes are proposed; some already exist in the protocol and the rest would be added.
Error | Description | Requests |
---|---|---|
TopicExists | Topic with this name already exists. | CreateTopic |
InvalidTopic (existing) | Topic name contains invalid characters or doesn't exist. | CreateTopic, AlterTopic, DeleteTopic |
InvalidPartitions | Partitions field is invalid (e.g. negative, or decreasing the number of partitions in an existing topic). | CreateTopic, AlterTopic |
InvalidReplicationFactor | ReplicationFactor field is invalid (e.g. negative). | CreateTopic |
InvalidReplicaAssignment | ReplicaAssignment field is invalid (e.g. contains duplicates). | |
InvalidConfiguration | Either topic-level config setting or value is incorrect. | CreateTopic |
Generally, a client should have enough context to provide a descriptive error message.
The same notation as in A Guide To The Kafka Protocol is used here.
Metadata Schema
Metadata Request (version 1)
MetadataRequest => [topics]
Metadata Response (version 1)
MetadataResponse => [brokers] controller_id [internal_topics] [topics_marked_for_deletion] [topic_metadata]
Adds rack, controller_id, internal_topics and topics_marked_for_deletion to the version 0 response.
ACL Admin Schema
List ACLs Request
ListAclsRequest => principal resource
  resource => resource_type resource_name
    resource_type => INT8
    resource_name => STRING
  principal => NULLABLE_STRING
- Can be sent to any broker
- If a non-null principal is provided, the returned ACLs will be filtered by that principal; otherwise ACLs for all principals will be listed.
- If a resource with a resource_type != -1 is provided, ACLs will be filtered by that resource; otherwise ACLs for all resources will be listed.
- Any principal can list their own ACLs where the permission type is "Allow". Otherwise the principal must be authorized for the "All" Operation on the "Cluster" resource to list ACLs.
- Unauthorized requests will receive a ClusterAuthorizationException
- This will be reviewed as a follow up ACLs review after this KIP. See Follow Up Changes.
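The filtering rules above can be sketched in Java as follows. This is a simplified illustration only: `AclListFilter`, `AclEntry`, and the `TOPIC` value of 1 are hypothetical, a null principal stands for "all principals", and the authorization check itself is not modelled.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch of the ListAcls filter semantics; not the broker implementation. */
final class AclListFilter {
    /** From the text: a resource_type of -1 means "do not filter by resource". */
    static final byte ANY_RESOURCE_TYPE = -1;
    /** Assumption for illustration only; the real INT8 values are not given here. */
    static final byte TOPIC = 1;

    static final class AclEntry {
        final String principal;
        final byte resourceType;
        final String resourceName;

        AclEntry(String principal, byte resourceType, String resourceName) {
            this.principal = principal;
            this.resourceType = resourceType;
            this.resourceName = resourceName;
        }
    }

    /** Returns the entries matching the optional principal and optional resource. */
    static List<AclEntry> filter(List<AclEntry> all, String principal,
                                 byte resourceType, String resourceName) {
        List<AclEntry> matches = new ArrayList<>();
        for (AclEntry entry : all) {
            // A null principal means ACLs for all principals are listed.
            boolean principalOk = principal == null || principal.equals(entry.principal);
            // resource_type == -1 means ACLs for all resources are listed.
            boolean resourceOk = resourceType == ANY_RESOURCE_TYPE
                    || (resourceType == entry.resourceType
                        && resourceName.equals(entry.resourceName));
            if (principalOk && resourceOk) {
                matches.add(entry);
            }
        }
        return matches;
    }

    public static void main(String[] args) {
        List<AclEntry> all = Arrays.asList(
                new AclEntry("alice", TOPIC, "orders"),
                new AclEntry("bob", TOPIC, "orders"),
                new AclEntry("alice", TOPIC, "payments"));
        System.out.println(filter(all, "alice", ANY_RESOURCE_TYPE, null).size());
    }
}
```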
List ACLs Response
ListAclsResponse => [responses] error_code
Alter ACLs Request
AlterAclsRequest => [requests]
  requests => resource [actions]
    resource => resource_type resource_name
      resource_type => INT8
      resource_name => STRING
    actions => action acl
      acl => acl_principle acl_permission_type acl_host acl_operation
        acl_principle => STRING
        acl_permission_type => INT8
        acl_host => STRING
        acl_operation => INT8
      action => INT8
- Can be sent to the controller broker
- Multiple instructions for the same resource in one request will be silently ignored; only the last from the list will be executed.
- ACLs with a delete action will be processed first.
- The request is not transactional. One failure won't stop others from running.
- The principal must be authorized for the "All" Operation on the "Cluster" resource to alter ACLs.
- Unauthorized requests will receive a ClusterAuthorizationException
- This will be reviewed as a follow up ACLs review after this KIP. See Follow Up Changes.
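A minimal Java sketch of the processing order described above: delete actions are applied first, and a failing action is recorded without stopping the rest. All names here (`AlterAclsSketch`, `Action`, `AclChange`) are hypothetical, an ACL is flattened to a single string, and treating the deletion of a missing ACL as a failure is an assumption made only to demonstrate the non-transactional behaviour.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Hypothetical sketch of AlterAcls processing order; not the broker implementation. */
final class AlterAclsSketch {
    /** Hypothetical names; on the wire the action is an INT8. */
    enum Action { ADD, DELETE }

    static final class AclChange {
        final Action action;
        final String acl; // simplified: a whole ACL flattened to one string

        AclChange(Action action, String acl) {
            this.action = action;
            this.acl = acl;
        }
    }

    /** Returns the changes with DELETE actions first, otherwise keeping request order. */
    static List<AclChange> deletesFirst(List<AclChange> changes) {
        List<AclChange> ordered = new ArrayList<>();
        for (AclChange c : changes) {
            if (c.action == Action.DELETE) ordered.add(c);
        }
        for (AclChange c : changes) {
            if (c.action == Action.ADD) ordered.add(c);
        }
        return ordered;
    }

    /** Applies every change; a failure is recorded and does not stop later changes. */
    static List<String> apply(Set<String> store, List<AclChange> changes) {
        List<String> results = new ArrayList<>();
        for (AclChange c : deletesFirst(changes)) {
            try {
                if (c.action == Action.DELETE) {
                    // Assumption: deleting an ACL that does not exist counts as a failure.
                    if (!store.remove(c.acl)) {
                        throw new IllegalStateException("no such ACL");
                    }
                } else {
                    store.add(c.acl);
                }
                results.add(c.action + " " + c.acl + ": OK");
            } catch (IllegalStateException e) {
                results.add(c.action + " " + c.acl + ": FAILED (" + e.getMessage() + ")");
            }
        }
        return results;
    }

    public static void main(String[] args) {
        Set<String> store = new HashSet<>(Arrays.asList("alice:Read"));
        List<AclChange> changes = Arrays.asList(
                new AclChange(Action.ADD, "bob:Write"),
                new AclChange(Action.DELETE, "carol:Read"),
                new AclChange(Action.DELETE, "alice:Read"));
        for (String line : apply(store, changes)) {
            System.out.println(line);
        }
    }
}
```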
Alter ACLs Response
AlterAclsResponse => [responses]
Topic Admin Schema
Create Topic Request
CreateTopicRequest => [TopicName Partitions ReplicationFactor ReplicaAssignment [ConfigEntry]]
  TopicName => string
  Partitions => int32
  ReplicationFactor => int32
  ReplicaAssignment => [PartitionId [ReplicaId]]
  ConfigEntry => ConfigKey ConfigValue
    ConfigKey => string
    ConfigValue => string
CreateTopicRequest is a batch asynchronous request to initiate topic creation with either predefined or automatic replica assignment and, optionally, topic configuration.
- Only one of (Partitions + ReplicationFactor) or ReplicaAssignment can be defined in one instruction. If both are specified, ReplicaAssignment takes precedence.
- If ReplicaAssignment is defined, the number of partitions and replicas will be calculated from the supplied ReplicaAssignment. If (Partitions + ReplicationFactor) is defined, the replica assignment will be automatically generated by the server.
- Multiple instructions for the same topic in one request will be silently ignored; only the last from the list will be executed.
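The precedence between ReplicaAssignment and (Partitions + ReplicationFactor) can be sketched in Java as below. This is an illustration under stated assumptions, not the server logic: `CreateTopicResolver` and `Resolved` are hypothetical names, a null or empty value stands for "not defined", and the replica count is taken from the per-partition replica lists, which are assumed to all have the same length.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;

/** Hypothetical sketch of how one CreateTopic instruction could be resolved. */
final class CreateTopicResolver {

    static final class Resolved {
        final int partitions;
        final int replicationFactor;
        final boolean serverGeneratesAssignment;

        Resolved(int partitions, int replicationFactor, boolean serverGeneratesAssignment) {
            this.partitions = partitions;
            this.replicationFactor = replicationFactor;
            this.serverGeneratesAssignment = serverGeneratesAssignment;
        }
    }

    /** Assumption: null (or an empty map) means the field was not defined. */
    static Resolved resolve(Integer partitions, Integer replicationFactor,
                            Map<Integer, List<Integer>> replicaAssignment) {
        if (replicaAssignment != null && !replicaAssignment.isEmpty()) {
            // ReplicaAssignment takes precedence; the other two fields are ignored.
            int replicas = -1;
            for (List<Integer> brokerIds : replicaAssignment.values()) {
                if (new HashSet<>(brokerIds).size() != brokerIds.size()) {
                    throw new IllegalArgumentException("InvalidReplicaAssignment: duplicates");
                }
                if (replicas == -1) {
                    replicas = brokerIds.size();
                } else if (replicas != brokerIds.size()) {
                    // Assumption: every partition must list the same number of replicas.
                    throw new IllegalArgumentException("InvalidReplicaAssignment: uneven");
                }
            }
            return new Resolved(replicaAssignment.size(), replicas, false);
        }
        if (partitions == null || partitions <= 0) {
            throw new IllegalArgumentException("InvalidPartitions");
        }
        if (replicationFactor == null || replicationFactor <= 0) {
            throw new IllegalArgumentException("InvalidReplicationFactor");
        }
        // Only counts were given, so the server generates the assignment itself.
        return new Resolved(partitions, replicationFactor, true);
    }

    public static void main(String[] args) {
        Map<Integer, List<Integer>> assignment = new HashMap<>();
        assignment.put(0, java.util.Arrays.asList(1, 2));
        assignment.put(1, java.util.Arrays.asList(2, 3));
        Resolved r = resolve(10, 5, assignment); // 10 and 5 are ignored
        System.out.println(r.partitions + " partitions, " + r.replicationFactor + " replicas");
    }
}
```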
Create Topic Response
CreateTopicResponse => [TopicName ErrorCode]
  ErrorCode => int16
  TopicName => string
CreateTopicResponse contains a map between topic and topic creation result error code (see New Protocol Errors).
Alter Topic Request
AlterTopicRequest => [TopicName Partitions ReplicationFactor ReplicaAssignment]
  TopicName => string
  ReplicationFactor => int32
  Partitions => int32
  ReplicaAssignment => [PartitionId [ReplicaId]]
AlterTopicRequest is a batch asynchronous request to initiate topic alteration: replication parameters and replica assignment.
1. If ReplicaAssignment is defined, the ReplicationFactor and Partitions arguments are ignored. For each partition in ReplicaAssignment:
1.1 If such a partition exists and the assignment is different from the current replica assignment, it's a "reassign partition" request: add it to the reassign-partitions json.
1.2 If such a partition doesn't exist, it's an "add partition" request: change topic metadata in zookeeper to trigger the increase-partition logic.
2. Else if ReplicationFactor is defined:
2.1 If Partitions is defined, regenerate replica assignment for all existing and newly added partitions, goto 1.
2.2 If Partitions is not defined, regenerate replica assignment only for existing partitions, goto 1.
3. Else if Partitions is defined (ReplicaAssignment and ReplicationFactor are not defined):
3.1 If Partitions is less than the current number of partitions, return error code InvalidPartitions (since decreasing the number of partitions is not allowed).
3.2 Otherwise, automatically generate replica assignment for newly added partitions, goto 1.
Multiple instructions for the same topic in one request will be silently ignored; only the last from the list will be executed.
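Steps 1 and 3.1 of the procedure above can be sketched in Java. The `AlterTopicPlanner` class and its enum are hypothetical; the sketch leaves out replica assignment generation (steps 2 and 3.2), and it assumes that two assignments differ when their replica lists differ in content or order.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical sketch of AlterTopic steps 1 and 3.1; not the broker implementation. */
final class AlterTopicPlanner {
    enum PartitionAction { REASSIGN, ADD, UNCHANGED }

    /** Step 1: classify each requested partition against the current assignment. */
    static Map<Integer, PartitionAction> classify(Map<Integer, List<Integer>> current,
                                                  Map<Integer, List<Integer>> requested) {
        Map<Integer, PartitionAction> plan = new TreeMap<>();
        for (Map.Entry<Integer, List<Integer>> entry : requested.entrySet()) {
            List<Integer> existing = current.get(entry.getKey());
            if (existing == null) {
                // Step 1.2: the partition does not exist yet.
                plan.put(entry.getKey(), PartitionAction.ADD);
            } else if (!existing.equals(entry.getValue())) {
                // Step 1.1: the partition exists with a different assignment.
                plan.put(entry.getKey(), PartitionAction.REASSIGN);
            } else {
                plan.put(entry.getKey(), PartitionAction.UNCHANGED);
            }
        }
        return plan;
    }

    /** Step 3.1: a requested count below the current count is rejected. */
    static void checkPartitionCount(int currentCount, int requestedCount) {
        if (requestedCount < currentCount) {
            throw new IllegalArgumentException("InvalidPartitions");
        }
    }

    public static void main(String[] args) {
        Map<Integer, List<Integer>> current = new HashMap<>();
        current.put(0, Arrays.asList(1, 2));
        current.put(1, Arrays.asList(2, 3));

        Map<Integer, List<Integer>> requested = new HashMap<>();
        requested.put(0, Arrays.asList(1, 2)); // same as current
        requested.put(1, Arrays.asList(3, 1)); // different replicas
        requested.put(2, Arrays.asList(1, 3)); // new partition

        System.out.println(classify(current, requested));
    }
}
```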
Alter Topic Response
AlterTopicResponse => [TopicName ErrorCode]
  ErrorCode => int16
  TopicName => string
AlterTopicResponse is similar to CreateTopicResponse.
Delete Topic Request
DeleteTopicRequest => [TopicName]
  TopicName => string
DeleteTopicRequest requires only the names of the topics that should be deleted.
Delete Topic Response
DeleteTopicResponse => [TopicName ErrorCode]
  ErrorCode => int16
  TopicName => string
DeleteTopicResponse is similar to CreateTopicResponse.
2. Server-side Admin Request handlers
At the highest level, admin requests will be handled on the brokers the same way that all message types are. However, because admin messages modify cluster metadata, they should be handled by the controller. This allows the controller to propagate the changes to the rest of the cluster. The fact that the messages need to be handled by the controller does not necessarily mean they need to be sent directly to the controller: a message forwarding mechanism can be used to forward the message from any broker to the correct broker for handling.
Because supporting all of this is quite an undertaking, I will describe the "ideal functionality" and then the "intermediate functionality" that gets us some basic administrative support quickly while working towards the optimal state.
Ideal Functionality:
- A client sends an admin request to any broker
- The admin request is forwarded to the required broker (likely the controller)
- The request is handled and the server blocks until a timeout is reached or the requested operation is completed (failure or success)
- An operation is considered complete/successful when all required nodes have the correct/current state.
- Immediate follow up requests to any broker will succeed.
- Requests that timeout may still be completed after the timeout. The users would need to poll to check the state.
- The response is generated and forwarded back to the broker that received the request.
- A response is sent back to the client.
Intermediate Functionality:
- A client sends admin write requests to the controller broker. Read requests can still go to any broker.
- As a follow up, request forwarding can be added transparently. (see below)
- The request is handled and the server blocks until a timeout is reached or the requested operation is completed (failure or success)
- An operation is considered complete/successful when the controller node has the correct/current state.
- Immediate follow up requests to the controller will succeed. Others (not to the controller) are likely to succeed or cause a retriable exception that would eventually succeed.
- Requests that timeout may still be completed after the timeout. The users would need to poll to check the state.
- A response is sent back to the client.
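Under the intermediate functionality, a client has to pick the target broker itself. The Java sketch below shows one way this choice could look, assuming the client already knows the controller id and the live broker ids from a Metadata response. `AdminRequestRouter` and its method are hypothetical names, and picking the first live broker for reads is an arbitrary choice.

```java
import java.util.Arrays;
import java.util.List;

/** Hypothetical sketch of client-side routing for admin requests. */
final class AdminRequestRouter {
    enum Kind { READ, WRITE }

    /** Returns the id of the broker the request should be sent to. */
    static int chooseBroker(Kind kind, int controllerId, List<Integer> liveBrokers) {
        if (liveBrokers.isEmpty()) {
            throw new IllegalStateException("no live brokers known");
        }
        if (kind == Kind.WRITE) {
            // Write requests go to the controller until request forwarding exists.
            if (!liveBrokers.contains(controllerId)) {
                throw new IllegalStateException("controller unknown, refresh metadata");
            }
            return controllerId;
        }
        // Read requests can go to any broker; the first one is an arbitrary pick.
        return liveBrokers.get(0);
    }

    public static void main(String[] args) {
        List<Integer> brokers = Arrays.asList(1, 2, 3);
        System.out.println("write -> broker " + chooseBroker(Kind.WRITE, 2, brokers));
        System.out.println("read  -> broker " + chooseBroker(Kind.READ, 2, brokers));
    }
}
```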
The ideal functionality has 2 features that are more challenging initially. For that reason those features will be removed from the initial changes, but will be tracked as follow up improvements. However, this intermediate solution should allow for a relatively transparent transition to the ideal functionality.
Request Forwarding: KAFKA-1912
Request forwarding is relevant to any message that needs to be sent to the "correct" broker (e.g. partition leader, group coordinator, etc.). Though at first it may seem simple, it has many technical challenges that need to be decided with regard to connections, failure, retries, etc. Today, we depend on the client to choose the correct broker, and clients that want to utilize the cluster "optimally" would likely continue to do so. For those reasons it can be handled generically as an independent feature.
Cluster Consistent Blocking:
Blocking an admin request until the entire cluster is aware of the correct/current state is difficult based on Kafka's current approach for propagating metadata. This approach varies based on the metadata being changed.
- Topic metadata changes are propagated via UpdateMetadata and LeaderAndIsr requests
- Config changes are propagated via zookeeper and listeners
- ACL changes depend on the implementation of the Authorizer interface
- The default SimpleACLAuthorizer uses zookeeper and listeners
Though all of these mechanisms are different, they are all "eventually consistent". None of the mechanisms, as currently implemented, will block until the metadata has been propagated successfully. Changing this behavior would require a large amount of change to the KafkaController, additional inter-broker messages, and potentially a change to the Authorizer interface. These are all changes that should not block the implementation of KIP-4.
The intermediate changes in KIP-4 should allow an easy transition to "complete blocking" when the work can be done. This is supported by providing optional local blocking in the meantime. This local blocking only blocks until the local state on the controller is correct. We will still provide a polling mechanism for users that do not want to block at all. A polling mechanism is required in the optimal implementation too, because users still need a way to check state after a timeout occurs, since operations like "create topic" are not transactional. Local blocking has the added benefit of avoiding wasted poll requests to other brokers when it's impossible for the request to be completed: if the controller's state is not correct, then the other brokers' state can't be either. Clients who don't want to validate that the entire cluster state is correct can block on the controller and avoid polling altogether, with reasonable confidence that, though they may get a retriable error on follow-up requests, the requested change was successful and the cluster will be accurate eventually.
Because we already add a timeout field to the request wire protocols, changing the behavior to block until the cluster is consistent in the future would not require a protocol change, though the version could be bumped to indicate a behavior change.
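The polling side of this design can be sketched generically in Java. `AdminPolling.pollUntil` is a hypothetical helper, not part of the proposed AdminClient; in practice the `isComplete` check would be something like a Metadata request that reports whether a topic exists or is still marked for deletion.

```java
import java.util.function.BooleanSupplier;

/** Hypothetical helper: polls a completion check until it passes or a timeout expires. */
final class AdminPolling {

    /**
     * Returns true as soon as the check passes, false if the timeout expires
     * (or the thread is interrupted) first.
     */
    static boolean pollUntil(BooleanSupplier isComplete, long timeoutMs, long intervalMs) {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (true) {
            if (isComplete.getAsBoolean()) {
                return true;
            }
            // Subtraction keeps the comparison correct even if nanoTime wraps around.
            if (System.nanoTime() - deadline >= 0) {
                return false;
            }
            try {
                Thread.sleep(intervalMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve the interrupt for callers
                return false;
            }
        }
    }

    public static void main(String[] args) {
        int[] polls = {0};
        // Stand-in check that succeeds on the third poll.
        boolean done = pollUntil(() -> ++polls[0] >= 3, 1_000, 10);
        System.out.println("completed=" + done + " after " + polls[0] + " polls");
    }
}
```

A false result only means the operation was not observed as complete in time; as noted above, it may still complete later, so callers would typically poll again rather than assume failure.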
3. Admin Client
This component is intended to be Kafka's out-of-the-box client implementation for admin commands.
The admin client will use the Kafka NetworkClient facility from /clients for cluster communication. Besides admin commands, the client will maintain a cluster metadata cache and give the user a convenient way of handling long-running commands (e.g. reassign partitions).
Since topic commands will support batching (and so will AdminClient), users will be provided, in addition to the Admin API, with request builders that help create requests correctly.
Proposed API:
Rejected Alternatives
TopicMetadataRequest/Response: After some debate we decided not to evolve the TopicMetadataResponse to remove the ISR field (which currently can return incorrect information). There is a use case for this in KAFKA-2225, so we will treat this as a bug and fix it going forward. See KAFKA-1367 for more details.