Status
Current state: Under Discussion
Discussion thread:
JIRA:
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
Motivation
Background
The KRaft controller was designed to be isolated from Kafka clients. This isolation helps prevent misbehaving clients from compromising the performance of the system. It also clarifies node roles: brokers are responsible for client traffic. However, there are certain edge cases where it is reasonable for clients to communicate with KRaft controllers.
Sometimes, we would like to target controllers directly. Typically this is so that we can perform an administrative operation without involving the brokers. DESCRIBE_QUORUM is a great example. This operation has nothing to do with the brokers, and may indeed be useful for debugging when other parts of the system are down. Another good example is using INCREMENTAL_ALTER_CONFIGS to make log4j level changes on a KRaft controller.
Proposed Changes
New Kafka clients which support KIP-919 will be able to target KRaft controllers directly. This only applies to admin clients, since it is not possible to produce or consume from a quorum controller.
Public Interfaces
bootstrap.controllers configuration
There will be a new AdminClient configuration, bootstrap.controllers. This configuration contains a comma-separated series of hostname:port entries. When this configuration is specified, the AdminClient will talk directly to the controller quorum and the brokers will not be involved.
KafkaProducer and KafkaConsumer will not support bootstrap.controllers. Only AdminClient will support it.
It is an error to set both bootstrap.controllers and bootstrap.servers. Only one can be set at a time. It is also an error to include broker endpoints in bootstrap.controllers. If we contact a broker via this mechanism, the command will fail.
Just as with bootstrap.servers, the supplied server list doesn't need to be exhaustive. As long as we can contact one of the provided controllers, the RPC can proceed.
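The configuration rules above can be modeled with a small validation routine. This is only an illustrative Python sketch, not the AdminClient's implementation: the function name `resolve_bootstrap` and its return shape are hypothetical, and rejecting a configuration where neither setting is present is an assumption that the text above does not state.

```python
def resolve_bootstrap(config):
    """Return (target, endpoints) for an admin client configuration.

    Hypothetical helper that models the rules described above; it is not
    taken from the Kafka code base.
    """
    controllers = config.get("bootstrap.controllers")
    servers = config.get("bootstrap.servers")
    if controllers and servers:
        raise ValueError(
            "only one of bootstrap.controllers and bootstrap.servers may be set")
    if not controllers and not servers:
        # Assumption: a client needs at least one bootstrap list.
        raise ValueError(
            "one of bootstrap.controllers or bootstrap.servers must be set")

    target = "controllers" if controllers else "brokers"
    endpoints = []
    for entry in (controllers or servers).split(","):
        # Split on the last colon so that the port is always the final part.
        host, sep, port = entry.strip().rpartition(":")
        if not sep or not host or not port.isdigit():
            raise ValueError(f"expected hostname:port, got {entry!r}")
        endpoints.append((host, int(port)))
    return target, endpoints
```

The list does not need to be exhaustive, so a caller would typically try the returned endpoints in turn until one of them answers.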
Command-line Changes
New Arguments
The following command-line tools will get a new --bootstrap-controllers argument:
- kafka-acls.sh
- kafka-cluster.sh
- kafka-configs.sh
- kafka-delegation-tokens.sh
- kafka-features.sh
- kafka-metadata-quorum.sh
- kafka-metadata-shell.sh
- kafka-reassign-partitions.sh
When the --bootstrap-controllers argument is used, --bootstrap-server must not be specified.
The --bootstrap-controllers flag will set the bootstrap.controllers configuration described above. It will also clear the bootstrap.servers configuration if that has been set in some other way (for example, via a configuration file provided to the command-line tool).
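The override behaviour of the flag can be sketched as a merge step between a configuration file and the command line. The Python function below is a hypothetical model written for illustration (the name `apply_bootstrap_controllers_flag` is not from Kafka); it only shows the "set one key, clear the other" rule described above.

```python
def apply_bootstrap_controllers_flag(file_config, bootstrap_controllers=None):
    """Merge a --bootstrap-controllers value into a file-based configuration.

    Hypothetical helper: file_config is a dict of properties loaded from a
    configuration file, bootstrap_controllers is the flag value or None.
    """
    effective = dict(file_config)  # copy so the caller's dict is not modified
    if bootstrap_controllers is not None:
        effective["bootstrap.controllers"] = bootstrap_controllers
        # Clear any bootstrap.servers value that came from the file.
        effective.pop("bootstrap.servers", None)
    return effective
```

When the flag is absent, the file configuration passes through unchanged, so existing invocations keep their current behaviour.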
Changes to kafka-metadata-shell.sh
The metadata shell will now have these arguments:
    The Apache Kafka metadata tool

    positional arguments:
      command                The command to run.

    optional arguments:
      -h, --help             show this help message and exit
      --directory DIRECTORY, -d DIRECTORY
                             The __cluster_metadata-0 directory to read.
      --bootstrap-controllers CONTROLLERS, -q CONTROLLERS
                             The bootstrap.controllers, used to communicate directly with the metadata quorum.
      --config CONFIG        Path to a property file containing a Kafka configuration
Note that:
- The --snapshot argument has been replaced by a --directory argument that reads the whole directory, not just a snapshot file.
- There is no need for a --cluster-id flag, since we will query the controller for its cluster ID prior to creating the Raft client.
- There is now a --config argument which can be used to pass a configuration file.
Since kafka-metadata-shell.sh is at an "evolving" level of interface stability, these changes should be OK to make without a deprecation period.
MetadataRequest Changes
There will be a new version of MetadataRequest. It will contain an additional field, DirectToKRaftControllerQuorum. This field defaults to false.
    diff --git a/clients/src/main/resources/common/message/MetadataRequest.json b/clients/src/main/resources/common/message/MetadataRequest.json
    index 5da95cfed6..8e5e765d11 100644
    --- a/clients/src/main/resources/common/message/MetadataRequest.json
    +++ b/clients/src/main/resources/common/message/MetadataRequest.json
    @@ -18,7 +18,7 @@
       "type": "request",
       "listeners": ["zkBroker", "broker"],
       "name": "MetadataRequest",
    -  "validVersions": "0-12",
    +  "validVersions": "0-13",
       "flexibleVersions": "9+",
       "fields": [
         // In version 0, an empty array indicates "request metadata for all topics." In version 1 and
    @@ -50,6 +50,8 @@
         { "name": "IncludeClusterAuthorizedOperations", "type": "bool", "versions": "8-10",
           "about": "Whether to include cluster authorized operations." },
         { "name": "IncludeTopicAuthorizedOperations", "type": "bool", "versions": "8+",
    -      "about": "Whether to include topic authorized operations." }
    +      "about": "Whether to include topic authorized operations." },
    +    { "name": "DirectToKRaftControllerQuorum", "type": "bool", "versions": "13+",
    +      "about": "Whether to target the KRaft controller quorum." }
       ]
     }
    diff --git a/clients/src/main/resources/common/message/MetadataResponse.json b/clients/src/main/resources/common/message/MetadataResponse.json
    index 928d905152..085c0d919f 100644
    --- a/clients/src/main/resources/common/message/MetadataResponse.json
    +++ b/clients/src/main/resources/common/message/MetadataResponse.json
    @@ -42,7 +42,7 @@
       // Version 11 deprecates ClusterAuthorizedOperations. This is now exposed
       // by the DescribeCluster API (KIP-700).
       // Version 12 supports topicId.
    -  "validVersions": "0-12",
    +  "validVersions": "0-13",
       "flexibleVersions": "9+",
       "fields": [
         { "name": "ThrottleTimeMs", "type": "int32", "versions": "3+", "ignorable": true,
    @@ -94,6 +94,8 @@
             "about": "32-bit bitfield to represent authorized operations for this topic." }
         ]},
         { "name": "ClusterAuthorizedOperations", "type": "int32", "versions": "8-10", "default": "-2147483648",
    -      "about": "32-bit bitfield to represent authorized operations for this cluster." }
    +      "about": "32-bit bitfield to represent authorized operations for this cluster." },
    +    { "name": "FromKRaftController", "type": "bool", "versions": "13+", "default": "false",
    +      "taggedVersions": "13+", "tag": 0, "about": "Whether the response was sent back from a KRaft controller." }
       ]
     }
DirectToKRaftControllerQuorum Handling Matrix
DirectToKRaftControllerQuorum | If Received by Broker | If Received by KRaft Controller |
---|---|---|
false | traditional broker MetadataResponse | UNSUPPORTED_VERSION MetadataResponse |
true | NOT_CONTROLLER MetadataResponse | controller MetadataResponse described below |
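The matrix above amounts to a lookup on two inputs: the value of the flag and the role of the node that receives the request. A minimal Python sketch of that lookup follows; the function name and the "broker"/"controller" role strings are hypothetical labels chosen for this illustration.

```python
# Outcome for each (DirectToKRaftControllerQuorum, receiver role) pair,
# transcribed from the handling matrix.
HANDLING_MATRIX = {
    (False, "broker"): "traditional broker MetadataResponse",
    (False, "controller"): "UNSUPPORTED_VERSION MetadataResponse",
    (True, "broker"): "NOT_CONTROLLER MetadataResponse",
    (True, "controller"): "controller MetadataResponse",
}


def metadata_handling(direct_to_kraft_controller_quorum, receiver):
    """Look up how a MetadataRequest is answered (hypothetical helper)."""
    return HANDLING_MATRIX[(bool(direct_to_kraft_controller_quorum), receiver)]
```

The two error cells are what allow a client to detect that an endpoint has the wrong role for the bootstrap setting in use.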
KRaft Controller MetadataRequest
When sending a METADATA request to the controller, the following request fields must be set to false:
- AllowAutoTopicCreation
- IncludeClusterAuthorizedOperations
- IncludeTopicAuthorizedOperations
If any of them are set to true, an INVALID_REQUEST MetadataResponse will be returned.
Topics must be set to an empty array, indicating no interest in any topics.
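As a rough illustration of these constraints, the Python sketch below checks a request represented as a plain dict keyed by the field names above. The dict representation and the function name `controller_request_violations` are assumptions made for this example; the real request is a generated message class.

```python
# Fields that must be false when the request targets a KRaft controller.
MUST_BE_FALSE = (
    "AllowAutoTopicCreation",
    "IncludeClusterAuthorizedOperations",
    "IncludeTopicAuthorizedOperations",
)


def controller_request_violations(request):
    """Return the names of fields that break the controller request rules.

    Hypothetical helper: an empty list means the request is acceptable.
    """
    violations = [name for name in MUST_BE_FALSE if request.get(name, False)]
    # Topics must be an explicit empty array; a missing or null value is
    # treated as a violation in this sketch.
    if request.get("Topics") != []:
        violations.append("Topics")
    return violations
```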
KRaft Controller MetadataResponse
The KRaft controller MetadataResponse will always set:
- FromKRaftController to true
- ClusterId to the Kafka cluster ID
- ControllerId to the ID of the actual Raft leader
Response type | Topics Section | "Brokers" Section | Comments |
---|---|---|---|
Successful response from KRaft controller | empty | Controller endpoint information as given in controller.quorum.voters | The "direct to controller" case. |
Error response if topics were given in request | the given topics, with an INVALID_REQUEST error for each | Empty | It is an error to ask about specific topics. |
Error response if no topics were given in request | the __cluster_metadata topic with the expected error code | Empty | There is no top-level error code in MetadataResponse, so we use the __cluster_metadata topic to send back our error. |
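The three rows of the table can be sketched as a function that assembles the response. This Python model uses plain dicts with a simplified subset of the MetadataResponse field names and string error codes instead of numeric ones; the function names are hypothetical, and the example assumes the voters string uses the id@host:port format of controller.quorum.voters.

```python
def parse_quorum_voters(voters):
    """Parse "id@host:port,..." into broker-style endpoint dicts."""
    endpoints = []
    for voter in voters.split(","):
        node_id, _, address = voter.strip().partition("@")
        host, _, port = address.rpartition(":")
        endpoints.append({"NodeId": int(node_id), "Host": host, "Port": int(port)})
    return endpoints


def controller_metadata_response(cluster_id, leader_id, voters,
                                 requested_topics=(), error=None):
    """Build a simplified controller MetadataResponse (hypothetical helper)."""
    response = {
        "FromKRaftController": True,  # always set by the controller
        "ClusterId": cluster_id,
        "ControllerId": leader_id,
        "Brokers": [],
        "Topics": [],
    }
    if requested_topics:
        # Asking about specific topics is an error, reported per topic.
        response["Topics"] = [
            {"Name": name, "ErrorCode": "INVALID_REQUEST"}
            for name in requested_topics
        ]
    elif error is not None:
        # No top-level error code exists, so __cluster_metadata carries it.
        response["Topics"] = [{"Name": "__cluster_metadata", "ErrorCode": error}]
    else:
        response["Brokers"] = parse_quorum_voters(voters)
    return response
```

For example, `controller_metadata_response("abc", 2, "1@c1:9093,2@c2:9093")` yields an empty Topics list and two controller endpoints in Brokers; the cluster ID and host names here are placeholders.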
Compatibility, Deprecation, and Migration Plan
There should be no compatibility impact since current controllers don't handle MetadataRequest.
It's worth noting here that we continue to recommend putting KRaft controllers on a separate network from Kafka clients. This is always a good idea in order to have the best security and isolation.
Rejected Alternatives
bootstrap.controllers versus direct.to.controller configuration
Rather than having a bootstrap.controllers configuration, we could have a separate configuration like direct.to.controller and put the controller servers into bootstrap.servers. Similarly, we could reuse --bootstrap-server rather than adding --bootstrap-controllers.
We decided to go with the scheme proposed above to make it clearer when a tool was going directly to the controller. This also makes it clearer which command-line tools have this capability and which do not.
For example, kafka-console-consumer.sh does not have the capability to go direct to the controller, since it is not possible to consume from the controller. Therefore, it's intuitive that kafka-console-consumer.sh lacks the --bootstrap-controllers flag.
Another issue is that in the future, we may want to support using the controllers as bootstrap servers for the brokers. The scheme above leaves the door open for this, whereas a scheme that reused existing configurations would not.