
The goal of the command-line shell is to provide centralized management of Kafka operations.

Status

Current state: Accepted

Discussion thread: here


Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Users of Kafka have built dozens of different systems to work with Kafka. Providing a wire protocol that allows the brokers to execute administrative code, together with a public API/client, has many benefits, including:

  • Allows clients in any language to administer Kafka
    • The wire protocol is supported by any language
  • Provides a public client for performing admin operations
    • Ensures that integration test code in other projects and clients maintains compatibility
    • Prevents users from needing to use the Command classes and work around standard output and system exits
  • Removes the need for admin scripts (kafka-topics.sh, kafka-acls.sh, etc.) to talk directly to Zookeeper
    • Allows ZNodes to be completely locked down via ACLs
    • Further hides the Zookeeper details of Kafka

Public Interfaces

  • Changes to Wire Protocol:
    • Adds the following new Request/Response messages:
      • CreateTopic
      • AlterTopic
      • DeleteTopic
      • ListAcls
      • AlterAcls
      • DescribeConfig
      • AlterConfig
    • Modifies the Metadata Response to allow polling for in-progress or completed admin operations. Added fields include:
  • New Java client: AdminClient - a Wire Protocol client for administrative operations

Proposed Changes

Proposed changes include 4 parts:

  1. Wire protocol additions and changes
  2. Server-side message handlers and authorization
  3. New Java AdminClient implementation
  4. Refactor admin scripts and code to use new client where appropriate

Follow Up Changes

Changes that should be made shortly after this KIP, or that it enables, include:

  • Review privileges for listing and altering ACLs to make them more fine-grained.
  • Provide an Authorizer interface using the new Java classes used by the ACL requests/responses
    • Deprecate the old one to encourage transition

Details

1. Wire Protocol Extensions

Schema

Overall, the idea is to extend the Wire Protocol to cover all existing admin commands so that a user does not need to talk directly to Zookeeper and all commands can be authenticated via Kafka. At the same time, since the Wire Protocol is a public API to the Kafka cluster, it was agreed that the new Admin schema needs to be "orthogonal", i.e. new messages shouldn't duplicate each other or existing requests that already cover particular use cases.

Finally, admin requests are likely to be used not only in CLI tools, where the common use case is to create/change/delete a single entity. Since Kafka can maintain a huge number of topics, it is vital that users can efficiently issue many commands at once. That's why all admin messages are essentially batch requests: commands of one type for many topics can be grouped into one batch, reducing network calls. At the same time, to keep Schema usage transparent and consistent with existing requests (such as Produce and Fetch), if a batch request includes more than one instruction for a specific topic, only the last one in the list will be executed; the others will be silently ignored.
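The last-instruction-wins rule can be sketched in a few lines of Java. This is an illustration under stated assumptions rather than broker code: the `LastWinsBatch` class and its `Instruction` type are hypothetical, and an instruction is reduced to a topic name plus an opaque payload string.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: keep only the last instruction per topic in a batch.
final class LastWinsBatch {

    // Hypothetical instruction: a topic name plus an opaque payload.
    static final class Instruction {
        final String topic;
        final String payload;

        Instruction(String topic, String payload) {
            this.topic = topic;
            this.payload = payload;
        }
    }

    // Returns the instructions that would be executed. Re-putting an existing key
    // in a LinkedHashMap replaces the value but keeps the key's original position,
    // so topics stay in order of first appearance.
    static List<Instruction> dedupe(List<Instruction> batch) {
        Map<String, Instruction> lastByTopic = new LinkedHashMap<>();
        for (Instruction instruction : batch) {
            lastByTopic.put(instruction.topic, instruction);
        }
        return new ArrayList<>(lastByTopic.values());
    }
}
```

Keeping each topic at the position of its first appearance is a choice made for this sketch; the proposal only specifies which instruction wins, not the execution order.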

 

New Protocol Errors

It is proposed to use the following error codes in the protocol; some already exist and the rest are new.

| Error | Description | Requests |
|---|---|---|
| TopicAlreadyExists | A topic with this name already exists. | Create |
| InvalidTopic (existing) | The topic name contains invalid characters or the topic doesn't exist. | Create, Alter, Delete |
| InvalidPartitions | The Partitions field is invalid (e.g. negative, or decreasing the number of partitions of an existing topic). | Create, Alter |
| InvalidReplicationFactor | The ReplicationFactor field is invalid (e.g. negative). | Create, Alter |
| InvalidReplicaAssignment | The ReplicaAssignment field is invalid (e.g. contains duplicates). | Create, Alter |
| InvalidTopicConfiguration | Either a topic-level config setting or its value is incorrect. | Create |
| DecreasePartitionsNotAllowed | Invalid Partitions argument: decreasing partitions is prohibited when altering a topic. | Alter |
| ReassignPartitionsInProgress | The reassign partitions procedure has already been started. | Alter |

Generally, a client should have enough context to provide a descriptive error message.

The notation used here is the same as in A Guide To The Kafka Protocol.

ACL Admin Schema

List ACLs Request

 

ListAclsRequest => principal resource 
  resource => resource_type resource_name 
    resource_type => INT8
    resource_name => STRING
  principal => NULLABLE_STRING
Request semantics:
  1. Can be sent to any broker
  2. If a non-null principal is provided, the returned ACLs will be filtered by that principal; otherwise ACLs for all principals will be listed.
  3. If a resource with a resource_type != -1 is provided, ACLs will be filtered by that resource; otherwise ACLs for all resources will be listed.
  4. Any principal can list their own ACLs where the permission type is "Allow"; otherwise the principal must be authorized to the "All" Operation on the "Cluster" resource to list ACLs.
    • Unauthorized requests will receive a ClusterAuthorizationException
    • This will be reviewed as a follow up ACLs review after this KIP. See Follow Up Changes.
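The filtering rules for ListAclsRequest can be illustrated with a small predicate. The sketch is hypothetical: `ListAclsFilter` and its `Acl` type are not part of the proposal, the numeric resource-type values used with it are placeholders (the proposal does not list them), and the authorization check from rule 4 is left out.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical sketch of the ListAclsRequest filtering rules (authorization omitted).
final class ListAclsFilter {

    // resource_type == -1 means "do not filter by resource".
    static final byte ANY_RESOURCE_TYPE = -1;

    // Hypothetical flattened ACL entry.
    static final class Acl {
        final String principal;
        final byte resourceType;
        final String resourceName;

        Acl(String principal, byte resourceType, String resourceName) {
            this.principal = principal;
            this.resourceType = resourceType;
            this.resourceName = resourceName;
        }
    }

    // A null principal lists ACLs for all principals; a resource filter matches
    // both the resource type and the resource name.
    static List<Acl> filter(List<Acl> acls, String principal, byte resourceType, String resourceName) {
        return acls.stream()
                .filter(acl -> principal == null || principal.equals(acl.principal))
                .filter(acl -> resourceType == ANY_RESOURCE_TYPE
                        || (acl.resourceType == resourceType && acl.resourceName.equals(resourceName)))
                .collect(Collectors.toList());
    }
}
```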
List ACLs Response

 

ListAclsResponse => [responses] error_code
  responses => resource [acls]
    resource => resource_type resource_name
      resource_type => INT8
      resource_name => STRING
    acls => acl_principle acl_permission_type acl_host acl_operation
      acl_principle => STRING
      acl_permission_type => INT8
      acl_host => STRING
      acl_operation => INT8
  error_code => INT16
Alter ACLs Request

 

AlterAclsRequest => ...
Request semantics:
  1. Can be sent to any broker
  2. If a non-null principal is provided, the returned ACLs will be filtered by that principal; otherwise ACLs for all principals will be listed.
  3. If a resource with a resource_type != -1 is provided, ACLs will be filtered by that resource; otherwise ACLs for all resources will be listed.
  4. The principal must be authorized to the "All" Operation on the "Cluster" resource to alter ACLs.
    • Unauthorized requests will receive a ClusterAuthorizationException
    • This will be reviewed as a follow up ACLs review after this KIP. See Follow Up Changes.
Alter ACLs Response

 

AlterAclsResponse => ...

 

Topic Admin Schema

Create Topic Request

 

CreateTopicRequest => [TopicName Partitions ReplicationFactor ReplicaAssignment [ConfigEntry]]
TopicName => string
Partitions => int32
ReplicationFactor => int32
ReplicaAssignment => [PartitionId [ReplicaId]]
ConfigEntry => ConfigKey ConfigValue
 ConfigKey => string
 ConfigValue => string
CreateTopicRequest is a batch asynchronous request to initiate topic creation with either predefined or automatic replica assignment and, optionally, topic configuration.
Request semantics:
  1. Only one of (Partitions + ReplicationFactor) or ReplicaAssignment should be defined in one instruction. If both are specified, ReplicaAssignment takes precedence. (Note: there is a special use case, automatic topic creation for TopicMetadataRequest; to trigger it, the user should set client_id=consumer and define only the topic name.)
  2. If ReplicaAssignment is defined, the number of partitions and replicas will be calculated from the supplied ReplicaAssignment. If (Partitions + ReplicationFactor) is defined, the replica assignment will be automatically generated by the server.
  3. If one request contains multiple instructions for the same topic, only the last one in the list will be executed; the others will be silently ignored.
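The resolution rules for a single CreateTopic instruction can be sketched as follows. `CreateTopicResolver` is hypothetical, and failures are reported as an `IllegalArgumentException` carrying the matching error name from New Protocol Errors. Two details are assumptions not stated in the proposal: every partition in a ReplicaAssignment is expected to list the same number of replicas, and zero counts are rejected along with negative ones. The special `client_id=consumer` auto-creation path is not modeled.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of resolving one CreateTopic instruction; not the broker's code.
final class CreateTopicResolver {

    // Outcome: partition and replica counts, plus whether the server must generate the assignment.
    static final class Resolved {
        final int partitions;
        final int replicationFactor;
        final boolean serverGeneratesAssignment;

        Resolved(int partitions, int replicationFactor, boolean serverGeneratesAssignment) {
            this.partitions = partitions;
            this.replicationFactor = replicationFactor;
            this.serverGeneratesAssignment = serverGeneratesAssignment;
        }
    }

    // null means "field not set"; assignment maps a partition id to its replica (broker) ids.
    static Resolved resolve(Integer partitions, Integer replicationFactor,
                            Map<Integer, List<Integer>> assignment) {
        if (assignment != null && !assignment.isEmpty()) {
            // ReplicaAssignment takes precedence: Partitions and ReplicationFactor are ignored.
            int rf = -1;
            for (List<Integer> replicas : assignment.values()) {
                boolean hasDuplicates = new HashSet<>(replicas).size() != replicas.size();
                boolean sizeMismatch = rf != -1 && replicas.size() != rf; // assumption: uniform replica count
                if (replicas.isEmpty() || hasDuplicates || sizeMismatch) {
                    throw new IllegalArgumentException("InvalidReplicaAssignment");
                }
                rf = replicas.size();
            }
            return new Resolved(assignment.size(), rf, false);
        }
        if (partitions == null || partitions <= 0) {
            throw new IllegalArgumentException("InvalidPartitions");
        }
        if (replicationFactor == null || replicationFactor <= 0) {
            throw new IllegalArgumentException("InvalidReplicationFactor");
        }
        return new Resolved(partitions, replicationFactor, true);
    }
}
```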
Create Topic Response

 

CreateTopicResponse => [TopicName ErrorCode]
ErrorCode => int16
TopicName => string

CreateTopicResponse maps each topic to the error code of its creation result (see New Protocol Errors).
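On the client side, the per-topic map lends itself to reporting failures topic by topic, which is how a client can give a descriptive error message. In this sketch the `TopicError` enum is a hypothetical stand-in for the protocol error codes (a subset, with names adapted from the error table), and `CreateTopicResults.failures` is an illustrative helper, not a proposed API.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical client-side helper for a CreateTopicResponse-style result map.
final class CreateTopicResults {

    // Hypothetical subset of per-topic outcomes; NONE marks success.
    enum TopicError {
        NONE,
        TOPIC_ALREADY_EXISTS,
        INVALID_TOPIC,
        INVALID_PARTITIONS,
        INVALID_REPLICATION_FACTOR,
        INVALID_REPLICA_ASSIGNMENT,
        INVALID_TOPIC_CONFIGURATION
    }

    // Returns one descriptive message per failed topic, sorted by topic name.
    static Map<String, String> failures(Map<String, TopicError> results) {
        Map<String, String> messages = new TreeMap<>();
        for (Map.Entry<String, TopicError> entry : results.entrySet()) {
            if (entry.getValue() != TopicError.NONE) {
                messages.put(entry.getKey(),
                        "Topic '" + entry.getKey() + "' could not be created: " + entry.getValue());
            }
        }
        return messages;
    }
}
```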

Alter Topic Request

 

AlterTopicRequest => [TopicName Partitions ReplicationFactor ReplicaAssignment]
TopicName => string
ReplicationFactor => int32
Partitions => int32
ReplicaAssignment => [PartitionId [ReplicaId]]
AlterTopicRequest is a batch asynchronous request to initiate topic alteration: replication parameters and replica assignment.
Request semantics:

1. If ReplicaAssignment is defined

    ReplicationFactor and Partitions arguments are ignored in this case.

    For each partition in ReplicaAssignment:

    1.1 If the partition exists and the requested assignment differs from the current replica assignment:

        It's a "reassign partition" request: add it to the reassign-partitions JSON.

    1.2 If the partition doesn't exist:

        It's an "add partition" request: change the topic metadata in Zookeeper to trigger the add-partitions logic.

2. Else if ReplicationFactor is defined

    2.1 If Partitions is defined    

        Regenerate replica assignment for all existing and newly added partitions, goto 1.

    2.2 If Partitions is not defined     

        Regenerate replica assignment only for existing partitions, goto 1.

3. Else if Partitions is defined (ReplicaAssignment and ReplicationFactor are not defined):

    3.1 If Partitions is less than the current number of partitions, return error code InvalidPartitions (since decreasing the number of partitions is not allowed).

    3.2 Otherwise, automatically generate a replica assignment for the newly added partitions, goto 1.

 

If one request contains multiple instructions for the same topic, only the last one in the list will be executed; the others will be silently ignored.
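The AlterTopicRequest decision procedure can be expressed as two hypothetical helpers: `choose` picks the branch (steps 1 to 3), and `planAssignment` classifies each partition of the final ReplicaAssignment as a reassignment (1.1) or an addition (1.2). Names and types are illustrative. Partitions whose requested assignment equals the current one are skipped, and a request with no fields set maps to `NOTHING`; the proposal spells out neither case.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of the AlterTopicRequest decision procedure.
final class AlterTopicPlanner {

    enum Branch { APPLY_ASSIGNMENT, REGENERATE_ALL, REGENERATE_EXISTING, GENERATE_NEW, INVALID_PARTITIONS, NOTHING }

    enum Action { REASSIGN_PARTITION, ADD_PARTITION }

    static final class Step {
        final int partition;
        final Action action;

        Step(int partition, Action action) {
            this.partition = partition;
            this.action = action;
        }
    }

    // null means "field not set"; currentPartitions is the topic's existing partition count.
    static Branch choose(Integer partitions, Integer replicationFactor, boolean hasAssignment,
                         int currentPartitions) {
        if (hasAssignment) {
            return Branch.APPLY_ASSIGNMENT;                     // 1: other fields are ignored
        }
        if (replicationFactor != null) {
            return partitions != null ? Branch.REGENERATE_ALL   // 2.1
                                      : Branch.REGENERATE_EXISTING; // 2.2
        }
        if (partitions != null) {
            return partitions < currentPartitions ? Branch.INVALID_PARTITIONS // 3.1
                                                  : Branch.GENERATE_NEW;      // 3.2
        }
        return Branch.NOTHING;
    }

    // Step 1: compare the final assignment with the current one, partition by partition.
    static List<Step> planAssignment(Map<Integer, List<Integer>> current,
                                     Map<Integer, List<Integer>> requested) {
        List<Step> steps = new ArrayList<>();
        // TreeMap gives a deterministic, ascending partition order.
        for (Map.Entry<Integer, List<Integer>> entry : new TreeMap<>(requested).entrySet()) {
            List<Integer> existing = current.get(entry.getKey());
            if (existing == null) {
                steps.add(new Step(entry.getKey(), Action.ADD_PARTITION));      // 1.2
            } else if (!existing.equals(entry.getValue())) {
                steps.add(new Step(entry.getKey(), Action.REASSIGN_PARTITION)); // 1.1
            }
        }
        return steps;
    }
}
```

Replica lists are compared in order, since replica order is part of an assignment.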

   

Alter Topic Response

 

AlterTopicResponse => [TopicName ErrorCode]
ErrorCode => int16
TopicName => string

AlterTopicResponse is similar to CreateTopicResponse.
Delete Topic Request

 

DeleteTopicRequest => [TopicName]
TopicName => string

 

DeleteTopicRequest requires only the names of the topics to be deleted.
If one request contains multiple instructions for the same topic, only the last one in the list will be executed; the others will be silently ignored.
Delete Topic Response

 

DeleteTopicResponse => [TopicName ErrorCode]
ErrorCode => int16
TopicName => string

DeleteTopicResponse is similar to CreateTopicResponse.

2. Server-side Admin Request handlers

All incoming requests will be handled by a dedicated helper class called from KafkaApis: TopicCommandHelper.

All these commands are already implemented as standalone CLI tools, so there is no need to re-implement them. Most of the command classes are strongly coupled with CLI logic and can hardly be refactored, so for now (until the standalone CLI commands are removed) the logic from those classes will most likely be extracted and copied into a separate class (as proposed, TopicCommandHelper).

3. Admin Client

This component is intended to be Kafka's out-of-the-box client implementation for Admin commands.

The Admin client will use the Kafka NetworkClient facility from /clients for cluster communication. Besides Admin commands, the client will maintain a cluster metadata cache and will provide the user with a convenient way of handling long-running commands (e.g. reassign partitions).

Since Topic commands will support batching (and so will AdminClient), besides the Admin API the user will be provided with request builders that help to create requests correctly.
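A request builder of the kind described above might look like the following sketch. Only the class name `CreateTopicRequestBody` comes from the proposed API; its fields, the `Builder` methods, and the decision to reject duplicate topics on the client side are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical shape of a batched CreateTopic request body and its builder.
final class CreateTopicRequestBody {

    // One topic instruction: either counts (server-generated assignment) or an explicit assignment.
    static final class TopicSpec {
        final String name;
        final Integer partitions;                            // null when an assignment is given
        final Integer replicationFactor;                     // null when an assignment is given
        final Map<Integer, List<Integer>> replicaAssignment; // empty when counts are given
        final Map<String, String> configs;

        TopicSpec(String name, Integer partitions, Integer replicationFactor,
                  Map<Integer, List<Integer>> replicaAssignment, Map<String, String> configs) {
            this.name = name;
            this.partitions = partitions;
            this.replicationFactor = replicationFactor;
            this.replicaAssignment = Map.copyOf(replicaAssignment);
            this.configs = Map.copyOf(configs);
        }
    }

    private final List<TopicSpec> topics;

    private CreateTopicRequestBody(List<TopicSpec> topics) {
        this.topics = Collections.unmodifiableList(topics);
    }

    List<TopicSpec> topics() {
        return topics;
    }

    static final class Builder {
        // LinkedHashMap keeps topics in the order they were added.
        private final Map<String, TopicSpec> specs = new LinkedHashMap<>();

        // Topic whose replica assignment the server generates.
        Builder addTopic(String name, int partitions, int replicationFactor, Map<String, String> configs) {
            return add(new TopicSpec(name, partitions, replicationFactor, Map.of(), configs));
        }

        // Topic with an explicit replica assignment (partition id -> replica ids).
        Builder addTopic(String name, Map<Integer, List<Integer>> assignment, Map<String, String> configs) {
            if (assignment.isEmpty()) {
                throw new IllegalArgumentException("replica assignment must not be empty");
            }
            return add(new TopicSpec(name, null, null, assignment, configs));
        }

        private Builder add(TopicSpec spec) {
            if (specs.containsKey(spec.name)) {
                throw new IllegalArgumentException("duplicate topic in batch: " + spec.name);
            }
            specs.put(spec.name, spec);
            return this;
        }

        CreateTopicRequestBody build() {
            return new CreateTopicRequestBody(new ArrayList<>(specs.values()));
        }
    }
}
```

Rejecting duplicates in the builder surfaces mistakes early instead of relying on the server's silent last-instruction-wins rule, and the two `addTopic` overloads make it impossible to set both (Partitions + ReplicationFactor) and ReplicaAssignment for one topic.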

Proposed API:

public class AdminClient {

/**
* A client is instantiated by providing a set of key-value pairs as configuration. Most
* of the settings will be related to NetworkClient
*
* @param properties settings related to Network client and at least one broker from KafkaCluster to connect to
*/
public AdminClient(Properties properties);

/**
* Initiates topic creation.
* This is an asynchronous call: it returns immediately once the server has accepted the request and stored the respective data in Zookeeper.
* To simulate a simple blocking call, call Future.get. This ensures that metadata about the newly created topics has been propagated
* to all brokers.
*
* @param createTopicRequestBody holder (built by means of the respective Builder) of all arguments required to create topics
* @return java.util.concurrent.Future which holds the topic creation results: a map from topic name to error code
*
* @throws ApiException in case of a global error, which means topic creation was not even started
*/
public Future<Map<String, Errors>> createTopics(CreateTopicRequestBody createTopicRequestBody) throws ApiException;

/**
* Initiates topic alteration.
* This is an asynchronous call: it returns immediately once the server has accepted the request and stored/changed the respective data in Zookeeper.
* To simulate a simple blocking call, call Future.get. This ensures that updated metadata about the altered topics has been propagated
* to all brokers.
*
* @param alterTopicRequestBody holder (built by means of the respective Builder) of all arguments required to alter topics
* @return java.util.concurrent.Future which holds the topic alteration results: a map from topic name to error code
*
* @throws ApiException in case of a global error, which means topic alteration was not even started
*/
public Future<Map<String, Errors>> alterTopics(AlterTopicRequestBody alterTopicRequestBody) throws ApiException;

/**
* Initiates topic deletion.
* This is an asynchronous call: it returns immediately once the server has accepted the request and marked the requested topics for deletion in Zookeeper.
* To simulate a simple blocking call, call Future.get. This ensures that metadata with the updated topic list has been propagated to
* all brokers.
*
* @param topics names of the topics to be deleted
* @return java.util.concurrent.Future which holds the topic deletion results: a map from topic name to error code
*
* @throws ApiException in case of a global error, which means topic deletion was not even started
*/
public Future<Map<String, Errors>> deleteTopics(List<String> topics) throws ApiException;

/**
* Lists all available topics in the Kafka cluster.
* A topic is considered available if all brokers in the cluster have received and cached metadata about it.
*
* @return list of topic names
*
* @throws ApiException
*/
public List<String> listTopics() throws ApiException;

/**
* TODO: not finalized yet
* Requests replication information about Kafka topics.
*
* @param topicNames names of the topics to describe
* @return a mapping between topic name and topic description
* @throws ApiException in case of a global error, which means no topic descriptions could be fetched
*/
public Map<String, DescribeTopicOutput> describeTopics(List<String> topicNames) throws ApiException;

/**
* Initiates config alteration.
* This is an asynchronous call: it returns immediately once the server has accepted the request and stored/changed the respective data in Zookeeper.
* To simulate a simple blocking call, call Future.get. This ensures that the updated configs have been persisted.
*
* @param entityType type of entity whose configs are being altered (topic, client, etc.)
* @param alterConfigRequest holder (built by means of the respective Builder) of all arguments required to alter configs
* @return java.util.concurrent.Future which holds the config alteration results: a map from entity name to the complete after-image of the entity's configs
*
* @throws ApiException in case of a global error, which means the configs could not be altered for any entity
*/
public Future<Map<String, EntityConfig>> alterConfig(String entityType, AlterConfigRequest alterConfigRequest) throws ApiException;

/**
* Describes the config of any entity.
*
* @param entityType type of entity being described (topic, client, etc.)
* @param entityNames names of the entities to describe (topic names, client ids, etc.)
* @return a mapping between entity name and its config. If the config cannot be fetched for a particular entity, an error value of EntityConfig is returned
* @throws ApiException in case of a global error, which means the config could not be fetched for any entity
*/
public Map<String, EntityConfig> describeConfig(String entityType, List<String> entityNames) throws ApiException;


/**
* Initiates the long-running reassign partitions procedure.
* This is an asynchronous call: it returns immediately once the server has accepted the request and created the admin path in Zookeeper.
* To simulate a simple blocking call, call Future.get. This ensures that all partition reassignments have completed.
* Note: currently there are only two possible states for a reassigned partition: Completed and In Progress.
*
* @param reassignmentData the target replica assignment for the partitions to be reassigned
*
* @return java.util.concurrent.Future which is completed once all partitions have been reassigned
*
* @throws ApiException in case partition reassignment wasn't initiated on the server
*/
public Future<Void> reassignPartitions(PartitionReassignmentData reassignmentData) throws ApiException;


/**
* Checks the interim status of the partition reassignment.
* Reassignment of a given partition is considered completed if the partition has been removed from the
* admin Zookeeper path and all cluster brokers have received and cached the relevant AR (assigned replicas) metadata for the
* given partition.
*
* @param reassignmentData the same data as was used for the reassign partitions request
*
* @return two maps: partitions whose reassignment has completed, and partitions whose reassignment is still in progress
* @throws ApiException in case reassignment verification wasn't initiated on the server
*/
public ReassignmentResult verifyReassignPartitions(PartitionReassignmentData reassignmentData) throws ApiException;

}
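The methods above return a `Future` and suggest calling `Future.get` to obtain blocking behavior. Since AdminClient is only proposed here, the sketch substitutes a hypothetical `fakeCreateTopics` stand-in that completes a `CompletableFuture` on a background thread; the per-topic result is simplified to a string. The blocking wrapper bounds the wait with a timeout, which is usually preferable to an unbounded `get()`.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class BlockingAdminCall {

    // Hypothetical stand-in for AdminClient.createTopics: returns immediately and
    // completes later on a background thread with a per-topic result map.
    static Future<Map<String, String>> fakeCreateTopics(List<String> topics) {
        return CompletableFuture.supplyAsync(() -> {
            Map<String, String> results = new LinkedHashMap<>();
            for (String topic : topics) {
                results.put(topic, "NONE"); // "NONE" stands for "no error" in this sketch
            }
            return results;
        });
    }

    // Turns the asynchronous call into a blocking one, bounded by a timeout.
    static Map<String, String> createTopicsBlocking(List<String> topics, long timeoutMs)
            throws InterruptedException, ExecutionException, TimeoutException {
        return fakeCreateTopics(topics).get(timeoutMs, TimeUnit.MILLISECONDS);
    }
}
```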

Compatibility, Deprecation, and Migration Plan

 

Rejected Alternatives


TopicMetadataRequest/Response: After some debate we decided not to evolve the TopicMetadataResponse to remove the ISR field (which currently can return incorrect information). There is a use case for this in KAFKA-2225, so we will treat this as a bug and fix it going forward. See KAFKA-1367 for more details.

