Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Solution

The proposal is to extend the concept of wildcard ACL (‘*’) to support wildcard-suffixed ACLs (‘name*’).
This means that it would be possible to create ACLs of type: User:clientA has READ access on topic orgA* from hostA,
i.e clientA has READ access to all topics that start with `orgA` from hostA.
The concept of wildcard-suffixed ACLs would be applicable to principal and resource names.

Storage model

Currently, ACLs are stored on ZK under path /kafka-acl/<resource-type>/<resource-name>.

For example:
ACLs for topic topicNamewould be stored under /kafka-acl/Topic/topicName.
ACLs for consumer group groupIdwould be stored under /kafka-acl/Group/groupId.

An example ACL definition looks like:

$ get /kafka-acl/Topic/topicName
{"version":1,"acls":[{"principal":"User:clientA","permissionType":"Allow","operation":"Read","host":"*"},{"principal":"User:clientA","permissionType":"Allow","operation":"Write","host":"*"},{"principal":"clientB","permissionType":"Allow","operation":"Write","host":"host1"}]}

Current supported resource names are either full resource names like topicName or a special wildcard '*'.

$ get /kafka-acl/Topic/*
{"version":1,"acls":[{"principal":"User:clientA","permissionType":"Allow","operation":"Read","host":"*"}]}
which means that clientA has read access to all topics from all hosts.

Now, we extend the same storage model to store ACLs under wildcard-suffixed resource names like orgName*.

$ get /kafka-acl/Topic/teamA*
{"version":1,"acls":[{"principal":"User:clientA","permissionType":"Allow","operation":"Read","host":"*"}]}

Wildcard-suffixed principal names are also supported
$ get /kafka-acl/Topic/teamA*
{"version":1,"acls":[{"principal":"User:clientA*","permissionType":"Allow","operation":"Read","host":"*"}]}

$ get /kafka-acl/Topic/topicName
{"version":1,"acls":[{"principal":"User:clientA*","permissionType":"Allow","operation":"Read","host":"*"}]}

ACLs write path

The above storage model ensures that there are NO changes on the ACL write path.

Input resourceName would be treated the same way as earlier i.e. the appropriate ACLs would be rewritten at /kafka-acl/<resource-type>/<resource-name> location.

$ get /kafka-acl/Topic/orgName*
{"version":1,"acls":[{"principal":"User:clientA","permissionType":"Allow","operation":"Read","host":"*"}]}

ACLs read path

On read path, we look for all matching ACLs when:

a) getMatchingAcls(resourceWithWildcardSuffix) or getMatchingAcls(principalWithWilcardSuffix) is called.
b) authorize(…) is called.

Access would be allowed if there is at least one ALLOW matching acl and no DENY matching acl (current behavior is maintained). Note that the length of the prefix doesn't play any role here.

Matching algorithm

 

Code Block
languagejava
titleMatching algorithm
  /**
    * Returns true if two strings match, both of which might be ending in a special wildcard which matches everything.
    * Examples:
    *   matchWildcardSuffixedString("rob", "rob") => true
    *   matchWildcardSuffixedString("*", "rob") => true
    *   matchWildcardSuffixedString("ro*", "rob") => true
    *   matchWildcardSuffixedString("rob", "bob") => false
    *   matchWildcardSuffixedString("ro*", "bob") => false
    *
    *   matchWildcardSuffixedString("rob", "*") => true
    *   matchWildcardSuffixedString("rob", "ro*") => true
    *   matchWildcardSuffixedString("bob", "ro*") => false
    *
    *   matchWildcardSuffixedString("ro*", "ro*") => true
    *   matchWildcardSuffixedString("rob*", "ro*") => false
    *   matchWildcardSuffixedString("ro*", "rob*") => true
    *
    * @param valueInZk Value stored in ZK in either resource name or Acl.
    * @param input Value present in the request.
    * @return true if there is a match (including wildcard-suffix matching).
    */
  def matchWildcardSuffixedString(valueInZk: String, input: String): Boolean = {
    if (valueInZk.equals(input) || valueInZk.equals(Acl.WildCardString) || input.equals(Acl.WildCardString)) {
      // if strings are equal or either of acl or input is a wildcard
      true
    } else if (valueInZk.endsWith(Acl.WildCardString)) {
      val aclPrefix = valueInZk.substring(0, valueInZk.length - Acl.WildCardString.length)
      if (input.endsWith(Acl.WildCardString)) {
        // when both acl and input ends with wildcard, non-wildcard prefix of input should start with non-wildcard prefix of acl
        val inputPrefix = input.substring(0, input.length - Acl.WildCardString.length)
        inputPrefix.startsWith(aclPrefix)
      } else {
        // when acl ends with wildcard but input doesn't, then input should start with non-wildcard prefix of acl
        input.startsWith(aclPrefix)
      }
    } else {
      if (input.endsWith(Acl.WildCardString)) {
        // when input ends with wildcard but acl doesn't, then acl should start with non-wildcard prefix of input
        val inputPrefix = input.substring(0, input.length - Acl.WildCardString.length)
        valueInZk.startsWith(inputPrefix)
      } else {
        // when neither acl nor input ends with wildcard, they have to match exactly.
        valueInZk.equals(input)
      }
    }
  }

...