Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Table of Contents

Status

Current state: "Under DiscussionAccepted"

Discussion thread: here [Change the link from the KIP proposal email archive to your own email thread]

JIRA:

Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyKAFKA-7922

...

The version of the relevant requests will be bumped up. Authorized operations will be returned as an array of INT8 values, each of which is consistent with AclOperation used in ACL requests and responsesINT32 with bits set for each permitted operation. The bitfield corresponding to each operation with be the existing operation code used in AclOperation. If in future we exceed 32 operations and need more bits, the field can be made into INT64 with a version bump. The highest code currently in use is 12. Broker will set the bitfields corresponding to actual supported operations on the resource and the bits corresponding to AclOperation.ALL, AclOperation.ANY and AclOperation.UNKNOWN will never be set by the broker.

Metadata Request and Response: v8

Code Block
languagejava
titleMetadata v8
Metadata Request => [topics] allow_auto_topic_creation include_cluster_authorized_operations include_topic_authorized_operations <== ADDED include_cluster_authorized_operations, include_topic_authorized_operations
  topics => STRING
  allow_auto_topic_creation => BOOLEAN
  include_cluster_authorized_operations => BOOLEAN  <== NEW
  include_topic_authorized_operations => BOOLEAN  <== NEW

Metadata Response => throttle_time_ms [brokers] cluster_id controller_id [topic_metadata] [authorized_operations] <== ADDED authorized_operations
  throttle_time_ms => INT32
  brokers => node_id host port rack 
    node_id => INT32
    host => STRING
    port => INT32
    rack => NULLABLE_STRING
  cluster_id => NULLABLE_STRING
  controller_id => INT32
  topic_metadata => error_code topic is_internal [partition_metadata] [authorized_operations]  <== ADDED authorized_operations
    error_code => INT16
    topic => STRING
    is_internal => BOOLEAN
    partition_metadata => error_code partition leader leader_epoch [replicas] [isr] [offline_replicas] 
      error_code => INT16
      partition => INT32
      leader => INT32
      leader_epoch => INT32
      replicas => INT32
      isr => INT32
      offline_replicas => INT32
    authorized_operations => INT8 INT32      <== NEW
  authorized_operations => INT32        <== NEW


DescribeGroups Request and Response v3

Code Block
languagejava
titleDescribeGroups v3
DescribeGroups Request => [group_ids] include_authorized_operations  <== ADDED include_authorized_operations
  group_ids => STRING
  include_authorized_operations => BOOLEAN  <== NEW

DescribeGroups Response => throttle_time_ms [groups]
  throttle_time_ms => INT32
  groups => error_code group_id state protocol_type protocol [members] [authorized_operations] <== ADDED authorized_operations
    error_code => INT16
    group_id => STRING
    state => STRING
    protocol_type => STRING
    protocol => STRING
    members => member_id client_id client_host member_metadata member_assignment 
      member_id => STRING
      client_id => STRING
      client_host => STRING
      member_metadata => BYTES
      member_assignment => BYTES
    authorized_operations => INT8INT32        <== NEW


AdminClient API Changes

...

The corresponding resource description or result classes returned by AdminClient will be extended to provide an optional set of authorized operations. The bits set in the new INT32 field in the response will be used to generate a set of AclOperation entries, where each operation code is derived from the position of the bit field. When new operations are added, older clients will ignore any authorized operations returned by the broker that is not supported by the client. The returned AclOperation set will never contain AclOperation.ANY, AclOperation.ALL or AclOperation.UNKNOWN.

Example: ConsumerGroupDescription

Code Block
languagejava
titleConsumerGroupDescription
public class ConsumerGroupDescription {
    private Set<AclOperation> authorizedOperations;
    /** Retain existing code here */
 
    public Set<AclOperation> authorizedOperations() {
        if (authorizedOperations.isEmpty())
            throw new IllegalArgumentException("Authorized operations were not provided by the broker");
        return authorizedOperations;
    }
}

...

ResourceType API Changes

A new method will be added to the kafkathe kafka.security.auth.Authorizer interface ResourceType trait to obtain the collection of authorized supported operations associated with a resource type. Default implementation of this method will use the existing `authorize()` API to check every supported operation on the resource. This ensures that custom authorizers will continue to work without change. The built-in authorizer implementation SimpleAclAuthorizer will include a more performant implementation that traverses ACLs once to retrieve all the authorized operations for the user. All permitted operations on the resource including any that are implicitly allowed by ACLs will be included in the returned set. For example, if a Read ACL is found, both Read and Describe will be included since both are permitted.This will be used to maintain supported operations for a resourceType.

