
Status

Current state: "Under Discussion"

Discussion thread: here

JIRA: here

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

Motivation

Kafka supports authorizing access to resources such as topics and consumer groups by way of ACLs. Currently, the resource name and principal name in an ACL definition must be either a full resource/principal name or the special wildcard '*', which matches everything.

Kafka should support a way of defining bulk ACLs instead of specifying individual ACLs.
Example use cases:

  • Principal “com.company.product1.client” has access to all topics that start with “com.company.product1.”.
  • Principal “com.company.client1” has access to all consumer groups that start with “com.company.client1.”.
  • All principals that start with “com.company.” can write to topic “com.company.public.topic”.

This support would greatly simplify the ACL operational story in a multi-tenant environment.

Public Interfaces

  • We would add new methods getMatchingAcls(resource) and getMatchingAcls(principal) to the Authorizer interface.

    Authorizer
      /**
        * Get set of all acls that match this resource.
        * @param resource regular or wildcard-suffixed resource name.
        * @return empty set if no acls are found, otherwise the acls for the matching resources.
        */
      def getMatchingAcls(resource: Resource): Set[Acl]
    
      /**
        * Get set of all acls that match this principal.
        * @param principal regular or wildcard-suffixed principal name.
        * @return empty Map if no acls exist, otherwise a map of resource -> acls.
        */
      def getMatchingAcls(principal: KafkaPrincipal): Map[Resource, Set[Acl]]

Proposed Changes

Solution

The proposal is to extend the concept of wildcard ACL (‘*’) to support wildcard-suffixed ACLs (‘name*’).
This means that it would be possible to create ACLs of type: User:clientA has READ access on topic orgA* from hostA,
i.e. clientA has READ access to all topics that start with `orgA` from hostA.
The concept of wildcard-suffixed ACLs would be applicable to principal and resource names.

Storage model

Currently, ACLs are stored in ZooKeeper (ZK) under the path /kafka-acl/<resource-type>/<resource-name>.

For example:
ACLs for topic topicName would be stored under /kafka-acl/Topic/topicName.
ACLs for consumer group groupId would be stored under /kafka-acl/Group/groupId.

An example ACL definition looks like:

$ get /kafka-acl/Topic/topicName
{"version":1,"acls":[{"principal":"User:clientA","permissionType":"Allow","operation":"Read","host":"*"},{"principal":"User:clientA","permissionType":"Allow","operation":"Write","host":"*"},{"principal":"User:clientB","permissionType":"Allow","operation":"Write","host":"host1"}]}

Currently supported resource names are either full resource names like topicName or the special wildcard '*'.

$ get /kafka-acl/Topic/*
{"version":1,"acls":[{"principal":"User:clientA","permissionType":"Allow","operation":"Read","host":"*"}]}
which means that clientA has read access to all topics from all hosts.

Now, we extend the same storage model to store ACLs under wildcard-suffixed resource names like teamA*.

$ get /kafka-acl/Topic/teamA*
{"version":1,"acls":[{"principal":"User:clientA","permissionType":"Allow","operation":"Read","host":"*"}]}

Wildcard-suffixed principal names are also supported:
$ get /kafka-acl/Topic/teamA*
{"version":1,"acls":[{"principal":"User:clientA*","permissionType":"Allow","operation":"Read","host":"*"}]}

$ get /kafka-acl/Topic/topicName
{"version":1,"acls":[{"principal":"User:clientA*","permissionType":"Allow","operation":"Read","host":"*"}]}

ACLs write path

The above storage model ensures that there are NO changes on the ACL write path.

The input resourceName would be treated the same way as before, i.e. the appropriate ACLs would be written at the /kafka-acl/<resource-type>/<resource-name> location.

$ get /kafka-acl/Topic/orgName*
{"version":1,"acls":[{"principal":"User:clientA","permissionType":"Allow","operation":"Read","host":"*"}]}
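
To illustrate why the write path is unaffected, here is a minimal sketch of how the ZK location could be derived from a resource. The object name AclPathSketch and the helper aclPath are hypothetical and exist only for this illustration; the point is that a wildcard-suffixed name is just another resource-name string in the same layout.

```scala
// Hypothetical helper for illustration only; not part of the Kafka code base.
object AclPathSketch {
  // Root of the ACL tree, as described in the storage model above.
  val AclZkPath = "/kafka-acl"

  // Builds /kafka-acl/<resource-type>/<resource-name>.
  // A wildcard-suffixed name such as "orgName*" needs no special handling.
  def aclPath(resourceType: String, resourceName: String): String =
    s"$AclZkPath/$resourceType/$resourceName"
}
```

For example, AclPathSketch.aclPath("Topic", "orgName*") yields /kafka-acl/Topic/orgName*, the same location shape used for a full name like topicName.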

ACLs read path

On read path, we look for all matching ACLs when:

a) getMatchingAcls(resourceWithWildcardSuffix) or getMatchingAcls(principalWithWildcardSuffix) is called.
b) authorize(…) is called.

Access would be allowed if there is at least one ALLOW acl and no DENY acl (current behavior is maintained).
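
The decision rule above can be sketched as follows. This is a simplified illustration, not the actual authorizer: the types SimpleAcl, PermissionType and the object ReadPathSketch are hypothetical, ACLs are held in an in-memory map instead of ZK, the request always carries a concrete (non-wildcard) resource and principal, and special operations such as "All" are ignored.

```scala
// Hypothetical, simplified types for illustration; not the real kafka.security.auth classes.
sealed trait PermissionType
case object Allow extends PermissionType
case object Deny extends PermissionType

final case class SimpleAcl(principal: String,
                           permissionType: PermissionType,
                           operation: String,
                           host: String)

object ReadPathSketch {
  // True if `stored` is "*", equals `value`, or is a wildcard-suffixed prefix of `value`.
  def matches(stored: String, value: String): Boolean =
    stored == "*" || stored == value ||
      (stored.endsWith("*") && value.startsWith(stored.dropRight(1)))

  // aclsByResource maps a stored resource name (possibly wildcard-suffixed) to its ACLs.
  def authorize(aclsByResource: Map[String, Set[SimpleAcl]],
                principal: String,
                operation: String,
                host: String,
                resource: String): Boolean = {
    // Collect every ACL whose resource name and principal match the request.
    val applicable = for {
      (storedName, acls) <- aclsByResource.toSeq
      if matches(storedName, resource)
      acl <- acls.toSeq
      if matches(acl.principal, principal)
      if acl.operation == operation
      if acl.host == "*" || acl.host == host
    } yield acl

    // Allowed only with at least one ALLOW acl and no DENY acl.
    applicable.exists(_.permissionType == Allow) &&
      !applicable.exists(_.permissionType == Deny)
  }
}
```

Note that a DENY stored under a full resource name overrides an ALLOW stored under a wildcard-suffixed name that also matches, since all matching ACLs are considered together.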

Matching algorithm

 

Matching algorithm
  /**
    * Returns true if two strings match, both of which might be ending in a special wildcard which matches everything.
    * Examples:
    *   matchWildcardSuffixedString("rob", "rob") => true
    *   matchWildcardSuffixedString("*", "rob") => true
    *   matchWildcardSuffixedString("ro*", "rob") => true
    *   matchWildcardSuffixedString("rob", "bob") => false
    *   matchWildcardSuffixedString("ro*", "bob") => false
    *
    *   matchWildcardSuffixedString("rob", "*") => true
    *   matchWildcardSuffixedString("rob", "ro*") => true
    *   matchWildcardSuffixedString("bob", "ro*") => false
    *
    *   matchWildcardSuffixedString("ro*", "ro*") => true
    *   matchWildcardSuffixedString("rob*", "ro*") => false
    *   matchWildcardSuffixedString("ro*", "rob*") => true
    *
    * @param valueInZk Value stored in ZK in either resource name or Acl.
    * @param input Value present in the request.
    * @return true if there is a match (including wildcard-suffix matching).
    */
  def matchWildcardSuffixedString(valueInZk: String, input: String): Boolean = {
    if (valueInZk.equals(input) || valueInZk.equals(Acl.WildCardString) || input.equals(Acl.WildCardString)) {
      // if strings are equal or either of acl or input is a wildcard
      true
    } else if (valueInZk.endsWith(Acl.WildCardString)) {
      val aclPrefix = valueInZk.substring(0, valueInZk.length - Acl.WildCardString.length)
      if (input.endsWith(Acl.WildCardString)) {
        // when both acl and input ends with wildcard, non-wildcard prefix of input should start with non-wildcard prefix of acl
        val inputPrefix = input.substring(0, input.length - Acl.WildCardString.length)
        inputPrefix.startsWith(aclPrefix)
      } else {
        // when acl ends with wildcard but input doesn't, then input should start with non-wildcard prefix of acl
        input.startsWith(aclPrefix)
      }
    } else {
      if (input.endsWith(Acl.WildCardString)) {
        // when input ends with wildcard but acl doesn't, then acl should start with non-wildcard prefix of input
        val inputPrefix = input.substring(0, input.length - Acl.WildCardString.length)
        valueInZk.startsWith(inputPrefix)
      } else {
        // when neither acl nor input ends with wildcard, they have to match exactly.
        valueInZk.equals(input)
      }
    }
  }
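
The examples listed in the doc comment can be turned into a quick table-driven check. The sketch below restates the function in a more compact but behaviorally equivalent form so that it is self-contained; the object name WildcardMatchCheck and the local WildCard constant (a stand-in for Acl.WildCardString, assumed to be "*") are introduced only for this illustration.

```scala
// Self-contained check of the documented examples; names here are hypothetical.
object WildcardMatchCheck {
  val WildCard = "*" // stand-in for Acl.WildCardString (assumed to be "*")

  def matchWildcardSuffixedString(valueInZk: String, input: String): Boolean = {
    if (valueInZk == input || valueInZk == WildCard || input == WildCard) {
      true
    } else if (valueInZk.endsWith(WildCard)) {
      // The input (with its own wildcard stripped, if any) must extend the stored prefix.
      input.stripSuffix(WildCard).startsWith(valueInZk.stripSuffix(WildCard))
    } else if (input.endsWith(WildCard)) {
      // The stored full name must start with the prefix requested in the input.
      valueInZk.startsWith(input.stripSuffix(WildCard))
    } else {
      false // two different full names never match
    }
  }

  // (valueInZk, input, expected) triples copied from the doc comment.
  val examples: Seq[(String, String, Boolean)] = Seq(
    ("rob", "rob", true), ("*", "rob", true), ("ro*", "rob", true),
    ("rob", "bob", false), ("ro*", "bob", false),
    ("rob", "*", true), ("rob", "ro*", true), ("bob", "ro*", false),
    ("ro*", "ro*", true), ("rob*", "ro*", false), ("ro*", "rob*", true)
  )

  // Returns the examples whose actual result differs from the documented one.
  def failures: Seq[(String, String, Boolean)] =
    examples.filter { case (zk, in, expected) =>
      matchWildcardSuffixedString(zk, in) != expected
    }
}
```

The check also makes the asymmetry visible: a stored "ro*" covers an input "rob*", but a stored "rob*" does not cover an input "ro*", because the input asks for a wider set of names than the stored entry describes.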

 

 

Compatibility, Deprecation, and Migration Plan

None.

Rejected Alternatives

.
