...
Code Block |
---|
language | java |
---|
title | Java Authorizer Interface |
---|
|
package org.apache.kafka.server.authorizer;
import java.io.Closeable;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import org.apache.kafka.common.Configurable;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclBindingFilter;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.KafkaRequestContext;
import org.apache.kafka.server.BrokerInfoServerInfo;
import org.apache.kafka.server.BrokerInfoServerInfo.Endpoint;
/**
* Pluggable authorizer interface for Kafka brokers.
* <p>
* Startup sequence in brokers:
* </p>
* <ol>
* <li>Broker creates authorizer instance if configured in {@code authorizer.class.name}.</li>
* <li>Broker configures and starts authorizer instance. Authorizer implementation starts loading its metadata.</li>
* <li>Broker starts SocketServer to accept connections and process requests.</li>
* <li>For each listener, SocketServer waits for authorization metadata to be available in the
* authorizer before accepting connections. The future returned for a listener by
* {@link #start(BrokerInfoServerInfo)} must complete only when the authorizer is ready to
* authorize requests on that listener.</li>
* <li>Broker accepts connections. For each connection, broker performs authentication and then accepts Kafka requests.
* For each request, broker invokes {@link #authorize(KafkaRequestContext, List)} to authorize
* actions performed by the request.</li>
* </ol>
* <p>
* Authorizer implementation class may optionally implement {@link org.apache.kafka.common.Reconfigurable}
* to enable dynamic reconfiguration without restarting the broker.
* </p>
* <p>
* <b>Thread safety:</b> All authorizer operations including authorization and ACL updates must be thread-safe.
* </p>
*/
@InterfaceStability.Evolving
public interface Authorizer extends Configurable, Closeable {
/**
* Starts loading authorization metadata and returns futures that can be used to wait until
* metadata for authorizing requests on each listener is available. Each listener will be
* started only after its metadata is available and authorizer is ready to start authorizing
* requests on that listener.
*
* @param brokerInfoserverInfo Metadata for the broker including broker id and listener endpoints
* @return Map from each endpoint to a CompletableFuture that completes when the authorizer is
* ready to start authorizing requests on that listener.
*/
Map<Endpoint, CompletableFuture<Void>> start(BrokerInfoServerInfo brokerInfoserverInfo);
/**
* Authorizes the specified actions. Additional metadata for the actions is specified
* in {@code requestContext}.
*
* @param requestContext Request context including request type, security protocol and listener name
* @param actions Actions being authorized including resource and operation for each action
* @return List of authorization results for each action in the same order as the provided actions
*/
List<AuthorizationResult> authorize(KafkaRequestContext requestContext, List<Action> actions);
/**
* Creates new ACL bindings.
*
* @param requestContext Request context if the ACL is being created by a broker to handle
* a client request to create ACLs. This may be null if ACLs are created directly in ZooKeeper
* using AclCommand.
* @param aclBindings ACL bindings to create
*
* @return Create result for each ACL binding in the same order as in the input list
*/
List<AclCreateResult> createAcls(KafkaRequestContext requestContext, List<AclBinding> aclBindings);
/**
* Deletes all ACL bindings that match the provided filters.
*
* @param requestContext Request context if the ACL is being deleted by a broker to handle
* a client request to delete ACLs. This may be null if ACLs are deleted directly in ZooKeeper
* using AclCommand.
* @param aclBindingFilters Filters to match ACL bindings that are to be deleted
*
* @return Delete result for each filter in the same order as in the input list.
* Each result indicates which ACL bindings were actually deleted as well as any
* bindings that matched but could not be deleted.
*/
List<AclDeleteResult> deleteAcls(KafkaRequestContext requestContext, List<AclBindingFilter> aclBindingFilters);
/**
* Returns ACL bindings which match the provided filter.
*
* @param filter Filter to match the ACL bindings that are to be returned
* @return Iterable over the matching ACL bindings, which may be populated lazily.
*/
Iterable<AclBinding> acls(AclBindingFilter filter);
} |
...
Code Block |
---|
language | java |
---|
title | Request Context |
---|
|
package org.apache.kafka.common;
import java.net.InetAddress;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.SecurityProtocol;
/**
* Request context interface that provides data from the request header as well as connection
* and authentication information to plugins.
*/
@InterfaceStability.Evolving
public interface KafkaRequestContext {
/**
* Returns the name of the listener on which the request was received.
*/
String listener();
/**
* Returns the security protocol for the listener on which the request was received.
*/
SecurityProtocol securityProtocol();
/**
* Returns the authenticated principal for the connection on which the request was received.
*/
KafkaPrincipal principal();
/**
* Returns the client IP address from which the request was sent.
*/
InetAddress clientAddress();
/**
* Returns the 16-bit API key of the request from the request header. See
* https://kafka.apache.org/protocol#protocol_api_keys for request types.
*/
int requestType();
/**
* Returns a name for the request type. For fetch requests, the metrics names
* {@code FetchFollower} and {@code FetchConsumer} will be used to distinguish between
* replica fetch requests and client fetch requests.
*/
String requestName();
/**
* Returns the request version from the request header.
*/
int requestVersion();
/**
* Returns the client id from the request header.
*/
String clientId();
/**
* Returns the correlation id from the request header.
*/
int correlationId();
}
|
BrokerInfoServerInfo provides runtime broker configuration to authorization plugins, including broker id, cluster id and endpoint information.
Code Block |
---|
language | java |
---|
title | Broker Runtime Config |
---|
|
package org.apache.kafka.server;
import java.util.Collection;
import org.apache.kafka.common.ClusterResource;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.security.auth.SecurityProtocol;
/**
* Runtime broker configuration metadata for broker-side plugins, including the broker id,
* cluster information and the broker's endpoints.
*/
@InterfaceStability.Evolving
public interface BrokerInfoServerInfo {
/**
* Describes a single broker endpoint: its listener, security protocol, host and port.
*/
interface Endpoint {
/**
* Returns the listener of this endpoint.
*/
String listener();
/**
* Returns the security protocol of this endpoint.
*/
SecurityProtocol securityProtocol();
/**
* Returns the host of this endpoint.
*/
String host();
/**
* Returns the port of this endpoint.
*/
int port();
}
/**
* Returns the cluster resource for this broker.
* NOTE(review): the accompanying text says this is how the cluster id is exposed; confirm.
*/
ClusterResource clusterResource();
/**
* Returns the broker id.
*/
int brokerId();
/**
* Returns the endpoints of this broker.
*/
Collection<Endpoint> endpoints();
/**
* Returns the inter-broker endpoint.
* NOTE(review): presumably the endpoint of the listener used for inter-broker
* communication, inferred from the name only; confirm.
*/
Endpoint interBrokerEndpoint();
}
|
...
Code Block |
---|
language | java |
---|
title | Authorizable Action |
---|
|
package org.apache.kafka.server.authorizer;
import java.util.Objects;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;
@InterfaceStability.Evolving
public class Action {
private final ResourcePattern resourcePattern;
private final AclOperation operation;
private final int resourceReferenceCount;
private final AuditFlagboolean auditFlaglogIfAllowed;
private final intboolean resourceReferenceCountlogIfDenied;
public Action(AclOperation operation,
ResourceType resourceType,
StringResourcePattern resourceNameresourcePattern,
AuditFlag auditFlag,
int resourceReferenceCount) {,
this.operation = operation;
this.resourcePattern = new ResourcePattern(resourceType, resourceName, PatternType.LITERAL);
boolean logIfAllowed,
this.auditFlag = auditFlag boolean logIfDenied) {
this.operation = operation;
this.resourcePattern = resourcePattern;
this.logIfAllowed = logIfAllowed;
this.logIfDenied = logIfDenied;
this.resourceReferenceCount = resourceReferenceCount;
}
/**
* Resource on which action is being performed.
*/
public ResourcePattern resourcePattern() {
return resourcePattern;
}
/**
* Operation being performed.
*/
public AclOperation operation() {
return operation;
}
/**
* AuthorizationIndicates usageif flagaudit to enable authorization logs to distinguish between attemptslogs tracking ALLOWED access should include this action if result is
* ALLOWED. toThe accessflag unauthorizedis resourcestrue andif otheraccess filteringto operationsa performedresource byis thegranted broker.
while processing the request as */a
public* AuditFlagresult auditFlag() {
return auditFlag;
}
of this authorization. The flag is false only for requests used to describe access where
/**
* Numberno ofoperation timeson the resource beingis authorizedactually isperformed referencedbased withinon the authorization requestresult.
For example, a single */
public *boolean request may reference `n` topic partitions of the same topic. Brokers will authorize the topic oncelogIfAllowed() {
return logIfAllowed;
}
/**
with `resourceReferenceCount=n`. Authorizers may include* theIndicates countif in audit logs.
tracking DENIED access should */
include this action if publicresult intis
resourceReferenceCount() {
* DENIED. The flag is returntrue resourceReferenceCount;
if access to a }
resource was explicitly requested and @Overriderequest
public boolean* equals(Object o) {
if (this == o) {is denied as a result of this authorization request. The flag is false if request was
* filtering out authorized resources (e.g. to returnsubscribe true;
to regex pattern). The flag is also
}
* false if this is if (!(o instanceof Action)) {
an optional authorization where an alternative resource authorization is
* applied if returnthis false;
}
fails (e.g. Cluster:Create which is subsequently overridden by Topic:Create).
*/
Action thatpublic =boolean logIfDenied(Action) o;{
return Objects.equals(this.resourcePattern, that.resourcePattern) &&
logIfDenied;
}
/**
Objects.equals(this.operation, that.operation) &&
Objects.equals(this.auditFlag, that.auditFlag) &&* Number of times the resource being authorized is referenced within the request. For example, a single
* request may reference `n` topic Objects.equals(this.resourceReferenceCount, that.resourceReferenceCount);
}
@Overridepartitions of the same topic. Brokers will authorize the topic once
* with `resourceReferenceCount=n`. Authorizers may include the count in audit logs.
*/
public int hashCoderesourceReferenceCount() {
return Objects.hash(resourcePattern, operation, auditFlag)resourceReferenceCount;
}
@Override
public Stringboolean toStringequals(Object o) {
return "Action(" +if (this == o) {
", resourcePattern='" + resourcePattern + '\'' +
return true;
}
", operation='" + operation + '\'' +
if (!(o instanceof Action)) {
return false;
", auditFlag='" + auditFlag + '\'' + }
Action that = ", resourceReferenceCount='" + resourceReferenceCount + '\'' +
')';
}
} |
Audit flag provides additional context for audit logging:
Code Block |
---|
language | java |
---|
title | Audit Flag |
---|
|
package org.apache.kafka.server.authorizer;
public enum AuditFlag {
/**
* Access was requested to resource. If authorization result is ALLOWED, access is granted to
* the resource to perform the request. If DENY, request is failed with authorization failure.
* <p>
* Audit logs tracking ALLOWED access should include this if result is ALLOWED.
* Audit logs tracking DENIED access should include this if result is DENIED.
* </p>
*/
MANDATORY_AUTHORIZE,
/**
* Access was requested to resource. If authorization result is ALLOWED, access is granted to
* the resource to perform the request. If DENY, alternative authorization rules are applied
* to determine if access is allowed.
* <p>
* For example, topic creation is allowed if user has Cluster:Create
* permission to create any topic or the fine-grained Topic:Create permission to create topics
* of the requested name. Cluster:Create is an optional ACL in this case.
* </p><p>
* Audit logs tracking ALLOWED access should include this if result is ALLOWED.
* Audit logs tracking DENIED access can omit this if result is DENIED, since an alternative
* authorization is used to determine access.
* </p>
*/
OPTIONAL_AUTHORIZE,
/**
* Access was requested to authorized resources (e.g. to subscribe to regex pattern).
* Request is performed on resources whose authorization result is ALLOWED and the rest of
* the resources are filtered out.
* <p>
* Audit logs tracking ALLOWED access should include this if result is ALLOWED.
* Audit logs tracking DENIED access can omit this if result is DENIED since access was not
* actually requested for the specified resource and it is filtered out.
* </p>
*/
FILTER,
/**
* Request to list authorized operations. No access is actually performed by this request
* based on the authorization result.
* <p>
* Audit logs tracking ALLOWED/DENIED access can omit these since no access is performed
* as a result of this.
* </p>
*/
LIST_AUTHORIZED
}
(Action) o;
return Objects.equals(this.resourcePattern, that.resourcePattern) &&
Objects.equals(this.operation, that.operation) &&
this.resourceReferenceCount == that.resourceReferenceCount &&
this.logIfAllowed == that.logIfAllowed &&
this.logIfDenied == that.logIfDenied;
}
@Override
public int hashCode() {
return Objects.hash(resourcePattern, operation, resourceReferenceCount, logIfAllowed, logIfDenied);
}
@Override
public String toString() {
return "Action(" +
", resourcePattern='" + resourcePattern + '\'' +
", operation='" + operation + '\'' +
", resourceReferenceCount='" + resourceReferenceCount + '\'' +
", logIfAllowed='" + logIfAllowed + '\'' +
", logIfDenied='" + logIfDenied + '\'' +
')';
}
} |
Authorize method returns individual allowed/denied results for every action. ACL create and delete operations will return any exceptions from each access control entry requested.
Code Block |
---|
language | java |
---|
title | Authorizer Operation Results |
---|
|
package org.apache.kafka.server.authorizer;
/**
* Result of authorizing a single action. The authorize method returns one of these
* for every action it is given.
*/
public enum AuthorizationResult {
/**
* The action is allowed.
*/
ALLOWED,
/**
* The action is denied.
*/
DENIED
}
|
ACL create operation returns any exception from each ACL binding requested.
Code Block |
---|
language | java |
---|
title | Authorizer Operation Results |
---|
|
package org.apache.kafka.server.authorizer;
import org.apache.kafka.common.annotation.InterfaceStability;
@InterfaceStability.Evolving
public class AclCreateResult {

