Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

Table of Contents

Status

Current stateUnder Discussion

Discussion thread: here 

APPROVED

Discussion thread: https://lists.apache.org/thread.html/r56f658f1d1de2b09465d70be69a8bebfd4518663be5a88fba2e9e7c0%40%3Cdev.kafka.apache.org%3E

Vote thread: https://lists.apache.org/thread.html/r1e912a4e6b6def9fbaf8e0aeb7bbcfd612f3100df31782a307268a5c%40%3Cdev.kafka.apache.org%3E

JIRA:

Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyKAFKA-10619
 
Jira
serverASF JIRA
serverId5aa69414-a9e9-3523-82ec-879b028fb15b
keyKAFKA-13598
JIRA: here [Change the link from KAFKA-1 to your own ticket]

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

Code Block
languagejava
firstline144
titleclients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java
linenumberstrue
    /**
     * Check if the caller is authorized to perform the given ACL operation on at least one
     * resource of the given type.
     *
     * @param1. requestContextFilter Requestout contextall includingthe requestresource resourceType,pattern securitycorresponding protocol,to andthe listenerrequestContext, nameAclOperation,
     * @param op  and ResourceType
     * 2. If wildcard deny Theexists, ACLreturn operationdeny to checkdirectly
     * @param3. resourceTypeFor any literal Theallowed resource, type
if there's no dominant literal *denied @returnresource, and
     *    no dominant prefixed denied resource, Returnreturn {@linkallow
 AuthorizationResult#ALLOWED} if the caller is* authorized4. toFor performany the
prefixed allowed resource, if there's *no dominant denied resource, return allow
     * 5. For any other cases, return deny
      given*
 ACL operation on at least* oneIt resourceis ofimportant theto givenoverride type.this 
interface default in implementations because
 *    * 1. The interface default iterates all AclBindings multiple times, without any indexing,
     *    Return {@link AuthorizationResult#DENIED} otherwisewhich is a CPU intense work.
     */
 2. The interface default rebuild several AuthorizationResultsets authorizeAny(AuthorizableRequestContextof requestContextstrings, which AclOperationis op,a ResourceTypememory resourceType)intense {work.
     *
   ResourcePatternFilter resourceFilter = new ResourcePatternFilter(resourceType, null, PatternType.ANY);
   * @param requestContext Request context including request resourceType, security protocol, and listener name
     * AclBindingFilter@param aclFilterop = new AclBindingFilter(
          The ACL resourceFilter,operation newto AccessControlEntryFilter(check
     * @param resourceType   The resource type    requestContext.principal().toString(),to check
     * @return          requestContext.clientAddress().getHostAddress(),
     Return {@link AuthorizationResult#ALLOWED} if the caller is authorized to perform the
 op,
    *            AclPermissionType.ANY));

        Set<String> denyPrefixes = given new HashSet<>();
        Set<String> allowPrefixes = new HashSet<>();

  ACL operation on at least one resource of the given type.
     *      for (AclBinding binding : acls(aclFilter)) {
            if (binding.entry().permissionType() != AclPermissionType.ALLOW) {Return {@link AuthorizationResult#DENIED} otherwise.
     */
    default AuthorizationResult authorizeByResourceType(AuthorizableRequestContext requestContext, AclOperation op,  if (binding.entry().permissionType() == AclPermissionType.DENY) ResourceType resourceType) {
        SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType);

        ResourcePatternFilter resourceTypeFilter = switchnew (binding.pattern().patternType()) {ResourcePatternFilter(
            resourceType, null, PatternType.ANY);
        AclBindingFilter aclFilter = new AclBindingFilter(
            case LITERAL:resourceTypeFilter, AccessControlEntryFilter.ANY);

        EnumMap<PatternType, Set<String>> denyPatterns =
                new ifEnumMap<PatternType, Set<String>>(bindingPatternType.pattern().name().equals(ResourcePattern.WILDCARD_RESOURCE))class){{
            put(PatternType.LITERAL, new HashSet<>());
            put(PatternType.PREFIXED, new HashSet<>());
    return AuthorizationResult.DENIED;
   }};
        EnumMap<PatternType, Set<String>> allowPatterns =
              break;
  new EnumMap<PatternType, Set<String>>(PatternType.class){{
            put(PatternType.LITERAL, new HashSet<>());
      case PREFIXED:
     put(PatternType.PREFIXED, new HashSet<>());
        }};

        boolean hasWildCardAllow = false;

  if (binding.pattern().name().isEmpty())
     KafkaPrincipal principal = new KafkaPrincipal(
            requestContext.principal().getPrincipalType(),
           return AuthorizationResult.DENIEDrequestContext.principal().getName());
        String hostAddr                   denyPrefixes.add(binding.pattern= requestContext.clientAddress().namegetHostAddress());

        for (AclBinding binding : acls(aclFilter)) {
            if   break;(!binding.entry().host().equals(hostAddr) && !binding.entry().host().equals("*"))
                continue;

    }
         if (!SecurityUtils.parseKafkaPrincipal(binding.entry().principal()).equals(principal)
      }
              &&  continue;
!binding.entry().principal().equals("User:*"))
                }continue;

            switchif (binding.patternentry().patternTypeoperation()) != {op
                case LITERAL:
   && binding.entry().operation() != AclOperation.ALL)
              List<Action> action = Collections.singletonList(new Action( continue;

                        op, binding.pattern(), 1, false, false));if (binding.entry().permissionType() == AclPermissionType.DENY) {
                    ifswitch (authorize(requestContext, actionbinding.pattern().getpatternType(0) == AuthorizationResult.ALLOWED) {
                    case LITERAL:
    return AuthorizationResult.ALLOWED;
                    }if (binding.pattern().name().equals(ResourcePattern.WILDCARD_RESOURCE))
                    break;
        return AuthorizationResult.DENIED;
        case PREFIXED:
                    allowPrefixesdenyPatterns.get(PatternType.LITERAL).add(binding.pattern().name());
                    break;
    break;
        }
        }

    case PREFIXED:
   for (String allowed : allowPrefixes) {
            StringBuilder sb = new StringBuilder();
denyPatterns.get(PatternType.PREFIXED).add(binding.pattern().name());
                    boolean  hasDominatedDeny = falsebreak;
            for (int pos = 0; pos < allowed.length(); pos++) { default:
                sb.append(allowed.charAt(pos));
}
                continue;
 if (denyPrefixes.contains(sb.toString())) {
           }

             hasDominatedDeny = true;if (binding.entry().permissionType() != AclPermissionType.ALLOW)
                continue;

    break;
        switch (binding.pattern().patternType()) {
            }
    case LITERAL:
        }
            if (!hasDominatedDeny)(binding.pattern().name().equals(ResourcePattern.WILDCARD_RESOURCE)) {
                return AuthorizationResult.ALLOWED        hasWildCardAllow = true;
        }
               return AuthorizationResult.DENIEDcontinue;
              }

Proposed Changes

AclAuthorizer and SimpleAclAuthorizer

AclAuthorizer and SimpleAclAuthorizer will override the new interface `org.apache.kafka.server.authorizer.Authorizer#authorizeAny` to 

  1. improve the performance
  2. implement the `allow.everyone.if.no.acl.found` logic

`IDEMPOTENT_WRITE` Deprecation

Besides the public interface changes above, we will deprecate `IDEMPOTENT_WRITE` in release version 2.8 because it's kind of trivial by practice.

We are relaxing the ACL restriction from `IDEMPOTENT_WRITE` to `WRITE` earlier (release version 2.8) and changing the producer defaults later (release version 3.0) in order to give the community users enough time to upgrade their broker first. So their later client-side upgrading, which enables idempotence by default, won't get blocked by the `IDEMPOTENT_WRITE` ACL required by the old version brokers.

      }
                    allowPatterns.get(PatternType.LITERAL).add(binding.pattern().name());
                    break;
                case PREFIXED:
                    allowPatterns.get(PatternType.PREFIXED).add(binding.pattern().name());
                    break;
                default:
            }
        }

        if (hasWildCardAllow) {
            return AuthorizationResult.ALLOWED;
        }

        for (Map.Entry<PatternType, Set<String>> entry : allowPatterns.entrySet()) {
            for (String allowStr : entry.getValue()) {
                if (entry.getKey() == PatternType.LITERAL
                        && denyPatterns.get(PatternType.LITERAL).contains(allowStr))
                    continue;
                StringBuilder sb = new StringBuilder();
                boolean hasDominatedDeny = false;
                for (char ch : allowStr.toCharArray()) {
                    sb.append(ch);
                    if (denyPatterns.get(PatternType.PREFIXED).contains(sb.toString())) {
                        hasDominatedDeny = true;
                        break;
                    }
                }
                if (!hasDominatedDeny)
                    return AuthorizationResult.ALLOWED;
            }
        }

        return AuthorizationResult.DENIED;
    }

