The goal of the command line shell is fundamentally to provide centralized management for Kafka operations.
There are a lot of different Kafka tools. Right now I think 5-6 of them are in common use. I was thinking we could start by taking https://cwiki.apache.org/confluence/display/KAFKA/System+Tools and https://cwiki.apache.org/confluence/display/KAFKA/Replication+tools and exposing them as plugins in a command line shell interface. This would also include a new global broker configuration manager and access to the tools we already have outside of the scripts, both through a new wire protocol message type.
1) We need to add a new Admin message to the wire protocol that can pass command line utility calls to the tools and the global configuration manager on the broker (any broker). Any tool performing a task will (should be able to, need to flesh this out more) be able to execute, but on a broker thread instead of on the command line. The controller will continue to do the tasks it does today, such as "create topic"; however, the TopicCommand will be called from within the handleAdminRequestTools.
2) We need to implement the handleAdminRequest.
3) We need to build a client for the wire protocol. I think it should be a simple CLI.
It would be both a command line tool and a shell.
So
kafka -b brokerlist -a reassign-partition status
would run from the cli and
kafka shell -b brokerlist
kafka>describe;
... kafka-topics.sh --describe
kafka>set topic_security['pci','profile','dss'] = true
...etc
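A minimal sketch of how one entry point could serve both modes, assuming the argument shapes shown above. The function names parse_args and split_statement are hypothetical, and nothing here talks to a broker; it only shows how the one-shot form and the shell form might be told apart.

```python
import argparse


def parse_args(argv):
    """Return (mode, brokers, command) for the two invocation styles.

    mode is "shell" or "oneshot"; command is a list of words or None.
    The -b and -a flags follow the examples in the text; everything else
    is an assumption made for illustration.
    """
    if argv and argv[0] == "shell":
        parser = argparse.ArgumentParser(prog="kafka shell")
        parser.add_argument("-b", dest="brokers", required=True)
        ns = parser.parse_args(argv[1:])
        return ("shell", ns.brokers.split(","), None)

    parser = argparse.ArgumentParser(prog="kafka")
    parser.add_argument("-b", dest="brokers", required=True)
    parser.add_argument("-a", dest="action", required=True)
    parser.add_argument("args", nargs="*")
    ns = parser.parse_args(argv)
    return ("oneshot", ns.brokers.split(","), [ns.action] + ns.args)


def split_statement(line):
    """Split one shell line such as 'describe;' into words, dropping the ';'."""
    return line.strip().rstrip(";").split()
```

Both modes would end up with the same list of command words, so the same Admin request builder could sit behind either one.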
An important point is that folks using the existing tools should have an easy API-style way to keep doing that and get the benefit too.
This interface should also have some monitoring stats, e.g., visualize in text how consumer lag (the gap between the committed offset and the log end offset) is trending.
kafka>use topic name;
kafka>stats;
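As a rough illustration of what a text view of lag could look like, here is a sketch that takes lag as log end offset minus committed offset and draws one bar per sample. The function names and the bar format are made up for this example; how the offsets get fetched is left out entirely.

```python
def consumer_lag(log_end_offset, committed_offset):
    """Lag is how far the committed offset trails the log end offset."""
    return max(0, log_end_offset - committed_offset)


def render_lag_trend(samples, width=40):
    """Render one text bar per (log_end_offset, committed_offset) sample.

    Bars are scaled so that the largest lag in the window fills `width`
    characters. The layout is an illustrative choice, not a proposal.
    """
    lags = [consumer_lag(leo, committed) for leo, committed in samples]
    peak = max(lags, default=0) or 1  # avoid dividing by zero when lag is 0
    lines = []
    for lag in lags:
        bar = "#" * (lag * width // peak)
        lines.append(f"{lag:>10} {bar}")
    return "\n".join(lines)


if __name__ == "__main__":
    # Oldest sample first; lag grows and then the consumer catches up.
    print(render_lag_trend([(100, 90), (120, 100), (150, 150)], width=20))
```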
Not sure right this minute whether we should do this in Python, Java or Scala. I think whoever works on it can decide, and we can support it across the committers whatever it is. My thought is that it should live in the ./clients folder.
... we "may" also want to have the CLI expose and run via HTTP REST too; however, I think this can be done quickly and easily by someone if we build it right.
4) The Global Configuration Manager. This is VERY important to the goals of these changes. MANY configurations (much like topic-level ones) are actually global to EVERY broker. Brokers (through server.properties) should be able to override them, but the "default" should come from storage (for now ZooKeeper).
a) setting this goes through the CLI and the handleAdminRequest()
b) using this is layered like this ... within KafkaApi we implement a new class that can flatten/figure out the right property. Check server.properties (we can make that level 1); if not found, use what we get from storage (for now ZooKeeper); and if still not found, use the default in the code. This will be very nice because you can set a default for EVERY broker for xyz configuration and not have to manage it across brokers with properties files. Centralized configuration.
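The lookup order in b) can be sketched as below. The three layers are plain dictionaries here, and resolve_config is a hypothetical name; a real implementation would read server.properties and the stored defaults from wherever they actually live. The values in the usage lines are illustrative only.

```python
def resolve_config(key, server_properties, stored_defaults, code_defaults):
    """Return the effective value for `key` using the layered lookup.

    Order, as described in the text:
      1. the broker's own server.properties (local override)
      2. the cluster-wide default kept in storage (for now ZooKeeper)
      3. the default compiled into the code
    """
    for layer in (server_properties, stored_defaults, code_defaults):
        if key in layer:
            return layer[key]
    raise KeyError(f"unknown configuration: {key}")


if __name__ == "__main__":
    # Illustrative values only.
    server_properties = {"log.retention.hours": "72"}
    stored_defaults = {"log.retention.hours": "48", "num.io.threads": "16"}
    code_defaults = {
        "log.retention.hours": "168",
        "num.io.threads": "8",
        "num.network.threads": "3",
    }
    for name in ("log.retention.hours", "num.io.threads", "num.network.threads"):
        print(name, "=", resolve_config(name, server_properties,
                                        stored_defaults, code_defaults))
```

With this ordering, changing the stored value moves every broker that has not set the key locally, which is the centralized behaviour the section is after.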
The top level for this work will be https://issues.apache.org/jira/browse/KAFKA-1694 and broken into sub tickets.
Potential Gotchas
- Using the RQ/RP wire protocol to the controller instead of the current way (via the ZK admin path) may expose admin requests to concurrent execution, which may not be supported yet. https://issues.apache.org/jira/browse/KAFKA-1305
Proposed RQ/RP Format
For each type of Admin Request a separate type of wire protocol message is created.
Currently there are 5 types of messages which support TopicCommand: CreateTopic(Request | Response), AlterTopic, DeleteTopic, DescribeTopic, ListTopics. There is also a special message type to identify cluster info, ClusterMetadata (read Kafka Admin Command Line Internals for details).
The same notation as in A Guide To The Kafka Protocol is used here. The only difference is a new Kafka Protocol metatype, MaybeOf ("?" in notation), which means the value is optional in the message. To indicate whether the value exists, a special control byte is prepended before each value (0 means the field is absent; otherwise read the value normally).
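To make the control byte concrete, here is a small sketch of MaybeOf applied to an int32 field. It assumes big-endian integers as in the existing Kafka protocol, and it writes 1 as the control byte for a present value; the text only says "non-zero", so the choice of 1 is an assumption. The function names are hypothetical.

```python
import struct


def encode_maybe_int32(value):
    """Encode an optional int32: one control byte, then the value if present."""
    if value is None:
        return b"\x00"  # 0 means the field is absent
    # Any non-zero control byte means "present"; 1 is an arbitrary choice.
    return b"\x01" + struct.pack(">i", value)


def decode_maybe_int32(buf, offset=0):
    """Decode an optional int32 and return (value_or_None, next_offset)."""
    if buf[offset] == 0:
        return None, offset + 1
    (value,) = struct.unpack_from(">i", buf, offset + 1)
    return value, offset + 5
```

An absent field therefore costs one byte on the wire, and a present int32 costs five.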
Create Topic Request
CreateTopicRequest => TopicName ?(Partitions) ?(Replicas) ?(ReplicaAssignment) [Config]
TopicName => string
Partitions => int32
Replicas => int32
ReplicaAssignment => string
Create Topic Response
CreateTopicResponse => ErrorCode ?(ErrorDescription)
ErrorCode => int16
ErrorDescription => string
CreateTopicRequest requires the topic name and either (partitions + replicas) or a replica assignment to create the topic (validation is done on the server side). You can also specify topic-level configs to create the topic with, in key=value format (to use the defaults, send an empty array).
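The server-side check described above might look roughly like this. It is only a sketch: validate_create_topic and its error strings are invented for illustration, and the case where both a partition/replica count and a replica assignment are supplied is simply accepted, since the text does not say how that should be handled.

```python
def validate_create_topic(topic, partitions=None, replicas=None,
                          replica_assignment=None, configs=()):
    """Return a list of problems with a CreateTopicRequest (empty if valid)."""
    errors = []
    if not topic:
        errors.append("topic name is required")

    has_counts = partitions is not None and replicas is not None
    if not has_counts and replica_assignment is None:
        errors.append(
            "either partitions and replicas, or a replica assignment, is required"
        )

    # Topic-level configs arrive as "key=value" strings; an empty array
    # means "use the defaults".
    for entry in configs:
        key, sep, _value = entry.partition("=")
        if not sep or not key:
            errors.append(f"config entry is not key=value: {entry!r}")
    return errors
```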
CreateTopicResponse is fairly simple: you receive an error code (0, as always, identifies NO_ERROR) and optionally an error description. Usually it will hold the higher-level exception that occurred during command execution.
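For illustration, a client could read the response body along these lines. The sketch assumes the string encoding from A Guide To The Kafka Protocol (an int16 length followed by UTF-8 bytes) and the MaybeOf control byte described earlier; it covers only the body shown in the grammar, not any framing or header around it. The function name is hypothetical.

```python
import struct


def decode_create_topic_response(buf):
    """Decode 'ErrorCode ?(ErrorDescription)' into (error_code, description)."""
    (error_code,) = struct.unpack_from(">h", buf, 0)  # int16, big-endian
    offset = 2
    description = None
    if buf[offset] != 0:  # MaybeOf control byte: 0 means absent
        (length,) = struct.unpack_from(">h", buf, offset + 1)
        start = offset + 3
        description = buf[start:start + length].decode("utf-8")
    return error_code, description
```

An error code of 0 with no description is the success case; anything else would normally carry the exception text in the description.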
Alter Topic Request
AlterTopicRequest => TopicName ?(Partitions) ?(ReplicaAssignment) [AddedConfig] [DeletedConfig]
TopicName => string
Partitions => int32
Replicas => int32
DeletedConfig => string
Alter Topic Response
AlterTopicResponse => ErrorCode ?(ErrorDescription)
ErrorCode => int16
ErrorDescription => string
AlterTopicRequest is similar to the previous request; to specify topic-level settings that should be removed, use the DeletedConfig array (keys only).
AlterTopicResponse is similar to CreateTopicResponse.
Delete Topic Request
DeleteTopicRequest => TopicName
TopicName => string
Delete Topic Response
DeleteTopicResponse => ErrorCode ?(ErrorDescription)
ErrorCode => int16
ErrorDescription => string
DeleteTopicRequest requires only the name of the topic to delete.
DeleteTopicResponse is similar to CreateTopicResponse.