    /** Cause of the failure, or {@code null} if the ACL binding was created. */
    private final Throwable exception;

    /**
     * Creates a result for a create request that succeeded.
     */
    public AclCreateResult() {
        this.exception = null;
    }

    /**
     * Creates a result for a create request that ended with the given exception.
     *
     * @param exception the failure cause; {@code null} denotes success
     */
    public AclCreateResult(Throwable exception) {
        this.exception = exception;
    }

    /**
     * Returns the exception raised during create, or {@code null} if the request succeeded.
     */
    public Throwable exception() {
        return this.exception;
    }

    /**
     * Returns true if the request failed, i.e. an exception was recorded.
     */
    public boolean failed() {
        final boolean succeeded = this.exception == null;
        return !succeeded;
    }
}
|
ACL delete operation returns any exception from each ACL filter requested. Matching ACL bindings for each filter are returned along with any delete failure.
Code Block |
---|
language | java |
---|
title | Delete Results |
---|
|
package org.apache.kafka.server.authorizer;
import java.util.Collections;
import java.util.Collection;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.errors.ApiException;
/**
 * Outcome of applying one ACL binding filter in a delete request: either an exception
 * for the filter, or a per-binding result for every ACL binding the filter matched.
 */
public class AclDeleteResult {

    private final Collection<DeletionResult> deletionResults;
    private final ApiException exception;

