Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Table of Contents

Status

Current stateUnder Discussion

Discussion thread: here 

APPROVED

Discussion thread: https://lists.apache.org/thread.html/r56f658f1d1de2b09465d70be69a8bebfd4518663be5a88fba2e9e7c0%40%3Cdev.kafka.apache.org%3E

Vote thread: https://lists.apache.org/thread.html/r1e912a4e6b6def9fbaf8e0aeb7bbcfd612f3100df31782a307268a5c%40%3Cdev.kafka.apache.org%3E

JIRA:

Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyKAFKA-10619
 
Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyKAFKA-13598
JIRA: here [Change the link from KAFKA-1 to your own ticket]

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Code Block
languagejava
firstline144
titleclients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java
linenumberstrue
    /**
     * Check if the caller is authorized to perform the given ACL operation on at least one
     * resource of the given type.
     *
     * @param1. requestContextFilter Requestout contextall includingthe requestresource resourceType,pattern securitycorresponding protocol,to andthe listenerrequestContext, nameAclOperation,
     * @param op  and ResourceType
     * 2. If wildcard deny Theexists, ACLreturn operationdeny to checkdirectly
     * @param3. resourceTypeFor any literal Theallowed resource, type
if there's no dominant literal *denied @returnresource, and
     *    no dominant prefixed denied resource, Returnreturn {@linkallow
 AuthorizationResult#ALLOWED} if the caller is* authorized4. toFor performany the
prefixed allowed resource, if there's *no dominant denied resource, return allow
     * 5. For any other cases, return deny
     *
     given ACL* operationIt onis atimportant leastto oneoverride resourcethis ofinterface thedefault givenin type.implementations because
     * 1. The interface default iterates all AclBindings multiple times, without any indexing,
     *    which is Returna {@linkCPU AuthorizationResult#DENIED}intense otherwisework.
     */
 2. The interface default AuthorizationResultrebuild authorizeAny(AuthorizableRequestContext requestContext, AclOperation op, ResourceType resourceType) {
   several sets of strings, which is a memory intense work.
     if*
 (resourceType == ResourceType.ANY) {
 * @param requestContext Request context including request resourceType, security protocol, and throwlistener newname
 IllegalArgumentException(
    * @param op          "Must specify a non-filterThe resourceACL typeoperation for authorizeAny");to check
     * @param resourceType }

  The resource type to check
  if (resourceType == ResourceType.UNKNOWN) {
 * @return            throw new IllegalArgumentException(
 Return {@link AuthorizationResult#ALLOWED} if the caller is authorized to perform the
     * "Unknown resource type");
        }

        if (op == AclOperation.ANY) {
            throw new IllegalArgumentException(
   given ACL operation on at least one resource of the given type.
     *               "Must specify a non-filter operation type for authorizeAny");
 Return {@link AuthorizationResult#DENIED} otherwise.
     }
*/
    default AuthorizationResult authorizeByResourceType(AuthorizableRequestContext requestContext, ifAclOperation (op, == AclOperation.UNKNOWNResourceType resourceType) {
        SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType);

   throw new IllegalArgumentException(
   ResourcePatternFilter resourceTypeFilter =           "Unknown operation type");
        }

        ResourcePatternFilter resourceTypeFilter = new new ResourcePatternFilter(
            resourceType, null, PatternType.ANY);
        AclBindingFilter aclFilter = new AclBindingFilter(
            resourceTypeFilter, AccessControlEntryFilter.ANY);

        EnumMap<PatternType, Set<String>Set<String>> denyLiteralsdenyPatterns =
 new HashSet<>();
        Set<String> denyPrefixes    = new EnumMap<PatternType, HashSet<>Set<String>>(PatternType.class);{{
        Set<String>   allowLiterals = put(PatternType.LITERAL, new HashSet<>());
        Set<String>   allowPrefixes = put(PatternType.PREFIXED, new HashSet<>());
        boolean}};
 hasWildCardAllow = false;

     EnumMap<PatternType, Set<String>> allowPatterns for=
 (AclBinding binding : acls(aclFilter)) {
           new ifEnumMap<PatternType, Set<String>>(!bindingPatternType.entry().host().equals(requestContext.clientAddress().getHostAddress())class){{
            put(PatternType.LITERAL, new HashSet<>());
          && !binding.entry().host().equals("*"))  put(PatternType.PREFIXED, new HashSet<>());
        }};

        continueboolean hasWildCardAllow = false;

        KafkaPrincipal principal = new KafkaPrincipal(
           if (!bindingrequestContext.entryprincipal().principal().equals(getPrincipalType(),
            requestContext.principal().toStringgetName());
        String hostAddr = requestContext.clientAddress().getHostAddress();

        for &&(AclBinding !binding.entry().principal().equals("User:*"))binding : acls(aclFilter)) {
                continue;

            if (!binding.entry().operationhost().equals(hostAddr) != op&& !binding.entry().host().equals("*"))
                continue;

      && binding.      if (!SecurityUtils.parseKafkaPrincipal(binding.entry().operationprincipal()) != AclOperation.ALL.equals(principal)
                continue;

            if (&& !binding.entry().permissionTypeprincipal() == AclPermissionType.DENY) {
.equals("User:*"))
                continue;

           switch if (binding.patternentry().patternTypeoperation()) != {op
                    case LITERAL:&& binding.entry().operation() != AclOperation.ALL)
                continue;

            if (binding.patternentry().namepermissionType().equals(ResourcePattern.WILDCARD_RESOURCE)) == AclPermissionType.DENY) {
                switch (binding.pattern().patternType()) {
                return  AuthorizationResult.DENIED;
  case LITERAL:
                        if denyLiterals.add(binding.pattern().name());.equals(ResourcePattern.WILDCARD_RESOURCE))
                        break;
                    case PREFIXED:return AuthorizationResult.DENIED;
                        denyPrefixesdenyPatterns.get(PatternType.LITERAL).add(binding.pattern().name());
                        break;
                }
                continue;case PREFIXED:
            }

            if denyPatterns.get(PatternType.PREFIXED).add(binding.entrypattern().permissionTypename()) != AclPermissionType.ALLOW)
;
                        continuebreak;

            switch (binding.pattern().patternType()) {
      default:
          case LITERAL:
     }
               if (binding.pattern().name().equals(ResourcePattern.WILDCARD_RESOURCE)) {
 continue;
            }

             hasWildCardAllow = true;
        if (binding.entry().permissionType() != AclPermissionType.ALLOW)
                continue;

                    }switch (binding.pattern().patternType()) {
                case LITERAL:
                    allowLiterals.addif (binding.pattern().name());
.equals(ResourcePattern.WILDCARD_RESOURCE)) {
                     break;
   hasWildCardAllow = true;
           case PREFIXED:
            continue;
        allowPrefixes            }
                    allowPatterns.get(PatternType.LITERAL).add(binding.pattern().name());
                    break;
            }
        }
