...
Code Block | ||||
---|---|---|---|---|
| ||||
package org.apache.kafka.server.authorizer; import java.net.InetAddress; import org.apache.kafka.common.annotation.InterfaceStability; import org.apache.kafka.common.security.auth.KafkaPrincipal; import org.apache.kafka.common.security.auth.SecurityProtocol; /** * Request context interface that provides data from request header as well as connection * and authentication information to plugins. */ @InterfaceStability.Evolving public interface AuthorizableRequestContext { /** * Returns name of listener on which request was received. */ String listenerlistenerName(); /** * Returns the security protocol for the listener on which request was received. */ SecurityProtocol securityProtocol(); /** * Returns authenticated principal for the connection on which request was received. */ KafkaPrincipal principal(); /** * Returns client IP address from which request was sent. */ InetAddress clientAddress(); /** * 16-bit API key of the request from the request header. See * https://kafka.apache.org/protocol#protocol_api_keys for request types. */ int requestType(); /** * Returns the request version from the request header. */ int requestVersion(); /** * Returns the client id from the request header. */ String clientId(); /** * Returns the correlation id from the request header. */ int correlationId(); } |
...
Code Block | ||||
---|---|---|---|---|
| ||||
package org.apache.kafka.common; import java.util.Objects; import java.util.ObjectsOptional; import org.apache.kafka.common.annotation.InterfaceStability; import org.apache.kafka.common.security.auth.SecurityProtocol; /** * Represents a broker endpoint. */ @InterfaceStability.Evolving public class Endpoint { private final String listenerlistenerName; private final SecurityProtocol securityProtocol; private final String host; private final int port; public Endpoint(String listenerlistenerName, SecurityProtocol securityProtocol, String host, int port) { this.listenerlistenerName = listenerlistenerName; this.securityProtocol = securityProtocol; this.host = host; this.port = port; } /** * Returns the listener name of this endpoint. This is non-empty for endpoints provided * to broker plugins, but may be empty when used in clients. */ public StringOptional<String> listenerlistenerName() { return listenerOptional.ofNullable(listenerName); } /** * Returns the security protocol of this endpoint. */ public SecurityProtocol securityProtocol() { return securityProtocol; } /** * Returns advertised host name of this endpoint. */ public String host() { return host; } /** * Returns the port to which the listener is bound. */ public int port() { return port; } @Override public boolean equals(Object o) { if (this == o) { return true; } if (!(o instanceof Endpoint)) { return false; } Endpoint that = (Endpoint) o; return Objects.equals(this.listenerlistenerName, that.listenerlistenerName) && Objects.equals(this.securityProtocol, that.securityProtocol) && Objects.equals(this.host, that.host) && this.port == that.port; } @Override public int hashCode() { return Objects.hash(listenerlistenerName, securityProtocol, host, port); } @Override public String toString() { return "Endpoint(" + "listenerlistenerName='" + listenerlistenerName + '\'' + ", securityProtocol=" + securityProtocol + ", host='" + host + '\'' + ", port=" + port + ')'; } } |
...
Code Block | ||||
---|---|---|---|---|
| ||||
package org.apache.kafka.server.authorizer; import java.util.Collections; import java.util.Collection; import java.util.Optional; import org.apache.kafka.common.acl.AclBinding; import org.apache.kafka.common.annotation.InterfaceStability; import org.apache.kafka.common.errors.ApiException; @InterfaceStability.Evolving public class AclDeleteResult { private final ApiException exception; private final Collection<AclBindingDeleteResult> aclBindingDeleteResults; public AclDeleteResult(ApiException exception) { this(Collections.emptySet(), exception); } public AclDeleteResult(Collection<AclBindingDeleteResult> deleteResults) { this(deleteResults, null); } private AclDeleteResult(Collection<AclBindingDeleteResult> deleteResults, ApiException exception) { this.aclBindingDeleteResults = deleteResults; this.exception = exception; } /** * Returns any exception while attempting to match ACL filter to delete ACLs. */ public Optional<ApiException> exception() { return exception == null ? Optional.empty() : Optional.of(exception); } /** * Returns delete result for each matching ACL binding. */ public Collection<AclBindingDeleteResult> aclBindingDeleteResults() { return aclBindingDeleteResults; } /** * Delete result for each ACL binding that matched a delete filter. */ public static class AclBindingDeleteResult { private final AclBinding aclBinding; private final ApiException exception; public AclBindingDeleteResult(AclBinding aclBinding) { this(aclBinding, null); } public AclBindingDeleteResult(AclBinding aclBinding, ApiException exception) { this.aclBinding = aclBinding; this.exception = exception; } /** * Returns ACL binding that matched the delete filter. {@link #deleted()} indicates if * the binding was deleted. */ public AclBinding aclBinding() { return aclBinding; } /** * Returns any exception that resulted in failure to delete ACL binding. */ public Optional<ApiException> exception() { return exception == null ? Optional.empty() : Optional.of(exception); } } } |
Proposed Changes
Asynchronous update requests
kafka.server.KafkaApis
will be updated to handle CreateAcls
and DeleteAcls
requests asynchronously using a purgatory. If Authorizer.createAcls
or Authorizer.deleteAcls
returns any CompletionStage
that is not complete, the request will be added to a purgatory and completed when all the stages complete. Authorizer implementations with low latency updates may continue to update synchronously and return a completed future. These requests will be completed in-line and will not be added to the purgatory.
Asynchronous updates are useful for Authorizer implementations that use external stores for ACLs, for example a database. Async handling of update requests will enable Kafka brokers to handle database outages without blocking request threads. As many databases now support async APIs (https://dev.mysql.com/doc/x-devapi-userguide/en/synchronous-vs-asynchronous-execution.html, https://blogs.oracle.com/java/jdbc-next:-a-new-asynchronous-api-for-connecting-to-a-database), async update API enables authorizers to take advantage of these APIs.
Purgatory metrics will be added for ACL updates, consistent with metrics from other purgatories. Two new metrics will be added:
kafka.server:type=DelayedOperationPurgatory,name=NumDelayedOperations,delayedOperation=acl-update
kafka.server:type=DelayedOperationPurgatory,name=PurgatorySize,delayedOperation=acl-update
In addition to these metrics, existing request metrics for CreateAcls
and DeleteAcls
can be used to track the portion of time spent on async operations since local time is updated before the async wait and remote time is updated when async wait completes:
kafka.network:type=RequestMetrics,name=LocalTimeMs,request=CreateAcls
kafka.network:type=RequestMetrics,name=RemoteTimeMs,request=CreateAcls
kafka.network:type=RequestMetrics,name=LocalTimeMs,request=DeleteAcls
kafka.network:type=RequestMetrics,name=RemoteTimeMs,request=DeleteAcls
Deprecate existing Scala Authorizer Trait
...