    /**
     * Creates a result carrying the given exception and no per-binding results.
     */
    public AclDeleteResult(ApiException exception) {
        this(Collections.emptySet(), exception);
    }

    /**
     * Creates a result carrying the given per-binding results and no exception.
     */
    public AclDeleteResult(Collection<DeletionResult> deleteResults) {
        this(deleteResults, null);
    }

    private AclDeleteResult(Collection<DeletionResult> deleteResults, ApiException exception) {
        this.exception = exception;
        this.deletionResults = deleteResults;
    }

    /**
     * Returns any exception while attempting to match ACL filter to delete ACLs.
     */
    public ApiException exception() {
        return this.exception;
    }

    /**
     * Returns delete result for each matching ACL binding.
     */
    public Collection<DeletionResult> deletionResults() {
        return this.deletionResults;
    }

    /**
     * Delete result for each ACL binding that matched a delete filter.
     */
    public static class DeletionResult {

        private final AclBinding aclBinding;
        private final ApiException exception;

        /**
         * Creates a result for a binding with no exception recorded.
         */
        public DeletionResult(AclBinding aclBinding) {
            this.aclBinding = aclBinding;
            this.exception = null;
        }

        /**
         * Creates a result for a binding together with the exception recorded for it.
         */
        public DeletionResult(AclBinding aclBinding, ApiException exception) {
            this.aclBinding = aclBinding;
            this.exception = exception;
        }

        /**
         * Returns ACL binding that matched the delete filter. {@link #deleted()} indicates if
         * the binding was deleted.
         */
        public AclBinding aclBinding() {
            return this.aclBinding;
        }

        /**
         * Returns exception that resulted in failure to delete ACL binding.
         */
        public ApiException exception() {
            return this.exception;
        }

        /**
         * Returns true if ACL binding was deleted, false otherwise.
         */
        public boolean deleted() {
            final boolean failed = this.exception != null;
            return !failed;
        }
    }
} |
...