Status
Current state: APPROVED
Discussion thread: https://lists.apache.org/thread.html/r56f658f1d1de2b09465d70be69a8bebfd4518663be5a88fba2e9e7c0%40%3Cdev.kafka.apache.org%3E
JIRA:
Please keep the discussion on the mailing list rather than commenting on the wiki (wiki discussions get unwieldy fast).
...
```java
/**
 * Check if the caller is authorized to perform the given ACL operation on at least one
 * resource of the given type.
 *
 * 1. Filter out all the resource patterns corresponding to the requestContext, AclOperation,
 *    and ResourceType
 * 2. If a wildcard deny exists, return deny directly
 * 3. For any literal allowed resource, if there's no dominant literal denied resource, and
 *    no dominant prefixed denied resource, return allow
 * 4. For any prefixed allowed resource, if there's no dominant denied resource, return allow
 * 5. For any other cases, return deny
 *
 * It is important to override this interface default in implementations because
 * 1. The interface default iterates all AclBindings multiple times, without any indexing,
 *    which is CPU-intensive work.
 * 2. The interface default rebuilds several sets of strings,
 *    which is memory-intensive work.
 *
 * @param requestContext Request context including request resourceType, security protocol, and listener name
 * @param op             The ACL operation to check
 * @param resourceType   The resource type to check
 * @return               Return {@link AuthorizationResult#ALLOWED} if the caller is authorized to perform the
 *                       given ACL operation on at least one resource of the given type.
 *                       Return {@link AuthorizationResult#DENIED} otherwise.
 */
default AuthorizationResult authorizeByResourceType(AuthorizableRequestContext requestContext, AclOperation op, ResourceType resourceType) {
    SecurityUtils.authorizeByResourceTypeCheckArgs(op, resourceType);

    // Select every ACL binding on the given resource type
    ResourcePatternFilter resourceTypeFilter = new ResourcePatternFilter(
        resourceType, null, PatternType.ANY);
    AclBindingFilter aclFilter = new AclBindingFilter(
        resourceTypeFilter, AccessControlEntryFilter.ANY);

    EnumMap<PatternType, Set<String>> denyPatterns =
        new EnumMap<PatternType, Set<String>>(PatternType.class) {{
            put(PatternType.LITERAL, new HashSet<>());
            put(PatternType.PREFIXED, new HashSet<>());
        }};
    EnumMap<PatternType, Set<String>> allowPatterns =
        new EnumMap<PatternType, Set<String>>(PatternType.class) {{
            put(PatternType.LITERAL, new HashSet<>());
            put(PatternType.PREFIXED, new HashSet<>());
        }};

    boolean hasWildCardAllow = false;

    KafkaPrincipal principal = new KafkaPrincipal(
        requestContext.principal().getPrincipalType(),
        requestContext.principal().getName());
    String hostAddr = requestContext.clientAddress().getHostAddress();

    for (AclBinding binding : acls(aclFilter)) {
        // Skip bindings that do not match the caller's host, principal, or operation
        if (!binding.entry().host().equals(hostAddr) && !binding.entry().host().equals("*"))
            continue;

        if (!SecurityUtils.parseKafkaPrincipal(binding.entry().principal()).equals(principal)
                && !binding.entry().principal().equals("User:*"))
            continue;

        if (binding.entry().operation() != op
                && binding.entry().operation() != AclOperation.ALL)
            continue;

        if (binding.entry().permissionType() == AclPermissionType.DENY) {
            switch (binding.pattern().patternType()) {
                case LITERAL:
                    // A wildcard deny covers every resource of this type
                    if (binding.pattern().name().equals(ResourcePattern.WILDCARD_RESOURCE))
                        return AuthorizationResult.DENIED;
                    denyPatterns.get(PatternType.LITERAL).add(binding.pattern().name());
                    break;
                case PREFIXED:
                    denyPatterns.get(PatternType.PREFIXED).add(binding.pattern().name());
                    break;
                default:
            }
            continue;
        }

        if (binding.entry().permissionType() != AclPermissionType.ALLOW)
            continue;

        switch (binding.pattern().patternType()) {
            case LITERAL:
                if (binding.pattern().name().equals(ResourcePattern.WILDCARD_RESOURCE)) {
                    hasWildCardAllow = true;
                    continue;
                }
                allowPatterns.get(PatternType.LITERAL).add(binding.pattern().name());
                break;
            case PREFIXED:
                allowPatterns.get(PatternType.PREFIXED).add(binding.pattern().name());
                break;
            default:
        }
    }

    if (hasWildCardAllow) {
        return AuthorizationResult.ALLOWED;
    }

    // An allow pattern grants access unless a deny pattern dominates it
    for (Map.Entry<PatternType, Set<String>> entry : allowPatterns.entrySet()) {
        for (String allowStr : entry.getValue()) {
            if (entry.getKey() == PatternType.LITERAL
                    && denyPatterns.get(PatternType.LITERAL).contains(allowStr))
                continue;
            // Check every prefix of the allow pattern against the prefixed denies
            StringBuilder sb = new StringBuilder();
            boolean hasDominatedDeny = false;
            for (char ch : allowStr.toCharArray()) {
                sb.append(ch);
                if (denyPatterns.get(PatternType.PREFIXED).contains(sb.toString())) {
                    hasDominatedDeny = true;
                    break;
                }
            }
            if (!hasDominatedDeny)
                return AuthorizationResult.ALLOWED;
        }
    }

    return AuthorizationResult.DENIED;
}
```
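The dominance rule used by the default implementation can be illustrated in isolation. The following is a minimal sketch, not the Kafka code: the class `ResourceTypeDominanceSketch` and its methods are hypothetical names, it works on plain string sets instead of `AclBinding`s, and it assumes the bindings were already filtered by host, principal, and operation and that wildcard patterns were handled beforehand.

```java
import java.util.Set;

// Hypothetical illustration class; not part of the Kafka API.
class ResourceTypeDominanceSketch {

    // True if some prefix of name (up to and including the full name) is a denied prefix.
    static boolean hasDominantDenyPrefix(String name, Set<String> denyPrefixes) {
        for (int end = 1; end <= name.length(); end++) {
            if (denyPrefixes.contains(name.substring(0, end))) {
                return true;
            }
        }
        return false;
    }

    // True if at least one allow pattern is not dominated by any deny pattern.
    static boolean allowedOnAnyResource(Set<String> allowLiterals, Set<String> allowPrefixes,
                                        Set<String> denyLiterals, Set<String> denyPrefixes) {
        for (String literal : allowLiterals) {
            // A literal allow is dominated by an equal literal deny or by a deny prefix of it
            if (!denyLiterals.contains(literal) && !hasDominantDenyPrefix(literal, denyPrefixes)) {
                return true;
            }
        }
        for (String prefix : allowPrefixes) {
            // A prefixed allow is dominated only by a deny prefix that is equal or shorter
            if (!hasDominantDenyPrefix(prefix, denyPrefixes)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // DENY prefix "topic-" dominates ALLOW literal "topic-a"
        System.out.println(allowedOnAnyResource(
            Set.of("topic-a"), Set.of(), Set.of(), Set.of("topic-"))); // false
        // ALLOW prefix "top" is shorter than DENY prefix "topic-", so it is not dominated
        System.out.println(allowedOnAnyResource(
            Set.of(), Set.of("top"), Set.of(), Set.of("topic-"))); // true
    }
}
```

In the second call the result is allowed because a resource such as `top1` matches the allow prefix without matching the deny prefix, which is exactly the "at least one resource" semantics described in the Javadoc.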
Proposed Changes
AclAuthorizer and SimpleAclAuthorizer
AclAuthorizer and AuthorizerWrapper will override the new interface method `org.apache.kafka.server.authorizer.Authorizer#authorizeByResourceType` to
- improve the performance
- implement the `allow.everyone.if.no.acl.found` logic
`IDEMPOTENT_WRITE` Deprecation
Besides the public interface changes above, we will deprecate `IDEMPOTENT_WRITE` in release version 3.0 because it is trivial in practice.
We relax the ACL restriction from `IDEMPOTENT_WRITE` to `WRITE` earlier (release version 2.8) and change the producer defaults later (release version 3.0) to give community users enough time to upgrade their brokers first. That way, the later client-side upgrade, which enables idempotence by default, will not be blocked by the `IDEMPOTENT_WRITE` ACL that older brokers require.
`IDEMPOTENT_WRITE` will be deprecated in 3.0 but will not be removed in the short term, to give the community enough time to upgrade their `authorizer` implementations.
`request-required-acks` option in kafka-console-producer.sh will change default to -1
kafka-console-producer.sh has an option, `request-required-acks`, that configures the producer's `acks` setting. Its default was originally 1. Because this KIP changes the default of `enable.idempotence` to true, and idempotence requires `acks=all` (equivalently -1), the default of `request-required-acks` must also change to -1.
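The dependency between the two defaults can be sketched as a small consistency check. This is an illustration only: the class `ConsoleProducerAcksSketch` and both helpers are hypothetical and do not reflect how kafka-console-producer.sh is implemented. Only the property names `enable.idempotence` and `acks` are real producer configs.

```java
import java.util.Properties;

// Hypothetical illustration class; not part of the Kafka tools.
class ConsoleProducerAcksSketch {

    // Builds producer properties the way a console tool might, mapping a
    // request-required-acks value onto the producer's "acks" setting.
    static Properties producerProps(String requestRequiredAcks) {
        Properties props = new Properties();
        props.setProperty("enable.idempotence", "true"); // default after this KIP
        props.setProperty("acks", requestRequiredAcks);
        return props;
    }

    // Idempotence needs acknowledgement from all in-sync replicas: acks=all, also written -1.
    static boolean isConsistent(Properties props) {
        boolean idempotent = Boolean.parseBoolean(props.getProperty("enable.idempotence"));
        String acks = props.getProperty("acks");
        return !idempotent || "all".equals(acks) || "-1".equals(acks);
    }

    public static void main(String[] args) {
        System.out.println(isConsistent(producerProps("1")));  // false: old default conflicts
        System.out.println(isConsistent(producerProps("-1"))); // true: new default is compatible
    }
}
```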
Compatibility, Deprecation, and Migration Plan
...