...
When resources are described using AdminClient (e.g. `AdminClient#describeTopics`), the metadata returned for the resource includes all the metadata available in the broker to enable the client to perform various operations on the resource, but it does not include any information about which operations the user requesting the metadata is authorized to perform.
The Kafka protocol currently allows users with `Describe` access on the `Cluster` resource to describe ACLs, but even users with this access need to parse the ACLs to determine which operations they are permitted to perform on each resource. This requires complex logic since resources may be wildcarded or prefixed and operations or hosts may also be wildcarded.
Users with `Describe` access can determine which operations are authorized by invoking the operation and processing any resulting `AuthorizationException`. Since this information is accessible to users with `Describe` access anyway, it would be good to have a simpler way for users to determine what operations they are authorized to perform on a resource that they describe. Users without `Describe` access will continue to receive errors that don't leak any information about the existence of the resource.
...
Code Block | ||||
---|---|---|---|---|
| ||||
Metadata Request => [topics] allow_auto_topic_creation include_cluster_authorized_operations include_topic_authorized_operations <== ADDED include_cluster_authorized_operations, include_topic_authorized_operations
  topics => STRING
  allow_auto_topic_creation => BOOLEAN
  include_cluster_authorized_operations => BOOLEAN <== NEW
  include_topic_authorized_operations => BOOLEAN <== NEW
Metadata Response => throttle_time_ms [brokers] cluster_id controller_id [topic_metadata] [authorized_operations] <== ADDED authorized_operations
  throttle_time_ms => INT32
  brokers => node_id host port rack
    node_id => INT32
    host => STRING
    port => INT32
    rack => NULLABLE_STRING
  cluster_id => NULLABLE_STRING
  controller_id => INT32
  topic_metadata => error_code topic is_internal [partition_metadata] [authorized_operations] <== ADDED authorized_operations
    error_code => INT16
    topic => STRING
    is_internal => BOOLEAN
    partition_metadata => error_code partition leader leader_epoch [replicas] [isr] [offline_replicas]
      error_code => INT16
      partition => INT32
      leader => INT32
      leader_epoch => INT32
      replicas => INT32
      isr => INT32
      offline_replicas => INT32
  authorized_operations => INT8 <== NEW |
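As an illustration of how a client might interpret the new field, the sketch below turns an `authorized_operations` array into a set of operations. It assumes each INT8 entry carries the same byte code that `AclOperation` already uses (for example 3 for READ and 8 for DESCRIBE); the schema above does not spell out the encoding, so that mapping is an assumption. The `Op` enum and `AuthorizedOperationsDecoder` class are hypothetical names used only for this example.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical stand-in for org.apache.kafka.common.acl.AclOperation,
// reduced to the operations this proposal mentions.
enum Op {
    READ(3), WRITE(4), CREATE(5), DELETE(6), ALTER(7),
    DESCRIBE(8), DESCRIBE_CONFIGS(10), ALTER_CONFIGS(11);

    final byte code;

    Op(int code) {
        this.code = (byte) code;
    }

    // Look up an operation by its byte code; reject codes this sketch does not know.
    static Op fromCode(byte code) {
        for (Op op : values()) {
            if (op.code == code) {
                return op;
            }
        }
        throw new IllegalArgumentException("Unknown operation code: " + code);
    }
}

final class AuthorizedOperationsDecoder {
    // Assumption: each INT8 in authorized_operations is one operation code.
    static Set<Op> decode(byte[] authorizedOperations) {
        Set<Op> ops = EnumSet.noneOf(Op.class);
        for (byte code : authorizedOperations) {
            ops.add(Op.fromCode(code));
        }
        return ops;
    }
}
```

Under these assumptions, a response carrying the bytes 3, 8 and 10 for a topic would decode to READ, DESCRIBE and DESCRIBE_CONFIGS.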
...
All relevant `DescribeXxxOptions` classes will include a new field and corresponding accessor to request authorized operations. By default the option is disabled. The classes affected are:
`DescribeClusterOptions`: This is a metadata request. Operations returned will be a subset of `{Describe, Alter, DescribeConfigs, AlterConfigs}`.
`DescribeTopicsOptions`: Also a metadata request. Operations returned will be a subset of `{Create, Delete, Read, Write, Describe, Alter, DescribeConfigs, AlterConfigs}`.
`DescribeConsumerGroupsOptions`: Operations returned will be a subset of `{Describe, Read, Delete}`.
Open question: we want to include operations for topics; is there a place to include them for the cluster?
Example: DescribeTopicsOptions
Code Block | ||||
---|---|---|---|---|
| ||||
public class DescribeTopicsOptions extends AbstractOptions<DescribeTopicsOptions> {
    private boolean includeAuthorizedOperations;
    /** Retain existing code here */
    public DescribeTopicsOptions includeAuthorizedOperations(boolean includeAuthorizedOperations) {
        this.includeAuthorizedOperations = includeAuthorizedOperations;
        return this;
    }
    public boolean includeAuthorizedOperations() {
        return includeAuthorizedOperations;
    }
} |
The corresponding resource description or result classes will be extended to provide an optional set of authorized operations.
Example: ConsumerGroupDescription
Code Block | ||||
---|---|---|---|---|
| ||||
public class ConsumerGroupDescription {
    private Set<AclOperation> authorizedOperations;
    /** Retain existing code here */
    public Set<AclOperation> authorizedOperations() {
        if (authorizedOperations.isEmpty())
            throw new IllegalArgumentException("Authorized operations were not obtained from the broker");
        return authorizedOperations;
    }
} |
Authorizer API Changes
Proposed Changes
A new method will be added to the `kafka.security.auth.Authorizer` interface to obtain the collection of authorized operations associated with a resource. The default implementation of this method will use the existing `authorize` API to check every supported operation on the resource. This ensures that custom authorizers will continue to work without change. The built-in authorizer implementation `SimpleAclAuthorizer` will include a more performant implementation that traverses ACLs once to retrieve all the authorized operations for the user.
Code Block | ||||
---|---|---|---|---|
| ||||
trait Authorizer extends Configurable {
  def authorizedOperations(session: Session, resource: Resource): Set[Operation] = {
    // Use authorize() to obtain permitted operations for the `session` on `resource`
  }
  ....
} |
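The default behaviour described above, probing each supported operation through the existing `authorize` check, could look roughly like the following. This is a minimal sketch in Java, not the proposed Scala code: `SketchAuthorizer` and the `Operation` enum are hypothetical, the session and resource are reduced to plain strings, and the set of supported operations is passed in by the caller as an assumption about how it would be supplied.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical, simplified stand-in for the operations named in this proposal.
enum Operation { READ, WRITE, CREATE, DELETE, ALTER, DESCRIBE, DESCRIBE_CONFIGS, ALTER_CONFIGS }

interface SketchAuthorizer {
    // Existing-style check: may `principal` perform `operation` on `resource`?
    boolean authorize(String principal, Operation operation, String resource);

    // Default: call authorize() once per candidate operation, so authorizers
    // that only implement authorize() keep working without change.
    default Set<Operation> authorizedOperations(String principal, String resource,
                                                Set<Operation> supported) {
        Set<Operation> allowed = EnumSet.noneOf(Operation.class);
        for (Operation op : supported) {
            if (authorize(principal, op, resource)) {
                allowed.add(op);
            }
        }
        return allowed;
    }
}
```

An authorizer that can inspect its ACLs directly would override `authorizedOperations` to collect the result in a single pass instead of issuing one `authorize` call per operation.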
Proposed Changes
As described above, the Kafka protocol for requests and responses that describe broker resources will be extended to request authorized operations and to return the set of authorized operations if requested. The broker will use the pluggable Authorizer to obtain the set of permitted operations for the Session performing the `Describe` operation. `SimpleAclAuthorizer` will be updated to traverse ACLs once and return all the matching operations. Custom authorizers may be extended to do the same, but a default implementation that uses the existing `authorize` method to authorize every supported operation ensures that existing authorizers can be used without change.
The broker will check `Describe` access on the resources before returning any metadata, so only users authorized for `Describe` may obtain the additional information provided by this KIP. Users without `Describe` access continue to get errors that don't leak information about the existence of resources.
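The ordering of these checks can be sketched as follows: the `Describe` check comes first, and authorized operations are computed only for callers that pass it and asked for them. This is an illustrative sketch, not broker code. `DescribeHandler`, `DescribeResult` and `AclOp` are hypothetical names, and the permission check is modelled as a plain predicate with no implied permissions between operations.

```java
import java.util.EnumSet;
import java.util.Optional;
import java.util.Set;
import java.util.function.Predicate;

// Hypothetical, reduced set of operations for this example.
enum AclOp { READ, WRITE, DELETE, DESCRIBE }

final class DescribeResult {
    final boolean authorized;                          // false means an authorization error is returned
    final Optional<Set<AclOp>> authorizedOperations;   // present only when requested and permitted

    DescribeResult(boolean authorized, Optional<Set<AclOp>> authorizedOperations) {
        this.authorized = authorized;
        this.authorizedOperations = authorizedOperations;
    }
}

final class DescribeHandler {
    static DescribeResult describe(Predicate<AclOp> isAllowed, boolean includeAuthorizedOperations) {
        // Step 1: without Describe access, return an error and nothing else.
        if (!isAllowed.test(AclOp.DESCRIBE)) {
            return new DescribeResult(false, Optional.empty());
        }
        // Step 2: the option is off by default, so plain metadata is returned.
        if (!includeAuthorizedOperations) {
            return new DescribeResult(true, Optional.empty());
        }
        // Step 3: collect every operation the caller is permitted to perform.
        Set<AclOp> ops = EnumSet.noneOf(AclOp.class);
        for (AclOp op : AclOp.values()) {
            if (isAllowed.test(op)) {
                ops.add(op);
            }
        }
        return new DescribeResult(true, Optional.of(ops));
    }
}
```

In this sketch a caller without `Describe` access gets the same result whether or not it sets the flag, which mirrors the requirement that the new option must not reveal anything to such users.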
Compatibility, Deprecation, and Migration Plan
...