case PREFIXED:
        if (hasWildCardAllow) {
            return AuthorizationResult.ALLOWEDallowPatterns.get(PatternType.PREFIXED).add(binding.pattern().name());
        }

        for (String allowPrefix : allowPrefixes) { break;
            StringBuilder sb = new StringBuilder(); default:
            boolean}
 hasDominatedDeny = false;
     }

       for (char ch : allowPrefix.toCharArray() if (hasWildCardAllow) {
            return    sb.append(ch)AuthorizationResult.ALLOWED;
        }

        iffor (denyPrefixes.contains(sb.toStringMap.Entry<PatternType, Set<String>> entry : allowPatterns.entrySet())) {
            for (String allowStr : entry.getValue()) {
    hasDominatedDeny = true;
          if (entry.getKey() == PatternType.LITERAL
       break;
                }
 && denyPatterns.get(PatternType.LITERAL).contains(allowStr))
          }
          continue;
  if (!hasDominatedDeny)
             StringBuilder sb = return AuthorizationResult.ALLOWEDnew StringBuilder();
        }

        boolean hasDominatedDeny for (String allowLiteral : allowLiterals) {
= false;
                iffor (denyLiterals.contains(allowLiteral))char ch : allowStr.toCharArray()) {
                continue;
     sb.append(ch);
       StringBuilder sb = new StringBuilder();
            boolean hasDominatedDeny = false;
if (denyPatterns.get(PatternType.PREFIXED).contains(sb.toString())) {
                     for (char ch : allowLiteral.toCharArray()) {
hasDominatedDeny = true;
                 sb.append(ch);
       break;
         if (denyPrefixes.contains(sb.toString())) {
         }
           hasDominatedDeny = true;
   }
                 break;if (!hasDominatedDeny)
                }
            }return AuthorizationResult.ALLOWED;
            if (!hasDominatedDeny)}
        }

        return AuthorizationResult.ALLOWEDDENIED;
        }

        return AuthorizationResult.DENIED;
    }}

Proposed Changes

AclAuthorizer and SimpleAclAuthorizer

...

Besides the public interface changes above, we will deprecate `IDEMPOTENT_WRITE` in release version 23.8 0 because it's kind of trivial by practice.

...

`IDEMPOTENT_WRITE` will be deprecated in 23.8 0 but won't be removed in a short term, in order to give the community enough time to upgrade their `authorizer` implementation.


`request-required-acks` option in kafka-console-producer.sh will change default to -1

In kafka-console-producer.sh, we have a option: `request-required-acks` that can configure the acks  setting in Producer. It was originally default to 1. But after this KIP, we set the default enable.idempotence  to true, so we have to also set the default acks  config here to -1 for this change.

Compatibility, Deprecation, and Migration Plan

...