Proposed Changes

AclAuthorizer and SimpleAclAuthorizer

AclAuthorizer and AuthorizerWrapper will override the new interface `org.apache.kafka.server.authorizer.Authorizer#authorizeAny` to 

  1. improve the performance
  2. implement the `allow.everyone.if.no.acl.found` logic

`IDEMPOTENT_WRITE` Deprecation

Besides the public interface changes above, we will deprecate `IDEMPOTENT_WRITE` in release version 3.0 because it's kind of trivial by practice.

We are relaxing the ACL restriction from `IDEMPOTENT_WRITE` to `WRITE` earlier (release version 2.8) and changing the producer defaults later (release version 3.0) in order to give the community users enough time to upgrade their broker first. So their later client-side upgrading, which enables idempotence by default, won't get blocked by the `IDEMPOTENT_WRITE` ACL required by the old version brokers.

`IDEMPOTENT_WRITE` will be deprecated in 3.0 but won't be removed in a short term, in order to give the community enough time to upgrade their `authorizer` implementation.


`request-required-acks` option in kafka-console-producer.sh will change default to -1

In kafka-console-producer.sh, we have a option: `request-required-acks` that can configure the acks  setting in Producer. It was originally default to 1. But after this KIP, we set the default enable.idempotence  to true, so we have to also set the default acks  config here to -1 for this change`IDEMPOTENT_WRITE` will be deprecated in 2.8 but won't be removed in a short term, in order to give the community enough time to upgrade their `authorizer` implementation.

Compatibility, Deprecation, and Migration Plan

...