This can be used by custom authorizers to determine authorized operations. 

Code Block
languagescala
titleAuthorizer API ResourceType changes
	  
Sealed trait AuthorizerResourceType extends Configurable BaseEnum with Ordered[ ResourceType ] {
  def authorizedOperations(session: Session, resource: Resource) .....
   
  // this method output will not include "All" Operation type
  def supportedOperations: Set[Operation]
}


case object Topic extends ResourceType {
  ...
  val supportedOperations = Set(Read, Write, Create, Describe, Delete, Alter, DescribeConfigs, AlterConfigs)
}

case object Group extends ResourceType {
  ...
  // Use authorize() to obtain permitted operations for the `session` on `resource`
  }
  ....val supportedOperations = Set(Read, Describe, Delete)
}

case object Cluster extends ResourceType {
  ...
  val supportedOperations = Set(Create, ClusterAction, DescribeConfigs, AlterConfigs, IdempotentWrite, Alter, Describe)
}

case object TransactionalId extends ResourceType {
  ...
  val supportedOperations = Set(Describe, Write)
}

case object DelegationToken extends ResourceType {
  ...
  val supportedOperations = Set(Describe)
}

Proposed Changes

As described above, Kafka protocol for requests and responses to describe broker resources will be extended to request authorized operations and return the set of authorized operations if requested. Broker will use its pluggable Authorizer to obtain the set of permitted operations for the Session performing the Describe operation. SimpleAclAuthorizer will be updated to traverse through ACLs once and return all the matching operations. Custom authorizers may be extended to do the same, but a default implementation that uses the existing `authorize()` method to authorize every supported operation ensures that existing authorizers can be used without change. If  If no authorizer is configured on the broker, the full set of supported operations on each resource will be returned.

...

  • Existing clients using older versions will not request authorized operations in Describe requests since the default is to disable this feature. This keeps older clients compatible with newer brokers.
  • Newer clients connecting to older brokers will use the older protocol version and hence will not request authorized operations.
  • When the AdminClient is talking to a broker which does not support KIP-430, it will fill in either null or UnsupportedVersionException for the returned ACL operations fields in objects.  For example, ConsumerGroupDescription#authorizedOperations will be null if the broker did not supply this information.  DescribeClusterResult#authorizedOperations will throw an exception if the option is requested but not supported by brokers UnsupportedVersionException if the broker did not supply this information.
  • When new operations are added, newer brokers may return operations that are not known to older clients. AdminClient will ignore any bit that is set in authorized_operations that is not known to the client. The Set<AclOperation> created by the client from the bits returned by the broker will only include operations that the client client knows about.

Changes dropped from the Proposal

Initially we proposed below API changes to the scala `Authorizer` trait with a default implementation so that existing implementations continue to work. But Scala 2.11 doesn't convert the default implementation in a trait to a default implementation in Java. So this breaks existing Java authorizer implementations when building with scala 2.11 version of core. Due to this we dropped below Authorizer API related changes. These changes can be implemented in future when we drop support for Scala 2.11.

Authorizer API Changes

A new method will be

...

added to the kafka.security.auth.Authorizer interface to obtain the collection of authorized operations associated with a resource. Default implementation of this method will use the existing `authorize()` API to check every supported operation on the resource. This ensures that custom authorizers will continue to work without change. The built-in authorizer implementation SimpleAclAuthorizer will include a more performant implementation that traverses ACLs once to retrieve all the authorized operations for the user. All permitted operations on the resource including any that are implicitly allowed by ACLs will be included in the returned set. For example, if a Read ACL is found, both Read and Describe will be included since both are permitted. If an ACL is found for all operations (AclOperation.ALL) on a resource, broker will explicitly list all supported operations of the resource, so that clients always receive the full set of actual permitted operations.

Code Block
languagescala
titleAuthorizer API changes
trait Authorizer extends Configurable {
  def authorizedOperations(session: Session, resource: Resource): Set[Operation] = {
    // Use authorize() to obtain permitted operations for the `session` on `resource`
  }
  ....
}


Rejected Alternatives

Add a new request to obtain authorized operations for different resources

...