Status

Current state: APPROVED

Discussion thread: https://lists.apache.org/thread.html/r56f658f1d1de2b09465d70be69a8bebfd4518663be5a88fba2e9e7c0%40%3Cdev.kafka.apache.org%3E

Vote thread: https://lists.apache.org/thread.html/r1e912a4e6b6def9fbaf8e0aeb7bbcfd612f3100df31782a307268a5c%40%3Cdev.kafka.apache.org%3E

JIRA: KAFKA-10619, KAFKA-13598

Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).

...

clients/src/main/java/org/apache/kafka/server/authorizer/Authorizer.java
    /**
     * Check if the caller is authorized to perform the given ACL operation on at least one
     * resource of the given type.
     *
     * 1. Collect all the resource patterns that match the requestContext, AclOperation,
     *    and ResourceType
     * 2. If a wildcard deny exists, return deny directly
     * 3. For any literal allowed resource, if there's no dominant literal denied resource, and
     *    no dominant prefixed denied resource, return allow
     * 4. For any prefixed allowed resource, if there's no dominant denied resource, return allow
     * 5. In all other cases, return deny
     *
     * It is important to override this interface default in implementations because
     * 1. The interface default iterates over all AclBindings multiple times without any indexing,
     *    which is CPU-intensive.
     * 2. The interface default rebuilds several sets of strings, which is memory-intensive.
     *
     * @param requestContext Request context including request type, security protocol, and listener name
     * @param op             The ACL operation to check
     * @param resourceType   The resource type to check
     * @return               Return {@link AuthorizationResult#ALLOWED} if the caller is authorized to perform the
     *                       given ACL operation on at least one resource of the given type.
     *                       Return {@link AuthorizationResult#DENIED} otherwise.
     */
    default AuthorizationResult authorizeByResourceType(AuthorizableRequestContext requestContext, AclOperation op, ResourceType resourceType) {
        SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType);

        // Match every ACL binding on the given resource type, whatever its name or pattern type.
        ResourcePatternFilter resourceTypeFilter = new ResourcePatternFilter(
            resourceType, null, PatternType.ANY);
        AclBindingFilter aclFilter = new AclBindingFilter(
            resourceTypeFilter, AccessControlEntryFilter.ANY);

        // Resource names of the matching DENY and ALLOW bindings, grouped by pattern type.
        EnumMap<PatternType, Set<String>> denyPatterns =
                new EnumMap<PatternType, Set<String>>(PatternType.class){{
                    put(PatternType.LITERAL, new HashSet<>());
                    put(PatternType.PREFIXED, new HashSet<>());
                }};
        EnumMap<PatternType, Set<String>> allowPatterns =
                new EnumMap<PatternType, Set<String>>(PatternType.class){{
                    put(PatternType.LITERAL, new HashSet<>());
                    put(PatternType.PREFIXED, new HashSet<>());
                }};

        boolean hasWildCardAllow = false;

        KafkaPrincipal principal = new KafkaPrincipal(
            requestContext.principal().getPrincipalType(),
            requestContext.principal().getName());
        String hostAddr = requestContext.clientAddress().getHostAddress();

        for (AclBinding binding : acls(aclFilter)) {
            // Skip bindings that apply to a different host
            if (!binding.entry().host().equals(hostAddr) && !binding.entry().host().equals("*"))
                continue;

            // Skip bindings that apply to a different principal
            if (!SecurityUtils.parseKafkaPrincipal(binding.entry().principal()).equals(principal)
                    && !binding.entry().principal().equals("User:*"))
                continue;

            // Skip bindings that apply to a different operation
            if (binding.entry().operation() != op
                    && binding.entry().operation() != AclOperation.ALL)
                continue;

            if (binding.entry().permissionType() == AclPermissionType.DENY) {
                switch (binding.pattern().patternType()) {
                    case LITERAL:
                        if (binding.pattern().name().equals(ResourcePattern.WILDCARD_RESOURCE))
                            return AuthorizationResult.DENIED;
                        denyPatterns.get(PatternType.LITERAL).add(binding.pattern().name());
                        break;
                    case PREFIXED:
                        denyPatterns.get(PatternType.PREFIXED).add(binding.pattern().name());
                        break;
                    default:
                }
                continue;
            }

            if (binding.entry().permissionType() != AclPermissionType.ALLOW)
                continue;

            switch (binding.pattern().patternType()) {
                case LITERAL:
                    if (binding.pattern().name().equals(ResourcePattern.WILDCARD_RESOURCE)) {
                        hasWildCardAllow = true;
                        continue;
                    }
                    allowPatterns.get(PatternType.LITERAL).add(binding.pattern().name());
                    break;
                case PREFIXED:
                    allowPatterns.get(PatternType.PREFIXED).add(binding.pattern().name());
                    break;
                default:
            }
        }

        if (hasWildCardAllow) {
            return AuthorizationResult.ALLOWED;
        }

        for (Map.Entry<PatternType, Set<String>> entry : allowPatterns.entrySet()) {
            for (String allowStr : entry.getValue()) {
                if (entry.getKey() == PatternType.LITERAL
                        && denyPatterns.get(PatternType.LITERAL).contains(allowStr))
                    continue;
                StringBuilder sb = new StringBuilder();
                boolean hasDominatedDeny = false;
                for (char ch : allowStr.toCharArray()) {
                    sb.append(ch);
                    if (denyPatterns.get(PatternType.PREFIXED).contains(sb.toString())) {
                        hasDominatedDeny = true;
                        break;
                    }
                }
                if (!hasDominatedDeny)
                    return AuthorizationResult.ALLOWED;
            }
        }

        return AuthorizationResult.DENIED;
    }
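
The dominance rule in steps 3 to 5 of the Javadoc can be hard to picture from the full method. The following is a minimal sketch of that rule alone, assuming plain string sets in place of the ACL bindings that the default method collects. The class `DominantDenySketch` and its methods are hypothetical names used for illustration and are not part of the Kafka API.

```java
import java.util.HashSet;
import java.util.Set;

// Illustrative sketch only: plain string sets stand in for the resource names
// that the default method gathers from the matching ACL bindings.
final class DominantDenySketch {

    // True if any non-empty prefix of `name` (including `name` itself)
    // appears in the set of PREFIXED deny patterns.
    static boolean deniedByPrefix(String name, Set<String> prefixedDeny) {
        for (int end = 1; end <= name.length(); end++) {
            if (prefixedDeny.contains(name.substring(0, end))) {
                return true;
            }
        }
        return false;
    }

    // Mirrors steps 3-5 of the Javadoc: succeed as soon as one allow pattern
    // is not dominated by a deny pattern, otherwise deny.
    static boolean anyAllowed(Set<String> literalAllow, Set<String> prefixedAllow,
                              Set<String> literalDeny, Set<String> prefixedDeny) {
        for (String name : literalAllow) {
            if (!literalDeny.contains(name) && !deniedByPrefix(name, prefixedDeny)) {
                return true;
            }
        }
        for (String prefix : prefixedAllow) {
            if (!deniedByPrefix(prefix, prefixedDeny)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        Set<String> none = new HashSet<>();
        Set<String> prefixedDeny = new HashSet<>();
        prefixedDeny.add("pay");
        Set<String> literalAllow = new HashSet<>();
        literalAllow.add("payments");

        // "payments" starts with the denied prefix "pay", so it is dominated.
        System.out.println(anyAllowed(literalAllow, none, none, prefixedDeny)); // prints false

        // "orders" is not covered by any deny pattern, so one allow survives.
        literalAllow.add("orders");
        System.out.println(anyAllowed(literalAllow, none, none, prefixedDeny)); // prints true
    }
}
```

The sketch leaves out the wildcard handling and the host, principal, and operation matching, which the default method performs before it reaches this check.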

...

AclAuthorizer and SimpleAclAuthorizer

AclAuthorizer, SimpleAclAuthorizer, and AuthorizerWrapper will override the new interface method `org.apache.kafka.server.authorizer.Authorizer#authorizeByResourceType` to

...

Besides the public interface changes above, we will deprecate `IDEMPOTENT_WRITE` in release version 3.0 because it is trivial in practice.

...

`IDEMPOTENT_WRITE` will be deprecated in 3.0 but will not be removed in the short term, to give the community enough time to upgrade their `authorizer` implementations.


`request-required-acks` option in kafka-console-producer.sh will change default to -1

kafka-console-producer.sh has an option, `request-required-acks`, that configures the acks setting of the producer. Its default was originally 1. Because this KIP sets the default of enable.idempotence to true, the default acks setting here must also change to -1.
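
To make the dependency between the two settings concrete, here is a minimal sketch that uses only `java.util.Properties`, so it runs without the Kafka client on the classpath. The keys `acks` and `enable.idempotence` are the producer config names. The class `ConsoleProducerAcksSketch` and the helper `isConsistent` are hypothetical and only illustrate the rule that an idempotent producer needs acks set to -1 (equivalently, `all`).

```java
import java.util.Properties;

// Illustrative sketch: models the producer settings behind the console
// producer defaults. It does not call any Kafka API.
final class ConsoleProducerAcksSketch {

    // Settings that correspond to the new defaults described above.
    static Properties newDefaults() {
        Properties props = new Properties();
        props.setProperty("enable.idempotence", "true");
        props.setProperty("acks", "-1"); // -1 is equivalent to "all"
        return props;
    }

    // Hypothetical check: idempotence is only compatible with acks=-1/all.
    // A missing acks entry falls back to "1", the old console producer default.
    static boolean isConsistent(Properties props) {
        boolean idempotent =
            Boolean.parseBoolean(props.getProperty("enable.idempotence", "false"));
        String acks = props.getProperty("acks", "1");
        return !idempotent || acks.equals("-1") || acks.equals("all");
    }

    public static void main(String[] args) {
        Properties props = newDefaults();
        System.out.println(isConsistent(props)); // prints true

        // Keeping the old default of 1 alongside idempotence is inconsistent.
        props.setProperty("acks", "1");
        System.out.println(isConsistent(props)); // prints false
    }
}
```

On the command line, a user who still wants the previous behavior would presumably pass the `request-required-acks` option explicitly rather than rely on the default.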

Compatibility, Deprecation, and Migration Plan

...